Skip to content

Commit

Permalink
Merge pull request #29 from tidal/develop
Browse files Browse the repository at this point in the history
monitor trait refactoring
  • Loading branch information
tidal authored Jul 18, 2016
2 parents 2651dd2 + ace29e6 commit a129fb3
Show file tree
Hide file tree
Showing 5 changed files with 458 additions and 61 deletions.
78 changes: 76 additions & 2 deletions src/Tidal/WampWatch/MonitorTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
namespace Tidal\WampWatch;

use Evenement\EventEmitterTrait;
use React\Promise\Promise;
use Tidal\WampWatch\ClientSessionInterface as ClientSession;
use Tidal\WampWatch\Subscription\Collection as SubscriptionCollection;

/**
* Description of MonitorTrait.
Expand All @@ -31,12 +33,32 @@ trait MonitorTrait
protected $session;

/**
* Wether the monitor is running.
* if the monitor is running.
*
* @var bool
*/
protected $isRunning = false;

/**
* @var SubscriptionCollection
*/
protected $subscriptionCollection;

/**
* @var string
*/
protected $initialCallProcedure;

/**
* @var callable
*/
protected $initialCallCallback;

/**
* @var bool
*/
protected $initialCallDone = false;

/**
* @param ClientSession $session
*/
Expand All @@ -60,7 +82,7 @@ public function start()

/**
* Stop the monitor.
* Returns boolean wether the monitor could be started.
* Returns boolean if the monitor could be started.
*
* @return bool
*/
Expand All @@ -69,6 +91,9 @@ public function stop()
if (!$this->isRunning()) {
return false;
}

$this->getSubscriptionCollection()->unsubscribe();

$this->isRunning = false;
$this->emit('stop', [$this]);

Expand Down Expand Up @@ -99,4 +124,53 @@ public function isRunning()
{
return $this->isRunning;
}

/**
* @return \Tidal\WampWatch\Subscription\Collection
*/
protected function getSubscriptionCollection()
{
return isset($this->subscriptionCollection)
? $this->subscriptionCollection
: $this->subscriptionCollection = new SubscriptionCollection($this->session);
}

protected function setInitialCall($pocedure, callable $callback)
{
$this->initialCallProcedure = (string) $pocedure;
$this->initialCallCallback = $callback;
}

/**
* @return \React\Promise\PromiseInterface
*/
protected function callInitialProcedure()
{
if (!isset($this->initialCallProcedure) || !isset($this->initialCallCallback)) {
$resolver = function (callable $resolve) {
$resolve();
};

return new Promise($resolver);
}

return $this->session->call($this->initialCallProcedure)->then(function ($res) {
$this->initialCallDone = true;

return $res;
});
}

/**
* Checks if all necessary subscriptions and calls have been responded to.
*/
protected function checkStarted()
{
if ($this->getSubscriptionCollection()->isSubscribed() &&
$this->initialCallDone &&
!$this->isRunning()
) {
$this->doStart();
}
}
}
88 changes: 32 additions & 56 deletions src/Tidal/WampWatch/SessionMonitor.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

use Evenement\EventEmitterInterface;
use Tidal\WampWatch\ClientSessionInterface as ClientSession;
use Thruway\Message\SubscribedMessage;

