Skip to content

Commit

Permalink
Migrate from mocha to jest, minor bug fix, code clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
weyoss committed Nov 26, 2019
1 parent 5d4c924 commit ebee319
Show file tree
Hide file tree
Showing 47 changed files with 1,112 additions and 1,149 deletions.
3 changes: 2 additions & 1 deletion .jshintignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
test
test
tests
4 changes: 4 additions & 0 deletions docs/api/consumer.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ class TestQueueConsumer extends Consumer {

### Consumer.prototype.stop()

Deprecated. Use `shutdown()` instead.

### Consumer.prototype.shutdown()

Disconnect from Redis server and stop consuming messages. This method is used to gracefully shutdown the consumer and
go offline.

Expand Down
9 changes: 5 additions & 4 deletions example/ns1-test-queue-producer.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
'use strict';

const config = require('./config');
const { Producer, Message } = require('../'); // replace with require('redis-smq')
const { Producer, Message } = require('../'); // replace with require('redis-smq')

const producer = new Producer('test_queue', config);

/*
function produceNTimes(payload, n, cb) {
n -= 1;
if (true) {
Expand All @@ -24,10 +25,10 @@ produceNTimes({ hello: 'world' }, 1000000, (err) => {
producer.shutdown();
}
});
*/

/*
producer.produceMessage({hello: 123}, (err) => {

producer.produceMessage({ hello: 123 }, (err) => {
if (err) throw err;
else producer.shutdown();
});
*/
9 changes: 9 additions & 0 deletions jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
'use strict';

const path = require('path');

module.exports = {
verbose: true,
rootDir: path.resolve('./test'),
setupFilesAfterEnv: ['<rootDir>/jest.setup.js'],
};
12 changes: 5 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,23 @@
"ioredis": "4.14.1",
"lodash": "4.17.15",
"redis": "2.8.0",
"redis-smq-monitor": "1",
"redlock": "4.1.0",
"uuid": "3.0.1",
"redis-smq-monitor": "1"
"uuid": "3.0.1"
},
"devDependencies": {
"chai": "3.5.0",
"bluebird": "3.7.1",
"eslint": "6.6.0",
"eslint-config-airbnb-base": "14.0.0",
"eslint-config-google": "0.14.0",
"eslint-config-standard": "14.1.0",
"eslint-plugin-import": "2.18.2",
"eslint-plugin-promise": "4.2.1",
"eslint-plugin-standard": "4.0.1",
"mocha": "5.2.0",
"sinon": "1.17.7",
"sinon-chai": "2.11.0"
"jest": "24.9.0"
},
"scripts": {
"test": "eslint --ignore-path .jshintignore src/*.js && NODE_ENV=test mocha --reporter --harmony spec test/run/*.js",
"test": "eslint --ignore-path .jshintignore src/*.js && NODE_ENV=test jest --runInBand",
"lint": "eslint --ignore-path .jshintignore src/*.js",
"fix": "eslint --ignore-path .jshintignore src/*.js --fix"
},
Expand Down
28 changes: 2 additions & 26 deletions src/consumer.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
'use strict';

const { EventEmitter } = require('events');
const dispatcher = require('./dispatcher');
const Instance = require('./instance');


