diff --git a/scripts/performance/config_perf_spike_load.yml b/scripts/performance/config_perf_spike_load.yml index c7535ccd8..bb7056ff0 100644 --- a/scripts/performance/config_perf_spike_load.yml +++ b/scripts/performance/config_perf_spike_load.yml @@ -1,29 +1,42 @@ # perf_spike_load.py arguments -perf_spike: - read_mode: permanent #permanent = background reading operation with writing spikes, spike = reading and writing spikes with 0 load in between - spike_time_in_seconds: 180 - rest_time_in_seconds: 180 - overall_time_in_seconds: 3600 # 1h = 3600 sec, 24h = 86400 -# perf_processes.py will be called with arguments provided below during perf_spike_load.py execution. You may move arguments between common and _txns sections: -# e.g. if you want to define different clients number for reading and writing transactions, just copy "clients" to read_ and write_txns sections and provide values. -# The script takes the most specific arguments (for transactions rather than common) in case of a duplication. +profile: + spike_time_in_seconds: 60 + rest_time_in_seconds: 20 + overall_time_in_seconds: 600 # 1h = 3600 sec, 24h = 86400 + +# perf_processes.py will be called with arguments provided below during perf_spike_load.py execution. You may move arguments between common and processes sections: +# e.g. if you want to define different clients number for reading and writing transactions, just copy "clients" to any of processes sections and provide values. +# The script takes the most specific arguments (for processes rather than common) in case of a duplication. common: - clients: 1 - seed: 000000000000000000000000Trustee1 - num: 1 - refresh: 100 - buff_req: 100 - sep: "|" - wallet_key: key - mode: t - pool_config: "" - sync_mode: all - out: "" - genesis: ~/pool_transactions_genesis - directory: . -read_txns: + clients: 1 + seed: 000000000000000000000000Trustee1 + num: 1 + refresh: 60 + buff_req: 300 + sep: "|" + wallet_key: key + mode: p + pool_config: "" + sync_mode: all + out: "load_script_output.txt" + genesis: /home/me/Documents/genesis + directory: ~/Documents/ + +processes: + background: # set all step_* parameters values to 0 if background load is not needed kind: get_nym - load_rate: 5 -write_txns: + step_time_in_seconds: 10 # for stepwise testing, 0 means stable load without growing + step_initial_load_rate: 0 # initial load rate per spike (and stable load rate if not stepwise) + step_final_load_rate: 100 # additional load rate per step + + spikes: # set all step_* parameters values to 0 if background load is not needed kind: nym - load_rate: 2 + step_time_in_seconds: 0 # for stepwise testing, 0 means stable load without growing + step_initial_load_rate: 30 # initial load rate per spike (and stable load rate if not stepwise) + step_final_load_rate: 90 # final load rate per spike + + spike2: # remove additional spikes if not needed + kind: nym + step_time_in_seconds: 0 # for stepwise testing, 0 means stable load without growing + step_initial_load_rate: 10 # initial load rate per spike (and stable load rate if not stepwise) + step_final_load_rate: 20 # final load rate per spike diff --git a/scripts/performance/perf_spike_load.py b/scripts/performance/perf_spike_load.py index 177943d14..f70f30e07 100644 --- a/scripts/performance/perf_spike_load.py +++ b/scripts/performance/perf_spike_load.py @@ -1,79 +1,202 @@ #! /usr/bin/env python3 -"""This script uses another load script (perf_processes.py) running it with different parameters which are -provided in config_perf_spike_load.yml file""" - -from datetime import timedelta, datetime import subprocess import yaml import time import os +import collections +import matplotlib.pyplot as plt +import numpy as np +import argparse + +parser = argparse.ArgumentParser(description='This script uses another load script (perf_processes.py) running it ' + 'with different parameters which are provided in ' + 'config_perf_spike_load.yml file') + +parser.add_argument('-g', '--graph', default=False, type=bool, required=False, dest='graph', + help='Build a graph to check if the configured profile is correct') + +args = parser.parse_args() + + +def create_output_directory(folder_path): + output_folder = os.path.join(folder_path[0], *folder_path[1:]) + try: + output_folder = os.path.expanduser(output_folder) + except OSError: + raise ValueError("Bad output log folder pathname!") + if not os.path.isdir(output_folder) and not args.graph: + os.makedirs(output_folder) + directory = "--directory={}".format(output_folder) + return directory -def create_subprocess_args(config, sub_process_type, folder_count, log_folder_name): - args = ["python3", "perf_processes.py"] +def get_args(config, process_type, directory_arg): + args_for_script = ["python3", "perf_processes.py"] common_args = config["common"].copy() - if "read" in sub_process_type: - common_args.update(config["read_txns"]) - elif "write" in sub_process_type: - common_args.update(config["write_txns"]) + common_args.update(config["processes"][process_type]) for dict_key in common_args: if dict_key == "directory": - output_folder = os.path.join(str(common_args[dict_key]), log_folder_name, - "{}_{}".format(sub_process_type, folder_count)) - try: - output_folder = os.path.expanduser(output_folder) - except OSError: - raise ValueError("Bad output log folder pathname!") - if not os.path.isdir(output_folder): - os.makedirs(output_folder) - args.append("--{}={}".format(dict_key, output_folder)) + args_for_script.append(directory_arg) + elif "step" in dict_key: + continue else: - args.append("--{}={}".format(dict_key, common_args[dict_key])) - if "background" in sub_process_type: - args.append("--load_time={}".format(config["perf_spike"]["overall_time_in_seconds"])) - elif "spike" in sub_process_type: - args.append("--load_time={}".format(config["perf_spike"]["spike_time_in_seconds"])) - return args + args_for_script.append("--{}={}".format(dict_key, common_args[dict_key])) + return args_for_script + + +def order_processes(delays, args_for_script): + assert len(delays) == len(args_for_script), 'Can not order the processes as a list of delays length is not equal ' \ + 'to a list of arguments length.' + unique_delays = set(delays) + processes_dictionary = {} + for delay in unique_delays: + delays_indices = [i for i, e in enumerate(delays) if e == delay] + args_list = [] + for index in delays_indices: + args_list.append(args_for_script[index]) + processes_dictionary.update({delay: args_list}) + processes_dictionary_sorted = collections.OrderedDict(sorted(processes_dictionary.items())) + return processes_dictionary_sorted + + +def collect_delays(function_parameters, time_interval, spike_delay=0): + args_for_script = function_parameters[0] + step_time = function_parameters[1] + step_initial_load = function_parameters[2] + step_final_load = function_parameters[3] + args_copy = args_for_script.copy() + args_to_send = [] + delay = [] + if step_time != 0 and step_final_load != step_initial_load: + step_number = int(time_interval / step_time) + step_value = int((step_final_load - step_initial_load) / step_number) + if step_value == 0: + raise ValueError("There should be at least one transaction per step.") + for i in range(0, step_number): + load_time = time_interval - step_time * i + args_copy = args_for_script.copy() + args_copy.append("--load_time={}".format(load_time)) + if i != 0: + args_copy.append("--load_rate={}".format(step_value)) + else: + args_copy.append("--load_rate={}".format(step_initial_load)) + delay.append(spike_delay + time_interval - load_time) + args_to_send.append(args_copy) + else: + delay.append(spike_delay) + args_copy.append("--load_time={}".format(time_interval)) + args_copy.append("--load_rate={}".format(step_initial_load)) + args_to_send.append(args_copy) + return [delay, args_to_send] + + +def collect_processes(config): + root_log_folder_name = "Spike_log_{}".format(time.strftime("%m-%d-%y_%H-%M-%S")) + processes = list(config["processes"].keys()) + functions = {} + for process_name in processes: + step_time_in_seconds = config["processes"][process_name]["step_time_in_seconds"] + if step_time_in_seconds == 0: + continue + initial_rate = config["processes"][process_name]["step_initial_load_rate"] + final_rate = config["processes"][process_name]["step_final_load_rate"] + if initial_rate > final_rate: + raise ValueError("In {} block initial rate is bigger than final!".format(process_name)) + directory = [config["common"]["directory"], root_log_folder_name, process_name] + directory_arg = create_output_directory(directory) + args_for_script = get_args(config, process_name, directory_arg) + step_parameters = [args_for_script, step_time_in_seconds, initial_rate, final_rate] + functions.update({process_name: step_parameters}) + return functions def start_profile(): - folder_count = 0 # ordering number of the spike which goes to logs folder name - root_log_folder_name = "Spike_log {}".format(time.strftime("%m-%d-%y %H-%M-%S")) with open("config_perf_spike_load.yml") as file: config = yaml.load(file) - if config["perf_spike"]["read_mode"] == 'permanent': - print(""" - ========================================================================================== - The script creates writing transaction spikes, during intervals there is a background load - of reading transactions - ==========================================================================================""") - elif config["perf_spike"]["read_mode"] == 'spike': - print(""" - ============================================================================================ - The script creates reading and writing transaction spikes, during intervals there is no load - ============================================================================================""") - print("Reading transactions mode: ", config["perf_spike"]["read_mode"]) - print("Every spike time in seconds: ", config["perf_spike"]["spike_time_in_seconds"]) - print("Interval between spikes in seconds: ", config["perf_spike"]["rest_time_in_seconds"]) - print("Overall time in minutes: ", config["perf_spike"]["overall_time_in_seconds"] / 60) - if config["perf_spike"]["read_mode"] == 'permanent': - subprocess_args = create_subprocess_args(config, "read_background", folder_count, root_log_folder_name) - subprocess.Popen(subprocess_args, close_fds=True) - end_time = datetime.now() + timedelta(seconds=int(config["perf_spike"]["overall_time_in_seconds"])) - while datetime.now() < end_time: - folder_count += 1 - if config["perf_spike"]["read_mode"] == 'spike': - # start profile with reading transactions for x minutes - subprocess_args = create_subprocess_args(config, "read_spike", folder_count, root_log_folder_name) - subprocess.Popen(subprocess_args, close_fds=True) - folder_count += 1 - # start profile with writing transactions for x minutes - subprocess_args = create_subprocess_args(config, "write_spike", folder_count, root_log_folder_name) - subprocess.Popen(subprocess_args, close_fds=True) - time.sleep(int(config["perf_spike"]["spike_time_in_seconds"]) + - int(config["perf_spike"]["rest_time_in_seconds"])) + spike_time = config["profile"]["spike_time_in_seconds"] + rest_time = config["profile"]["rest_time_in_seconds"] + overall_time_in_seconds = int(config["profile"]["overall_time_in_seconds"]) + delays_list = [] + args_list = [] + background_plot = [] + processes_dict = collect_processes(config) + if "background" in processes_dict.keys(): + delays_args_list = collect_delays(processes_dict["background"], overall_time_in_seconds) + delays_list.extend(delays_args_list[0]) + args_list.extend(delays_args_list[1]) + background_plot = prepare_plot_values(delays_args_list) + + spike_plots_list = [] + time_count = 0 + spike_number = 0 + for spike, spike_args in processes_dict.items(): + if spike == "background": + continue + while time_count < overall_time_in_seconds: + spike_delay = (spike_time + rest_time) * spike_number + delays_args_list = (collect_delays(spike_args, spike_time, spike_delay)) + delays_list.extend(delays_args_list[0]) + args_list.extend(delays_args_list[1]) + spike_plots_list.append(prepare_plot_values(delays_args_list)) + spike_number += 1 + time_count += spike_time + rest_time + + spike_number = 0 + time_count = 0 + + if args.graph: + build_plot_on_config(background_plot, spike_plots_list) + else: + prepared = order_processes(delays_list, args_list) + time_count = 0 + for item in prepared.keys(): + time.sleep(item - time_count) + for process_args in prepared[item]: + subprocess.Popen(process_args, close_fds=True) + time_count = item + + +def prepare_plot_values(delays_args_list): + delays = delays_args_list[0] + args_for_script = delays_args_list[1] + plot_dict = {} + for i in range(0, len(delays)): + plot_dict.update({delays[i]: int(args_for_script[i][-1].split("=")[-1])}) + plot_dict_sorted = collections.OrderedDict(sorted(plot_dict.items())) + return plot_dict_sorted + + +def add_plot(ax, args_dict, color): + step_count = 1 + time_ax = [] + load_rate = [] + for delay, args_for_plot in args_dict.items(): + step_load_rate = args_for_plot + time_ax.append(delay) + if step_count != 1: + load_rate.append(load_rate[0] + step_load_rate * (step_count - 1)) + else: + load_rate.append(step_load_rate) + step_count += 1 + time_ax.append((time_ax[-1] - time_ax[-2]) + time_ax[-1]) + load_rate.append((load_rate[-1] - load_rate[-2]) + load_rate[-1]) + ax.fill_between(time_ax, load_rate, facecolor=color, alpha=0.4) + + +def build_plot_on_config(background, spikes): + figure, ax = plt.subplots(1, 1) + if len(background) != 0: + add_plot(ax, background, 'b') + if len(spikes) != 0: + for spike in spikes: + add_plot(ax, spike, 'g') + start, stop = ax.get_ylim() + ticks = np.arange(start, stop + (stop // 10), stop // 10) + ax.set_yticks(ticks) + ax.grid() + plt.show() if __name__ == '__main__':