From 83ae95ead9e8e8a3402577a91fbe8b86df4e2959 Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Tue, 14 Jan 2025 17:16:53 -0800 Subject: [PATCH] feat: add promise for all connections down --- lazer/sdk/js/examples/index.ts | 131 +++++++++--------- lazer/sdk/js/src/client.ts | 7 + .../sdk/js/src/socket/resilient-web-socket.ts | 11 ++ lazer/sdk/js/src/socket/web-socket-pool.ts | 44 ++++++ 4 files changed, 129 insertions(+), 64 deletions(-) diff --git a/lazer/sdk/js/examples/index.ts b/lazer/sdk/js/examples/index.ts index 963e9b206d..3ca3b3993a 100644 --- a/lazer/sdk/js/examples/index.ts +++ b/lazer/sdk/js/examples/index.ts @@ -7,78 +7,81 @@ import { PythLazerClient } from "../src/index.js"; console.debug = () => {}; async function main() { - try { - const client = await PythLazerClient.create( - ["wss://pyth-lazer.dourolabs.app/v1/stream"], - "access_token", - 3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3. - console // Optionally log socket operations (to the console in this case.) - ); + const client = await PythLazerClient.create( + ["wss://pyth-lazer.dourolabs.app/v1/stream"], + "access_token", + 3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3. + console // Optionally log socket operations (to the console in this case.) + ); - client.addMessageListener((message) => { - console.info("got message:", message); - switch (message.type) { - case "json": { - if (message.value.type == "streamUpdated") { - console.info( - "stream updated for subscription", - message.value.subscriptionId, - ":", - message.value.parsed?.priceFeeds - ); - } - break; + // Monitor for all connections being down + client.onAllConnectionsDown().then(() => { + // Handle complete connection failure. + // The connections will keep attempting to reconnect with expo backoff. + // To shutdown the client completely, call shutdown(). + console.error("All connections are down!"); + }); + + client.addMessageListener((message) => { + console.info("got message:", message); + switch (message.type) { + case "json": { + if (message.value.type == "streamUpdated") { + console.info( + "stream updated for subscription", + message.value.subscriptionId, + ":", + message.value.parsed?.priceFeeds + ); + } + break; + } + case "binary": { + if ("solana" in message.value) { + console.info( + "solana message:", + message.value.solana?.toString("hex") + ); } - case "binary": { - if ("solana" in message.value) { - console.info( - "solana message:", - message.value.solana?.toString("hex") - ); - } - if ("evm" in message.value) { - console.info("evm message:", message.value.evm?.toString("hex")); - } - break; + if ("evm" in message.value) { + console.info("evm message:", message.value.evm?.toString("hex")); } + break; } - }); + } + }); - // Create and remove one or more subscriptions on the fly - await client.subscribe({ - type: "subscribe", - subscriptionId: 1, - priceFeedIds: [1, 2], - properties: ["price"], - chains: ["solana"], - deliveryFormat: "binary", - channel: "fixed_rate@200ms", - parsed: false, - jsonBinaryEncoding: "base64", - }); - await client.subscribe({ - type: "subscribe", - subscriptionId: 2, - priceFeedIds: [1, 2, 3, 4, 5], - properties: ["price"], - chains: ["evm"], - deliveryFormat: "json", - channel: "fixed_rate@200ms", - parsed: true, - jsonBinaryEncoding: "hex", - }); + // Create and remove one or more subscriptions on the fly + await client.subscribe({ + type: "subscribe", + subscriptionId: 1, + priceFeedIds: [1, 2], + properties: ["price"], + chains: ["solana"], + deliveryFormat: "binary", + channel: "fixed_rate@200ms", + parsed: false, + jsonBinaryEncoding: "base64", + }); + await client.subscribe({ + type: "subscribe", + subscriptionId: 2, + priceFeedIds: [1, 2, 3, 4, 5], + properties: ["price"], + chains: ["evm"], + deliveryFormat: "json", + channel: "fixed_rate@200ms", + parsed: true, + jsonBinaryEncoding: "hex", + }); - await new Promise((resolve) => setTimeout(resolve, 10_000)); + await new Promise((resolve) => setTimeout(resolve, 10_000)); - await client.unsubscribe(1); - await client.unsubscribe(2); + await client.unsubscribe(1); + await client.unsubscribe(2); - await new Promise((resolve) => setTimeout(resolve, 10_000)); - client.shutdown(); - } catch (error) { - console.error("Error initializing client:", error); - process.exit(1); - } + await new Promise((resolve) => setTimeout(resolve, 10_000)); + client.shutdown(); } main().catch((error) => { diff --git a/lazer/sdk/js/src/client.ts b/lazer/sdk/js/src/client.ts index ca7d285172..05052c7ec2 100644 --- a/lazer/sdk/js/src/client.ts +++ b/lazer/sdk/js/src/client.ts @@ -111,6 +111,13 @@ export class PythLazerClient { await this.wsp.sendRequest(request); } + /** + * Returns a promise that resolves when all WebSocket connections are down or attempting to reconnect + */ + onAllConnectionsDown(): Promise { + return this.wsp.onAllConnectionsDown(); + } + shutdown(): void { this.wsp.shutdown(); } diff --git a/lazer/sdk/js/src/socket/resilient-web-socket.ts b/lazer/sdk/js/src/socket/resilient-web-socket.ts index f6c48e1e83..8ee1588d41 100644 --- a/lazer/sdk/js/src/socket/resilient-web-socket.ts +++ b/lazer/sdk/js/src/socket/resilient-web-socket.ts @@ -16,6 +16,15 @@ export class ResilientWebSocket { private connectionPromise: Promise | undefined; private resolveConnection: (() => void) | undefined; private rejectConnection: ((error: Error) => void) | undefined; + private _isReconnecting: boolean = false; + + get isReconnecting(): boolean { + return this._isReconnecting; + } + + get isConnected(): boolean { + return this.wsClient?.readyState === WebSocket.OPEN; + } onError: (error: ErrorEvent) => void; onMessage: (data: WebSocket.Data) => void; @@ -90,6 +99,7 @@ export class ResilientWebSocket { this.wsFailedAttempts = 0; this.resetHeartbeat(); clearTimeout(timeoutId); + this._isReconnecting = false; this.resolveConnection?.(); }); @@ -167,6 +177,7 @@ export class ResilientWebSocket { const waitTime = expoBackoff(this.wsFailedAttempts); + this._isReconnecting = true; this.logger?.error( "Connection closed unexpectedly or because of timeout. Reconnecting after " + String(waitTime) + diff --git a/lazer/sdk/js/src/socket/web-socket-pool.ts b/lazer/sdk/js/src/socket/web-socket-pool.ts index 8743c0de2b..99ecd9eb95 100644 --- a/lazer/sdk/js/src/socket/web-socket-pool.ts +++ b/lazer/sdk/js/src/socket/web-socket-pool.ts @@ -12,12 +12,18 @@ export class WebSocketPool { private cache: TTLCache; private subscriptions: Map; // id -> subscription Request private messageListeners: ((event: WebSocket.Data) => void)[]; + private allConnectionsDownListeners: (() => void)[]; + private wasAllDown: boolean = true; private constructor(private readonly logger: Logger = dummyLogger) { this.rwsPool = []; this.cache = new TTLCache({ ttl: 1000 * 10 }); // TTL of 10 seconds this.subscriptions = new Map(); this.messageListeners = []; + this.allConnectionsDownListeners = []; + + // Start monitoring connection states + setInterval(() => this.checkConnectionStates(), 100); } /** @@ -172,6 +178,43 @@ export class WebSocketPool { this.messageListeners.push(handler); } + /** + * Monitors if all websocket connections are currently down or in reconnecting state + * Returns a promise that resolves when all connections are down + */ + onAllConnectionsDown(): Promise { + return new Promise((resolve) => { + if (this.areAllConnectionsDown()) { + resolve(); + } else { + this.allConnectionsDownListeners.push(resolve); + } + }); + } + + private areAllConnectionsDown(): boolean { + return this.rwsPool.every((ws) => !ws.isConnected || ws.isReconnecting); + } + + private checkConnectionStates(): void { + const allDown = this.areAllConnectionsDown(); + + // If all connections just went down + if (allDown && !this.wasAllDown) { + this.wasAllDown = true; + this.logger.error("All WebSocket connections are down or reconnecting"); + // Notify all listeners + while (this.allConnectionsDownListeners.length > 0) { + const listener = this.allConnectionsDownListeners.shift(); + listener?.(); + } + } + // If at least one connection was restored + if (!allDown && this.wasAllDown) { + this.wasAllDown = false; + } + } + shutdown(): void { for (const rws of this.rwsPool) { rws.closeWebSocket(); @@ -179,5 +222,6 @@ export class WebSocketPool { this.rwsPool = []; this.subscriptions.clear(); this.messageListeners = []; + this.allConnectionsDownListeners = []; } }