Skip to content

Commit

Permalink
TEMP
Browse files Browse the repository at this point in the history
  • Loading branch information
yarikoptic committed May 1, 2024
1 parent b9b0e85 commit 9a00513
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 32 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ src/__pycache__/
duct.egg-info/

.duct/
.idea
67 changes: 35 additions & 32 deletions src/duct.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,44 +141,50 @@ def serialize(self):

def create_and_parse_args():
parser = argparse.ArgumentParser(
description="Gathers metrics on a command and all its child processes."
description="Gathers metrics on a command and all its child processes.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument("command", help="The command to execute.")
parser.add_argument("arguments", nargs="*", help="Arguments for the command.")
parser.add_argument(
"--sample-interval",
type=float,
default=1.0,
help="Interval in seconds between status checks of the running process.",
"-p", "--output-prefix",
type=str,
default=os.getenv("DUCT_OUTPUT_PREFIX", ".duct/logs/{datetime_filesafe}-{pid}_"),
help="File string format to be used as a prefix for the files -- the captured "
"stdout and stderr and the resource usage logs. The understood variables are "
"{datetime}, {datetime_filesafe}, and {pid}. "
"Leading directories will be created if they do not exist. "
"You can also provide value via DUCT_OUTPUT_PREFIX env variable. "
)
parser.add_argument(
"--output-prefix",
type=str,
default=os.getenv("DUCT_OUTPUT_PREFIX", "{file_safe_iso}"),
help="`path+fileprefix` or directory in which all logs will be saved.",
"--s-i", "--sample-interval",
type=float,
default=float(os.getenv("DUCT_SAMPLE_INTERVAL", "1.0")),
help="Interval in seconds between status checks of the running process.",
)
parser.add_argument(
"--report-interval",
"--r-i", "--report-interval",
type=float,
default=60.0,
default=float(os.getenv("DUCT_REPORT_INTERVAL", "60.0")),
help="Interval in seconds at which to report aggregated data.",
)
parser.add_argument(
"--capture-outputs",
"-c", "--capture-outputs",
type=str,
default="all",
default=os.getenv("DUCT_CAPTURE_OUTPUTS", "all"),
choices=["all", "none", "stdout", "stderr"],
help="Record stdout, stderr, all, or none to log files.",
help="Record stdout, stderr, all, or none to log files. "
"You can also provide value via DUCT_CAPTURE_OUTPUTS env variable."
)
parser.add_argument(
"--outputs",
"-o", "--outputs",
type=str,
default="all",
choices=["all", "none", "stdout", "stderr"],
help="Print stdout, stderr, all, or none to stdout/stderr respectively.",
)
parser.add_argument(
"--record-types",
"-t", "--record-types",
type=str,
default="all",
choices=["all", "system-summary", "processes-samples"],
Expand Down Expand Up @@ -243,8 +249,7 @@ def monitor_process(
break

elapsed_time = time.time() - report.start_time
sep = "" if output_prefix.endswith(os.sep) else "-"
resource_stats_log_path = "{output_prefix}{sep}{pid}-resource-usage.json"
resource_stats_log_path = "{output_prefix}usage.json"
if elapsed_time >= (report.number + 1) * report_interval:
aggregated = report.aggregate_samples()
for pid, pinfo in aggregated.items():
Expand All @@ -261,22 +266,21 @@ def monitor_process(


def prepare_outputs(capture_outputs, outputs, output_prefix):
sep = "" if output_prefix.endswith(os.sep) else "-"
if capture_outputs in ["all", "stdout"] and outputs in ["all", "stdout"]:
stdout = TeeStream(f"{output_prefix}{sep}stdout")
stdout = TeeStream(f"{output_prefix}stdout")
stdout.start()
elif capture_outputs in ["all", "stdout"] and outputs in ["none", "stderr"]:
stdout = open(f"{output_prefix}{sep}stdout")
stdout = open(f"{output_prefix}stdout")
elif capture_outputs in ["none", "stderr"] and outputs in ["all", "stdout"]:
stdout = subprocess.PIPE
else:
stdout = subprocess.DEVNULL

if capture_outputs in ["all", "stderr"] and outputs in ["all", "stderr"]:
stderr = TeeStream(f"{output_prefix}{sep}stderr")
stderr = TeeStream(f"{output_prefix}stderr")
stderr.start()
elif capture_outputs in ["all", "stderr"] and outputs in ["none", "stdout"]:
stderr = open(f"{output_prefix}{sep}stderr")
stderr = open(f"{output_prefix}stderr")
elif capture_outputs in ["none", "stdout"] and outputs in [
"all",
"stderr",
Expand All @@ -288,14 +292,14 @@ def prepare_outputs(capture_outputs, outputs, output_prefix):


def format_output_prefix(output_prefix_template):
f_kwargs = {}
if "file_safe_iso" in output_prefix_template:
datenow = datetime.now()
f_kwargs = {
# 'pure' iso 8601 does not make good filenames
f_kwargs["file_safe_iso"] = datetime.now().strftime("%Y-%m-%d.%H-%M-%S")
if f_kwargs:
return output_prefix_template.format(**f_kwargs)
else:
return output_prefix_template
"datetime": datenow.isoformat(),
"datetime_filesafe": datenow.strftime("%Y-%m-%dT%H-%M-%S"),
"pid": os.getpid(),
}
return output_prefix_template.format(**f_kwargs)


def ensure_directories(path):
Expand Down Expand Up @@ -344,8 +348,7 @@ def main():
monitoring_thread.join()

if args.record_types in ["all", "system-summary"]:
sep = "" if formatted_output_prefix.endswith(os.sep) else "-"
system_info_path = f"{formatted_output_prefix}{sep}info.json"
system_info_path = f"{formatted_output_prefix}info.json"
with open(system_info_path, "a") as system_logs:
report.end_time = time.time()
report.run_time_seconds = f"{report.end_time - report.start_time}"
Expand Down

0 comments on commit 9a00513

Please sign in to comment.