diff --git a/lazer/sdk/js/examples/index.ts b/lazer/sdk/js/examples/index.ts index 3f32cf2d41..ff3102f845 100644 --- a/lazer/sdk/js/examples/index.ts +++ b/lazer/sdk/js/examples/index.ts @@ -6,13 +6,14 @@ import { PythLazerClient } from "../src/index.js"; // Ignore debug messages console.debug = () => {}; -const client = new PythLazerClient( +const client = await PythLazerClient.create( ["wss://pyth-lazer.dourolabs.app/v1/stream"], "access_token", 3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3. console // Optionally log socket operations (to the console in this case.) ); +// Read and process messages from the Lazer stream client.addMessageListener((message) => { console.info("got message:", message); switch (message.type) { @@ -39,6 +40,12 @@ client.addMessageListener((message) => { } }); +// Monitor for all connections in the pool being down simultaneously (e.g. if the internet goes down) +// The connections may still try to reconnect in the background. To shut down the client completely, call shutdown(). +client.addAllConnectionsDownListener(() => { + console.error("All connections are down!"); +}); + // Create and remove one or more subscriptions on the fly await client.subscribe({ type: "subscribe", diff --git a/lazer/sdk/js/package.json b/lazer/sdk/js/package.json index 29a76a74c9..95e5a906d2 100644 --- a/lazer/sdk/js/package.json +++ b/lazer/sdk/js/package.json @@ -1,6 +1,6 @@ { "name": "@pythnetwork/pyth-lazer-sdk", - "version": "0.2.1", + "version": "0.3.0", "description": "Pyth Lazer SDK", "publishConfig": { "access": "public" diff --git a/lazer/sdk/js/src/client.ts b/lazer/sdk/js/src/client.ts index fc732e5acc..755b426129 100644 --- a/lazer/sdk/js/src/client.ts +++ b/lazer/sdk/js/src/client.ts @@ -10,7 +10,7 @@ import { type Response, SOLANA_FORMAT_MAGIC_BE, } from "./protocol.js"; -import { WebSocketPool } from "./socket/web-socket-pool.js"; +import { WebSocketPool } from "./socket/websocket-pool.js"; export type BinaryResponse = { subscriptionId: number; @@ -30,24 +30,31 @@ const UINT32_NUM_BYTES = 4; const UINT64_NUM_BYTES = 8; export class PythLazerClient { - wsp: WebSocketPool; + private constructor(private readonly wsp: WebSocketPool) {} /** * Creates a new PythLazerClient instance. * @param urls - List of WebSocket URLs of the Pyth Lazer service * @param token - The access token for authentication - * @param numConnections - The number of parallel WebSocket connections to establish (default: 3). A higher number gives a more reliable stream. + * @param numConnections - The number of parallel WebSocket connections to establish (default: 3). A higher number gives a more reliable stream. The connections will round-robin across the provided URLs. * @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`. */ - constructor( + static async create( urls: string[], token: string, numConnections = 3, logger: Logger = dummyLogger - ) { - this.wsp = new WebSocketPool(urls, token, numConnections, logger); + ): Promise { + const wsp = await WebSocketPool.create(urls, token, numConnections, logger); + return new PythLazerClient(wsp); } + /** + * Adds a message listener that receives either JSON or binary responses from the WebSocket connections. + * The listener will be called for each message received, with deduplication across redundant connections. + * @param handler - Callback function that receives the parsed message. The message can be either a JSON response + * or a binary response containing EVM, Solana, or parsed payload data. + */ addMessageListener(handler: (event: JsonOrBinaryResponse) => void) { this.wsp.addMessageListener((data: WebSocket.Data) => { if (typeof data == "string") { @@ -110,6 +117,15 @@ export class PythLazerClient { await this.wsp.sendRequest(request); } + /** + * Registers a handler function that will be called whenever all WebSocket connections are down or attempting to reconnect. + * The connections may still try to reconnect in the background. To shut down the pool, call `shutdown()`. + * @param handler - Function to be called when all connections are down + */ + addAllConnectionsDownListener(handler: () => void): void { + this.wsp.addAllConnectionsDownListener(handler); + } + shutdown(): void { this.wsp.shutdown(); } diff --git a/lazer/sdk/js/src/socket/resilient-web-socket.ts b/lazer/sdk/js/src/socket/resilient-websocket.ts similarity index 71% rename from lazer/sdk/js/src/socket/resilient-web-socket.ts rename to lazer/sdk/js/src/socket/resilient-websocket.ts index 7b221f3245..0c88c83bd9 100644 --- a/lazer/sdk/js/src/socket/resilient-web-socket.ts +++ b/lazer/sdk/js/src/socket/resilient-websocket.ts @@ -3,19 +3,9 @@ import type { ClientRequestArgs } from "node:http"; import WebSocket, { type ClientOptions, type ErrorEvent } from "isomorphic-ws"; import type { Logger } from "ts-log"; -// Reconnect with expo backoff if we don't get a message or ping for 10 seconds const HEARTBEAT_TIMEOUT_DURATION = 10_000; +const CONNECTION_TIMEOUT = 5000; -/** - * This class wraps websocket to provide a resilient web socket client. - * - * It will reconnect if connection fails with exponential backoff. Also, it will reconnect - * if it receives no ping request or regular message from server within a while as indication - * of timeout (assuming the server sends either regularly). - * - * This class also logs events if logger is given and by replacing onError method you can handle - * connection errors yourself (e.g: do not retry and close the connection). - */ export class ResilientWebSocket { endpoint: string; wsClient: undefined | WebSocket; @@ -24,10 +14,23 @@ export class ResilientWebSocket { private wsFailedAttempts: number; private heartbeatTimeout: undefined | NodeJS.Timeout; private logger: undefined | Logger; + private connectionPromise: Promise | undefined; + private resolveConnection: (() => void) | undefined; + private rejectConnection: ((error: Error) => void) | undefined; + private _isReconnecting = false; + + get isReconnecting(): boolean { + return this._isReconnecting; + } + + get isConnected(): boolean { + return this.wsClient?.readyState === WebSocket.OPEN; + } onError: (error: ErrorEvent) => void; onMessage: (data: WebSocket.Data) => void; onReconnect: () => void; + constructor( endpoint: string, wsOptions?: ClientOptions | ClientRequestArgs, @@ -64,23 +67,48 @@ export class ResilientWebSocket { } } - startWebSocket(): void { + async startWebSocket(): Promise { if (this.wsClient !== undefined) { + // If there's an existing connection attempt, wait for it + if (this.connectionPromise) { + return this.connectionPromise; + } return; } this.logger?.info(`Creating Web Socket client`); + // Create a new promise for this connection attempt + this.connectionPromise = new Promise((resolve, reject) => { + this.resolveConnection = resolve; + this.rejectConnection = reject; + }); + + // Set a connection timeout + const timeoutId = setTimeout(() => { + if (this.rejectConnection) { + this.rejectConnection( + new Error(`Connection timeout after ${String(CONNECTION_TIMEOUT)}ms`) + ); + } + }, CONNECTION_TIMEOUT); + this.wsClient = new WebSocket(this.endpoint, this.wsOptions); this.wsUserClosed = false; this.wsClient.addEventListener("open", () => { this.wsFailedAttempts = 0; this.resetHeartbeat(); + clearTimeout(timeoutId); + this._isReconnecting = false; + this.resolveConnection?.(); }); this.wsClient.addEventListener("error", (event) => { this.onError(event); + if (this.rejectConnection) { + this.rejectConnection(new Error("WebSocket connection failed")); + } }); this.wsClient.addEventListener("message", (event) => { @@ -89,24 +117,23 @@ export class ResilientWebSocket { }); this.wsClient.addEventListener("close", () => { + clearTimeout(timeoutId); + if (this.rejectConnection) { + this.rejectConnection(new Error("WebSocket closed before connecting")); + } void this.handleClose(); }); - // Handle ping events if supported (Node.js only) if ("on" in this.wsClient) { - // Ping handler is undefined in browser side this.wsClient.on("ping", () => { this.logger?.info("Ping received"); this.resetHeartbeat(); }); } + + return this.connectionPromise; } - /** - * Reset the heartbeat timeout. This is called when we receive any message (ping or regular) - * from the server. If we don't receive any message within HEARTBEAT_TIMEOUT_DURATION, - * we assume the connection is dead and reconnect. - */ private resetHeartbeat(): void { if (this.heartbeatTimeout !== undefined) { clearTimeout(this.heartbeatTimeout); @@ -145,8 +172,13 @@ export class ResilientWebSocket { } else { this.wsFailedAttempts += 1; this.wsClient = undefined; + this.connectionPromise = undefined; + this.resolveConnection = undefined; + this.rejectConnection = undefined; + const waitTime = expoBackoff(this.wsFailedAttempts); + this._isReconnecting = true; this.logger?.error( "Connection closed unexpectedly or because of timeout. Reconnecting after " + String(waitTime) + @@ -163,7 +195,7 @@ export class ResilientWebSocket { return; } - this.startWebSocket(); + await this.startWebSocket(); await this.waitForMaybeReadyWebSocket(); if (this.wsClient === undefined) { @@ -180,6 +212,9 @@ export class ResilientWebSocket { if (this.wsClient !== undefined) { const client = this.wsClient; this.wsClient = undefined; + this.connectionPromise = undefined; + this.resolveConnection = undefined; + this.rejectConnection = undefined; client.close(); } this.wsUserClosed = true; diff --git a/lazer/sdk/js/src/socket/web-socket-pool.ts b/lazer/sdk/js/src/socket/websocket-pool.ts similarity index 67% rename from lazer/sdk/js/src/socket/web-socket-pool.ts rename to lazer/sdk/js/src/socket/websocket-pool.ts index c35d9c6f3c..3ee44ba15a 100644 --- a/lazer/sdk/js/src/socket/web-socket-pool.ts +++ b/lazer/sdk/js/src/socket/websocket-pool.ts @@ -2,10 +2,9 @@ import TTLCache from "@isaacs/ttlcache"; import WebSocket from "isomorphic-ws"; import { dummyLogger, type Logger } from "ts-log"; -import { ResilientWebSocket } from "./resilient-web-socket.js"; +import { ResilientWebSocket } from "./resilient-websocket.js"; import type { Request, Response } from "../protocol.js"; -// Number of redundant parallel WebSocket connections const DEFAULT_NUM_CONNECTIONS = 3; export class WebSocketPool { @@ -13,6 +12,21 @@ export class WebSocketPool { private cache: TTLCache; private subscriptions: Map; // id -> subscription Request private messageListeners: ((event: WebSocket.Data) => void)[]; + private allConnectionsDownListeners: (() => void)[]; + private wasAllDown = true; + + private constructor(private readonly logger: Logger = dummyLogger) { + this.rwsPool = []; + this.cache = new TTLCache({ ttl: 1000 * 10 }); // TTL of 10 seconds + this.subscriptions = new Map(); + this.messageListeners = []; + this.allConnectionsDownListeners = []; + + // Start monitoring connection states + setInterval(() => { + this.checkConnectionStates(); + }, 100); + } /** * Creates a new WebSocketPool instance that uses multiple redundant WebSocket connections for reliability. @@ -22,22 +36,21 @@ export class WebSocketPool { * @param numConnections - Number of parallel WebSocket connections to maintain (default: 3) * @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`. */ - constructor( + static async create( urls: string[], token: string, numConnections: number = DEFAULT_NUM_CONNECTIONS, - private readonly logger: Logger = dummyLogger - ) { + logger: Logger = dummyLogger + ): Promise { if (urls.length === 0) { throw new Error("No URLs provided"); } - // This cache is used to deduplicate messages received across different websocket clients in the pool. - // A TTL cache is used to prevent unbounded memory usage. A very short TTL of 10 seconds is chosen since - // deduplication only needs to happen between messages received very close together in time. - this.cache = new TTLCache({ ttl: 1000 * 10 }); // TTL of 10 seconds - this.rwsPool = []; - this.subscriptions = new Map(); - this.messageListeners = []; + + const pool = new WebSocketPool(logger); + + // Create all websocket instances + const connectionPromises: Promise[] = []; + for (let i = 0; i < numConnections; i++) { const url = urls[i % urls.length]; if (!url) { @@ -52,36 +65,44 @@ export class WebSocketPool { // If a websocket client unexpectedly disconnects, ResilientWebSocket will reestablish // the connection and call the onReconnect callback. - // When we reconnect, replay all subscription messages to resume the data stream. rws.onReconnect = () => { if (rws.wsUserClosed) { return; } - for (const [, request] of this.subscriptions) { + for (const [, request] of pool.subscriptions) { try { void rws.send(JSON.stringify(request)); } catch (error) { - this.logger.error( + pool.logger.error( "Failed to resend subscription on reconnect:", error ); } } }; + // Handle all client messages ourselves. Dedupe before sending to registered message handlers. - rws.onMessage = this.dedupeHandler; - this.rwsPool.push(rws); + rws.onMessage = pool.dedupeHandler; + pool.rwsPool.push(rws); + + // Start the websocket and collect the promise + connectionPromises.push(rws.startWebSocket()); } - // Let it rip - // TODO: wait for sockets to receive `open` msg before subscribing? - for (const rws of this.rwsPool) { - rws.startWebSocket(); + // Wait for all connections to be established + try { + await Promise.all(connectionPromises); + } catch (error) { + // If any connection fails, clean up and throw + pool.shutdown(); + throw error; } - this.logger.info( - `Using ${numConnections.toString()} redundant WebSocket connections` + pool.logger.info( + `Successfully established ${numConnections.toString()} redundant WebSocket connections` ); + + return pool; } /** @@ -105,23 +126,18 @@ export class WebSocketPool { * multiple connections before forwarding to registered handlers */ dedupeHandler = (data: WebSocket.Data): void => { - // For string data, use the whole string as the cache key. This avoids expensive JSON parsing during deduping. - // For binary data, use the hex string representation as the cache key const cacheKey = typeof data === "string" ? data : Buffer.from(data as Buffer).toString("hex"); - // If we've seen this exact message recently, drop it if (this.cache.has(cacheKey)) { this.logger.debug("Dropping duplicate message"); return; } - // Haven't seen this message, cache it and forward to handlers this.cache.set(cacheKey, true); - // Check for errors in JSON responses if (typeof data === "string") { this.handleErrorMessages(data); } @@ -131,28 +147,18 @@ export class WebSocketPool { } }; - /** - * Sends a message to all websockets in the pool - * @param request - The request to send - */ async sendRequest(request: Request): Promise { - // Send to all websockets in the pool const sendPromises = this.rwsPool.map(async (rws) => { try { await rws.send(JSON.stringify(request)); } catch (error) { this.logger.error("Failed to send request:", error); - throw error; // Re-throw the error + throw error; } }); await Promise.all(sendPromises); } - /** - * Adds a subscription by sending a subscribe request to all websockets in the pool - * and storing it for replay on reconnection - * @param request - The subscription request to send - */ async addSubscription(request: Request): Promise { if (request.type !== "subscribe") { throw new Error("Request must be a subscribe request"); @@ -161,11 +167,6 @@ export class WebSocketPool { await this.sendRequest(request); } - /** - * Removes a subscription by sending an unsubscribe request to all websockets in the pool - * and removing it from stored subscriptions - * @param subscriptionId - The ID of the subscription to remove - */ async removeSubscription(subscriptionId: number): Promise { this.subscriptions.delete(subscriptionId); const request: Request = { @@ -175,17 +176,40 @@ export class WebSocketPool { await this.sendRequest(request); } - /** - * Adds a message handler function to receive websocket messages - * @param handler - Function that will be called with each received message - */ addMessageListener(handler: (data: WebSocket.Data) => void): void { this.messageListeners.push(handler); } /** - * Elegantly closes all websocket connections in the pool + * Calls the handler if all websocket connections are currently down or in reconnecting state. + * The connections may still try to reconnect in the background. */ + addAllConnectionsDownListener(handler: () => void): void { + this.allConnectionsDownListeners.push(handler); + } + + private areAllConnectionsDown(): boolean { + return this.rwsPool.every((ws) => !ws.isConnected || ws.isReconnecting); + } + + private checkConnectionStates(): void { + const allDown = this.areAllConnectionsDown(); + + // If all connections just went down + if (allDown && !this.wasAllDown) { + this.wasAllDown = true; + this.logger.error("All WebSocket connections are down or reconnecting"); + // Notify all listeners + for (const listener of this.allConnectionsDownListeners) { + listener(); + } + } + // If at least one connection was restored + if (!allDown && this.wasAllDown) { + this.wasAllDown = false; + } + } + shutdown(): void { for (const rws of this.rwsPool) { rws.closeWebSocket(); @@ -193,5 +217,6 @@ export class WebSocketPool { this.rwsPool = []; this.subscriptions.clear(); this.messageListeners = []; + this.allConnectionsDownListeners = []; } }