Skip to content

Commit

Permalink
Correctly reuse existing connections under high load
Browse files Browse the repository at this point in the history
  • Loading branch information
rdlowrey committed Oct 28, 2014
1 parent c4d67c6 commit 84dff80
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 17 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
### v1.0.0-rc4
#### v1.0.0-rc5

- Correctly reuse existing socket connections under high load

#### v1.0.0-rc4

- SocketPool now properly observes host connection limits
- New `Client::OP_CONCURRENCY_LIMIT` setting queues outstanding requests beyond a certain number
Expand Down
59 changes: 43 additions & 16 deletions lib/SocketPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class SocketPool {
private $sockets = [];
private $queuedSocketRequests = [];
private $socketIdUriMap = [];
private $pendingSockets = [];
private $options = [
self::OP_HOST_CONNECTION_LIMIT => 8,
self::OP_MS_IDLE_TIMEOUT => 10000,
Expand Down Expand Up @@ -76,6 +77,7 @@ private function checkoutExistingSocket($uri, $options) {
} else {
$poolStruct->isAvailable = false;
$this->reactor->disable($poolStruct->idleWatcher);

return $poolStruct->resource;
}
}
Expand All @@ -89,6 +91,7 @@ private function checkoutNewSocket($uri, $options) {
$needsRebind = $this->needsRebind;
$this->needsRebind = null;
$future = new Future($this->reactor);

if ($this->allowsNewConnection($uri, $options) || $needsRebind) {
$this->initializeNewConnection($future, $uri, $options);
} else {
Expand All @@ -103,16 +106,23 @@ private function allowsNewConnection($uri, $options) {

if ($maxConnsPerHost <= 0) {
return true;
} elseif (empty($this->sockets[$uri])) {
return true;
} elseif (count($this->sockets[$uri]) < $maxConnsPerHost) {
}

$pendingCount = isset($this->pendingSockets[$uri]) ? $this->pendingSockets[$uri] : 0;
$existingCount = isset($this->sockets[$uri]) ? count($this->sockets[$uri]) : 0;
$totalCount = $pendingCount + $existingCount;

if ($totalCount < $maxConnsPerHost) {
return true;
} else {
return false;
}

return false;
}

private function initializeNewConnection(Future $future, $uri, $options) {
$this->pendingSockets[$uri] = isset($this->pendingSockets[$uri])
? $this->pendingSockets[$uri] + 1
: 1;
$futureSocket = $this->connector->connect($uri, $options);
$futureSocket->when(function($error, $socket) use ($future, $uri) {
if ($error) {
Expand All @@ -124,6 +134,9 @@ private function initializeNewConnection(Future $future, $uri, $options) {
}

private function finalizeNewConnection(Future $future, $uri, $socket) {
if (--$this->pendingSockets[$uri] === 0) {
unset($this->pendingSockets[$uri]);
}
$socketId = (int) $socket;
$poolStruct = new SocketPoolStruct;
$poolStruct->id = $socketId;
Expand All @@ -133,6 +146,10 @@ private function finalizeNewConnection(Future $future, $uri, $socket) {
$this->sockets[$uri][$socketId] = $poolStruct;
$this->socketIdUriMap[$socketId] = $uri;
$future->succeed($poolStruct->resource);

if (empty($this->queuedSocketRequests[$uri])) {
unset($this->queuedSocketRequests[$uri]);
}
}

/**
Expand Down Expand Up @@ -164,16 +181,29 @@ private function unloadSocket($uri, $socketId) {
$this->sockets[$uri][$socketId],
$this->socketIdUriMap[$socketId]
);

if (empty($this->sockets[$uri])) {
unset($this->sockets[$uri][$socketId]);
}

if (!empty($this->queuedSocketRequests[$uri])) {
$this->dequeueNextWaitingSocket($uri);
}
}

private function dequeueNextWaitingSocket($uri) {
list($future, $uri, $options) = array_shift($this->queuedSocketRequests[$uri]);
$this->initializeNewConnection($future, $uri, $options);
if (empty($this->queuedSocketRequests[$uri])) {
unset($this->queuedSocketRequests[$uri]);
$queueStruct = current($this->queuedSocketRequests[$uri]);
list($future, $uri, $options) = $queueStruct;

if ($socket = $this->checkoutExistingSocket($uri, $options)) {
array_shift($this->queuedSocketRequests[$uri]);
$future->succeed($socket);
return;
}

if ($this->allowsNewConnection($uri, $options)) {
array_shift($this->queuedSocketRequests[$uri]);
$this->initializeNewConnection($future, $uri, $options);
}
}

Expand All @@ -186,7 +216,6 @@ private function dequeueNextWaitingSocket($uri) {
*/
public function checkin($resource) {
$socketId = (int) $resource;

if (!isset($this->socketIdUriMap[$socketId])) {
throw new \DomainException(
sprintf('Unknown socket: %s', $resource)
Expand Down Expand Up @@ -220,14 +249,12 @@ private function finalizeSocketCheckin($uri, $socketId) {
}

private function initializeIdleTimeout(SocketPoolStruct $poolStruct) {
if ($poolStruct->idleWatcher === null) {
if (isset($poolStruct->idleWatcher)) {
$this->reactor->enable($poolStruct->idleWatcher);
} else {
$poolStruct->idleWatcher = $this->reactor->once(function() use ($poolStruct) {
$uri = $poolStruct->uri;
$socketId = $poolStruct->id;
$this->unloadSocket($uri, $socketId);
$this->unloadSocket($poolStruct->uri, $poolStruct->id);
}, $this->opMsIdleTimeout);
} else {
$this->reactor->enable($poolStruct->idleWatcher);
}
}

Expand Down

0 comments on commit 84dff80

Please sign in to comment.