-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathnexmark-latency.rs
221 lines (198 loc) · 6.37 KB
/
nexmark-latency.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
use clap::Parser;
use nexmark::config::NexmarkConfig;
use renoir::operator::Operator;
use renoir::operator::Timestamp;
use renoir::prelude::*;
use renoir::Replication;
use renoir::Stream;
use std::time::Instant;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use nexmark::event::*;
/// Watermark period used by query 5: `watermark_gen` yields a watermark once every
/// `WATERMARK_INTERVAL` calls (2^20).
///
/// NOTE(review): `Args::watermark_interval` (`--watermark-interval`) is parsed but `query5`
/// passes this constant instead, so the CLI flag currently has no effect.
const WATERMARK_INTERVAL: usize = 1_048_576;
/// Event-time extractor: returns the timestamp embedded in the Nexmark event itself,
/// ignoring the wall-clock generation time it is paired with.
// Not referenced by the queries in this file, hence the lint allowance.
#[allow(unused)]
fn timestamp_gen(pair: &(SystemTime, Event)) -> Timestamp {
    let (_generated_at, event) = pair;
    event.timestamp() as i64
}
/// Periodic watermark policy: advances the call counter `count` and returns `Some(*ts)`
/// whenever it wraps around to zero, i.e. once every `interval` calls.
///
/// After each call `count` lies in `0..interval`.
///
/// # Panics
///
/// Panics if `interval` is 0 (remainder by zero).
fn watermark_gen(ts: &Timestamp, count: &mut usize, interval: usize) -> Option<Timestamp> {
    let next = (*count + 1) % interval;
    *count = next;
    (next == 0).then_some(*ts)
}
/// Query 2: Selection
///
/// ```text
/// SELECT Rstream(auction, price)
/// FROM Bid [NOW]
/// WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087;
/// ```
fn query2(events: Stream<impl Operator<Out = (SystemTime, Event)> + 'static>) {
events
.filter_map(|(s, e)| {
if let Event::Bid(b) = e {
Some((s, b))
} else {
None
}
})
// .shuffle()
.filter(|(_, b)| b.auction % 123 == 0)
.map(|(t, _)| t)
// .replication(Replication::One)
.for_each(|t| TRACK_POINT.get_or_init("q2").record(t.elapsed().unwrap()));
}
/// Query 3: Local Item Suggestion
///
/// ```text
/// SELECT Istream(P.name, P.city, P.state, A.id)
/// FROM Auction A [ROWS UNBOUNDED], Person P [ROWS UNBOUNDED]
/// WHERE A.seller = P.id AND (P.state = `OR' OR P.state = `ID' OR P.state = `CA') AND A.category = 10;
/// ```
///
/// Persons and auctions are routed into two separate streams, filtered, and joined on
/// `person.id == auction.seller`. The state filter compares against the lowercase codes
/// `"or"`, `"id"` and `"ca"`. Each join result is stamped with the later of the two generation
/// times. The elapsed wall-clock time since then is recorded under the `"q3"` track point, with
/// the sink running on a single replica.
fn query3(events: Stream<impl Operator<Out = (SystemTime, Event)> + 'static>) {
    // First route receives persons, second route receives auctions.
    let mut split = events
        .route()
        .add_route(|(_, ev)| matches!(ev, Event::Person(_)))
        .add_route(|(_, ev)| matches!(ev, Event::Auction(_)))
        .build()
        .into_iter();

    // Persons living in one of the selected states.
    let sellers = split
        .next()
        .unwrap()
        .map(|(ts, ev)| (ts, unwrap_person(ev)))
        .filter(|(_, person)| {
            person.state == "or" || person.state == "id" || person.state == "ca"
        });

    // Auctions listed in category 10.
    let listings = split
        .next()
        .unwrap()
        .map(|(ts, ev)| (ts, unwrap_auction(ev)))
        .filter(|(_, auction)| auction.category == 10);

    // Match each auction with its seller.
    let joined = sellers
        .join(listings, |(_, person)| person.id, |(_, auction)| auction.seller)
        .drop_key();

    joined
        // Project (name, city, state, auction id), stamped with the later generation time.
        .map(|((ts_person, person), (ts_auction, auction))| {
            (ts_person.max(ts_auction), person.name, person.city, person.state, auction.id)
        })
        // Only the timestamp matters for latency.
        .map(|(ts, ..)| ts)
        .replication(Replication::One)
        .for_each(|ts| TRACK_POINT.get_or_init("q3").record(ts.elapsed().unwrap()));
}
/// Query 5: Hot Items
///
/// ```text
/// SELECT Rstream(auction)
/// FROM (SELECT B1.auction, count(*) AS num
/// FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B1
/// GROUP BY B1.auction)
/// WHERE num >= ALL (SELECT count(*)
/// FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B2
/// GROUP BY B2.auction);
/// ```
///
/// Implementation notes:
/// - The window is `EventTimeWindow::sliding(1_000, 100)`, not 60 min / 1 min.
///   NOTE(review): presumably size 1_000 / slide 100 in the units of `Bid::date_time`; confirm.
/// - Event time comes from `Bid::date_time`. A watermark is produced every
///   `WATERMARK_INTERVAL` bids; the `--watermark-interval` CLI flag is not used here.
/// - Per window, a single auction with the highest bid count is kept. On ties, the one already
///   held by the fold wins, rather than returning all tied auctions.
/// - The recorded latency is measured from the latest generation time among the bids that
///   contributed to the window result, under the `"q5"` track point.
fn query5(events: Stream<impl Operator<Out = (SystemTime, Event)> + 'static>) {
    let window = EventTimeWindow::sliding(1_000, 100);

    // Bids only, timestamped with their own event time.
    let timed_bids = events.filter_map(filter_bid).add_timestamps(
        |(_, bid)| bid.date_time as i64,
        {
            let mut seen = 0;
            move |_, ts| watermark_gen(ts, &mut seen, WATERMARK_INTERVAL)
        },
    );

    // For every auction and window: bid count plus the latest generation time among those bids.
    let per_auction = timed_bids
        .map(|(generated_at, bid)| (generated_at, bid.auction))
        .group_by(|(_, auction)| *auction)
        .map(|(_, (generated_at, _))| generated_at)
        .window(window.clone())
        .fold((UNIX_EPOCH, 0), |(latest, bids), generated_at| {
            *latest = generated_at.max(*latest);
            *bids += 1;
        })
        .unkey();

    // Across all auctions in each window: keep the busiest one and the overall latest time.
    per_auction
        .window_all(window)
        .fold_first(|(best_id, (latest, best_count)), (auction, (generated_at, bids))| {
            *latest = generated_at.max(*latest);
            if bids > *best_count {
                *best_id = auction;
                *best_count = bids;
            }
        })
        .drop_key()
        .map(|(_, (latest, _))| latest)
        .for_each(|latest| TRACK_POINT.get_or_init("q5").record(latest.elapsed().unwrap()))
}
/// Latency recorder shared by the query sinks: each sink calls `get_or_init("qN")` and
/// `record`s the elapsed `Duration` for every output. `main` afterwards calls
/// `micrometer::summary_grouped()` and `micrometer::append_csv(...)`.
// NOTE(review): built with `new_thread_local`, so samples are presumably stored per thread —
// confirm against the micrometer docs.
static TRACK_POINT: micrometer::TrackPoint = micrometer::TrackPoint::new_thread_local();
/// Builds the source stream: `args.n` Nexmark events from a single generator, each paired
/// with the `SystemTime` at which the iterator produced it (the start point for latency).
///
/// The stream uses renoir's adaptive batch mode with `args.batch` and a duration of
/// `args.dt_us` microseconds.
fn events(env: &StreamContext, args: &Args) -> Stream<impl Operator<Out = (SystemTime, Event)>> {
    // One generator, all average byte-size settings at 0, first/next rate at 10_000_000.
    let generator_config = NexmarkConfig {
        num_event_generators: 1,
        avg_auction_byte_size: 0,
        avg_bid_byte_size: 0,
        avg_person_byte_size: 0,
        first_rate: 10_000_000,
        next_rate: 10_000_000,
        ..Default::default()
    };

    // Stamp each event with the wall-clock time it is pulled from the generator.
    let stamped = nexmark::EventGenerator::new(generator_config)
        .take(args.n)
        .map(|event| (SystemTime::now(), event));

    let batch_window = std::time::Duration::from_micros(args.dt_us);
    env.stream_iter(stamped)
        .batch_mode(BatchMode::adaptive(args.batch, batch_window))
}
/// Extracts the [`Auction`] payload from an event already routed as an auction.
///
/// # Panics
///
/// Panics if `e` is not `Event::Auction`.
fn unwrap_auction(e: Event) -> Auction {
    if let Event::Auction(auction) = e {
        auction
    } else {
        panic!("tried to unwrap wrong event type!")
    }
}
/// Extracts the [`Person`] payload from an event already routed as a person.
///
/// # Panics
///
/// Panics if `e` is not `Event::Person`.
fn unwrap_person(e: Event) -> Person {
    if let Event::Person(person) = e {
        person
    } else {
        panic!("tried to unwrap wrong event type!")
    }
}
/// `filter_map` adapter that keeps only bid events together with their generation time.
///
/// Returns `Some((generated_at, bid))` for `Event::Bid` and `None` for any other variant.
fn filter_bid((generated_at, event): (SystemTime, Event)) -> Option<(SystemTime, Bid)> {
    if let Event::Bid(bid) = event {
        Some((generated_at, bid))
    } else {
        None
    }
}
/// Nexmark end-to-end latency benchmark on renoir.
// Doc comments on this struct and its fields are rendered by clap as `--help` text.
#[derive(Parser)]
#[clap(name = "nexmark-latency")]
struct Args {
    /// Number of events to generate.
    n: usize,
    /// Query to run: 2, 3 or 5.
    q: String,
    /// Batch size for renoir's adaptive batch mode.
    #[clap(short, long, default_value_t = 1024)]
    batch: usize,
    /// Adaptive batch-mode duration, in microseconds.
    #[clap(short, long, default_value_t = 1000)]
    dt_us: u64,
    /// Watermark interval (currently ignored: query 5 uses a built-in constant).
    // NOTE(review): never read; `query5` passes `WATERMARK_INTERVAL` (1 << 20) to
    // `watermark_gen`, and this default (32 << 10) differs from it.
    #[clap(short, long, default_value_t = 32 << 10)]
    watermark_interval: usize,
}
/// Benchmark entry point.
///
/// Reads the renoir runtime configuration and the remaining arguments as [`Args`], builds the
/// selected query over the generated event stream, and runs it to completion. It then prints the
/// elapsed wall time and the micrometer summary, and appends the latency samples to
/// `/tmp/nexmark-latency.csv`.
///
/// # Panics
///
/// Panics if the query name is not `2`, `3` or `5`, or if the CSV file cannot be appended to.
fn main() {
    env_logger::init();
    let (config, args) = RuntimeConfig::from_args();
    // clap treats the first item as the program name, so prepend one to the leftover arguments.
    let args = Args::parse_from(std::iter::once("nexmark-latency".into()).chain(args));
    let q = &args.q[..];

    // Reject unknown queries before spawning remote workers or building the stream context.
    if !matches!(q, "2" | "3" | "5") {
        panic!("Invalid query! {q}");
    }

    config.spawn_remote_workers();
    let env = StreamContext::new(config);
    micrometer::start();
    match q {
        "2" => query2(events(&env, &args)),
        "3" => query3(events(&env, &args)),
        "5" => query5(events(&env, &args)),
        _ => unreachable!("query name validated above"),
    }

    let start = Instant::now();
    env.execute_blocking();
    println!("q{q}:elapsed:{:?}", start.elapsed());
    eprintln!("==================================================");
    micrometer::summary_grouped();
    micrometer::append_csv("/tmp/nexmark-latency.csv", "renoir")
        .expect("failed to append latency results to /tmp/nexmark-latency.csv");
}