diff --git a/graph/src/components/tracing.rs b/graph/src/components/tracing.rs index c6ac44fd3d7..804a37dc488 100644 --- a/graph/src/components/tracing.rs +++ b/graph/src/components/tracing.rs @@ -9,6 +9,10 @@ use crate::prelude::LoggerFactory; use super::store::DeploymentId; const DEFAULT_BUFFER_SIZE: usize = 100; +#[cfg(not(test))] +const INDEXER_WATCHER_INTERVAL: Duration = Duration::from_secs(30); +#[cfg(test)] +const INDEXER_WATCHER_INTERVAL: Duration = Duration::from_millis(100); #[derive(Debug, Clone)] pub struct Subscriptions { @@ -57,22 +61,33 @@ pub struct TracingControl { // } impl TracingControl { - pub async fn start() -> Self { + pub fn start() -> Self { let subscriptions = Subscriptions::default(); let subs = subscriptions.clone(); - let watcher = indexer_watcher::new_watcher( - #[cfg(test)] - Duration::from_millis(100), - #[cfg(not(test))] - Duration::from_secs(30), - move || { - let subs = subs.clone(); - - async move { Ok(subs.inner.read().await.clone()) } - }, - ) - .await + + let watcher = std::thread::spawn(move || { + let handle = tokio::runtime::Handle::try_current().unwrap_or( + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .handle() + .clone(), + ); + + handle.block_on(async move { + indexer_watcher::new_watcher(INDEXER_WATCHER_INTERVAL, move || { + let subs = subs.clone(); + + async move { Ok(subs.inner.read().await.clone()) } + }) + .await + }) + }) + .join() + .unwrap() .unwrap(); + Self { watcher, subscriptions, @@ -118,10 +133,37 @@ impl TracingControl { mod test { use anyhow::anyhow; + use tokio::time::{self, Instant}; use tokio_retry::Retry; use super::*; - use std::{future::IntoFuture, sync::Arc}; + use std::sync::Arc; + + #[tokio::test] + async fn test_watcher() { + let x = time::Instant::now(); + let x = indexer_watcher::new_watcher(Duration::from_millis(10), move || { + let x = x.clone(); + + async move { + let now = Instant::now(); + Ok(now.duration_since(x)) + } + }) + .await + .unwrap(); + + Retry::spawn(vec![Duration::from_secs(10); 3].into_iter(), move || { + let x = x.clone(); + async move { + let count = x.borrow().clone(); + println!("{}", count.as_millis()); + Err::(anyhow!("millis: {}", count.as_millis())) + } + }) + .await + .unwrap(); + } #[tokio::test] async fn test_tracing_control() { @@ -133,11 +175,11 @@ mod test { assert!(tx.is_none()); // drop the subscription - let rx = control.subscribe(DeploymentId(123)); + let rx = control.subscribe(DeploymentId(123)).await; let c = control.clone(); // check subscription is none because channel is closed - let tx = Retry::spawn(vec![Duration::from_secs(5); 10].into_iter(), move || { + let tx = Retry::spawn(vec![INDEXER_WATCHER_INTERVAL; 2].into_iter(), move || { let control = c.clone(); async move { match control.producer(DeploymentId(123)) { @@ -158,9 +200,22 @@ mod test { assert!(tx.is_none()); // re-create subscription - let _rx = control.subscribe(DeploymentId(123)); + let _rx = control.subscribe(DeploymentId(123)).await; + // check old subscription was replaced - let tx = control.producer(DeploymentId(123)); - assert!(!tx.unwrap().is_closed()) + let c = control.clone(); + let tx = Retry::spawn(vec![INDEXER_WATCHER_INTERVAL; 2].into_iter(), move || { + let tx = c.producer(DeploymentId(123)); + async move { + match tx { + Some(sender) if !sender.is_closed() => Ok(sender), + Some(_) => Err(anyhow!("Sender is closed")), + None => Err(anyhow!("Sender not created yet")), + } + } + }) + .await + .unwrap(); + assert!(!tx.is_closed()) } } diff --git a/store/postgres/src/lib.rs b/store/postgres/src/lib.rs index 1e2dc912713..83ccd93a8eb 100644 --- a/store/postgres/src/lib.rs +++ b/store/postgres/src/lib.rs @@ -18,8 +18,7 @@ use graph::data::query::Trace; use graph::prelude::lazy_static; lazy_static! { - pub static ref TRACING_CONTROL: Arc> = - Arc::new(TracingControl::default()); + pub static ref TRACING_CONTROL: Arc> = Arc::new(TracingControl::start()); } mod advisory_lock;