Skip to content

Commit

Permalink
Merge pull request #3561 from mr0re1/fix_time
Browse files Browse the repository at this point in the history
SlurmGCP. Refactor timestamp parsing
  • Loading branch information
mr0re1 authored Jan 22, 2025
2 parents f995043 + 5f1da83 commit b98b7d1
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
NodeState,
chunked,
dirs,
parse_gcp_timestamp,
)
from util import lookup
from suspend import delete_instances
Expand Down Expand Up @@ -150,7 +151,7 @@ def _find_dynamic_node_status() -> NodeAction:
return NodeActionUnchanged() # don't touch dynamic nodes

def get_fr_action(fr: FutureReservation, state:Optional[NodeState]) -> Optional[NodeAction]:
now = datetime.utcnow()
now = datetime.now()
if state is None:
return None # handle like any other node
if fr.start_time < now < fr.end_time:
Expand Down Expand Up @@ -246,7 +247,12 @@ def get_node_action(nodename: str) -> NodeAction:
("POWER_DOWN", "POWERING_UP", "POWERING_DOWN", "POWERED_DOWN")
) & (state.flags if state is not None else set())

if (inst is None) and (state is not None):
if (state is None) and (inst is None):
# Should never happen
return NodeActionUnknown(None, None)

if inst is None:
assert state is not None # to keep type-checker happy
if "POWERING_UP" in state.flags:
return NodeActionUnchanged()
if state.base == "DOWN" and "POWERED_DOWN" in state.flags:
Expand All @@ -273,16 +279,11 @@ def get_node_action(nodename: str) -> NodeAction:
return NodeActionDown(reason="Instance terminated")
elif (state is None or "POWERED_DOWN" in state.flags) and inst.status == "RUNNING":
log.info("%s is potential orphan node", nodename)
age_threshold_seconds = 90
inst_seconds_old = _seconds_since_timestamp(inst.creationTimestamp)
log.info("%s state: %s, age: %0.1fs", nodename, state, inst_seconds_old)
if inst_seconds_old < age_threshold_seconds:
log.info(
"%s not marked as orphan, it started less than %ds ago (%0.1fs)",
nodename,
age_threshold_seconds,
inst_seconds_old,
)
threshold = timedelta(seconds=90)
age = datetime.now() - parse_gcp_timestamp(inst.creationTimestamp)
log.info(f"{nodename} state: {state}, age: {age}")
if age < threshold:
log.info(f"{nodename} not marked as orphan, it started less than {threshold.seconds}s ago ({age.seconds}s)")
return NodeActionUnchanged()
return NodeActionDelete()
elif state is None:
Expand All @@ -292,19 +293,6 @@ def get_node_action(nodename: str) -> NodeAction:
return NodeActionUnchanged()


def _seconds_since_timestamp(timestamp):
    """Return how many seconds have elapsed since ``timestamp``.

    Args:
        timestamp: A timestamp string in the format
            ``%Y-%m-%dT%H:%M:%S.%f%z``. A UTC offset written with a colon
            (e.g. ``-08:00``) is accepted as well.

    Returns:
        The elapsed time between ``timestamp`` and the current moment,
        in seconds, as a float.
    """
    # Python 3.6 strptime cannot parse a UTC offset that contains a colon,
    # so "-08:00" is rewritten as "-0800" before parsing.
    has_colon_in_offset = timestamp[-3] == ":"
    if has_colon_in_offset:
        timestamp = timestamp[:-3] + timestamp[-2:]

    now_epoch = datetime.now().timestamp()
    created_epoch = datetime.strptime(
        timestamp, "%Y-%m-%dT%H:%M:%S.%f%z"
    ).timestamp()
    return now_epoch - created_epoch


