Skip to content

Commit

Permalink
Merge pull request #13 from reinierl/master
Browse files Browse the repository at this point in the history
Share ChannelFactory and Timer between clients
  • Loading branch information
casualjim committed May 7, 2014
2 parents 75f2857 + a9f60e7 commit 20bc616
Showing 1 changed file with 12 additions and 27 deletions.
39 changes: 12 additions & 27 deletions src/main/scala/io/backchat/hookup/client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ import collection.JavaConverters._
import websocketx._
import org.jboss.netty.buffer.ChannelBuffers
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Await, Promise, Future }
import scala.concurrent.forkjoin.ForkJoinPool
import java.lang.Thread.UncaughtExceptionHandler
import scala.concurrent.{ Await, Promise, Future }
import org.jboss.netty.handler.timeout.{ IdleStateAwareChannelHandler, IdleStateEvent, IdleState, IdleStateHandler }
import org.jboss.netty.logging.{ InternalLogger, InternalLoggerFactory }
import io.backchat.hookup.HookupServer.MessageAckingHandler
Expand All @@ -21,12 +19,11 @@ import java.net.{ ConnectException, InetSocketAddress, URI }
import java.nio.channels.ClosedChannelException
import org.json4s._
import org.json4s.jackson.JsonMethods._
import java.io.{Closeable, File}
import java.io.Closeable
import java.util.concurrent.{ConcurrentSkipListSet, TimeUnit, Executors}
import beans.BeanProperty
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference, AtomicLong}
import akka.util.Timeout
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext

/**
Expand Down Expand Up @@ -139,14 +136,15 @@ object HookupClient {
*/
private final class HookupClientHost(val client: HookupClient)(implicit executionContext: ExecutionContext) extends HookupClientLike with BroadcastChannel with Connectable with Reconnectable {

import HookupClientHost._

private[this] val normalized = client.settings.uri.normalize()
private[this] val tgt = if (normalized.getPath == null || normalized.getPath.trim().isEmpty) {
new URI(normalized.getScheme, normalized.getAuthority, "/", normalized.getQuery, normalized.getFragment)
} else normalized

private[this] val bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool))
private[this] val bootstrap = new ClientBootstrap(clientSocketChannelFactory)
private[this] var handshaker: WebSocketClientHandshaker = null
private[this] val timer = new HashedWheelTimer()
private[this] var channel: Channel = null
private[this] var _isConnected: Promise[OperationResult] = Promise[OperationResult]()
private[this] val buffer = client.settings.buffer
Expand Down Expand Up @@ -332,26 +330,7 @@ object HookupClient {
}

disconnected.future onComplete {
case _ {
_isConnected = Promise[OperationResult]()
try {
if (!isReconnecting && bootstrap != null) {
val thread = new Thread {
override def run = {
timer.stop.asScala foreach (_.cancel())
bootstrap.releaseExternalResources()
}
}
thread.setDaemon(false)
thread.start()
thread.join()
}
} catch {
case e: Throwable logger.error("error while closing the connection", e)
} finally {
if (!closing.isCompleted) closing.success(Success)
}
}
_ if (!closing.isCompleted) closing.success(Success)
}

closing.future
Expand Down Expand Up @@ -389,7 +368,13 @@ object HookupClient {
}
}
}
}

object HookupClientHost {
private lazy val clientSocketChannelFactory =
new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool())

private lazy val timer = new HashedWheelTimer
}

/**
Expand Down

0 comments on commit 20bc616

Please sign in to comment.