Skip to content

Commit

Permalink
experiment
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed Feb 24, 2025
1 parent c53576f commit bd813de
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 21 deletions.
93 changes: 74 additions & 19 deletions graph/src/components/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ use crate::prelude::LoggerFactory;
use super::store::DeploymentId;

const DEFAULT_BUFFER_SIZE: usize = 100;
#[cfg(not(test))]
const INDEXER_WATCHER_INTERVAL: Duration = Duration::from_secs(30);
#[cfg(test)]
const INDEXER_WATCHER_INTERVAL: Duration = Duration::from_millis(100);

#[derive(Debug, Clone)]
pub struct Subscriptions<T> {
Expand Down Expand Up @@ -57,22 +61,33 @@ pub struct TracingControl<T> {
// }

impl<T: Send + Clone + 'static> TracingControl<T> {
pub async fn start() -> Self {
pub fn start() -> Self {
let subscriptions = Subscriptions::default();
let subs = subscriptions.clone();
let watcher = indexer_watcher::new_watcher(
#[cfg(test)]
Duration::from_millis(100),
#[cfg(not(test))]
Duration::from_secs(30),
move || {
let subs = subs.clone();

async move { Ok(subs.inner.read().await.clone()) }
},
)
.await

let watcher = std::thread::spawn(move || {
let handle = tokio::runtime::Handle::try_current().unwrap_or(
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.handle()
.clone(),
);

handle.block_on(async move {
indexer_watcher::new_watcher(INDEXER_WATCHER_INTERVAL, move || {
let subs = subs.clone();

async move { Ok(subs.inner.read().await.clone()) }
})
.await
})
})
.join()
.unwrap()
.unwrap();

Self {
watcher,
subscriptions,
Expand Down Expand Up @@ -118,10 +133,37 @@ impl<T: Send + Clone + 'static> TracingControl<T> {
mod test {

use anyhow::anyhow;
use tokio::time::{self, Instant};
use tokio_retry::Retry;

use super::*;
use std::{future::IntoFuture, sync::Arc};
use std::sync::Arc;

#[tokio::test]
async fn test_watcher() {
let x = time::Instant::now();
let x = indexer_watcher::new_watcher(Duration::from_millis(10), move || {
let x = x.clone();

async move {
let now = Instant::now();
Ok(now.duration_since(x))
}
})
.await
.unwrap();

Retry::spawn(vec![Duration::from_secs(10); 3].into_iter(), move || {
let x = x.clone();
async move {
let count = x.borrow().clone();
println!("{}", count.as_millis());
Err::<Duration, anyhow::Error>(anyhow!("millis: {}", count.as_millis()))
}
})
.await
.unwrap();
}

#[tokio::test]
async fn test_tracing_control() {
Expand All @@ -133,11 +175,11 @@ mod test {
assert!(tx.is_none());

// drop the subscription
let rx = control.subscribe(DeploymentId(123));
let rx = control.subscribe(DeploymentId(123)).await;

let c = control.clone();
// check subscription is none because channel is closed
let tx = Retry::spawn(vec![Duration::from_secs(5); 10].into_iter(), move || {
let tx = Retry::spawn(vec![INDEXER_WATCHER_INTERVAL; 2].into_iter(), move || {
let control = c.clone();
async move {
match control.producer(DeploymentId(123)) {
Expand All @@ -158,9 +200,22 @@ mod test {
assert!(tx.is_none());

// re-create subscription
let _rx = control.subscribe(DeploymentId(123));
let _rx = control.subscribe(DeploymentId(123)).await;

// check old subscription was replaced
let tx = control.producer(DeploymentId(123));
assert!(!tx.unwrap().is_closed())
let c = control.clone();
let tx = Retry::spawn(vec![INDEXER_WATCHER_INTERVAL; 2].into_iter(), move || {
let tx = c.producer(DeploymentId(123));
async move {
match tx {
Some(sender) if !sender.is_closed() => Ok(sender),
Some(_) => Err(anyhow!("Sender is closed")),
None => Err(anyhow!("Sender not created yet")),
}
}
})
.await
.unwrap();
assert!(!tx.is_closed())
}
}
3 changes: 1 addition & 2 deletions store/postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ use graph::data::query::Trace;
use graph::prelude::lazy_static;

lazy_static! {
pub static ref TRACING_CONTROL: Arc<TracingControl<Trace>> =
Arc::new(TracingControl::default());
pub static ref TRACING_CONTROL: Arc<TracingControl<Trace>> = Arc::new(TracingControl::start());
}

mod advisory_lock;
Expand Down

0 comments on commit bd813de

Please sign in to comment.