Skip to content

Commit

Permalink
Refactor concurrent iteration handling in benchmark to improve perfor…
Browse files Browse the repository at this point in the history
…mance and clarity
  • Loading branch information
wobkobi committed Feb 25, 2025
1 parent a7f12b1 commit 76da1da
Showing 1 changed file with 73 additions and 45 deletions.
118 changes: 73 additions & 45 deletions benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ def generate_sizes():
size *= 2
large_sizes.append(int(1e12))
sizes = sorted(set(small_sizes + large_sizes))
print(f"Sizes: {sizes}")
# Clear temporary lists to free memory.
del small_sizes, large_sizes
return sizes
Expand Down Expand Up @@ -149,11 +148,14 @@ def update_missing_iterations_concurrent(
"""
Identify missing iterations for each algorithm and run them concurrently.
For each algorithm that has fewer iterations than desired (and is not skipped),
this function concurrently runs the missing iterations using a ProcessPoolExecutor.
After each iteration completes, the result is appended to the CSV file and the in-memory
results for that algorithm are updated. Finally, the algorithm's performance statistics are
recalculated; if the average exceeds the threshold, the algorithm is marked to be skipped in future tests.
This function creates a single ProcessPoolExecutor for all missing iterations across
all algorithms. It submits all missing iteration tasks concurrently and processes each
as soon as it completes. As each iteration's result is returned, it is immediately written
to the CSV and used to update the in-memory results. When all iterations for an algorithm
are complete, its performance statistics are recalculated and printed.
Additionally, if there are already some results in memory, the function prints a notification
detailing which algorithms already have some iterations completed.
Parameters:
csv_path (str): Path to the CSV file for the current array size.
Expand All @@ -170,7 +172,7 @@ def update_missing_iterations_concurrent(
"""
missing_algs = {}
found_msgs = []
# Determine the missing iterations for each algorithm.
# Determine how many additional iterations each algorithm needs.
for alg in expected_algs:
if alg in skip_list:
continue
Expand All @@ -182,6 +184,7 @@ def update_missing_iterations_concurrent(
if count < iterations:
missing_algs[alg] = iterations - count
found_msgs.append(f"{alg} ({count})")

if missing_algs and any(data is not None for data in size_results.values()):
if found_msgs:
max_items = min(10, len(found_msgs))
Expand All @@ -207,47 +210,72 @@ def update_missing_iterations_concurrent(
display_msg = ", ".join(missing_keys)
print(f"Missing iterations for: {display_msg}")

# Execute all missing iterations concurrently for each algorithm.
for alg, missing in missing_algs.items():
start_iter = (size_results[alg][4] + 1) if size_results[alg] is not None else 1
tasks = {}
with ProcessPoolExecutor(max_workers=num_workers) as executor:
if not missing_algs:
return size_results, skip_list

# Dictionary to track the number of completed iterations per algorithm.
completed_counts = {}

# Dictionary mapping each future to its (algorithm, iteration index).
tasks = {}

# Submit all missing iteration tasks concurrently across all algorithms.
with ProcessPoolExecutor(max_workers=num_workers) as executor:
for alg, missing in missing_algs.items():
start_iter = (
(size_results[alg][4] + 1) if size_results[alg] is not None else 1
)
for i in range(missing):
future = executor.submit(run_iteration, algorithms()[alg], size)
tasks[future] = start_iter + i
for future in as_completed(tasks):
iter_index = tasks[future]
try:
t = future.result()
# Append the result to the CSV file.
with open(csv_path, "a", newline="") as csv_file:
writer = csv.writer(csv_file)
writer.writerow([alg, size, iter_index, f"{t:.8f}"])
# Update the in-memory results.
if size_results[alg] is None:
size_results[alg] = (None, None, None, None, 0, [])
size_results[alg] = (
None, # Average (to be recalculated)
None, # Min (to be recalculated)
None, # Max (to be recalculated)
None, # Median (to be recalculated)
size_results[alg][4] + 1,
size_results[alg][5] + [t],
tasks[future] = (alg, start_iter + i)

# Process each task as soon as it completes.
for future in as_completed(tasks):
alg, iter_index = tasks[future]
if alg not in completed_counts:
completed_counts[alg] = 0
try:
t = future.result()
# Append the result to the CSV file.
with open(csv_path, "a", newline="") as csv_file:
writer = csv.writer(csv_file)
writer.writerow([alg, size, iter_index, f"{t:.8f}"])
# Update the in-memory results for this algorithm.
if size_results[alg] is None:
size_results[alg] = (None, None, None, None, 0, [])
old_count = size_results[alg][4]
old_times = size_results[alg][5]
size_results[alg] = (
None, # Average (to be recalculated)
None, # Min (to be recalculated)
None, # Max (to be recalculated)
None, # Median (to be recalculated)
old_count + 1,
old_times + [t],
)
except Exception as e:
print(f"{alg} error on size {size} iteration {iter_index}: {e}")
# Increment the count for this algorithm.
completed_counts[alg] += 1
# Once all iterations for an algorithm are complete, recalculate its stats.
if completed_counts[alg] == missing_algs[alg]:
times = size_results[alg][5]
avg = compute_average(times)
median = compute_median(times)
size_results[alg] = (
avg,
min(times),
max(times),
median,
len(times),
times,
)
print(f"Average for {alg} on size {size}: {format_time(avg, False)}")
if avg > threshold and alg not in skip_list:
skip_list[alg] = size
print(
f"Skipping {alg} for future sizes (average > 5min, skipped at size {size})."
)
except Exception as e:
print(f"{alg} error on size {size} iteration {iter_index}: {e}")
# Recalculate statistics after all iterations complete.
times = size_results[alg][5]
avg = compute_average(times)
median = compute_median(times)
size_results[alg] = (avg, min(times), max(times), median, len(times), times)
print(f"Average for {alg} on size {size}: {format_time(avg, False)}")
if avg > threshold and alg not in skip_list:
skip_list[alg] = size
print(
f"Skipping {alg} for future sizes (average > 5min, skipped at size {size})."
)
del missing_algs, found_msgs
return size_results, skip_list


Expand Down

0 comments on commit 76da1da

Please sign in to comment.