Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ feat(aci): Invoke Rule Registry from Notification Action #84524

Merged
merged 9 commits into from
Feb 10, 2025
2 changes: 1 addition & 1 deletion src/sentry/workflow_engine/handlers/action/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
"NotificationActionHandler",
]

from .notification import NotificationActionHandler
from .notification.handler import NotificationActionHandler
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,32 @@
from sentry.grouping.grouptype import ErrorGroupType
from sentry.issues.grouptype import MetricIssuePOC
from sentry.utils.registry import NoRegistrationExistsError, Registry
from sentry.workflow_engine.handlers.action.notification.issue_alert import (
issue_alert_handler_registry,
)
from sentry.workflow_engine.models import Action, Detector
from sentry.workflow_engine.registry import action_handler_registry
from sentry.workflow_engine.types import ActionHandler, WorkflowJob

logger = logging.getLogger(__name__)


class NotificationHandlerException(Exception):
pass


class LegacyRegistryInvoker(ABC):
"""
Abstract base class that defines the interface for notification handlers.
"""

def __call__(self, job: WorkflowJob, action: Action, detector: Detector) -> None:
# Here we could add metrics collection or other common functionality
self.handle_workflow_action(job, action, detector)

@staticmethod
@abstractmethod
def handle_workflow_action(self, job: WorkflowJob, action: Action, detector: Detector) -> None:
def handle_workflow_action(job: WorkflowJob, action: Action, detector: Detector) -> None:
"""
Implement this method to handle the specific notification logic for your handler.
"""
pass
raise NotImplementedError


group_type_notification_registry = Registry[LegacyRegistryInvoker]()
Expand All @@ -41,25 +45,40 @@ def execute(
) -> None:
try:
handler = group_type_notification_registry.get(detector.type)
handler(job, action, detector)
handler.handle_workflow_action(job, action, detector)
except NoRegistrationExistsError:
logger.exception(
"No notification handler found for detector type: %s",
detector.type,
extra={"detector_id": detector.id, "action_id": action.id},
)
# Maybe metrics here?


@group_type_notification_registry.register(ErrorGroupType.slug)
class IssueAlertRegistryInvoker(LegacyRegistryInvoker):
def handle_workflow_action(self, job: WorkflowJob, action: Action, detector: Detector) -> None:
# TODO(iamrajjoshi): Implement this
pass
@staticmethod
def handle_workflow_action(job: WorkflowJob, action: Action, detector: Detector) -> None:
try:
handler = issue_alert_handler_registry.get(action.type)
handler.invoke_legacy_registry(job, action, detector)
except NoRegistrationExistsError:
logger.exception(
"No issue alert handler found for action type: %s",
action.type,
extra={"action_id": action.id},
)
raise
except Exception as e:
logger.exception(
"Error invoking issue alert handler",
extra={"action_id": action.id},
)
raise NotificationHandlerException(e)


@group_type_notification_registry.register(MetricIssuePOC.slug)
class MetricAlertRegistryInvoker(LegacyRegistryInvoker):
def handle_workflow_action(self, job: WorkflowJob, action: Action, detector: Detector) -> None:
@staticmethod
def handle_workflow_action(job: WorkflowJob, action: Action, detector: Detector) -> None:
# TODO(iamrajjoshi): Implement this
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import logging
import uuid
from abc import ABC, abstractmethod
from collections.abc import Callable, Collection, Sequence
from typing import Any

import sentry_sdk

from sentry.constants import ObjectStatus
from sentry.eventstore.models import GroupEvent
from sentry.models.rule import Rule, RuleSource
from sentry.rules.processing.processor import activate_downstream_actions
from sentry.types.rules import RuleFuture
from sentry.utils.registry import Registry
from sentry.utils.safe import safe_execute
from sentry.workflow_engine.models import Action, Detector
from sentry.workflow_engine.types import WorkflowJob
from sentry.workflow_engine.typings.notification_action import DiscordDataBlob

logger = logging.getLogger(__name__)


class BaseIssueAlertHandler(ABC):
"""
Base class for invoking the legacy issue alert registry.
"""

@staticmethod
@abstractmethod
def build_rule_action_blob(
action: Action,
) -> dict[str, Any]:
"""
Build the rule action blob from the Action model.
"""
raise NotImplementedError

@classmethod
def create_rule_instance_from_action(
cls,
action: Action,
detector: Detector,
job: WorkflowJob,
) -> Rule:
"""
Creates a Rule instance from the Action model.
:param action: Action
:param detector: Detector
:param job: WorkflowJob
:return: Rule instance
"""
workflow = job.get("workflow")
environment_id = None
if workflow and workflow.environment:
environment_id = workflow.environment.id

