Skip to content

Commit

Permalink
feat: send events in batch
Browse files Browse the repository at this point in the history
(cherry picked from commit 4e6a374)
  • Loading branch information
Ian2012 committed Feb 5, 2024
1 parent dacce4f commit 2c13d4a
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 57 deletions.
49 changes: 41 additions & 8 deletions event_routing_backends/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
import json
import logging

from django.conf import settings
from django.dispatch import receiver
from django_redis import get_redis_connection
from eventtracking.backends.event_bus import EventBusRoutingBackend
from eventtracking.processors.exceptions import EventEmissionExit
from eventtracking.tracker import get_tracker
from openedx_events.analytics.signals import TRACKING_EVENT_EMITTED

logger = logging.getLogger(__name__)

TRANSFORMED_EVENT_KEY_NAME = "transformed_events"

@receiver(TRACKING_EVENT_EMITTED)
def send_tracking_log_to_backends(
Expand All @@ -36,22 +38,53 @@ def send_tracking_log_to_backends(

event = {
"name": tracking_log.name,
"timestamp": tracking_log.timestamp,
"timestamp": tracking_log.timestamp.isoformat(),
"data": json.loads(tracking_log.data),
"context": json.loads(tracking_log.context),
}

redis = get_redis_connection()

add_to_batch(redis, event)

queue_size = redis.llen(TRANSFORMED_EVENT_KEY_NAME)
if queue_size >= settings.EVENT_ROUTING_BATCH_SIZE -1:
batch = redis.rpop(TRANSFORMED_EVENT_KEY_NAME, queue_size)
send_batch(batch)


def add_to_batch(redis, event):
"""
Add the event to the redis queue.
"""
redis.lpush(TRANSFORMED_EVENT_KEY_NAME, json.dumps(event))
logger.info("Event pushed to the queue: %s", event["name"])


def send_batch(queued_events):
"""
Transform the event to xAPI or Caliper format.
"""
json_events = []
for queued_event in queued_events:
new_event = json.loads(queued_event.decode('utf-8'))
json_events.append(new_event)

process_batch(json_events)

def process_batch(json_events):
"""
Send the transformed event to the different event bus backends.
"""
tracker = get_tracker()

engines = {
name: engine
for name, engine in tracker.backends.items()
if isinstance(engine, EventBusRoutingBackend)
}

for name, engine in engines.items():
try:
processed_event = engine.process_event(event)
logger.info('Successfully processed event "{}"'.format(event["name"]))
engine.send_to_backends(processed_event.copy())
except EventEmissionExit:
logger.info("[EventEmissionExit] skipping event {}".format(event["name"]))
for backend_name, backend in engine.backends.items():
logger.info(f"Sending events to backend [{backend_name}] event bus in batch")
backend.bulk_send(json_events)
2 changes: 2 additions & 0 deletions event_routing_backends/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,5 @@ def plugin_settings(settings):
}
}
})