def delete_placement_groups(placement_groups):
requests = {
pg["name"]: lookup().compute.resourcePolicies().delete(
Expand Down Expand Up @@ -478,7 +466,7 @@ def get_upcoming_maintenance(lkp: util.Lookup) -> Dict[str, Tuple[str, datetime]

for node, properties in lkp.instances().items():
if 'upcomingMaintenance' in properties:
start_time = datetime.strptime(properties['upcomingMaintenance']['startTimeWindow']['earliest'], '%Y-%m-%dT%H:%M:%S%z')
start_time = parse_gcp_timestamp(properties['upcomingMaintenance']['startTimeWindow']['earliest'])
upc_maint_map[node + "_maintenance"] = (node, start_time)

return upc_maint_map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@

import pytest
from mock import Mock
from datetime import datetime, timezone, timedelta

from common import TstNodeset, TstCfg # needed to import util
import util
from util import NodeState, MachineType, AcceleratorInfo
from datetime import timedelta
from google.api_core.client_options import ClientOptions # noqa: E402

# Note: need to install pytest-mock
Expand Down Expand Up @@ -400,3 +401,18 @@ def test_node_state(node: str, state: Optional[NodeState], want: NodeState | Non
])
def test_MachineType_from_json(jo: dict, want: MachineType):
assert MachineType.from_json(jo) == want

# Fixed-offset time zones used to build the expected values below.
UTC = timezone.utc
PST = timezone(timedelta(hours=-8))


@pytest.mark.parametrize(
    ("got", "want"),
    [
        # from instance.creationTimestamp:
        (
            "2024-11-30T12:47:51.676-08:00",
            datetime(2024, 11, 30, 12, 47, 51, microsecond=676000, tzinfo=PST),
        ),
        # from futureReservation.creationTimestamp
        (
            "2024-11-05T15:23:33.702-08:00",
            datetime(2024, 11, 5, 15, 23, 33, microsecond=702000, tzinfo=PST),
        ),
        # from futureReservation.timeWindow.endTime
        (
            "2025-01-15T00:00:00Z",
            datetime(2025, 1, 15, hour=0, minute=0, tzinfo=UTC),
        ),
    ],
)
def test_parse_gcp_timestamp(got: str, want: datetime):
    """Check that each sample timestamp string parses to the expected aware datetime."""
    parsed = util.parse_gcp_timestamp(got)
    assert parsed == want
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,17 @@ def authentication_project():
DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"


def parse_gcp_timestamp(s: str) -> datetime:
    """Convert a timestamp string returned by the GCP API into a datetime.

    Both Zulu timestamps (``2025-01-15T00:00:00Z``) and timestamps with an
    explicit UTC offset (``2024-11-30T12:47:51.676-08:00``) are accepted.

    The result carries the offset from the input, i.e. it is timezone-aware.
    Python raises ``TypeError`` when an aware datetime is compared with, or
    subtracted from, a naive one such as a bare ``datetime.now()``.
    """
    # Requires Python >= 3.7 (datetime.fromisoformat).
    # TODO: Drop the "Z" rewrite once we move to Python 3.11, where
    # fromisoformat understands the suffix natively
    # (context: https://discuss.python.org/t/parse-z-timezone-suffix-in-datetime/2220/30)
    iso_text = s.replace('Z', '+00:00')
    return datetime.fromisoformat(iso_text)


def universe_domain() -> str:
try:
return instance_metadata("attributes/universe_domain")
Expand Down Expand Up @@ -1622,12 +1633,11 @@ def future_reservation(self, nodeset:object) -> Optional[FutureReservation]:
project, zone, name = match.group("project","zone","name")
fr = self._get_future_reservation(project,zone,name)

# TODO: Remove this "hack" of trimming the Z from timestamps once we move to Python 3.11 (context: https://discuss.python.org/t/parse-z-timezone-suffix-in-datetime/2220/30)
start_time = datetime.fromisoformat(fr["timeWindow"]["startTime"][:-1])
end_time = datetime.fromisoformat(fr["timeWindow"]["endTime"][:-1])
start_time = parse_gcp_timestamp(fr["timeWindow"]["startTime"])
end_time = parse_gcp_timestamp(fr["timeWindow"]["endTime"])

if "autoCreatedReservations" in fr["status"] and (res:=fr["status"]["autoCreatedReservations"][0]):
if (start_time<=datetime.utcnow()<=end_time):
if (start_time<=datetime.now()<=end_time):
match = re.search(r'projects/(?P<project>[^/]+)/zones/(?P<zone>[^/]+)/reservations/(?P<name>[^/]+)(/.*)?$',res)
assert match, f"Unexpected reservation name '{res}'"
res_name = match.group("name")
Expand Down

0 comments on commit b98b7d1

Please sign in to comment.