Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add get_time_spent_on_completed_canceled_miss_deadline_task_graph #75

Merged
merged 8 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 109 additions & 3 deletions data/csv_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
from collections import defaultdict
from operator import add, attrgetter
from typing import Mapping, Optional, Sequence, Tuple, Union
from typing import Dict, Mapping, Optional, Sequence, Tuple, Union

import absl # noqa: F401

Expand All @@ -11,6 +11,7 @@
Scheduler,
Simulator,
Task,
TaskGraph,
WorkerPool,
WorkerPoolStats,
WorkerPoolUtilization,
Expand Down Expand Up @@ -48,7 +49,8 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]):
"""
for csv_path, csv_readings in readings.items():
simulator = None
tasks = {}
tasks: dict[str, Task] = {}
task_graphs: dict[str, TaskGraph] = {}
worker_pools = {}
schedulers = []
for reading in csv_readings:
Expand Down Expand Up @@ -116,6 +118,22 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]):
elif reading[1] == "TASK_PLACEMENT":
# Update the task with the placement event data.
tasks[reading[5]].update_placement(reading, worker_pools)
elif reading[1] == "TASK_CANCEL":
# Update the task and its graph with the cancellation event data.
if reading[4] not in tasks:
tasks[reading[4]] = Task(
name=reading[2],
task_graph=reading[5],
timestamp=int(reading[3]),
task_id=reading[4],
intended_release_time=float("inf"),
release_time=float("inf"),
deadline=float("inf"),
)
tasks[reading[4]].cancelled = True
tasks[reading[4]].cancelled_at = int(reading[0])
task_graphs[reading[5]].cancelled = True
task_graphs[reading[5]].cancelled_at = int(reading[0])
elif reading[1] == "TASK_SKIP" and reading[4] in tasks:
# Update the task with the skip data.
tasks[reading[4]].update_skip(reading)
Expand All @@ -128,17 +146,50 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]):
elif reading[1] == "TASK_SCHEDULED":
# Add the task to the last scheduler's invocation.
schedulers[-1].update_task_schedule(reading)
elif reading[1] == "TASK_GRAPH_RELEASE":
# Register the newly released task graph.
task_graphs[reading[4]] = TaskGraph(
name=reading[4],
release_time=int(reading[2]),
deadline=int(reading[3]),
)
elif reading[1] == "TASK_GRAPH_FINISHED":
# Record the completion time of the task graph.
task_graphs[reading[2]].completion_time = int(reading[0])
task_graphs[reading[2]].cancelled = False
elif reading[1] == "MISSED_TASK_GRAPH_DEADLINE":
# Record when the task graph's deadline miss was detected.
task_graphs[reading[2]].deadline_miss_detected_at = int(
reading[0]
)
else:
print(f"[x] Unknown event type: {reading[1]}")
except Exception as e:
raise ValueError(
f"Error while parsing the following line: {reading}"
f"Error while parsing the following line: {reading} "
f"from {csv_path}"
) from e

# Some sanity check for task state and task graph state consistency
canceled_task_graphs_count = len(
[tg for tg in task_graphs.values() if tg.cancelled]
)
missed_deadline_task_graphs_count = len(
[tg for tg in task_graphs.values() if tg.missed_deadline]
)
completed_task_graphs_count = len(
[tg for tg in task_graphs.values() if tg.was_completed]
)
assert simulator.finished_task_graphs == completed_task_graphs_count
assert simulator.missed_taskgraphs == missed_deadline_task_graphs_count
assert simulator.dropped_taskgraphs == canceled_task_graphs_count

simulator.worker_pools = worker_pools.values()
simulator.tasks = list(
sorted(tasks.values(), key=attrgetter("release_time"))
)
simulator.scheduler_invocations = schedulers
simulator.task_graphs = task_graphs
self._simulators[csv_path] = simulator

def get_scheduler_invocations(self, csv_path: str) -> Sequence[Scheduler]:
Expand Down Expand Up @@ -224,6 +275,19 @@ def get_tasks(self, csv_path: str) -> Sequence[Task]:
"""
return self._simulators[csv_path].tasks

def get_task_graph(self, csv_path: str) -> dict[str, TaskGraph]:
    """Retrieves the task graphs parsed from the given CSV file.

    Args:
        csv_path (`str`): The path to the CSV file whose task graphs need to
            be retrieved.

    Returns:
        A `dict[str, TaskGraph]` mapping each task graph's name to its
        parsed `TaskGraph` state.
    """
    return self._simulators[csv_path].task_graphs

