feat(distributed): add scripts for distributed infer
PanZezhong1725 committed Nov 15, 2023
1 parent d5e73fe commit f3732a5
Showing 17 changed files with 496 additions and 8 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -6,6 +6,8 @@ __pycache__/
*.so
*.log
*.onnx
*.pb
*.npy

/scripts/*.py
!/scripts/format.py
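
Note: the two new ignore patterns correspond to artifacts written by the launch script added below — per-rank external-data weight files (e.g. test_dist_rank0.pb, given the default --name of "test") and the *_inputs.npy / *_results.npy arrays saved for result comparison.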
157 changes: 157 additions & 0 deletions examples/distributed/launch.py
@@ -0,0 +1,157 @@
import argparse
import os
import time
import multiprocessing as mp
from refactor_graph.onnx import make_compiler
import onnx
from onnx.external_data_helper import convert_model_to_external_data
from onnx.shape_inference import infer_shapes_path
import numpy as np
from parallel_opt import parallel_model


os.environ["NVIDIA_TF32_OVERRIDE"] = "0"


def parse_args():
    parser = argparse.ArgumentParser(description="launch distributed infinitensor")
    parser.add_argument("--num_nodes", type=int, default=1, help="number of nodes")
    parser.add_argument(
        "--nproc_per_node", type=int, default=1, help="number of processes per node"
    )
    parser.add_argument(
        "--name", type=str, default="test", help="name of this instance."
    )
    parser.add_argument(
        "--model", type=str, required=True, help="path to the ONNX model file."
    )
    parser.add_argument("--batch_size", type=int, default=1, help="batch size.")
    parser.add_argument("--length", type=int, default=512, help="sequence length.")
    parser.add_argument(
        "--gen_std",
        action="store_true",
        help="whether to generate the standard results.",
    )
    args = parser.parse_args()
    print("arg setting: ", args)
    return (
        args.num_nodes,
        args.nproc_per_node,
        args.name,
        args.model,
        args.batch_size,
        args.length,
        args.gen_std,
    )


def run_model(executor, inputs, n=10):
    for i in range(executor.input_count()):
        executor.set_input(i, inputs[i])

    executor.prepare()
    executor.run()
    # get outputs
    outputs = executor.get_output(0)

    # bench: average latency over n runs
    begin = time.time()
    for _ in range(n):
        executor.run()
    end = time.time()
    avg_time = (end - begin) / n
    print(f"average time: {avg_time}")
    return outputs


def run_and_compare(name, executor):
    input_ids = np.load(f"{name}_inputs.npy")
    position_ids = np.arange(input_ids.shape[-1])
    results = np.load(f"{name}_results.npy")
    outputs = run_model(executor, (input_ids, position_ids))
    print("outputs abs mean:", abs(outputs).mean())
    np.testing.assert_allclose(outputs, results, rtol=1e-6, atol=1e-3)


def start_worker(
    name: str, world_size: int, rank: int, local_rank: int, model: onnx.ModelProto
):
    dist_name = name + "_dist"
    # partition the model for this rank
    model = parallel_model(model, world_size, rank)
    # save the partitioned model, with weights stored as external data
    extern_path = f"./{dist_name}_rank{rank}.pb"
    if os.path.exists(extern_path):
        os.remove(extern_path)
    onnx.save_model(
        model,
        f"./{dist_name}_rank{rank}.onnx",
        save_as_external_data=True,
        location=extern_path,
    )
    infer_shapes_path(f"./{dist_name}_rank{rank}.onnx")

    compiler = make_compiler(model, ".")

    executor = compiler.compile("cuda", "default", [], rank)
    executor.set_cuda_commnication(world_size, rank)
    run_and_compare(name, executor)


def start_single(name, model):
    compiler = make_compiler(model)
    executor = compiler.compile("cuda", "default", [], 0)
    run_and_compare(name, executor)


def gen_standard(name, model, voc_size, bs, length):
    # generate standard results (renamed `len` to `length` to avoid shadowing the builtin)
    input_ids = np.random.randint(0, voc_size, (bs, length)).astype(np.int32)
    position_ids = np.arange(length)
    np.save(f"{name}_inputs", input_ids)
    compiler = make_compiler(model)
    executor = compiler.compile("cuda", "default", [], 0)
    outputs = run_model(executor, (input_ids, position_ids), 1)
    print("outputs abs mean:", abs(outputs).mean())
    np.save(f"{name}_results", outputs)


def main():
    nnodes, nproc_per_node, name, model_path, bs, length, gen_std = parse_args()

    model = onnx.load(model_path)

    # generate standard output
    # (a stray `gen_std = False` here would have overridden the --gen_std flag; removed)
    if gen_std:
        print(f"generate standard data for {name}.")
        # a small vocabulary size that fits any LLM
        voc_size = 1000
        gen_standard(name, model, voc_size, bs, length)
        return

    # run single process.
    # use a standalone process to isolate CUDA.
    print("run model on a single GPU.")
    p = mp.Process(target=start_single, args=(name, model))
    p.start()
    p.join()

    # run distributed parallel.
    world_size = nnodes * nproc_per_node
    print(f"run model on {world_size} GPUs in parallel.")
    workers = [
        mp.Process(
            target=start_worker,
            args=(name, world_size, rank, rank % nproc_per_node, model),
        )
        for rank in range(world_size)
    ]

    for w in workers:
        w.start()

    for w in workers:
        w.join()


if __name__ == "__main__":
    main()
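
A minimal usage sketch (the model path and process count below are placeholders; the flags are the ones defined in parse_args, and the script is run from its own directory so parallel_opt is importable): generate the single-GPU reference first with --gen_std, then rerun without it to benchmark one GPU and verify the distributed run against the saved reference.

    # 1) produce test_inputs.npy / test_results.npy on a single GPU
    python launch.py --model ./model.onnx --gen_std

    # 2) run single-GPU and 2-process distributed inference, comparing against the reference
    python launch.py --model ./model.onnx --nproc_per_node 2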