Skip to content

Commit

Permalink
fix: handle uncatch exectipn on messager event
Browse files Browse the repository at this point in the history
  • Loading branch information
fengmk2 committed Jan 1, 2025
1 parent d03a004 commit 1c72b38
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 73 deletions.
30 changes: 30 additions & 0 deletions src/lib/core/messenger/base.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { EventEmitter, captureRejectionSymbol } from 'node:events';
import { MessageUnhandledRejectionError } from '../../error/index.js';
import { EggApplicationCore } from '../../egg.js';

export class BaseMessenger extends EventEmitter {
protected readonly egg: EggApplicationCore;

constructor(egg: EggApplicationCore) {
super({ captureRejections: true });
this.egg = egg;
}

[captureRejectionSymbol](err: Error, event: string | symbol, ...args: any[]) {
this.egg.coreLogger.error(new MessageUnhandledRejectionError(err, event, args));
}

emit(eventName: string | symbol, ...args: any[]): boolean {
const hasListeners = this.listenerCount(eventName) > 0;
try {
return super.emit(eventName, ...args);
} catch (e: unknown) {
let err = e as Error;
if (!(err instanceof Error)) {
err = new Error(String(err));
}
this.egg.coreLogger.error(new MessageUnhandledRejectionError(err, eventName, args));
return hasListeners;
}
}
}
3 changes: 2 additions & 1 deletion src/lib/core/messenger/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ export type { IMessenger } from './IMessenger.js';
* @class Messenger
*/
export function create(egg: EggApplicationCore): IMessenger {
return egg.options.mode === 'single'
const messenger = egg.options.mode === 'single'
? new LocalMessenger(egg)
: new IPCMessenger(egg);
return messenger;
}
8 changes: 3 additions & 5 deletions src/lib/core/messenger/ipc.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
import { EventEmitter } from 'node:events';
import { debuglog } from 'node:util';
import workerThreads from 'node:worker_threads';
import { sendmessage } from 'sendmessage';
import type { IMessenger } from './IMessenger.js';
import type { EggApplicationCore } from '../../egg.js';
import { BaseMessenger } from './base.js';

const debug = debuglog('egg/lib/core/messenger/ipc');

/**
* Communication between app worker and agent worker by IPC channel
*/
export class Messenger extends EventEmitter implements IMessenger {
export class Messenger extends BaseMessenger implements IMessenger {
readonly pid: string;
readonly egg: EggApplicationCore;
opids: string[] = [];

constructor(egg: EggApplicationCore) {
super();
super(egg);
this.pid = String(process.pid);
this.egg = egg;
// pids of agent or app managed by master
// - retrieve app worker pids when it's an agent worker
// - retrieve agent worker pids when it's an app worker
Expand Down
8 changes: 3 additions & 5 deletions src/lib/core/messenger/local.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
import { debuglog } from 'node:util';
import EventEmitter from 'node:events';
import type { IMessenger } from './IMessenger.js';
import type { EggApplicationCore } from '../../egg.js';
import { BaseMessenger } from './base.js';

const debug = debuglog('egg/lib/core/messenger/local');

/**
* Communication between app worker and agent worker with EventEmitter
*/
export class Messenger extends EventEmitter implements IMessenger {
export class Messenger extends BaseMessenger implements IMessenger {
readonly pid: string;
readonly egg: EggApplicationCore;

constructor(egg: EggApplicationCore) {
super();
this.egg = egg;
super(egg);
this.pid = String(process.pid);
}

Expand Down
1 change: 1 addition & 0 deletions src/lib/egg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ export class EggApplicationCore extends EggCore {
}

_unhandledRejectionHandler(err: any) {
this.coreLogger.error('[egg:unhandledRejection] %s', err && err.message || err);
if (!(err instanceof Error)) {
const newError = new Error(String(err));
// err maybe an object, try to copy the name, message and stack to the new error instance
Expand Down
12 changes: 12 additions & 0 deletions src/lib/error/MessageUnhandledRejectionError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
export class MessageUnhandledRejectionError extends Error {
event: string | symbol;
args: any[];

constructor(err: Error, event: string | symbol, ...args: any[]) {
super(`event: ${String(event)}, error: ${err.message}`, { cause: err });
this.name = this.constructor.name;
this.event = event;
this.args = args;
Error.captureStackTrace(this, this.constructor);
}
}
1 change: 1 addition & 0 deletions src/lib/error/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './CookieLimitExceedError.js';
export * from './MessageUnhandledRejectionError.js';
8 changes: 5 additions & 3 deletions test/fixtures/apps/agent-throw/agent.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
'use strict';

module.exports = agent => {
agent.messenger.on('agent-throw', () => {
throw new Error('agent error');
throw new Error('agent error in sync function');
});

agent.messenger.on('agent-throw-async', async () => {
throw new Error('agent error in async function');
});

agent.messenger.on('agent-throw-string', () => {
Expand Down
7 changes: 5 additions & 2 deletions test/fixtures/apps/agent-throw/app/router.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
'use strict';

module.exports = app => {
app.get('/agent-throw', async function() {
app.messenger.broadcast('agent-throw');
this.body = 'done';
});

app.get('/agent-throw-async', async function() {
app.messenger.broadcast('agent-throw-async');
this.body = 'done';
});

app.get('/agent-throw-string', async function() {
app.messenger.broadcast('agent-throw-string');
this.body = 'done';
Expand Down
57 changes: 0 additions & 57 deletions test/lib/agent.test.js

This file was deleted.

61 changes: 61 additions & 0 deletions test/lib/agent.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { strict as assert } from 'node:assert';
import fs from 'node:fs';
import path from 'node:path';
import { mm } from '@eggjs/mock';
import { getFilepath, MockApplication, cluster } from '../utils.js';

describe('test/lib/agent.test.ts', () => {
afterEach(mm.restore);

describe('agent throw', () => {
const baseDir = getFilepath('apps/agent-throw');
let app: MockApplication;
before(() => {
app = cluster('apps/agent-throw');
return app.ready();
});
after(() => app.close());

it('should catch unhandled exception', done => {
app.httpRequest()
.get('/agent-throw-async')
.expect(200, err => {
assert(!err);
setTimeout(() => {
const body = fs.readFileSync(path.join(baseDir, 'logs/agent-throw/common-error.log'), 'utf8');
assert.match(body, /nodejs\.MessageUnhandledRejectionError: event: agent-throw-async, error: agent error in async function/);
app.notExpect('stderr', /nodejs.AgentWorkerDiedError/);
done();
}, 1000);
});
});

it('should exit on sync error throw', done => {
app.httpRequest()
.get('/agent-throw')
.expect(200, err => {
assert(!err);
setTimeout(() => {
const body = fs.readFileSync(path.join(baseDir, 'logs/agent-throw/common-error.log'), 'utf8');
assert.match(body, /nodejs\.MessageUnhandledRejectionError: event: agent-throw, error: agent error in sync function/);
app.notExpect('stderr', /nodejs.AgentWorkerDiedError/);
done();
}, 1000);
});
});

it('should catch uncaughtException string error', done => {
app.httpRequest()
.get('/agent-throw-string')
.expect(200, err => {
assert(!err);
setTimeout(() => {
const body = fs.readFileSync(path.join(baseDir, 'logs/agent-throw/common-error.log'), 'utf8');
assert.match(body, /nodejs\.MessageUnhandledRejectionError: event: agent-throw-string, error: agent error string/);
app.notExpect('stderr', /nodejs.AgentWorkerDiedError/);
done();
}, 1000);
});
});
});
});

0 comments on commit 1c72b38

Please sign in to comment.