class NotificationSummary(BaseModel):
    """Per-group notification counters for the current summary interval.

    ``summary_table`` maps a summary key to a two-element list of counters,
    ``[not resolved, resolved]``. The counters (and ``message_id``) are reset
    whenever a notification arrives after the current interval has ended.
    Intervals are half-open: ``start_ts <= t < end_ts``.
    """

    message_id: Optional[str] = None  # identifier of the summary message
    start_ts: Optional[float] = None  # start of the current interval (epoch seconds), None until first use
    end_ts: Optional[float] = None  # end of the current interval (epoch seconds), None until first use
    # Keys for the table are determined by grouping.notification_mode.summary.by
    summary_table: DefaultDict[KeyT, List[int]] = None

    def register_notification(self, summary_key: KeyT, resolved: bool, interval: int, aligned: bool = False):
        """Count one notification, starting a fresh interval first if needed.

        Args:
            summary_key: row of ``summary_table`` to increment.
            resolved: selects the counter; True increments index 1, False index 0.
            interval: length of a summary interval in seconds.
            aligned: if True, intervals are aligned to the start of the local day
                (see ``calculate_interval_boundaries``); otherwise a new interval
                starts at the time of the first notification.
        """
        now_dt = datetime.now()
        now_ts = int(now_dt.timestamp())
        idx = 1 if resolved else 0
        # ``>=`` rather than ``>``: calculate_interval_boundaries treats the boundary
        # instant as the start of the *next* interval, so a notification arriving
        # exactly at end_ts must not be counted in the old one.
        if self.end_ts is None or self.summary_table is None or now_ts >= self.end_ts:
            # Group expired or the first summary ever for this group_key, reset the data
            self.summary_table = defaultdict(lambda: [0, 0])
            self.start_ts, self.end_ts = self.calculate_interval_boundaries(interval, aligned, now_dt)
            self.message_id = None
        self.summary_table[summary_key][idx] += 1

    @classmethod
    def calculate_interval_boundaries(cls, interval: int, aligned: bool, now_dt: datetime) -> Tuple[float, float]:
        """Return ``(start_ts, end_ts)`` of the interval that contains ``now_dt``.

        Args:
            interval: interval length in seconds.
            aligned: if False the interval is simply ``[now, now + interval)``;
                if True, intervals are counted from the start of the day of ``now_dt``.
            now_dt: the reference point in time.

        Raises:
            ValueError: if ``aligned`` is set and ``interval`` is not positive.
        """
        now_ts = int(now_dt.timestamp())
        if not aligned:
            return now_ts, now_ts + interval

        if interval <= 0:
            # Without this guard the modulo below raises ZeroDivisionError for 0
            # and yields meaningless boundaries for negative values.
            raise ValueError(f"Unable to align a non-positive time interval of {interval} seconds")

        # A day is not guaranteed to be a whole number of intervals long, so the
        # last interval of the day is stretched or cut to end exactly at the end
        # of the day. The original author attributes the varying day length to
        # leap second adjustments.
        # NOTE(review): DST transitions in the runner's timezone (23h/25h days)
        # look like the more common trigger for these branches - confirm.
        start_of_this_day_ts, end_of_this_day_ts = cls.get_day_boundaries(now_dt)
        start_ts = now_ts - (now_ts - start_of_this_day_ts) % interval
        end_ts = start_ts + interval
        overshoots_day = end_ts > end_of_this_day_ts  # day shorter than expected
        leaves_short_tail = end_of_this_day_ts - end_ts < interval  # day longer than expected
        if overshoots_day or leaves_short_tail:
            end_ts = end_of_this_day_ts
        return start_ts, end_ts

    @staticmethod
    def get_day_boundaries(now_dt: datetime) -> Tuple[int, int]:
        """Return epoch timestamps of the midnights that enclose ``now_dt``.

        The first element is 00:00 of the day of ``now_dt``, the second is 00:00
        of the following calendar day.
        """
        # Local import: the module only imports ``datetime`` from ``datetime``.
        from datetime import timedelta

        # Note: we assume day boundaries according to the timezone configured on the pod
        # running Robusta runner. A caveat of this is that Slack will show times according
        # to the client's timezone, which may differ.
        start_of_this_day = now_dt.replace(hour=0, minute=0, second=0, microsecond=0)
        # timedelta arithmetic rolls over month and year ends by itself.
        end_of_this_day = start_of_this_day + timedelta(days=1)
        return int(start_of_this_day.timestamp()), int(end_of_this_day.timestamp())
class GroupingParams(BaseModel):
    """Settings that control how notifications are grouped."""

    group_by: GroupingAttributeSelectorListT = ["cluster"]
    interval: int = 15 * 60  # in seconds
    aligned: bool = False  # align interval boundaries to the start of the day
    notification_mode: Optional[NotificationModeParams]

    @root_validator
    def validate_interval_alignment(cls, values: Dict):
        """Reject ``interval`` values that cannot be aligned when ``aligned`` is set.

        An aligned interval must be positive, at most 24 hours long, and divide
        a 24-hour day evenly.

        Raises:
            ValueError: if ``aligned`` is set and ``interval`` breaks those rules.
        """
        # Fields that failed their own validation are absent from ``values``,
        # so use .get() instead of indexing to avoid masking the real error
        # with a KeyError.
        interval = values.get("interval")
        if not values.get("aligned") or interval is None:
            return values

        seconds_per_day = 24 * 3600
        if interval > seconds_per_day:
            # TODO do we also want to support automatically aligning intervals longer than
            # a day? Using month/year boundaries? This would require additionally handling
            # leap years and daylight saving time, just as uneven day lengths are handled in
            # NotificationSummary.calculate_interval_boundaries
            raise ValueError("Automatically aligning time intervals longer than 24 hours is not supported")
        # Non-positive intervals would otherwise hit ZeroDivisionError (0) or pass
        # the modulo check by accident (negative divisors of 86400).
        if interval <= 0 or seconds_per_day % interval:
            raise ValueError(f'Unable to properly align time interval of {interval} seconds')
        return values
class TestNotificationSummary:
    """Tests for the interval helpers of NotificationSummary."""

    # Expected boundaries are given as calendar dates and converted to epoch
    # seconds at run time, so the test does not depend on the timezone of the
    # machine running it (hard-coded epoch values only hold for one UTC offset).
    @pytest.mark.parametrize("input_dt,expected_start,expected_end", [
        (datetime(2024, 6, 25, 12, 15, 33), datetime(2024, 6, 25), datetime(2024, 6, 26)),
        (datetime(2024, 6, 30, 17, 22, 19), datetime(2024, 6, 30), datetime(2024, 7, 1)),  # end of month
        (datetime(2024, 12, 3, 10, 59, 59), datetime(2024, 12, 3), datetime(2024, 12, 4)),
        (datetime(2024, 12, 31, 16, 42, 28), datetime(2024, 12, 31), datetime(2025, 1, 1)),  # end of year
    ])
    def test_get_day_boundaries(self, input_dt, expected_start, expected_end):
        expected_output = (int(expected_start.timestamp()), int(expected_end.timestamp()))
        assert NotificationSummary.get_day_boundaries(input_dt) == expected_output