settings.EVENT_ROUTING_BATCH_SIZE = 5
129 changes: 91 additions & 38 deletions event_routing_backends/tests/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
Test handlers for signals emitted by the analytics app
"""

from datetime import datetime
from unittest.mock import Mock, patch

from django.test import TestCase
from django.test.utils import override_settings
from eventtracking.django.django_tracker import DjangoTracker
from openedx_events.analytics.data import TrackingLogData

from event_routing_backends.handlers import send_tracking_log_to_backends

import json
from event_routing_backends.handlers import send_tracking_log_to_backends, send_batch, add_to_batch, process_batch

class TestHandlers(TestCase):
"""
Expand All @@ -26,9 +26,10 @@ class TestHandlers(TestCase):
}
)
@patch("event_routing_backends.handlers.get_tracker")
def test_send_tracking_log_to_backends(
self, mock_get_tracker
):
@patch("event_routing_backends.handlers.send_batch")
@patch("event_routing_backends.handlers.add_to_batch")
@patch("event_routing_backends.handlers.get_redis_connection")
def test_send_tracking_log_to_batch(self, mock_get_redis_connection, mock_add_to_batch, mock_send_batch, mock_get_tracker):
"""
Test for send_tracking_log_to_backends
"""
Expand All @@ -37,25 +38,27 @@ def test_send_tracking_log_to_backends(
mock_backend = Mock()
tracker.backends["event_bus"].send_to_backends = mock_backend

mock_get_redis_connection.return_value.llen.return_value = 0

tracking_log_data = TrackingLogData(
name="test_name",
timestamp=datetime.now(),
data="{}",
context="{}",
)
send_tracking_log_to_backends(
sender=None,
signal=None,
tracking_log=TrackingLogData(
name="test_name",
timestamp="test_timestamp",
data="{}",
context="{}",
),
tracking_log=tracking_log_data,
)
mock_add_to_batch.assert_called_once_with(mock_get_redis_connection.return_value, {
"name": tracking_log_data.name,
"timestamp": tracking_log_data.timestamp.isoformat(),
"data": json.loads(tracking_log_data.data),
"context": json.loads(tracking_log_data.context),
})

mock_backend.assert_called_once_with(
{
"name": "test_name",
"timestamp": "test_timestamp",
"data": {},
"context": {},
}
)
mock_send_batch.assert_called_once()

@override_settings(
EVENT_TRACKING_BACKENDS={
Expand All @@ -76,31 +79,81 @@ def test_send_tracking_log_to_backends(
)
@patch("event_routing_backends.handlers.get_tracker")
@patch("event_routing_backends.handlers.isinstance")
@patch("event_routing_backends.handlers.logger")
def test_send_tracking_log_to_backends_error(
self, mock_logger, mock_is_instance, mock_get_tracker
def test_send_batch(
self, mock_is_instance, mock_get_tracker
):
"""
Test for send_tracking_log_to_backends
Test for send_batch
"""
tracker = DjangoTracker()
mock_get_tracker.return_value = tracker

mock_is_instance.return_value = True
event = {
"name": "test_name",
"timestamp": datetime.now().isoformat(),
"data": {},
"context": {},
}
send_batch([json.dumps(event).encode("utf-8")])

x = send_tracking_log_to_backends(
sender=None,
signal=None,
tracking_log=TrackingLogData(
name="test_name",
timestamp="test_timestamp",
data="{}",
context="{}",
),
)

assert x is None

mock_logger.info.assert_called_once_with(
"[EventEmissionExit] skipping event {}".format("test_name")
)
def test_add_to_batch(self):
"""
Test for add_to_batch
"""
mock_redis = Mock()
event = {
"name": "test_name",
"timestamp": datetime.now().isoformat(),
"data": {},
"context": {},
}
add_to_batch(mock_redis, event)
mock_redis.lpush.assert_called_once_with("transformed_events", json.dumps(event))

@override_settings(
EVENT_TRACKING_BACKENDS={
"event_bus": {
"ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend",
"OPTIONS": {
"processors": [
{
"ENGINE": "eventtracking.processors.whitelist.NameWhitelistProcessor",
"OPTIONS": {
"whitelist": ["no_test_name"]
}
}
],
"backends": {
"xapi": {
"ENGINE": "event_routing_backends.backends.sync_events_router.SyncEventsRouter",
},
}
},
},
}
)
@patch("event_routing_backends.handlers.get_tracker")
@patch("event_routing_backends.handlers.isinstance")
@patch("event_routing_backends.handlers.logger")
def test_process_batch(self, mock_logger, mock_is_instance, mock_get_tracker):
"""
Test for process_batch
"""
tracker = DjangoTracker()
mock_get_tracker.return_value = tracker
mock_bulk_send = Mock()
tracker.backends["event_bus"].backends["xapi"].bulk_send = mock_bulk_send
mock_is_instance.return_value = True

event = {
"name": "test_name",
"timestamp": datetime.now().isoformat(),
"data": {},
"context": {},
}
process_batch([event])
mock_logger.info.assert_called_once_with("Sending events to backend [xapi] event bus in batch")
mock_bulk_send.assert_called_once_with([event])
1 change: 1 addition & 0 deletions requirements/base.in
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ apache-libcloud # For bulk event log loading
fasteners # Locking tools, required by apache-libcloud, but somehow not installed with it
openedx-filters
openedx-events
django-redis
7 changes: 7 additions & 0 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ apache-libcloud==3.8.0
# via -r requirements/base.in
asgiref==3.7.2
# via django
async-timeout==4.0.3
# via redis
attrs==23.2.0
# via openedx-events
backports-zoneinfo[tzdata]==0.2.1
Expand Down Expand Up @@ -58,6 +60,7 @@ django==3.2.23
# django-crum
# django-fernet-fields-v2
# django-model-utils
# django-redis
# django-waffle
# djangorestframework
# edx-celeryutils
Expand All @@ -77,6 +80,8 @@ django-fernet-fields-v2==0.9
# via -r requirements/base.in
django-model-utils==4.3.1
# via edx-celeryutils
django-redis==5.4.0
# via -r requirements/base.in
django-waffle==4.1.0
# via
# edx-django-utils
Expand Down Expand Up @@ -153,6 +158,8 @@ pytz==2024.1
# tincan
pyyaml==6.0.1
# via code-annotations
redis==5.0.1
# via django-redis
requests==2.31.0
# via
# -r requirements/base.in
Expand Down
21 changes: 16 additions & 5 deletions requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ asgiref==3.7.2
# via
# -r requirements/quality.txt
# django
astroid==3.0.2
astroid==3.0.3
# via
# -r requirements/quality.txt
# pylint
# pylint-celery
async-timeout==4.0.3
# via
# -r requirements/quality.txt
# redis
attrs==23.2.0
# via
# -r requirements/quality.txt
Expand Down Expand Up @@ -138,6 +142,7 @@ django==3.2.23
# django-crum
# django-fernet-fields-v2
# django-model-utils
# django-redis
# django-waffle
# djangorestframework
# edx-celeryutils
Expand All @@ -161,6 +166,8 @@ django-model-utils==4.3.1
# via
# -r requirements/quality.txt
# edx-celeryutils
django-redis==5.4.0
# via -r requirements/quality.txt
django-waffle==4.1.0
# via
# -r requirements/quality.txt
Expand Down Expand Up @@ -199,7 +206,7 @@ exceptiongroup==1.2.0
# pytest
factory-boy==3.3.0
# via -r requirements/quality.txt
faker==22.6.0
faker==22.7.0
# via
# -r requirements/quality.txt
# factory-boy
Expand Down Expand Up @@ -281,7 +288,7 @@ packaging==23.2
# pyproject-api
# pytest
# tox
path==16.9.0
path==16.10.0
# via edx-i18n-tools
pbr==6.0.0
# via
Expand Down Expand Up @@ -319,9 +326,9 @@ pycparser==2.21
# via
# -r requirements/quality.txt
# cffi
pydantic==2.6.0
pydantic==2.6.1
# via inflect
pydantic-core==2.16.1
pydantic-core==2.16.2
# via pydantic
pydocstyle==6.3.0
# via -r requirements/quality.txt
Expand Down Expand Up @@ -394,6 +401,10 @@ pyyaml==6.0.1
# -r requirements/quality.txt
# code-annotations
# edx-i18n-tools
redis==5.0.1
# via
# -r requirements/quality.txt
# django-redis
requests==2.31.0
# via
# -r requirements/quality.txt
Expand Down
Loading

0 comments on commit 2c13d4a

Please sign in to comment.