Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor post-processing #258

Merged
merged 18 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 176 additions & 0 deletions post-processing/config_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import yaml


class ConfigHandler:

    def __init__(self, config: dict):
        """
        Initialise the handler from a plot configuration dict.

        Args:
            config: dict, plot configuration information.
        """

        # validate dict structure before unpacking anything
        config = read_config(config)

        # extract top-level config information
        self.title = config.get("title")
        self.x_axis = config.get("x_axis")
        self.y_axis = config.get("y_axis")
        self.filters = config.get("filters")
        self.series = config.get("series")
        self.column_types = config.get("column_types")

        # parse filter information
        self.and_filters = []
        self.or_filters = []
        self.series_filters = []
        self.parse_filters()

        # parse scaling information
        self.scaling_column = None
        self.scaling_custom = None
        self.parse_scaling()

        # find relevant columns
        self.series_columns = []
        self.plot_columns = []
        self.all_columns = []
        self.parse_columns()

    @classmethod
    def from_path(cls, config_path):
        """Alternate constructor: load the configuration from a yaml file path."""
        return cls(open_config(config_path))

    def get_filters(self):
        """Return the (and, or, series) filter lists as a tuple."""
        return self.and_filters, self.or_filters, self.series_filters

    def get_y_scaling(self):
        """Return the (column, custom) y-axis scaling information as a tuple."""
        return self.scaling_column, self.scaling_custom

    def parse_filters(self):
        """
        Store filtering information from filters and series.
        """

        # explicit "and"/"or" filters from the config (kept as-is when absent or empty)
        filters = self.filters or {}
        self.and_filters = filters.get("and") or self.and_filters
        self.or_filters = filters.get("or") or self.or_filters

        # each series entry becomes an equality filter on its column
        if self.series:
            self.series_filters = [[entry[0], "==", entry[1]] for entry in self.series]

    def parse_scaling(self):
        """
        Store scaling information for numeric axes.
        """

        # FIXME (issue #182): add scaling for x-axis
        scaling = self.y_axis.get("scaling")
        if scaling:
            # exactly one of these is expected to be set (validated in read_config)
            self.scaling_column = scaling.get("column")
            self.scaling_custom = scaling.get("custom")

    def parse_columns(self):
        """
        Store all necessary dataframe columns for plotting and filtering.
        """

        # value and units columns for both axes
        self.plot_columns = [part
                             for axis in (self.x_axis, self.y_axis)
                             for part in (axis.get("value"), axis["units"].get("column"))]

        # FIXME (issue #255): allow all series values to be selected with *
        # (or if only column name is supplied)

        # series columns (duplicates not removed)
        # NOTE: currently assuming there can only be one unique series column
        self.series_columns = [series_filter[0] for series_filter in self.series_filters]
        # add series columns to the plot column list
        for column in self.series_columns:
            if column not in self.plot_columns:
                self.plot_columns.append(column)
        # drop None values (e.g. units supplied as custom text rather than a column)
        self.plot_columns = [c for c in self.plot_columns if c is not None]

        # columns referenced by and/or filters (duplicates not removed)
        filter_columns = [f[0] for f in self.and_filters + self.or_filters]

        # the full set of typed columns needed from the dataframe
        scaling_columns = [self.scaling_column.get("name")] if self.scaling_column else []
        self.all_columns = set(self.plot_columns + filter_columns + scaling_columns)


def open_config(path):
    """
    Load a yaml plot configuration file and return its contents as a dict.

    Args:
        path: path, path to yaml config file.
    """

    with open(path, "r") as config_file:
        return yaml.safe_load(config_file)


