Skip to content

Commit

Permalink
sketch of mode to emit events
Browse files Browse the repository at this point in the history
  • Loading branch information
fujiapple852 committed Dec 28, 2023
1 parent e781f03 commit d0c0713
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 3 deletions.
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ humantime = "2.1.0"
parking_lot = "0.12.1"
ratatui = "0.25.0"
crossterm = { version = "0.27.0", default-features = false, features = [ "events", "windows" ] }
chrono = { version = "0.4.31", default-features = false, features = [ "clock" ] }
chrono = { version = "0.4.31", default-features = false, features = [ "clock", "serde" ] }
itertools = "0.12.0"
serde = { version = "1.0.193", default-features = false }
serde_json = { version = "1.0.108", default-features = false }
Expand All @@ -52,6 +52,7 @@ tracing-subscriber = { version = "0.3.18", default-features = false, features =
tracing-chrome = "0.7.1"
petgraph = "0.6.4"
csv = "1.3.0"
uuid = { version = "1.6.1", features = ["v7", "serde"] }

# Library dependencies (Linux)
[target.'cfg(target_os = "linux")'.dependencies]
Expand Down
6 changes: 4 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ pub use theme::{TuiColor, TuiTheme};
pub enum Mode {
/// Display interactive TUI.
Tui,
/// Display a continuous stream of tracing data
/// Display a continuous stream of tracing data.
Stream,
/// Display a continuous stream of tracing events.
Event,
/// Generate an pretty text table report for N cycles.
Pretty,
/// Generate a markdown text table report for N cycles.
Expand Down Expand Up @@ -554,7 +556,7 @@ impl TrippyConfig {
};
let dns_timeout = humantime::parse_duration(&dns_timeout)?;
let max_rounds = match mode {
Mode::Stream | Mode::Tui => None,
Mode::Stream | Mode::Event | Mode::Tui => None,
Mode::Pretty
| Mode::Markdown
| Mode::Csv
Expand Down
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ fn run_frontend(
match args.mode {
Mode::Tui => frontend::run_frontend(traces, make_tui_config(args), resolver, geoip_lookup)?,
Mode::Stream => report::stream::report(&traces[0], &resolver)?,
Mode::Event => report::event::report(&traces[0], &resolver)?,

Mode::Csv => report::csv::report(&traces[0], args.report_cycles, &resolver)?,
Mode::Json => report::json::report(&traces[0], args.report_cycles, &resolver)?,
Mode::Pretty => report::table::report_pretty(&traces[0], args.report_cycles, &resolver)?,
Expand Down
1 change: 1 addition & 0 deletions src/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::time::Duration;

pub mod csv;
pub mod dot;
pub mod event;
pub mod flows;
pub mod json;
pub mod silent;
Expand Down
264 changes: 264 additions & 0 deletions src/report/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
use crate::backend::flows::FlowId;
use crate::backend::trace::{Hop, Trace};
use crate::TraceInfo;
use anyhow::anyhow;
use chrono::{DateTime, Utc};
use serde::Serialize;
use std::collections::HashSet;
use std::net::IpAddr;
use std::thread::sleep;
use trippy::dns::Resolver;
use uuid::Uuid;

/// Display a continuous stream of trace events.
pub fn report<R: Resolver>(info: &TraceInfo, resolver: &R) -> anyhow::Result<()> {
let mut producer = EventProducer::new(resolver);
loop {
let trace_data = &info.data.read().clone();
let events = producer.produce(trace_data, info);
for event in &events {
emit_event(event);
}
if producer.state.failed {
return Err(anyhow!("failed"));
}
sleep(info.min_round_duration);
}
}

/// The state of the world.
#[derive(Debug)]
struct EventProducer<'a, R> {
/// Dns resolver.
resolver: &'a R,
/// Tracing state.
state: State,
}

impl<'a, R: Resolver> EventProducer<'a, R> {
pub fn new(resolver: &'a R) -> Self {
Self {
resolver,
state: State::new(),
}
}

/// Update state from latest trace and return events.
pub fn produce(&mut self, trace: &Trace, info: &TraceInfo) -> Vec<Event> {
let mut events = vec![];
events.extend(self.started_event(info));
events.extend(self.failed_event(trace));
events.extend(self.round_completed_event(trace));
events.extend(self.flow_discovered_event(trace));
events.extend(self.host_discovered_event(trace));
events
}

/// TODO
fn started_event(&mut self, info: &TraceInfo) -> Vec<Event> {
if self.state.started {
vec![]
} else {
self.state.started = true;
vec![Event::new(EventData::Started(Started::new(
info.target_hostname.clone(),
info.target_addr,
)))]
}
}

/// TODO
fn failed_event(&mut self, trace: &Trace) -> Vec<Event> {
if let Some(err) = trace.error() {
self.state.failed = true;
vec![Event::new(EventData::Failed(Failed::new(err.to_string())))]
} else {
vec![]
}
}

#[allow(clippy::unused_self)]
fn round_completed_event(&mut self, trace: &Trace) -> Vec<Event> {
vec![Event::new(EventData::RoundCompleted(RoundCompleted::new(
trace.round_count(Trace::default_flow_id()),
)))]
}

fn flow_discovered_event(&mut self, trace: &Trace) -> Vec<Event> {
let all_flow_ids = trace
.flows()
.iter()
.map(|(_, flow_id)| flow_id)
.copied()
.collect::<HashSet<FlowId>>();
let events = self
.state
.flow_ids
.symmetric_difference(&all_flow_ids)
.map(|flow_id| Event::new(EventData::FlowDiscovered(FlowDiscovered::new(flow_id.0))))
.collect();
self.state.flow_ids = all_flow_ids;
events
}

fn host_discovered_event(&mut self, trace: &Trace) -> Vec<Event> {
let all_hosts = trace
.hops(Trace::default_flow_id())
.iter()
.flat_map(Hop::addrs)
.map(ToOwned::to_owned)
.collect::<HashSet<IpAddr>>();
let events = self
.state
.hosts
.symmetric_difference(&all_hosts)
.map(|addr| {
Event::new(EventData::HostDiscovered(HostDiscovered::new(
*addr,
self.resolver.reverse_lookup(*addr).to_string(),
)))
})
.collect();
self.state.hosts = all_hosts;
events
}
}

#[derive(Debug)]
struct State {
pub started: bool,
pub failed: bool,
pub flow_ids: HashSet<FlowId>,
pub hosts: HashSet<IpAddr>,
}

impl State {
pub fn new() -> Self {
Self {
started: false,
failed: false,
flow_ids: HashSet::new(),
hosts: HashSet::new(),
}
}
}

#[derive(Debug, Serialize)]
struct Event {
id: Uuid,
timestamp: DateTime<Utc>,
#[serde(flatten)]
data: EventData,
}

impl Event {
fn new(data: EventData) -> Self {
Self {
id: Self::make_id(),
timestamp: Self::make_timestamp(),
data,
}
}

fn make_id() -> Uuid {
Uuid::now_v7()
}

fn make_timestamp() -> DateTime<Utc> {
chrono::Utc::now()
}
}

#[derive(Debug, Serialize)]
enum EventData {
/// Tracing has started.
///
/// Emitted exactly once on startup.
Started(Started),

/// A tracing round has finished.
///
/// Emitted once per round of tracing.
RoundCompleted(RoundCompleted),

/// A host has been discovered.
///
/// Emitted once for every host discovered during tracing.
HostDiscovered(HostDiscovered),

/// A flow has been discovered.
///
/// Emitted once for every flow discovered during tracing.
FlowDiscovered(FlowDiscovered),

/// Tracing has failed.
///
/// Emitted at most once if tracing fails.
Failed(Failed),
}

/// TODO would include all tracing parameters
#[derive(Debug, Serialize)]
struct Started {
target_hostname: String,
target_addr: IpAddr,
}

impl Started {
pub fn new(target_hostname: String, target_addr: IpAddr) -> Self {
Self {
target_hostname,
target_addr,
}
}
}

#[derive(Debug, Serialize)]
struct Failed {
err: String,
}

impl Failed {
pub fn new(err: String) -> Self {
Self { err }
}
}

/// TODO all the usual per-round info
#[derive(Debug, Serialize)]
struct RoundCompleted {
round_count: usize,
}

impl RoundCompleted {
pub fn new(round_count: usize) -> Self {
Self { round_count }
}
}

#[derive(Debug, Serialize)]
struct HostDiscovered {
addr: IpAddr,
hostname: String,
}

impl HostDiscovered {
pub fn new(addr: IpAddr, hostname: String) -> Self {
Self { addr, hostname }
}
}

#[derive(Debug, Serialize)]
struct FlowDiscovered {
flow_id: u64,
}

impl FlowDiscovered {
pub fn new(flow_id: u64) -> Self {
Self { flow_id }
}
}

fn emit_event(event: &Event) {
println!("{}", serde_json::to_string(event).unwrap());
}
1 change: 1 addition & 0 deletions trippy-config-sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
# Allowed values are:
# tui - Display interactive Tui [default]
# stream - Display a continuous stream of tracing data
# event - Display a continuous stream of tracing events
# pretty - Generate an pretty text table report for N cycles
# markdown - Generate a markdown text table report for N cycles
# csv - Generate a CSV report for N cycles
Expand Down

0 comments on commit d0c0713

Please sign in to comment.