/**
* Description of SessionMonitor.
Expand All @@ -24,7 +23,6 @@ class SessionMonitor implements MonitorInterface, EventEmitterInterface
{
use MonitorTrait {
start as doStart;
stop as doStop;
}

const SESSION_JOIN_TOPIC = 'wamp.session.on_join';
Expand Down Expand Up @@ -70,7 +68,10 @@ public function __construct(ClientSession $session)
*/
public function start()
{
$this->startSubscriptions();
$this->initSetupCalls();
$this->getSubscriptionCollection()->subscribe()->done(function () {
$this->checkStarted();
});
$this->retrieveSessionIds();

return true;
Expand All @@ -81,29 +82,14 @@ public function start()
*/
protected function checkStarted()
{
if ($this->joinSubscriptionId > 0 &&
$this->leaveSubscriptionId > 0 &&
if ($this->getSubscriptionCollection()->isSubscribed() &&
$this->calledList &&
!$this->isRunning()
) {
$this->doStart();
}
}

/**
* Stop the monitor.
* Returns boolean wether the monitor could be started.
*
* @return bool
*/
public function stop()
{
$this->stopSubscriptions();
$this->doStop();

return true;
}

/**
* Retrieves the session-info for given sessionId
* and populates it in via given callback.
Expand Down Expand Up @@ -214,44 +200,28 @@ protected function validateSessionInfo($sessionInfo)
/**
* Initializes the subscription to the meta-events.
*/
protected function startSubscriptions()
protected function initSetupCalls()
{
// subscription to 'wamp.session.on_join'
$this->session->subscribe(self::SESSION_JOIN_TOPIC, function (array $res) {
// @var \Tidal\WampWatch\Subscription\Collection
$collection = $this->getSubscriptionCollection();

$collection->addSubscription(self::SESSION_JOIN_TOPIC, function (array $res) {
$sessionInfo = $res[0];
if (!$this->validateSessionInfo($sessionInfo) || $this->hasSession($sessionInfo)) {
return;
}
$this->addSession($sessionInfo);
})->then(function (SubscribedMessage $msg) {
$this->joinSubscriptionId = $msg->getSubscriptionId();
$this->checkStarted();
});

// subscription to 'wamp.session.on_leave'
$this->session->subscribe(self::SESSION_LEAVE_TOPIC, function (array $res) {
$collection->addSubscription(self::SESSION_LEAVE_TOPIC, function (array $res) {
// @bug : wamp.session.on_leave is bugged as of crossbar.io 0.11.0
// will provide sessionID when Browser closes/reloads,
// but not when calling connection.close();
$sessionId = (int) $res[0];
$this->removeSessionId($sessionId);
})->then(function (SubscribedMessage $msg) {
$this->leaveSubscriptionId = $msg->getSubscriptionId();
$this->checkStarted();
});
}

/**
* Unsubscribes from the meta-events.
*/
protected function stopSubscriptions()
{
if ($this->joinSubscriptionId > 0) {
Util::unsubscribe($this->session, $this->joinSubscriptionId);
}
if ($this->leaveSubscriptionId > 0) {
Util::unsubscribe($this->session, $this->leaveSubscriptionId);
}
$this->setInitialCall(self::SESSION_LIST_TOPIC, $this->getSessionIdRetrievalCallback());
}

/**
Expand All @@ -261,22 +231,28 @@ protected function stopSubscriptions()
*/
protected function retrieveSessionIds(callable $callback = null)
{
$this->session->call(self::SESSION_LIST_TOPIC, [])->then(
function ($res) use ($callback) {
// remove our own sessionID from the tracked sessions
$sessionIds = $this->removeOwnSessionId($res[0]);
$this->setList($sessionIds);
$this->emit('list', [$this->getList()]);
$this->session->call(self::SESSION_LIST_TOPIC, [])
->then(
$this->getSessionIdRetrievalCallback()
)->done(function ($res) use ($callback) {
if ($callback !== null) {
$callback($this->sessionIds);
$callback($res);
}
$this->calledList = true;
$this->checkStarted();
},
function ($error) {
$this->emit('error', [$error]);
}
);
});
}

protected function getSessionIdRetrievalCallback()
{
return function ($res) {
// remove our own sessionID from the tracked sessions
$sessionIds = $this->removeOwnSessionId($res[0]);
$this->setList($sessionIds);
$this->emit('list', [$this->getList()]);
$this->calledList = true;
$this->checkStarted();

return $this->getList();
};
}

protected function setList($list)
Expand Down
6 changes: 3 additions & 3 deletions src/Tidal/WampWatch/Stub/ClientSessionStub.php
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,19 @@ public function subscribe($topicName, callable $callback, $options = null)
*
* @param $topicName
* @param $requestId
* @param $sessionId
* @param $subscriptionId
*
* @throws UnknownTopicException if the topic is unknown
*/
public function completeSubscription($topicName, $requestId = 1, $sessionId = 1)
public function completeSubscription($topicName, $requestId = 1, $subscriptionId = 1)
{
if (!isset($this->subscriptions[$topicName])) {
throw new UnknownTopicException($topicName);
}

/* @var $futureResult Deferred */
$futureResult = $this->subscriptions[$topicName];
$result = new SubscribedMessage($requestId, $sessionId);
$result = new SubscribedMessage($requestId, $subscriptionId);

$futureResult->resolve($result);
}
Expand Down
Loading

0 comments on commit a129fb3

Please sign in to comment.