Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better handling on persistent connexion #8

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 150 additions & 27 deletions src/Adoy/FastCGI/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ class Client
*/
private $_keepAlive = false;

/**
* Timer
* @var Float
*/
public $start = false;

/**
* Format response
* @var Boolean
*/
private $_formatResponse = true;

/**
* Outstanding request statuses keyed by request id
*
Expand Down Expand Up @@ -131,11 +143,13 @@ class Client
*
* @param String $host Host of the FastCGI application
* @param Integer $port Port of the FastCGI application
* @param Boolean $formatResponse format FastCGI response
*/
public function __construct($host, $port)
public function __construct($host, $port, $formatResponse = true)
{
$this->_host = $host;
$this->_port = $port;
$this->_formatResponse = $formatResponse;
}

/**
Expand All @@ -148,7 +162,7 @@ public function setKeepAlive($b)
{
$this->_keepAlive = (boolean)$b;
if (!$this->_keepAlive && $this->_sock) {
fclose($this->_sock);
$this->close();
}
}

Expand All @@ -173,7 +187,7 @@ public function setPersistentSocket($b)
$was_persistent = ($this->_sock && $this->_persistentSocket);
$this->_persistentSocket = (boolean)$b;
if (!$this->_persistentSocket && $was_persistent) {
fclose($this->_sock);
$this->close();
}
}

Expand Down Expand Up @@ -276,14 +290,14 @@ private function buildPacket($type, $content, $requestId = 1)
{
$clen = strlen($content);
return chr(self::VERSION_1) /* version */
. chr($type) /* type */
. chr(($requestId >> 8) & 0xFF) /* requestIdB1 */
. chr($requestId & 0xFF) /* requestIdB0 */
. chr(($clen >> 8 ) & 0xFF) /* contentLengthB1 */
. chr($clen & 0xFF) /* contentLengthB0 */
. chr(0) /* paddingLength */
. chr(0) /* reserved */
. $content; /* content */
. chr($type) /* type */
. chr(($requestId >> 8) & 0xFF) /* requestIdB1 */
. chr($requestId & 0xFF) /* requestIdB0 */
. chr(($clen >> 8 ) & 0xFF) /* contentLengthB1 */
. chr($clen & 0xFF) /* contentLengthB0 */
. chr(0) /* paddingLength */
. chr(0) /* reserved */
. $content; /* content */
}

/**
Expand Down Expand Up @@ -437,12 +451,12 @@ public function request(array $params, $stdin)

/**
* Execute a request to the FastCGI application asyncronously
*
*
* This sends request to application and returns the assigned ID for that request.
*
* You should keep this id for later use with wait_for_response(). Ids are chosen randomly
* rather than seqentially to guard against false-positives when using persistent sockets.
* In that case it is possible that a delayed response to a request made by a previous script
* In that case it is possible that a delayed response to a request made by a previous script
* invocation comes back on this socket and is mistaken for response to request made with same ID
* during this request.
*
Expand All @@ -452,18 +466,19 @@ public function request(array $params, $stdin)
*/
public function async_request(array $params, $stdin)
{
$this->start = microtime(true);
$id = mt_rand(1, (1 << 16) - 1);
$this->connect();

// Pick random number between 1 and max 16 bit unsigned int 65535
$id = mt_rand(1, (1 << 16) - 1);

// Using persistent sockets implies you want them keept alive by server!
$keepAlive = intval($this->_keepAlive || $this->_persistentSocket);

$request = $this->buildPacket(self::BEGIN_REQUEST
,chr(0) . chr(self::RESPONDER) . chr($keepAlive) . str_repeat(chr(0), 5)
,$id
);
,chr(0) . chr(self::RESPONDER) . chr($keepAlive) . str_repeat(chr(0), 5)
,$id
);

$paramsRequest = '';
foreach ($params as $key => $value) {
Expand All @@ -488,7 +503,7 @@ public function async_request(array $params, $stdin)
}

// Broken pipe, tear down so future requests might succeed
fclose($this->_sock);
$this->close();
throw new \Exception('Failed to write request to socket');
}

Expand All @@ -502,10 +517,10 @@ public function async_request(array $params, $stdin)

