Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lazer/sdk/rust): add rust client for Lazer #2310

Merged
merged 20 commits into from
Jan 31, 2025
Merged
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
bc150e4
feat: add rust consumer sdk for lazer
devin-ai-integration[bot] Jan 30, 2025
f0ce60d
feat: rename crate to pyth-lazer-sdk and client to LazerClient
devin-ai-integration[bot] Jan 30, 2025
f97880e
fix: update base64 encoding to use Engine::encode
devin-ai-integration[bot] Jan 30, 2025
aab0230
fix: add base64 Engine trait import
devin-ai-integration[bot] Jan 30, 2025
31a0959
feat: add example for pyth-lazer-sdk
devin-ai-integration[bot] Jan 30, 2025
0692f33
fix: fix example
tejasbadadare Jan 30, 2025
c31d118
refactor: separate client new() and start() functions, improve unsubs…
devin-ai-integration[bot] Jan 30, 2025
d3e8844
refactor: use Bearer token authentication in header and improve unsub…
devin-ai-integration[bot] Jan 30, 2025
9474cfc
refactor: make access token required parameter
devin-ai-integration[bot] Jan 30, 2025
9964432
fix: remove invalid client return from start function
devin-ai-integration[bot] Jan 30, 2025
2cd41a3
style: improve code formatting and fix example documentation
devin-ai-integration[bot] Jan 30, 2025
c15f283
fix: formatting, example
tejasbadadare Jan 30, 2025
4160bc8
Merge branch 'devin/1738263054-add-lazer-rust-consumer-sdk' of github…
tejasbadadare Jan 30, 2025
c9f700b
refactor: rename, remove doctest
tejasbadadare Jan 30, 2025
f71d95a
fix: example
tejasbadadare Jan 30, 2025
c3d11c3
feat: add close()
tejasbadadare Jan 30, 2025
5a3b3c9
refactor: move binary msg parsing to protocol crate, improve examples…
tejasbadadare Jan 31, 2025
d8fba66
feat: bump protocol ver
tejasbadadare Jan 31, 2025
a8472d0
fix: don't ser/de the binary message, leave that to the client. only …
tejasbadadare Jan 31, 2025
19ffbdd
fix: explicitly choose JsonBinaryEncoding::Base64 instead of default()
tejasbadadare Jan 31, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 120 additions & 1 deletion lazer/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lazer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@
resolver = "2"
members = [
"sdk/rust/protocol",
"sdk/rust/client",
"contracts/solana/programs/pyth-lazer-solana-contract",
]

26 changes: 26 additions & 0 deletions lazer/sdk/rust/client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "pyth-lazer-client"
version = "0.1.0"
edition = "2021"
description = "A Rust client for Pyth Lazer"
license = "Apache-2.0"

[dependencies]
pyth-lazer-protocol = { path = "../protocol" }
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
futures-util = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
base64 = "0.22.1"
anyhow = "1.0"
tracing = "0.1"
url = "2.4"

[dev_dependencies]
bincode = "1.3.3"
ed25519-dalek = { version = "2.1.1", features = ["rand_core"] }
hex = "0.4.3"
libsecp256k1 = "0.7.1"
bs58 = "0.5.1"
alloy-primitives = "0.8.19"
158 changes: 158 additions & 0 deletions lazer/sdk/rust/client/examples/subscribe_price_feeds.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
use base64::Engine;
use futures_util::StreamExt;
use pyth_lazer_client::LazerClient;
use pyth_lazer_protocol::message::{EvmMessage, SolanaMessage};
use pyth_lazer_protocol::payload::PayloadData;
use pyth_lazer_protocol::router::{
Chain, Channel, DeliveryFormat, FixedRate, JsonBinaryEncoding, PriceFeedId, PriceFeedProperty,
SubscriptionParams, SubscriptionParamsRepr,
};
use pyth_lazer_protocol::subscription::{Request, Response, SubscribeRequest, SubscriptionId};

