diff --git a/src/agent/solana/oracle.rs b/src/agent/solana/oracle.rs index 723a2bb..66c42b8 100644 --- a/src/agent/solana/oracle.rs +++ b/src/agent/solana/oracle.rs @@ -146,9 +146,7 @@ pub fn spawn_oracle( let (updates_tx, updates_rx) = mpsc::channel(config.updates_channel_capacity); if config.subscriber_enabled { let subscriber = Subscriber::new( - rpc_url.to_string(), wss_url.to_string(), - rpc_timeout, config.commitment, key_store.program_key, updates_tx, @@ -658,14 +656,9 @@ mod subscriber { /// Subscriber subscribes to all changes on the given account, and sends those changes /// on updates_tx. This is a convenience wrapper around the Blockchain Shadow crate. pub struct Subscriber { - /// HTTP RPC endpoint - rpc_url: String, /// WSS RPC endpoint wss_url: String, - /// Timeout for RPC requests - rpc_timeout: Duration, - /// Commitment level used to read account data commitment: CommitmentLevel, @@ -681,18 +674,14 @@ mod subscriber { impl Subscriber { pub fn new( - rpc_url: String, wss_url: String, - rpc_timeout: Duration, commitment: CommitmentLevel, program_key: Pubkey, updates_tx: mpsc::Sender<(Pubkey, solana_sdk::account::Account)>, logger: Logger, ) -> Self { Subscriber { - rpc_url, wss_url, - rpc_timeout, commitment, program_key, updates_tx, @@ -707,7 +696,8 @@ mod subscriber { error!(self.logger, "{}", err); debug!(self.logger, "error context"; "context" => format!("{:?}", err)); if current_time.elapsed() < Duration::from_secs(30) { - tracing::error!( + warn!( + self.logger, "Subscriber restarting too quickly. Sleeping for 1 second." ); tokio::time::sleep(Duration::from_secs(1)).await; @@ -717,15 +707,15 @@ mod subscriber { } pub async fn start(&self) -> Result<()> { - debug!(self.logger, "subscribed to program account updates"; "program_key" => self.program_key.to_string()); - let client = PubsubClient::new(self.wss_url.as_str()) .await .expect("failed to create pubsub client"); let config = RpcProgramAccountsConfig { account_config: RpcAccountInfoConfig { - commitment: Some(CommitmentConfig::confirmed()), + commitment: Some(CommitmentConfig { + commitment: self.commitment, + }), encoding: Some(UiAccountEncoding::Base64Zstd), ..Default::default() }, @@ -737,6 +727,8 @@ mod subscriber { .program_subscribe(&self.program_key, Some(config)) .await?; + debug!(self.logger, "subscribed to program account updates"; "program_key" => self.program_key.to_string()); + loop { match tokio_stream::StreamExt::next(&mut notif).await { Some(update) => { @@ -754,7 +746,8 @@ mod subscriber { .map_err(|_| anyhow!("failed to send update to oracle"))?; } None => { - return Err(anyhow!("Subscriber closed connection")); + debug!(self.logger, "subscriber closed connection"); + return Ok(()); } } }