Skip to content

Commit

Permalink
Add MemoryQueueWorker
Browse files Browse the repository at this point in the history
  • Loading branch information
Heromyth committed Apr 7, 2020
1 parent a36cdab commit fe434f3
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 23 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ docs/
# Others
/java
hunt-framework-test-*
temp.txt
3 changes: 1 addition & 2 deletions source/hunt/framework/provider/QueueServiceProvider.d
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ class QueueServiceProvider : ServiceProvider {
string typeName = config.queue.driver;

if (typeName == QueueWorker.Memory) {
// _queueWorker = new MemoryQueueWorker();
warningf("TODO: %s", typeName);
_queueWorker = new MemoryQueueWorker();
} else if (typeName == QueueWorker.AMQP) {
auto amqpConf = config.amqp;

Expand Down
8 changes: 5 additions & 3 deletions source/hunt/framework/queue/AmqpQueueWorker.d
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ class AmqpQueueWorker : QueueWorker {
private AmqpConnection _listenerConn;

this(AmqpPool pool) {
super();
_pool = pool;
super();
}

override void onListen() {
Expand All @@ -38,10 +38,12 @@ class AmqpQueueWorker : QueueWorker {

recv.handler(new class Handler!AmqpMessage {
void handle(AmqpMessage msg) {
tracef("channel: %s Message: %s", channel, msg.bodyAsString());
// ubyte[] content = cast(ubyte[])msg.bodyAsBinary();
ubyte[] content = cast(ubyte[])msg.bodyAsString();
tracef("%(%02X %)", content);
version(HUNT_FM_DEBUG) {
tracef("channel: %s Message: %s", channel, msg.bodyAsString());
tracef("%(%02X %)", content);
}
if(listener !is null) {
listener(content);
}
Expand Down
65 changes: 49 additions & 16 deletions source/hunt/framework/queue/MemoryQueueWorker.d
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,52 @@ import std.parallelism;
/**
*
*/
// class MemoryQueueWorker : QueueWorker {
// private SimpleQueue!Job _jobs;

// this() {
// _jobs = new SimpleQueue!Job();
// super();
// }

// override protected Job retrieveNextJob() {
// return _jobs.dequeue();
// }

// override void push(string channel, ubyte[] message) {
// _jobs.enqueue(job);
// }
// }
class MemoryQueueWorker : QueueWorker {
private SimpleQueue!(ubyte[])[string] _queueMap;

this() {
super();
}

override void onListen() {
if(listeners is null) {
return;
}

foreach(string channel, QueueMessageListener listener; listeners) {
auto itemPtr = channel in _queueMap;
if(itemPtr is null) {
// version(HUNT_DEBUG) warningf("Can't find channel: %s", channel);
_queueMap[channel] = new SimpleQueue!(ubyte[])();
itemPtr = channel in _queueMap;
}

ubyte[] content = itemPtr.dequeue();
version(HUNT_FM_DEBUG) {
tracef("channel: %s Message: %s", channel, msg.bodyAsString());
tracef("%(%02X %)", content);
}

if(listener !is null) {
listener(content);
}
}
}

override void push(string channel, ubyte[] message) {
synchronized(this) {
auto itemPtr = channel in _queueMap;
if(itemPtr is null) {
_queueMap[channel] = new SimpleQueue!(ubyte[])();
itemPtr = channel in _queueMap;
}

itemPtr.enqueue(message);
}
}

override protected void onStop() {
_queueMap.clear();
}
}

4 changes: 2 additions & 2 deletions source/hunt/framework/queue/package.d
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module hunt.framework.queue;

public import hunt.framework.queue.Job;
public import hunt.framework.queue.QueueWorker;
public import hunt.framework.queue.AmqpQueueWorker;
public import hunt.framework.queue.QueueWorker;
public import hunt.framework.queue.MemoryQueueWorker;

0 comments on commit fe434f3

Please sign in to comment.