diff --git a/index.js b/index.js index 9a68b7d4..16966683 100644 --- a/index.js +++ b/index.js @@ -1,12 +1,12 @@ const { AsyncLocalStorage } = require("node:async_hooks"); const knex = require("knex"); -const PgBoss = require("pg-boss"); const util = require("util"); const AppConfig = require("./src/app_config"); const ReportProcessingContext = require("./src/report_processing_context"); const Logger = require("./src/logger"); const Processor = require("./src/processor"); -const PgBossKnexAdapter = require("./src/pg_boss_knex_adapter"); +const Queue = require("./src/queue/queue"); +const ReportJobQueueMessage = require("./src/queue/report_job_queue_message"); /** * Gets an array of JSON report objects from the application confing, then runs @@ -37,12 +37,16 @@ async function run(options = {}) { const appConfig = new AppConfig(options); const context = new ReportProcessingContext(new AsyncLocalStorage()); const reportConfigs = appConfig.filteredReportConfigurations; + const knexInstance = appConfig.shouldWriteToDatabase + ? await knex(appConfig.knexConfig) + : undefined; const processor = Processor.buildAnalyticsProcessor( appConfig, Logger.initialize({ agencyName: appConfig.agencyLogName, scriptName: appConfig.scriptName, }), + knexInstance, ); for (const reportConfig of reportConfigs) { @@ -124,7 +128,11 @@ async function runQueuePublish(options = {}) { scriptName: appConfig.scriptName, }); const knexInstance = await knex(appConfig.knexConfig); - const queueClient = await _initQueueClient(knexInstance, appLogger); + const queueClient = await _initQueueClient( + knexInstance, + appConfig.messageQueueName, + appLogger, + ); for (const agency of agencies) { for (const reportConfig of reportConfigs) { @@ -134,47 +142,35 @@ async function runQueuePublish(options = {}) { scriptName: appConfig.scriptName, reportName: reportConfig.name, }); + let messageId; try { - let jobId = await queueClient.send( - appConfig.messageQueueName, - _createQueueMessage( - options, - agency, + messageId = await queueClient.sendMessage( + new ReportJobQueueMessage({ + agencyName: agency.agencyName, + analyticsReportIds: agency.analyticsReportIds, + awsBucketPath: agency.awsBucketPath, + reportOptions: options, reportConfig, - appConfig.scriptName, - ), - { - priority: _messagePriority(reportConfig), - retryLimit: 2, - retryDelay: 10, - retryBackoff: true, - singletonKey: `${appConfig.scriptName}-${agency.agencyName}-${reportConfig.name}`, - }, + scriptName: appConfig.scriptName, + }), ); - if (jobId) { + if (messageId) { reportLogger.info( - `Created job in queue: ${appConfig.messageQueueName} with job ID: ${jobId}`, + `Created message in queue: ${queueClient.name} with message ID: ${messageId}`, ); } else { reportLogger.info( - `Found a duplicate job in queue: ${appConfig.messageQueueName}`, + `Found a duplicate message in queue: ${queueClient.name}`, ); } } catch (e) { - reportLogger.error( - `Error sending to queue: ${appConfig.messageQueueName}`, - ); - reportLogger.error(util.inspect(e)); + // Do nothing so that the remaining messages still process. } } } try { await queueClient.stop(); - appLogger.debug(`Stopping queue client`); - } catch (e) { - appLogger.error("Error stopping queue client"); - appLogger.error(util.inspect(e)); } finally { appLogger.debug(`Destroying database connection pool`); knexInstance.destroy(); @@ -198,41 +194,17 @@ function _initAgencies(agencies_file) { return Array.isArray(agencies) ? agencies : legacyAgencies; } -async function _initQueueClient(knexInstance, logger) { - let queueClient; - try { - queueClient = new PgBoss({ db: new PgBossKnexAdapter(knexInstance) }); - await queueClient.start(); - logger.debug("Starting queue client"); - } catch (e) { - logger.error("Error starting queue client"); - logger.error(util.inspect(e)); - } - +async function _initQueueClient(knexInstance, queueName, logger) { + const queueClient = Queue.buildQueue({ + knexInstance, + queueName, + messageClass: ReportJobQueueMessage, + logger, + }); + await queueClient.start(); return queueClient; } -function _createQueueMessage(options, agency, reportConfig, scriptName) { - return { - ...agency, - options, - reportConfig, - scriptName, - }; -} - -function _messagePriority(reportConfig) { - if (!reportConfig.frequency) { - return 0; - } else if (reportConfig.frequency == "daily") { - return 1; - } else if (reportConfig.frequency == "hourly") { - return 2; - } else if (reportConfig.frequency == "realtime") { - return 3; - } -} - /** * @returns {Promise} when the process ends */ @@ -240,7 +212,11 @@ async function runQueueConsume() { const appConfig = new AppConfig(); const appLogger = Logger.initialize(); const knexInstance = await knex(appConfig.knexConfig); - const queueClient = await _initQueueClient(knexInstance, appLogger); + const queueClient = await _initQueueClient( + knexInstance, + appConfig.messageQueueName, + appLogger, + ); try { const context = new ReportProcessingContext(new AsyncLocalStorage()); @@ -250,24 +226,19 @@ async function runQueueConsume() { knexInstance, ); - await queueClient.work( - appConfig.messageQueueName, - { newJobCheckIntervalSeconds: 1 }, - async (message) => { - appLogger.info("Queue message received"); - process.env.AGENCY_NAME = message.data.agencyName; - process.env.ANALYTICS_REPORT_IDS = message.data.analyticsReportIds; - process.env.AWS_BUCKET_PATH = message.data.awsBucketPath; - process.env.ANALYTICS_SCRIPT_NAME = message.data.scriptName; - - await _processReport( - new AppConfig(message.data.options), - context, - message.data.reportConfig, - processor, - ); - }, - ); + await queueClient.poll(async (message) => { + process.env.AGENCY_NAME = message.agencyName; + process.env.ANALYTICS_REPORT_IDS = message.analyticsReportIds; + process.env.AWS_BUCKET_PATH = message.awsBucketPath; + process.env.ANALYTICS_SCRIPT_NAME = message.scriptName; + + await _processReport( + new AppConfig(message.options), + context, + message.reportConfig, + processor, + ); + }); } catch (e) { appLogger.error("Error polling queue for messages"); appLogger.error(util.inspect(e)); diff --git a/package-lock.json b/package-lock.json index 5131d18b..67f9c51b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -29,6 +29,7 @@ "@cucumber/cucumber": "^10.3.1", "@eslint/js": "^8.57.0", "chai": "^4.4.0", + "chai-as-promised": "^8.0.1", "dotenv": "^16.4.5", "dotenv-cli": "^7.4.3", "eslint": "^8.56.0", @@ -3532,6 +3533,27 @@ "node": ">=4" } }, + "node_modules/chai-as-promised": { + "version": "8.0.1", + "resolved": "https://registry.npmjs.org/chai-as-promised/-/chai-as-promised-8.0.1.tgz", + "integrity": "sha512-OIEJtOL8xxJSH8JJWbIoRjybbzR52iFuDHuF8eb+nTPD6tgXLjRqsgnUGqQfFODxYvq5QdirT0pN9dZ0+Gz6rA==", + "dev": true, + "dependencies": { + "check-error": "^2.0.0" + }, + "peerDependencies": { + "chai": ">= 2.1.2 < 6" + } + }, + "node_modules/chai-as-promised/node_modules/check-error": { + "version": "2.1.1", + "resolved": "https://registry.npmjs.org/check-error/-/check-error-2.1.1.tgz", + "integrity": "sha512-OAlb+T7V4Op9OwdkjmguYRqncdlx5JiofwOAUkmTF+jNdHwzTaTs4sRAGpzLF3oOz5xAyDGrPgeIDFQmDOTiJw==", + "dev": true, + "engines": { + "node": ">= 16" + } + }, "node_modules/chalk": { "version": "4.1.2", "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", diff --git a/package.json b/package.json index 270a8bcf..ef0a371b 100644 --- a/package.json +++ b/package.json @@ -86,6 +86,7 @@ "@cucumber/cucumber": "^10.3.1", "@eslint/js": "^8.57.0", "chai": "^4.4.0", + "chai-as-promised": "^8.0.1", "dotenv": "^16.4.5", "dotenv-cli": "^7.4.3", "eslint": "^8.56.0", diff --git a/src/pg_boss_knex_adapter.js b/src/queue/pg_boss_knex_adapter.js similarity index 100% rename from src/pg_boss_knex_adapter.js rename to src/queue/pg_boss_knex_adapter.js diff --git a/src/queue/queue.js b/src/queue/queue.js new file mode 100644 index 00000000..f6f54a59 --- /dev/null +++ b/src/queue/queue.js @@ -0,0 +1,121 @@ +const PgBoss = require("pg-boss"); +const PgBossKnexAdapter = require("./pg_boss_knex_adapter"); +const util = require("util"); + +/** + * Implements a message queue using the PgBoss library. + */ +class Queue { + #queueClient; + #queueName; + #messageClass; + #logger; + + /** + * @param {object} params the parameter object + * @param {import('pg-boss')} params.queueClient the queue client instance to + * use for queue operations. + * @param {string} params.queueName the identifier for the queue. + * @param {*} params.messageClass a class which implements the fromMessage + * static method to return an instance of the class from a PgBoss message + * object. This can be omitted if the queue instance only sends messages. + * @param {import('winston').Logger} params.logger an application logger instance. + */ + constructor({ queueClient, queueName, messageClass, logger }) { + this.#queueClient = queueClient; + this.#queueName = queueName; + this.#messageClass = messageClass; + this.#logger = logger; + } + + /** + * @returns {string} the queue name + */ + get name() { + return this.#queueName; + } + + /** + * @returns {Promise} resolves when the PgBoss queue client has started + */ + async start() { + try { + await this.#queueClient.start(); + this.#logger.debug("Starting queue client"); + } catch (e) { + this.#logger.error("Error starting queue client"); + this.#logger.error(util.inspect(e)); + throw e; + } + } + + /** + * @returns {Promise} resolves when the PgBoss queue client has stopped + */ + async stop() { + try { + await this.#queueClient.stop(); + this.#logger.debug(`Stopping queue client`); + } catch (e) { + this.#logger.error("Error stopping queue client"); + this.#logger.error(util.inspect(e)); + throw e; + } + } + + /** + * @param {import('./queue_message')} queueMessage a QueueMessage instance + * @returns {string} a message ID or null if a duplicate message exists on the + * queue. + */ + async sendMessage(queueMessage) { + try { + const messageId = await this.#queueClient.send( + this.#queueName, + queueMessage.toJSON(), + queueMessage.sendOptions(), + ); + return messageId; + } catch (e) { + this.#logger.error(`Error sending to queue: ${this.#queueName}`); + this.#logger.error(util.inspect(e)); + throw e; + } + } + + /** + * @param {Function} callback the function to call for each message + * @param {object} options the options to pass to the PgBoss work function + * @returns {Promise} resolves when the queue poller process stops + */ + poll(callback, options = { newJobCheckIntervalSeconds: 1 }) { + return this.#queueClient.work(this.#queueName, options, async (message) => { + this.#logger.info("Queue message received"); + await callback(this.#messageClass.fromMessage(message).toJSON()); + }); + } + + /** + * @param {object} params the parameter object + * @param {import('knex')} params.knexInstance an initialized instance of the knex + * library which provides a database connection. + * @param {string} params.queueName the name of the queue to use for the + * client. + * @param {*} params.messageClass a class which implements the fromMessage + * static method to return an instance of the class from a PgBoss message + * object. This can be omitted if the queue instance only sends messages. + * @param {import('winston').Logger} params.logger an application logger instance. + * @returns {Queue} the queue instance configured with the PgBoss queue + * client. + */ + static buildQueue({ knexInstance, queueName, messageClass, logger }) { + return new Queue({ + queueClient: new PgBoss({ db: new PgBossKnexAdapter(knexInstance) }), + queueName, + messageClass, + logger, + }); + } +} + +module.exports = Queue; diff --git a/src/queue/queue_message.js b/src/queue/queue_message.js new file mode 100644 index 00000000..1a9dc01a --- /dev/null +++ b/src/queue/queue_message.js @@ -0,0 +1,28 @@ +/** + * Abstract class for a queue message to be sent to a PgBoss queue client. + */ +class QueueMessage { + /** + * @returns {object} the class converted to a JSON object. + */ + toJSON() { + return {}; + } + + /** + * @returns {object} an options object for the PgBoss send method + */ + sendOptions() { + return {}; + } + + /** + * @param {object} message a PgBoss message object from the report job queue. + * @returns {QueueMessage} the built queue message instance. + */ + static fromMessage(message) { + return new QueueMessage(message.data); + } +} + +module.exports = QueueMessage; diff --git a/src/queue/report_job_queue_message.js b/src/queue/report_job_queue_message.js new file mode 100644 index 00000000..5c8b1522 --- /dev/null +++ b/src/queue/report_job_queue_message.js @@ -0,0 +1,109 @@ +const QueueMessage = require("./queue_message"); + +/** + * Data object for a report job queue message to be sent to a PgBoss queue + * client. + */ +class ReportJobQueueMessage extends QueueMessage { + #agencyName; + #analyticsReportIds; + #awsBucketPath; + #reportOptions; + #reportConfig; + #scriptName; + + /** + * @param {object} params the params object. + * @param {string} params.agencyName the name of the agency. + * @param {string} params.analyticsReportIds the google analytics property ids + * for the agency to use when running reports. + * @param {string} params.awsBucketPath the folder in the S3 bucket where + * report data is stored for the agency. + * @param {object} params.reportOptions the options passed to the reporter + * executable. + * @param {object} params.reportConfig the google analytics configuration + * object for the report to run. + * @param {string} params.scriptName the name of the script which was run to + * begin the reporter process. + * @returns {ReportJobQueueMessage} the built queue message instance. + */ + constructor({ + agencyName = "", + analyticsReportIds = "", + awsBucketPath = "", + reportOptions = {}, + reportConfig = {}, + scriptName = "", + }) { + super(); + this.#agencyName = agencyName; + this.#analyticsReportIds = analyticsReportIds; + this.#awsBucketPath = awsBucketPath; + this.#reportOptions = reportOptions; + this.#reportConfig = reportConfig; + this.#scriptName = scriptName; + } + + /** + * @returns {object} the class converted to a JSON object. + */ + toJSON() { + return { + agencyName: this.#agencyName, + analyticsReportIds: this.#analyticsReportIds, + awsBucketPath: this.#awsBucketPath, + options: this.#reportOptions, + reportConfig: this.#reportConfig, + scriptName: this.#scriptName, + }; + } + + /** + * @returns {object} an options object for the PgBoss send method + */ + sendOptions() { + return { + priority: this.#messagePriority(this.#reportConfig.frequency), + retryLimit: 2, + retryDelay: 10, + retryBackoff: true, + singletonKey: `${this.#scriptName}-${this.#agencyName}-${this.#reportConfig.name}`, + }; + } + + #messagePriority(reportFrequency) { + let priority; + switch (reportFrequency) { + case "realtime": + priority = 3; + break; + case "hourly": + priority = 2; + break; + case "daily": + priority = 1; + break; + default: + priority = 0; + } + return priority; + } + + /** + * @param {object} message a PgBoss message object from the report job queue. + * should have a data key with the expected fields. + * @returns {ReportJobQueueMessage} the built queue message instance. + */ + static fromMessage(message = { data: {} }) { + return new ReportJobQueueMessage({ + agencyName: message.data.agencyName, + analyticsReportIds: message.data.analyticsReportIds, + awsBucketPath: message.data.awsBucketPath, + reportOptions: message.data.options, + reportConfig: message.data.reportConfig, + scriptName: message.data.scriptName, + }); + } +} + +module.exports = ReportJobQueueMessage; diff --git a/test/index.test.js b/test/index.test.js index e52ef163..ac4738ee 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -12,6 +12,10 @@ class AppConfig { get filteredReportConfigurations() { return reportConfigs; } + + get shouldWriteToDatabase() { + return true; + } } class ReportProcessingContext { diff --git a/test/queue/pg_boss_knex_adapter.test.js b/test/queue/pg_boss_knex_adapter.test.js new file mode 100644 index 00000000..7a1b1ea1 --- /dev/null +++ b/test/queue/pg_boss_knex_adapter.test.js @@ -0,0 +1,51 @@ +const sinon = require("sinon"); +const PgBossKnexAdapter = require("../../src/queue/pg_boss_knex_adapter"); + +describe("PgBossKnexAdapter", () => { + let subject; + let knexInstance; + + beforeEach(async () => { + knexInstance = { + raw: sinon.spy(), + }; + subject = new PgBossKnexAdapter(knexInstance); + }); + + describe(".executeSql", () => { + describe("when parameters are passed with the SQL statement", () => { + const sql = "SELECT * FROM foobar-table where foo = $1 and bar = $2"; + const parameters = ["foo", "bar"]; + const expectedSql = + "SELECT * FROM foobar-table where foo = :param_1 and bar = :param_2"; + const expectedParameters = { + param_1: parameters[0], + param_2: parameters[1], + }; + + beforeEach(() => { + subject.executeSql(sql, parameters); + }); + + it("calls knex.raw with the changed SQL and the parameters array", () => { + sinon.assert.calledWith( + knexInstance.raw, + expectedSql, + expectedParameters, + ); + }); + }); + + describe("when parameters are not passed with the SQL statement", () => { + const sql = "SELECT * FROM foobar-table"; + + beforeEach(() => { + subject.executeSql(sql); + }); + + it("calls knex.raw with the unchanged SQL and an empty object for parameters", () => { + sinon.assert.calledWith(knexInstance.raw, sql, {}); + }); + }); + }); +}); diff --git a/test/queue/queue.test.js b/test/queue/queue.test.js new file mode 100644 index 00000000..cc2c0c4f --- /dev/null +++ b/test/queue/queue.test.js @@ -0,0 +1,259 @@ +const chai = require("chai"); +const sinon = require("sinon"); +const Queue = require("../../src/queue/queue"); + +let expect; +let messageJSON; +let messageOptions; + +class TestQueueMessage { + toJSON() { + return messageJSON; + } + + sendOptions() { + return messageOptions; + } + + static fromMessage() { + return new TestQueueMessage(); + } +} + +describe("Queue", () => { + const queueName = "foobar-queue"; + let queueClient; + const messageClass = TestQueueMessage; + let logger; + let subject; + + beforeEach(async () => { + const chaiAsPromised = await import("chai-as-promised"); + chai.use(chaiAsPromised.default); + expect = chai.expect; + queueClient = { + start: sinon.spy(), + stop: sinon.spy(), + send: sinon.spy(), + work: sinon.spy(), + }; + logger = { + info: sinon.spy(), + error: sinon.spy(), + debug: sinon.spy(), + }; + subject = new Queue({ queueName, queueClient, messageClass, logger }); + }); + + describe(".name", () => { + it("returns the queue name", () => { + expect(subject.name).to.equal(queueName); + }); + }); + + describe(".start", () => { + describe("when starting the queue client is successful", () => { + beforeEach(async () => { + await subject.start(); + }); + + it("starts the queue client", () => { + expect(queueClient.start.calledWith()).to.equal(true); + }); + }); + + describe("when starting the queue client is throws an error", () => { + beforeEach(async () => { + queueClient.start = () => {}; + sinon + .stub(queueClient, "start") + .throws("Error", "some fake error message"); + }); + + it("throws an error", async () => { + expect(subject.start()).to.eventually.be.rejected; + expect(queueClient.start.calledWith()).to.equal(true); + expect(logger.error.calledWith("Error starting queue client")).to.equal( + true, + ); + }); + }); + }); + + describe(".stop", () => { + describe("when stopping the queue client is successful", () => { + beforeEach(async () => { + await subject.stop(); + }); + + it("stops the queue client", () => { + expect(queueClient.stop.calledWith()).to.equal(true); + }); + }); + + describe("when stopping the queue client is throws an error", () => { + beforeEach(async () => { + queueClient.stop = () => {}; + sinon + .stub(queueClient, "stop") + .throws("Error", "some fake error message"); + }); + + it("throws an error", async () => { + expect(subject.stop()).to.eventually.be.rejected; + expect(queueClient.stop.calledWith()).to.equal(true); + expect(logger.error.calledWith("Error stopping queue client")).to.equal( + true, + ); + }); + }); + }); + + describe(".sendMessage", () => { + let queueMessage; + + beforeEach(() => { + messageJSON = { foo: "bar" }; + messageOptions = { test: 1 }; + queueMessage = new TestQueueMessage(); + }); + + describe("when sending a message to the queue is successful", () => { + let actual; + + describe("and a duplicate message is not found", () => { + const expected = "a fake job id"; + + beforeEach(async () => { + queueClient.send = () => {}; + sinon.stub(queueClient, "send").returns(expected); + actual = await subject.sendMessage(queueMessage); + }); + + it("sends the message to the queue with expected JSON and options", () => { + expect( + queueClient.send.calledWith(queueName, messageJSON, messageOptions), + ).to.equal(true); + }); + + it("returns the message ID", () => { + expect(actual).to.equal(expected); + }); + }); + + describe("and a duplicate message is found", () => { + const expected = null; + + beforeEach(async () => { + queueClient.send = () => {}; + sinon.stub(queueClient, "send").returns(expected); + actual = await subject.sendMessage(queueMessage); + }); + + it("attempts to send the message to the queue with expected JSON and options", () => { + expect( + queueClient.send.calledWith(queueName, messageJSON, messageOptions), + ).to.equal(true); + }); + + it("returns null", () => { + expect(actual).to.equal(expected); + }); + }); + }); + + describe("when sending a message to the queue is not successful", () => { + beforeEach(async () => { + queueClient.send = () => {}; + sinon + .stub(queueClient, "send") + .throws("Error", "some fake error message"); + }); + + it("throws an error", async () => { + expect(subject.sendMessage(queueMessage)).to.eventually.be.rejected; + expect( + queueClient.send.calledWith(queueName, messageJSON, messageOptions), + ).to.equal(true); + expect( + logger.error.calledWith(`Error sending to queue: ${queueName}`), + ).to.equal(true); + }); + }); + }); + + describe(".poll", () => { + describe("when options are not passed", () => { + const callback = () => { + return ""; + }; + + beforeEach(async () => { + await subject.poll(callback); + }); + + it("polls the queue with expected options", () => { + expect(queueClient.work.getCalls()[0].args[0]).to.equal(queueName); + expect(queueClient.work.getCalls()[0].args[1]).to.deep.equal({ + newJobCheckIntervalSeconds: 1, + }); + expect(typeof queueClient.work.getCalls()[0].args[2]).to.equal( + "function", + ); + }); + }); + + describe("when options are passed", () => { + const callback = () => { + return ""; + }; + const options = { foo: "bar" }; + + beforeEach(async () => { + await subject.poll(callback, options); + }); + + it("polls the queue with expected options", () => { + expect(queueClient.work.getCalls()[0].args[0]).to.equal(queueName); + expect(queueClient.work.getCalls()[0].args[1]).to.deep.equal(options); + expect(typeof queueClient.work.getCalls()[0].args[2]).to.equal( + "function", + ); + }); + }); + + describe("polling callback function", () => { + describe("when the polling callback is executed with a message", () => { + let callback; + const message = { foo: "bar" }; + + beforeEach(async () => { + callback = sinon.spy(); + await subject.poll(callback); + queueClient.work.getCalls()[0].args[2](message); + }); + + it("logs that a message was received", () => { + sinon.assert.calledWith(logger.info, "Queue message received"); + }); + + it("executes the callback with the message JSON", () => { + expect(callback.calledWith(messageJSON)).to.equal(true); + }); + }); + }); + }); + + describe(".buildQueue", () => { + it("returns an instance of Queue", () => { + expect( + Queue.buildQueue({ + knexInstance: {}, + queueName, + messageClass: TestQueueMessage, + logger: {}, + }) instanceof Queue, + ).to.equal(true); + }); + }); +}); diff --git a/test/queue/queue_message.test.js b/test/queue/queue_message.test.js new file mode 100644 index 00000000..470a2130 --- /dev/null +++ b/test/queue/queue_message.test.js @@ -0,0 +1,30 @@ +const expect = require("chai").expect; +const QueueMessage = require("../../src/queue/queue_message"); + +describe("QueueMessage", () => { + let subject; + + beforeEach(async () => { + subject = new QueueMessage(); + }); + + describe(".toJSON", () => { + it("returns an empty object", () => { + expect(subject.toJSON()).to.deep.equal({}); + }); + }); + + describe(".sendOptions", () => { + it("returns an empty object", () => { + expect(subject.sendOptions()).to.deep.equal({}); + }); + }); + + describe(".fromMessage", () => { + it("creates a new QueueMessage instance", () => { + expect(QueueMessage.fromMessage({}) instanceof QueueMessage).to.equal( + true, + ); + }); + }); +}); diff --git a/test/queue/report_job_queue_message.test.js b/test/queue/report_job_queue_message.test.js new file mode 100644 index 00000000..17c4f01b --- /dev/null +++ b/test/queue/report_job_queue_message.test.js @@ -0,0 +1,176 @@ +const expect = require("chai").expect; +const ReportJobQueueMessage = require("../../src/queue/report_job_queue_message"); + +describe("ReportJobQueueMessage", () => { + const agencyName = "test-agency"; + const analyticsReportIds = "12343567"; + const awsBucketPath = "/data/test-agency"; + const reportOptions = { foo: "bar" }; + const reportConfig = { query: "get some data", name: "foobar report" }; + const scriptName = "daily.sh"; + let subject; + + beforeEach(async () => { + subject = new ReportJobQueueMessage({}); + }); + + describe(".toJSON", () => { + describe("when no arguments are passed to the constructor", () => { + it("returns an object with default values", () => { + expect(subject.toJSON()).to.deep.equal({ + agencyName: "", + analyticsReportIds: "", + awsBucketPath: "", + reportConfig: {}, + options: {}, + scriptName: "", + }); + }); + }); + + describe("when all arguments are passed to the constructor", () => { + beforeEach(() => { + subject = new ReportJobQueueMessage({ + agencyName, + analyticsReportIds, + awsBucketPath, + reportConfig, + reportOptions, + scriptName, + }); + }); + + it("returns an object with default values", () => { + expect(subject.toJSON()).to.deep.equal({ + agencyName, + analyticsReportIds, + awsBucketPath, + reportConfig, + options: reportOptions, + scriptName, + }); + }); + }); + }); + + describe(".sendOptions", () => { + describe("when report frequency is not set", () => { + const noFrequencyReportConfig = { name: "no frequency" }; + + beforeEach(() => { + subject = new ReportJobQueueMessage({ + agencyName, + analyticsReportIds, + awsBucketPath, + reportConfig: noFrequencyReportConfig, + reportOptions, + scriptName, + }); + }); + + it("returns an options object with priority: 0", () => { + expect(subject.sendOptions()).to.deep.equal({ + priority: 0, + retryLimit: 2, + retryDelay: 10, + retryBackoff: true, + singletonKey: `${scriptName}-${agencyName}-${noFrequencyReportConfig.name}`, + }); + }); + }); + + describe("when report frequency is daily", () => { + const dailyReportConfig = { name: "daily", frequency: "daily" }; + + beforeEach(() => { + subject = new ReportJobQueueMessage({ + agencyName, + analyticsReportIds, + awsBucketPath, + reportConfig: dailyReportConfig, + reportOptions, + scriptName, + }); + }); + + it("returns an options object with priority: 1", () => { + expect(subject.sendOptions()).to.deep.equal({ + priority: 1, + retryLimit: 2, + retryDelay: 10, + retryBackoff: true, + singletonKey: `${scriptName}-${agencyName}-${dailyReportConfig.name}`, + }); + }); + }); + + describe("when report frequency is hourly", () => { + const hourlyReportConfig = { name: "hourly", frequency: "hourly" }; + + beforeEach(() => { + subject = new ReportJobQueueMessage({ + agencyName, + analyticsReportIds, + awsBucketPath, + reportConfig: hourlyReportConfig, + reportOptions, + scriptName, + }); + }); + + it("returns an options object with priority: 2", () => { + expect(subject.sendOptions()).to.deep.equal({ + priority: 2, + retryLimit: 2, + retryDelay: 10, + retryBackoff: true, + singletonKey: `${scriptName}-${agencyName}-${hourlyReportConfig.name}`, + }); + }); + }); + + describe("when report frequency is realtime", () => { + const realtimeReportConfig = { name: "realtime", frequency: "realtime" }; + + beforeEach(() => { + subject = new ReportJobQueueMessage({ + agencyName, + analyticsReportIds, + awsBucketPath, + reportConfig: realtimeReportConfig, + reportOptions, + scriptName, + }); + }); + + it("returns an options object with priority: 3", () => { + expect(subject.sendOptions()).to.deep.equal({ + priority: 3, + retryLimit: 2, + retryDelay: 10, + retryBackoff: true, + singletonKey: `${scriptName}-${agencyName}-${realtimeReportConfig.name}`, + }); + }); + }); + }); + + describe(".fromMessage", () => { + describe("when arguments are passed", () => { + it("creates a new ReportJobQueueMessage instance", () => { + expect( + ReportJobQueueMessage.fromMessage({ data: {} }) instanceof + ReportJobQueueMessage, + ).to.equal(true); + }); + }); + + describe("when no arguments are passed", () => { + it("creates a new ReportJobQueueMessage instance", () => { + expect( + ReportJobQueueMessage.fromMessage() instanceof ReportJobQueueMessage, + ).to.equal(true); + }); + }); + }); +});