class Consumer extends EventEmitter {
class Consumer extends Instance {
/**
* See docs.
*
Expand All @@ -13,32 +12,9 @@ class Consumer extends EventEmitter {
*/
constructor(config = {}, options = {}) {
super();
this.dispatcher = dispatcher();
this.dispatcher.bootstrapConsumer(this, config, options);
}

/**
*
*/
run() {
this.dispatcher.run();
}

/**
*
*/
stop() {
this.dispatcher.shutdown();
}

/**
*
* @returns {boolean}
*/
isRunning() {
return this.dispatcher.isRunning();
}

/**
*
* @param {*} message
Expand Down
16 changes: 13 additions & 3 deletions src/dispatcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const events = {
SCHEDULER_DOWN: 'scheduler.down',
STATS_UP: 'stats.up',
STATS_DOWN: 'stats.down',
MESSAGE_PRODUCED: 'message.produced',
MESSAGE_NEXT: 'message.next',
MESSAGE_RECEIVED: 'message.new',
MESSAGE_ACKNOWLEDGED: 'message.consumed',
Expand Down Expand Up @@ -313,7 +314,7 @@ module.exports = function dispatcher() {
* @param err
*/
function handleError(dispatcherInstance, err) {
if (!isGoingDown()) {
if (isUp() && !isGoingDown()) {
dispatcherInstance.shutdown();
throw err;
}
Expand Down Expand Up @@ -482,9 +483,18 @@ module.exports = function dispatcher() {
* @param cb
*/
produce(msg, cb) {
if (!(msg instanceof Message)) {
const m = new Message();
m.setBody(msg);
msg = m;
}
const onProduced = () => {
instance.emit(events.MESSAGE_PRODUCED, msg);
cb();
};
const proceed = () => {
if (schedulerInstance.isScheduled(msg)) schedulerInstance.schedule(msg, null, cb);
else this.enqueue(msg, null, cb);
if (schedulerInstance.isScheduled(msg)) schedulerInstance.schedule(msg, null, onProduced);
else this.enqueue(msg, null, onProduced);
};
if (!isUp()) {
if (bootstrapping || isGoingUp()) instance.once(events.UP, proceed);
Expand Down
49 changes: 49 additions & 0 deletions src/instance.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
'use strict';

const { EventEmitter } = require('events');
const dispatcher = require('./dispatcher');


class Instance extends EventEmitter {
/**
* See docs.
*
* @param {object} config
* @param {object} options
*/
constructor(config = {}, options = {}) {
super();
this.dispatcher = dispatcher();
}

/**
*
*/
run() {
this.dispatcher.run();
}

/**
*
*/
shutdown() {
this.dispatcher.shutdown();
}

/**
* @deprecated use shutdown() instead.
*/
stop() {
this.shutdown();
}

/**
*
* @returns {boolean}
*/
isRunning() {
return this.dispatcher.isRunning();
}
}

module.exports = Instance;
7 changes: 7 additions & 0 deletions src/message.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Message {
this[Message.PROPERTY_ATTEMPTS] = 0;
this[Message.PROPERTY_SCHEDULED_REPEAT_COUNT] = 0;
this[Message.PROPERTY_DELAYED] = false;
this[Message.PROPERTY_SCHEDULED_CRON_FIRED] = false;
}
}

Expand Down Expand Up @@ -316,6 +317,12 @@ Message.PROPERTY_CREATED_AT = 'createdAt';
*/
Message.PROPERTY_SCHEDULED_CRON = 'scheduledCron';

/**
*
* @type {boolean}
*/
Message.PROPERTY_SCHEDULED_CRON_FIRED = 'scheduledCronFired';

/**
* The time in milliseconds that a message will wait before being scheduled to be delivered
* @type {string}
Expand Down
19 changes: 3 additions & 16 deletions src/producer.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
'use strict';

const { EventEmitter } = require('events');
const uuid = require('uuid/v4');
const dispatcher = require('./dispatcher');
const Instance = require('./instance');
const Message = require('./message');


class Producer extends EventEmitter {
class Producer extends Instance {
/**
* See docs.
*
Expand All @@ -15,7 +13,6 @@ class Producer extends EventEmitter {
*/
constructor(queueName, config = {}) {
super();
this.dispatcher = dispatcher();
this.dispatcher.bootstrapProducer(this, config, queueName);
this.dispatcher.run();
}
Expand All @@ -36,9 +33,7 @@ class Producer extends EventEmitter {
* @param cb
*/
produce(payload, cb) {
const msg = new Message();
msg.setBody(payload);
this.produceMessage(msg, cb);
this.produceMessage(payload, cb);
}

/**
Expand All @@ -53,14 +48,6 @@ class Producer extends EventEmitter {
msg.setBody(payload).setTTL(ttl);
this.produceMessage(msg, cb);
}

/**
*
*/
shutdown() {
/* eslint class-methods-use-this : 0 */
this.dispatcher.shutdown();
}
}


Expand Down
5 changes: 4 additions & 1 deletion src/scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ function scheduler(dispatcher, tickPeriod = 1000) {
? cronParser.parseExpression(message[Message.PROPERTY_SCHEDULED_CRON]).next().getTime() : 0;
const nextRepeatTimestamp = getScheduleRepeatTimestamp();
if (nextCRONTimestamp && nextRepeatTimestamp) {
if (nextCRONTimestamp < nextRepeatTimestamp) {
if (!message[Message.PROPERTY_SCHEDULED_CRON_FIRED]
|| nextCRONTimestamp < nextRepeatTimestamp) {
message[Message.PROPERTY_SCHEDULED_REPEAT_COUNT] = 0;
message[Message.PROPERTY_SCHEDULED_CRON_FIRED] = true;
return nextCRONTimestamp;
}
return nextRepeatTimestamp;
Expand Down
Loading

0 comments on commit ebee319

Please sign in to comment.