Skip to content

Commit

Permalink
Internally queue requests (issue #58)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdlowrey committed Oct 27, 2014
1 parent b9380ba commit 97f6c9a
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 62 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
#### master

- SocketPool now properly observes host connection limits
- New `Client::OP_CONCURRENCY_LIMIT` setting queues outstanding requests beyond a certain number
(default 512) to help prevent naive applications from spiraling memory out of control without
worrying over the details of concurrency.

##### BC BREAKS:

- The `Client::OP_QUEUED_SOCKET_LIMIT` setting has been removed

#### v1.0.0-rc3

- Update amp dependency to eliminate combinator resolution memory leaks
Expand Down
96 changes: 61 additions & 35 deletions lib/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ class Client implements HttpClient {
const OP_BINDTO = Connector::OP_BIND_IP_ADDRESS;
const OP_MS_CONNECT_TIMEOUT = Connector::OP_MS_CONNECT_TIMEOUT;
const OP_HOST_CONNECTION_LIMIT = SocketPool::OP_HOST_CONNECTION_LIMIT;
const OP_QUEUED_SOCKET_LIMIT = SocketPool::OP_MAX_QUEUE_SIZE;
const OP_MS_KEEP_ALIVE_TIMEOUT = SocketPool::OP_MS_IDLE_TIMEOUT;
const OP_PROXY_HTTP = HttpSocketPool::OP_PROXY_HTTP;
const OP_PROXY_HTTPS = HttpSocketPool::OP_PROXY_HTTPS;
Expand Down Expand Up @@ -48,11 +47,12 @@ class Client implements HttpClient {
private $encryptor;
private $writerFactory;
private $hasZlib;
private $queue;
private $dequeuer;
private $options = [
self::OP_BINDTO => '',
self::OP_MS_CONNECT_TIMEOUT => 30000,
self::OP_HOST_CONNECTION_LIMIT => 8,
self::OP_QUEUED_SOCKET_LIMIT => 512,
self::OP_MS_KEEP_ALIVE_TIMEOUT => 30000,
self::OP_PROXY_HTTP => '',
self::OP_PROXY_HTTPS => '',
Expand Down Expand Up @@ -84,6 +84,17 @@ public function __construct(
$this->encryptor = $encryptor ?: new Encryptor($reactor);
$this->writerFactory = $writerFactory ?: new WriterFactory;
$this->hasZlib = extension_loaded('zlib');
$this->dequeuer = function() {
if ($this->queue) {
$this->dequeueNextRequest();
}
};

/*
$this->reactor->repeat(function() {
printf("outstanding requests: %s\n", self::$outstandingRequests);
}, 1000);
*/
}

/**
Expand Down Expand Up @@ -113,31 +124,34 @@ public function requestMulti(array $urisAndRequests, array $options = []) {
* @return \Amp\Promise A promise to resolve the request at some point in the future
*/
public function request($uriOrRequest, array $options = []) {
$cycle = new RequestCycle;
$promisor = new Future($this->reactor);

try {
$cycle->futureResponse = new Future($this->reactor);

list($request, $uri) = $this->generateRequestFromUri($uriOrRequest);

$cycle->uri = $uri;
$cycle->request = $request;
$cycle->options = $options
? array_merge($this->options, $options)
: $this->options;

$body = $request->getBody();

$cycle = new RequestCycle;
$cycle->futureResponse = $promisor;
list($cycle->request, $cycle->uri) = $this->generateRequestFromUri($uriOrRequest);
$cycle->options = $options ? array_merge($this->options, $options) : $this->options;
$body = $cycle->request->getBody();
if ($body instanceof AggregateBody) {
$this->processAggregateBody($cycle, $body);
} else {
$this->finalizeRequest($cycle);
}
} catch (\Exception $e) {
$cycle->futureResponse->fail($e);
$promisor->fail($e);
}

return $cycle->futureResponse->promise();
return $promisor->promise();
}

private function dequeueNextRequest() {
$cycle = array_shift($this->queue);
$authority = $this->generateAuthorityFromUri($cycle->uri);
$checkoutUri = $cycle->uri->getScheme() . "://{$authority}";
$futureSocket = $this->socketPool->checkout($checkoutUri, $cycle->options);
$futureSocket->when(function($error, $result) use ($cycle) {
$this->onSocketResolve($cycle, $error, $result);
});
}

private function processAggregateBody(RequestCycle $cycle, AggregateBody $body) {
Expand Down Expand Up @@ -167,24 +181,24 @@ private function processAggregateBodyHeaders(RequestCycle $cycle, AggregateBody
private function finalizeRequest(RequestCycle $cycle) {
$uri = $cycle->uri;
$options = $cycle->options;
$future = $cycle->futureResponse;
$promisor = $cycle->futureResponse;
$request = $cycle->request;

$this->normalizeRequestMethod($request);
$this->normalizeRequestProtocol($request);
$this->normalizeRequestBodyHeaders($request, $options, $future);
$this->normalizeRequestBodyHeaders($request, $options, $promisor);
$this->normalizeRequestEncodingHeaderForZlib($request, $options);
$this->normalizeRequestHostHeader($request, $uri);
$this->normalizeRequestUserAgent($request, $options);
$this->normalizeRequestAcceptHeader($request);
$this->assignApplicableRequestCookies($request, $options);

$authority = $this->generateAuthorityFromUri($uri);
$checkoutUri = $uri->getScheme() . "://{$authority}";
$futureSocket = $this->socketPool->checkout($checkoutUri, $options);
$futureSocket->when(function($error, $result) use ($cycle) {
$this->onSocketResolve($cycle, $error, $result);
});
$this->queue[] = $cycle;
$promisor->when($this->dequeuer);

if (count($this->queue) < 512) {
$this->dequeueNextRequest();
}
}

private function generateRequestFromUri($uriOrRequest) {
Expand Down Expand Up @@ -413,8 +427,8 @@ private function onCryptoCompletion(RequestCycle $cycle) {
Parser::OP_DISCARD_BODY => $cycle->options[self::OP_DISCARD_BODY],
Parser::OP_RETURN_BEFORE_ENTITY => true,
Parser::OP_BODY_DATA_CALLBACK => function($data) use ($cycle) {
$cycle->futureResponse->update([Notify::RESPONSE_BODY_DATA, $data]);
}
$cycle->futureResponse->update([Notify::RESPONSE_BODY_DATA, $data]);
}
]);

$cycle->parser = $parser;
Expand All @@ -424,7 +438,7 @@ private function onCryptoCompletion(RequestCycle $cycle) {

$timeout = $cycle->options[self::OP_MS_TRANSFER_TIMEOUT];
if ($timeout > 0) {
$cycle->transferTimeoutWatcher = $this->reactor->once(function() use ($cycle, $timeout) {
$cycle->transferTimeoutWatcher = $this->reactor->once(function() use ($cycle, $timeout) {
$this->fail($cycle, new TimeoutException(
sprintf('Allowed transfer timeout exceeded: %d ms', $timeout)
));
Expand Down Expand Up @@ -677,12 +691,12 @@ private function redirect(RequestCycle $cycle, Uri $newUri) {
$this->assignRedirectRefererHeader($refererUri, $newUri, $request);
}

$cycle->futureResponse->update([Notify::REDIRECT, $refererUri, (string)$newUri]);

$futureSocket = $this->socketPool->checkout($checkoutUri);
$futureSocket->when(function($error, $result) use ($cycle) {
$this->onSocketResolve($cycle, $error, $result);
});

$cycle->futureResponse->update([Notify::REDIRECT, $refererUri, (string)$newUri]);
}

/**
Expand All @@ -706,16 +720,31 @@ private function isSocketDead($socketResource) {
}

private function processDeadSocket(RequestCycle $cycle) {
if ($cycle->parser->getState() == Parser::BODY_IDENTITY_EOF) {
$parserState = $cycle->parser->getState();
if ($parserState == Parser::BODY_IDENTITY_EOF) {
$parsedResponseArr = $cycle->parser->getParsedMessageArray();
$this->assignParsedResponse($cycle, $parsedResponseArr);
} elseif ($parserState == Parser::AWAITING_HEADERS && empty($cycle->retryCount)) {
echo "\n\n --retry-- \n\n";
$this->retry($cycle);
} else {
$this->fail($cycle, new SocketException(
'Socket connection disconnected prior to response completion :('
sprintf(
'Socket disconnected prior to response completion (Parser state: %s)',
$parserState
)
));
}
}

private function retry(RequestCycle $cycle) {
$cycle->retryCount++;
$this->collectRequestCycleWatchers($cycle);
$this->socketPool->clear($cycle->socket);
array_unshift($this->queue, $cycle);
$this->dequeueNextRequest();
}

private function writeRequest(RequestCycle $cycle) {
$rawHeaders = $this->generateRawRequestHeaders($cycle->request);
$writePromise = (new BufferWriter)->write($this->reactor, $cycle->socket, $rawHeaders);
Expand Down Expand Up @@ -858,9 +887,6 @@ public function setOption($option, $value) {
case self::OP_HOST_CONNECTION_LIMIT:
$this->options[self::OP_HOST_CONNECTION_LIMIT] = $value;
break;
case self::OP_QUEUED_SOCKET_LIMIT:
$this->options[self::OP_QUEUED_SOCKET_LIMIT] = $value;
break;
case self::OP_MS_CONNECT_TIMEOUT:
$this->options[self::OP_MS_CONNECT_TIMEOUT] = $value;
break;
Expand Down
1 change: 1 addition & 0 deletions lib/RequestCycle.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ class RequestCycle {
public $lastDataSentAt;
public $bytesRcvd;
public $bytesSent;
public $retryCount;
}
47 changes: 20 additions & 27 deletions lib/SocketPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

class SocketPool {
const OP_HOST_CONNECTION_LIMIT = 'op.host-conn-limit';
const OP_MAX_QUEUE_SIZE = 'op.max-queue-size';
const OP_MS_IDLE_TIMEOUT = 'op.ms-idle-timeout';
const OP_MS_CONNECT_TIMEOUT = Connector::OP_MS_CONNECT_TIMEOUT;
const OP_BINDTO = Connector::OP_BIND_IP_ADDRESS;
Expand All @@ -22,13 +21,10 @@ class SocketPool {
private $socketIdUriMap = [];
private $options = [
self::OP_HOST_CONNECTION_LIMIT => 8,
self::OP_MAX_QUEUE_SIZE => 512,
self::OP_MS_IDLE_TIMEOUT => 10000,
self::OP_MS_CONNECT_TIMEOUT => 30000,
self::OP_BINDTO => '',
];
private $opMaxConnectionsPerHost = 8;
private $opMaxQueuedSockets = 512;
private $opMsIdleTimeout = 10000;
private $needsRebind;

Expand Down Expand Up @@ -90,34 +86,30 @@ private function checkoutExistingSocket($uri, $options) {
}

private function checkoutNewSocket($uri, $options) {
if ($this->allowsNewConnection($uri) || $this->needsRebind) {
$future = new Future($this->reactor);
$needsRebind = $this->needsRebind;
$this->needsRebind = null;
$future = new Future($this->reactor);
if ($this->allowsNewConnection($uri, $options) || $needsRebind) {
$this->initializeNewConnection($future, $uri, $options);
$this->needsRebind = false;
return $future->promise();
} elseif (count($this->queuedSocketRequests) < $this->opMaxQueuedSockets) {
$future = new Future($this->reactor);
$this->queuedSocketRequests[$uri][] = [$future, $options];
return $future->promise();
} else {
return new Failure(new TooBusyException(
'Request rejected: too busy. Try upping the OP_MAX_QUEUE_SIZE setting.'
));
$this->queuedSocketRequests[$uri][] = [$future, $uri, $options];
}

return $future;
}

private function allowsNewConnection($uri) {
if ($this->opMaxConnectionsPerHost <= 0) {
private function allowsNewConnection($uri, $options) {
$maxConnsPerHost = $options[self::OP_HOST_CONNECTION_LIMIT];

if ($maxConnsPerHost <= 0) {
return true;
}
if (empty($this->sockets[$uri])) {
} elseif (empty($this->sockets[$uri])) {
return true;
}
if (count($this->sockets[$uri]) < $this->opMaxConnectionsPerHost) {
} elseif (count($this->sockets[$uri]) < $maxConnsPerHost) {
return true;
} else {
return false;
}

return false;
}

private function initializeNewConnection(Future $future, $uri, $options) {
Expand Down Expand Up @@ -160,6 +152,10 @@ public function clear($resource) {
}

private function unloadSocket($uri, $socketId) {
if (!isset($this->sockets[$uri][$socketId])) {
return;
}

$poolStruct = $this->sockets[$uri][$socketId];
if ($poolStruct->idleWatcher) {
$this->reactor->cancel($poolStruct->idleWatcher);
Expand All @@ -174,7 +170,7 @@ private function unloadSocket($uri, $socketId) {
}

private function dequeueNextWaitingSocket($uri) {
list($future, $options) = array_shift($this->queuedSocketRequests[$uri]);
list($future, $uri, $options) = array_shift($this->queuedSocketRequests[$uri]);
$this->initializeNewConnection($future, $uri, $options);
if (empty($this->queuedSocketRequests[$uri])) {
unset($this->queuedSocketRequests[$uri]);
Expand Down Expand Up @@ -248,9 +244,6 @@ public function setOption($option, $value) {
case self::OP_HOST_CONNECTION_LIMIT:
$this->options[self::OP_HOST_CONNECTION_LIMIT] = (int) $value;
break;
case self::OP_MAX_QUEUE_SIZE:
$this->options[self::OP_MAX_QUEUE_SIZE] = (int) $value;
break;
case self::OP_MS_CONNECT_TIMEOUT:
$this->options[self::OP_MS_CONNECT_TIMEOUT] = $value;
break;
Expand Down

0 comments on commit 97f6c9a

Please sign in to comment.