From 76da1da3536b7983e0cf7b48a883fbcb62e8d294 Mon Sep 17 00:00:00 2001 From: Harrison Date: Wed, 26 Feb 2025 12:01:08 +1300 Subject: [PATCH] Refactor concurrent iteration handling in benchmark to improve performance and clarity --- benchmark.py | 118 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 73 insertions(+), 45 deletions(-) diff --git a/benchmark.py b/benchmark.py index 2899ee7..581dac4 100644 --- a/benchmark.py +++ b/benchmark.py @@ -45,7 +45,6 @@ def generate_sizes(): size *= 2 large_sizes.append(int(1e12)) sizes = sorted(set(small_sizes + large_sizes)) - print(f"Sizes: {sizes}") # Clear temporary lists to free memory. del small_sizes, large_sizes return sizes @@ -149,11 +148,14 @@ def update_missing_iterations_concurrent( """ Identify missing iterations for each algorithm and run them concurrently. - For each algorithm that has fewer iterations than desired (and is not skipped), - this function concurrently runs the missing iterations using a ProcessPoolExecutor. - After each iteration completes, the result is appended to the CSV file and the in-memory - results for that algorithm are updated. Finally, the algorithm's performance statistics are - recalculated; if the average exceeds the threshold, the algorithm is marked to be skipped in future tests. + This function creates a single ProcessPoolExecutor for all missing iterations across + all algorithms. It submits all missing iteration tasks concurrently and processes each + as soon as it completes. As each iteration's result is returned, it is immediately written + to the CSV and used to update the in-memory results. When all iterations for an algorithm + are complete, its performance statistics are recalculated and printed. + + Additionally, if there are already some results in memory, the function prints a notification + detailing which algorithms already have some iterations completed. Parameters: csv_path (str): Path to the CSV file for the current array size. @@ -170,7 +172,7 @@ def update_missing_iterations_concurrent( """ missing_algs = {} found_msgs = [] - # Determine the missing iterations for each algorithm. + # Determine how many additional iterations each algorithm needs. for alg in expected_algs: if alg in skip_list: continue @@ -182,6 +184,7 @@ def update_missing_iterations_concurrent( if count < iterations: missing_algs[alg] = iterations - count found_msgs.append(f"{alg} ({count})") + if missing_algs and any(data is not None for data in size_results.values()): if found_msgs: max_items = min(10, len(found_msgs)) @@ -207,47 +210,72 @@ def update_missing_iterations_concurrent( display_msg = ", ".join(missing_keys) print(f"Missing iterations for: {display_msg}") - # Execute all missing iterations concurrently for each algorithm. - for alg, missing in missing_algs.items(): - start_iter = (size_results[alg][4] + 1) if size_results[alg] is not None else 1 - tasks = {} - with ProcessPoolExecutor(max_workers=num_workers) as executor: + if not missing_algs: + return size_results, skip_list + + # Dictionary to track the number of completed iterations per algorithm. + completed_counts = {} + + # Dictionary mapping each future to its (algorithm, iteration index). + tasks = {} + + # Submit all missing iteration tasks concurrently across all algorithms. + with ProcessPoolExecutor(max_workers=num_workers) as executor: + for alg, missing in missing_algs.items(): + start_iter = ( + (size_results[alg][4] + 1) if size_results[alg] is not None else 1 + ) for i in range(missing): future = executor.submit(run_iteration, algorithms()[alg], size) - tasks[future] = start_iter + i - for future in as_completed(tasks): - iter_index = tasks[future] - try: - t = future.result() - # Append the result to the CSV file. - with open(csv_path, "a", newline="") as csv_file: - writer = csv.writer(csv_file) - writer.writerow([alg, size, iter_index, f"{t:.8f}"]) - # Update the in-memory results. - if size_results[alg] is None: - size_results[alg] = (None, None, None, None, 0, []) - size_results[alg] = ( - None, # Average (to be recalculated) - None, # Min (to be recalculated) - None, # Max (to be recalculated) - None, # Median (to be recalculated) - size_results[alg][4] + 1, - size_results[alg][5] + [t], + tasks[future] = (alg, start_iter + i) + + # Process each task as soon as it completes. + for future in as_completed(tasks): + alg, iter_index = tasks[future] + if alg not in completed_counts: + completed_counts[alg] = 0 + try: + t = future.result() + # Append the result to the CSV file. + with open(csv_path, "a", newline="") as csv_file: + writer = csv.writer(csv_file) + writer.writerow([alg, size, iter_index, f"{t:.8f}"]) + # Update the in-memory results for this algorithm. + if size_results[alg] is None: + size_results[alg] = (None, None, None, None, 0, []) + old_count = size_results[alg][4] + old_times = size_results[alg][5] + size_results[alg] = ( + None, # Average (to be recalculated) + None, # Min (to be recalculated) + None, # Max (to be recalculated) + None, # Median (to be recalculated) + old_count + 1, + old_times + [t], + ) + except Exception as e: + print(f"{alg} error on size {size} iteration {iter_index}: {e}") + # Increment the count for this algorithm. + completed_counts[alg] += 1 + # Once all iterations for an algorithm are complete, recalculate its stats. + if completed_counts[alg] == missing_algs[alg]: + times = size_results[alg][5] + avg = compute_average(times) + median = compute_median(times) + size_results[alg] = ( + avg, + min(times), + max(times), + median, + len(times), + times, + ) + print(f"Average for {alg} on size {size}: {format_time(avg, False)}") + if avg > threshold and alg not in skip_list: + skip_list[alg] = size + print( + f"Skipping {alg} for future sizes (average > 5min, skipped at size {size})." ) - except Exception as e: - print(f"{alg} error on size {size} iteration {iter_index}: {e}") - # Recalculate statistics after all iterations complete. - times = size_results[alg][5] - avg = compute_average(times) - median = compute_median(times) - size_results[alg] = (avg, min(times), max(times), median, len(times), times) - print(f"Average for {alg} on size {size}: {format_time(avg, False)}") - if avg > threshold and alg not in skip_list: - skip_list[alg] = size - print( - f"Skipping {alg} for future sizes (average > 5min, skipped at size {size})." - ) - del missing_algs, found_msgs return size_results, skip_list