diff --git a/data/csv_reader.py b/data/csv_reader.py
index d7d8c749..69ae4d2e 100644
--- a/data/csv_reader.py
+++ b/data/csv_reader.py
@@ -2,7 +2,7 @@
 import json
 from collections import defaultdict
 from operator import add, attrgetter
-from typing import Mapping, Optional, Sequence, Tuple, Union
+from typing import Dict, Mapping, Optional, Sequence, Tuple, Union
 
 import absl  # noqa: F401
 
@@ -11,6 +11,7 @@
     Scheduler,
     Simulator,
     Task,
+    TaskGraph,
     WorkerPool,
     WorkerPoolStats,
     WorkerPoolUtilization,
@@ -48,7 +49,8 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]):
         """
         for csv_path, csv_readings in readings.items():
             simulator = None
-            tasks = {}
+            tasks: dict[str, Task] = {}
+            task_graphs: dict[str, TaskGraph] = {}
             worker_pools = {}
             schedulers = []
             for reading in csv_readings:
@@ -116,6 +118,22 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]):
                     elif reading[1] == "TASK_PLACEMENT":
                         # Update the task with the placement event data.
                         tasks[reading[5]].update_placement(reading, worker_pools)
+                    elif reading[1] == "TASK_CANCEL":
+                        # Update the task with the cancellation event data.
+                        if reading[4] not in tasks:
+                            tasks[reading[4]] = Task(
+                                name=reading[2],
+                                task_graph=reading[5],
+                                timestamp=int(reading[3]),
+                                task_id=reading[4],
+                                intended_release_time=float("inf"),
+                                release_time=float("inf"),
+                                deadline=float("inf"),
+                            )
+                        tasks[reading[4]].cancelled = True
+                        tasks[reading[4]].cancelled_at = int(reading[0])
+                        task_graphs[reading[5]].cancelled = True
+                        task_graphs[reading[5]].cancelled_at = int(reading[0])
                     elif reading[1] == "TASK_SKIP" and reading[4] in tasks:
                         # Update the task with the skip data.
                         tasks[reading[4]].update_skip(reading)
@@ -128,17 +146,50 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]):
                     elif reading[1] == "TASK_SCHEDULED":
                         # Add the task to the last scheduler's invocation.
                         schedulers[-1].update_task_schedule(reading)
+                    elif reading[1] == "TASK_GRAPH_RELEASE":
+                        # Create the TaskGraph from the release event data.
+                        task_graphs[reading[4]] = TaskGraph(
+                            name=reading[4],
+                            release_time=int(reading[2]),
+                            deadline=int(reading[3]),
+                        )
+                    elif reading[1] == "TASK_GRAPH_FINISHED":
+                        # Mark the TaskGraph as completed at this event's timestamp.
+                        task_graphs[reading[2]].completion_time = int(reading[0])
+                        task_graphs[reading[2]].cancelled = False
+                    elif reading[1] == "MISSED_TASK_GRAPH_DEADLINE":
+                        # Record when the TaskGraph's deadline miss was detected.
+                        task_graphs[reading[2]].deadline_miss_detected_at = int(
+                            reading[0]
+                        )
                     else:
                         print(f"[x] Unknown event type: {reading[1]}")
                 except Exception as e:
                     raise ValueError(
-                        f"Error while parsing the following line: {reading}"
+                        f"Error while parsing the following line: {reading} "
+                        f"from {csv_path}"
                     ) from e
+
+            # Sanity-check the parsed task graph states against the simulator's counts.
+            canceled_task_graphs_count = len(
+                [tg for tg in task_graphs.values() if tg.cancelled]
+            )
+            missed_deadline_task_graphs_count = len(
+                [tg for tg in task_graphs.values() if tg.missed_deadline]
+            )
+            completed_task_graphs_count = len(
+                [tg for tg in task_graphs.values() if tg.was_completed]
+            )
+            assert simulator.finished_task_graphs == completed_task_graphs_count
+            assert simulator.missed_taskgraphs == missed_deadline_task_graphs_count
+            assert simulator.dropped_taskgraphs == canceled_task_graphs_count
+
             simulator.worker_pools = worker_pools.values()
             simulator.tasks = list(
                 sorted(tasks.values(), key=attrgetter("release_time"))
             )
             simulator.scheduler_invocations = schedulers
+            simulator.task_graphs = task_graphs
             self._simulators[csv_path] = simulator
 
     def get_scheduler_invocations(self, csv_path: str) -> Sequence[Scheduler]:
@@ -224,6 +275,19 @@ def get_tasks(self, csv_path: str) -> Sequence[Task]:
         """
         return self._simulators[csv_path].tasks
 
+    def get_task_graph(self, csv_path: str) -> dict[str, TaskGraph]:
+        """Retrieves the task graphs parsed from the CSV file.
+
+        Args:
+            csv_path (`str`): The path to the CSV file whose task graphs need
+                to be retrieved.
+
+        Returns:
+            A `dict[str, TaskGraph]` that maps the name of each task graph to
+            its parsed `TaskGraph` instance.
+        """
+        return self._simulators[csv_path].task_graphs
+
     def get_tasks_with_placement_issues(self, csv_path: str) -> Sequence[Task]:
         """Retrieves the tasks that had placement issues (i.e., had a
         TASK_SKIP).
@@ -239,6 +303,48 @@ def get_tasks_with_placement_issues(self, csv_path: str) -> Sequence[Task]:
             task for task in self.get_tasks(csv_path) if len(task.skipped_times) > 0
         ]
 
+    def get_time_spent_on_completed_canceled_miss_deadline_task_graph(
+        self, csv_path: str
+    ) -> tuple[dict[str, int], dict[str, int], dict[str, int]]:
+        """Calculates the time the scheduler spent running tasks that belong
+        to task graphs that
+        1. completed,
+        2. were eventually canceled, or
+        3. missed their deadline.
+
+        This helps identify the time wasted on task graphs that were canceled.
+
+        Args:
+            csv_path (`str`): The path to the CSV file whose tasks need to
+                be retrieved.
+
+        Returns:
+            A `tuple[dict[str, int], dict[str, int], dict[str, int]]` where each
+            dict maps a task graph's name to the time spent on it.
+        """
+        completed_task_graph_run_time = defaultdict(int)
+        canceled_task_graph_run_time = defaultdict(int)
+        miss_deadline_task_graph_run_time = defaultdict(int)
+        for task in self.get_tasks(csv_path):
+            if self.get_task_graph(csv_path)[task.task_graph].cancelled:
+                if task.was_completed:
+                    canceled_task_graph_run_time[task.task_graph] += task.runtime
+                elif task.cancelled and task.placement_time is not None:
+                    canceled_task_graph_run_time[task.task_graph] += (
+                        task.cancelled_at - task.placement_time
+                    )
+            else:
+                # The task graph was not canceled, so it must have completed.
+                assert self.get_task_graph(csv_path)[task.task_graph].was_completed
+                completed_task_graph_run_time[task.task_graph] += task.runtime
+                if self.get_task_graph(csv_path)[task.task_graph].missed_deadline:
+                    miss_deadline_task_graph_run_time[task.task_graph] += task.runtime
+        return (
+            completed_task_graph_run_time,
+            canceled_task_graph_run_time,
+            miss_deadline_task_graph_run_time,
+        )
+
     def get_simulator_end_time(self, csv_path: str) -> int:
         """Retrieves the time at which the simulator ended.
 
diff --git a/data/csv_types.py b/data/csv_types.py
index 54940670..83ad3441 100644
--- a/data/csv_types.py
+++ b/data/csv_types.py
@@ -1,4 +1,5 @@
 from collections import namedtuple
+from dataclasses import dataclass
 from functools import total_ordering
 from typing import List, Mapping, Optional, Sequence
 
@@ -81,6 +82,10 @@ def __init__(
         self.missed_deadline = False
         self.deadline_miss_detected_at = None
 
+        # Values updated from the TASK_CANCEL event.
+        self.cancelled: bool = False
+        self.cancelled_at: Optional[int] = None
+
     def get_deadline_delay(self) -> int:
         """Retrieve the deadline delay in microseconds.
 
@@ -263,6 +268,28 @@ def __eq__(self, other):
         return self.id == other.id
 
 
+@dataclass
+class TaskGraph(object):
+    name: str
+    release_time: int
+    deadline: int
+
+    cancelled: bool = False
+    cancelled_at: Optional[int] = None
+
+    # Updated from the TASK_GRAPH_FINISHED and MISSED_TASK_GRAPH_DEADLINE events.
+    completion_time: Optional[int] = None
+    deadline_miss_detected_at: Optional[int] = None
+
+    @property
+    def was_completed(self):
+        return self.completion_time is not None
+
+    @property
+    def missed_deadline(self):
+        return self.completion_time is not None and self.completion_time > self.deadline
+
+
 class Scheduler(object):
     def __init__(
         self,
@@ -335,13 +362,15 @@ def __init__(self, csv_path: str, start_time: int, total_tasks: int = 0):
         self.finished_tasks = None
         self.dropped_tasks = None
         self.missed_deadlines = None
-        self.goodput_taskgraphs = None
+        self.finished_task_graphs = None
         self.dropped_taskgraphs = None
         self.missed_taskgraphs = None
+        self.goodput_taskgraphs = None
 
         self.worker_pools = []
         self.tasks = []
         self.scheduler_invocations = []
+        self.task_graphs: dict[str, TaskGraph] = {}
 
     def update_finish(self, csv_reading: str):
         """Updates the values of the Simulator based on the SIMULATOR_END event from
@@ -357,6 +386,7 @@ def update_finish(self, csv_reading: str):
         self.finished_tasks = int(csv_reading[2])
         self.dropped_tasks = int(csv_reading[3])
         self.missed_deadlines = int(csv_reading[4])
-        self.goodput_taskgraphs = int(csv_reading[5]) - int(csv_reading[7])
+        self.finished_task_graphs = int(csv_reading[5])
         self.dropped_taskgraphs = int(csv_reading[6])
         self.missed_taskgraphs = int(csv_reading[7])
+        self.goodput_taskgraphs = self.finished_task_graphs - self.missed_taskgraphs
diff --git a/simulator.py b/simulator.py
index fd8caf3c..2ccd5243 100644
--- a/simulator.py
+++ b/simulator.py
@@ -450,6 +450,14 @@ def dry_run(self) -> None:
                 task_graph.deadline,
                 task_graph.job_graph.completion_time,
             )
+            self._csv_logger.debug(
+                "%s,TASK_GRAPH_RELEASE,%s,%s,%s,%s",
+                0,
+                task_graph.release_time.to(EventTime.Unit.US).time,
+                task_graph.deadline.to(EventTime.Unit.US).time,
+                task_graph.name,
+                len(task_graph.get_nodes()),
+            )
             if self._log_task_graphs:
                 # Log a DOT representation of the TaskGraph, if requested.
                 task_graph.to_dot(
@@ -1441,14 +1449,6 @@ def __handle_task_graph_release(self, event: Event) -> None:
         task_graph: TaskGraph = self._workload.get_task_graph(event.task_graph)
         if task_graph is None:
            raise ValueError(f"TaskGraph {event.task_graph} not found in the Workload.")
-        self._csv_logger.debug(
-            "%s,TASK_GRAPH_RELEASE,%s,%s,%s,%s",
-            event.time.to(EventTime.Unit.US).time,
-            task_graph.release_time.to(EventTime.Unit.US).time,
-            task_graph.deadline.to(EventTime.Unit.US).time,
-            task_graph.name,
-            len(task_graph.get_nodes()),
-        )
 
     def __handle_update_workload(self, event: Event) -> None:
         """Handles an Event of type `UPDATE_WORKLOAD`.
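
Usage sketch (not part of the patch): the snippet below shows how the task graph accessors added to data/csv_reader.py in this change might be driven from an analysis script. The CSVReader class name, its constructor arguments, and the experiment.csv path are assumptions for illustration; only get_task_graph and get_time_spent_on_completed_canceled_miss_deadline_task_graph come from this diff.

    from data.csv_reader import CSVReader

    # Assumed reader setup; the constructor signature may differ in the repo.
    csv_path = "experiment.csv"
    reader = CSVReader(csv_paths=[csv_path])

    # Summarize the parsed task graphs by their final state.
    task_graphs = reader.get_task_graph(csv_path)
    canceled = [tg.name for tg in task_graphs.values() if tg.cancelled]
    missed = [tg.name for tg in task_graphs.values() if tg.missed_deadline]
    print(f"{len(canceled)} canceled, {len(missed)} missed their deadline")

    # Break down where execution time went across completed, canceled, and
    # deadline-missing task graphs.
    completed_time, canceled_time, missed_time = (
        reader.get_time_spent_on_completed_canceled_miss_deadline_task_graph(csv_path)
    )
    print(f"Time spent on canceled task graphs: {sum(canceled_time.values())} us")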