#14406: Add perf test for reduce scatter (PR #14838)

Merged · 3 commits · Nov 8, 2024
18 changes: 12 additions & 6 deletions tests/ttnn/unit_tests/operations/ccl/perf/perf_csv.py
@@ -12,7 +12,7 @@ def perf_report(file_path):

df = df.dropna(subset=["DEVICE ERISC KERNEL DURATION [ns]"])
df = df[df["OP TO OP LATENCY [ns]"] != 0]
df = df[df["TRACE ID"].notna() & (df["TRACE ID"] != "")]
df = df[df["METAL TRACE ID"].notna() & (df["METAL TRACE ID"] != "")]

def remove_keys_from_attributes(attributes):
attributes = attributes.replace(";", ",").replace("'", '"')
@@ -56,7 +56,9 @@ def safe_parse_attributes(attributes):
)

df["dim"] = df["ATTRIBUTES"].apply(
lambda x: safe_parse_attributes(x).get("dim", "") if isinstance(safe_parse_attributes(x), dict) else ""
lambda x: safe_parse_attributes(x).get("dim", safe_parse_attributes(x).get("scatter_dim", ""))
if isinstance(safe_parse_attributes(x), dict)
else ""
)

df["num_links"] = df["ATTRIBUTES"].apply(
@@ -154,15 +156,15 @@ def calculate_bandwidth(row):
op_bw = (output_tensor_volume * (n_chips - 1) / n_chips) / longest_device_fw_time
link_bw = (output_tensor_volume * (n_chips - 1) / n_chips) / longest_erisc_fw_time
elif row["OP CODE"] == "ReduceScatter":
-op_bw = (input_tensor_volume / n_chips) / longest_device_fw_time
-link_bw = (input_tensor_volume * (n_chips - 1) / n_chips) / longest_erisc_fw_time
+op_bw = input_tensor_volume / longest_device_fw_time
+link_bw = input_tensor_volume / longest_erisc_fw_time
elif row["topology"] == "Linear":
if row["OP CODE"] == "AllGather":
op_bw = input_tensor_volume * n_chips / longest_device_fw_time
link_bw = input_tensor_volume * (n_chips - 1) / longest_erisc_fw_time
elif row["OP CODE"] == "ReduceScatter":
op_bw = input_tensor_volume / longest_device_fw_time
-link_bw = input_tensor_volume * (n_chips - 1) / n_chips / longest_erisc_fw_time
+link_bw = input_tensor_volume / longest_erisc_fw_time
return round(op_bw, 2), round(link_bw, 2)

for i, (group, group_df) in enumerate(grouped, start=1):
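
Note: the net effect of this hunk is that ReduceScatter bandwidth is now computed from the full input volume on both Ring and Linear topologies instead of scaling by n_chips. A minimal sketch of the arithmetic with made-up numbers (the volume and timings are illustrative, not values from this PR):

```python
# Illustrative numbers only.
input_tensor_volume = 2 * 1 * 8 * 1024 * 1024  # bytes: a [1, 8, 1024, 1024] bfloat16 tensor
longest_device_fw_time = 1_000_000.0           # ns, hypothetical
longest_erisc_fw_time = 800_000.0              # ns, hypothetical

# ReduceScatter, Ring or Linear, after this change:
op_bw = input_tensor_volume / longest_device_fw_time   # bytes/ns, i.e. GB/s
link_bw = input_tensor_volume / longest_erisc_fw_time
print(round(op_bw, 2), round(link_bw, 2))              # 16.78 20.97
```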
@@ -194,13 +196,17 @@ def calculate_bandwidth(row):
"output_mem_config": group_df["output_mem_config"].iloc[0] if "output_mem_config" in group_df else "",
"topology": group_df["topology"].iloc[0],
"Layout": group_df["Layout"].iloc[0] if "Layout" in group_df else "",
"Data Type": group_df["Data Type"].iloc[0] if "Data Type" in group_df else "",
}

for column in numeric_columns:
min_val = round(group_df[column].min(), 2)
largest_vals = group_df[column].nlargest(3)
max_val = round(largest_vals.iloc[-1], 2)
-avg_val = round(group_df[column][~group_df[column].isin(largest_vals.head(2))].mean(), 2)
+if min_val == max_val:
+    avg_val = min_val
+else:
+    avg_val = round(group_df[column][~group_df[column].isin(largest_vals.head(2))].mean(), 2)

group_data[column] = f"{min_val} - {avg_val} - {max_val}"
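
Note: these statistics are deliberately trimmed: max_val is the third-largest sample and the average excludes the two largest, which shaves warm-up spikes, while the new min_val == max_val branch presumably avoids a NaN mean when every sample in a group is identical. A self-contained illustration (the sample durations are made up):

```python
import pandas as pd

samples = pd.Series([10.0, 10.1, 10.2, 42.0, 55.0])  # hypothetical durations with two warm-up spikes
largest_vals = samples.nlargest(3)                   # 55.0, 42.0, 10.2
max_val = round(largest_vals.iloc[-1], 2)            # 10.2: third largest, spikes dropped
avg_val = round(samples[~samples.isin(largest_vals.head(2))].mean(), 2)  # mean of 10.0, 10.1, 10.2
print(f"{samples.min()} - {avg_val} - {max_val}")    # 10.0 - 10.1 - 10.2
```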

97 changes: 97 additions & 0 deletions tests/ttnn/unit_tests/operations/ccl/perf/run_profile.sh
@@ -0,0 +1,97 @@
#!/bin/sh
MODULE_DIR="tests/ttnn/unit_tests/operations/ccl/perf"

# Defaults
DEBUG=false
TARGET="n300"

# Function to display help
show_help() {
echo "Usage: ./tests/ttnn/unit_tests/operations/ccl/perf/run_profile.sh [OPTIONS]"
echo
echo "Options:"
echo " -d, --debug Enable debug mode to show real-time output."
echo " -t, --target Specify the target configuration (t3000 or n300). Default is n300."
echo " -h, --help Display this help message."
echo
echo "Example:"
echo " ./tests/ttnn/unit_tests/operations/ccl/perf/run_profile.sh --debug --target n300"
echo " ./tests/ttnn/unit_tests/operations/ccl/perf/run_profile.sh -h"
}

# Parse command-line arguments
while [ $# -gt 0 ]; do
case "$1" in
--debug|-d)
DEBUG=true
shift
;;
--help|-h)
show_help
exit 0
;;
--target|-t)
# Ensure there is an argument following the target flag
if [ -z "$2" ]; then
echo "Error: No target specified after $1."
show_help
exit 1
fi

TARGET="$2" # Set the target configuration
shift 2

# Validate the target value
if [ "$TARGET" != "t3000" ] && [ "$TARGET" != "n300" ]; then
echo "Error: Invalid target configuration: $TARGET. Must be either 't3000' or 'n300'."
exit 1
fi
;;
*)
echo "Unknown option: $1"
show_help
exit 1
;;
esac
done

# Function to run the profiling command and extract the CSV path
run_profile_and_extract_csv() {
command="./tt_metal/tools/profiler/profile_this.py -n reduce_scatter_$TARGET -c 'pytest tests/ttnn/unit_tests/operations/ccl/perf/test_ccl_perf.py::test_reduce_scatter_on_$TARGET'"

if [ "$DEBUG" = true ]; then
echo "Running profiling command for target $TARGET in debug mode..."
full_output=$(eval $command 2>&1 | tee /dev/tty)
else
echo "Running profiling command for target $TARGET..."
full_output=$(eval $command 2>&1)
fi

# Extract the CSV path
csv_path=$(echo "$full_output" | grep -oE 'OPs csv generated at: (.+\.csv)' | sed -E 's/OPs csv generated at: //')

if [ -n "$csv_path" ]; then
echo "CSV path found: $csv_path"

# Run the Python script to generate performance report
average_values=$(PYTHONPATH="$MODULE_DIR" python3 -c "
import pandas as pd
from perf_csv import perf_report
from tabulate import tabulate

# Generate the report and convert it to a DataFrame
average_df = perf_report('$csv_path')
# Print the DataFrame in a pretty table format
print(tabulate(average_df, headers='keys', tablefmt='pretty'))
")

# Print the output
echo "Min - Avg - Max by Common Runs:"
echo "$average_values"
else
echo "CSV path not found in the command output."
fi
}

# Run the function
run_profile_and_extract_csv
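
Note: the grep/sed pipeline assumes profile_this.py prints a line of the form "OPs csv generated at: <path>.csv"; the wording is fixed by the regex itself, but the path below is illustrative. A standalone check of the extraction step:

```sh
full_output="OPs csv generated at: generated/profiler/reports/ops_perf_results.csv"  # hypothetical path
echo "$full_output" | grep -oE 'OPs csv generated at: (.+\.csv)' | sed -E 's/OPs csv generated at: //'
# -> generated/profiler/reports/ops_perf_results.csv
```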
138 changes: 138 additions & 0 deletions tests/ttnn/unit_tests/operations/ccl/perf/test_ccl_perf.py
@@ -9,6 +9,9 @@
run_all_gather_on_n300_impl,
run_all_gather_on_t3000_impl_tight_loop,
)
+from tests.ttnn.unit_tests.operations.ccl.test_reduce_scatter_post_commit import (
+    run_reduce_scatter_test,
+)


@skip_for_grayskull("Requires eth connected devices to run")
@@ -128,3 +131,138 @@ def test_all_gather_on_t3000(
enable_async=enable_async,
trace_mode=True,
)


@skip_for_grayskull("Requires eth connected devices to run")
@pytest.mark.parametrize(
"num_devices, num_links",
[
(8, 1),
],
)
@pytest.mark.parametrize(
"per_chip_output_shape, scatter_dim, layout",
[
([1, 8, 1024, 1024], 3, ttnn.TILE_LAYOUT),
([1, 4, 1024, 1024], 3, ttnn.TILE_LAYOUT),
([1, 4, 2048, 1024], 3, ttnn.TILE_LAYOUT),
([1, 1, 32, 32 * 8], 3, ttnn.TILE_LAYOUT),
([1, 1, 32, 64 * 8], 3, ttnn.TILE_LAYOUT),
],
)
@pytest.mark.parametrize(
"input_dtype",
[
ttnn.bfloat16,
],
)
@pytest.mark.parametrize(
"mem_config",
[
ttnn.MemoryConfig(buffer_type=ttnn.BufferType.DRAM),
],
)
@pytest.mark.parametrize("num_iters", [20])
@pytest.mark.parametrize("math_op", [ttnn.ReduceType.Sum])
@pytest.mark.parametrize("enable_async", [True])
@pytest.mark.parametrize("topology", [ttnn.Topology.Linear, ttnn.Topology.Ring])
@pytest.mark.parametrize("device_params", [{"trace_region_size": 266240}], indirect=True)
def test_reduce_scatter_on_t3000(
t3k_mesh_device,
num_devices,
per_chip_output_shape,
scatter_dim,
num_links,
math_op,
input_dtype,
layout,
mem_config,
use_program_cache,
function_level_defaults,
enable_async,
num_iters,
topology,
):
run_reduce_scatter_test(
t3k_mesh_device,
num_devices,
per_chip_output_shape,
scatter_dim,
num_links,
math_op,
input_dtype,
layout,
mem_config,
use_program_cache,
function_level_defaults,
num_iters=num_iters,
enable_async=enable_async,
topology=topology,
trace_mode=True,
)


@skip_for_grayskull("Requires eth connected devices to run")
@pytest.mark.parametrize(
"num_devices, num_links",
[
(2, 1),
],
)
@pytest.mark.parametrize(
"per_chip_output_shape, scatter_dim, layout",
[
([1, 1, 32, 4096], 3, ttnn.TILE_LAYOUT),
([1, 1, 32, 2048], 3, ttnn.TILE_LAYOUT),
([1, 1, 32, 1024], 3, ttnn.TILE_LAYOUT),
],
)
@pytest.mark.parametrize(
"input_dtype",
[
ttnn.bfloat16,
ttnn.bfloat8_b,
],
)
@pytest.mark.parametrize(
"mem_config",
[
ttnn.MemoryConfig(buffer_type=ttnn.BufferType.DRAM),
ttnn.MemoryConfig(buffer_type=ttnn.BufferType.L1),
],
)
@pytest.mark.parametrize("num_iters", [20])
@pytest.mark.parametrize("math_op", [ttnn.ReduceType.Sum])
@pytest.mark.parametrize("enable_async", [True])
@pytest.mark.parametrize("device_params", [{"trace_region_size": 266240}], indirect=True)
def test_reduce_scatter_on_n300(
n300_mesh_device,
num_devices,
per_chip_output_shape,
scatter_dim,
num_links,
math_op,
input_dtype,
layout,
mem_config,
use_program_cache,
function_level_defaults,
enable_async,
num_iters,
):
run_reduce_scatter_test(
n300_mesh_device,
num_devices,
per_chip_output_shape,
scatter_dim,
num_links,
math_op,
input_dtype,
layout,
mem_config,
use_program_cache,
function_level_defaults,
num_iters=num_iters,
enable_async=enable_async,
trace_mode=True,
)
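
Note: a quick tally of the matrix these decorators expand to (my arithmetic, not stated in the diff): test_reduce_scatter_on_t3000 covers 5 output shapes × 2 topologies = 10 parametrized cases, and test_reduce_scatter_on_n300 covers 3 shapes × 2 input dtypes × 2 memory configs = 12 cases, with each case running num_iters = 20 iterations under trace capture.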
tests/ttnn/unit_tests/operations/ccl/test_reduce_scatter_post_commit.py
@@ -32,9 +32,10 @@ def run_with_trace(
num_links,
math_op,
output_mem_config,
-n_worker,
-n_buffer,
-num_iters,
+n_worker=None,
+n_buffer=None,
+num_iters=40,
+topology=ttnn.Topology.Ring,
):
# Compile Run
logger.info("Compiling model")
@@ -46,6 +47,7 @@
memory_config=output_mem_config,
num_workers=n_worker,
num_buffers_per_channel=n_buffer,
+topology=topology,
)
for device_id in t3k_mesh_device.get_device_ids():
ttnn.synchronize_device(t3k_mesh_device.get_device(device_id))
@@ -62,6 +64,7 @@
memory_config=output_mem_config,
num_workers=n_worker,
num_buffers_per_channel=n_buffer,
+topology=topology,
)
ttnn.end_trace_capture(t3k_mesh_device, trace_id, cq_id=0)
for device_id in t3k_mesh_device.get_device_ids():
@@ -92,6 +95,7 @@ def run_reduce_scatter_test(
enable_async=True,
num_iters=1,
topology=ttnn.Topology.Ring,
+trace_mode=False,
):
if len(mesh_device.get_device_ids()) < num_devices:
pytest.skip(
@@ -135,19 +139,31 @@

input_tensor_mesh = ttnn.aggregate_as_tensor(tt_input_tensors)
# Run the op
-for i in range(num_iters):
-    output_tensor_mesh = ttnn.reduce_scatter(
+if trace_mode:
+    output_tensor_mesh = run_with_trace(
+        mesh_device,
         input_tensor_mesh,
-        scatter_dim=scatter_dim,
-        math_op=math_op,
-        num_links=num_links,
-        memory_config=mem_config,
+        scatter_dim,
+        num_links,
+        math_op,
+        mem_config,
+        num_iters=num_iters,
+        topology=topology,
     )
+else:
+    for i in range(num_iters):
+        output_tensor_mesh = ttnn.reduce_scatter(
+            input_tensor_mesh,
+            scatter_dim=scatter_dim,
+            math_op=math_op,
+            num_links=num_links,
+            memory_config=mem_config,
+            topology=topology,
+        )

-    for device_id in mesh_device.get_device_ids():
-        ttnn.synchronize_device(mesh_device.get_device(device_id))
-    logger.info(f"Done iteration {i}")
+        for device_id in mesh_device.get_device_ids():
+            ttnn.synchronize_device(mesh_device.get_device(device_id))
+        logger.info(f"Done iteration {i}")

# ttnn.visualize_mesh_device(t3k_mesh_device, tensor=output_tensor_mesh)
# Compute golden
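
Note: in trace mode the per-iteration synchronize/log loop above is intentionally skipped: run_with_trace compiles the op once, captures it into a trace, replays it, and synchronizes internally (the end_trace_capture and synchronize_device calls appear in the earlier hunk). A hedged sketch of that capture-replay flow; begin_trace_capture and execute_trace are my assumption about the ttnn trace API and are not shown in this diff:

```python
# Sketch under assumptions, not this PR's code.
output = ttnn.reduce_scatter(input_tensor_mesh, scatter_dim=3, math_op=math_op, num_links=1)  # compile run

trace_id = ttnn.begin_trace_capture(mesh_device, cq_id=0)  # assumed API
output = ttnn.reduce_scatter(input_tensor_mesh, scatter_dim=3, math_op=math_op, num_links=1)  # captured
ttnn.end_trace_capture(mesh_device, trace_id, cq_id=0)     # shown in the diff above

ttnn.execute_trace(mesh_device, trace_id, cq_id=0, blocking=False)  # assumed API; replays the capture
```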