diff --git a/tests/ttnn/unit_tests/operations/ccl/perf/async_perf_csv.py b/tests/ttnn/unit_tests/operations/ccl/perf/async_perf_csv.py
new file mode 100644
index 00000000000..d01c7bd9adb
--- /dev/null
+++ b/tests/ttnn/unit_tests/operations/ccl/perf/async_perf_csv.py
@@ -0,0 +1,191 @@
+# SPDX-FileCopyrightText: © 2025 Tenstorrent Inc.
+
+# SPDX-License-Identifier: Apache-2.0
+
+import pandas as pd
+import os
+import re
+import time
+import shutil
+
+
+def perf_report(file_path):
+    df = pd.read_csv(file_path)
+
+    # Keep only traced ops with a measured op-to-op latency.
+    df = df[df["OP TO OP LATENCY [ns]"] != 0]
+    df = df[df["METAL TRACE ID"].notna() & (df["METAL TRACE ID"] != "")]
+
+    def remove_keys_from_attributes(attributes):
+        attributes = attributes.replace(";", ",").replace("'", '"')
+
+        keys_to_remove = ["receiver_device_id", "ring_index", "sender_device_id"]
+
+        try:
+            attributes_dict = eval(attributes)
+
+            attributes_dict["topology"] = attributes_dict.get("topology", "").split("::")[-1]
+
+            if "ring_size" not in attributes_dict:
+                raise KeyError("Missing 'ring_size' attribute")
+
+            attributes_dict["n_chips"] = int(attributes_dict["ring_size"])
+
+            for key in keys_to_remove:
+                if key in attributes_dict:
+                    del attributes_dict[key]
+
+            modified_attributes = str(attributes_dict).replace(",", ";").replace('"', "'")
+            return modified_attributes
+        except Exception as e:
+            print(f"Error processing attributes: {e}")
+            return attributes
+
+    df["ATTRIBUTES"] = df["ATTRIBUTES"].apply(remove_keys_from_attributes)
+
+    def safe_parse_attributes(attributes):
+        attributes = attributes.replace(";", ",")
+
+        try:
+            attr_dict = eval(attributes)
+            return attr_dict
+        except Exception as e:
+            print(f"Error processing attributes: {e}")
+            return {}
+
+    df["topology"] = df["ATTRIBUTES"].apply(
+        lambda x: safe_parse_attributes(x).get("topology", "") if isinstance(safe_parse_attributes(x), dict) else ""
+    )
+
+    df["dim"] = df["ATTRIBUTES"].apply(
+        lambda x: safe_parse_attributes(x).get("dim", safe_parse_attributes(x).get("scatter_dim", ""))
+        if isinstance(safe_parse_attributes(x), dict)
+        else ""
+    )
+
+    df["num_links"] = df["ATTRIBUTES"].apply(
+        lambda x: safe_parse_attributes(x).get("num_links", "") if isinstance(safe_parse_attributes(x), dict) else ""
+    )
+
+    df["output_mem_config"] = df["ATTRIBUTES"].apply(
+        lambda x: ", ".join(
+            [
+                match.split("::")[1]
+                for match in re.findall(
+                    r"(BufferType::\w+|TensorMemoryLayout::\w+)",
+                    str(safe_parse_attributes(x).get("output_mem_config", "")),
+                )
+            ]
+        )
+        if isinstance(safe_parse_attributes(x), dict)
+        else ""
+    )
+
+    df["n_chips"] = df["ATTRIBUTES"].apply(
+        lambda x: int(safe_parse_attributes(x).get("ring_size", 0))
+        if isinstance(safe_parse_attributes(x), dict)
+        else 0
+    )
+
+    group_columns = [
+        "ATTRIBUTES",
+        "INPUT_0_W",
+        "INPUT_0_Z",
+        "INPUT_0_Y",
+        "INPUT_0_X",
+        "INPUT_0_LAYOUT",
+        "INPUT_0_DATATYPE",
+        "OUTPUT_0_W",
+        "OUTPUT_0_Z",
+        "OUTPUT_0_Y",
+        "OUTPUT_0_X",
+        "OUTPUT_0_LAYOUT",
+        "OUTPUT_0_DATATYPE",
+    ]
+
+    grouped = df.groupby(group_columns)
+
+    numeric_columns = [
+        "HOST DURATION [ns]",
+        "Cycles Count",
+        "OP TO OP LATENCY [ns]",
+        "DEVICE FW DURATION [ns]",
+        "DEVICE KERNEL DURATION [ns]",
+    ]
+
+    averages_data = []
+
+    for i, (group, group_df) in enumerate(grouped, start=1):
+        # Skip the first 2 * n_chips rows (two warm-up iterations per chip).
+        group_df = group_df.iloc[2 * group_df["n_chips"].iloc[0] :]
+
+        group_df = group_df.sort_values(by=["DEVICE ID", "OP TO OP LATENCY [ns]"]).reset_index(drop=True)
+        # Drop the slowest run per device.
+        group_df = group_df.groupby("DEVICE ID").apply(lambda x: x.iloc[0:-1]).reset_index(drop=True)
+
+        group_df.rename(columns={"INPUT_0_LAYOUT": "Layout", "INPUT_0_DATATYPE": "Data Type"}, inplace=True)
"Layout", "INPUT_0_DATATYPE": "Data Type"}, inplace=True) + + group_df["Input Shape"] = group_df.apply( + lambda row: f"[{int(row['INPUT_0_W'])}, {int(row['INPUT_0_Z'])}, {int(row['INPUT_0_Y'])}, {int(row['INPUT_0_X'])}]", + axis=1, + ) + group_df["Output Shape"] = group_df.apply( + lambda row: f"[{int(row['OUTPUT_0_W'])}, {int(row['OUTPUT_0_Z'])}, {int(row['OUTPUT_0_Y'])}, {int(row['OUTPUT_0_X'])}]", + axis=1, + ) + group_df["Cycles Count"] = group_df["DEVICE FW END CYCLE"] - group_df["DEVICE FW START CYCLE"] + + group_file_path = file_path.replace(".csv", f"_group_{i}.csv") + + group_df.to_csv(group_file_path, index=False) + + group_data = { + "Input Shape": group_df["Input Shape"].iloc[0], + "OP CODE": group_df["OP CODE"].iloc[0], + "dim": group_df["dim"].iloc[0] if "dim" in group_df else "", + "num_links": group_df["num_links"].iloc[0] if "num_links" in group_df else "", + "output_mem_config": group_df["output_mem_config"].iloc[0] if "output_mem_config" in group_df else "", + "topology": group_df["topology"].iloc[0], + "Layout": group_df["Layout"].iloc[0] if "Layout" in group_df else "", + "Data Type": group_df["Data Type"].iloc[0] if "Data Type" in group_df else "", + } + + for column in numeric_columns: + min_val = round(group_df[column].min(), 2) + largest_vals = group_df[column].nlargest(3) + max_val = round(largest_vals.iloc[-1], 2) + if min_val == max_val: + avg_val = min_val + else: + avg_val = round(group_df[column][~group_df[column].isin(largest_vals.head(2))].mean(), 2) + + group_data[column] = f"{min_val} - {avg_val} - {max_val}" + + averages_data.append(group_data) + + averages_df = pd.DataFrame(averages_data) + op_code = averages_df.iloc[0]["OP CODE"] + + today = time.strftime("%Y_%m_%d") + if op_code == "AllGather": + ccl_perf_file_path = f"CCL_all_gather_Perf_{today}.csv" + elif op_code == "AllGatherAsyn": + ccl_perf_file_path = f"CCL_all_gather_async_Perf_{today}.csv" + elif op_code == "ReduceScatter": + ccl_perf_file_path = f"CCL_reduce_scatter_Perf_{today}.csv" + else: + ccl_perf_file_path = f"CCL_Perf_{today}.csv" + + shutil.copy(file_path, ccl_perf_file_path) + + logs_dir = "generated/profiler/.logs" + os.makedirs(logs_dir, exist_ok=True) + shutil.copy(ccl_perf_file_path, logs_dir) + + averages_df.to_csv(ccl_perf_file_path, index=False) + + print(f"CCL Perf report CSV saved to: {ccl_perf_file_path}") + + return averages_df diff --git a/tests/ttnn/unit_tests/operations/ccl/perf/run_async_all_gather_profile.sh b/tests/ttnn/unit_tests/operations/ccl/perf/run_async_all_gather_profile.sh new file mode 100755 index 00000000000..e2133b6d084 --- /dev/null +++ b/tests/ttnn/unit_tests/operations/ccl/perf/run_async_all_gather_profile.sh @@ -0,0 +1,107 @@ +#!/bin/sh +MODULE_DIR="tests/ttnn/unit_tests/operations/ccl/perf" + +# Defaults +DEBUG=false +TARGET="n300" + +# Function to display help +show_help() { + echo "Usage: ./tests/ttnn/unit_tests/operations/ccl/perf/run_profile.sh [OPTIONS]" + echo + echo "Options:" + echo " -d, --debug Enable debug mode to show real-time output." + echo " -t, --target Specify the target configuration (t3000 or n300 or tg). Default is n300." + echo " -h, --help Display this help message." 
+    echo
+    echo "Example:"
+    echo "  ./tests/ttnn/unit_tests/operations/ccl/perf/run_async_all_gather_profile.sh --debug --target n300"
+    echo "  ./tests/ttnn/unit_tests/operations/ccl/perf/run_async_all_gather_profile.sh -h"
+}
+
+# Parse command-line arguments
+while [ $# -gt 0 ]; do
+    case "$1" in
+        --debug|-d)
+            DEBUG=true
+            shift
+            ;;
+        --help|-h)
+            show_help
+            exit 0
+            ;;
+        --target|-t)
+            # Ensure there is an argument following the target flag
+            if [ -z "$2" ]; then
+                echo "Error: No target specified after $1."
+                show_help
+                exit 1
+            fi
+
+            TARGET="$2" # Set the target configuration
+            shift 2
+
+            # Validate the target value
+            if [ "$TARGET" != "t3000" ] && [ "$TARGET" != "tg" ] && [ "$TARGET" != "n300" ]; then
+                echo "Error: Invalid target configuration: $TARGET. Must be 't3000', 'n300', or 'tg'."
+                exit 1
+            fi
+            ;;
+        *)
+            echo "Unknown option: $1"
+            show_help
+            exit 1
+            ;;
+    esac
+done
+
+# Function to run the profiling command and extract the CSV path
+run_profile_and_extract_csv() {
+    command="./tt_metal/tools/profiler/profile_this.py -n all_gather_async_$TARGET -c 'pytest tests/ttnn/unit_tests/operations/ccl/perf/test_ccl_async_perf.py::test_all_gather_async_$TARGET'"
+
+    if [ "$DEBUG" = true ]; then
+        echo "Running profiling command for target $TARGET in debug mode..."
+        full_output=$(eval $command 2>&1 | tee /dev/tty)
+    else
+        echo "Running profiling command for target $TARGET..."
+        full_output=$(eval $command 2>&1)
+    fi
+
+    # Extract the CSV path
+    csv_path=$(echo "$full_output" | grep -oE 'OPs csv generated at: (.+\.csv)' | sed -E 's/OPs csv generated at: //')
+
+    if [ -n "$csv_path" ]; then
+        echo "CSV path found: $csv_path"
+        echo "Generating performance report..."
+
+        tmp_file="/tmp/perf_report_output.log"
+        PYTHONPATH="$MODULE_DIR" python3 -c "
+import sys
+import pandas as pd
+from async_perf_csv import perf_report
+from tabulate import tabulate
+
+try:
+    # Generate the report and convert it to a DataFrame
+    average_df = perf_report('$csv_path')
+    # Print the DataFrame in a pretty table format
+    print('Min - Avg - Max by Common Runs:')
+    print(tabulate(average_df, headers='keys', tablefmt='pretty'))
+except Exception as e:
+    print(f'Error in performance report generation: {e}', file=sys.stderr)
+    sys.exit(1)
+" 2>&1 | tee "$tmp_file"
+
+        if grep -q "Error in performance report generation" "$tmp_file"; then
+            echo "Error: Performance report generation failed."
+            exit 1
+        fi
+
+    else
+        echo "CSV path not found in the command output."
+        exit 1
+    fi
+}
+
+# Run the function
+run_profile_and_extract_csv
diff --git a/tests/ttnn/unit_tests/operations/ccl/perf/test_ccl_async_perf.py b/tests/ttnn/unit_tests/operations/ccl/perf/test_ccl_async_perf.py
new file mode 100644
index 00000000000..b90598cc160
--- /dev/null
+++ b/tests/ttnn/unit_tests/operations/ccl/perf/test_ccl_async_perf.py
@@ -0,0 +1,143 @@
+# SPDX-FileCopyrightText: © 2025 Tenstorrent Inc.
+
+# SPDX-License-Identifier: Apache-2.0
+
+import pytest
+import ttnn
+from models.utility_functions import skip_for_grayskull
+from tests.ttnn.unit_tests.operations.ccl.test_new_all_gather import (
+    run_all_gather_impl,
+)
+from tests.ttnn.unit_tests.operations.ccl.test_all_gather_TG_post_commit import (
+    run_line_all_gather_on_TG_with_mesh_tensor_along_rows,
+)
+
+
+@skip_for_grayskull("Requires eth connected devices to run")
+@pytest.mark.parametrize(
+    "num_devices, num_links, output_shape, dim, layout",
+    [
+        (4, 1, [1, 1, 64, 512], 3, ttnn.TILE_LAYOUT),
+        (4, 1, [1, 1, 32, 1280], 0, ttnn.TILE_LAYOUT),
+        (4, 1, [1, 1, 32, 7168], 0, ttnn.TILE_LAYOUT),
+        (4, 1, [1, 1, 32, 3584], 0, ttnn.TILE_LAYOUT),
+        (4, 1, [1, 1, 32, 32], 0, ttnn.TILE_LAYOUT),
+    ],
+)
+@pytest.mark.parametrize(
+    "input_dtype",
+    [
+        ttnn.bfloat16,
+    ],
+)
+@pytest.mark.parametrize(
+    "mem_config",
+    [
+        ttnn.MemoryConfig(buffer_type=ttnn.BufferType.DRAM),
+    ],
+)
+@pytest.mark.parametrize("num_iters", [20])
+@pytest.mark.parametrize("enable_async", [True])
+@pytest.mark.parametrize("device_params", [{"trace_region_size": 1824800}], indirect=True)
+def test_all_gather_async_t3000(
+    t3k_mesh_device,
+    num_devices,
+    output_shape,
+    dim,
+    num_links,
+    input_dtype,
+    layout,
+    mem_config,
+    num_iters,
+    use_program_cache,
+    function_level_defaults,
+    enable_async,
+):
+    output_shape[dim] *= num_devices
+    run_all_gather_impl(
+        t3k_mesh_device,
+        num_devices,
+        output_shape,
+        dim,
+        num_links,
+        input_dtype,
+        layout,
+        use_program_cache,
+        function_level_defaults,
+        all_gather_topology=ttnn.Topology.Ring,
+        num_iters=num_iters,
+        enable_async=enable_async,
+        rand_tensor=True,
+        mem_config=mem_config,
+        trace_mode=True,
+    )
+
+
+@skip_for_grayskull("Requires eth connected devices to run")
+@pytest.mark.parametrize(
+    "num_devices, num_links, per_chip_output_shape, dim, layout",
+    [
+        (8, 1, [1, 8, 32, 1280], 1, ttnn.TILE_LAYOUT),
+        (8, 1, [8, 1, 32, 1280], 0, ttnn.TILE_LAYOUT),
+        (8, 1, [1, 8, 32, 2048], 1, ttnn.TILE_LAYOUT),
+        (8, 1, [1, 8, 32, 2304], 1, ttnn.TILE_LAYOUT),
+        (8, 1, [1, 8, 32, 4096], 1, ttnn.TILE_LAYOUT),
+    ],
+)
+@pytest.mark.parametrize(
+    "input_dtype",
+    [
+        ttnn.bfloat16,
+        ttnn.bfloat8_b,
+    ],
+)
+@pytest.mark.parametrize(
+    "buffer_type",
+    [
+        ttnn.BufferType.DRAM,
+        ttnn.BufferType.L1,
+    ],
+)
+@pytest.mark.parametrize("replication_factor", [4])
+@pytest.mark.parametrize("enable_async", [True])
+@pytest.mark.parametrize("mesh_device", [pytest.param((8, 4), id="8x4_grid")], indirect=True)
+@pytest.mark.parametrize("device_params", [{"trace_region_size": 1824800}], indirect=True)
+def test_all_gather_async_tg(
+    mesh_device,
+    num_devices,
+    per_chip_output_shape,
+    dim,
+    num_links,
+    input_dtype,
+    layout,
+    buffer_type,
+    use_program_cache,
+    function_level_defaults,
+    enable_async,
+    replication_factor,
+    num_iters=1,
+):
+    if len(mesh_device.get_devices()) != 32:
+        pytest.skip("Not TG!")
+    run_line_all_gather_on_TG_with_mesh_tensor_along_rows(
+        mesh_device,
+        num_devices,
+        per_chip_output_shape,
+        ttnn.TensorMemoryLayout.INTERLEAVED,
+        dim,
+        num_links,
+        input_dtype,
+        layout,
+        buffer_type,
+        use_program_cache,
+        function_level_defaults,
+        enable_async=enable_async,
+        num_iters=num_iters,
+        num_all_gather_instances=replication_factor,
+        cluster_axis=0,
+        use_all_gather_async=True,
+        enable_persistent_fabric=True,
+        create_persistent_fabric=True,
+        teardown_persistent_fabric=True,
+        trace_mode=True,
+    )
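As a quick sanity check on the t3000 parametrization above: `output_shape[dim] *= num_devices` scales the parametrized shape into the full gathered output shape before `run_all_gather_impl` is called. A minimal sketch of that shape arithmetic (plain Python, no device required; the even per-chip split shown here is an illustrative assumption, not taken from the helper's source):

```python
# Illustrative shape arithmetic for test_all_gather_async_t3000 (no ttnn needed).
num_devices = 4
output_shape = [1, 1, 64, 512]  # as parametrized
dim = 3

output_shape[dim] *= num_devices  # gathered output becomes [1, 1, 64, 2048]

# Assumed per-chip contribution: the gathered dim split evenly across chips.
per_chip_shape = list(output_shape)
per_chip_shape[dim] //= num_devices

assert output_shape == [1, 1, 64, 2048]
assert per_chip_shape == [1, 1, 64, 512]
```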
diff --git a/tests/ttnn/unit_tests/operations/ccl/test_all_gather_TG_post_commit.py b/tests/ttnn/unit_tests/operations/ccl/test_all_gather_TG_post_commit.py
index 7534038d205..0e080a3e219 100644
--- a/tests/ttnn/unit_tests/operations/ccl/test_all_gather_TG_post_commit.py
+++ b/tests/ttnn/unit_tests/operations/ccl/test_all_gather_TG_post_commit.py
@@ -57,28 +57,30 @@ def run_with_trace(
     num_links,
     cluster_axis,
     output_mem_config,
+    ccl_semaphore_handles,
+    worker_sub_device_id,
+    enable_persistent_fabric,
     n_worker=None,
     n_buffer=None,
     num_iter=20,
+    use_all_gather_async=False,
 ):
     # Compile Run
     logger.info("Compiling model")
-    tt_out_tensor = ttnn.all_gather(
-        input_tensor,
-        dim=dim,
-        cluster_axis=cluster_axis,
-        mesh_device=mesh_device,
-        num_links=num_links,
-        memory_config=output_mem_config,
-        topology=all_gather_topology,
-    )
-    for d in mesh_device.get_devices():
-        ttnn.synchronize_device(d)
-
-    # Capture trace
-    logger.info("Capturing trace")
-    trace_id = ttnn.begin_trace_capture(mesh_device, cq_id=0)
-    for i in range(num_iter):
+    if use_all_gather_async:
+        tt_out_tensor = ttnn.experimental.all_gather_async(
+            input_tensor,
+            dim,
+            cluster_axis=cluster_axis,
+            mesh_device=mesh_device,
+            topology=ttnn.Topology.Linear,
+            multi_device_global_semaphore=ccl_semaphore_handles,
+            num_links=num_links,
+            memory_config=output_mem_config,
+            subdevice_id=worker_sub_device_id,
+            enable_persistent_fabric_mode=enable_persistent_fabric,
+        )
+    else:
         tt_out_tensor = ttnn.all_gather(
             input_tensor,
             dim=dim,
@@ -88,6 +90,37 @@ def run_with_trace(
             memory_config=output_mem_config,
             topology=all_gather_topology,
         )
+    for d in mesh_device.get_devices():
+        ttnn.synchronize_device(d)
+
+    # Capture trace
+    logger.info("Capturing trace")
+    trace_id = ttnn.begin_trace_capture(mesh_device, cq_id=0)
+    for i in range(num_iter):
+        if use_all_gather_async:
+            logger.info("Running all-gather async")
+            tt_out_tensor = ttnn.experimental.all_gather_async(
+                input_tensor,
+                dim,
+                cluster_axis=cluster_axis,
+                mesh_device=mesh_device,
+                topology=ttnn.Topology.Linear,
+                multi_device_global_semaphore=ccl_semaphore_handles,
+                num_links=num_links,
+                memory_config=output_mem_config,
+                subdevice_id=worker_sub_device_id,
+                enable_persistent_fabric_mode=enable_persistent_fabric,
+            )
+        else:
+            tt_out_tensor = ttnn.all_gather(
+                input_tensor,
+                dim=dim,
+                cluster_axis=cluster_axis,
+                mesh_device=mesh_device,
+                num_links=num_links,
+                memory_config=output_mem_config,
+                topology=all_gather_topology,
+            )
     ttnn.end_trace_capture(mesh_device, trace_id, cq_id=0)
     for d in mesh_device.get_devices():
         ttnn.synchronize_device(d)
@@ -224,8 +257,12 @@ def run_line_all_gather_on_TG_with_mesh_tensor_along_rows(
             mesh_device=mesh_device,
             num_links=num_links,
             output_mem_config=output_mem_config,
+            ccl_semaphore_handles=ccl_semaphore_handles,
+            worker_sub_device_id=worker_sub_device_id,
+            enable_persistent_fabric=enable_persistent_fabric,
             all_gather_topology=ttnn.Topology.Linear,
             num_iter=num_iters,
+            use_all_gather_async=use_all_gather_async,
         )
     else:
         for _ in range(num_iters):
diff --git a/tests/ttnn/unit_tests/operations/ccl/test_new_all_gather.py b/tests/ttnn/unit_tests/operations/ccl/test_new_all_gather.py
index 3118f06a5c5..47fd18fa202 100644
--- a/tests/ttnn/unit_tests/operations/ccl/test_new_all_gather.py
+++ b/tests/ttnn/unit_tests/operations/ccl/test_new_all_gather.py
@@ -68,6 +68,7 @@ def run_with_trace(
     dim,
     num_links,
     output_mem_config,
+    enable_persistent_fabric,
     multi_device_global_semaphore,
     num_iter=20,
     subdevice_id=None,
@@ -82,6 +83,7 @@ def run_with_trace(
         memory_config=output_mem_config,
         topology=all_gather_topology,
         subdevice_id=subdevice_id,
+        enable_persistent_fabric_mode=enable_persistent_fabric,
     )
     for d in mesh_device.get_devices():
         ttnn.synchronize_device(d)
@@ -98,6 +100,7 @@ def run_with_trace(
             memory_config=output_mem_config,
             topology=all_gather_topology,
             subdevice_id=subdevice_id,
+            enable_persistent_fabric_mode=enable_persistent_fabric,
         )
     ttnn.end_trace_capture(mesh_device, trace_id, cq_id=0)
     for d in mesh_device.get_devices():
@@ -242,6 +245,7 @@ def run_all_gather_impl(
             dim,
             num_links,
             output_mem_config,
+            enable_persistent_fabric,
             multi_device_global_semaphore=ccl_semaphore_handles,
             num_iter=num_iters,
             subdevice_id=worker_sub_device_id,
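For reference, the report generator introduced in `async_perf_csv.py` can also be run directly against an existing profiler ops CSV, outside of `run_async_all_gather_profile.sh`. A minimal standalone sketch (the CSV path is a placeholder, not a path the tooling guarantees; `tabulate` is used only for pretty-printing, mirroring the shell script):

```python
# Standalone sketch: generate the CCL perf summary from an ops CSV.
import sys

# Make async_perf_csv importable (same path the script sets via MODULE_DIR).
sys.path.append("tests/ttnn/unit_tests/operations/ccl/perf")

from async_perf_csv import perf_report
from tabulate import tabulate

# Placeholder path: point this at a CSV produced by profile_this.py.
csv_path = "generated/profiler/reports/ops_perf_results.csv"

# Returns one row per op/shape group; numeric columns read "min - avg - max".
averages_df = perf_report(csv_path)
print(tabulate(averages_df, headers="keys", tablefmt="pretty"))
```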