fn get_lazer_access_token() -> String {
// Place your access token in your env at LAZER_ACCESS_TOKEN or set it here
let token = "your token here";
std::env::var("LAZER_ACCESS_TOKEN").unwrap_or_else(|_| token.to_string())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Create and start the client
let mut client = LazerClient::new(
"wss://pyth-lazer.dourolabs.app/v1/stream",
&get_lazer_access_token(),
)?;
let mut stream = client.start().await?;

let subscription_requests = vec![
// Example subscription: Parsed JSON feed targeting Solana
SubscribeRequest {
subscription_id: SubscriptionId(1),
params: SubscriptionParams::new(SubscriptionParamsRepr {
price_feed_ids: vec![PriceFeedId(1), PriceFeedId(2)],
properties: vec![
PriceFeedProperty::Price,
PriceFeedProperty::Exponent,
PriceFeedProperty::BestAskPrice,
PriceFeedProperty::BestBidPrice,
],
chains: vec![Chain::Solana],
delivery_format: DeliveryFormat::Json,
json_binary_encoding: JsonBinaryEncoding::Base64,
parsed: true,
channel: Channel::FixedRate(
FixedRate::from_ms(200).expect("unsupported update rate"),
),
})
.expect("invalid subscription params"),
},
// Example subscription: binary feed targeting Solana and EVM
SubscribeRequest {
subscription_id: SubscriptionId(2),
params: SubscriptionParams::new(SubscriptionParamsRepr {
price_feed_ids: vec![PriceFeedId(3), PriceFeedId(4)],
properties: vec![
PriceFeedProperty::Price,
PriceFeedProperty::Exponent,
PriceFeedProperty::BestAskPrice,
PriceFeedProperty::BestBidPrice,
],
chains: vec![Chain::Evm, Chain::Solana],
delivery_format: DeliveryFormat::Binary,
json_binary_encoding: JsonBinaryEncoding::Base64,
parsed: false,
channel: Channel::FixedRate(
FixedRate::from_ms(50).expect("unsupported update rate"),
),
})
.expect("invalid subscription params"),
},
];

for req in subscription_requests {
client.subscribe(Request::Subscribe(req)).await?;
}

println!("Subscribed to price feeds. Waiting for updates...");

// Process the first few updates
let mut count = 0;
while let Some(msg) = stream.next().await {
// The stream gives us base64-encoded binary messages. We need to decode, parse, and verify them.
match msg? {
Response::StreamUpdated(update) => {
if let Some(evm_data) = update.payload.evm {
// Decode binary data
let binary_data =
base64::engine::general_purpose::STANDARD.decode(&evm_data.data)?;
let evm_message = EvmMessage::deserialize_slice(&binary_data)?;

// Parse and verify the EVM message
let payload = parse_and_verify_evm_message(&evm_message);
println!("EVM payload: {payload:?}\n");
}

if let Some(solana_data) = update.payload.solana {
// Decode binary data
let binary_data =
base64::engine::general_purpose::STANDARD.decode(&solana_data.data)?;
let solana_message = SolanaMessage::deserialize_slice(&binary_data)?;

// Parse and verify the Solana message
let payload = parse_and_verify_solana_message(&solana_message);
println!("Solana payload: {payload:?}\n");
}

if let Some(parsed) = update.payload.parsed {
// Parsed payloads (`parsed: true`) are already decoded and ready to use
for feed in parsed.price_feeds {
println!(
"Parsed payload: {:?}: {:?} at {:?}\n",
feed.price_feed_id, feed, parsed.timestamp_us
);
}
}
}
_ => println!("Received non-update message"),
}

count += 1;
if count >= 50 {
break;
}
}

// Unsubscribe before exiting
for sub_id in [SubscriptionId(1), SubscriptionId(2)] {
client.unsubscribe(sub_id).await?;
println!("Unsubscribed from {:?}", sub_id);
}

tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
client.close().await?;
Ok(())
}

fn parse_and_verify_solana_message(solana_message: &SolanaMessage) -> anyhow::Result<PayloadData> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added parse_and_verify functions to the example to make it end-to-end. kept it as example code to match protocol/examples/parse_and_verify.rs, but i'm open to adding it as a library function. i think this is fine since this is simple signature verification using common crates.