rule = Rule(
id=action.id,
project=detector.project,
environment_id=environment_id,
label=detector.name,
data={"actions": [cls.build_rule_action_blob(action)]},
status=ObjectStatus.ACTIVE,
source=RuleSource.ISSUE,
)

return rule

@staticmethod
def get_rule_futures(
job: WorkflowJob,
rule: Rule,
notification_uuid: str,
) -> Collection[tuple[Callable[[GroupEvent, Sequence[RuleFuture]], None], list[RuleFuture]]]:
"""
This method will collect the futures from the activate_downstream_actions method.
Based off of rule_processor.apply in rules/processing/processor.py
"""
with sentry_sdk.start_span(
op="workflow_engine.handlers.action.notification.issue_alert.invoke_legacy_registry.activate_downstream_actions"
):
grouped_futures = activate_downstream_actions(rule, job["event"], notification_uuid)
return grouped_futures.values()

@staticmethod
def execute_futures(
job: WorkflowJob,
futures: Collection[
tuple[Callable[[GroupEvent, Sequence[RuleFuture]], None], list[RuleFuture]]
],
) -> None:
"""
This method will execute the futures.
Based off of process_rules in post_process.py
"""
with sentry_sdk.start_span(
op="workflow_engine.handlers.action.notification.issue_alert.execute_futures"
):
for callback, futures in futures:
safe_execute(callback, job["event"], futures)

@classmethod
def invoke_legacy_registry(
cls,
job: WorkflowJob,
action: Action,
detector: Detector,
) -> None:
"""
This method will create a rule instance from the Action model, and then invoke the legacy registry.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which part of this is doing the invocation? it looks like it's just executing the futures like we do currently

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i added a lot more docstrings just now. the tldr is we consolidate the post_process process_rules, activate_downstream_actions into this method which will handle building a rule instance from an Action, getting the action handler class, getting the rule futures and executing them.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm not seeing the use of any registry here, maybe that's what colleen was asking about?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah yes, good point. the rule registry is quite literally invoked in the activate_downstream_actions with the instantiate_action method call it makes. do you think there is a better way for me to be explicit about this?

This method encompases the following logic in our legacy system:
1. post_process process_rules calls rule_processor.apply
2. activate_downstream_actions
3. execute_futures (also in post_process process_rules)
"""

with sentry_sdk.start_span(
op="workflow_engine.handlers.action.notification.issue_alert.invoke_legacy_registry"
):
# Create a notification uuid
notification_uuid = str(uuid.uuid4())
iamrajjoshi marked this conversation as resolved.
Show resolved Hide resolved

# Create a rule
rule = cls.create_rule_instance_from_action(action, detector, job)

# Get the futures
futures = cls.get_rule_futures(job, rule, notification_uuid)

# Execute the futures
cls.execute_futures(job, futures)
Comment on lines +127 to +130
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do these need to be separate? is it for having separate spans?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes but also- i want to decouple the logic here, i have a feeling when we move to notif platform, we will slowly remove each of these calls.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also makes it easier to test each of these steps with mock data, so I've been encouraging this kind of decoupling. It can also allow us to add metrics surrounding each call to see what the rough failure rate is for each step.



issue_alert_handler_registry = Registry[BaseIssueAlertHandler]()


@issue_alert_handler_registry.register(Action.Type.DISCORD)
class DiscordIssueAlertHandler(BaseIssueAlertHandler):
@staticmethod
def build_rule_action_blob(action: Action) -> dict[str, Any]:
blob = DiscordDataBlob(**action.data)
return {
"id": "sentry.integrations.discord.notify_action.DiscordNotifyServiceAction",
"server": action.integration_id,
"channel_id": action.target_identifier,
"tags": blob.tags,
}
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import uuid
from unittest import mock

from sentry.constants import ObjectStatus
from sentry.models.rule import Rule, RuleSource
from sentry.workflow_engine.handlers.action.notification.issue_alert import (
BaseIssueAlertHandler,
DiscordIssueAlertHandler,
)
from sentry.workflow_engine.models import Action
from sentry.workflow_engine.types import WorkflowJob
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest


class TestBaseIssueAlertHandler(BaseWorkflowTest):
def setUp(self):
super().setUp()
self.project = self.create_project()
self.detector = self.create_detector(project=self.project)
self.workflow = self.create_workflow(environment=self.environment)
self.action = self.create_action(
type=Action.Type.DISCORD,
integration_id="1234567890",
target_identifier="channel456",
data={"tags": "environment,user,my_tag"},
)
self.group, self.event, self.group_event = self.create_group_event()
self.job = WorkflowJob(event=self.group_event, workflow=self.workflow)

