Commit ad61020

Merge pull request hyperledger#935 from NataliaDracheva/indy-1644
Indy-1644, Indy-1666: Updated the spike load script to generate a growing load.
ashcherbakov authored Sep 5, 2018
2 parents 4b7c9fe + daa07b4 commit ad61020
Showing 2 changed files with 219 additions and 83 deletions.
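
The headline change: instead of launching only fixed-rate spikes, the script can now grow the load stepwise. A ramp is decomposed into overlapping constant-rate runs of perf_processes.py: each step starts one more process at a fixed increment, so the aggregate rate climbs until the interval ends. A minimal standalone sketch of that decomposition (names are illustrative, not from the commit):

def ramp_schedule(interval, step_time, initial_rate, final_rate):
    # one (start_delay, load_rate, load_time) tuple per perf_processes.py run
    steps = interval // step_time
    increment = (final_rate - initial_rate) // steps
    schedule = []
    for i in range(steps):
        rate = initial_rate if i == 0 else increment
        delay = step_time * i
        schedule.append((delay, rate, interval - delay))
    return schedule

# a 60 s spike ramping from 30 to 90 tx/s in 10 s steps:
# [(0, 30, 60), (10, 10, 50), (20, 10, 40), (30, 10, 30), (40, 10, 20), (50, 10, 10)]
print(ramp_schedule(60, 10, 30, 90))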
63 changes: 38 additions & 25 deletions scripts/performance/config_perf_spike_load.yml
@@ -1,29 +1,42 @@
# perf_spike_load.py arguments
perf_spike:
    read_mode: permanent  # permanent = background reading operation with writing spikes, spike = reading and writing spikes with 0 load in between
    spike_time_in_seconds: 180
    rest_time_in_seconds: 180
    overall_time_in_seconds: 3600  # 1h = 3600 sec, 24h = 86400
# perf_processes.py will be called with the arguments provided below during perf_spike_load.py execution. You may move arguments between the common and _txns sections:
# e.g. if you want a different number of clients for reading and writing transactions, just copy "clients" to the read_ and write_txns sections and provide values.
# The script takes the most specific arguments (for transactions rather than common) in case of duplication.
profile:
    spike_time_in_seconds: 60
    rest_time_in_seconds: 20
    overall_time_in_seconds: 600  # 1h = 3600 sec, 24h = 86400

# perf_processes.py will be called with the arguments provided below during perf_spike_load.py execution. You may move arguments between the common and processes sections:
# e.g. if you want a different number of clients for reading and writing transactions, just copy "clients" to any of the processes sections and provide values.
# The script takes the most specific arguments (for processes rather than common) in case of duplication.
common:
    clients: 1
    seed: 000000000000000000000000Trustee1
    num: 1
    refresh: 100
    buff_req: 100
    sep: "|"
    wallet_key: key
    mode: t
    pool_config: ""
    sync_mode: all
    out: ""
    genesis: ~/pool_transactions_genesis
    directory: .
read_txns:
    clients: 1
    seed: 000000000000000000000000Trustee1
    num: 1
    refresh: 60
    buff_req: 300
    sep: "|"
    wallet_key: key
    mode: p
    pool_config: ""
    sync_mode: all
    out: "load_script_output.txt"
    genesis: /home/me/Documents/genesis
    directory: ~/Documents/

processes:
    background:  # set all step_* parameter values to 0 if background load is not needed
        kind: get_nym
        load_rate: 5
write_txns:
        step_time_in_seconds: 10  # for stepwise testing, 0 means stable load without growing
        step_initial_load_rate: 0  # initial load rate per spike (and stable load rate if not stepwise)
        step_final_load_rate: 100  # final load rate

    spikes:  # set all step_* parameter values to 0 if this spike is not needed
        kind: nym
        load_rate: 2
        step_time_in_seconds: 0  # for stepwise testing, 0 means stable load without growing
        step_initial_load_rate: 30  # initial load rate per spike (and stable load rate if not stepwise)
        step_final_load_rate: 90  # final load rate per spike

    spike2:  # remove additional spikes if not needed
        kind: nym
        step_time_in_seconds: 0  # for stepwise testing, 0 means stable load without growing
        step_initial_load_rate: 10  # initial load rate per spike (and stable load rate if not stepwise)
        step_final_load_rate: 20  # final load rate per spike
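
With the sample values above, the background block ramps from 0 to 100 tx/s across the whole 600-second run in 10-second steps, while (per the inline comments) spikes and spike2 request flat rates since their step_time_in_seconds is 0. A quick sanity check of the background arithmetic, mirroring the integer division the script uses:

overall_time, step_time = 600, 10
initial_rate, final_rate = 0, 100
steps = overall_time // step_time                # 60 steps
per_step = (final_rate - initial_rate) // steps  # +1 tx/s every 10 seconds
print(steps, per_step)                           # 60 1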
239 changes: 181 additions & 58 deletions scripts/performance/perf_spike_load.py
@@ -1,79 +1,202 @@
#! /usr/bin/env python3

"""This script uses another load script (perf_processes.py) running it with different parameters which are
provided in config_perf_spike_load.yml file"""


from datetime import timedelta, datetime
import subprocess
import yaml
import time
import os
import collections
import matplotlib.pyplot as plt
import numpy as np
import argparse

parser = argparse.ArgumentParser(description='This script runs another load script (perf_processes.py) '
                                             'with different parameters, which are provided in '
                                             'the config_perf_spike_load.yml file')

parser.add_argument('-g', '--graph', default=False, type=bool, required=False, dest='graph',
                    help='Build a graph to check if the configured profile is correct')

args = parser.parse_args()
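
# Note: argparse's type=bool does not parse strings -- any non-empty value,
# including --graph=False, is truthy; only omitting the flag (or passing an
# empty value) yields False.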


def create_output_directory(folder_path):
    output_folder = os.path.join(folder_path[0], *folder_path[1:])
    try:
        output_folder = os.path.expanduser(output_folder)
    except OSError:
        raise ValueError("Bad output log folder pathname!")
    if not os.path.isdir(output_folder) and not args.graph:
        os.makedirs(output_folder)
    directory = "--directory={}".format(output_folder)
    return directory
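
# Hypothetical usage: create_output_directory([".", "Spike_log_09-05-18_12-00-00", "background"])
# expands the joined path, creates the directory (skipped under --graph, a dry run),
# and returns a ready-made "--directory=..." argument for perf_processes.py.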


def create_subprocess_args(config, sub_process_type, folder_count, log_folder_name):
    args = ["python3", "perf_processes.py"]
def get_args(config, process_type, directory_arg):
    args_for_script = ["python3", "perf_processes.py"]
    common_args = config["common"].copy()
    if "read" in sub_process_type:
        common_args.update(config["read_txns"])
    elif "write" in sub_process_type:
        common_args.update(config["write_txns"])
    common_args.update(config["processes"][process_type])
    for dict_key in common_args:
        if dict_key == "directory":
            output_folder = os.path.join(str(common_args[dict_key]), log_folder_name,
                                         "{}_{}".format(sub_process_type, folder_count))
            try:
                output_folder = os.path.expanduser(output_folder)
            except OSError:
                raise ValueError("Bad output log folder pathname!")
            if not os.path.isdir(output_folder):
                os.makedirs(output_folder)
            args.append("--{}={}".format(dict_key, output_folder))
            args_for_script.append(directory_arg)
        elif "step" in dict_key:
            continue
        else:
            args.append("--{}={}".format(dict_key, common_args[dict_key]))
    if "background" in sub_process_type:
        args.append("--load_time={}".format(config["perf_spike"]["overall_time_in_seconds"]))
    elif "spike" in sub_process_type:
        args.append("--load_time={}".format(config["perf_spike"]["spike_time_in_seconds"]))
    return args
            args_for_script.append("--{}={}".format(dict_key, common_args[dict_key]))
    return args_for_script
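
# With the sample config, get_args(config, "background", directory_arg) yields
# something like (hypothetical ordering, following dict iteration):
#   ["python3", "perf_processes.py", "--clients=1", "--kind=get_nym",
#    "--load_rate=5", ..., "--directory=<log folder>"]
# step_* keys are consumed by this script itself and are never forwarded.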


def order_processes(delays, args_for_script):
    assert len(delays) == len(args_for_script), 'Can not order the processes as a list of delays length is not ' \
                                                'equal to a list of arguments length.'
    unique_delays = set(delays)
    processes_dictionary = {}
    for delay in unique_delays:
        delays_indices = [i for i, e in enumerate(delays) if e == delay]
        args_list = []
        for index in delays_indices:
            args_list.append(args_for_script[index])
        processes_dictionary.update({delay: args_list})
    processes_dictionary_sorted = collections.OrderedDict(sorted(processes_dictionary.items()))
    return processes_dictionary_sorted
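
# Example: order_processes([10, 0, 10], [a, b, c]) returns
# OrderedDict([(0, [b]), (10, [a, c])]): argument lists grouped by start delay
# and sorted, so the scheduler below can sleep between batches.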


def collect_delays(function_parameters, time_interval, spike_delay=0):
    args_for_script = function_parameters[0]
    step_time = function_parameters[1]
    step_initial_load = function_parameters[2]
    step_final_load = function_parameters[3]
    args_copy = args_for_script.copy()
    args_to_send = []
    delay = []
    if step_time != 0 and step_final_load != step_initial_load:
        step_number = int(time_interval / step_time)
        step_value = int((step_final_load - step_initial_load) / step_number)
        if step_value == 0:
            raise ValueError("There should be at least one transaction per step.")
        for i in range(0, step_number):
            load_time = time_interval - step_time * i
            args_copy = args_for_script.copy()
            args_copy.append("--load_time={}".format(load_time))
            if i != 0:
                args_copy.append("--load_rate={}".format(step_value))
            else:
                args_copy.append("--load_rate={}".format(step_initial_load))
            delay.append(spike_delay + time_interval - load_time)
            args_to_send.append(args_copy)
    else:
        delay.append(spike_delay)
        args_copy.append("--load_time={}".format(time_interval))
        args_copy.append("--load_rate={}".format(step_initial_load))
        args_to_send.append(args_copy)
    return [delay, args_to_send]
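
# Worked example for a stepwise spike: time_interval=60, step_time=10,
# initial=30, final=90 gives step_number=6 and step_value=10. Six processes
# are produced with (delay, load_rate, load_time):
#   (0, 30, 60), (10, 10, 50), (20, 10, 40), (30, 10, 30), (40, 10, 20), (50, 10, 10)
# so the aggregate rate climbs 30, 40, ..., 80 tx/s and all runs end together.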


def collect_processes(config):
    root_log_folder_name = "Spike_log_{}".format(time.strftime("%m-%d-%y_%H-%M-%S"))
    processes = list(config["processes"].keys())
    functions = {}
    for process_name in processes:
        step_time_in_seconds = config["processes"][process_name]["step_time_in_seconds"]
        if step_time_in_seconds == 0:
            continue
        initial_rate = config["processes"][process_name]["step_initial_load_rate"]
        final_rate = config["processes"][process_name]["step_final_load_rate"]
        if initial_rate > final_rate:
            raise ValueError("In {} block initial rate is bigger than final!".format(process_name))
        directory = [config["common"]["directory"], root_log_folder_name, process_name]
        directory_arg = create_output_directory(directory)
        args_for_script = get_args(config, process_name, directory_arg)
        step_parameters = [args_for_script, step_time_in_seconds, initial_rate, final_rate]
        functions.update({process_name: step_parameters})
    return functions
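
# Note: a block whose step_time_in_seconds is 0 is skipped by the `continue`
# above, so collect_delays' flat-rate branch is reached only when
# step_final_load_rate equals step_initial_load_rate.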


def start_profile():
    folder_count = 0  # ordering number of the spike which goes to logs folder name
    root_log_folder_name = "Spike_log {}".format(time.strftime("%m-%d-%y %H-%M-%S"))
    with open("config_perf_spike_load.yml") as file:
        config = yaml.load(file)
    if config["perf_spike"]["read_mode"] == 'permanent':
        print("""
==========================================================================================
The script creates writing transaction spikes, during intervals there is a background load
of reading transactions
==========================================================================================""")
    elif config["perf_spike"]["read_mode"] == 'spike':
        print("""
============================================================================================
The script creates reading and writing transaction spikes, during intervals there is no load
============================================================================================""")
    print("Reading transactions mode: ", config["perf_spike"]["read_mode"])
    print("Every spike time in seconds: ", config["perf_spike"]["spike_time_in_seconds"])
    print("Interval between spikes in seconds: ", config["perf_spike"]["rest_time_in_seconds"])
    print("Overall time in minutes: ", config["perf_spike"]["overall_time_in_seconds"] / 60)
    if config["perf_spike"]["read_mode"] == 'permanent':
        subprocess_args = create_subprocess_args(config, "read_background", folder_count, root_log_folder_name)
        subprocess.Popen(subprocess_args, close_fds=True)
    end_time = datetime.now() + timedelta(seconds=int(config["perf_spike"]["overall_time_in_seconds"]))
    while datetime.now() < end_time:
        folder_count += 1
        if config["perf_spike"]["read_mode"] == 'spike':
            # start profile with reading transactions for x minutes
            subprocess_args = create_subprocess_args(config, "read_spike", folder_count, root_log_folder_name)
            subprocess.Popen(subprocess_args, close_fds=True)
            folder_count += 1
        # start profile with writing transactions for x minutes
        subprocess_args = create_subprocess_args(config, "write_spike", folder_count, root_log_folder_name)
        subprocess.Popen(subprocess_args, close_fds=True)
        time.sleep(int(config["perf_spike"]["spike_time_in_seconds"]) +
                   int(config["perf_spike"]["rest_time_in_seconds"]))
    spike_time = config["profile"]["spike_time_in_seconds"]
    rest_time = config["profile"]["rest_time_in_seconds"]
    overall_time_in_seconds = int(config["profile"]["overall_time_in_seconds"])
    delays_list = []
    args_list = []
    background_plot = []
    processes_dict = collect_processes(config)
    if "background" in processes_dict.keys():
        delays_args_list = collect_delays(processes_dict["background"], overall_time_in_seconds)
        delays_list.extend(delays_args_list[0])
        args_list.extend(delays_args_list[1])
        background_plot = prepare_plot_values(delays_args_list)

    spike_plots_list = []
    time_count = 0
    spike_number = 0
    for spike, spike_args in processes_dict.items():
        if spike == "background":
            continue
        while time_count < overall_time_in_seconds:
            spike_delay = (spike_time + rest_time) * spike_number
            delays_args_list = (collect_delays(spike_args, spike_time, spike_delay))
            delays_list.extend(delays_args_list[0])
            args_list.extend(delays_args_list[1])
            spike_plots_list.append(prepare_plot_values(delays_args_list))
            spike_number += 1
            time_count += spike_time + rest_time

        spike_number = 0
        time_count = 0

    if args.graph:
        build_plot_on_config(background_plot, spike_plots_list)
    else:
        prepared = order_processes(delays_list, args_list)
        time_count = 0
        for item in prepared.keys():
            time.sleep(item - time_count)
            for process_args in prepared[item]:
                subprocess.Popen(process_args, close_fds=True)
            time_count = item
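
# The scheduler above sleeps only for the gap between consecutive start times:
# e.g. sorted delays 0, 10, 20 mean sleep(0), launch; sleep(10), launch;
# sleep(10), launch.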


def prepare_plot_values(delays_args_list):
    delays = delays_args_list[0]
    args_for_script = delays_args_list[1]
    plot_dict = {}
    for i in range(0, len(delays)):
        plot_dict.update({delays[i]: int(args_for_script[i][-1].split("=")[-1])})
    plot_dict_sorted = collections.OrderedDict(sorted(plot_dict.items()))
    return plot_dict_sorted
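
# args_for_script[i][-1] is the "--load_rate=N" argument that collect_delays
# appends last, so the plot maps each start delay to that step's rate.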


def add_plot(ax, args_dict, color):
    step_count = 1
    time_ax = []
    load_rate = []
    for delay, args_for_plot in args_dict.items():
        step_load_rate = args_for_plot
        time_ax.append(delay)
        if step_count != 1:
            load_rate.append(load_rate[0] + step_load_rate * (step_count - 1))
        else:
            load_rate.append(step_load_rate)
        step_count += 1
    time_ax.append((time_ax[-1] - time_ax[-2]) + time_ax[-1])
    load_rate.append((load_rate[-1] - load_rate[-2]) + load_rate[-1])
    ax.fill_between(time_ax, load_rate, facecolor=color, alpha=0.4)


def build_plot_on_config(background, spikes):
    figure, ax = plt.subplots(1, 1)
    if len(background) != 0:
        add_plot(ax, background, 'b')
    if len(spikes) != 0:
        for spike in spikes:
            add_plot(ax, spike, 'g')
    start, stop = ax.get_ylim()
    ticks = np.arange(start, stop + (stop // 10), stop // 10)
    ax.set_yticks(ticks)
    ax.grid()
    plt.show()
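
# Dry-run the configured profile before a real run (requires matplotlib):
#   python3 perf_spike_load.py --graph=1
# The background load is drawn in blue, each spike in green.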


if __name__ == '__main__':
