Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: sketch of mode to emit events #887

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ humantime = "2.1.0"
parking_lot = "0.12.1"
ratatui = "0.26.1"
crossterm = { version = "0.27.0", default-features = false, features = [ "events", "windows" ] }
chrono = { version = "0.4.37", default-features = false, features = [ "clock" ] }
itertools = "0.12.1"
serde = { version = "1.0.197", default-features = false }
serde_json = { version = "1.0.115", default-features = false }
chrono = { version = "0.4.37", default-features = false, features = [ "clock", "serde" ] }
itertools = "0.12.0"
serde = { version = "1.0.193", default-features = false }
serde_json = { version = "1.0.108", default-features = false }
comfy-table = { version = "7.1.0", default-features = false }
strum = { version = "0.26.2", default-features = false, features = [ "std", "derive" ] }
etcetera = "0.8.0"
Expand All @@ -57,6 +57,7 @@ serde_with = "3.7.0"
encoding_rs_io = "0.1.7"
bitflags = "2.5.0"
clap_mangen = "0.2.19"
uuid = { version = "1.6.1", features = ["v7", "serde"] }

# Library dependencies (Linux)
[target.'cfg(target_os = "linux")'.dependencies]
Expand Down
6 changes: 4 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ pub use theme::{TuiColor, TuiTheme, TuiThemeItem};
pub enum Mode {
/// Display interactive TUI.
Tui,
/// Display a continuous stream of tracing data
/// Display a continuous stream of tracing data.
Stream,
/// Display a continuous stream of tracing events.
Event,
/// Generate a pretty text table report for N cycles.
Pretty,
/// Generate a Markdown text table report for N cycles.
Expand Down Expand Up @@ -628,7 +630,7 @@ impl TrippyConfig {
};
let dns_timeout = humantime::parse_duration(&dns_timeout)?;
let max_rounds = match mode {
Mode::Stream | Mode::Tui => None,
Mode::Stream | Mode::Event | Mode::Tui => None,
Mode::Pretty
| Mode::Markdown
| Mode::Csv
Expand Down
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ fn run_frontend(
match args.mode {
Mode::Tui => frontend::run_frontend(traces, make_tui_config(args), resolver, geoip_lookup)?,
Mode::Stream => report::stream::report(&traces[0], &resolver)?,
Mode::Event => report::event::report(&traces[0], &resolver)?,

Mode::Csv => report::csv::report(&traces[0], args.report_cycles, &resolver)?,
Mode::Json => report::json::report(&traces[0], args.report_cycles, &resolver)?,
Mode::Pretty => report::table::report_pretty(&traces[0], args.report_cycles, &resolver)?,
Expand Down
1 change: 1 addition & 0 deletions src/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;

pub mod csv;
pub mod dot;
pub mod event;
pub mod flows;
pub mod json;
pub mod silent;
Expand Down
264 changes: 264 additions & 0 deletions src/report/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
use crate::backend::flows::FlowId;
use crate::backend::trace::{Hop, Trace};
use crate::TraceInfo;
use anyhow::anyhow;
use chrono::{DateTime, Utc};
use serde::Serialize;
use std::collections::HashSet;
use std::net::IpAddr;
use std::thread::sleep;
use trippy::dns::Resolver;
use uuid::Uuid;

/// Display a continuous stream of trace events.
pub fn report<R: Resolver>(info: &TraceInfo, resolver: &R) -> anyhow::Result<()> {
let mut producer = EventProducer::new(resolver);
loop {
let trace_data = &info.data.read().clone();
let events = producer.produce(trace_data, info);
for event in &events {
emit_event(event);
}
if producer.state.failed {
return Err(anyhow!("failed"));
}
sleep(info.min_round_duration);
}
}

/// The state of the world.
#[derive(Debug)]
struct EventProducer<'a, R> {
/// Dns resolver.
resolver: &'a R,
/// Tracing state.
state: State,
}

impl<'a, R: Resolver> EventProducer<'a, R> {
pub fn new(resolver: &'a R) -> Self {
Self {
resolver,
state: State::new(),
}
}

/// Update state from latest trace and return events.
pub fn produce(&mut self, trace: &Trace, info: &TraceInfo) -> Vec<Event> {
let mut events = vec![];
events.extend(self.started_event(info));
events.extend(self.failed_event(trace));
events.extend(self.round_completed_event(trace));
events.extend(self.flow_discovered_event(trace));
events.extend(self.host_discovered_event(trace));
events
}

/// TODO
fn started_event(&mut self, info: &TraceInfo) -> Vec<Event> {
if self.state.started {
vec![]
} else {
self.state.started = true;
vec![Event::new(EventData::Started(Started::new(
info.target_hostname.clone(),
info.target_addr,
)))]
}
}

/// TODO
fn failed_event(&mut self, trace: &Trace) -> Vec<Event> {
if let Some(err) = trace.error() {
self.state.failed = true;
vec![Event::new(EventData::Failed(Failed::new(err.to_string())))]
} else {
vec![]
}
}

#[allow(clippy::unused_self)]
fn round_completed_event(&mut self, trace: &Trace) -> Vec<Event> {
vec![Event::new(EventData::RoundCompleted(RoundCompleted::new(
trace.round_count(Trace::default_flow_id()),
)))]
}

fn flow_discovered_event(&mut self, trace: &Trace) -> Vec<Event> {
let all_flow_ids = trace
.flows()
.iter()
.map(|(_, flow_id)| flow_id)
.copied()
.collect::<HashSet<FlowId>>();
let events = self
.state
.flow_ids
.symmetric_difference(&all_flow_ids)
.map(|flow_id| Event::new(EventData::FlowDiscovered(FlowDiscovered::new(flow_id.0))))
.collect();
self.state.flow_ids = all_flow_ids;
events
}

fn host_discovered_event(&mut self, trace: &Trace) -> Vec<Event> {
let all_hosts = trace
.hops(Trace::default_flow_id())
.iter()
.flat_map(Hop::addrs)
.map(ToOwned::to_owned)
.collect::<HashSet<IpAddr>>();
let events = self
.state
.hosts
.symmetric_difference(&all_hosts)
.map(|addr| {
Event::new(EventData::HostDiscovered(HostDiscovered::new(
*addr,
self.resolver.reverse_lookup(*addr).to_string(),
)))
})
.collect();
self.state.hosts = all_hosts;
events
}
}

#[derive(Debug)]
struct State {
pub started: bool,
pub failed: bool,
pub flow_ids: HashSet<FlowId>,
pub hosts: HashSet<IpAddr>,
}

impl State {
pub fn new() -> Self {
Self {
started: false,
failed: false,
flow_ids: HashSet::new(),
hosts: HashSet::new(),
}
}
}

#[derive(Debug, Serialize)]
struct Event {
id: Uuid,
timestamp: DateTime<Utc>,
#[serde(flatten)]
data: EventData,
}

impl Event {
fn new(data: EventData) -> Self {
Self {
id: Self::make_id(),
timestamp: Self::make_timestamp(),
data,
}
}

fn make_id() -> Uuid {
Uuid::now_v7()
}

fn make_timestamp() -> DateTime<Utc> {
chrono::Utc::now()
}
}

#[derive(Debug, Serialize)]
enum EventData {
/// Tracing has started.
///
/// Emitted exactly once on startup.
Started(Started),

/// A tracing round has finished.
///
/// Emitted once per round of tracing.
RoundCompleted(RoundCompleted),

/// A host has been discovered.
///
/// Emitted once for every host discovered during tracing.
HostDiscovered(HostDiscovered),

/// A flow has been discovered.
///
/// Emitted once for every flow discovered during tracing.
FlowDiscovered(FlowDiscovered),

/// Tracing has failed.
///
/// Emitted at most once if tracing fails.
Failed(Failed),
}

/// TODO would include all tracing parameters
#[derive(Debug, Serialize)]
struct Started {
target_hostname: String,
target_addr: IpAddr,
}

impl Started {
pub fn new(target_hostname: String, target_addr: IpAddr) -> Self {
Self {
target_hostname,
target_addr,
}
}
}

#[derive(Debug, Serialize)]
struct Failed {
err: String,
}

impl Failed {
pub fn new(err: String) -> Self {
Self { err }
}
}

/// TODO all the usual per-round info
#[derive(Debug, Serialize)]
struct RoundCompleted {
round_count: usize,
}

impl RoundCompleted {
pub fn new(round_count: usize) -> Self {
Self { round_count }
}
}

#[derive(Debug, Serialize)]
struct HostDiscovered {
addr: IpAddr,
hostname: String,
}

impl HostDiscovered {
pub fn new(addr: IpAddr, hostname: String) -> Self {
Self { addr, hostname }
}
}

#[derive(Debug, Serialize)]
struct FlowDiscovered {
flow_id: u64,
}

impl FlowDiscovered {
pub fn new(flow_id: u64) -> Self {
Self { flow_id }
}
}

fn emit_event(event: &Event) {
println!("{}", serde_json::to_string(event).unwrap());
}
4 changes: 2 additions & 2 deletions test_resources/config/completions_bash.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ _trip() {
return 0
;;
--mode)
COMPREPLY=($(compgen -W "tui stream pretty markdown csv json dot flows silent" -- "${cur}"))
COMPREPLY=($(compgen -W "tui stream event pretty markdown csv json dot flows silent" -- "${cur}"))
return 0
;;
-m)
COMPREPLY=($(compgen -W "tui stream pretty markdown csv json dot flows silent" -- "${cur}"))
COMPREPLY=($(compgen -W "tui stream event pretty markdown csv json dot flows silent" -- "${cur}"))
return 0
;;
--protocol)
Expand Down
2 changes: 1 addition & 1 deletion test_resources/config/completions_fish.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
complete -c trip -s c -l config-file -d 'Config file' -r -F
complete -c trip -s m -l mode -d 'Output mode [default: tui]' -r -f -a "{tui 'Display interactive TUI',stream 'Display a continuous stream of tracing data',pretty 'Generate a pretty text table report for N cycles',markdown 'Generate a Markdown text table report for N cycles',csv 'Generate a CSV report for N cycles',json 'Generate a JSON report for N cycles',dot 'Generate a Graphviz DOT file for N cycles',flows 'Display all flows for N cycles',silent 'Do not generate any tracing output for N cycles'}"
complete -c trip -s m -l mode -d 'Output mode [default: tui]' -r -f -a "{tui 'Display interactive TUI',stream 'Display a continuous stream of tracing data',event 'Display a continuous stream of tracing events',pretty 'Generate a pretty text table report for N cycles',markdown 'Generate a Markdown text table report for N cycles',csv 'Generate a CSV report for N cycles',json 'Generate a JSON report for N cycles',dot 'Generate a Graphviz DOT file for N cycles',flows 'Display all flows for N cycles',silent 'Do not generate any tracing output for N cycles'}"
complete -c trip -s p -l protocol -d 'Tracing protocol [default: icmp]' -r -f -a "{icmp 'Internet Control Message Protocol',udp 'User Datagram Protocol',tcp 'Transmission Control Protocol'}"
complete -c trip -s F -l addr-family -d 'The address family [default: Ipv4thenIpv6]' -r -f -a "{ipv4 'Ipv4 only',ipv6 'Ipv6 only',ipv6-then-ipv4 'Ipv6 with a fallback to Ipv4',ipv4-then-ipv6 'Ipv4 with a fallback to Ipv6'}"
complete -c trip -s P -l target-port -d 'The target port (TCP & UDP only) [default: 80]' -r
Expand Down
Loading