// Verify signature using the pubkey
let public_key = ed25519_dalek::VerifyingKey::from_bytes(&solana_message.public_key)?;
public_key.verify_strict(
&solana_message.payload,
&ed25519_dalek::Signature::from_bytes(&solana_message.signature),
)?;

let payload = PayloadData::deserialize_slice_le(&solana_message.payload)?;
Ok(payload)
}

fn parse_and_verify_evm_message(evm_message: &EvmMessage) -> anyhow::Result<PayloadData> {
// Recover pubkey from message
libsecp256k1::recover(
&libsecp256k1::Message::parse(&alloy_primitives::keccak256(&evm_message.payload)),
&libsecp256k1::Signature::parse_standard(&evm_message.signature)?,
&libsecp256k1::RecoveryId::parse(evm_message.recovery_id)?,
)?;

let payload = PayloadData::deserialize_slice_be(&evm_message.payload)?;
Ok(payload)
}
121 changes: 121 additions & 0 deletions lazer/sdk/rust/client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use anyhow::Result;
use futures_util::{SinkExt, StreamExt};
use pyth_lazer_protocol::subscription::{
ErrorResponse, Request, Response, SubscriptionId, UnsubscribeRequest,
};
use tokio_tungstenite::{connect_async, tungstenite::Message};
use url::Url;
/// A WebSocket client for consuming Pyth Lazer price feed updates
///
/// This client provides a simple interface to:
/// - Connect to a Lazer WebSocket endpoint
/// - Subscribe to price feed updates
/// - Receive updates as a stream of messages
///
pub struct LazerClient {
endpoint: Url,
access_token: String,
ws_sender: Option<
futures_util::stream::SplitSink<
tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>,
Message,
>,
>,
}

impl LazerClient {
/// Creates a new Lazer client instance
///
/// # Arguments
/// * `endpoint` - The WebSocket URL of the Lazer service
/// * `access_token` - Access token for authentication
///
/// # Returns
/// Returns a new client instance (not yet connected)
pub fn new(endpoint: &str, access_token: &str) -> Result<Self> {
let endpoint = Url::parse(endpoint)?;
let access_token = access_token.to_string();
Ok(Self {
endpoint,
access_token,
ws_sender: None,
})
}

/// Starts the WebSocket connection
///
/// # Returns
/// Returns a stream of responses from the server
pub async fn start(&mut self) -> Result<impl futures_util::Stream<Item = Result<Response>>> {
let url = self.endpoint.clone();
let mut request =
tokio_tungstenite::tungstenite::client::IntoClientRequest::into_client_request(url)?;

request.headers_mut().insert(
"Authorization",
format!("Bearer {}", self.access_token).parse().unwrap(),
);

let (ws_stream, _) = connect_async(request).await?;
let (ws_sender, ws_receiver) = ws_stream.split();

self.ws_sender = Some(ws_sender);
let response_stream = ws_receiver.map(|msg| -> Result<Response> {
let msg = msg?;
match msg {
Message::Text(text) => Ok(serde_json::from_str(&text)?),
Message::Binary(data) => Ok(Response::from_binary(&data)?),
Message::Close(_) => Ok(Response::Error(ErrorResponse {
error: "WebSocket connection closed".to_string(),
})),
_ => Ok(Response::Error(ErrorResponse {
error: "Unexpected message type".to_string(),
})),
}
});

Ok(response_stream)
}

/// Subscribes to price feed updates
///
/// # Arguments
/// * `request` - A subscription request containing feed IDs and parameters
pub async fn subscribe(&mut self, request: Request) -> Result<()> {
if let Some(sender) = &mut self.ws_sender {
let msg = serde_json::to_string(&request)?;
sender.send(Message::Text(msg)).await?;
Ok(())
} else {
anyhow::bail!("WebSocket connection not started")
}
}

/// Unsubscribes from a previously subscribed feed
///
/// # Arguments
/// * `subscription_id` - The ID of the subscription to cancel
pub async fn unsubscribe(&mut self, subscription_id: SubscriptionId) -> Result<()> {
if let Some(sender) = &mut self.ws_sender {
let request = Request::Unsubscribe(UnsubscribeRequest { subscription_id });
let msg = serde_json::to_string(&request)?;
sender.send(Message::Text(msg)).await?;
Ok(())
} else {
anyhow::bail!("WebSocket connection not started")
}
}

/// Closes the WebSocket connection
pub async fn close(&mut self) -> Result<()> {
if let Some(sender) = &mut self.ws_sender {
sender.send(Message::Close(None)).await?;
self.ws_sender = None;
Ok(())
} else {
anyhow::bail!("WebSocket connection not started")
}
}
}
4 changes: 3 additions & 1 deletion lazer/sdk/rust/protocol/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-lazer-protocol"
version = "0.4.0"
version = "0.4.1"
edition = "2021"
description = "Pyth Lazer SDK - protocol types."
license = "Apache-2.0"
@@ -10,9 +10,11 @@ repository = "https://github.com/pyth-network/pyth-crosschain"
byteorder = "1.5.0"
anyhow = "1.0.89"
serde = { version = "1.0.210", features = ["derive"] }
serde_json = "1.0"
derive_more = { version = "1.0.0", features = ["from"] }
itertools = "0.13.0"
rust_decimal = "1.36.0"
base64 = "0.22.1"

