From 37d1bce72701bdd05465e7c6e0696fb8a0faa207 Mon Sep 17 00:00:00 2001 From: Ray Hung Date: Thu, 7 Dec 2023 17:42:30 -0500 Subject: [PATCH 1/7] Also log TASK_GRAPH_RELEASE during dry_run --- simulator.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/simulator.py b/simulator.py index fd8caf3c..f6523174 100644 --- a/simulator.py +++ b/simulator.py @@ -450,6 +450,14 @@ def dry_run(self) -> None: task_graph.deadline, task_graph.job_graph.completion_time, ) + self._csv_logger.debug( + "%s,TASK_GRAPH_RELEASE,%s,%s,%s,%s", + 0, + task_graph.release_time.to(EventTime.Unit.US).time, + task_graph.deadline.to(EventTime.Unit.US).time, + task_graph.name, + len(task_graph.get_nodes()), + ) if self._log_task_graphs: # Log a DOT representation of the TaskGraph, if requested. task_graph.to_dot( @@ -1441,14 +1449,7 @@ def __handle_task_graph_release(self, event: Event) -> None: task_graph: TaskGraph = self._workload.get_task_graph(event.task_graph) if task_graph is None: raise ValueError(f"TaskGraph {event.task_graph} not found in the Workload.") - self._csv_logger.debug( - "%s,TASK_GRAPH_RELEASE,%s,%s,%s,%s", - event.time.to(EventTime.Unit.US).time, - task_graph.release_time.to(EventTime.Unit.US).time, - task_graph.deadline.to(EventTime.Unit.US).time, - task_graph.name, - len(task_graph.get_nodes()), - ) + def __handle_update_workload(self, event: Event) -> None: """Handles an Event of type `UPDATE_WORKLOAD`. From c428625a645b21ba934b9bf3126ac0cd096f72fe Mon Sep 17 00:00:00 2001 From: Ray Hung Date: Thu, 7 Dec 2023 18:19:17 -0500 Subject: [PATCH 2/7] - Add TaskGraph to CSV types - Implement get_time_spent_on_completed_canceled_miss_deadline_task_graph --- data/csv_reader.py | 88 ++++++++++++++++++++++++++++++++++++++++++++-- data/csv_types.py | 33 +++++++++++++++-- 2 files changed, 116 insertions(+), 5 deletions(-) diff --git a/data/csv_reader.py b/data/csv_reader.py index d7d8c749..c93bfcd0 100644 --- a/data/csv_reader.py +++ b/data/csv_reader.py @@ -2,7 +2,7 @@ import json from collections import defaultdict from operator import add, attrgetter -from typing import Mapping, Optional, Sequence, Tuple, Union +from typing import Dict, Mapping, Optional, Sequence, Tuple, Union import absl # noqa: F401 @@ -11,6 +11,7 @@ Scheduler, Simulator, Task, + TaskGraph, WorkerPool, WorkerPoolStats, WorkerPoolUtilization, @@ -48,7 +49,8 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]): """ for csv_path, csv_readings in readings.items(): simulator = None - tasks = {} + tasks: dict[str, Task] = {} + task_graphs: dict[str, TaskGraph] = {} worker_pools = {} schedulers = [] for reading in csv_readings: @@ -116,6 +118,21 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]): elif reading[1] == "TASK_PLACEMENT": # Update the task with the placement event data. tasks[reading[5]].update_placement(reading, worker_pools) + elif reading[1] == "TASK_CANCEL": + # Update the task with the placement event data. + if reading[4] not in tasks: + tasks[reading[4]] = Task( + name=reading[2], + task_graph=reading[5], + timestamp=int(reading[3]), + task_id=reading[4], + intended_release_time=float("inf"), + release_time=float("inf"), + deadline=float("inf")) + tasks[reading[4]].cancelled = True + tasks[reading[4]].cancelled_at = int(reading[0]) + task_graphs[reading[5]].cancelled = True + task_graphs[reading[5]].cancelled_at = int(reading[0]) elif reading[1] == "TASK_SKIP" and reading[4] in tasks: # Update the task with the skip data. tasks[reading[4]].update_skip(reading) @@ -128,17 +145,41 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]): elif reading[1] == "TASK_SCHEDULED": # Add the task to the last scheduler's invocation. schedulers[-1].update_task_schedule(reading) + elif reading[1] == "TASK_GRAPH_RELEASE": + # Add the task to the last scheduler's invocation. + task_graphs[reading[4]] = TaskGraph( + name=reading[4], + release_time=int(reading[2]), + deadline=int(reading[3]), + ) + elif reading[1] == "TASK_GRAPH_FINISHED": + # Add the task to the last scheduler's invocation. + task_graphs[reading[2]].completion_time = int(reading[0]) + task_graphs[reading[2]].cancelled = False + elif reading[1] == "MISSED_TASK_GRAPH_DEADLINE": + # Add the task to the last scheduler's invocation. + task_graphs[reading[2]].deadline_miss_detected_at = int(reading[0]) else: print(f"[x] Unknown event type: {reading[1]}") except Exception as e: raise ValueError( - f"Error while parsing the following line: {reading}" + f"Error while parsing the following line: {reading} from {csv_path}" ) from e + + # Some sanity check for task state and task graph state consistency + canceled_task_graphs_count = len([tg for tg in task_graphs.values() if tg.cancelled]) + missed_deadline_task_graphs_count = len([tg for tg in task_graphs.values() if tg.missed_deadline]) + completed_task_graphs_count = len([tg for tg in task_graphs.values() if tg.was_completed]) + assert simulator.finished_task_graphs == completed_task_graphs_count + assert simulator.missed_taskgraphs == missed_deadline_task_graphs_count + assert simulator.dropped_taskgraphs == canceled_task_graphs_count + simulator.worker_pools = worker_pools.values() simulator.tasks = list( sorted(tasks.values(), key=attrgetter("release_time")) ) simulator.scheduler_invocations = schedulers + simulator.task_graphs = task_graphs self._simulators[csv_path] = simulator def get_scheduler_invocations(self, csv_path: str) -> Sequence[Scheduler]: @@ -223,6 +264,19 @@ def get_tasks(self, csv_path: str) -> Sequence[Task]: ordered by their release time. """ return self._simulators[csv_path].tasks + + def get_task_graph(self, csv_path: str) -> dict[str, TaskGraph]: + """Retrieves the tasks ordered by their release time. + + Args: + csv_path (`str`): The path to the CSV file whose tasks need to + be retrieved. + + Returns: + A `Sequence[Task]` that depicts the tasks in the execution, + ordered by their release time. + """ + return self._simulators[csv_path].task_graphs def get_tasks_with_placement_issues(self, csv_path: str) -> Sequence[Task]: """Retrieves the tasks that had placement issues (i.e., had a TASK_SKIP). @@ -238,7 +292,35 @@ def get_tasks_with_placement_issues(self, csv_path: str) -> Sequence[Task]: return [ task for task in self.get_tasks(csv_path) if len(task.skipped_times) > 0 ] + + def get_time_spent_on_completed_canceled_miss_deadline_task_graph(self, csv_path: str) -> tuple[dict[str, int], dict[str, int], dict[str, int]]: + """Calculate the time scheduler spent on running tasks belong to task graph that eventually + got canceled. This helped you identify the wasted time spent on canceled task graph. + + Args: + csv_path (`str`): The path to the CSV file whose tasks need to + be retrieved. + Returns: + A `Dict[str, int]` that contains the task graph name to wasted time. + """ + completed_task_graph_run_time = defaultdict(int) + canceled_task_graph_run_time = defaultdict(int) + miss_deadline_task_graph_run_time = defaultdict(int) + for task in self.get_tasks(csv_path): + if self.get_task_graph(csv_path)[task.task_graph].cancelled: + if task.was_completed: + canceled_task_graph_run_time[task.task_graph] += task.runtime + elif task.cancelled and task.placement_time is not None: + canceled_task_graph_run_time[task.task_graph] += task.cancelled_at - task.placement_time + else: + # This task graph was completed + assert self.get_task_graph(csv_path)[task.task_graph].was_completed == True + completed_task_graph_run_time[task.task_graph] += task.runtime + if self.get_task_graph(csv_path)[task.task_graph].missed_deadline: + miss_deadline_task_graph_run_time[task.task_graph] += task.runtime + return completed_task_graph_run_time, canceled_task_graph_run_time, miss_deadline_task_graph_run_time + def get_simulator_end_time(self, csv_path: str) -> int: """Retrieves the time at which the simulator ended. diff --git a/data/csv_types.py b/data/csv_types.py index 54940670..2222c2e1 100644 --- a/data/csv_types.py +++ b/data/csv_types.py @@ -1,4 +1,5 @@ from collections import namedtuple +from dataclasses import dataclass from functools import total_ordering from typing import List, Mapping, Optional, Sequence @@ -80,6 +81,10 @@ def __init__( # Values updated from the MISSED_DEADLINE event. self.missed_deadline = False self.deadline_miss_detected_at = None + + # Values updated from the TASK_CANCEL event. + self.cancelled: bool = False + self.cancelled_at: Optional[int] = None def get_deadline_delay(self) -> int: """Retrieve the deadline delay in microseconds. @@ -263,6 +268,27 @@ def __eq__(self, other): return self.id == other.id +@dataclass +class TaskGraph(object): + name: str + release_time: int + deadline: int + + cancelled: bool = False + cancelled_at: Optional[int] = None + + # Values updated from the TASK_GRAPH_FINISHED event. + completion_time: Optional[int] = None + deadline_miss_detected_at: Optional[int] = None + + @property + def was_completed(self): + return self.completion_time is not None + + @property + def missed_deadline(self): + return self.completion_time is not None and self.completion_time > self.deadline + class Scheduler(object): def __init__( self, @@ -335,13 +361,15 @@ def __init__(self, csv_path: str, start_time: int, total_tasks: int = 0): self.finished_tasks = None self.dropped_tasks = None self.missed_deadlines = None - self.goodput_taskgraphs = None + self.finished_task_graphs = None self.dropped_taskgraphs = None self.missed_taskgraphs = None + self.goodput_taskgraphs = None self.worker_pools = [] self.tasks = [] self.scheduler_invocations = [] + self.task_graphs: dict[str, TaskGraph] = {} def update_finish(self, csv_reading: str): """Updates the values of the Simulator based on the SIMULATOR_END event from @@ -357,6 +385,7 @@ def update_finish(self, csv_reading: str): self.finished_tasks = int(csv_reading[2]) self.dropped_tasks = int(csv_reading[3]) self.missed_deadlines = int(csv_reading[4]) - self.goodput_taskgraphs = int(csv_reading[5]) - int(csv_reading[7]) + self.finished_task_graphs = int(csv_reading[5]) self.dropped_taskgraphs = int(csv_reading[6]) self.missed_taskgraphs = int(csv_reading[7]) + self.goodput_taskgraphs = self.finished_task_graphs - self.missed_taskgraphs \ No newline at end of file From 324febdfdbccfb9fbd9c12c9a9aad49feacb53b8 Mon Sep 17 00:00:00 2001 From: Ray Hung Date: Thu, 7 Dec 2023 21:06:27 -0500 Subject: [PATCH 3/7] Format code with black --- data/csv_reader.py | 55 +++++++++++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/data/csv_reader.py b/data/csv_reader.py index c93bfcd0..b93ef564 100644 --- a/data/csv_reader.py +++ b/data/csv_reader.py @@ -128,7 +128,8 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]): task_id=reading[4], intended_release_time=float("inf"), release_time=float("inf"), - deadline=float("inf")) + deadline=float("inf"), + ) tasks[reading[4]].cancelled = True tasks[reading[4]].cancelled_at = int(reading[0]) task_graphs[reading[5]].cancelled = True @@ -153,27 +154,35 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]): deadline=int(reading[3]), ) elif reading[1] == "TASK_GRAPH_FINISHED": - # Add the task to the last scheduler's invocation. + # Add the task to the last scheduler's invocation. task_graphs[reading[2]].completion_time = int(reading[0]) task_graphs[reading[2]].cancelled = False elif reading[1] == "MISSED_TASK_GRAPH_DEADLINE": # Add the task to the last scheduler's invocation. - task_graphs[reading[2]].deadline_miss_detected_at = int(reading[0]) + task_graphs[reading[2]].deadline_miss_detected_at = int( + reading[0] + ) else: print(f"[x] Unknown event type: {reading[1]}") except Exception as e: raise ValueError( f"Error while parsing the following line: {reading} from {csv_path}" ) from e - - # Some sanity check for task state and task graph state consistency - canceled_task_graphs_count = len([tg for tg in task_graphs.values() if tg.cancelled]) - missed_deadline_task_graphs_count = len([tg for tg in task_graphs.values() if tg.missed_deadline]) - completed_task_graphs_count = len([tg for tg in task_graphs.values() if tg.was_completed]) + + # Some sanity check for task state and task graph state consistency + canceled_task_graphs_count = len( + [tg for tg in task_graphs.values() if tg.cancelled] + ) + missed_deadline_task_graphs_count = len( + [tg for tg in task_graphs.values() if tg.missed_deadline] + ) + completed_task_graphs_count = len( + [tg for tg in task_graphs.values() if tg.was_completed] + ) assert simulator.finished_task_graphs == completed_task_graphs_count assert simulator.missed_taskgraphs == missed_deadline_task_graphs_count assert simulator.dropped_taskgraphs == canceled_task_graphs_count - + simulator.worker_pools = worker_pools.values() simulator.tasks = list( sorted(tasks.values(), key=attrgetter("release_time")) @@ -264,7 +273,7 @@ def get_tasks(self, csv_path: str) -> Sequence[Task]: ordered by their release time. """ return self._simulators[csv_path].tasks - + def get_task_graph(self, csv_path: str) -> dict[str, TaskGraph]: """Retrieves the tasks ordered by their release time. @@ -292,8 +301,10 @@ def get_tasks_with_placement_issues(self, csv_path: str) -> Sequence[Task]: return [ task for task in self.get_tasks(csv_path) if len(task.skipped_times) > 0 ] - - def get_time_spent_on_completed_canceled_miss_deadline_task_graph(self, csv_path: str) -> tuple[dict[str, int], dict[str, int], dict[str, int]]: + + def get_time_spent_on_completed_canceled_miss_deadline_task_graph( + self, csv_path: str + ) -> tuple[dict[str, int], dict[str, int], dict[str, int]]: """Calculate the time scheduler spent on running tasks belong to task graph that eventually got canceled. This helped you identify the wasted time spent on canceled task graph. @@ -308,19 +319,27 @@ def get_time_spent_on_completed_canceled_miss_deadline_task_graph(self, csv_path canceled_task_graph_run_time = defaultdict(int) miss_deadline_task_graph_run_time = defaultdict(int) for task in self.get_tasks(csv_path): - if self.get_task_graph(csv_path)[task.task_graph].cancelled: + if self.get_task_graph(csv_path)[task.task_graph].cancelled: if task.was_completed: canceled_task_graph_run_time[task.task_graph] += task.runtime elif task.cancelled and task.placement_time is not None: - canceled_task_graph_run_time[task.task_graph] += task.cancelled_at - task.placement_time - else: + canceled_task_graph_run_time[task.task_graph] += ( + task.cancelled_at - task.placement_time + ) + else: # This task graph was completed - assert self.get_task_graph(csv_path)[task.task_graph].was_completed == True + assert ( + self.get_task_graph(csv_path)[task.task_graph].was_completed == True + ) completed_task_graph_run_time[task.task_graph] += task.runtime if self.get_task_graph(csv_path)[task.task_graph].missed_deadline: miss_deadline_task_graph_run_time[task.task_graph] += task.runtime - return completed_task_graph_run_time, canceled_task_graph_run_time, miss_deadline_task_graph_run_time - + return ( + completed_task_graph_run_time, + canceled_task_graph_run_time, + miss_deadline_task_graph_run_time, + ) + def get_simulator_end_time(self, csv_path: str) -> int: """Retrieves the time at which the simulator ended. From 89435dfe2cc76b8f0fe3153dc5c8e896e1bf281b Mon Sep 17 00:00:00 2001 From: Ray Hung Date: Thu, 7 Dec 2023 21:08:55 -0500 Subject: [PATCH 4/7] Update `get_time_spent_on_completed_canceled_miss_deadline_task_graph()` doc string --- data/csv_reader.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/data/csv_reader.py b/data/csv_reader.py index b93ef564..4d532e54 100644 --- a/data/csv_reader.py +++ b/data/csv_reader.py @@ -305,15 +305,20 @@ def get_tasks_with_placement_issues(self, csv_path: str) -> Sequence[Task]: def get_time_spent_on_completed_canceled_miss_deadline_task_graph( self, csv_path: str ) -> tuple[dict[str, int], dict[str, int], dict[str, int]]: - """Calculate the time scheduler spent on running tasks belong to task graph that eventually - got canceled. This helped you identify the wasted time spent on canceled task graph. + """Calculate the time scheduler spent on running tasks belong to task graphs that + 1. completed + 2. eventually got canceled + 3. missed deadline. + + This helped you identify the wasted time spent on canceled task graph. Args: csv_path (`str`): The path to the CSV file whose tasks need to be retrieved. Returns: - A `Dict[str, int]` that contains the task graph name to wasted time. + A `tuple[dict[str, int], dict[str, int], dict[str, int]]` where each dict + contains the task graph name to time spent. """ completed_task_graph_run_time = defaultdict(int) canceled_task_graph_run_time = defaultdict(int) From 79efb690bcf66baab452830dcae6cbc52247b43b Mon Sep 17 00:00:00 2001 From: Ray Hung Date: Thu, 7 Dec 2023 21:12:18 -0500 Subject: [PATCH 5/7] Fix formatting --- data/csv_types.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/data/csv_types.py b/data/csv_types.py index 2222c2e1..83ad3441 100644 --- a/data/csv_types.py +++ b/data/csv_types.py @@ -81,7 +81,7 @@ def __init__( # Values updated from the MISSED_DEADLINE event. self.missed_deadline = False self.deadline_miss_detected_at = None - + # Values updated from the TASK_CANCEL event. self.cancelled: bool = False self.cancelled_at: Optional[int] = None @@ -273,10 +273,10 @@ class TaskGraph(object): name: str release_time: int deadline: int - + cancelled: bool = False cancelled_at: Optional[int] = None - + # Values updated from the TASK_GRAPH_FINISHED event. completion_time: Optional[int] = None deadline_miss_detected_at: Optional[int] = None @@ -284,11 +284,12 @@ class TaskGraph(object): @property def was_completed(self): return self.completion_time is not None - + @property def missed_deadline(self): return self.completion_time is not None and self.completion_time > self.deadline + class Scheduler(object): def __init__( self, @@ -388,4 +389,4 @@ def update_finish(self, csv_reading: str): self.finished_task_graphs = int(csv_reading[5]) self.dropped_taskgraphs = int(csv_reading[6]) self.missed_taskgraphs = int(csv_reading[7]) - self.goodput_taskgraphs = self.finished_task_graphs - self.missed_taskgraphs \ No newline at end of file + self.goodput_taskgraphs = self.finished_task_graphs - self.missed_taskgraphs From d0552cec7b285c9e603255fa592e34583f6fe915 Mon Sep 17 00:00:00 2001 From: Ray Hung Date: Thu, 7 Dec 2023 21:19:25 -0500 Subject: [PATCH 6/7] format simulator.py --- simulator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/simulator.py b/simulator.py index f6523174..2ccd5243 100644 --- a/simulator.py +++ b/simulator.py @@ -1449,7 +1449,6 @@ def __handle_task_graph_release(self, event: Event) -> None: task_graph: TaskGraph = self._workload.get_task_graph(event.task_graph) if task_graph is None: raise ValueError(f"TaskGraph {event.task_graph} not found in the Workload.") - def __handle_update_workload(self, event: Event) -> None: """Handles an Event of type `UPDATE_WORKLOAD`. From c4e64bb9aa62e287030684a2c66b6da74e4ec321 Mon Sep 17 00:00:00 2001 From: Ray Hung Date: Thu, 7 Dec 2023 21:22:40 -0500 Subject: [PATCH 7/7] Format files again ... --- data/csv_reader.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/data/csv_reader.py b/data/csv_reader.py index 4d532e54..69ae4d2e 100644 --- a/data/csv_reader.py +++ b/data/csv_reader.py @@ -166,7 +166,8 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]): print(f"[x] Unknown event type: {reading[1]}") except Exception as e: raise ValueError( - f"Error while parsing the following line: {reading} from {csv_path}" + f"Error while parsing the following line: {reading} " + "from {csv_path}" ) from e # Some sanity check for task state and task graph state consistency @@ -305,7 +306,8 @@ def get_tasks_with_placement_issues(self, csv_path: str) -> Sequence[Task]: def get_time_spent_on_completed_canceled_miss_deadline_task_graph( self, csv_path: str ) -> tuple[dict[str, int], dict[str, int], dict[str, int]]: - """Calculate the time scheduler spent on running tasks belong to task graphs that + """Calculate the time scheduler spent on running tasks belong to task + graphs that 1. completed 2. eventually got canceled 3. missed deadline. @@ -333,9 +335,7 @@ def get_time_spent_on_completed_canceled_miss_deadline_task_graph( ) else: # This task graph was completed - assert ( - self.get_task_graph(csv_path)[task.task_graph].was_completed == True - ) + assert self.get_task_graph(csv_path)[task.task_graph].was_completed completed_task_graph_run_time[task.task_graph] += task.runtime if self.get_task_graph(csv_path)[task.task_graph].missed_deadline: miss_deadline_task_graph_run_time[task.task_graph] += task.runtime