#0: Port all Misc ops to use TensorSpec #15509

Merged · 8 commits · Dec 4, 2024
Changes from 1 commit
152 changes: 86 additions & 66 deletions tests/ttnn/unit_tests/gtests/test_ccl_on_galaxy.cpp
@@ -18,18 +18,20 @@ using namespace tt_metal;

// We use this to dispatch a single device operation asynchronously
// Needed to reproduce the deadlock scenario with a very specific pattern of commands
// This can go away once device_operation::run will be made async and ccl op is moved to the new tmp-based DeviceOperation
// This can go away once device_operation::run will be made async and ccl op is moved to the new tmp-based
// DeviceOperation
namespace async_detail {
template<typename OpConfig>
template <typename OpConfig>
std::vector<Tensor> run_operation(
uint8_t cq_id,
OpConfig devop,
const operation::Tensors& input_tensors,
const operation::OptionalConstTensors& optional_input_tensors = {},
const operation::OptionalTensors& optional_output_tensors = {}) {
static_assert(operation::detail::is_device_operation<OpConfig>(), "ttnn::run_operation can only dispatch Device Operations!");
static_assert(
operation::detail::is_device_operation<OpConfig>(), "ttnn::run_operation can only dispatch Device Operations!");
// Create output tensor vector by examining the number of output shapes created by the device operation
auto output_shapes = operation::DeviceOperation<operation::Tensors>(devop).compute_output_shapes(input_tensors);
auto output_shapes = operation::DeviceOperation<operation::Tensors>(devop).compute_output_shapes(input_tensors, {});
size_t output_shapes_size = 0;
if (std::holds_alternative<std::vector<ttnn::SimpleShape>>(output_shapes)) {
output_shapes_size = std::get<std::vector<ttnn::SimpleShape>>(output_shapes).size();
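For reviewers skimming the rewrapped lines: the only functional change in this helper is the extra argument now passed to compute_output_shapes; everything else in this hunk is formatting. A minimal before/after, copied from the hunk above — reading the new second argument as the optional output tensors introduced by the TensorSpec port is an assumption, not something this diff states:

    // Before: output shapes were computed from the input tensors alone.
    auto output_shapes =
        operation::DeviceOperation<operation::Tensors>(devop).compute_output_shapes(input_tensors);
    // After: a second argument is now required; the test passes an empty list
    // (presumably "no preallocated optional output tensors").
    auto output_shapes =
        operation::DeviceOperation<operation::Tensors>(devop).compute_output_shapes(input_tensors, {});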
@@ -44,71 +44,69 @@ std::vector<Tensor> run_operation(
// Send the operation to the async engine, which will populate the output tensors.
for (auto worker : outputs.at(0).workers) {
tt::tt_metal::operation::launch_op(
[devop, worker, cq_id] (const std::vector<Tensor>& input_tensors, const std::vector<std::optional<const Tensor>>& optional_input_tensors, const std::vector<std::optional<Tensor>>& optional_output_tensors) mutable -> std::vector<Tensor> {
return operation::run(std::move(devop), input_tensors, optional_input_tensors, optional_output_tensors, cq_id);
}, input_tensors, outputs, optional_input_tensors, optional_output_tensors);
[devop, worker, cq_id](
const std::vector<Tensor>& input_tensors,
const std::vector<std::optional<const Tensor>>& optional_input_tensors,
const std::vector<std::optional<Tensor>>& optional_output_tensors) mutable -> std::vector<Tensor> {
return operation::run(
std::move(devop), input_tensors, optional_input_tensors, optional_output_tensors, cq_id);
},
input_tensors,
outputs,
optional_input_tensors,
optional_output_tensors);
}
return outputs;
}
} // namespace async_detail
} // namespace async_detail
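For context on how this helper is used: the tests below build a CCL device op and hand it to run_operation on command queue 0 with a single input tensor. A condensed excerpt of that call pattern, taken from the test body further down (the surrounding device, tensor, and index setup is assumed to exist as in the test):

    // Build the all-gather device op, then dispatch it through the async helper on CQ 0.
    auto all_gather_op = ttnn::AllGather{
        3, 2, num_devices_in_row, dev_idx, std::nullopt, std::nullopt,
        receiver_device_id, sender_device_id,
        input_tensor.memory_config(), ttnn::ccl::Topology::Linear};
    output_tensors.push_back(async_detail::run_operation(0, all_gather_op, {input_tensor}).at(0));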

bool is_tg_system()
{
bool is_tg_system() {
const bool is_galaxy_system = tt::Cluster::instance().is_galaxy_cluster();
const size_t num_mmio_devices = tt::Cluster::instance().number_of_pci_devices();
const size_t num_devices = tt::Cluster::instance().number_of_user_devices();
return is_galaxy_system && (num_mmio_devices == 4) && (num_devices == 32);
}

bool is_tgg_system()
{
bool is_tgg_system() {
const bool is_galaxy_system = tt::Cluster::instance().is_galaxy_cluster();
const size_t num_mmio_devices = tt::Cluster::instance().number_of_pci_devices();
const size_t num_devices = tt::Cluster::instance().number_of_user_devices();
return is_galaxy_system && (num_mmio_devices == 8) && (num_devices == 64);
}

ttnn::MeshShape get_mesh_shape()
{
ttnn::MeshShape get_mesh_shape() {
ttnn::MeshShape shape;
if (is_tg_system())
{
if (is_tg_system()) {
shape = {8, 4};
}
else {
} else {
TT_FATAL(is_tgg_system(), "Unsupported Galaxy system");
shape = {8, 8};
}
return shape;
}

void validate_num_tunnels_and_tunnel_depth()
{
void validate_num_tunnels_and_tunnel_depth() {
const uint32_t num_devices_in_tunnel = tt::Cluster::instance().get_mmio_device_max_tunnel_depth(0);
const uint32_t num_mmio_devices = tt::Cluster::instance().number_of_pci_devices();
const uint32_t cluster_tunnel_count = tt::Cluster::instance().get_mmio_device_tunnel_count(0);
TT_FATAL(num_devices_in_tunnel == 4, "Expected Galaxy to have tunnel depth of 4, detected tunnel depth of {}", num_devices_in_tunnel);
TT_FATAL(
num_devices_in_tunnel == 4,
"Expected Galaxy to have tunnel depth of 4, detected tunnel depth of {}",
num_devices_in_tunnel);
const uint32_t num_tunnels = num_mmio_devices * cluster_tunnel_count;
if (is_tg_system())
{
if (is_tg_system()) {
TT_FATAL(num_tunnels == 8, "Expected 8 tunnels in a TG system, detected {} tunnels", num_tunnels);
}
else if (is_tgg_system())
{
} else if (is_tgg_system()) {
TT_FATAL(num_tunnels == 16, "Expected 16 tunnels in a TGG system, detected {} tunnels", num_tunnels);
}
}

std::shared_ptr<bfloat16 []> create_container_for_readback_data(const uint32_t buf_size_datums)
{
if (is_tg_system())
{
return std::shared_ptr<bfloat16 []>(new bfloat16[buf_size_datums * 4]);
}
else
{
std::shared_ptr<bfloat16[]> create_container_for_readback_data(const uint32_t buf_size_datums) {
if (is_tg_system()) {
return std::shared_ptr<bfloat16[]>(new bfloat16[buf_size_datums * 4]);
} else {
TT_FATAL(is_tgg_system(), "Unsupported Galaxy system");
return std::shared_ptr<bfloat16 []>(new bfloat16[buf_size_datums * 8]);
return std::shared_ptr<bfloat16[]>(new bfloat16[buf_size_datums * 8]);
}
}

Expand All @@ -119,18 +119,17 @@ TEST(GalaxyTests, TestAllGatherDeadlock) {
validate_num_tunnels_and_tunnel_depth();

ttnn::MeshShape mesh_shape = get_mesh_shape();
std::shared_ptr<ttnn::MeshDevice> mesh = ttnn::distributed::open_mesh_device(mesh_shape, 0, 0, 1, DispatchCoreType::WORKER);
std::shared_ptr<ttnn::MeshDevice> mesh =
ttnn::distributed::open_mesh_device(mesh_shape, 0, 0, 1, DispatchCoreType::WORKER);

// Setup input data and output data containers
MemoryConfig mem_cfg = MemoryConfig{
.memory_layout = TensorMemoryLayout::INTERLEAVED,
.buffer_type = BufferType::DRAM,
.shard_spec = std::nullopt};
.memory_layout = TensorMemoryLayout::INTERLEAVED, .buffer_type = BufferType::DRAM, .shard_spec = std::nullopt};
ttnn::SimpleShape shape{1, 1, 32, 16384};
const uint32_t buf_size_datums = 32 * 16384;
const uint32_t datum_size_bytes = 2;
auto host_data = std::shared_ptr<bfloat16 []>(new bfloat16[buf_size_datums]);
std::shared_ptr<bfloat16 []> readback_data = create_container_for_readback_data(buf_size_datums);
auto host_data = std::shared_ptr<bfloat16[]>(new bfloat16[buf_size_datums]);
std::shared_ptr<bfloat16[]> readback_data = create_container_for_readback_data(buf_size_datums);
const uint32_t outer_loops = 200;

// Input to CCL is a tensor of 1s. The output should contain 4x the amount of data, but all values should be 1.
@@ -167,20 +166,32 @@ TEST(GalaxyTests, TestAllGatherDeadlock) {
uint32_t receiver_device_id = device_ids[(dev_idx) + 1 % num_devices_in_row];
uint32_t sender_device_id = device_ids[(dev_idx + num_devices_in_row - 1) % num_devices_in_row];
auto all_gather_op = ttnn::AllGather{
3, 2, num_devices_in_row, dev_idx, std::nullopt, std::nullopt, receiver_device_id, sender_device_id, input_tensor.memory_config(), ttnn::ccl::Topology::Linear};
3,
2,
num_devices_in_row,
dev_idx,
std::nullopt,
std::nullopt,
receiver_device_id,
sender_device_id,
input_tensor.memory_config(),
ttnn::ccl::Topology::Linear};
// Send CCL to this device. All CCLs will complete simultaneously.
output_tensors.push_back(async_detail::run_operation(0, all_gather_op, {input_tensor}).at(0));
// Expose deadlock: After the CCL is sent to the first device in the tunnel, send enough data to it to backpressure prefetch_h. This will block the
// demux, which will prevent the CCL from being sent to additional chips. If the CCL has been tagged as having multi-device dependencies, deadlock should
// get bypassed.
// Expose deadlock: After the CCL is sent to the first device in the tunnel, send enough data to it to
// backpressure prefetch_h. This will block the demux, which will prevent the CCL from being sent to
// additional chips. If the CCL has been tagged as having multi-device dependencies, deadlock should get
// bypassed.
if (!dev_idx) {
ttnn::write_buffer(0, input_tensor, {host_data});
}
dev_idx++;
}
// Readback data and verify correctness.
for (auto& tensor : output_tensors) {
ASSERT_EQ(tensor.get_shape(), ttnn::Shape(LegacyShape({1, 1, 32, static_cast<uint32_t>(16384 * device_ids.size())})));
ASSERT_EQ(
tensor.get_shape(),
ttnn::Shape(LegacyShape({1, 1, 32, static_cast<uint32_t>(16384 * device_ids.size())})));
ttnn::read_buffer(0, tensor, {readback_data});
for (int j = 0; j < device_ids.size() * 32 * 16384; j++) {
ASSERT_EQ(readback_data[j].to_float(), 1);
@@ -198,17 +209,20 @@ TEST(GalaxyTests, TestReduceScatterDeadlock) {
validate_num_tunnels_and_tunnel_depth();

ttnn::MeshShape mesh_shape = get_mesh_shape();
std::shared_ptr<ttnn::MeshDevice> mesh = ttnn::distributed::open_mesh_device(mesh_shape, 0, 0, 1, DispatchCoreType::WORKER);
// Create the outer ring on which Reduce Scatter will be run. This allows us to verify that there are no deadlocks when we send CCLs to the
// first tunnel (forward path).
std::shared_ptr<ttnn::MeshDevice> mesh =
ttnn::distributed::open_mesh_device(mesh_shape, 0, 0, 1, DispatchCoreType::WORKER);
// Create the outer ring on which Reduce Scatter will be run. This allows us to verify that there are no deadlocks
// when we send CCLs to the first tunnel (forward path).
auto view = ttnn::MeshDeviceView(*mesh);
std::vector<Device*> ring_devices = view.get_devices_on_row(0); // Tunnel 0
std::vector<Device*> ring_devices_1 = view.get_devices_on_column(mesh_shape.second - 1); // Orthogonal to tunnel .. no deadlocks
std::vector<Device*> ring_devices = view.get_devices_on_row(0); // Tunnel 0
std::vector<Device*> ring_devices_1 =
view.get_devices_on_column(mesh_shape.second - 1); // Orthogonal to tunnel .. no deadlocks
ring_devices_1 = std::vector<Device*>(ring_devices_1.begin() + 1, ring_devices_1.end());
std::vector<Device*> ring_devices_2 = view.get_devices_on_row(7); // Tunnel 7 .. potential deadlocks with lack of buffering
std::vector<Device*> ring_devices_2 =
view.get_devices_on_row(7); // Tunnel 7 .. potential deadlocks with lack of buffering
std::reverse(ring_devices_2.begin(), ring_devices_2.end());
ring_devices_2 = std::vector<Device*>(ring_devices_2.begin() + 1, ring_devices_2.end());
std::vector<Device*> ring_devices_3 = view.get_devices_on_column(0); // Orthogonal to tunnel .. no deadlocks
std::vector<Device*> ring_devices_3 = view.get_devices_on_column(0); // Orthogonal to tunnel .. no deadlocks
std::reverse(ring_devices_3.begin(), ring_devices_3.end());
ring_devices_3 = std::vector<Device*>(ring_devices_3.begin() + 1, ring_devices_3.end() - 1);

Expand All @@ -218,15 +232,13 @@ TEST(GalaxyTests, TestReduceScatterDeadlock) {

// Setup input data and output data containers
MemoryConfig mem_cfg = MemoryConfig{
.memory_layout = TensorMemoryLayout::INTERLEAVED,
.buffer_type = BufferType::DRAM,
.shard_spec = std::nullopt};
.memory_layout = TensorMemoryLayout::INTERLEAVED, .buffer_type = BufferType::DRAM, .shard_spec = std::nullopt};
ttnn::SimpleShape shape{1, 2, 256, static_cast<uint32_t>(256 * ring_devices.size())};
const uint32_t buf_size_datums = 2 * 256 * 256 * ring_devices.size();
const uint32_t datum_size_bytes = 2;
// Output of reduce scatter is input_numel / num_devices_used_in_scatter_op
auto host_data = std::shared_ptr<bfloat16 []>(new bfloat16[buf_size_datums]);
auto readback_data = std::shared_ptr<bfloat16 []>(new bfloat16[buf_size_datums / ring_devices.size()]);
auto host_data = std::shared_ptr<bfloat16[]>(new bfloat16[buf_size_datums]);
auto readback_data = std::shared_ptr<bfloat16[]>(new bfloat16[buf_size_datums / ring_devices.size()]);
uint32_t scatter_dim = 3;
uint32_t outer_loops = 500;

@@ -265,16 +277,24 @@ TEST(GalaxyTests, TestReduceScatterDeadlock) {
uint32_t receiver_device_id = device_ids[(dev_idx + 1) % ring_devices.size()];
uint32_t sender_device_id = device_ids[(dev_idx + ring_devices.size() - 1) % ring_devices.size()];
auto all_gather_op = ttnn::ReduceScatter{
ttnn::operations::binary::BinaryOpType::ADD, scatter_dim, 1, static_cast<uint32_t>(ring_devices.size()), dev_idx, receiver_device_id, sender_device_id, input_tensor.memory_config(), ttnn::ccl::Topology::Ring};
ttnn::operations::binary::BinaryOpType::ADD,
scatter_dim,
1,
static_cast<uint32_t>(ring_devices.size()),
dev_idx,
receiver_device_id,
sender_device_id,
input_tensor.memory_config(),
ttnn::ccl::Topology::Ring};
// Send CCL to this device. All CCLs will complete simultaneously.
output_tensors.push_back(async_detail::run_operation(0, all_gather_op, {input_tensor}).at(0));
// Expose deadlock: After the CCL is sent to a device in the first tunnel, send enough data to it to backpressure prefetch_h. This will block the
// demux, which will prevent the CCL from being sent to additional chips on the tunnel. If the CCL has been tagged as having multi-device dependencies, deadlock should
// get bypassed.
// if (dev_idx < 3) {
for (int j = 0; j < 16; j++) {
ttnn::write_buffer(0, input_tensor, {host_data});
}
// Expose deadlock: After the CCL is sent to a device in the first tunnel, send enough data to it to
// backpressure prefetch_h. This will block the demux, which will prevent the CCL from being sent to
// additional chips on the tunnel. If the CCL has been tagged as having multi-device dependencies, deadlock
// should get bypassed. if (dev_idx < 3) {
for (int j = 0; j < 16; j++) {
ttnn::write_buffer(0, input_tensor, {host_data});
}
// }
dev_idx++;
}