-
-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
✨ feat(aci): Invoke Rule Registry from Notification Action #84524
Changes from 5 commits
ab20a0b
141a394
157ca3c
d983c01
f0c00d0
541bb52
afd322e
16d962a
6c334c4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
import logging | ||
import uuid | ||
from collections.abc import Callable, Collection, Sequence | ||
from typing import Any | ||
|
||
import sentry_sdk | ||
|
||
from sentry.constants import ObjectStatus | ||
from sentry.eventstore.models import GroupEvent | ||
from sentry.models.rule import Rule, RuleSource | ||
from sentry.rules.processing.processor import activate_downstream_actions | ||
from sentry.types.rules import RuleFuture | ||
from sentry.utils.registry import Registry | ||
from sentry.utils.safe import safe_execute | ||
from sentry.workflow_engine.models import Action, Detector | ||
from sentry.workflow_engine.types import WorkflowJob | ||
from sentry.workflow_engine.typings.notification_action import DiscordDataBlob | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class BaseIssueAlertHandler: | ||
iamrajjoshi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
Base class for invoking the legacy issue alert registry. | ||
""" | ||
|
||
@staticmethod | ||
def build_rule_action_blob( | ||
action: Action, | ||
) -> dict[str, Any]: | ||
""" | ||
Build the rule action blob from the Action model. | ||
""" | ||
raise NotImplementedError | ||
|
||
@classmethod | ||
def create_rule_instance_from_action( | ||
cls, | ||
action: Action, | ||
detector: Detector, | ||
) -> Rule: | ||
""" | ||
Creates a Rule instance from the Action model. | ||
:param action: Action | ||
:param detector: Detector | ||
:return: Rule instance | ||
""" | ||
|
||
rule = Rule( | ||
id=detector.id, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you remind me why this needs to be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. actually i need this to be |
||
project=detector.project, | ||
label=detector.name, | ||
data={"actions": [cls.build_rule_action_blob(action)]}, | ||
status=ObjectStatus.ACTIVE, | ||
source=RuleSource.ISSUE, | ||
) | ||
|
||
return rule | ||
|
||
@staticmethod | ||
def get_rule_futures( | ||
job: WorkflowJob, | ||
rule: Rule, | ||
notification_uuid: str, | ||
) -> Collection[tuple[Callable[[GroupEvent, Sequence[RuleFuture]], None], list[RuleFuture]]]: | ||
""" | ||
This method will collect the futures from the activate_downstream_actions method. | ||
Based off of rule_processor.apply in rules/processing/processor.py | ||
""" | ||
with sentry_sdk.start_span( | ||
op="workflow_engine.handlers.action.notification.issue_alert.invoke_legacy_registry.activate_downstream_actions" | ||
): | ||
grouped_futures = activate_downstream_actions(rule, job["event"], notification_uuid) | ||
return grouped_futures.values() | ||
|
||
@staticmethod | ||
def execute_futures( | ||
job: WorkflowJob, | ||
futures: Collection[ | ||
tuple[Callable[[GroupEvent, Sequence[RuleFuture]], None], list[RuleFuture]] | ||
], | ||
) -> None: | ||
""" | ||
This method will execute the futures. | ||
Based off of process_rules in post_process.py | ||
""" | ||
with sentry_sdk.start_span( | ||
op="workflow_engine.handlers.action.notification.issue_alert.execute_futures" | ||
): | ||
for callback, futures in futures: | ||
safe_execute(callback, job["event"], futures) | ||
|
||
@classmethod | ||
def invoke_legacy_registry( | ||
cls, | ||
job: WorkflowJob, | ||
action: Action, | ||
detector: Detector, | ||
) -> None: | ||
""" | ||
This method will create a rule instance from the Action model, and then invoke the legacy registry. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. which part of this is doing the invocation? it looks like it's just executing the futures like we do currently There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i added a lot more docstrings just now. the tldr is we consolidate the post_process There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i'm not seeing the use of any registry here, maybe that's what colleen was asking about? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah yes, good point. the rule registry is quite literally invoked in the |
||
This method encompases the following logic in our legacy system: | ||
1. post_process process_rules calls rule_processor.apply | ||
2. activate_downstream_actions | ||
3. execute_futures (also in post_process process_rules) | ||
""" | ||
|
||
with sentry_sdk.start_span( | ||
op="workflow_engine.handlers.action.notification.issue_alert.invoke_legacy_registry" | ||
): | ||
# Create a notification uuid | ||
notification_uuid = str(uuid.uuid4()) | ||
iamrajjoshi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# Create a rule | ||
rule = cls.create_rule_instance_from_action(action, detector) | ||
|
||
# Get the futures | ||
futures = cls.get_rule_futures(job, rule, notification_uuid) | ||
|
||
# Execute the futures | ||
cls.execute_futures(job, futures) | ||
Comment on lines
+127
to
+130
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do these need to be separate? is it for having separate spans? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes but also- i want to decouple the logic here, i have a feeling when we move to notif platform, we will slowly remove each of these calls. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It also makes it easier to test each of these steps with mock data, so I've been encouraging this kind of decoupling. It can also allow us to add metrics surrounding each call to see what the rough failure rate is for each step. |
||
|
||
|
||
issue_alert_handler_registry = Registry[BaseIssueAlertHandler]() | ||
|
||
|
||
@issue_alert_handler_registry.register(Action.Type.DISCORD) | ||
class DiscordIssueAlertHandler(BaseIssueAlertHandler): | ||
@staticmethod | ||
def build_rule_action_blob(action: Action) -> dict[str, Any]: | ||
blob = DiscordDataBlob(**action.data) | ||
return { | ||
"id": "sentry.integrations.discord.notify_action.DiscordNotifyServiceAction", | ||
"server": action.integration_id, | ||
"channel_id": action.target_identifier, | ||
"tags": blob.tags, | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,31 +1,29 @@ | ||
import logging | ||
from abc import ABC, abstractmethod | ||
|
||
from sentry.grouping.grouptype import ErrorGroupType | ||
from sentry.issues.grouptype import MetricIssuePOC | ||
from sentry.utils.registry import NoRegistrationExistsError, Registry | ||
from sentry.workflow_engine.handlers.action.notification.issue_alert import ( | ||
issue_alert_handler_registry, | ||
) | ||
from sentry.workflow_engine.models import Action, Detector | ||
from sentry.workflow_engine.registry import action_handler_registry | ||
from sentry.workflow_engine.types import ActionHandler, WorkflowJob | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class LegacyRegistryInvoker(ABC): | ||
class LegacyRegistryInvoker: | ||
iamrajjoshi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
Abstract base class that defines the interface for notification handlers. | ||
""" | ||
|
||
def __call__(self, job: WorkflowJob, action: Action, detector: Detector) -> None: | ||
# Here we could add metrics collection or other common functionality | ||
self.handle_workflow_action(job, action, detector) | ||
|
||
@abstractmethod | ||
def handle_workflow_action(self, job: WorkflowJob, action: Action, detector: Detector) -> None: | ||
@staticmethod | ||
def handle_workflow_action(job: WorkflowJob, action: Action, detector: Detector) -> None: | ||
""" | ||
Implement this method to handle the specific notification logic for your handler. | ||
""" | ||
pass | ||
raise NotImplementedError | ||
|
||
|
||
group_type_notification_registry = Registry[LegacyRegistryInvoker]() | ||
|
@@ -41,25 +39,38 @@ def execute( | |
) -> None: | ||
try: | ||
handler = group_type_notification_registry.get(detector.type) | ||
handler(job, action, detector) | ||
handler.handle_workflow_action(job, action, detector) | ||
except NoRegistrationExistsError: | ||
logger.exception( | ||
"No notification handler found for detector type: %s", | ||
detector.type, | ||
extra={"detector_id": detector.id, "action_id": action.id}, | ||
) | ||
# Maybe metrics here? | ||
|
||
|
||
@group_type_notification_registry.register(ErrorGroupType.slug) | ||
class IssueAlertRegistryInvoker(LegacyRegistryInvoker): | ||
def handle_workflow_action(self, job: WorkflowJob, action: Action, detector: Detector) -> None: | ||
# TODO(iamrajjoshi): Implement this | ||
pass | ||
@staticmethod | ||
def handle_workflow_action(job: WorkflowJob, action: Action, detector: Detector) -> None: | ||
try: | ||
handler = issue_alert_handler_registry.get(action.type) | ||
handler.invoke_legacy_registry(job, action, detector) | ||
except NoRegistrationExistsError: | ||
logger.exception( | ||
"No issue alert handler found for action type: %s", | ||
action.type, | ||
extra={"action_id": action.id}, | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this swallow the exception? Does the caller assume these calls never fail? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good point, let me raise exceptions, we can swallow if needed later |
||
except Exception: | ||
logger.exception( | ||
"Error invoking issue alert handler", | ||
extra={"action_id": action.id}, | ||
) | ||
iamrajjoshi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
@group_type_notification_registry.register(MetricIssuePOC.slug) | ||
class MetricAlertRegistryInvoker(LegacyRegistryInvoker): | ||
def handle_workflow_action(self, job: WorkflowJob, action: Action, detector: Detector) -> None: | ||
@staticmethod | ||
def handle_workflow_action(job: WorkflowJob, action: Action, detector: Detector) -> None: | ||
# TODO(iamrajjoshi): Implement this | ||
pass |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
import uuid | ||
from unittest import mock | ||
|
||
from sentry.constants import ObjectStatus | ||
from sentry.models.rule import Rule, RuleSource | ||
from sentry.workflow_engine.handlers.action.notification.issue_alert import ( | ||
BaseIssueAlertHandler, | ||
DiscordIssueAlertHandler, | ||
) | ||
from sentry.workflow_engine.models import Action | ||
from sentry.workflow_engine.types import WorkflowJob | ||
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest | ||
|
||
|
||
class TestBaseIssueAlertHandler(BaseWorkflowTest): | ||
def setUp(self): | ||
super().setUp() | ||
self.project = self.create_project() | ||
self.detector = self.create_detector(project=self.project) | ||
self.action = self.create_action( | ||
type=Action.Type.DISCORD, | ||
integration_id="1234567890", | ||
target_identifier="channel456", | ||
data={"tags": "environment,user,my_tag"}, | ||
) | ||
self.group, self.event, self.group_event = self.create_group_event() | ||
self.job = WorkflowJob(event=self.group_event) | ||
|
||
class TestHandler(BaseIssueAlertHandler): | ||
@staticmethod | ||
def build_rule_action_blob(action: Action) -> dict: | ||
return { | ||
"id": "sentry.integrations.discord.notify_action.DiscordNotifyServiceAction", | ||
"server": "1234567890", | ||
"channel_id": "channel456", | ||
"tags": "environment,user,my_tag", | ||
} | ||
|
||
self.handler = TestHandler() | ||
|
||
def test_create_rule_instance_from_action(self): | ||
"""Test that create_rule_instance_from_action creates a Rule with correct attributes""" | ||
rule = self.handler.create_rule_instance_from_action(self.action, self.detector) | ||
|
||
assert isinstance(rule, Rule) | ||
assert rule.id == self.detector.id | ||
assert rule.project == self.detector.project | ||
assert rule.label == self.detector.name | ||
assert rule.data == { | ||
"actions": [ | ||
{ | ||
"id": "sentry.integrations.discord.notify_action.DiscordNotifyServiceAction", | ||
"server": "1234567890", | ||
"channel_id": "channel456", | ||
"tags": "environment,user,my_tag", | ||
} | ||
] | ||
} | ||
assert rule.status == ObjectStatus.ACTIVE | ||
assert rule.source == RuleSource.ISSUE | ||
|
||
@mock.patch("sentry.workflow_engine.handlers.action.notification.issue_alert.safe_execute") | ||
@mock.patch( | ||
"sentry.workflow_engine.handlers.action.notification.issue_alert.activate_downstream_actions" | ||
) | ||
@mock.patch("uuid.uuid4") | ||
def test_invoke_legacy_registry( | ||
self, mock_uuid, mock_activate_downstream_actions, mock_safe_execute | ||
): | ||
# Test that invoke_legacy_registry correctly processes the action | ||
mock_uuid.return_value = uuid.UUID("12345678-1234-5678-1234-567812345678") | ||
|
||
# Mock callback and futures | ||
mock_callback = mock.Mock() | ||
mock_futures = [mock.Mock()] | ||
mock_activate_downstream_actions.return_value = {"some_key": (mock_callback, mock_futures)} | ||
|
||
self.handler.invoke_legacy_registry(self.job, self.action, self.detector) | ||
|
||
# Verify activate_downstream_actions called with correct args | ||
mock_activate_downstream_actions.assert_called_once_with( | ||
mock.ANY, self.job["event"], "12345678-1234-5678-1234-567812345678" # Rule instance | ||
) | ||
|
||
# Verify callback execution | ||
mock_safe_execute.assert_called_once_with(mock_callback, self.job["event"], mock_futures) | ||
|
||
|
||
class TestDiscordIssueAlertHandler(BaseWorkflowTest): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we test that the correct after() method is called? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we shouldn't need to. |
||
def setUp(self): | ||
super().setUp() | ||
self.handler = DiscordIssueAlertHandler() | ||
self.action = self.create_action( | ||
type=Action.Type.DISCORD, | ||
integration_id="1234567890", | ||
target_identifier="channel456", | ||
data={"tags": "environment,user,my_tag"}, | ||
) | ||
|
||
def test_build_rule_action_blob(self): | ||
"""Test that build_rule_action_blob creates correct Discord action data""" | ||
blob = self.handler.build_rule_action_blob(self.action) | ||
|
||
assert blob == { | ||
"id": "sentry.integrations.discord.notify_action.DiscordNotifyServiceAction", | ||
"server": "1234567890", | ||
"channel_id": "channel456", | ||
"tags": "environment,user,my_tag", | ||
} | ||
|
||
def test_build_rule_action_blob_no_tags(self): | ||
"""Test that build_rule_action_blob handles missing tags""" | ||
self.action.data = {} | ||
blob = self.handler.build_rule_action_blob(self.action) | ||
|
||
assert blob == { | ||
"id": "sentry.integrations.discord.notify_action.DiscordNotifyServiceAction", | ||
"server": "1234567890", | ||
"channel_id": "channel456", | ||
"tags": "", | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find this file path a little confusing, since notification/notification doesn't tell me a lot about what's in this module.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updating to handler