def read_config(config):
    """
    Check required configuration information and return the config unchanged.
    At least plot title, x-axis, y-axis, and column types must be present.

    Args:
        config: dict, config information.

    Returns:
        dict, the validated config (same object that was passed in).

    Raises:
        KeyError: if a required field is missing entirely.
        RuntimeError: if mutually exclusive options are both given, or a value
            is invalid (e.g. custom scaling by zero, a single series).
    """

    # check plot title information
    if not config.get("title"):
        raise KeyError("Missing plot title information.")

    # both axes share the same value/units requirements
    _check_axis(config.get("x_axis"), "x-axis")
    _check_axis(config.get("y_axis"), "y-axis")

    # check optional scaling information
    # FIXME (issue #182): scaling currently only supported on the y-axis
    scaling = config.get("y_axis").get("scaling")
    if scaling:
        column = scaling.get("column")
        custom = scaling.get("custom")
        if column is not None:
            # column and custom scaling are mutually exclusive
            if custom is not None:
                raise RuntimeError(
                    "Specify y-axis scaling information as only one of 'column' or 'custom'.")
            if not column.get("name"):
                raise RuntimeError("Scaling column must have a name.")
        # a falsy custom value (0, None, "") cannot be used as a divisor
        elif not custom:
            raise RuntimeError(
                "Invalid custom scaling value (cannot divide by {0}).".format(custom))

    # check optional series information
    series = config.get("series")
    if series:
        if len(series) == 1:
            raise RuntimeError("Number of series must be >= 2.")
        if len({s[0] for s in series}) > 1:
            raise RuntimeError(
                "Currently supporting grouping of series by only one column. "
                "Please use a single column name in your series configuration.")

    # check column types information
    if not config.get("column_types"):
        raise KeyError("Missing column types information.")

    return config


def _check_axis(axis, label):
    """
    Check required value and units information for one axis.

    Args:
        axis: dict, axis config information (may be None/missing).
        label: str, axis name used in error messages (e.g. "x-axis").
    """

    if not axis:
        raise KeyError("Missing {0} information.".format(label))
    if not axis.get("value"):
        raise KeyError("Missing {0} value information.".format(label))
    units = axis.get("units")
    if not units:
        raise KeyError("Missing {0} units information.".format(label))
    # units column and custom units are mutually exclusive
    if units.get("column") is not None and units.get("custom") is not None:
        raise RuntimeError(
            "Specify {0} units information as only one of 'column' or 'custom'.".format(label))
163 changes: 163 additions & 0 deletions post-processing/perflog_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import errno
import json
import os
import re
from itertools import chain

import pandas as pd


class PerflogHandler:

    def __init__(self, log_path, debug=False):
        """
        Locate and load all perflogs under the given path.

        Args:
            log_path: path, a single perflog file or a folder containing perflogs.
            debug: bool, print diagnostic information while loading.
        """

        self.log_path = log_path
        self.debug = debug

        # find all perflog files, then combine them into one dataframe
        self.get_log_files()
        self.read_all_perflogs()

    def get_df(self):
        """Return the combined perflog dataframe."""
        return self.df

    def get_log_files(self):
        """
        Search for all performance logs in class log path. Record found logs in class
        log file list.

        Raises:
            RuntimeError: a supplied file lacks a .log extension, or a supplied
                folder contains no .log files.
            FileNotFoundError: the log path does not exist.
        """

        self.log_files = []

        if os.path.isfile(self.log_path):
            # a single perflog was supplied: it must have the right extension
            if os.path.splitext(self.log_path)[1] != ".log":
                raise RuntimeError("Perflog file name provided should have a .log extension.")
            self.log_files = [self.log_path]

        elif os.path.isdir(self.log_path):
            # walk the folder recursively, keeping files with a .log extension
            for root, _, files in os.walk(self.log_path):
                for name in files:
                    full_path = os.path.join(root, name)
                    if os.path.splitext(full_path)[1] == ".log":
                        self.log_files.append(full_path)
            if len(self.log_files) == 0:
                raise RuntimeError(
                    "No perflogs found in this path. Perflogs should have a .log extension.")

        else:
            # invalid path
            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), self.log_path)

        if self.debug:
            print("Found log files:")
            for log in self.log_files:
                print("-", log)
            print("")

    def read_all_perflogs(self):
        """
        Return a pandas dataframe containing information from all reframe performance logs
        in class log file list.

        Raises:
            FileNotFoundError: no valid perflog could be parsed from the path.
        """

        self.df = pd.DataFrame()
        for file in self.log_files:
            # accumulate every valid perflog into a single dataframe
            try:
                self.df = pd.concat([self.df, read_perflog(file)], ignore_index=True)
            except KeyError as err:
                # discard invalid perflogs
                if self.debug:
                    print("Discarding %s:" % os.path.basename(file),
                          type(err).__name__ + ":", err.args[0], err.args[1])
                    print("")

        # no valid perflogs found
        if self.df.empty:
            raise FileNotFoundError(
                errno.ENOENT, "Could not find a valid perflog in path", self.log_path)


