From fe434f3e34356eb7d8c86fc49ff7d5f3a1a251a9 Mon Sep 17 00:00:00 2001 From: heromyth Date: Tue, 7 Apr 2020 14:09:52 +0800 Subject: [PATCH] Add MemoryQueueWorker --- .gitignore | 1 + .../framework/provider/QueueServiceProvider.d | 3 +- source/hunt/framework/queue/AmqpQueueWorker.d | 8 ++- .../hunt/framework/queue/MemoryQueueWorker.d | 65 ++++++++++++++----- source/hunt/framework/queue/package.d | 4 +- 5 files changed, 58 insertions(+), 23 deletions(-) diff --git a/.gitignore b/.gitignore index d97a3a087..66969c7ba 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,4 @@ docs/ # Others /java hunt-framework-test-* +temp.txt diff --git a/source/hunt/framework/provider/QueueServiceProvider.d b/source/hunt/framework/provider/QueueServiceProvider.d index 2eb4db169..e1b3f3e64 100644 --- a/source/hunt/framework/provider/QueueServiceProvider.d +++ b/source/hunt/framework/provider/QueueServiceProvider.d @@ -25,8 +25,7 @@ class QueueServiceProvider : ServiceProvider { string typeName = config.queue.driver; if (typeName == QueueWorker.Memory) { - // _queueWorker = new MemoryQueueWorker(); - warningf("TODO: %s", typeName); + _queueWorker = new MemoryQueueWorker(); } else if (typeName == QueueWorker.AMQP) { auto amqpConf = config.amqp; diff --git a/source/hunt/framework/queue/AmqpQueueWorker.d b/source/hunt/framework/queue/AmqpQueueWorker.d index a1c27939a..a49b5b918 100644 --- a/source/hunt/framework/queue/AmqpQueueWorker.d +++ b/source/hunt/framework/queue/AmqpQueueWorker.d @@ -17,8 +17,8 @@ class AmqpQueueWorker : QueueWorker { private AmqpConnection _listenerConn; this(AmqpPool pool) { - super(); _pool = pool; + super(); } override void onListen() { @@ -38,10 +38,12 @@ class AmqpQueueWorker : QueueWorker { recv.handler(new class Handler!AmqpMessage { void handle(AmqpMessage msg) { - tracef("channel: %s Message: %s", channel, msg.bodyAsString()); // ubyte[] content = cast(ubyte[])msg.bodyAsBinary(); ubyte[] content = cast(ubyte[])msg.bodyAsString(); - tracef("%(%02X %)", content); + version(HUNT_FM_DEBUG) { + tracef("channel: %s Message: %s", channel, msg.bodyAsString()); + tracef("%(%02X %)", content); + } if(listener !is null) { listener(content); } diff --git a/source/hunt/framework/queue/MemoryQueueWorker.d b/source/hunt/framework/queue/MemoryQueueWorker.d index 07fc6fad0..6ee26bf2e 100644 --- a/source/hunt/framework/queue/MemoryQueueWorker.d +++ b/source/hunt/framework/queue/MemoryQueueWorker.d @@ -12,19 +12,52 @@ import std.parallelism; /** * */ -// class MemoryQueueWorker : QueueWorker { -// private SimpleQueue!Job _jobs; - -// this() { -// _jobs = new SimpleQueue!Job(); -// super(); -// } - -// override protected Job retrieveNextJob() { -// return _jobs.dequeue(); -// } - -// override void push(string channel, ubyte[] message) { -// _jobs.enqueue(job); -// } -// } \ No newline at end of file +class MemoryQueueWorker : QueueWorker { + private SimpleQueue!(ubyte[])[string] _queueMap; + + this() { + super(); + } + + override void onListen() { + if(listeners is null) { + return; + } + + foreach(string channel, QueueMessageListener listener; listeners) { + auto itemPtr = channel in _queueMap; + if(itemPtr is null) { + // version(HUNT_DEBUG) warningf("Can't find channel: %s", channel); + _queueMap[channel] = new SimpleQueue!(ubyte[])(); + itemPtr = channel in _queueMap; + } + + ubyte[] content = itemPtr.dequeue(); + version(HUNT_FM_DEBUG) { + tracef("channel: %s Message: %s", channel, msg.bodyAsString()); + tracef("%(%02X %)", content); + } + + if(listener !is null) { + listener(content); + } + } + } + + override void push(string channel, ubyte[] message) { + synchronized(this) { + auto itemPtr = channel in _queueMap; + if(itemPtr is null) { + _queueMap[channel] = new SimpleQueue!(ubyte[])(); + itemPtr = channel in _queueMap; + } + + itemPtr.enqueue(message); + } + } + + override protected void onStop() { + _queueMap.clear(); + } +} + diff --git a/source/hunt/framework/queue/package.d b/source/hunt/framework/queue/package.d index 5571c280e..f490d09aa 100755 --- a/source/hunt/framework/queue/package.d +++ b/source/hunt/framework/queue/package.d @@ -1,5 +1,5 @@ module hunt.framework.queue; -public import hunt.framework.queue.Job; -public import hunt.framework.queue.QueueWorker; public import hunt.framework.queue.AmqpQueueWorker; +public import hunt.framework.queue.QueueWorker; +public import hunt.framework.queue.MemoryQueueWorker;