def get_tasks_with_placement_issues(self, csv_path: str) -> Sequence[Task]:
"""Retrieves the tasks that had placement issues (i.e., had a TASK_SKIP).

Expand All @@ -239,6 +303,48 @@ def get_tasks_with_placement_issues(self, csv_path: str) -> Sequence[Task]:
task for task in self.get_tasks(csv_path) if len(task.skipped_times) > 0
]

def get_time_spent_on_completed_canceled_miss_deadline_task_graph(
    self, csv_path: str
) -> tuple[dict[str, int], dict[str, int], dict[str, int]]:
    """Computes the time spent running tasks that belong to task graphs that
    1. completed,
    2. eventually got canceled, or
    3. missed their deadline.

    This helps identify the time wasted on task graphs that were canceled.

    Args:
        csv_path (`str`): The path to the CSV file whose task runtimes need
            to be aggregated.

    Returns:
        A `tuple[dict[str, int], dict[str, int], dict[str, int]]` where each
        dict maps a task graph name to the time spent on its tasks, for the
        completed, canceled, and deadline-missing task graphs respectively.
    """
    completed_task_graph_run_time: Dict[str, int] = defaultdict(int)
    canceled_task_graph_run_time: Dict[str, int] = defaultdict(int)
    miss_deadline_task_graph_run_time: Dict[str, int] = defaultdict(int)
    # The task-graph mapping is invariant for the whole call; look it up
    # once instead of once (or more) per task.
    task_graphs = self.get_task_graph(csv_path)
    for task in self.get_tasks(csv_path):
        task_graph = task_graphs[task.task_graph]
        if task_graph.cancelled:
            if task.was_completed:
                # The task finished before its graph was canceled; its whole
                # runtime counts towards the canceled graph.
                canceled_task_graph_run_time[task.task_graph] += task.runtime
            elif task.cancelled and task.placement_time is not None:
                # The task was canceled mid-run; count only the time between
                # placement and cancellation.
                canceled_task_graph_run_time[task.task_graph] += (
                    task.cancelled_at - task.placement_time
                )
        else:
            # A non-canceled graph must have completed (sanity check mirrors
            # the consistency asserts done while parsing the events).
            assert task_graph.was_completed
            completed_task_graph_run_time[task.task_graph] += task.runtime
            if task_graph.missed_deadline:
                miss_deadline_task_graph_run_time[task.task_graph] += task.runtime
    return (
        completed_task_graph_run_time,
        canceled_task_graph_run_time,
        miss_deadline_task_graph_run_time,
    )

def get_simulator_end_time(self, csv_path: str) -> int:
"""Retrieves the time at which the simulator ended.

Expand Down
34 changes: 32 additions & 2 deletions data/csv_types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from collections import namedtuple
from dataclasses import dataclass
from functools import total_ordering
from typing import List, Mapping, Optional, Sequence

Expand Down Expand Up @@ -81,6 +82,10 @@ def __init__(
self.missed_deadline = False
self.deadline_miss_detected_at = None

# Values updated from the TASK_CANCEL event.
self.cancelled: bool = False
self.cancelled_at: Optional[int] = None

def get_deadline_delay(self) -> int:
"""Retrieve the deadline delay in microseconds.

Expand Down Expand Up @@ -263,6 +268,28 @@ def __eq__(self, other):
return self.id == other.id


@dataclass
class TaskGraph(object):
    """Aggregated state of a single task graph parsed from the CSV trace."""

    # Static attributes from the TASK_GRAPH_RELEASE event.
    name: str
    release_time: int
    deadline: int

    # Values updated from the TASK_CANCEL event.
    cancelled: bool = False
    cancelled_at: Optional[int] = None

    # Values updated from the TASK_GRAPH_FINISHED and
    # MISSED_TASK_GRAPH_DEADLINE events.
    completion_time: Optional[int] = None
    deadline_miss_detected_at: Optional[int] = None

    @property
    def was_completed(self):
        """Whether a completion time was recorded for this task graph."""
        return self.completion_time is not None

    @property
    def missed_deadline(self):
        """Whether the task graph finished, but only after its deadline."""
        if self.completion_time is None:
            return False
        return self.completion_time > self.deadline

class Scheduler(object):
def __init__(
self,
Expand Down Expand Up @@ -335,13 +362,15 @@ def __init__(self, csv_path: str, start_time: int, total_tasks: int = 0):
self.finished_tasks = None
self.dropped_tasks = None
self.missed_deadlines = None
self.goodput_taskgraphs = None
self.finished_task_graphs = None
self.dropped_taskgraphs = None
self.missed_taskgraphs = None
self.goodput_taskgraphs = None

self.worker_pools = []
self.tasks = []
self.scheduler_invocations = []
self.task_graphs: dict[str, TaskGraph] = {}

def update_finish(self, csv_reading: str):
"""Updates the values of the Simulator based on the SIMULATOR_END event from
Expand All @@ -357,6 +386,7 @@ def update_finish(self, csv_reading: str):
self.finished_tasks = int(csv_reading[2])
self.dropped_tasks = int(csv_reading[3])
self.missed_deadlines = int(csv_reading[4])
self.goodput_taskgraphs = int(csv_reading[5]) - int(csv_reading[7])
self.finished_task_graphs = int(csv_reading[5])
self.dropped_taskgraphs = int(csv_reading[6])
self.missed_taskgraphs = int(csv_reading[7])
self.goodput_taskgraphs = self.finished_task_graphs - self.missed_taskgraphs
16 changes: 8 additions & 8 deletions simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,14 @@ def dry_run(self) -> None:
task_graph.deadline,
task_graph.job_graph.completion_time,
)
self._csv_logger.debug(
"%s,TASK_GRAPH_RELEASE,%s,%s,%s,%s",
0,
task_graph.release_time.to(EventTime.Unit.US).time,
task_graph.deadline.to(EventTime.Unit.US).time,
task_graph.name,
len(task_graph.get_nodes()),
)
if self._log_task_graphs:
# Log a DOT representation of the TaskGraph, if requested.
task_graph.to_dot(
Expand Down Expand Up @@ -1441,14 +1449,6 @@ def __handle_task_graph_release(self, event: Event) -> None:
task_graph: TaskGraph = self._workload.get_task_graph(event.task_graph)
if task_graph is None:
raise ValueError(f"TaskGraph {event.task_graph} not found in the Workload.")
self._csv_logger.debug(
"%s,TASK_GRAPH_RELEASE,%s,%s,%s,%s",
event.time.to(EventTime.Unit.US).time,
task_graph.release_time.to(EventTime.Unit.US).time,
task_graph.deadline.to(EventTime.Unit.US).time,
task_graph.name,
len(task_graph.get_nodes()),
)

def __handle_update_workload(self, event: Event) -> None:
"""Handles an Event of type `UPDATE_WORKLOAD`.
Expand Down
Loading