class TestHandler(BaseIssueAlertHandler):
@staticmethod
def build_rule_action_blob(action: Action) -> dict:
return {
"id": "sentry.integrations.discord.notify_action.DiscordNotifyServiceAction",
"server": "1234567890",
"channel_id": "channel456",
"tags": "environment,user,my_tag",
}

self.handler = TestHandler()

def test_create_rule_instance_from_action(self):
"""Test that create_rule_instance_from_action creates a Rule with correct attributes"""
rule = self.handler.create_rule_instance_from_action(self.action, self.detector, self.job)

assert isinstance(rule, Rule)
assert rule.id == self.action.id
assert rule.project == self.detector.project
assert rule.environment_id is not None
assert self.workflow.environment is not None
assert rule.environment_id == self.workflow.environment.id
assert rule.label == self.detector.name
assert rule.data == {
"actions": [
{
"id": "sentry.integrations.discord.notify_action.DiscordNotifyServiceAction",
"server": "1234567890",
"channel_id": "channel456",
"tags": "environment,user,my_tag",
}
]
}
assert rule.status == ObjectStatus.ACTIVE
assert rule.source == RuleSource.ISSUE

def test_create_rule_instance_from_action_no_environment(self):
"""Test that create_rule_instance_from_action creates a Rule with correct attributes"""
workflow = self.create_workflow()
job = WorkflowJob(event=self.group_event, workflow=workflow)
rule = self.handler.create_rule_instance_from_action(self.action, self.detector, job)

assert isinstance(rule, Rule)
assert rule.id == self.action.id
assert rule.project == self.detector.project
assert rule.environment_id is None
assert rule.label == self.detector.name
assert rule.data == {
"actions": [
{
"id": "sentry.integrations.discord.notify_action.DiscordNotifyServiceAction",
"server": "1234567890",
"channel_id": "channel456",
"tags": "environment,user,my_tag",
}
]
}
assert rule.status == ObjectStatus.ACTIVE
assert rule.source == RuleSource.ISSUE

@mock.patch("sentry.workflow_engine.handlers.action.notification.issue_alert.safe_execute")
@mock.patch(
"sentry.workflow_engine.handlers.action.notification.issue_alert.activate_downstream_actions"
)
@mock.patch("uuid.uuid4")
def test_invoke_legacy_registry(
self, mock_uuid, mock_activate_downstream_actions, mock_safe_execute
):
# Test that invoke_legacy_registry correctly processes the action
mock_uuid.return_value = uuid.UUID("12345678-1234-5678-1234-567812345678")

# Mock callback and futures
mock_callback = mock.Mock()
mock_futures = [mock.Mock()]
mock_activate_downstream_actions.return_value = {"some_key": (mock_callback, mock_futures)}

self.handler.invoke_legacy_registry(self.job, self.action, self.detector)

# Verify activate_downstream_actions called with correct args
mock_activate_downstream_actions.assert_called_once_with(
mock.ANY, self.job["event"], "12345678-1234-5678-1234-567812345678" # Rule instance
)

# Verify callback execution
mock_safe_execute.assert_called_once_with(mock_callback, self.job["event"], mock_futures)


class TestDiscordIssueAlertHandler(BaseWorkflowTest):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we test that the correct after() method is called?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we shouldn't need to. activate_downstream_actions is well tested and as long as we are certain that the rule action id we are sending it (for example, "sentry.integrations.discord.notify_action.DiscordNotifyServiceAction" for Discord), we can be certain it will call the correct after method

def setUp(self):
super().setUp()
self.handler = DiscordIssueAlertHandler()
self.action = self.create_action(
type=Action.Type.DISCORD,
integration_id="1234567890",
target_identifier="channel456",
data={"tags": "environment,user,my_tag"},
)
# self.project = self.create_project()
# self.detector = self.create_detector(project=self.project)
# self.group, self.event, self.group_event = self.create_group_event()
# self.job = WorkflowJob(event=self.group_event)

def test_build_rule_action_blob(self):
"""Test that build_rule_action_blob creates correct Discord action data"""
blob = self.handler.build_rule_action_blob(self.action)

assert blob == {
"id": "sentry.integrations.discord.notify_action.DiscordNotifyServiceAction",
"server": "1234567890",
"channel_id": "channel456",
"tags": "environment,user,my_tag",
}

def test_build_rule_action_blob_no_tags(self):
"""Test that build_rule_action_blob handles missing tags"""
self.action.data = {}
blob = self.handler.build_rule_action_blob(self.action)

assert blob == {
"id": "sentry.integrations.discord.notify_action.DiscordNotifyServiceAction",
"server": "1234567890",
"channel_id": "channel456",
"tags": "",
}
Loading
Loading