/**
* Blocking call that waits for response to specific request
*
*
* @param Integer $requestId
* @param Integer $timeoutMs [optional] the number of milliseconds to wait. Defaults to the ReadWriteTimeout value set.
* @return string response body
* @return array response body
*/
public function wait_for_response($requestId, $timeoutMs = 0) {

Expand All @@ -516,8 +531,12 @@ public function wait_for_response($requestId, $timeoutMs = 0) {
// If we already read the response during an earlier call for different id, just return it
if ($this->_requests[$requestId]['state'] == self::REQ_STATE_OK
|| $this->_requests[$requestId]['state'] == self::REQ_STATE_ERR
) {
return $this->_requests[$requestId]['response'];
) {
if ($this->_formatResponse) {
return self::formatResponse($this->_requests[$requestId]['response']);
} else {
return $this->_requests[$requestId]['response'];
}
}

if ($timeoutMs > 0) {
Expand All @@ -542,14 +561,14 @@ public function wait_for_response($requestId, $timeoutMs = 0) {
}
if ($resp['type'] == self::END_REQUEST) {
$this->_requests[$resp['requestId']]['state'] = self::REQ_STATE_OK;
if ($resp['requestId'] == $requestId) {
if ($resp['requestId'] == $requestId) {
break;
}
}
if (microtime(true) - $startTime >= ($timeoutMs * 1000)) {
// Reset
$this->set_ms_timeout($this->_readWriteTimeout);
throw new \Exception('Timed out');
throw new \TimedOutException('Timed out');
}
} while ($resp);

Expand All @@ -564,8 +583,8 @@ public function wait_for_response($requestId, $timeoutMs = 0) {
}

if ($info['unread_bytes'] == 0
&& $info['blocked']
&& $info['eof']) {
&& $info['blocked']
&& $info['eof']) {
throw new ForbiddenException('Not in white list. Check listen.allowed_clients.');
}

Expand All @@ -586,7 +605,111 @@ public function wait_for_response($requestId, $timeoutMs = 0) {
throw new \Exception('Role value not known [UNKNOWN_ROLE]');
break;
case self::REQUEST_COMPLETE:
return $this->_requests[$requestId]['response'];
if ($this->_formatResponse) {
return self::formatResponse($this->_requests[$requestId]['response']);
} else {
return $this->_requests[$requestId]['response'];
}
}
}

/**
* formatResponse()
*
* Format the response into an array with separate statusCode, headers, body, and error output.
*
* @param $stdout The plain, unformatted response.
* @param $stderr The plain, unformatted error output.
*
* @return array An array containing the headers and body content.
*/
private static function formatResponse($stdout, $stderr = null) {

// Split the header from the body. Split on \n\n.
$doubleCr = strpos($stdout, "\r\n\r\n");
$rawHeader = substr($stdout, 0, $doubleCr);
$rawBody = substr($stdout, $doubleCr, strlen($stdout));

// Format the header.
$header = array();
$headerLines = explode("\n", $rawHeader);

// Initialize the status code and the status header
$code = '200';
$headerStatus = '200 OK';

// Iterate over the headers found in the response.
foreach ($headerLines as $line) {

// Extract the header data.
if (preg_match('/([\w-]+):\s*(.*)$/', $line, $matches)) {

// Initialize header name/value.
$headerName = strtolower($matches[1]);
$headerValue = trim($matches[2]);

// If we found an status header (will only be available if not have a 200).
if ($headerName == 'status') {

// Initialize the status header and the code.
$headerStatus = $headerValue;
$code = $headerValue;
if (false !== ($pos = strpos($code, ' '))) {
$code = substr($code, 0, $pos);
}
}

// We need to know if this header is already availble
if (array_key_exists($headerName, $header)) {

// Check if the value is an array already
if (is_array($header[$headerName])) {
// Simply append the next header value
$header[$headerName][] = $headerValue;
} else {
// Convert the existing value into an array and append the new header value
$header[$headerName] = array($header[$headerName], $headerValue);
}

} else {
$header[$headerName] = $headerValue;
}
}
}

// Set the status header finally
$header['status'] = $headerStatus;

if (false === ctype_digit($code)) {
throw new Exception("Unrecognizable status code returned from fastcgi: $code");
}

return array(
'statusCode' => (int) $code,
'headers' => $header,
'body' => trim($rawBody),
'stderr' => $stderr,
);
}

/**
* Close socket on client destruct
*
* @return void
*/
public function close() {
if ($this->_sock) {
fclose($this->_sock);
unset($this->_sock);
}
}

/**
* Destructor
*
* @return void
*/
public function __destruct() {
$this->close();
}
}