[dev-dependencies]
bincode = "1.3.3"
69 changes: 67 additions & 2 deletions lazer/sdk/rust/protocol/src/subscription.rs
Original file line number Diff line number Diff line change
@@ -2,7 +2,15 @@
//! used across publishers, agents and routers.
use {
crate::router::{JsonUpdate, SubscriptionParams},
crate::{
payload::{
BINARY_UPDATE_FORMAT_MAGIC, EVM_FORMAT_MAGIC, PARSED_FORMAT_MAGIC,
SOLANA_FORMAT_MAGIC_BE,
},
router::{JsonBinaryData, JsonBinaryEncoding, JsonUpdate, SubscriptionParams},
},
anyhow::bail,
base64::Engine,
derive_more::From,
serde::{Deserialize, Serialize},
};
@@ -33,7 +41,7 @@ pub struct UnsubscribeRequest {
pub subscription_id: SubscriptionId,
}

/// A response sent from the server to the client.
/// A JSON response sent from the server to the client.
#[derive(Debug, Clone, Serialize, Deserialize, From)]
#[serde(tag = "type")]
#[serde(rename_all = "camelCase")]
@@ -45,6 +53,63 @@ pub enum Response {
StreamUpdated(StreamUpdatedResponse),
}

impl Response {
/// Parse a binary server message into a Response
pub fn from_binary(data: &[u8]) -> anyhow::Result<Self> {
let mut pos = 0;
let magic = u32::from_be_bytes(data[pos..pos + 4].try_into()?);
pos += 4;

if magic != BINARY_UPDATE_FORMAT_MAGIC {
bail!("binary update format magic mismatch");
}

let subscription_id = SubscriptionId(u64::from_be_bytes(data[pos..pos + 8].try_into()?));
pos += 8;

let mut evm = None;
let mut solana = None;
let mut parsed = None;

while pos < data.len() {
let len = u16::from_be_bytes(data[pos..pos + 2].try_into()?) as usize;
pos += 2;
let magic = u32::from_be_bytes(data[pos..pos + 4].try_into()?);

match magic {
EVM_FORMAT_MAGIC => {
evm = Some(JsonBinaryData {
encoding: JsonBinaryEncoding::Base64,
data: base64::engine::general_purpose::STANDARD
.encode(&data[pos..pos + len]),
});
}
SOLANA_FORMAT_MAGIC_BE => {
solana = Some(JsonBinaryData {
encoding: JsonBinaryEncoding::Base64,
data: base64::engine::general_purpose::STANDARD
.encode(&data[pos..pos + len]),
});
}
PARSED_FORMAT_MAGIC => {
parsed = Some(serde_json::from_slice(&data[pos + 4..pos + len])?);
}
_ => bail!("unknown magic: {}", magic),
}
pos += len;
}

Ok(Response::StreamUpdated(StreamUpdatedResponse {
subscription_id,
payload: JsonUpdate {
evm,
solana,
parsed,
},
}))
}
}

/// Sent from the server after a successul subscription.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]