diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f925b9f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +config_withpasswords.php + diff --git a/LICENSE b/LICENSE old mode 100644 new mode 100755 diff --git a/LICENSE-Phirehose b/LICENSE-Phirehose old mode 100644 new mode 100755 diff --git a/LICENSE-oauthlogin.php b/LICENSE-oauthlogin.php old mode 100644 new mode 100755 diff --git a/OAuth.php b/OAuth.php old mode 100644 new mode 100755 diff --git a/OauthPhirehose.php b/OauthPhirehose.php new file mode 100644 index 0000000..a8415a3 --- /dev/null +++ b/OauthPhirehose.php @@ -0,0 +1,157 @@ +consumerKey?$this->consumerKey:TWITTER_CONSUMER_KEY; + $oauth['oauth_nonce'] = md5(uniqid(rand(), true)); + $oauth['oauth_signature_method'] = 'HMAC-SHA1'; + $oauth['oauth_timestamp'] = time(); + $oauth['oauth_version'] = '1.0'; + $oauth['oauth_token'] = $this->username; + if (isset($params['oauth_verifier'])) + { + $oauth['oauth_verifier'] = $params['oauth_verifier']; + unset($params['oauth_verifier']); + } + // encode all oauth values + foreach ($oauth as $k => $v) + $oauth[$k] = $this->encode_rfc3986($v); + + // encode all non '@' params + // keep sigParams for signature generation (exclude '@' params) + // rename '@key' to 'key' + $sigParams = array(); + $hasFile = false; + if (is_array($params)) + { + foreach ($params as $k => $v) + { + if (strncmp('@', $k, 1) !== 0) + { + $sigParams[$k] = $this->encode_rfc3986($v); + $params[$k] = $this->encode_rfc3986($v); + } + else + { + $params[substr($k, 1)] = $v; + unset($params[$k]); + $hasFile = true; + } + } + + if ($hasFile === true) + $sigParams = array(); + } + + $sigParams = array_merge($oauth, (array) $sigParams); + + // sorting + ksort($sigParams); + + // signing + $oauth['oauth_signature'] = $this->encode_rfc3986($this->generateSignature($method, $url, $sigParams)); + return array('request' => $params, 'oauth' => $oauth); + } + + protected function encode_rfc3986($string) + { + return str_replace('+', ' ', str_replace('%7E', '~', rawurlencode(($string)))); + } + + protected function generateSignature($method = null, $url = null, + $params = null) + { + if (empty($method) || empty($url)) + return false; + + // concatenating and encode + $concat = ''; + foreach ((array) $params as $key => $value) + $concat .= "{$key}={$value}&"; + $concat = substr($concat, 0, -1); + $concatenatedParams = $this->encode_rfc3986($concat); + + // normalize url + $urlParts = parse_url($url); + $scheme = strtolower($urlParts['scheme']); + $host = strtolower($urlParts['host']); + $port = isset($urlParts['port']) ? intval($urlParts['port']) : 0; + $retval = strtolower($scheme) . '://' . strtolower($host); + if (!empty($port) && (($scheme === 'http' && $port != 80) || ($scheme === 'https' && $port != 443))) + $retval .= ":{$port}"; + + $retval .= $urlParts['path']; + if (!empty($urlParts['query'])) + $retval .= "?{$urlParts['query']}"; + + $normalizedUrl = $this->encode_rfc3986($retval); + $method = $this->encode_rfc3986($method); // don't need this but why not? + + $signatureBaseString = "{$method}&{$normalizedUrl}&{$concatenatedParams}"; + + # sign the signature string + $key = $this->encode_rfc3986($this->consumerSecret?$this->consumerSecret:TWITTER_CONSUMER_SECRET) . '&' . $this->encode_rfc3986($this->password); + return base64_encode(hash_hmac('sha1', $signatureBaseString, $key, true)); + } + + protected function getOAuthHeader($method, $url, $params = array()) + { + $params = $this->prepareParameters($method, $url, $params); + $oauthHeaders = $params['oauth']; + + $urlParts = parse_url($url); + $oauth = 'OAuth realm="",'; + foreach ($oauthHeaders as $name => $value) + { + $oauth .= "{$name}=\"{$value}\","; + } + $oauth = substr($oauth, 0, -1); + + return $oauth; + } + + protected function getAuthorizationHeader() + { + $url = self::URL_BASE . $this->method . '.' . $this->format; + $urlParts = parse_url($url); + + // Setup params appropriately + $requestParams = array('delimited' => 'length'); + + // Filter takes additional parameters + if (count($this->trackWords) > 0) + { + $requestParams['track'] = implode(',', $this->trackWords); + } + if (count($this->followIds) > 0) + { + $requestParams['follow'] = implode(',', $this->followIds); + } + + return $this->getOAuthHeader('POST', $url, $requestParams); + } +} \ No newline at end of file diff --git a/Phirehose.php b/Phirehose.php old mode 100644 new mode 100755 index e6dcd1f..1cd41f5 --- a/Phirehose.php +++ b/Phirehose.php @@ -1,13 +1,12 @@ - * @version 0.2.4 ($Id: Phirehose.php 28 2010-04-07 01:44:43Z fenn.bailey $) + * @version 0.2.gitmaster */ abstract class Phirehose { @@ -15,20 +14,14 @@ abstract class Phirehose /** * Class constants */ - const URL_BASE = 'https://stream.twitter.com/1/statuses/'; + const URL_BASE = 'https://stream.twitter.com/1.1/statuses/'; const FORMAT_JSON = 'json'; const FORMAT_XML = 'xml'; const METHOD_FILTER = 'filter'; const METHOD_SAMPLE = 'sample'; const METHOD_RETWEET = 'retweet'; const METHOD_FIREHOSE = 'firehose'; - const USER_AGENT = 'Phirehose/0.2.4 +http://code.google.com/p/phirehose/'; - const FILTER_CHECK_MIN = 5; - const FILTER_UPD_MIN = 120; - const TCP_BACKOFF = 1; - const TCP_BACKOFF_MAX = 16; - const HTTP_BACKOFF = 10; - const HTTP_BACKOFF_MAX = 240; + const METHOD_LINKS = 'links'; const EARTH_RADIUS_KM = 6371; @@ -39,7 +32,7 @@ abstract class Phirehose protected $password; protected $method; protected $format; - protected $count; + protected $count; //Can be -150,000 to 150,000. @see http://dev.twitter.com/pages/streaming_api_methods#count protected $followIds; protected $trackWords; protected $locationBoxes; @@ -49,23 +42,119 @@ abstract class Phirehose // State vars protected $filterChanged; protected $reconnect; + + /** + * The number of tweets received per second in previous minute; calculated fresh + * just before each call to statusUpdate() + * I.e. if fewer than 30 tweets in last minute then this will be zero; if 30 to 90 then it + * will be 1, if 90 to 150 then 2, etc. + * + * @var integer + */ protected $statusRate; + protected $lastErrorNo; protected $lastErrorMsg; + + /** + * Number of tweets received. + * + * Note: by default this is the sum for last 60 seconds, and is therefore + * reset every 60 seconds. + * To change this behaviour write a custom statusUpdate() function. + * + * @var integer + */ + protected $statusCount=0; + + /** + * The number of calls to $this->checkFilterPredicates(). + * + * By default it is called every 5 seconds, so if doing statusUpdates every + * 60 seconds and then resetting it, this will usually be 12. + * + * @var integer + */ + protected $filterCheckCount=0; + + /** + * Total number of seconds (fractional) spent in the enqueueStatus() calls (i.e. the customized + * function that handles each received tweet). + * + * @var float + */ + protected $enqueueSpent=0; + + /** + * Total number of seconds (fractional) spent in the checkFilterPredicates() calls + * + * @var float + */ + protected $filterCheckSpent=0; + + /** + * Number of seconds since the last tweet arrived (or the keep-alive newline) + * + * @var integer + */ + protected $idlePeriod=0; + + /** + * The maximum value $this->idlePeriod has reached. + * + * @var integer + */ + protected $maxIdlePeriod=0; + + /** + * Time spent on each call to enqueueStatus() (i.e. average time spent, in milliseconds, + * spent processing received tweet). + * + * Simply: enqueueSpent divided by statusCount + * Note: by default, calculated fresh for past 60 seconds, every 60 seconds. + * + * @var float + */ + protected $enqueueTimeMS=0; + + /** + * Like $enqueueTimeMS but for the checkFilterPredicates() function. + * @var float + */ + protected $filterCheckTimeMS=0; + + /** + * Seconds since the last call to statusUpdate() + * + * Reset to zero after each call to statusUpdate() + * Highest value it should ever reach is $this->avgPeriod + * + * @var integer + */ + protected $avgElapsed=0; + // Config type vars - override in subclass if desired - protected $connectFailuresMax = 20; + protected $connectFailuresMax = 20; protected $connectTimeout = 5; protected $readTimeout = 5; protected $idleReconnectTimeout = 90; protected $avgPeriod = 60; - + protected $status_length_base = 10; + protected $userAgent = 'Phirehose/0.2.gitmaster +https://github.com/fennb/phirehose'; + protected $filterCheckMin = 5; + protected $filterUpdMin = 120; + protected $tcpBackoff = 1; + protected $tcpBackoffMax = 16; + protected $httpBackoff = 10; + protected $httpBackoffMax = 240; + /** - * Create a new Phirehose object attached to the appropriate twitter stream method. - * Methods are: METHOD_FIREHOSE, METHOD_RETWEET, METHOD_SAMPLE, METHOD_FILTER + * Create a new Phirehose object attached to the appropriate twitter stream method. + * Methods are: METHOD_FIREHOSE, METHOD_RETWEET, METHOD_SAMPLE, METHOD_FILTER, METHOD_LINKS * Formats are: FORMAT_JSON, FORMAT_XML * @see Phirehose::METHOD_SAMPLE * @see Phirehose::FORMAT_JSON - * + * * @param string $username Any twitter username * @param string $password Any twitter password * @param string $method @@ -80,17 +169,17 @@ public function __construct($username, $password, $method = Phirehose::METHOD_SA } /** - * Returns public statuses from or in reply to a set of users. Mentions ("Hello @user!") and implicit replies + * Returns public statuses from or in reply to a set of users. Mentions ("Hello @user!") and implicit replies * ("@user Hello!" created without pressing the reply button) are not matched. It is up to you to find the integer * IDs of each twitter user. * Applies to: METHOD_FILTER - * + * * @param array $userIds Array of Twitter integer userIDs */ public function setFollow($userIds) { $userIds = ($userIds === NULL) ? array() : $userIds; - sort($userIds); // Non-optimal but necessary + sort($userIds); // Non-optimal but necessary if ($this->followIds != $userIds) { $this->filterChanged = TRUE; } @@ -108,15 +197,15 @@ public function getFollow() } /** - * Specifies keywords to track. Track keywords are case-insensitive logical ORs. Terms are exact-matched, ignoring + * Specifies keywords to track. Track keywords are case-insensitive logical ORs. Terms are exact-matched, ignoring * punctuation. Phrases, keywords with spaces, are not supported. Queries are subject to Track Limitations. * Applies to: METHOD_FILTER - * + * * See: http://apiwiki.twitter.com/Streaming-API-Documentation#TrackLimiting * * @param array $trackWords */ - public function setTrack($trackWords) + public function setTrack(array $trackWords) { $trackWords = ($trackWords === NULL) ? array() : $trackWords; sort($trackWords); // Non-optimal, but necessary @@ -127,7 +216,7 @@ public function setTrack($trackWords) } /** - * Returns an array of keywords being tracked + * Returns an array of keywords being tracked * * @return array */ @@ -137,24 +226,24 @@ public function getTrack() } /** - * Specifies a set of bounding boxes to track as an array of 4 element lon/lat pairs denoting , + * Specifies a set of bounding boxes to track as an array of 4 element lon/lat pairs denoting , * . Only tweets that are both created using the Geotagging API and are placed from within a tracked * bounding box will be included in the stream. The user's location field is not used to filter tweets. Bounding boxes - * are logical ORs and must be less than or equal to 1 degree per side. A locations parameter may be combined with + * are logical ORs and must be less than or equal to 1 degree per side. A locations parameter may be combined with * track parameters, but note that all terms are logically ORd. - * + * * NOTE: The argument order is Longitude/Latitude (to match the Twitter API and GeoJSON specifications). - * + * * Applies to: METHOD_FILTER - * + * * See: http://apiwiki.twitter.com/Streaming-API-Documentation#locations * - * Eg: + * Eg: * setLocations(array( * array(-122.75, 36.8, -121.75, 37.8), // San Francisco - * array(-74, 40, -73, 41), // New York + * array(-74, 40, -73, 41), // New York * )); - * + * * @param array $boundingBoxes */ public function setLocations($boundingBoxes) @@ -167,7 +256,7 @@ public function setLocations($boundingBoxes) // Sanity check if (count($boundingBox) != 4) { // Invalid - Not much we can do here but log error - $this->log('Invalid location bounding box: [' . implode(', ', $boundingBox) . ']'); + $this->log('Invalid location bounding box: [' . implode(', ', $boundingBox) . ']','error'); return FALSE; } // Append this lat/lon pairs to flattened array @@ -182,7 +271,7 @@ public function setLocations($boundingBoxes) } /** - * Returns an array of 4 element arrays that denote the monitored location bounding boxes for tweets using the + * Returns an array of 4 element arrays that denote the monitored location bounding boxes for tweets using the * Geotagging API. * * @see setLocations() @@ -201,19 +290,19 @@ public function getLocations() { } /** - * Convenience method that sets location bounding boxes by an array of lon/lat/radius sets, rather than manually + * Convenience method that sets location bounding boxes by an array of lon/lat/radius sets, rather than manually * specified bounding boxes. Each array element should contain 3 element subarray containing a latitude, longitude and * radius. Radius is specified in kilometers and is approximate (as boxes are square). * * NOTE: The argument order is Longitude/Latitude (to match the Twitter API and GeoJSON specifications). - * - * Eg: + * + * Eg: * setLocationsByCircle(array( * array(144.9631, -37.8142, 30), // Melbourne, 3km radius - * array(-0.1262, 51.5001, 25), // London 10km radius + * array(-0.1262, 51.5001, 25), // London 10km radius * )); - * - * + * + * * @see setLocations() * @param array */ @@ -223,7 +312,7 @@ public function setLocationsByCircle($locations) { // Sanity check if (count($locTriplet) != 3) { // Invalid - Not much we can do here but log error - $this->log('Invalid location triplet for ' . __METHOD__ . ': [' . implode(', ', $locTriplet) . ']'); + $this->log('Invalid location triplet for ' . __METHOD__ . ': [' . implode(', ', $locTriplet) . ']','error'); return FALSE; } list($lon, $lat, $radius) = $locTriplet; @@ -237,8 +326,8 @@ public function setLocationsByCircle($locations) { // Add to bounding box array $boundingBoxes[] = array($minLon, $minLat, $maxLon, $maxLat); // Debugging is handy - $this->log('Resolved location circle [' . $lon . ', ' . $lat . ', r: ' . $radius . '] -> bbox: [' . $minLon . - ', ' . $minLat . ', ' . $maxLon . ', ' . $maxLat . ']'); + $this->log('Resolved location circle [' . $lon . ', ' . $lat . ', r: ' . $radius . '] -> bbox: [' . $minLon . + ', ' . $minLat . ', ' . $maxLon . ', ' . $maxLat . ']'); } // Set by bounding boxes $this->setLocations($boundingBoxes); @@ -247,21 +336,21 @@ public function setLocationsByCircle($locations) { /** * Sets the number of previous statuses to stream before transitioning to the live stream. Applies only to firehose * and filter + track methods. This is generally used internally and should not be needed by client applications. - * Applies to: METHOD_FILTER, METHOD_FIREHOSE - * + * Applies to: METHOD_FILTER, METHOD_FIREHOSE, METHOD_LINKS + * * @param integer $count */ public function setCount($count) { - $this->count = $count; + $this->count = $count; } /** * Connects to the stream API and consumes the stream. Each status update in the stream will cause a call to the * handleStatus() method. - * + * * @see handleStatus() - * @param boolean $reconnect Reconnects as per recommended + * @param boolean $reconnect Reconnects as per recommended * @throws ErrorException */ public function consume($reconnect = TRUE) @@ -276,7 +365,6 @@ public function consume($reconnect = TRUE) $this->reconnect(); // Init state - $statusCount = $filterCheckCount = $enqueueSpent = $filterCheckSpent = $idlePeriod = $maxIdlePeriod = 0; $lastAverage = $lastFilterCheck = $lastFilterUpd = $lastStreamActivity = time(); $fdw = $fde = NULL; // Placeholder write/error file descriptors for stream_select @@ -288,7 +376,7 @@ public function consume($reconnect = TRUE) */ if ((time() - $lastStreamActivity) > $this->idleReconnectTimeout) { $this->log('Idle timeout: No stream activity for > ' . $this->idleReconnectTimeout . ' seconds. ' . - ' Reconnecting.'); + ' Reconnecting.','info'); $this->reconnect(); $lastStreamActivity = time(); continue; @@ -300,77 +388,97 @@ public function consume($reconnect = TRUE) continue; // We need a newline } // Track maximum idle period - $idlePeriod = (time() - $lastStreamActivity); - $maxIdlePeriod = ($idlePeriod > $maxIdlePeriod) ? $idlePeriod : $maxIdlePeriod; + $this->idlePeriod = (time() - $lastStreamActivity); + $this->maxIdlePeriod = ($this->idlePeriod > $this->maxIdlePeriod) ? $this->idlePeriod : $this->maxIdlePeriod; // We got a newline, this is stream activity $lastStreamActivity = time(); // Read status length delimiter $delimiter = substr($this->buff, 0, $eol); $this->buff = substr($this->buff, $eol + 2); // consume off buffer, + 2 = "\r\n" - $statusLength = intval($delimiter); + $statusLength = intval($delimiter, $this->status_length_base); if ($statusLength > 0) { // Read status bytes and enqueue $bytesLeft = $statusLength - strlen($this->buff); - while ($bytesLeft > 0 && $this->conn !== NULL && !feof($this->conn) && ($numChanged = stream_select($this->fdrPool, $fdw, $fde, 0, 20000)) !== FALSE) { + while ( $bytesLeft > 0 + && $this->conn !== NULL + && !feof($this->conn) + && ($numChanged = stream_select($this->fdrPool, $fdw, $fde, 0, 20000)) !== FALSE + && (time() - $lastStreamActivity) <= $this->idleReconnectTimeout) { $this->fdrPool = array($this->conn); // Reassign $this->buff .= fread($this->conn, $bytesLeft); // Read until all bytes are read into buffer $bytesLeft = ($statusLength - strlen($this->buff)); } // Accrue/enqueue and track time spent enqueing - $statusCount ++; $enqueueStart = microtime(TRUE); $this->enqueueStatus($this->buff); - $enqueueSpent += (microtime(TRUE) - $enqueueStart); + $this->enqueueSpent += (microtime(TRUE) - $enqueueStart); + $this->statusCount++; } else { // Timeout/no data after readTimeout seconds } - // Calc counter averages - $avgElapsed = time() - $lastAverage; - if ($avgElapsed >= $this->avgPeriod) { - // Calc tweets-per-second - $this->statusRate = round($statusCount / $avgElapsed, 0); + // Calc counter averages + $this->avgElapsed = time() - $lastAverage; + if ($this->avgElapsed >= $this->avgPeriod) { + $this->statusRate = round($this->statusCount / $this->avgElapsed, 0); // Calc tweets-per-second // Calc time spent per enqueue in ms - $enqueueTimeMS = ($statusCount > 0) ? round($enqueueSpent / $statusCount * 1000, 2) : 0; + $this->enqueueTimeMS = ($this->statusCount > 0) ? round($this->enqueueSpent / $this->statusCount * 1000, 2) : 0; // Calc time spent total in filter predicate checking - $filterCheckTimeMS = ($filterCheckCount > 0) ? round($filterCheckSpent / $filterCheckCount * 1000, 2) : 0; - $this->log('Consume rate: ' . $this->statusRate . ' status/sec (' . $statusCount . ' total), avg ' . - 'enqueueStatus(): ' . $enqueueTimeMS . 'ms, avg checkFilterPredicates(): ' . $filterCheckTimeMS . 'ms (' . - $filterCheckCount . ' total) over ' . $this->avgPeriod . ' seconds, max stream idle period: ' . - $maxIdlePeriod . ' seconds.'); - // Reset - $statusCount = $filterCheckCount = $enqueueSpent = $filterCheckSpent = $idlePeriod = $maxIdlePeriod = 0; + $this->filterCheckTimeMS = ($this->filterCheckCount > 0) ? round($this->filterCheckSpent / $this->filterCheckCount * 1000, 2) : 0; + + $this->heartbeat(); + $this->statusUpdate(); $lastAverage = time(); } // Check if we're ready to check filter predicates - if ($this->method == self::METHOD_FILTER && (time() - $lastFilterCheck) >= self::FILTER_CHECK_MIN) { - $filterCheckCount ++; + if ($this->method == self::METHOD_FILTER && (time() - $lastFilterCheck) >= $this->filterCheckMin) { + $this->filterCheckCount++; $lastFilterCheck = time(); $filterCheckStart = microtime(TRUE); $this->checkFilterPredicates(); // This should be implemented in subclass if required - $filterCheckSpent += (microtime(TRUE) - $filterCheckStart); + $this->filterCheckSpent += (microtime(TRUE) - $filterCheckStart); } // Check if filter is ready + allowed to be updated (reconnect) - if ($this->filterChanged == TRUE && (time() - $lastFilterUpd) >= self::FILTER_UPD_MIN) { - $this->log('Reconnecting due to changed filter predicates.'); + if ($this->filterChanged == TRUE && (time() - $lastFilterUpd) >= $this->filterUpdMin) { + $this->log('Reconnecting due to changed filter predicates.','info'); $this->reconnect(); $lastFilterUpd = time(); } } // End while-stream-activity + if (function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } + // Some sort of socket error has occured $this->lastErrorNo = is_resource($this->conn) ? @socket_last_error($this->conn) : NULL; $this->lastErrorMsg = ($this->lastErrorNo > 0) ? @socket_strerror($this->lastErrorNo) : 'Socket disconnected'; - $this->log('Phirehose connection error occured: ' . $this->lastErrorMsg); - - // Reconnect + $this->log('Phirehose connection error occured: ' . $this->lastErrorMsg,'error'); + + // Reconnect } while ($this->reconnect); // Exit $this->log('Exiting.'); } + + + /** + * Called every $this->avgPeriod (default=60) seconds, and this default implementation + * calculates some rates, logs them, and resets the counters. + */ + protected function statusUpdate() + { + $this->log('Consume rate: ' . $this->statusRate . ' status/sec (' . $this->statusCount . ' total), avg ' . + 'enqueueStatus(): ' . $this->enqueueTimeMS . 'ms, avg checkFilterPredicates(): ' . $this->filterCheckTimeMS . 'ms (' . + $this->filterCheckCount . ' total) over ' . $this->avgElapsed . ' seconds, max stream idle period: ' . + $this->maxIdlePeriod . ' seconds.'); + // Reset + $this->statusCount = $this->filterCheckCount = $this->enqueueSpent = 0; + $this->filterCheckSpent = $this->idlePeriod = $this->maxIdlePeriod = 0; + } /** * Returns the last error message (TCP or HTTP) that occured with the streaming API or client. State is cleared upon @@ -383,11 +491,11 @@ public function getLastErrorMsg() } /** - * Returns the last error number that occured with the streaming API or client. Numbers correspond to either the + * Returns the last error number that occured with the streaming API or client. Numbers correspond to either the * fsockopen() error states (in the case of TCP errors) or HTTP error codes from Twitter (in the case of HTTP errors). - * + * * State is cleared upon successful reconnect. - * + * * @return string */ public function getLastErrorNo() @@ -400,13 +508,13 @@ public function getLastErrorNo() * Connects to the stream URL using the configured method. * @throws ErrorException */ - protected function connect() + protected function connect() { // Init state $connectFailures = 0; - $tcpRetry = self::TCP_BACKOFF / 2; - $httpRetry = self::HTTP_BACKOFF / 2; + $tcpRetry = $this->tcpBackoff / 2; + $httpRetry = $this->httpBackoff / 2; // Keep trying until connected (or max connect failures exceeded) do { @@ -419,7 +527,6 @@ protected function connect() // Construct URL/HTTP bits $url = self::URL_BASE . $this->method . '.' . $this->format; $urlParts = parse_url($url); - $authCredentials = base64_encode($this->username . ':' . $this->password); // Setup params appropriately $requestParams = array('delimited' => 'length'); @@ -429,56 +536,57 @@ protected function connect() $requestParams['track'] = implode(',', $this->trackWords); } if ($this->method == self::METHOD_FILTER && count($this->followIds) > 0) { - $requestParams['follow'] = implode(',', $this->followIds); + $requestParams['follow'] = implode(',', $this->followIds); } if ($this->method == self::METHOD_FILTER && count($this->locationBoxes) > 0) { - $requestParams['locations'] = implode(',', $this->locationBoxes); + $requestParams['locations'] = implode(',', $this->locationBoxes); } - if ($this->count > 0) { + if ($this->count <> 0) { $requestParams['count'] = $this->count; } // Debugging is useful - $this->log('Connecting to twitter stream: ' . $url . ' with params: ' . str_replace("\n", '', + $this->log('Connecting to twitter stream: ' . $url . ' with params: ' . str_replace("\n", '', var_export($requestParams, TRUE))); /** - * Open socket connection to make POST request. It'd be nice to use stream_context_create with the native + * Open socket connection to make POST request. It'd be nice to use stream_context_create with the native * HTTP transport but it hides/abstracts too many required bits (like HTTP error responses). */ $errNo = $errStr = NULL; $scheme = ($urlParts['scheme'] == 'https') ? 'ssl://' : 'tcp://'; + $port = ($urlParts['scheme'] == 'https') ? 443 : 80; /** - * We must perform manual host resolution here as Twitter's IP regularly rotates (ie: DNS TTL of 60 seconds) and + * We must perform manual host resolution here as Twitter's IP regularly rotates (ie: DNS TTL of 60 seconds) and * PHP appears to cache it the result if in a long running process (as per Phirehose). */ $streamIPs = gethostbynamel($urlParts['host']); - if (count($streamIPs) == 0) { - throw new ErrorException("Unable to resolve hostname: '" . $urlParts['host'] . '"'); + if(empty($streamIPs)) { + throw new PhirehoseNetworkException("Unable to resolve hostname: '" . $urlParts['host'] . '"'); } // Choose one randomly (if more than one) $this->log('Resolved host ' . $urlParts['host'] . ' to ' . implode(', ', $streamIPs)); $streamIP = $streamIPs[rand(0, (count($streamIPs) - 1))]; - $this->log('Connecting to ' . $streamIP); + $this->log("Connecting to {$scheme}{$streamIP}, port={$port}, connectTimeout={$this->connectTimeout}"); - @$this->conn = fsockopen($scheme . $streamIP, 443, $errNo, $errStr, $this->connectTimeout); + @$this->conn = fsockopen($scheme . $streamIP, $port, $errNo, $errStr, $this->connectTimeout); // No go - handle errors/backoff if (!$this->conn || !is_resource($this->conn)) { $this->lastErrorMsg = $errStr; $this->lastErrorNo = $errNo; - $connectFailures ++; + $connectFailures++; if ($connectFailures > $this->connectFailuresMax) { $msg = 'TCP failure limit exceeded with ' . $connectFailures . ' failures. Last error: ' . $errStr; - $this->log($msg); - throw new ErrorException($msg, $errNo); // Throw an exception for other code to handle + $this->log($msg,'error'); + throw new PhirehoseConnectLimitExceeded($msg, $errNo); // Throw an exception for other code to handle } // Increase retry/backoff up to max - $tcpRetry = ($tcpRetry < self::TCP_BACKOFF_MAX) ? $tcpRetry * 2 : self::TCP_BACKOFF_MAX; + $tcpRetry = ($tcpRetry < $this->tcpBackoffMax) ? $tcpRetry * 2 : $this->tcpBackoffMax; $this->log('TCP failure ' . $connectFailures . ' of ' . $this->connectFailuresMax . ' connecting to stream: ' . - $errStr . ' (' . $errNo . '). Sleeping for ' . $tcpRetry . ' seconds.'); + $errStr . ' (' . $errNo . '). Sleeping for ' . $tcpRetry . ' seconds.','info'); sleep($tcpRetry); continue; } @@ -492,20 +600,35 @@ protected function connect() stream_set_blocking($this->conn, 1); // Encode request data - $postData = http_build_query($requestParams); + $postData = http_build_query($requestParams, NULL, '&'); + $postData = str_replace('+','%20',$postData); //Change it from RFC1738 to RFC3986 (see + //enc_type parameter in http://php.net/http_build_query and note that enc_type is + //not available as of php 5.3) + $authCredentials = $this->getAuthorizationHeader(); // Do it fwrite($this->conn, "POST " . $urlParts['path'] . " HTTP/1.0\r\n"); - fwrite($this->conn, "Host: " . $urlParts['host'] . "\r\n"); + fwrite($this->conn, "Host: " . $urlParts['host'] . ':' . $port . "\r\n"); fwrite($this->conn, "Content-type: application/x-www-form-urlencoded\r\n"); fwrite($this->conn, "Content-length: " . strlen($postData) . "\r\n"); fwrite($this->conn, "Accept: */*\r\n"); - fwrite($this->conn, 'Authorization: Basic ' . $authCredentials . "\r\n"); - fwrite($this->conn, 'User-Agent: ' . self::USER_AGENT . "\r\n"); + fwrite($this->conn, 'Authorization: ' . $authCredentials . "\r\n"); + fwrite($this->conn, 'User-Agent: ' . $this->userAgent . "\r\n"); fwrite($this->conn, "\r\n"); fwrite($this->conn, $postData . "\r\n"); fwrite($this->conn, "\r\n"); + $this->log("POST " . $urlParts['path'] . " HTTP/1.0\r\n"); + $this->log("Host: " . $urlParts['host'] . ':' . $port . "\r\n"); + $this->log("Content-type: application/x-www-form-urlencoded\r\n"); + $this->log("Content-length: " . strlen($postData) . "\r\n"); + $this->log("Accept: */*\r\n"); + $this->log('Authorization: ' . $authCredentials . "\r\n"); + $this->log('User-Agent: ' . $this->userAgent . "\r\n"); + $this->log("\r\n"); + $this->log($postData . "\r\n"); + $this->log("\r\n"); + // First line is response list($httpVer, $httpCode, $httpMessage) = preg_split('/\s+/', trim(fgets($this->conn, 1024)), 3); @@ -519,7 +642,7 @@ protected function connect() // If we got a non-200 response, we need to backoff and retry if ($httpCode != 200) { - $connectFailures ++; + $connectFailures++; // Twitter will disconnect on error, but we want to consume the rest of the response body (which is useful) while ($bLine = trim(fgets($this->conn, 4096))) { @@ -527,7 +650,7 @@ protected function connect() } // Construct error - $errStr = 'HTTP ERROR ' . $httpCode . ': ' . $httpMessage . ' (' . $respBody . ')'; + $errStr = 'HTTP ERROR ' . $httpCode . ': ' . $httpMessage . ' (' . $respBody . ')'; // Set last error state $this->lastErrorMsg = $errStr; @@ -536,13 +659,13 @@ protected function connect() // Have we exceeded maximum failures? if ($connectFailures > $this->connectFailuresMax) { $msg = 'Connection failure limit exceeded with ' . $connectFailures . ' failures. Last error: ' . $errStr; - $this->log($msg); - throw new ErrorException($msg); // We eventually throw an exception for other code to handle + $this->log($msg,'error'); + throw new PhirehoseConnectLimitExceeded($msg, $httpCode); // We eventually throw an exception for other code to handle } // Increase retry/backoff up to max - $httpRetry = ($httpRetry < self::HTTP_BACKOFF_MAX) ? $httpRetry * 2 : self::HTTP_BACKOFF_MAX; + $httpRetry = ($httpRetry < $this->httpBackoffMax) ? $httpRetry * 2 : $this->httpBackoffMax; $this->log('HTTP failure ' . $connectFailures . ' of ' . $this->connectFailuresMax . ' connecting to stream: ' . - $errStr . '. Sleeping for ' . $httpRetry . ' seconds.'); + $errStr . '. Sleeping for ' . $httpRetry . ' seconds.','info'); sleep($httpRetry); continue; @@ -556,7 +679,7 @@ protected function connect() $this->lastErrorMsg = NULL; $this->lastErrorNo = NULL; - // Switch to non-blocking to consume the stream (important) + // Switch to non-blocking to consume the stream (important) stream_set_blocking($this->conn, 0); // Connect always causes the filterChanged status to be cleared @@ -567,18 +690,24 @@ protected function connect() $this->buff = ''; } + + protected function getAuthorizationHeader() + { + $authCredentials = base64_encode($this->username . ':' . $this->password); + return "Basic: ".$authCredentials; + } /** * Method called as frequently as practical (every 5+ seconds) that is responsible for checking if filter predicates * (ie: track words or follow IDs) have changed. If they have, they should be set using the setTrack() and setFollow() - * methods respectively within the overridden implementation. - * + * methods respectively within the overridden implementation. + * * Note that even if predicates are changed every 5 seconds, an actual reconnect will not happen more frequently than * every 2 minutes (as per Twitter Streaming API documentation). - * - * Note also that this method is called upon every connect attempt, so if your predicates are causing connection + * + * Note also that this method is called upon every connect attempt, so if your predicates are causing connection * errors, they should be checked here and corrected. - * + * * This should be implemented/overridden in any subclass implementing the FILTER method. * * @see setTrack() @@ -596,8 +725,13 @@ protected function checkFilterPredicates() * * @see error_log() * @param string $messages + * @param String $level 'error', 'info', 'notice'. Defaults to 'notice', so you should set this + * parameter on the more important error messages. + * 'info' is used for problems that the class should be able to recover from automatically. + * 'error' is for exceptional conditions that may need human intervention. (For instance, emailing + * them to a system administrator may make sense.) */ - protected function log($message) + protected function log($message,$level='notice') { @error_log('Phirehose: ' . $message, 0); } @@ -624,15 +758,26 @@ private function reconnect() $reconnect = $this->reconnect; $this->disconnect(); // Implicitly sets reconnect to FALSE $this->reconnect = $reconnect; // Restore state to prev - $this->connect(); + $this->connect(); } /** * This is the one and only method that must be implemented additionally. As per the streaming API documentation, - * statuses should NOT be processed within the same process that is performing collection + * statuses should NOT be processed within the same process that is performing collection * * @param string $status */ abstract public function enqueueStatus($status); - + + /** + * Reports a periodic heartbeat. Keep execution time minimal. + * + * @return NULL + */ + public function heartbeat() {} + } // End of class + +class PhirehoseException extends Exception {} +class PhirehoseNetworkException extends PhirehoseException {} +class PhirehoseConnectLimitExceeded extends PhirehoseException {} \ No newline at end of file diff --git a/README b/README old mode 100644 new mode 100755 diff --git a/SQL b/SQL old mode 100644 new mode 100755 diff --git a/apiGetTweets.php b/apiGetTweets.php old mode 100644 new mode 100755 diff --git a/apiListArchives.php b/apiListArchives.php old mode 100644 new mode 100755 diff --git a/archive.php b/archive.php old mode 100644 new mode 100755 diff --git a/callback.php b/callback.php old mode 100644 new mode 100755 diff --git a/clearsessions.php b/clearsessions.php old mode 100644 new mode 100755 diff --git a/config.php b/config.php index 66c66bc..67d28ff 100644 --- a/config.php +++ b/config.php @@ -1,72 +1,72 @@ -connection = mysql_connect(DB_SERVER, DB_USER, DB_PASS) or die(mysql_error()); - mysql_select_db(DB_NAME, $this->connection) or die(mysql_error()); - } - -} -$db = new MySQLDB; - -?> +connection = mysql_connect(DB_SERVER, DB_USER, DB_PASS) or die(mysql_error()); + mysql_select_db(DB_NAME, $this->connection) or die(mysql_error()); + } + +} +$db = new MySQLDB; + +?> \ No newline at end of file diff --git a/create.php b/create.php old mode 100644 new mode 100755 diff --git a/delete.php b/delete.php old mode 100644 new mode 100755 diff --git a/excel.php b/excel.php old mode 100644 new mode 100755 diff --git a/function.php b/function.php old mode 100644 new mode 100755 diff --git a/index.php b/index.php old mode 100644 new mode 100755 diff --git a/oauthlogin.php b/oauthlogin.php old mode 100644 new mode 100755 diff --git a/rss.php b/rss.php old mode 100644 new mode 100755 diff --git a/startarchiving.php b/startarchiving.php old mode 100644 new mode 100755 diff --git a/stoparchiving.php b/stoparchiving.php old mode 100644 new mode 100755 diff --git a/table.php b/table.php old mode 100644 new mode 100755 diff --git a/twitteroauth.php b/twitteroauth.php old mode 100644 new mode 100755 index 5ee7399..dcf18ac --- a/twitteroauth.php +++ b/twitteroauth.php @@ -18,7 +18,7 @@ class TwitterOAuth { /* Contains the last API call. */ public $url; /* Set up the API root URL. */ - public $host = "https://api.twitter.com/1/"; + public $host = "https://api.twitter.com/1.1/"; /* Set timeout default. */ public $timeout = 30; /* Set connect timeout. */ diff --git a/twitteroauth_search.php b/twitteroauth_search.php old mode 100644 new mode 100755 diff --git a/update.php b/update.php old mode 100644 new mode 100755 diff --git a/yourtwapperkeeper_crawl.php b/yourtwapperkeeper_crawl.php old mode 100644 new mode 100755 index d27699a..518537e --- a/yourtwapperkeeper_crawl.php +++ b/yourtwapperkeeper_crawl.php @@ -2,7 +2,7 @@ // load important files require_once('config.php'); require_once('function.php'); -require_once('twitteroauth_search.php'); +require_once('twitteroauth.php'); // setup values $pid = getmypid(); @@ -27,35 +27,66 @@ echo $row_archives['id']." - ".$row_archives['keyword']."\n"; // Loop for 15 pages + $max_id = NULL; + for ($page_counter = 1; $page_counter <=15 ; $page_counter = $page_counter + 1) { - - $search = $connection->get('search.twitter.com/search', array('q' => $row_archives['keyword'], 'rpp' => 100, 'page' => $page_counter)); + echo "****TIME AROUND = ".$page_counter."****\n"; + if ($max_id == NULL) { + $search = $connection->get('search/tweets', array('q' => $row_archives['keyword'], 'count'=>100)); + echo "NO - no max_id is not set\n"; + + } else { + $search = $connection->get('search/tweets', array('q' => $row_archives['keyword'], 'count'=>100, 'max_id'=>$max_id)); + echo "YES - max_id is set\n"; + } + + $searchresult = get_object_vars($search); - $count = count($searchresult['results']); + $count = count($searchresult['statuses']); // parse results - foreach ($searchresult['results'] as $key=>$value) { + foreach ($searchresult['statuses'] as $key=>$value) { $value = get_object_vars($value); // extract data - extract($value,EXTR_PREFIX_ALL,'temp'); + //extract($value,EXTR_PREFIX_ALL,'temp'); + $temp_text = $value['text']; + $temp_to_user_id = $value['in_reply_to_user_id']; + $temp_from_user = $value['user']->screen_name; + $temp_id = $value['id_str']; + $temp_from_user_id = $value['user']->id; + $temp_iso_language_code = $value['metadata']->iso_language_code; + $temp_source = $value['source']; + $temp_profile_image_url = $value['user']->profile_background_image_url; + $temp_created_at = $value['created_at']; - // extract geo information - $geo = get_object_vars($temp_geo); - $geo_type = $geo['type']; - $geo_coordinates_0 = $geo['coordinates'][0]; - $geo_coordinates_1 = $geo['coordinates'][1]; + // extract geo information + if ($value['geo'] != NULL) { + $geo = get_object_vars($value['geo']); + $geo_type = $geo['type']; + $geo_coordinates_0 = $geo['coordinates'][0]; + $geo_coordinates_1 = $geo['coordinates'][1]; + } else { + $geo_type = NULL; + $geo_coordinates_0 = 0; + $geo_coordinates_1 = 0; + } + // duplicate record check and insert into proper cache table if not a duplicate $q_check = "select id from z_".$row_archives['id']." where id = '".$value['id']."'"; $result_check = mysql_query($q_check, $db->connection); if (mysql_numrows($result_check)==0) { $q = "insert into z_".$row_archives['id']." values ('twitter-search','".mysql_real_escape_string($temp_text)."','$temp_to_user_id','$temp_from_user','$temp_id','$temp_from_user_id','$temp_iso_language_code','$temp_source','$temp_profile_image_url','$geo_type','$geo_coordinates_0','$geo_coordinates_1','$temp_created_at','".strtotime($temp_created_at)."')"; + mysql_query($q, $db->connection); - echo "[".$row['id']."-".$row['keyword']."] $page_counter - $temp_id - insert\n"; - } else {echo "$page_counter - $temp_id - duplicate\n";} + echo "[".$row_archives['id']."-".$row_archives['keyword']."] $page_counter - $temp_id - insert\n"; + } else { + echo "[".$row_archives['id']."-".$row_archives['keyword']."] $page_counter - $temp_id - duplicate\n"; + } + $max_id = $temp_id; // resetting to lowest tweet id } // If count for page is less than 100, break since there is no reason to keep going @@ -63,19 +94,10 @@ break; } + echo "\nmaxid = $max_id.\n"; + } - // adjust sleep if being rate limited - $rate_check = $connection->get('api.twitter.com/1/account/rate_limit_status'); - echo "rate left = ".$rate_check->remaining_hits."\n"; - if ($rate_check->remaining_hits < 1) { - $sleep = $sleep * 2; - } else { - if ($sleep > $twitter_api_sleep_min) { - $sleep = $sleep / 2; - } - } - // update counts $q_count_total = "select count(id) from z_".$row_archives['id']; $r_count_total = mysql_query($q_count_total, $db->connection); @@ -88,6 +110,7 @@ mysql_query("update processes set last_ping = '".time()."' where pid = '$pid'", $db->connection); echo "update pid\n"; } + diff --git a/yourtwapperkeeper_stream.php b/yourtwapperkeeper_stream.php old mode 100644 new mode 100755 index 9bbf479..6a0684f --- a/yourtwapperkeeper_stream.php +++ b/yourtwapperkeeper_stream.php @@ -1,10 +1,13 @@ connection); + echo "."; + } } @@ -70,5 +75,5 @@ public function checkFilterPredicates() } // Start streaming -$sc = new DynamicTrackConsumer($tk_twitter_username, $tk_twitter_password, Phirehose::METHOD_FILTER); +$sc = new DynamicTrackConsumer($tk_oauth_token, $tk_oauth_token_secret, Phirehose::METHOD_FILTER); $sc->consume(); \ No newline at end of file diff --git a/yourtwapperkeeper_stream_process.php b/yourtwapperkeeper_stream_process.php old mode 100644 new mode 100755