def read_perflog(path):
    """
    Return a pandas dataframe from a reframe performance log. The dataframe will
    have columns for all fields in a performance log record except display name,
    extra resources, and env vars. Display name will be broken up into test name and
    parameter columns, while the other two will be replaced by the dictionary contents
    of their fields (keys become columns, values become row contents).

    NB: This currently depends on having a non-default handlers_perflog.filelog.format
    in reframe's configuration. See code.

    Args:
        path: path, path to log file.

    Raises:
        KeyError: if the perflog is missing any required field.
    """

    # read pipe-separated perflog into dataframe
    df = pd.read_csv(path, delimiter="|")
    # required fields: literal column names or regex patterns matched against them
    REQUIRED_LOG_FIELDS = ["job_completion_time", r"\w+_value$", r"\w+_unit$", "display_name"]

    # look for required column matches (each pattern must match at least one column)
    required_field_matches = [len(list(filter(re.compile(rexpr).match, df.columns))) > 0
                              for rexpr in REQUIRED_LOG_FIELDS]
    # check all required columns are present
    if False in required_field_matches:
        raise KeyError("Perflog missing one or more required fields", REQUIRED_LOG_FIELDS)

    # replace display name with test name and parameter columns
    # each result is a (test_name, params dict) tuple per row
    results = df["display_name"].apply(get_display_name_info)
    index = df.columns.get_loc("display_name")
    # insert new columns and contents at the old display_name position
    insert_key_cols(df, index, [r[1] for r in results])
    df.insert(index, "test_name", [r[0] for r in results])
    # drop old column
    df.drop("display_name", axis=1, inplace=True)

    # replace other columns with dictionary contents (skip any that are absent)
    dict_cols = [c for c in ["extra_resources", "env_vars"] if c in df.columns]
    for col in dict_cols:
        # column cells are JSON-encoded dicts
        results = df[col].apply(lambda x: json.loads(x))
        # insert new columns and contents at the old column position
        insert_key_cols(df, df.columns.get_loc(col), results)
        # drop old column
        df.drop(col, axis=1, inplace=True)

    return df


def get_display_name_info(display_name):
    """
    Return a tuple containing the test name and a dictionary of parameter names
    and their values from the given input string. The parameter dictionary may be empty
    if no parameters are present.

    Args:
        display_name: str, expecting a format of <test_name> followed by zero or more
            %<param>=<value> pairs.

    Returns:
        (str, dict), the test name and a {param: value} mapping (all strings).
    """

    split_display_name = display_name.split(" %")
    test_name = split_display_name[0]
    # split on the first "=" only, so parameter values may themselves contain "="
    # (an unbounded split would make dict() raise ValueError on such values)
    params = [p.split("=", 1) for p in split_display_name[1:]]

    return test_name, dict(params)


def insert_key_cols(df: pd.DataFrame, index, results):
"""
Modify a dataframe to include new columns (extracted from results) inserted at
a given index.

Args:
df: dataframe, to be modified by this function.
index: int, index as which to insert new columns into the dataframe.
results: dict list, contains key-value mapping information for all rows.
"""

# get set of keys from all rows
keys = set(chain.from_iterable([r.keys() for r in results]))
for k in keys:
# insert keys as new columns
df.insert(index, k, [r[k] if k in r.keys() else None for r in results])
Loading