From 1f21be59ff21ddd61a7e3fc70d7440d416848a9b Mon Sep 17 00:00:00 2001 From: wangxiaoyuvvv <48796714+wangxiaoyuvvv@users.noreply.github.com> Date: Wed, 26 Jun 2024 10:52:30 +0800 Subject: [PATCH] [Hexaflake] 20240614 change config (#579) * test merge * Hexaflake-20240607 * Hexaflake-20240614 change config --- .../llama2_7b_mmlu/pytorch/__init__.py | 20 + .../llama2_7b_mmlu/pytorch/hexaflake.py | 206 ++++++ .../hexaflake/hexaflake_analysis.py | 18 + .../hexaflake/hexaflake_monitor.py | 258 +++++++ .../hexaflake/pytorch_foo/Dockerfile | 8 + .../hexaflake/pytorch_foo/pytorch_install.sh | 7 + .../inference_engine/hexaflake/hx_infexec.py | 607 +++++++++++++++++ .../inference_engine/hexaflake/pytorch_foo.py | 221 ++++++ inference/run_hexaflake.py | 637 ++++++++++++++++++ inference/run_inference.py | 4 +- 10 files changed, 1984 insertions(+), 2 deletions(-) create mode 100644 inference/benchmarks/llama2_7b_mmlu/pytorch/hexaflake.py create mode 100644 inference/docker_images/hexaflake/hexaflake_analysis.py create mode 100644 inference/docker_images/hexaflake/hexaflake_monitor.py create mode 100644 inference/docker_images/hexaflake/pytorch_foo/Dockerfile create mode 100644 inference/docker_images/hexaflake/pytorch_foo/pytorch_install.sh create mode 100644 inference/inference_engine/hexaflake/hx_infexec.py create mode 100644 inference/inference_engine/hexaflake/pytorch_foo.py create mode 100644 inference/run_hexaflake.py diff --git a/inference/benchmarks/llama2_7b_mmlu/pytorch/__init__.py b/inference/benchmarks/llama2_7b_mmlu/pytorch/__init__.py index 1f6cdf49b..ba7bb5b76 100644 --- a/inference/benchmarks/llama2_7b_mmlu/pytorch/__init__.py +++ b/inference/benchmarks/llama2_7b_mmlu/pytorch/__init__.py @@ -3,3 +3,23 @@ from .export import export_model from .evaluator import evaluator from .forward import model_forward, engine_forward +import os + +env = os.environ['vendor'] + +if env == "hexaflake": + from .hexaflake import hx_dataloader + build_dataloader = hx_dataloader + + from .hexaflake import hx_model + create_model = hx_model + + from .hexaflake import hx_export_model + export_model = hx_export_model + + from .hexaflake import hx_evaluator + evaluator = hx_evaluator + + from .hexaflake import hx_model_forward, hx_engine_forward + model_forward = hx_model_forward + engine_forward = hx_engine_forward \ No newline at end of file diff --git a/inference/benchmarks/llama2_7b_mmlu/pytorch/hexaflake.py b/inference/benchmarks/llama2_7b_mmlu/pytorch/hexaflake.py new file mode 100644 index 000000000..e15b6dd79 --- /dev/null +++ b/inference/benchmarks/llama2_7b_mmlu/pytorch/hexaflake.py @@ -0,0 +1,206 @@ +import os +import pandas as pd +from transformers import AutoTokenizer +import torch +from torch.utils.data import DataLoader, Dataset +from loguru import logger +from tools import torch_sync +import time + +TASKS = [ + 'abstract_algebra', + 'anatomy', + 'astronomy', + 'business_ethics', + 'clinical_knowledge', + 'college_biology', + 'college_chemistry', + 'college_computer_science', + 'college_mathematics', + 'college_medicine', + 'college_physics', + 'computer_security', + 'conceptual_physics', + 'econometrics', + 'electrical_engineering', + 'elementary_mathematics', + 'formal_logic', + 'global_facts', + 'high_school_biology', + 'high_school_chemistry', + 'high_school_computer_science', + 'high_school_european_history', + 'high_school_geography', + 'high_school_government_and_politics', + 'high_school_macroeconomics', + 'high_school_mathematics', + 'high_school_microeconomics', + 'high_school_physics', + 'high_school_psychology', + 'high_school_statistics', + 'high_school_us_history', + 'high_school_world_history', + 'human_aging', + 'human_sexuality', + 'international_law', + 'jurisprudence', + 'logical_fallacies', + 'machine_learning', + 'management', + 'marketing', + 'medical_genetics', + 'miscellaneous', + 'moral_disputes', + 'moral_scenarios', + 'nutrition', + 'philosophy', + 'prehistory', + 'professional_accounting', + 'professional_law', + 'professional_medicine', + 'professional_psychology', + 'public_relations', + 'security_studies', + 'sociology', + 'us_foreign_policy', + 'virology', + 'world_religions' + ] +choices = ["A", "B", "C", "D"] + +def format_subject(subject): + l = subject.split("_") + s = "" + for entry in l: + s += " " + entry + return s + + +def gen_prompt(train_df, subject, k=-1): + prompt = "The following are multiple choice questions (with answers) about {}.\n\n".format(format_subject(subject)) + if k == -1: + k = train_df.shape[0] + for i in range(k): + prompt += format_example(train_df, i) + return prompt + + +def format_example(df, idx, include_answer=True): + prompt = df.iloc[idx, 0] + k = df.shape[1] - 2 + for j in range(k): + prompt += "\n{}. {}".format(choices[j], df.iloc[idx, j+1]) + prompt += "\nAnswer:" + if include_answer: + prompt += " {}\n\n".format(df.iloc[idx, k + 1]) + return prompt + + + +def mmlu(config): + tokenizer = AutoTokenizer.from_pretrained(os.path.join(config.data_dir, config.weight_dir)) + records = [] + length = 0 + labels = [] + + for task in TASKS: + + logger.debug("Loading 5-shot " + str(task)) + + dev_df = pd.read_csv(os.path.join(config.data_dir, config.mmlu_dir, "dev", task + "_dev.csv"), header=None)[:config.few_shots] + test_df = pd.read_csv(os.path.join(config.data_dir, config.mmlu_dir, "test", task + "_test.csv"), header=None) + + for i in range(test_df.shape[0]): + k = config.few_shots + label = test_df.iloc[i, test_df.shape[1]-1] + prompt_end = format_example(test_df, i, include_answer=False) + prompt = gen_prompt(dev_df, task, k) + prompt = prompt + prompt_end + while len(tokenizer.tokenize(prompt)) + 1> 2048: + prompt_split = prompt.split("\n\n") + prompt_split.pop(1) + prompt = "\n\n".join(prompt_split) + records.append(prompt) + labels.append(label) + return records, labels + + +def hx_dataloader(config): + dataset = mmlu(config) + assert config.batch_size == 1 + + return dataset + + +def hx_model_forward(model, dataloader, evaluator, config): + if config.no_validation: + return None, None, None + pass + + +def cal_perf(config, tokens, duration, core_time, str_prefix): + model_forward_perf = config.repeat * tokens / duration + logger.info(str_prefix + "(" + config.framework + ") Perf: " + + str(model_forward_perf) + " tps") + model_forward_core_perf = config.repeat * tokens / core_time + logger.info(str_prefix + "(" + config.framework + ") core Perf: " + + str(model_forward_core_perf) + " tps") + return round(model_forward_perf, 3), round(model_forward_core_perf, 3) + + +def hx_engine_forward(model, dataloader, evaluator, config): + start = time.time() + core_time = 0.0 + foo_time = 0.0 + + token_cnt = 0 + correct = 0 + whole = 0 + + for times in range(config.repeat): + + logger.debug("Repeat: " + str(times + 1)) + data = dataloader[0] + label = dataloader[1] + for i in range(len(data)): + with torch.no_grad(): + torch_sync(config) + core_time_start = time.time() + + y = model(data[i]) + + torch_sync(config) + core_time += time.time() - core_time_start + + token_cnt += y[2] + foo_time += y[1] + model_outputs = y[0] + + r = evaluator(model_outputs, label[i]) + + correct += r + whole += 1 + + logger.info("MMLU" + str(config.few_shots) + "-shots Acc: " + str(correct / whole)) + + duration = time.time() - start + model_forward_perf, model_forward_core_perf = cal_perf( + config, token_cnt, duration, core_time - foo_time, "Inference") + + return model_forward_perf, model_forward_core_perf, round(correct / whole, 3) + + +def hx_model(config): + pass + + +def hx_export_model(model,config): + return None + + +def hx_evaluator(pred, y): + if pred == y: + return 1 + else: + return 0 + diff --git a/inference/docker_images/hexaflake/hexaflake_analysis.py b/inference/docker_images/hexaflake/hexaflake_analysis.py new file mode 100644 index 000000000..3d089cc89 --- /dev/null +++ b/inference/docker_images/hexaflake/hexaflake_analysis.py @@ -0,0 +1,18 @@ +import re + +def analysis_log(logpath): + logfile = open(logpath) + + max_usage = 0.0 ## usage_mem + max_mem = 0.0 + for line in logfile.readlines(): + ''' + hxsmi pwr DTemp MUsed Mem + ''' + if "/" in line: + line = line[:-1] + match = re.search(r'(\d+)MiB / (\d+)MiB', line) + max_mem = float(match.group(2)) + usage = float(match.group(1)) + max_usage = max(max_usage, usage) + return round(max_usage, 2), round(max_mem, 2), eval("30e12"), eval("120e12") diff --git a/inference/docker_images/hexaflake/hexaflake_monitor.py b/inference/docker_images/hexaflake/hexaflake_monitor.py new file mode 100644 index 000000000..ef7863668 --- /dev/null +++ b/inference/docker_images/hexaflake/hexaflake_monitor.py @@ -0,0 +1,258 @@ +# !/usr/bin/env python3 +# encoding: utf-8 +''' +Usage: python3 sys-monitor.py -o operation -l [log_path] + -o, --operation start|stop|restart|status + -l, --log log path , ./logs/ default +''' + +import os +import sys +import time +import signal +import atexit +import argparse +import datetime +from multiprocessing import Process +import subprocess +import schedule + + +class Daemon: + ''' + daemon subprocess class. + usage: subclass this daemon and override the run() method. + sys-monitor.pid: in the /tmp/, auto del when unexpected exit. + verbose: debug mode, disabled default. + ''' + + def __init__(self, + pid_file, + log_file, + err_file, + gpu_log, + log_path, + rate=5, + stdin=os.devnull, + stdout=os.devnull, + stderr=os.devnull, + home_dir='.', + umask=0o22, + verbose=0): + self.stdin = stdin + self.stdout = stdout + self.stderr = stderr + self.home_dir = home_dir + self.verbose = verbose + self.pidfile = pid_file + self.logfile = log_file + self.errfile = err_file + self.gpufile = gpu_log + self.logpath = log_path + self.rate = rate + self.umask = umask + self.verbose = verbose + self.daemon_alive = True + + def get_pid(self): + try: + with open(self.pidfile, 'r') as pf: + pid = int(pf.read().strip()) + except IOError: + pid = None + except SystemExit: + pid = None + return pid + + def del_pid(self): + if os.path.exists(self.pidfile): + os.remove(self.pidfile) + + def run(self): + ''' + NOTE: override the method in subclass + ''' + + def gpu_mon(file): + TIMESTAMP = datetime.datetime.now().strftime('%Y-%m-%d-%H:%M:%S') + cmd = "hxsmi | grep 'stb'| awk '{print $3 $4, $6 $7, $8 $9}' > temp1 && \ + hxsmi | grep 'MiB'| awk '{print $5, $6 $7, $8, $9 $10}' > temp2 && paste temp1 temp2 | awk '{$1=$1; print}' && rm temp1 temp2" ## pwr DTemp MUsed Mem + process = subprocess.Popen(cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + encoding='utf-8') + try: + out = process.communicate(timeout=10) + except subprocess.TimeoutExpired: + process.kill() + out = process.communicate() + + if process.returncode != 0: + result = "error" + result = TIMESTAMP + "\n" + out[0] + "\n" + with open(file, 'a') as f: + f.write(result) + + def timer_gpu_mon(): + gpu_process = Process(target=gpu_mon, args=(self.gpufile, )) + gpu_process.start() + + schedule.every(self.rate).seconds.do(timer_gpu_mon) + while True: + schedule.run_pending() + time.sleep(5) + + def daemonize(self): + if self.verbose >= 1: + print('daemon process starting ...') + try: + pid = os.fork() + if pid > 0: + sys.exit(0) + except OSError as e: + sys.stderr.write('fork #1 failed: %d (%s)\n' % + (e.errno, e.strerror)) + sys.exit(1) + os.chdir(self.home_dir) + os.setsid() + os.umask(self.umask) + try: + pid = os.fork() + if pid > 0: + sys.exit(0) + except OSError as e: + sys.stderr.write('fork #2 failed: %d (%s)\n' % + (e.errno, e.strerror)) + sys.exit(1) + sys.stdout.flush() + sys.stderr.flush() + si = open(self.stdin, 'r') + so = open(self.stdout, 'a+') + if self.stderr: + se = open(self.stderr, 'a+') + else: + se = so + os.dup2(si.fileno(), sys.stdin.fileno()) + os.dup2(so.fileno(), sys.stdout.fileno()) + os.dup2(se.fileno(), sys.stderr.fileno()) + atexit.register(self.del_pid) + pid = str(os.getpid()) + with open(self.pidfile, 'w+') as f: + f.write('%s\n' % pid) + + def start(self): + if not os.path.exists(self.logpath): + os.makedirs(self.logpath) + elif os.path.exists(self.gpufile): + os.remove(self.gpufile) + if self.verbose >= 1: + print('ready to start ......') + # check for a pid file to see if the daemon already runs + pid = self.get_pid() + if pid: + msg = 'pid file %s already exists, is it already running?\n' + sys.stderr.write(msg % self.pidfile) + sys.exit(1) + # start the daemon + self.daemonize() + self.run() + + def stop(self): + if self.verbose >= 1: + print('stopping ...') + pid = self.get_pid() + if not pid: + msg = 'pid file [%s] does not exist. Not running?\n' % self.pidfile + sys.stderr.write(msg) + if os.path.exists(self.pidfile): + os.remove(self.pidfile) + return + # try to kill the daemon process + try: + i = 0 + while 1: + os.kill(pid, signal.SIGTERM) + time.sleep(1) + i = i + 1 + if i % 10 == 0: + os.kill(pid, signal.SIGHUP) + except OSError as err: + err = str(err) + if err.find('No such process') > 0: + if os.path.exists(self.pidfile): + os.remove(self.pidfile) + else: + print(str(err)) + sys.exit(1) + if self.verbose >= 1: + print('Stopped!') + + def restart(self): + self.stop() + self.start() + + def status(self): + pid = self.get_pid() + if pid: + if os.path.exists('/proc/%d' % pid): + return pid + return False + + +def parse_args(): + ''' Check script input parameter. ''' + parse = argparse.ArgumentParser(description='Sys monitor script') + parse.add_argument('-o', + type=str, + metavar='[operation]', + required=True, + help='start|stop|restart|status') + parse.add_argument('-l', + type=str, + metavar='[log_path]', + required=False, + default='./logs/', + help='log path') + args = parse.parse_args() + return args + + +def main(): + sample_rate1 = 5 + args = parse_args() + operation = args.o + log_path = args.l + pid_fn = str('/tmp/hexaflake_monitor.pid') + log_fn = str(log_path + '/hexaflake_monitor.log') + err_fn = str(log_path + '/hexaflake_monitor.err') + # result for gpu + gpu_fn = str(log_path + '/hexaflake_monitor.log') + + subdaemon = Daemon(pid_fn, + log_fn, + err_fn, + gpu_fn, + log_path, + verbose=1, + rate=sample_rate1) + if operation == 'start': + subdaemon.start() + elif operation == 'stop': + subdaemon.stop() + elif operation == 'restart': + subdaemon.restart() + elif operation == 'status': + pid = subdaemon.status() + if pid: + print('process [%s] is running ......' % pid) + else: + print('daemon process [%s] stopped' % pid) + else: + print("invalid argument!") + sys.exit(1) + + +if __name__ == '__main__': + main() + diff --git a/inference/docker_images/hexaflake/pytorch_foo/Dockerfile b/inference/docker_images/hexaflake/pytorch_foo/Dockerfile new file mode 100644 index 000000000..da640a72f --- /dev/null +++ b/inference/docker_images/hexaflake/pytorch_foo/Dockerfile @@ -0,0 +1,8 @@ +FROM hexaflake_rocky:master.996 + +#RUN apt-get update +RUN pip3 install loguru +#RUN pip3 install pycuda +RUN pip3 install schedule +RUN pip3 install munch + diff --git a/inference/docker_images/hexaflake/pytorch_foo/pytorch_install.sh b/inference/docker_images/hexaflake/pytorch_foo/pytorch_install.sh new file mode 100644 index 000000000..da455f6fe --- /dev/null +++ b/inference/docker_images/hexaflake/pytorch_foo/pytorch_install.sh @@ -0,0 +1,7 @@ +#!/bin/bash +pip3 install ./packages/TopsInference-2.4.12-py3.8-none-any.whl + +dpkg -i ./sdk_installers/topsruntime_2.4.12-1_amd64.deb +dpkg -i ./sdk_installers/tops-sdk_2.4.12-1_amd64.deb + + diff --git a/inference/inference_engine/hexaflake/hx_infexec.py b/inference/inference_engine/hexaflake/hx_infexec.py new file mode 100644 index 000000000..45e9451c9 --- /dev/null +++ b/inference/inference_engine/hexaflake/hx_infexec.py @@ -0,0 +1,607 @@ +import os +import time +import torch +import argparse +import numpy as np +from ctypes import * +from tqdm import tqdm +from copy import deepcopy +from transformers.generation.utils import GenerationMixin +from transformers import ( + PretrainedConfig, + GenerationConfig, + LogitsProcessorList, + StoppingCriteriaList, + LogitsProcessor, +) + +from tcinfer import TcInfer + + +infer_lib = cdll.LoadLibrary("libtcinfer.so") + + +def align(size): + if size % 128 == 0: + return size + else: + return ((size // 128) + 1) * 128 + + +def check_ret(ret, msg): + if ret is None or ret != 0: + print(msg) + exit(1) + + +class InvalidScoreLogitsProcessor(LogitsProcessor): + def __call__( + self, input_ids: torch.LongTensor, scores: torch.FloatTensor + ) -> torch.FloatTensor: + if torch.isnan(scores).any() or torch.isinf(scores).any(): + scores.zero_() + scores[..., 5] = 5e4 + return scores + + +class GLMInfer(GenerationMixin): + def __init__(self, engine_file, **kwargs): + """A inference class + Args: + engine_file: engine file name + the value of data_type includes ['int8','uint8','bfloat16','float32'] + batch_size: the batch size of inference + max_batch_size: a config parameter for building engine + """ + self.engine_file = engine_file + self.batch_size = ( + 1 if kwargs.get("batch_size") is None else kwargs["batch_size"] + ) + self.static_batch_size = ( + 1 + if kwargs.get("static_batch_size") is None + else kwargs["static_batch_size"] + ) + self.batch_count = ( + 1 if kwargs.get("batch_count") is None else kwargs["batch_count"] + ) + self.batch_sync = ( + 1000 if kwargs.get("batch_sync") is None else kwargs["batch_sync"] + ) + self.data_type = ( + "float32" if kwargs.get("data_type") is None else kwargs["data_type"] + ) + self.max_batch_size = ( + 0 if kwargs.get("max_batch_size") is None else kwargs["max_batch_size"] + ) + self.print_throughput = ( + False + if kwargs.get("print_throughput") is None + else kwargs["print_throughput"] + ) + self.in_out_nparray = ( + False if kwargs.get("in_out_nparray") is None else kwargs["in_out_nparray"] + ) + self.input_files = "" if kwargs.get("inputs") is None else kwargs["inputs"] + self.output_dir = ( + "" if kwargs.get("output_dir") is None else kwargs["output_dir"] + ) + self.use_cache = ( + False if kwargs.get("use_cache") is None else kwargs["use_cache"] + ) + self.rl2 = -1 if kwargs.get("resident_l2") is None else kwargs["resident_l2"] + self.dev_ids_ = [] if kwargs.get("dev_ids") is None else kwargs["dev_ids"] + self.dev_dram_limit = ( + [] if kwargs.get("dev_dram_limit") is None else kwargs["dev_dram_limit"] + ) + self.dump_golden = ( + 0 if kwargs.get("dump_golden") is None else kwargs["dump_golden"] + ) + self.split_strategy = ( + 0 if kwargs.get("split_strategy") is None else kwargs["split_strategy"] + ) + self.config_file = ( + "" if kwargs.get("config_file") is None else kwargs["config_file"] + ) + self.base_length = ( + 256 if kwargs.get("base_length") is None else kwargs["base_length"] + ) + self.engine_version = ( + 0 if kwargs.get("engine_version") is None else kwargs["engine_version"] + ) + self.total = 2048 if kwargs.get("total") is None else kwargs["total"] + self.constant_output = ( + False + if kwargs.get("constant_output") is None + else kwargs["constant_output"] + ) + + self.__check_args() + self.released_ = False + if len(self.dev_ids_) == 0: + self.dev_ids_ = [0] + print("self.engine_file: ", self.engine_file) + self.inf_obj = TcInfer( + self.engine_file, + use_cache=True, + max_batch_size=self.max_batch_size, + dev_ids=self.dev_ids_, + dev_dram_limit=self.dev_dram_limit, + dump_golden=self.dump_golden, + split_strategy=self.split_strategy, + engine_version=self.engine_version, + ) + self.random_input = False + self.next_input = [] + self.type_size_ = {1: "uint8", 2: "bfloat16", 4: "float32"} + self.base_output_shift_ = 0 + + self.base_infer_time = 0 + self.inc_infer_time = 0 + self.max_new_tokens = 32 + + self.infered_tokens = 0 + + self.pos_ = None + self.attention_mask = None + if self.static_batch_size > 1: + self.multi_batch_finish = np.array([False] * self.static_batch_size) + + self.base_round = 1 + + if not os.path.isfile(self.config_file): + check_ret(None, "{} not exists.".format(self.config_file)) + self.config = PretrainedConfig().from_json_file(self.config_file) + self.generation_config = GenerationConfig().from_model_config(self.config) + self.output = [] + + def __check_args(self): + if self.engine_file == "": + check_ret(1, "Missing engine file") + if not os.path.exists(self.engine_file): + check_ret(1, "engine file:{} not exitsts.".format(self.engine_file)) + if self.output_dir != "": + if not os.path.exists(self.output_dir): + msg = "output directory:{} not exists".format(self.output_dir) + check_ret(1, msg) + + def __deal_use_cache_output(self, model_idx): + finish_token = 2 + self.all_finished = True + if model_idx == 0: + self.use_cache_output_ = {} + self.finish_flag_ = {} + static_batches = len(self.next_input[0]) + for job in range(self.batch_size): + if self.use_cache_output_.get(job) is None: + self.use_cache_output_[job] = {} + self.finish_flag_[job] = False + + if not self.finish_flag_[job]: + for static_batch in range(static_batches): + if self.use_cache_output_[job].get(static_batch) is None: + self.use_cache_output_[job][static_batch] = [] + if ( + len(self.use_cache_output_[job][static_batch]) == 0 + or self.use_cache_output_[job][static_batch][-1] != finish_token + ): + self.use_cache_output_[job][static_batch].append( + self.next_input[job][static_batch] + ) + + next_token = torch.tensor(self.next_input[job]) + if static_batches > 1 and not self.constant_output: + mask_token = next_token == 2 + self.multi_batch_finish[mask_token] = True + if all(self.multi_batch_finish): + self.all_finished = True + return + if self.stop_inference(next_token, job): + self.finish_flag_[job] = True + else: + self.all_finished = False + + def get_static_batch(self): + return self.inf_obj.get_static_batch() + + def set_batch_size(self, batch_size): + if self.batch_size == batch_size: + return + self.batch_size = batch_size + self.inf_obj.set_batch_size(batch_size) + + def set_batch_count(self, batch_count): + self.batch_count = batch_count + + def __internal_post_process(self, model_idx, token_idx=0): + self.next_input.clear() + output_binding_idx = 3 + for job in range(self.batch_size): + out_array = self.inf_obj.get_output_by_binding_index( + job, output_binding_idx, model_idx + ) + if model_idx == 0 and self.base_output_shift_ == 0: + self.base_output_shift_ = self.base_length + one_batch = [] + next_token = self.get_next_token(out_array[:, -1, :], job) + for i in range(self.static_batch_size): + one_batch.append(int(next_token[i])) + self.next_input.append(one_batch) + + def __infer_base(self, inputs): + model_index = 0 + base = 0 + for r in range(self.base_round): + for i in range(len(inputs)): + input_ids, pos, attention_mask = inputs[i] + input_ids_ = input_ids[:, base : base + self.base_length] + pos_ = pos[:, base : base + self.base_length] + partial_mask = np.concatenate( + ( + np.zeros([1, 1, 1, base + self.base_length], dtype="bool"), + np.ones( + [1, 1, 1, self.total - base - self.base_length], + dtype="bool", + ), + ), + axis=-1, + ) + attention_mask_ = ( + attention_mask[:, :, base : base + self.base_length :, :] + | partial_mask + ) + self.inf_obj.set_input(i, "pos", pos_) + self.inf_obj.set_input(i, "input_ids", input_ids_) + self.inf_obj.set_input(i, "attention_mask", attention_mask_) + + # batch_size, r, model_idx + self.inf_obj.run(self.batch_size, r, model_index) + base += self.base_length + self.__internal_post_process(model_index) + + def __get_mask(self, job_index, token_idx): + mask_ids = self.tokens["attention_mask"] + new_mask = torch.cat( + (torch.tensor(mask_ids), torch.ones((self.static_batch_size, token_idx + 1))), -1 + ) + expanded_mask = new_mask[:, None, None, :].expand( + self.static_batch_size, 1, 1, new_mask.shape[-1] + ) + inverted_mask = 1.0 - expanded_mask + inc_mask = np.ones((self.static_batch_size, 1, 1, self.total)) + inc_mask[..., : new_mask.shape[-1]] = inverted_mask + inc_mask = inc_mask.astype("bool") + return inc_mask + + def __set_binding_inc(self, token_idx): + model_idx = 1 + for job_index in range(self.batch_size): + input_ids = np.array(self.next_input[job_index]).astype("int32") + pos = np.array(self.pos_sum[job_index]).astype("int32") + self.inf_obj.set_input(job_index, "pos", pos, model_idx) + mask = self.__get_mask(job_index, token_idx) + self.inf_obj.set_input(job_index, "input_ids", input_ids, model_idx) + self.inf_obj.set_input(job_index, "attention_mask", mask, model_idx) + + def __infer_incremental(self, token_idx): + model_idx = 1 + self.__set_binding_inc(token_idx) + context = token_idx + self.base_output_shift_ + self.inf_obj.run(self.batch_size, context, model_idx) + self.__internal_post_process(model_idx, token_idx) + + def show_throughput(self): + base_spent_time = self.base_infer_time * 1000000 + self.static_batch_size = self.get_static_batch() + jobs = self.batch_count * self.static_batch_size * self.batch_size + total_tokens = jobs * self.infered_tokens + base_tokens = jobs * self.base_round + inc_spent_time = self.inc_infer_time * 1000000 + + print("============================================") + print("base infer time: {:.6f} (s)".format(self.base_infer_time)) + print( + "base throughput: {:.3f} (tokens/s)".format( + (base_tokens * 1000000) / base_spent_time + ) + ) + inc_tokens = jobs * (self.infered_tokens - 1) + print("incremental infer time: {:.6f} (s)".format(self.inc_infer_time)) + print( + "incremental throughput: {:.3f} (tokens/s)".format( + (inc_tokens * 1000000) / inc_spent_time + ) + ) + + print( + "infer time: {:.6f} (s)".format(self.base_infer_time + self.inc_infer_time) + ) + print(f"infer tokens: {total_tokens}") + print( + "average throughput: {:.3f} (tokens/s)".format( + (total_tokens * 1000000) / (inc_spent_time + base_spent_time) + ) + ) + print("============================================") + + def generate_pos_attention_mask(self, inputs): + self.pos_ = [] + self.attention_mask = [] + self.pos_sum = [] + for inp in inputs: + self.pos_.append(inp[1].astype("int32")) + self.attention_mask.append(inp[2]) + self.pos_sum.append(inp[1][:, -1] + 1) + + def set_temperature(self, inputs, temperature=0.85, top_p=1, do_sample=False): + self.input_ids = [] + for dynamic_batch_idx in range(self.batch_size): + self.input_ids.append(inputs[dynamic_batch_idx][0]) + + self.generation_config.temperature = temperature + self.generation_config.top_p = top_p + self.generation_config.do_sample = do_sample + self.generation_config.top_k = self.top_k + self.generation_config.repetition_penalty = self.repeat_penalty + self.generation_config.typical_p = self.typical_p + + batch_size, input_ids_seq_length = ( + self.input_ids[0].shape[0], + self.input_ids[0].shape[-1], + ) + + generation_config = self.generation_config + bos_token_id, eos_token_id = ( + generation_config.bos_token_id, + generation_config.eos_token_id, + ) + if isinstance(eos_token_id, int): + eos_token_id = [eos_token_id] + self.eos_token_id = eos_token_id + self.eos_token_id_tensor = torch.tensor(eos_token_id) + + has_default_max_length = generation_config.max_length is not None + if has_default_max_length and generation_config.max_new_tokens is None: + pass + elif generation_config.max_new_tokens is not None: + generation_config.max_length = ( + generation_config.max_new_tokens + input_ids_seq_length + ) + + logits_processor = LogitsProcessorList() + logits_processor.append(InvalidScoreLogitsProcessor()) + + self.logits_processor = [] + for dynamic_batch_idx in range(self.batch_size): + self.logits_processor.append( + self._get_logits_processor( + generation_config=generation_config, + input_ids_seq_length=input_ids_seq_length, + encoder_input_ids=self.input_ids[dynamic_batch_idx].astype("int64"), + prefix_allowed_tokens_fn=None, + logits_processor=logits_processor, + ) + ) + self.stopping_criteria = self._get_stopping_criteria( + generation_config=generation_config, + stopping_criteria=StoppingCriteriaList(), + ) + self.logits_warper = self._get_logits_warper(generation_config) + + self.scores = None + + def _apply_penalties(self, logits): + """ + reference: vllm + presence_penalty: Float that penalizes new tokens based on whether they + appear in the generated text so far. Values > 0 encourage the model + to use new tokens, while values < 0 encourage the model to repeat + tokens. + frequency_penalty: Float that penalizes new tokens based on their + frequency in the generated text so far. Values > 0 encourage the + model to use new tokens, while values < 0 encourage the model to + repeat tokens. + """ + output_tokens_tensor = torch.tensor(self.output) + vocab_size = self.config.vocab_size + bin_counts = torch.zeros((self.static_batch_size, vocab_size + 1), dtype=torch.long) + bin_counts.scatter_add_(1, output_tokens_tensor, torch.ones_like(output_tokens_tensor)) + bin_counts = bin_counts[:, :vocab_size] + frequency_penalties = torch.tensor([[self.frequency_penalties]], dtype=logits.dtype) + presence_penalties = torch.tensor([[self.presence_penalties]], dtype=logits.dtype) + logits = logits - frequency_penalties * bin_counts + logits = logits - presence_penalties * (bin_counts > 0) + return logits + + def _do_sample(self, next_token_scores): + if self.generation_config.do_sample: + probs = torch.softmax(next_token_scores, dim=-1).numpy() + next_token = ( + torch.multinomial(torch.Tensor(probs), num_samples=1).squeeze(1).numpy() + ) + else: + next_token = torch.argmax(next_token_scores, dim=-1).numpy() + return next_token + + def get_next_token(self, next_token_logits, dynamic_batch_idx): + # pre-process distribution + t_input_ids = torch.LongTensor(self.input_ids[dynamic_batch_idx]) + next_token_scores = torch.Tensor(next_token_logits) + next_token_scores = self.logits_processor[dynamic_batch_idx]( + t_input_ids, next_token_scores + ) + next_token = self._do_sample(next_token_scores) + if len(self.output) > 0 and (self.frequency_penalties != 0. or self.presence_penalties != 0.): + for _b, _token in enumerate(next_token.flatten().tolist()): + self.output[_b].append(_token) + next_token_logits = self._apply_penalties(next_token_scores) + next_token = self._do_sample(next_token_logits) + + return next_token + + def stop_inference(self, next_token, dynamic_batch_idx): + t_input_ids = torch.LongTensor(self.input_ids[dynamic_batch_idx]) + unfinished_sequences = t_input_ids.new(t_input_ids.size(0)).fill_(1) + self.input_ids[dynamic_batch_idx] = np.concatenate( + [self.input_ids[dynamic_batch_idx], next_token[:, None]], axis=-1 + ) + unfinished_sequences = unfinished_sequences.mul( + next_token.tile(self.eos_token_id_tensor.shape[0], 1) + .ne(self.eos_token_id_tensor.unsqueeze(1)) + .prod(dim=0) + ) + # stop when each sentence is finished, or if we exceed the maximum length + if not self.constant_output: + if unfinished_sequences.max() == 0 or self.stopping_criteria( + self.input_ids[dynamic_batch_idx], self.scores + ): + return True + return False + + def inference_use_cache( + self, + inputs: list, + max_new_tokens, + tokens=None, + base_round=1, + temperature=0.85, + top_p=1, + top_k=0., + do_sample=False, + typical_p = 0.1, + repeat_penalty= 1.0, + presence_penalties = 0., + frequency_penalties = 0., + ): + if not self.use_cache: + print("engine not created with user cache.") + return + repeat_penalty = 1.0 + self.typical_p = typical_p + self.repeat_penalty = repeat_penalty + self.top_k = top_k + self.presence_penalties = presence_penalties + self.frequency_penalties = frequency_penalties + self.set_temperature( + inputs, temperature=temperature, top_p=top_p, do_sample=do_sample + ) + + output = [] + self.tokens = tokens + self.max_new_tokens = max_new_tokens + total_steps = self.batch_count * self.max_new_tokens + self.base_infer_time = 0 + self.inc_infer_time = 0 + self.base_round = base_round + self.generate_pos_attention_mask(inputs) + self.inf_obj.clear_use_cache() + pbar = tqdm(total=total_steps, desc="Infering") + for i in range(self.batch_count): + self.base_output_shift_ = 0 + self.infered_tokens = 0 + t0 = time.perf_counter() + self.__infer_base(inputs) + self.__deal_use_cache_output(0) + t1 = time.perf_counter() + self.base_infer_time = self.base_infer_time + (t1 - t0) + total_steps = total_steps - 1 + self.infered_tokens = self.infered_tokens + self.base_round + self.base_output_shift_ += (self.base_round - 1) * self.base_length + pbar.update(1) + self.output = deepcopy([self.use_cache_output_[0][i] for i in self.use_cache_output_[0]]) + for token_idx in range(max_new_tokens - 1): + self.__infer_incremental(token_idx) + self.__deal_use_cache_output(1) + pbar.update(1) + self.infered_tokens = self.infered_tokens + 1 + total_steps = total_steps - 1 + for job, _ in enumerate(self.pos_sum): + if self.static_batch_size > 1 and not self.constant_output: + pos_mask = (1 - self.multi_batch_finish).astype("bool") + self.pos_sum[job][pos_mask] += 1 + else: + self.pos_sum[job] += 1 + if self.all_finished: + pbar.update(total_steps) + break + t2 = time.perf_counter() + self.inc_infer_time = self.inc_infer_time + (t2 - t1) + output.append(self.use_cache_output_) + return output + + +def main(): + parser = argparse.ArgumentParser( + description="This program is used to" " infer with python api" + ) + parser.add_argument( + "--engine-file", + action="store", + type=str, + default="", + help="set the engine file", + ) + parser.add_argument( + "--batch-size", + action="store", + type=int, + default=1, + help="default 1, set the size of every batch", + ) + parser.add_argument( + "--batch-count", + action="store", + type=int, + default=1, + help="default 1, set the batch count of infer", + ) + parser.add_argument( + "--batch_sync", + action="store", + type=int, + default=1000, + help="default 1000, set number of batchs do a synchronize", + ) + parser.add_argument( + "--max-batch-size", + action="store", + type=int, + default=0, + help="default 0, set max batch size", + ) + parser.add_argument( + "--inputs", + action="store", + type=str, + default="", + help="default with random inputs, format:" + "job1_tensor1,job1_tensor2,...:job2_tensor1,job2_tensor2,...", + ) + parser.add_argument( + "--output-dir", + action="store", + type=str, + default="", + help="set the dump directory of output tensors, " + "output tensor subpath format is " + "job_{job No}/{tensor name}.dat", + ) + + args = parser.parse_args() + + TcInfer( + args.engine_file, + batch_size=args.batch_size, + batch_count=args.batch_count, + batch_sync=args.batch_sync, + max_batch_size=args.max_batch_size, + inputs=args.inputs, + output_dir=args.output_dir, + print_throughput=True, + in_out_nparray=False, + ) + + +if __name__ == "__main__": + main() + diff --git a/inference/inference_engine/hexaflake/pytorch_foo.py b/inference/inference_engine/hexaflake/pytorch_foo.py new file mode 100644 index 000000000..805e06bcb --- /dev/null +++ b/inference/inference_engine/hexaflake/pytorch_foo.py @@ -0,0 +1,221 @@ +import onnx +import onnxruntime +import torch +import os +import subprocess +import numpy as np +import time +import shutil +import argparse +import json +import hxrt as rt +import pickle as pk +from transformers import AutoTokenizer +from copy import deepcopy +from loguru import logger + +from .hx_infexec import GLMInfer + + +class Graph(object): + def __init__( + self, + engine, + tokenizer_path, + static_batch_size, + batch_size, + input_index, + max_batch_size, + base_length, + constant_output=False, + dev_ids = [], + dev_dram_limit = [], + dump_golden = [], + split_strategy = [], + ): + global engine_version + self.static_batch_size = static_batch_size + self.dynamic_batch_size = batch_size + self.base_length = base_length + self.engine = engine + + self.model = GLMInfer( + engine, + batch_count=1, + static_batch_size=static_batch_size, + batch_size=batch_size, + in_out_nparray=True, + use_cache=True, + max_batch_size=max_batch_size, + dev_ids=dev_ids, + dev_dram_limit=dev_dram_limit, + dump_golden=dump_golden, + split_strategy=split_strategy, + config_file=os.path.join(tokenizer_path, "config.json"), + base_length=base_length, + engine_version=engine_version, + constant_output=constant_output, + total=total, + ) + self.tokenizer = AutoTokenizer.from_pretrained( + tokenizer_path, trust_remote_code=True + ) + self.tokenizer.pad_token = self.tokenizer.eos_token + self.input_index = input_index + + + def graph_run(self, input_data, prompt, max_new_tokens=2048, temperature=0.85, top_p=1, do_sample=True, dump_result=""): + input_data_ = [] + max_len = 0 + for job in range(self.dynamic_batch_size): + for i in range(self.static_batch_size): + input_data_.append(prompt) + input_ids = self.tokenizer(prompt, return_tensors="pt")["input_ids"] + max_len = max(max_len, len(input_ids)) + + base_round = max_len // self.base_length + m = max_len % self.base_length + + if m != 0: + base_round += 1 + + total_pre = base_round * self.base_length + inputs = [] + + questions = [] + base_dynamic_batch = 0 + for job in range(self.dynamic_batch_size): + one_batch = [] + one_pos = [] + batch_input = [] + tokens = self.tokenizer( + input_data_[ + base_dynamic_batch : base_dynamic_batch + self.static_batch_size + ], + padding="max_length", + max_length=total_pre, + ) + + input_ids = np.array(tokens["input_ids"]).astype("int32") + batch_size, input_ids_seq_length = input_ids.shape[0], input_ids.shape[-1] + batch_input.append(input_ids) + + token_mask = torch.tensor(tokens["attention_mask"]) + pos = np.ones(input_ids.shape) + pos_sum = [i.sum().item() for i in token_mask] + for idx, pp in enumerate(pos_sum): + pos[idx, -pp:] = np.arange(pp) + pos = pos.astype("int32") + batch_input.append(pos) + + mask = deepcopy(token_mask) + mask_cond = torch.arange(mask.size(-1)) + mask = 1 - (mask_cond < (mask_cond + 1).view(mask.size(-1), 1)).to(torch.int32) + mask = mask[None, None, :, :].expand(self.static_batch_size, 1, input_ids_seq_length, input_ids_seq_length) + inc_mask = 1.0 - token_mask + inc_mask = inc_mask[:, None, None, :].expand(self.static_batch_size, 1, input_ids_seq_length, input_ids_seq_length) + base_mask = (inc_mask + mask).numpy() + attention_mask = np.ones((self.static_batch_size, 1, input_ids_seq_length, total)) + attention_mask[..., :input_ids_seq_length] = base_mask + attention_mask = attention_mask.astype("bool") + + batch_input.append(attention_mask) + inputs.append(batch_input) + base_dynamic_batch += self.static_batch_size + + outputs = self.model.inference_use_cache( + inputs, + max_new_tokens, + tokens=tokens, + base_round=base_round, + temperature=temperature, + top_p=top_p, + do_sample=do_sample, + ) + + res = [] + index = 0 + batch_count = 0 + for job in range(self.dynamic_batch_size): + for s in range(self.static_batch_size): + out_ids = outputs[batch_count][job][s] + results = self.tokenizer.decode(out_ids) + res.append(out_ids) + answer = results + index = index + 1 + if index >= len(input_data_): + break + + if dump_result != "": + with open(dump_result, "w") as f: + pk.dump(res, f) + return answer, max_len + + +class InferModel: + + def __init__(self, config, ngf_path, model): + self.input_names = [] + + self.dev_ids = [] + self.dev_dram_limit = [] + self.dump_golden = 0 + self.split_strategy = 0 + self.engine_version = 0 + self.dump_result = "" + + self.static_batch_size = 1 + self.dynamic_batch_size = 1 + self.engine = "/home/FlagPerf/data/llama2_7b/llama2_7b_2048_bf16_q/llama2_7b_2048_bf16_q_multi.ngf" + self.max_batch_size = 1 + self.input_index = 0 + self.base_length = 256 + self.max_new_tokens = 1 + self.temperature = 0.8 + self.top_p = 1.2 + self.do_sample = True + global engine_version + engine_version = 0 + global total + total = 2048 + self.top_k = 0 + self.constant_output = 0 + self.typical_p = 0.5 + self.repeat_penalty = 0. + self.presence_penalties = 0. + self.frequency_penalties = 0. + + self.default_questions = ["hello"] + self.input_data = self.default_questions + self.tokenizer_path = "/home/FlagPerf/data/dataset/llama2_7b_hf" + self.g = Graph( + self.engine, + self.tokenizer_path, + self.static_batch_size, + self.dynamic_batch_size, + self.input_index, + self.max_batch_size, + self.base_length, + constant_output=self.constant_output, + ) + + + def __call__(self, model_inputs: list): + + prompt = model_inputs + y = self.g.graph_run( + self.input_data, + prompt, + self.max_new_tokens, + temperature=self.temperature, + top_p=self.top_p, + do_sample=self.do_sample, + dump_result=self.dump_result, + ) + + tokens = y[1] + res = y[0] + + foo_time = 0.0 + return res, foo_time, tokens + diff --git a/inference/run_hexaflake.py b/inference/run_hexaflake.py new file mode 100644 index 000000000..084e294db --- /dev/null +++ b/inference/run_hexaflake.py @@ -0,0 +1,637 @@ +# Copyright (c) 2023 BAAI. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License") +#!/usr/bin/env python3 +# -*- coding: UTF-8 -*- +''' TODO Copyright and Other info ''' + +import os +import sys +import ast +import time +import yaml +import importlib +from munch import DefaultMunch +import getpass +from loguru import logger +from collections import namedtuple + +CURR_PATH = os.path.abspath(os.path.dirname(__file__)) +sys.path.append(os.path.abspath(os.path.join(CURR_PATH, "../"))) +from utils import cluster_manager, image_manager + +VERSION = "v0.1" +CLUSTER_MGR = cluster_manager.ClusterManager() + +CURR_PATH = os.path.abspath(os.path.dirname(__file__)) + + +def print_welcome_msg(): + '''Print colorful welcome message to console.''' + logger.log( + "Welcome", + "\033[1;34;40m==============================================\033[0m") + logger.log("Welcome", + "\033[1;36;40m Welcome to FlagPerf Inference!\033[0m") + logger.log( + "Welcome", + "\033[1;36;40m See more at https://github.com/FlagOpen/FlagPerf \033[0m" + ) + logger.log( + "Welcome", + "\033[1;34;40m==============================================\033[0m") + + +def init_logger(config): + logger.remove() + """ + define "EVENTS", using logger.log("EVENT",msg) to log + #21 means just important than info(#20), less than warning(#30) + """ + logger.level("Welcome", no=21) + + timestamp_log_dir = "run" + time.strftime("%Y%m%d%H%M%S", time.localtime()) + curr_log_path = config.FLAGPERF_PATH + "/" + config.FLAGPERF_LOG_PATH + "/" + timestamp_log_dir + logfile = curr_log_path + "/host.out.log" + + logger.remove() + + if config.LOG_CALL_INFORMATION: + logger.add(logfile, level=config.FLAGPERF_LOG_LEVEL) + logger.add(sys.stdout, level=config.FLAGPERF_LOG_LEVEL) + else: + logger.add(logfile, + level=config.FLAGPERF_LOG_LEVEL, + format="{time} - {level} - {message}") + logger.add(sys.stdout, + level=config.FLAGPERF_LOG_LEVEL, + format="{time} - {level} - {message}") + return curr_log_path + + +def usage(): + ''' Show usage and exit with exit_code. ''' + print("Usage: python3 ", __file__) + print("Edit config file test_conf.py & cluster_conf.py in " + "training/run_benchmarks/config and run.") + sys.exit(0) + + +def check_cluster_health(): + ''' Try to ssh login to all the hosts in cluster_conf.hosts. + Return None if everything goes well. + ''' + logger.debug("Cluster healthcheck ssh. Hosts are: " + + ",".join(CLUSTER_MGR.get_hosts_list())) + bad_hosts = CLUSTER_MGR.healthcheck() + if len(bad_hosts) != 0: + for bad_host in bad_hosts: + logger.error("Check " + bad_host + " failed. ssh command exit " + "with: " + str(bad_hosts[bad_host])) + logger.error("Check hosts in the cluster......[FAILED] [EXIT]") + sys.exit(3) + logger.info("Check hosts in the cluster......[SUCCESS]") + + +def _get_deploy_path(config): + '''Return deploy path according to FLAGPERF_LOG_PATH_HOST in host.yaml.''' + if 'FLAGPERF_PATH' not in config.__dict__.keys() \ + or config.FLAGPERF_PATH is None: + dp_path = CURR_PATH + else: + dp_path = os.path.abspath(config.FLAGPERF_PATH) + return dp_path + + +def check_cluster_deploy_path(dp_path): + '''Make sure that flagperf is deployed on all the hosts + ''' + logger.debug("Check flagperf deployment path: " + dp_path) + bad_hosts = CLUSTER_MGR.run_command_all_hosts("cd " + dp_path) + if len(bad_hosts) != 0: + logger.error("Hosts that can't find deployed path: " + + ",".join(bad_hosts.keys())) + logger.error("Check cluster deploy path " + dp_path + + "......[FAILED] [EXIT]") + sys.exit(3) + logger.info("Check flagperf deployment path: " + dp_path + "...[SUCCESS]") + + +def check_test_host_config(config): + ''' Check test config. + Make sure all CASES are configed. + ''' + logger.debug("Check config in host.yaml") + must_para = [ + 'FLAGPERF_LOG_PATH', 'FLAGPERF_LOG_PATH', 'VENDOR', + 'FLAGPERF_LOG_LEVEL', 'HOSTS', 'SSH_PORT', 'HOSTS_PORTS', + 'MASTER_PORT', 'SHM_SIZE', 'ACCE_CONTAINER_OPT', 'PIP_SOURCE', + 'CLEAR_CACHES', 'ACCE_VISIBLE_DEVICE_ENV_NAME', 'CASES' + ] + + for para in must_para: + if para not in config.__dict__.keys(): + logger.error(f"{para} MUST be set in host.yaml...[EXIT]") + sys.exit(2) + logger.info("Check host.yaml...[SUCCESS]") + + +def check_case_config(case, case_config, vendor): + '''Check config of the testcase. Make sure its path exists, framework is + right and config file exists. + ''' + logger.debug("Check config of test case: " + case) + must_configs = [ + "model", "framework", "data_dir_host", "data_dir_container" + ] + for config_item in case_config.keys(): + if config_item in must_configs: + must_configs.remove(config_item) + if len(must_configs) > 0: + logger.warning("Case " + case + " misses some config items: " + + ",".join(must_configs)) + return False + logger.debug("Check config of test case: " + case + " ...[SUCCESS]") + return True + + +def prepare_docker_image_cluster(dp_path, image_mgr, framework, nnodes, + config): + '''Prepare docker image in registry and in the cluster. + ''' + vendor = config.VENDOR + image_vendor_dir = os.path.join( + CURR_PATH, "docker_images/" + vendor + "/" + framework) + image_name = image_mgr.repository + ":" + image_mgr.tag + logger.debug("Prepare docker image in cluster. image_name=" + image_name + + " image_vendor_dir=" + image_vendor_dir) + prepare_image_cmd = "cd " + dp_path + " && " + sys.executable \ + + " utils/image_manager.py -o build -i " \ + + image_mgr.repository + " -t " + image_mgr.tag \ + + " -d " + image_vendor_dir + " -f " + framework + timeout = 1200 + logger.debug("Run cmd in the cluster to prepare docker image: " + + prepare_image_cmd + " timeout=" + str(timeout)) + bad_hosts = CLUSTER_MGR.run_command_some_hosts(prepare_image_cmd, nnodes, + timeout) + if len(bad_hosts) != 0: + logger.error("Hosts that can't pull image: " + + ",".join(bad_hosts.keys())) + return False + return True + + +def prepare_running_env(dp_path, container_name, case_config): + '''Install extensions and setup env before start task in container. + ''' + nnodes = case_config["nnodes"] + model = case_config["model"] + framework = case_config["framework"] + prepare_cmd = "cd " + dp_path + " && " + sys.executable \ + + " utils/container_manager.py -o runcmdin -c " \ + + container_name + " -t 1800 -r \"python3 " \ + + config.FLAGPERF_PATH + "/" \ + + "/utils/prepare_in_container.py --framework " \ + + framework + " --model " + model + " --vendor " \ + + config.VENDOR + " --pipsource " + config.PIP_SOURCE + "\"" + timeout = 1800 + logger.debug("Run cmd in the cluster to prepare running environment: " + + prepare_cmd + " timeout=" + str(timeout)) + bad_hosts = CLUSTER_MGR.run_command_some_hosts(prepare_cmd, nnodes, + timeout) + if len(bad_hosts) != 0: + logger.error("Hosts that can't prepare running environment " + + "properly: " + ",".join(bad_hosts.keys())) + return False + return True + + +def start_container_in_cluster(dp_path, run_args, container_name, image_name, + nnodes): + '''Call CLUSTER_MGR tool to start containers.''' + start_cmd = "cd " + dp_path + " && " + sys.executable \ + + " utils/container_manager.py -o runnew " \ + + " -c " + container_name + " -i " + image_name + " -a \"" \ + + run_args + "\"" + logger.debug("Run cmd in the cluster to start container: " + start_cmd) + bad_hosts = CLUSTER_MGR.run_command_some_hosts(start_cmd, nnodes, 600) + if len(bad_hosts) != 0: + logger.error("Hosts that can't start docker container: " + + ",".join(bad_hosts.keys())) + return False + return True + + +def stop_container_in_cluster(dp_path, container_name, nnodes): + '''Call CLUSTER_MGR tool to stop containers.''' + stop_cont_cmd = "cd " + dp_path + " && " + sys.executable \ + + " utils/container_manager.py -o stop" \ + + " -c " + container_name + logger.debug("Run cmd to stop container(s) in the cluster:" + + stop_cont_cmd) + failed_hosts = CLUSTER_MGR.run_command_some_hosts(stop_cont_cmd, nnodes, + 60) + if len(failed_hosts) != 0: + logger.warning("Hosts that stop container " + container_name + + " failed:" + ",".join(failed_hosts.keys()) + + " Continue.") + return False + logger.info("All containers stoped in the cluster") + return True + + +def clear_caches_cluster(clear, nnodes): + '''Set vm.drop to clean the system caches.''' + if not clear: + logger.info("Caches clear config is NOT set.") + return + + clear_cmd = "sync && echo password | sudo -S /sbin/sysctl vm.drop_caches=3" + timeout = 30 + logger.debug("Run cmd in the cluster to clear the system cache: " + + clear_cmd + " timeout=" + str(timeout)) + failed_hosts = CLUSTER_MGR.run_command_some_hosts(clear_cmd, nnodes, + timeout) + if len(failed_hosts) != 0: + logger.warning("Hosts that clear cache failed: " + + ",".join(failed_hosts.keys()) + ". Continue.") + logger.info("Clear system caches if it set......[SUCCESS]") + + +def start_monitors_in_cluster(dp_path, case_log_dir, nnodes): + '''Start sytem and vendor's monitors.''' + start_mon_cmd = "cd " + dp_path + " && " + sys.executable \ + + " utils/sys_monitor.py -o restart -l " + timeout = 60 + logger.debug("Run cmd in the cluster to start system monitors: " + + start_mon_cmd) + bad_hosts = CLUSTER_MGR.start_monitors_some_hosts(start_mon_cmd, + case_log_dir, nnodes, + timeout) + if len(bad_hosts) != 0: + logger.error("Hosts that can't start system monitors: " + + ",".join(bad_hosts.keys())) + + ven_mon_path = os.path.join(dp_path, "docker_images", config.VENDOR, + config.VENDOR + "_monitor.py") + start_mon_cmd = "cd " + dp_path + " && echo password | sudo -S " + sys.executable \ + + " " + ven_mon_path + " -o restart -l " + logger.debug("Run cmd in the cluster to start vendor's monitors: " + + start_mon_cmd) + bad_hosts = CLUSTER_MGR.start_monitors_some_hosts(start_mon_cmd, + case_log_dir, nnodes, + timeout) + if len(bad_hosts) != 0: + logger.error("Hosts that can't start vendor's monitors: " + + ",".join(bad_hosts.keys())) + + +def stop_monitors_in_cluster(dp_path, nnodes): + '''Stop sytem and vendor's monitors.''' + stop_mon_cmd = "cd " + dp_path + " && " + sys.executable \ + + " utils/sys_monitor.py -o stop" + timeout = 60 + logger.debug("Run cmd in the cluster to stop system monitors: " + + stop_mon_cmd) + bad_hosts = CLUSTER_MGR.run_command_some_hosts(stop_mon_cmd, nnodes, + timeout) + if len(bad_hosts) != 0: + logger.error("Hosts that can't stop system monitors: " + + ",".join(bad_hosts.keys())) + + ven_mon_path = os.path.join(dp_path, "docker_images", config.VENDOR, + config.VENDOR + "_monitor.py") + stop_mon_cmd = "cd " + dp_path + " && echo password | sudo -S " + sys.executable \ + + " " + ven_mon_path + " -o stop" + logger.debug("Run cmd in the cluster to stop vendor's monitors: " + + stop_mon_cmd) + bad_hosts = CLUSTER_MGR.run_command_some_hosts(stop_mon_cmd, nnodes, + timeout) + if len(bad_hosts) != 0: + logger.error("Hosts that can't stop vendor's monitors: " + + ",".join(bad_hosts.keys())) + + +def start_tasks_in_cluster(dp_path, container_name, case_config, curr_log_path, + config): + '''Start tasks in cluster, and NOT wait.''' + nnodes = case_config["nnodes"] + env_file = os.path.join( + config.FLAGPERF_PATH, config.VENDOR, + case_config["model"] + "-" + case_config["framework"], + "config/environment_variables.sh") + + must_configs = [ + "FLAGPERF_PATH", "FLAGPERF_LOG_PATH", "MODEL", "VENDOR", + "FLAGPERF_LOG_LEVEL" + ] + new_case_config = {"DATA_DIR": case_config["data_dir_container"]} + new_case_config = {"FLAGPERF_LOG_PATH": curr_log_path} + + for cfg in must_configs: + new_case_config[cfg] = getattr(config, cfg) + + start_cmd = "cd " + dp_path + " && " + sys.executable \ + + " utils/container_manager.py -o runcmdin -c " \ + + container_name + " -r \"" \ + + f"export vendor=" + getattr(config, "VENDOR") + ";" \ + + f"python3 run_inference.py" \ + + f" --perf_dir " + getattr(config, "FLAGPERF_PATH") \ + + f" --loglevel " + getattr(config, "FLAGPERF_LOG_LEVEL") \ + + f" --vendor " + getattr(config, "VENDOR") \ + + f" --case " + case_config["model"] \ + + f" --data_dir " + case_config["data_dir_container"] \ + + f" --framework " + case_config["framework"] \ + + f" --log_dir " + curr_log_path + " 2>&1 | tee "+curr_log_path+"/stdout_err.out.log" + "\"" + logger.debug("Run cmd in the cluster to start tasks, cmd: " + start_cmd) + + logger.info("3) Waiting for tasks end in the cluster...") + logger.info("Check task log in real time from container: " + + curr_log_path + "/container.out.log") + logger.info("Check task stderr & stdout in real time from container: " + + curr_log_path + "/stdout_err.out.log") + CLUSTER_MGR.run_command_some_hosts_distribution_info(start_cmd, nnodes, 10800) + # Wait a moment for starting tasks. + time.sleep(10) + + +def wait_for_finish(dp_path, container_name, pid_file_path, nnodes): + '''wait all the processes of start_xxx_task.py finished. + ''' + # Aussme pid of start_xxx_task.py won't loop in a short time. + check_cmd = "cd " + dp_path + "; " + sys.executable \ + + " utils/container_manager.py -o pidrunning -c " \ + + container_name + " -f " + pid_file_path + + logger.debug("Run cmd to check whether the training tasks is running: " + + check_cmd) + while True: + bad_hosts = CLUSTER_MGR.run_command_some_hosts(check_cmd, + nnodes, + no_log=True) + + if len(bad_hosts) == nnodes: + break + time.sleep(10) + + +def prepare_containers_env_cluster(dp_path, case_log_dir, config, + container_name, image_name, case_config): + '''Prepare containers environments in the cluster. It will start + containers, setup environments, start monitors, and clear caches.''' + nnodes = case_config["nnodes"] + container_start_args = " --rm --init --detach --net=host --uts=host" \ + + " --ipc=host --security-opt=seccomp=unconfined" \ + + " --privileged=true --ulimit=stack=67108864" \ + + " --ulimit=memlock=-1" \ + + " -w " + config.FLAGPERF_PATH \ + + " --shm-size=" + config.SHM_SIZE \ + + " -v " + dp_path + ":" \ + + config.FLAGPERF_PATH \ + + " -v " + case_config["data_dir_host"] + ":" \ + + case_config["data_dir_container"] + if config.ACCE_CONTAINER_OPT is not None: + container_start_args += " " + config.ACCE_CONTAINER_OPT + + logger.info("a) Stop old container(s) first.") + stop_container_in_cluster(dp_path, container_name, nnodes) + logger.info("b) Start container(s) in the cluster.") + if not start_container_in_cluster(dp_path, container_start_args, + container_name, image_name, nnodes): + logger.error("b) Start container in the cluster......" + "[FAILED]. Ignore this round.") + return False + logger.info("b) Start container(s) in the cluster.......[SUCCESS]") + + logger.info("c) Prepare running environment.") + if not prepare_running_env(dp_path, container_name, case_config): + logger.error("c) Prepare running environment......" + "[FAILED]. Ignore this round.") + logger.info("Stop containers in cluster.") + stop_container_in_cluster(dp_path, container_name, nnodes) + return False + logger.info("c) Prepare running environment......[SUCCESS]") + logger.info("d) Start monitors......") + start_monitors_in_cluster(dp_path, case_log_dir, nnodes) + # logger.info("e) Clear system caches if it set......") + clear_caches_cluster(config.CLEAR_CACHES, nnodes) + return True + + +def clean_containers_env_cluster(dp_path, container_name, nnodes): + '''Clean containers environments in the cluster. It will stop containers, + and stop monitors.''' + logger.info("a) Stop containers......") + stop_container_in_cluster(dp_path, container_name, nnodes) + logger.info("b) Stop monitors......") + stop_monitors_in_cluster(dp_path, nnodes) + + +def compilation_result(case_log_path, config): + '''Scp logs from hosts in the cluster to temp dir, and then merge all. + ''' + case_perf_path = os.path.join(case_log_path, "container.out.log") + vendor_usage_path = os.path.join(case_log_path, + config.VENDOR + "_monitor.log") + + case_perf = None + case_file = open(case_perf_path) + + for line in case_file.readlines(): + if "Finish Info" in line: + case_perf_str = "{" + line.split("{")[1] + case_perf = ast.literal_eval(case_perf_str) + break + + if case_perf is None: + logger.error("Case Run Failed, Please Check Log!") + return + + vendor_module = importlib.import_module("docker_images." + config.VENDOR + + "." + config.VENDOR + "_analysis") + vendor_usage, vendor_maxmem, fp32, fp16 = vendor_module.analysis_log( + vendor_usage_path) + + case_perf["vendor_usage(GiB)"] = vendor_usage + case_perf["vendor_max_mem(GiB)"] = vendor_maxmem + + theory = fp32 if case_perf["precision"] == "fp32" else fp16 + mfu = case_perf["flops"] / theory + case_perf["*MFU"] = str(round(mfu * 100, 1)) + "%" + + for key in case_perf.keys(): + padding_str = str(key).ljust(43) + " : " + str( + case_perf[key]).ljust(23) + logger.info(padding_str) + + +def get_config_from_case(case, config): + '''check case is string''' + if not isinstance(case, str): + logger.error("Key in test_config.CASES must be str") + return False, None + + case_info = case.split(":") + '''check if 4+ : in case, we don't care what to put in''' + if len(case_info) < 2: + logger.error("At least 2 terms split by \":\" should in config.CASES") + logger.error("model:framework:hardware_model:nnodes:nproc:repeat") + return False, None + + case_model = case_info[0] + case_framework = case_info[1] + + case_config = {"model": case_model} + case_config["framework"] = case_framework + case_config["data_dir_host"] = config.CASES[case] + case_config["data_dir_container"] = config.CASES[case] + case_config['nnodes'] = 1 + + return True, case_config + + +def get_valid_cases(config): + '''Check case config in test_conf, return valid cases list.''' + if not isinstance(config.CASES, dict): + logger.error( + "No valid cases found in test_conf because test_config.CASES is not a dict...[EXIT]" + ) + sys.exit(4) + logger.debug("Check configs of all test cases: " + ", ".join(config.CASES)) + valid_cases = [] + cases_config_error = [] + for case in config.CASES: + rets, case_config = get_config_from_case(case, config) + if (not rets) or (not check_case_config(case, case_config, + config.VENDOR)): + cases_config_error.append(case) + continue + valid_cases.append(case) + if len(valid_cases) == 0: + logger.error("No valid cases found in test_conf...[EXIT]") + sys.exit(4) + logger.debug("Valid cases: " + ",".join(valid_cases)) + logger.debug("Invalid cases that config is error: " + + ",".join(cases_config_error)) + logger.info("Get valid cases list......[SUCCESS]") + return valid_cases + + +def prepare_case_config_cluster(dp_path, case_config, case): + '''Sync case config files in cluster.''' + logger.info("--------------------------------------------------") + logger.info("Testcase " + case + " config:") + for config_item in case_config.keys(): + logger.info(config_item + ":\t" + str(case_config[config_item])) + logger.info("--------------------------------------------------") + model = case_config["model"] + framework = case_config["framework"].split("_")[0] + config_file = case_config["config"] + ".py" + nnodes = case_config["nnodes"] + case_config_dir = os.path.join(dp_path, config.VENDOR, + model + "-" + framework, "config") + case_config_file = os.path.join(case_config_dir, config_file) + failed_hosts = CLUSTER_MGR.sync_file_to_some_hosts(case_config_file, + case_config_dir, nnodes) + if len(failed_hosts) != 0: + logger.error("Hosts that sync vendor case config file failed: " + + ",".join(failed_hosts.keys())) + return False + return True + + +def log_test_configs(cases, curr_log_path, dp_path): + '''Put test configs to log ''' + logger.info("--------------------------------------------------") + logger.info("Prepare to run flagperf Inference benchmakrs with configs: ") + logger.info("Deploy path on host:\t" + dp_path) + logger.info("Vendor:\t\t" + config.VENDOR) + logger.info("Testcases:\t\t[" + ','.join(cases) + "]") + logger.info("Log path on host:\t" + curr_log_path) + logger.info("Cluster:\t\t[" + ",".join(config.HOSTS) + "]") + logger.info("--------------------------------------------------") + + +def main(config): + '''Main process to run all the testcases''' + + curr_log_whole = init_logger(config) + + print_welcome_msg() + + logger.info("======== Step 1: Check key configs. ========") + + check_test_host_config(config) + + # Check test environment and configs from host.yaml. + CLUSTER_MGR.init(config.HOSTS, config.SSH_PORT, getpass.getuser()) + check_cluster_health() + dp_path = _get_deploy_path(config) + check_cluster_deploy_path(dp_path) + + cases = get_valid_cases(config) + log_test_configs(cases, curr_log_whole, dp_path) + + logger.info("========= Step 2: Prepare and Run test cases. =========") + + for case in cases: + logger.info("======= Testcase: " + case + " =======") + _, case_config = get_config_from_case(case, config) + + # Prepare docker image. + image_mgr = image_manager.ImageManager( + "flagperf-inference-" + config.VENDOR + "-" + + case_config["framework"], "t_" + VERSION) + image_name = image_mgr.repository + ":" + image_mgr.tag + nnodes = case_config["nnodes"] + logger.info("=== 2.1 Prepare docker image:" + image_name + " ===") + if not prepare_docker_image_cluster( + dp_path, image_mgr, case_config["framework"], nnodes, config): + logger.error("=== 2.1 Prepare docker image...[FAILED] " + + "Ignore this case " + case + " ===") + continue + + # Set command to start docker container in the cluster + container_name = image_mgr.repository + "-" + image_mgr.tag \ + + "-container" + + logger.info("=== 2.2 Setup container and run testcases. ===") + + logger.info("-== Testcase " + case + " starts ==-") + logger.info("1) Prepare container environments in cluster...") + case_log_dir = os.path.join(curr_log_whole, case) + curr_log_path = os.path.join(case_log_dir, + config.HOSTS[0] + "_noderank0") + + if not prepare_containers_env_cluster(dp_path, case_log_dir, config, + container_name, image_name, + case_config): + logger.error("1) Prepare container environments in cluster" + "...[FAILED]. Ignore case " + case) + continue + logger.info("2) Start tasks in the cluster...") + + start_tasks_in_cluster(dp_path, container_name, case_config, + curr_log_path, config) + + logger.info("3) Training tasks end in the cluster...") + logger.info("4) Clean container environments in cluster...") + clean_containers_env_cluster(dp_path, container_name, nnodes) + logger.info("-== Testcase " + case + " finished ==-") + logger.info("=== 2.2 Setup container and run testcases finished." + " ===") + logger.info("=== 2.3 Compilation Case Performance ===") + compilation_result(curr_log_path, config) + + +if __name__ == '__main__': + if len(sys.argv) > 1: + usage() + CURR_PATH = os.path.abspath(os.path.dirname(__file__)) + yaml_path = os.path.join(CURR_PATH, "configs/host.yaml") + data = yaml.safe_load(open(yaml_path)) + + config = DefaultMunch.fromDict(data) + + main(config) + diff --git a/inference/run_inference.py b/inference/run_inference.py index c130496a2..753f9fc53 100644 --- a/inference/run_inference.py +++ b/inference/run_inference.py @@ -12,9 +12,9 @@ from tools import init_logger, merge_config from argparse import ArgumentParser - + def main(config): - + init_logger(config) config = merge_config(config) # e.g. import funcs from benchmarks/resnet50/pytorch/__init__.py