Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SC-1434 implement reconnect for Rabbit #114

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .github/FUNDING.yml

This file was deleted.

Empty file removed .github/ISSUE_TEMPLATE.md
Empty file.
23 changes: 0 additions & 23 deletions .github/ISSUE_TEMPLATE/Bug-Report.md

This file was deleted.

11 changes: 0 additions & 11 deletions .github/ISSUE_TEMPLATE/Feature-Request.md

This file was deleted.

9 changes: 0 additions & 9 deletions .github/ISSUE_TEMPLATE/Free-Style.md

This file was deleted.

20 changes: 0 additions & 20 deletions .github/PULL_REQUEST_TEMPLATE.md

This file was deleted.

43 changes: 0 additions & 43 deletions .github/workflows/nodejs.yml

This file was deleted.

24 changes: 24 additions & 0 deletions .github/workflows/npm-publish.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Github action which publish a package after commits to master
name: Post-commit publish

# Controls when the workflow will run
on:
# Triggers the workflow on push or pull request events but only for the "master" branch
push:
branches: [ "master" ]

# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:

jobs:
npm-publish-and-notify:
uses: TechSeePublic/techsee-actions/.github/workflows/npm-publish.yml@main
with:
branch: master
use-slack: true
package-name: ${{ github.repository }}
secrets:
git-token: ${{ secrets.GITHUB_TOKEN }}
npm-token: ${{ secrets.NPM_PUBLISH_TOKEN }}
slack-success-webhookurl: ${{ secrets.SLACK_NPM_PUBLISH_WEBHOOK_URL }}
slack-failure-webhookurl: ${{ secrets.SLACK_NPM_PUBLISH_FAIL_WEBHOOK_URL }}
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "celery-node",
"version": "0.5.9",
"name": "@techsee/celery-node-mod",
"version": "1.0.22",
"description": "celery written in nodejs",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand All @@ -10,7 +10,7 @@
"format-check": "prettier --check \"{,!(node_modules)/**/}*.{js,ts}\"",
"dist": "tsc",
"test": "mocha -r ts-node/register ./test/**/*",
"prepublishOnly": "npm run dist & npm test"
"prepublishOnly": "npm run dist"
},
"repository": {
"type": "git",
Expand Down
21 changes: 18 additions & 3 deletions src/app/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
* writes here Base Parent class of Celery client and worker
* @author SunMyeong Lee <[email protected]>
*/
import { CeleryConf, defaultConf } from "./conf";
import { newCeleryBroker, CeleryBroker } from "../kombu/brokers";
import { newCeleryBackend, CeleryBackend } from "../backends";
import {CeleryConf, defaultConf} from "./conf";
import {CeleryBroker, newCeleryBroker} from "../kombu/brokers";
import {CeleryBackend, newCeleryBackend} from "../backends";
import {ICeleryStatus} from "../kombu/brokers/amqp";

export default class Base {
private _backend: CeleryBackend;
Expand Down Expand Up @@ -65,4 +66,18 @@ export default class Base {
public disconnect(): Promise<any> {
return this.broker.disconnect().then(() => this.backend.disconnect());
}

public status(): ICeleryStatus {
return this._broker ? this._broker.status() : ICeleryStatus.CLOSED;
}

public addEventListener(event: string,listener: any): void {
this._broker.addEventListener(event,listener);
}

public async connect() {
this.broker;
this.backend;
await this._broker.connect();
}
}
47 changes: 41 additions & 6 deletions src/kombu/brokers/amqp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as amqplib from "amqplib";
import { CeleryBroker } from ".";
import { Message } from "../message";


class AMQPMessage extends Message {
constructor(payload: amqplib.ConsumeMessage) {
super(
Expand All @@ -14,8 +15,15 @@ class AMQPMessage extends Message {
}
}

export enum ICeleryStatus {
DISCONNECTED = 'disconnected',
CONNECTED = 'connected',
CLOSED = 'closed'
}

export default class AMQPBroker implements CeleryBroker {
connect: Promise<amqplib.Connection>;
state: ICeleryStatus = ICeleryStatus.CLOSED;
connection: amqplib.Connection;
channel: Promise<amqplib.Channel>;
queue: string;

Expand All @@ -26,10 +34,32 @@ export default class AMQPBroker implements CeleryBroker {
* @param {object} opts the options object for amqp connect of amqplib
* @param {string} queue optional. the queue to connect to.
*/
constructor(url: string, opts: object, queue = "celery") {
constructor(private url: string, private opts: object, queue: string = "celery") {
this.queue = queue;
this.connect = amqplib.connect(url, opts);
this.channel = this.connect.then(conn => conn.createChannel());
}

public addEventListener (event: string,listener: any) {
this.connection.on(event,listener);
}

public connect() {
return new Promise(async (resolve,reject)=>{
try {
this.connection = await amqplib.connect(this.url, this.opts); // Await the connection
this.connection.on('close',()=> {
if(this.state !== ICeleryStatus.CLOSED) {
this.state = ICeleryStatus.DISCONNECTED
}
});
this.channel = this.connection.createChannel(); // Create the channel
this.state = ICeleryStatus.CONNECTED;
return resolve();
} catch (err) {
this.state = ICeleryStatus.DISCONNECTED;
reject('Failed to connect AMQPBroker:');
}
})

}

/**
Expand All @@ -56,17 +86,22 @@ export default class AMQPBroker implements CeleryBroker {
})
])
.then(() => resolve())
.catch(reject);
.catch(()=>reject());
});
});
}

public status(): ICeleryStatus {
return this.state;
}

/**
* @method AMQPBroker#disconnect
* @returns {Promise} promises that continues if amqp disconnected.
*/
public disconnect(): Promise<void> {
return this.connect.then(conn => conn.close());
this.state = ICeleryStatus.CLOSED;
return this.connection.close();
}

/**
Expand Down
6 changes: 5 additions & 1 deletion src/kombu/brokers/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as url from "url";
import RedisBroker from "./redis";
import AMQPBroker from "./amqp";
import AMQPBroker, {ICeleryStatus} from "./amqp";


export interface CeleryBroker {
isReady: () => Promise<any>;
Expand All @@ -13,6 +14,9 @@ export interface CeleryBroker {
properties: object
) => Promise<any>;
subscribe: (queue: string, callback: Function) => Promise<any>;
status: () => ICeleryStatus
addEventListener: (event: string, listener: any) => void;
connect: () => Promise<any>;
}

/**
Expand Down
12 changes: 12 additions & 0 deletions src/kombu/brokers/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as Redis from "ioredis";
import { v4 } from "uuid";
import { CeleryBroker } from ".";
import { Message } from "../message";
import {ICeleryStatus} from "./amqp";


class RedisMessage extends Message {
Expand Down Expand Up @@ -209,4 +210,15 @@ export default class RedisBroker implements CeleryBroker {
return new RedisMessage(rawMsg);
});
}

addEventListener(event: string, listener: any): void {
}

connect(): Promise<any> {
return Promise.resolve(undefined);
}

status(): ICeleryStatus {
return;
}
}