From 00515f2fdd5bde501fdfe587a5d45c5554b35640 Mon Sep 17 00:00:00 2001 From: comfuture Date: Thu, 12 Jan 2023 16:11:59 +0900 Subject: [PATCH 1/4] add extra task options parameter --- src/app/client.ts | 13 ++++++++----- src/app/task.ts | 24 +++++++++++++++++++++--- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/src/app/client.ts b/src/app/client.ts index f756732..34fc1e4 100644 --- a/src/app/client.ts +++ b/src/app/client.ts @@ -1,6 +1,7 @@ import { v4 } from "uuid"; import Base from "./base"; import Task from "./task"; +import type { TaskOptions } from "./task"; import { AsyncResult } from "./result"; class TaskMessage { @@ -12,6 +13,7 @@ class TaskMessage { ) {} } + export default class Client extends Base { private taskProtocols = { 1: this.asTaskV1, @@ -22,7 +24,7 @@ export default class Client extends Base { return this.taskProtocols[this.conf.TASK_PROTOCOL]; } - public sendTaskMessage(taskName: string, message: TaskMessage): void { + public sendTaskMessage(taskName: string, message: TaskMessage, options: TaskOptions = {}): void { const { headers, properties, body /*, sentEvent */ } = message; const exchange = ""; @@ -32,8 +34,8 @@ export default class Client extends Base { this.isReady().then(() => this.broker.publish( body, - exchange, - this.conf.CELERY_QUEUE, + options?.exchange ?? exchange, + options?.routingKey ?? this.conf.CELERY_QUEUE, headers, properties ) @@ -136,11 +138,12 @@ export default class Client extends Base { taskName: string, args?: Array, kwargs?: object, - taskId?: string + taskId?: string, + options?: TaskOptions ): AsyncResult { taskId = taskId || v4(); const message = this.createTaskMessage(taskId, taskName, args, kwargs); - this.sendTaskMessage(taskName, message); + this.sendTaskMessage(taskName, message, options); const result = new AsyncResult(taskId, this.backend); return result; diff --git a/src/app/task.ts b/src/app/task.ts index a84c84b..59f4b09 100644 --- a/src/app/task.ts +++ b/src/app/task.ts @@ -1,19 +1,37 @@ import Client from "./client"; import { AsyncResult } from "./result"; +/** Task executation options + * Originally allows these keys: + * ['queue', 'routing_key', 'exchange', 'priority', 'expires', + * 'serializer', 'delivery_mode', 'compression', 'time_limit', + * 'soft_time_limit', 'immediate', 'mandatory'] + * but now only part of them are supported. +*/ +export type TaskOptions = { + exchange?: string; + queue?: string; + routingKey?: string; +} + export default class Task { client: Client; name: string; + options?: TaskOptions; /** * Asynchronous Task * @constructor Task * @param {Client} clinet celery client instance * @param {string} name celery task name + * @param {Object} [options] + * @param {string} [options.queue] queue name + * @param {string} [options.routingKey] routing key */ - constructor(client: Client, name: string) { + constructor(client: Client, name: string, options: TaskOptions = {}) { this.client = client; this.name = name; + this.options = options; } /** @@ -28,7 +46,7 @@ export default class Task { return this.applyAsync([...args]); } - public applyAsync(args: Array, kwargs?: object): AsyncResult { + public applyAsync(args: Array, kwargs?: object, options?: TaskOptions): AsyncResult { if (args && !Array.isArray(args)) { throw new Error("args is not array"); } @@ -37,6 +55,6 @@ export default class Task { throw new Error("kwargs is not object"); } - return this.client.sendTask(this.name, args || [], kwargs || {}); + return this.client.sendTask(this.name, args || [], kwargs || {}, undefined, options || this.options); } } From 9135681f05b2ed275fcba70ae66a3da5e792b415 Mon Sep 17 00:00:00 2001 From: comfuture Date: Thu, 12 Jan 2023 17:35:35 +0900 Subject: [PATCH 2/4] add options parameter on `createTask` method --- src/app/client.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/app/client.ts b/src/app/client.ts index 34fc1e4..7daf1ef 100644 --- a/src/app/client.ts +++ b/src/app/client.ts @@ -121,8 +121,8 @@ export default class Client extends Base { * @example * client.createTask('task.add').delay([1, 2]) */ - public createTask(name: string): Task { - return new Task(this, name); + public createTask(name: string, options?: TaskOptions): Task { + return new Task(this, name, options); } /** From d0a729f6b4b4a64f0bbcce8025949c1cf34d2d72 Mon Sep 17 00:00:00 2001 From: comfuture Date: Thu, 12 Jan 2023 17:36:03 +0900 Subject: [PATCH 3/4] add test for runtime routingKey changes --- test/app/test_client.ts | 37 ++++++++++++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/test/app/test_client.ts b/test/app/test_client.ts index 29f662e..3879aa9 100644 --- a/test/app/test_client.ts +++ b/test/app/test_client.ts @@ -16,6 +16,12 @@ describe("celery functional tests", () => { "redis://localhost:6379/0" ); + const worker2 = new Worker( + "redis://localhost:6379/0", + "redis://localhost:6379/0", + "my_queue" + ); + before(() => { worker.register("tasks.add", (a, b) => a + b); worker.register( @@ -25,16 +31,21 @@ describe("celery functional tests", () => { setTimeout(() => resolve(result), delay); }) ); - worker.start(); + + worker2.register("tasks.multiply", (a, b) => a * b); + Promise.all([worker.start(), worker2.start()]); }); afterEach(() => { sinon.restore(); - return worker.whenCurrentJobsFinished(); + return Promise.all([ + worker.whenCurrentJobsFinished(), + worker2.whenCurrentJobsFinished() + ]); }); after(() => { - Promise.all([client.disconnect(), worker.disconnect()]); + Promise.all([client.disconnect(), worker.disconnect(), worker2.disconnect()]); const redis = new Redis(); redis.flushdb().then(() => redis.quit()); @@ -112,4 +123,24 @@ describe("celery functional tests", () => { }) }); }); + + describe("custom routing key", () => { + it("should create a task with another routing key", done => { + const task = client.createTask("tasks.multiply", { routingKey: "my_queue" }); + const result = task.applyAsync([2, 3]); + result.get(500).then((message) => { + assert.equal(message, 6); + }) + done(); + }); + + it('should send_task with another routing key', done => { + const task = client.createTask("tasks.multiply"); + const result = task.applyAsync([2, 3], undefined, { routingKey: "my_queue" }); + result.get(500).then((message) => { + assert.equal(message, 6); + }) + done(); + }) + }) }); From a5f3d39adfb7b1648f193bc0ea9057cedd20abe0 Mon Sep 17 00:00:00 2001 From: comfuture Date: Thu, 12 Jan 2023 17:50:36 +0900 Subject: [PATCH 4/4] version bump --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 70e6084..741c246 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "celery-node", - "version": "0.5.9", + "version": "0.5.10", "description": "celery written in nodejs", "main": "dist/index.js", "types": "dist/index.d.ts",