From 47adce4209c6c708f7c792ad6b809e1898c221fb Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Tue, 22 Oct 2024 11:10:39 -0700 Subject: [PATCH 1/3] properly end connection span --- transport/connection.ts | 20 ++++++++++++++++++++ transport/impls/ws/connection.ts | 12 +++--------- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/transport/connection.ts b/transport/connection.ts index 7e05da0d..62d087fe 100644 --- a/transport/connection.ts +++ b/transport/connection.ts @@ -47,6 +47,26 @@ export abstract class Connection { return [...this._errorListeners]; } + onData(msg: Uint8Array) { + for (const cb of this._dataListeners) { + cb(msg); + } + } + + onError(err: Error) { + for (const cb of this._errorListeners) { + cb(err); + } + } + + onClose() { + for (const cb of this._closeListeners) { + cb(); + } + + this.telemetry?.span.end(); + } + /** * Handle adding a callback for when a message is received. * @param msg The message that was received. diff --git a/transport/impls/ws/connection.ts b/transport/impls/ws/connection.ts index f7a1cbe8..c436c3ac 100644 --- a/transport/impls/ws/connection.ts +++ b/transport/impls/ws/connection.ts @@ -40,20 +40,14 @@ export class WebSocketConnection extends Connection { `websocket closed with code and reason: ${code} - ${reason}`, ); - for (const cb of this.errorListeners) { - cb(err); - } + this.onError(err); } - for (const cb of this.closeListeners) { - cb(); - } + this.onClose(); }; this.ws.onmessage = (msg) => { - for (const cb of this.dataListeners) { - cb(msg.data as Uint8Array); - } + this.onData(msg.data as Uint8Array); }; } From aaf5be8462f5a6ab6b4ee96e835c65de0e2a534b Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Tue, 22 Oct 2024 11:10:47 -0700 Subject: [PATCH 2/3] 0.203.2 --- package-lock.json | 4 ++-- package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index 6885e9eb..40c24c6f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@replit/river", - "version": "0.203.1", + "version": "0.203.2", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@replit/river", - "version": "0.203.1", + "version": "0.203.2", "license": "MIT", "dependencies": { "@msgpack/msgpack": "^3.0.0-beta2", diff --git a/package.json b/package.json index 9a4d2e91..d259095e 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@replit/river", "description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!", - "version": "0.203.1", + "version": "0.203.2", "type": "module", "exports": { ".": { From dcf26e8d88e75c03f74c627fc1616bd0a1f64afe Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Tue, 22 Oct 2024 11:16:50 -0700 Subject: [PATCH 3/3] use spread version of listeners --- transport/connection.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/transport/connection.ts b/transport/connection.ts index 62d087fe..28f389f6 100644 --- a/transport/connection.ts +++ b/transport/connection.ts @@ -48,19 +48,19 @@ export abstract class Connection { } onData(msg: Uint8Array) { - for (const cb of this._dataListeners) { + for (const cb of this.dataListeners) { cb(msg); } } onError(err: Error) { - for (const cb of this._errorListeners) { + for (const cb of this.errorListeners) { cb(err); } } onClose() { - for (const cb of this._closeListeners) { + for (const cb of this.closeListeners) { cb(); }