Commit

fix small issues

TroyKomodo committed Dec 8, 2024
1 parent 3d688b0 commit 3ae3732

Showing 7 changed files with 117 additions and 97 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yaml
@@ -9,6 +9,10 @@ on:
   workflow_dispatch:
   merge_group:
 
+concurrency:
+  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.run_id }}
+  cancel-in-progress: true
+
 jobs:
   pre-job:
     runs-on: ubuntu-24.04

2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Justfile
@@ -7,7 +7,7 @@ fmt *args:
     cargo +nightly fmt --all {{args}}
 
 lint *args:
-    cargo +nightly clippy --fix --allow-dirty --all-targets --all-features {{args}}
+    cargo +nightly clippy --fix --allow-dirty --all-targets --all-features --allow-staged {{args}}
 
 test *args:
     #!/bin/bash

102 changes: 51 additions & 51 deletions crates/batching/benchmarks/batcher.rs
@@ -31,38 +31,38 @@ async fn run_scuffle_batcher_many(
     size: usize,
     loader: impl scuffle_batching::BatchExecutor<Request = usize, Response = usize> + Send + Sync + 'static,
 ) {
-    let batcher = Arc::new(scuffle_batching::Batcher::new(loader, size / 2, 100, std::time::Duration::from_millis(5)));
-
-    let spawn = || {
-        let batcher = batcher.clone();
-        tokio::spawn(async move {
-            batcher.execute_many(0..size/4).await
-        })
-    };
-
-    futures::future::join_all([
-        spawn(),
-        spawn(),
-        spawn(),
-        spawn(),
-    ])
-    .await;
+    let batcher = Arc::new(scuffle_batching::Batcher::new(
+        loader,
+        size / 2,
+        100,
+        std::time::Duration::from_millis(5),
+    ));
+
+    let spawn = || {
+        let batcher = batcher.clone();
+        tokio::spawn(async move { batcher.execute_many(0..size / 4).await })
+    };
+
+    futures::future::join_all([spawn(), spawn(), spawn(), spawn()]).await;
 }
 
 async fn run_scuffle_batcher_single(
     size: usize,
     loader: impl scuffle_batching::BatchExecutor<Request = usize, Response = usize> + Send + Sync + 'static,
 ) {
-    let batcher = Arc::new(scuffle_batching::Batcher::new(loader, size / 2, 100, std::time::Duration::from_millis(5)));
-
-    let spawn = |i| {
-        let batcher = batcher.clone();
-        tokio::spawn(async move {
-            batcher.execute(i).await
-        })
-    };
-
-    futures::future::join_all((0..size/4).cycle().take(size).map(spawn)).await;
+    let batcher = Arc::new(scuffle_batching::Batcher::new(
+        loader,
+        size / 2,
+        100,
+        std::time::Duration::from_millis(5),
+    ));
+
+    let spawn = |i| {
+        let batcher = batcher.clone();
+        tokio::spawn(async move { batcher.execute(i).await })
+    };
+
+    futures::future::join_all((0..size / 4).cycle().take(size).map(spawn)).await;
 }
 
 fn delay(c: &mut Criterion) {
@@ -73,33 +73,33 @@ fn delay(c: &mut Criterion) {
     let runtime = || tokio::runtime::Builder::new_current_thread().enable_time().build().unwrap();
 
     group.bench_with_input(BenchmarkId::new("many", size), &size, |b, &s| {
-        b.to_async(runtime())
-            .iter(|| async move {
-                run_scuffle_batcher_many(
-                    s,
-                    DataloaderImpl::new(|keys: Vec<(usize, BatchResponse<usize>)>| async move {
-                        black_box(tokio::time::sleep(std::time::Duration::from_millis(1))).await;
-                        for (key, resp) in keys {
-                            resp.send(black_box(key));
-                        }
-                    }),
-                ).await;
-            });
+        b.to_async(runtime()).iter(|| async move {
+            run_scuffle_batcher_many(
+                s,
+                DataloaderImpl::new(|keys: Vec<(usize, BatchResponse<usize>)>| async move {
+                    black_box(tokio::time::sleep(std::time::Duration::from_millis(1))).await;
+                    for (key, resp) in keys {
+                        resp.send(black_box(key));
+                    }
+                }),
+            )
+            .await;
+        });
     });
 
-    group.bench_with_input(BenchmarkId::new("single", size), &size, |b, &s| {
-        b.to_async(runtime())
-            .iter(|| async move {
-                run_scuffle_batcher_single(
-                    s,
-                    DataloaderImpl::new(|keys: Vec<(usize, BatchResponse<usize>)>| async move {
-                        black_box(tokio::time::sleep(std::time::Duration::from_millis(1))).await;
-                        for (key, resp) in keys {
-                            resp.send(black_box(key));
-                        }
-                    }),
-                ).await;
-            });
+    group.bench_with_input(BenchmarkId::new("single", size), &size, |b, &s| {
+        b.to_async(runtime()).iter(|| async move {
+            run_scuffle_batcher_single(
+                s,
+                DataloaderImpl::new(|keys: Vec<(usize, BatchResponse<usize>)>| async move {
+                    black_box(tokio::time::sleep(std::time::Duration::from_millis(1))).await;
+                    for (key, resp) in keys {
+                        resp.send(black_box(key));
+                    }
+                }),
+            )
+            .await;
+        });
     });
 
     group.finish();

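As a side note on the pattern above: each benchmark clones a shared Arc into a handful of spawned tasks and joins them with futures::future::join_all. A minimal, self-contained illustration of that fan-out shape (assuming tokio with the rt and macros features plus the futures crate; the atomic counter is only a stand-in for the Arc<Batcher>):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let shared = Arc::new(AtomicUsize::new(0));

    // Each call clones the shared handle into its own spawned task,
    // just like the `spawn` closures in the benchmarks above.
    let spawn = || {
        let shared = shared.clone();
        tokio::spawn(async move { shared.fetch_add(1, Ordering::Relaxed) })
    };

    // Same shape as `futures::future::join_all([spawn(), spawn(), spawn(), spawn()]).await`.
    futures::future::join_all([spawn(), spawn(), spawn(), spawn()]).await;

    assert_eq!(shared.load(Ordering::Relaxed), 4);
}
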
90 changes: 45 additions & 45 deletions crates/batching/benchmarks/dataloader.rs
@@ -31,36 +31,36 @@ async fn run_scuffle_dataloader_many(
     size: usize,
     loader: impl scuffle_batching::DataLoaderFetcher<Key = usize, Value = usize> + Send + Sync + 'static,
 ) {
-    let dataloader = Arc::new(scuffle_batching::DataLoader::new(loader, size / 2, 100, std::time::Duration::from_millis(5)));
-
-    let spawn = || {
-        let dataloader = dataloader.clone();
-        tokio::spawn(async move {
-            dataloader.load_many(0..size).await
-        })
-    };
-
-    futures::future::join_all([
-        spawn(),
-        spawn(),
-        spawn(),
-        spawn(),
-    ])
-    .await;
+    let dataloader = Arc::new(scuffle_batching::DataLoader::new(
+        loader,
+        size / 2,
+        100,
+        std::time::Duration::from_millis(5),
+    ));
+
+    let spawn = || {
+        let dataloader = dataloader.clone();
+        tokio::spawn(async move { dataloader.load_many(0..size).await })
+    };
+
+    futures::future::join_all([spawn(), spawn(), spawn(), spawn()]).await;
 }
 
 async fn run_scuffle_dataloader_single(
     size: usize,
     loader: impl scuffle_batching::DataLoaderFetcher<Key = usize, Value = usize> + Send + Sync + 'static,
 ) {
-    let dataloader = Arc::new(scuffle_batching::DataLoader::new(loader, size / 2, 100, std::time::Duration::from_millis(5)));
-
-    let spawn = |i| {
-        let dataloader = dataloader.clone();
-        tokio::spawn(async move {
-            dataloader.load(i).await
-        })
-    };
+    let dataloader = Arc::new(scuffle_batching::DataLoader::new(
+        loader,
+        size / 2,
+        100,
+        std::time::Duration::from_millis(5),
+    ));
+
+    let spawn = |i| {
+        let dataloader = dataloader.clone();
+        tokio::spawn(async move { dataloader.load(i).await })
+    };
 
     futures::future::join_all((0..size).cycle().take(size * 4).map(spawn)).await;
 }
@@ -73,29 +73,29 @@ fn delay(c: &mut Criterion) {
     let runtime = || tokio::runtime::Builder::new_current_thread().enable_time().build().unwrap();
 
     group.bench_with_input(BenchmarkId::new("many", size), &size, |b, &s| {
-        b.to_async(runtime())
-            .iter(|| async move {
-                run_scuffle_dataloader_many(
-                    s,
-                    DataloaderImpl::new(|keys: HashSet<usize>| async move {
-                        black_box(tokio::time::sleep(std::time::Duration::from_millis(1))).await;
-                        black_box(Some(keys.into_iter().map(black_box).map(|k| (k, k)).collect()))
-                    }),
-                ).await;
-            });
+        b.to_async(runtime()).iter(|| async move {
+            run_scuffle_dataloader_many(
+                s,
+                DataloaderImpl::new(|keys: HashSet<usize>| async move {
+                    black_box(tokio::time::sleep(std::time::Duration::from_millis(1))).await;
+                    black_box(Some(keys.into_iter().map(black_box).map(|k| (k, k)).collect()))
+                }),
+            )
+            .await;
+        });
     });
 
-    group.bench_with_input(BenchmarkId::new("single", size), &size, |b, &s| {
-        b.to_async(runtime())
-            .iter(|| async move {
-                run_scuffle_dataloader_single(
-                    s,
-                    DataloaderImpl::new(|keys: HashSet<usize>| async move {
-                        black_box(tokio::time::sleep(std::time::Duration::from_millis(1))).await;
-                        black_box(Some(keys.into_iter().map(black_box).map(|k| (k, k)).collect()))
-                    }),
-                ).await;
-            });
+    group.bench_with_input(BenchmarkId::new("single", size), &size, |b, &s| {
+        b.to_async(runtime()).iter(|| async move {
+            run_scuffle_dataloader_single(
+                s,
+                DataloaderImpl::new(|keys: HashSet<usize>| async move {
+                    black_box(tokio::time::sleep(std::time::Duration::from_millis(1))).await;
+                    black_box(Some(keys.into_iter().map(black_box).map(|k| (k, k)).collect()))
+                }),
+            )
+            .await;
+        });
     });
 
     group.finish();

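Both benchmark diffs above are pure rustfmt-style rewraps: the constructor arguments, batch sizes, and sleep durations are unchanged. For readers unfamiliar with the harness the two files share, here is a minimal sketch of its shape with the scuffle_batching-specific pieces replaced by a dummy async workload. It assumes criterion with the async_tokio feature and tokio with the rt and time features; run_workload, the group name, and the sizes are illustrative only.

use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};

// Stand-in for run_scuffle_batcher_many / run_scuffle_dataloader_many.
async fn run_workload(size: usize) {
    for i in 0..size {
        black_box(i);
    }
    tokio::time::sleep(std::time::Duration::from_millis(1)).await;
}

fn delay(c: &mut Criterion) {
    let mut group = c.benchmark_group("example");
    // A fresh single-threaded runtime per measurement, as in the diffs above.
    let runtime = || {
        tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .unwrap()
    };

    for size in [100usize, 1_000] {
        group.bench_with_input(BenchmarkId::new("many", size), &size, |b, &s| {
            b.to_async(runtime()).iter(|| async move { run_workload(s).await });
        });
    }

    group.finish();
}

criterion_group!(benches, delay);
criterion_main!(benches);

to_async accepts anything implementing criterion's AsyncExecutor, which a tokio Runtime does when the async_tokio feature is enabled.
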
4 changes: 4 additions & 0 deletions crates/workspace-hack/Cargo.toml
@@ -21,6 +21,8 @@ axum-core = { version = "0.4", default-features = false, features = ["tracing"]
 bitflags = { version = "2", default-features = false, features = ["serde"] }
 bytes = { version = "1", features = ["serde"] }
 chrono = { version = "0.4", features = ["serde"] }
+clap = { version = "4" }
+clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "suggestions", "usage"] }
 config = { version = "0.14", default-features = false, features = ["ini", "json", "json5", "ron", "toml", "yaml"] }
 crossbeam-utils = { version = "0.8" }
 either = { version = "1", default-features = false, features = ["use_std"] }
@@ -85,6 +87,8 @@ bitflags = { version = "2", default-features = false, features = ["serde"] }
 bytes = { version = "1", features = ["serde"] }
 cc = { version = "1", default-features = false, features = ["parallel"] }
 chrono = { version = "0.4", features = ["serde"] }
+clap = { version = "4" }
+clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "suggestions", "usage"] }
 config = { version = "0.14", default-features = false, features = ["ini", "json", "json5", "ron", "toml", "yaml"] }
 crossbeam-utils = { version = "0.8" }
 either = { version = "1", default-features = false, features = ["use_std"] }

10 changes: 10 additions & 0 deletions dev-tools/xtask/src/cmd/power_set.rs
@@ -28,6 +28,12 @@ pub struct PowerSet {
     #[clap(long, default_value = "true")]
     /// Fail fast
     fail_fast: bool,
+    #[clap(long, default_value = "target/power-set")]
+    /// Target directory
+    target_dir: String,
+    #[clap(long, action = clap::ArgAction::SetTrue)]
+    /// Override target directory
+    no_override_target_dir: bool,
     #[clap(name = "command", default_value = "clippy")]
     /// Command to run
     command: String,
@@ -118,6 +124,10 @@ impl PowerSet {
         }
         cmd.arg("--package").arg(package);
 
+        if !self.no_override_target_dir {
+            cmd.arg("--target-dir").arg(&self.target_dir);
+        }
+
        println!("executing {:?} ({}/{})", cmd, i, total);
 
        if !cmd.status()?.success() {

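The two new power_set flags are plain clap derive fields. Below is a hypothetical, stand-alone reproduction of just those flags (not the real xtask CLI); it assumes clap 4 with the derive feature and mirrors the attribute style used above.

use clap::Parser;

#[derive(Parser, Debug)]
struct Args {
    /// Target directory used for power-set builds
    #[clap(long, default_value = "target/power-set")]
    target_dir: String,
    /// Keep cargo's default target directory instead of overriding it
    #[clap(long, action = clap::ArgAction::SetTrue)]
    no_override_target_dir: bool,
}

fn main() {
    // No flags: the separate target directory is used by default.
    let defaults = Args::parse_from(["power-set"]);
    assert_eq!(defaults.target_dir, "target/power-set");
    assert!(!defaults.no_override_target_dir);

    // Opting out: `--no-override-target-dir` leaves cargo's target dir alone,
    // which is what the new `if !self.no_override_target_dir` guard checks.
    let opt_out = Args::parse_from(["power-set", "--no-override-target-dir"]);
    assert!(opt_out.no_override_target_dir);
    println!("{defaults:?}\n{opt_out:?}");
}

With no flags given, the xtask change routes builds into target/power-set via --target-dir; passing --no-override-target-dir skips that override.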
