Skip to content

Commit

Permalink
feat(lazer/js-sdk): add promises for open and disconnected (#2254)
Browse files Browse the repository at this point in the history
* feat: add promise for connection open

* feat: add promise for all connections down

* refactor: rename socket files

* fix: lint

* feat: bump ver

* feat: better interface for allConnectionsDown events

* fix: docs

* fix: naming
  • Loading branch information
tejasbadadare authored Jan 15, 2025
1 parent 8bf0ad2 commit 1c65304
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 77 deletions.
9 changes: 8 additions & 1 deletion lazer/sdk/js/examples/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import { PythLazerClient } from "../src/index.js";
// Ignore debug messages
console.debug = () => {};

const client = new PythLazerClient(
const client = await PythLazerClient.create(
["wss://pyth-lazer.dourolabs.app/v1/stream"],
"access_token",
3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3.
console // Optionally log socket operations (to the console in this case.)
);

// Read and process messages from the Lazer stream
client.addMessageListener((message) => {
console.info("got message:", message);
switch (message.type) {
Expand All @@ -39,6 +40,12 @@ client.addMessageListener((message) => {
}
});

// Monitor for all connections in the pool being down simultaneously (e.g. if the internet goes down)
// The connections may still try to reconnect in the background. To shut down the client completely, call shutdown().
client.addAllConnectionsDownListener(() => {
console.error("All connections are down!");
});

// Create and remove one or more subscriptions on the fly
await client.subscribe({
type: "subscribe",
Expand Down
2 changes: 1 addition & 1 deletion lazer/sdk/js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pythnetwork/pyth-lazer-sdk",
"version": "0.2.1",
"version": "0.3.0",
"description": "Pyth Lazer SDK",
"publishConfig": {
"access": "public"
Expand Down
28 changes: 22 additions & 6 deletions lazer/sdk/js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
type Response,
SOLANA_FORMAT_MAGIC_BE,
} from "./protocol.js";
import { WebSocketPool } from "./socket/web-socket-pool.js";
import { WebSocketPool } from "./socket/websocket-pool.js";

export type BinaryResponse = {
subscriptionId: number;
Expand All @@ -30,24 +30,31 @@ const UINT32_NUM_BYTES = 4;
const UINT64_NUM_BYTES = 8;

export class PythLazerClient {
wsp: WebSocketPool;
private constructor(private readonly wsp: WebSocketPool) {}

/**
* Creates a new PythLazerClient instance.
* @param urls - List of WebSocket URLs of the Pyth Lazer service
* @param token - The access token for authentication
* @param numConnections - The number of parallel WebSocket connections to establish (default: 3). A higher number gives a more reliable stream.
* @param numConnections - The number of parallel WebSocket connections to establish (default: 3). A higher number gives a more reliable stream. The connections will round-robin across the provided URLs.
* @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`.
*/
constructor(
static async create(
urls: string[],
token: string,
numConnections = 3,
logger: Logger = dummyLogger
) {
this.wsp = new WebSocketPool(urls, token, numConnections, logger);
): Promise<PythLazerClient> {
const wsp = await WebSocketPool.create(urls, token, numConnections, logger);
return new PythLazerClient(wsp);
}

/**
* Adds a message listener that receives either JSON or binary responses from the WebSocket connections.
* The listener will be called for each message received, with deduplication across redundant connections.
* @param handler - Callback function that receives the parsed message. The message can be either a JSON response
* or a binary response containing EVM, Solana, or parsed payload data.
*/
addMessageListener(handler: (event: JsonOrBinaryResponse) => void) {
this.wsp.addMessageListener((data: WebSocket.Data) => {
if (typeof data == "string") {
Expand Down Expand Up @@ -110,6 +117,15 @@ export class PythLazerClient {
await this.wsp.sendRequest(request);
}

/**
* Registers a handler function that will be called whenever all WebSocket connections are down or attempting to reconnect.
* The connections may still try to reconnect in the background. To shut down the pool, call `shutdown()`.
* @param handler - Function to be called when all connections are down
*/
addAllConnectionsDownListener(handler: () => void): void {
this.wsp.addAllConnectionsDownListener(handler);
}

shutdown(): void {
this.wsp.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,9 @@ import type { ClientRequestArgs } from "node:http";
import WebSocket, { type ClientOptions, type ErrorEvent } from "isomorphic-ws";
import type { Logger } from "ts-log";

// Reconnect with expo backoff if we don't get a message or ping for 10 seconds
const HEARTBEAT_TIMEOUT_DURATION = 10_000;
const CONNECTION_TIMEOUT = 5000;

/**
* This class wraps websocket to provide a resilient web socket client.
*
* It will reconnect if connection fails with exponential backoff. Also, it will reconnect
* if it receives no ping request or regular message from server within a while as indication
* of timeout (assuming the server sends either regularly).
*
* This class also logs events if logger is given and by replacing onError method you can handle
* connection errors yourself (e.g: do not retry and close the connection).
*/
export class ResilientWebSocket {
endpoint: string;
wsClient: undefined | WebSocket;
Expand All @@ -24,10 +14,23 @@ export class ResilientWebSocket {
private wsFailedAttempts: number;
private heartbeatTimeout: undefined | NodeJS.Timeout;
private logger: undefined | Logger;
private connectionPromise: Promise<void> | undefined;
private resolveConnection: (() => void) | undefined;
private rejectConnection: ((error: Error) => void) | undefined;
private _isReconnecting = false;

get isReconnecting(): boolean {
return this._isReconnecting;
}

get isConnected(): boolean {
return this.wsClient?.readyState === WebSocket.OPEN;
}

onError: (error: ErrorEvent) => void;
onMessage: (data: WebSocket.Data) => void;
onReconnect: () => void;

constructor(
endpoint: string,
wsOptions?: ClientOptions | ClientRequestArgs,
Expand Down Expand Up @@ -64,23 +67,48 @@ export class ResilientWebSocket {
}
}

startWebSocket(): void {
async startWebSocket(): Promise<void> {
if (this.wsClient !== undefined) {
// If there's an existing connection attempt, wait for it
if (this.connectionPromise) {
return this.connectionPromise;
}
return;
}

this.logger?.info(`Creating Web Socket client`);

// Create a new promise for this connection attempt
this.connectionPromise = new Promise((resolve, reject) => {
this.resolveConnection = resolve;
this.rejectConnection = reject;
});

// Set a connection timeout
const timeoutId = setTimeout(() => {
if (this.rejectConnection) {
this.rejectConnection(
new Error(`Connection timeout after ${String(CONNECTION_TIMEOUT)}ms`)
);
}
}, CONNECTION_TIMEOUT);

this.wsClient = new WebSocket(this.endpoint, this.wsOptions);
this.wsUserClosed = false;

this.wsClient.addEventListener("open", () => {
this.wsFailedAttempts = 0;
this.resetHeartbeat();
clearTimeout(timeoutId);
this._isReconnecting = false;
this.resolveConnection?.();
});

this.wsClient.addEventListener("error", (event) => {
this.onError(event);
if (this.rejectConnection) {
this.rejectConnection(new Error("WebSocket connection failed"));
}
});

this.wsClient.addEventListener("message", (event) => {
Expand All @@ -89,24 +117,23 @@ export class ResilientWebSocket {
});

this.wsClient.addEventListener("close", () => {
clearTimeout(timeoutId);
if (this.rejectConnection) {
this.rejectConnection(new Error("WebSocket closed before connecting"));
}
void this.handleClose();
});

// Handle ping events if supported (Node.js only)
if ("on" in this.wsClient) {
// Ping handler is undefined in browser side
this.wsClient.on("ping", () => {
this.logger?.info("Ping received");
this.resetHeartbeat();
});
}

return this.connectionPromise;
}

/**
* Reset the heartbeat timeout. This is called when we receive any message (ping or regular)
* from the server. If we don't receive any message within HEARTBEAT_TIMEOUT_DURATION,
* we assume the connection is dead and reconnect.
*/
private resetHeartbeat(): void {
if (this.heartbeatTimeout !== undefined) {
clearTimeout(this.heartbeatTimeout);
Expand Down Expand Up @@ -145,8 +172,13 @@ export class ResilientWebSocket {
} else {
this.wsFailedAttempts += 1;
this.wsClient = undefined;
this.connectionPromise = undefined;
this.resolveConnection = undefined;
this.rejectConnection = undefined;

const waitTime = expoBackoff(this.wsFailedAttempts);

this._isReconnecting = true;
this.logger?.error(
"Connection closed unexpectedly or because of timeout. Reconnecting after " +
String(waitTime) +
Expand All @@ -163,7 +195,7 @@ export class ResilientWebSocket {
return;
}

this.startWebSocket();
await this.startWebSocket();
await this.waitForMaybeReadyWebSocket();

if (this.wsClient === undefined) {
Expand All @@ -180,6 +212,9 @@ export class ResilientWebSocket {
if (this.wsClient !== undefined) {
const client = this.wsClient;
this.wsClient = undefined;
this.connectionPromise = undefined;
this.resolveConnection = undefined;
this.rejectConnection = undefined;
client.close();
}
this.wsUserClosed = true;
Expand Down
Loading

0 comments on commit 1c65304

Please sign in to comment.