Skip to content

Commit

Permalink
feat: send events in batch from routers
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian2012 committed Feb 22, 2024
1 parent b65b856 commit 473f3d3
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 11 deletions.
31 changes: 25 additions & 6 deletions event_routing_backends/backends/events_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,30 @@ def send(self, event):
Arguments:
event (dict): the original event dictionary
"""
events = self.queue_event(event)
if not events:
return
if settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED:
events = self.queue_event(event)
if not events:
return

self.bulk_send(events)
return self.bulk_send(events)

event_routes = self.prepare_to_send([event])
for events_for_route in event_routes.values():
for event_name, updated_event, host, is_business_critical in events_for_route:
if is_business_critical:
self.dispatch_event_persistent(
event_name,
updated_event,
host['router_type'],
host['host_configurations'],
)
else:
self.dispatch_event(
event_name,
updated_event,
host['router_type'],
host['host_configurations'],
)

def queue_event(self, event):
"""
Expand All @@ -178,8 +197,8 @@ def queue_event(self, event):
redis.lpush(self.queue_name, json.dumps(event))
queue_size = redis.llen(self.queue_name)

logger.info(f"Event pushed to the queue: {event['name']}. Queue size: {queue_size}")
if queue_size < settings.EVENT_ROUTING_BATCH_SIZE:
logger.info(f'Event {event["name"]} has been queued for batching.')
if queue_size < settings.EVENT_ROUTING_BACKEND_BATCH_SIZE:
return None
batch = redis.rpop(self.queue_name, queue_size)
events = [json.loads(queued_event.decode('utf-8')) for queued_event in batch]
Expand Down
43 changes: 42 additions & 1 deletion event_routing_backends/backends/tests/test_events_router.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
"""
Test the EventsRouter
"""
import json
import datetime
from copy import copy
from unittest.mock import MagicMock, call, patch, sentinel

import ddt
from django.conf import settings
from django.test import TestCase
from django.test import TestCase, override_settings
from edx_django_utils.cache.utils import TieredCache
from eventtracking.processors.exceptions import EventEmissionExit
from tincan.statement import Statement
Expand Down Expand Up @@ -257,6 +260,44 @@ def test_duplicate_xapi_event_id(self, mocked_logger):
mocked_logger.info.mock_calls
)

@override_settings(
EVENT_ROUTING_BACKEND_BATCHING_ENABLED=True,
EVENT_ROUTING_BACKEND_BATCH_SIZE=2
)
@patch('event_routing_backends.backends.events_router.get_redis_connection')
@patch('event_routing_backends.backends.events_router.logger')
@patch('event_routing_backends.backends.events_router.EventsRouter.bulk_send')
def test_queue_event(self, mock_bulk_send, mock_logger, mock_get_redis_connection):
router = EventsRouter(processors=[], backend_name='test')
redis_mock = MagicMock()
mock_get_redis_connection.return_value = redis_mock
redis_mock.lpush.return_value = None
event1 = copy(self.transformed_event)
event1["timestamp"] = datetime.datetime.now()
event2 = copy(self.transformed_event)
event2["timestamp"] = datetime.datetime.now()
events = [event1, event2]
formatted_events = []
for event in events:
formatted_event = copy(event)
formatted_event["timestamp"] = formatted_event["timestamp"].isoformat()
formatted_events.append(json.dumps(formatted_event).encode('utf-8'))

redis_mock.rpop.return_value = formatted_events
redis_mock.llen.return_value = 1

router.send(event1)
redis_mock.llen.return_value = 2
router.send(event2)

redis_mock.lpush.assert_any_call(router.queue_name, json.dumps(event1))
redis_mock.llen.assert_any_call(router.queue_name)
redis_mock.rpop.assert_any_call(router.queue_name, settings.EVENT_ROUTING_BACKEND_BATCH_SIZE)
mock_logger.info.assert_any_call(
f"Event {self.transformed_event['name']} has been queued for batching."
)
mock_bulk_send.assert_any_call(events)


@ddt.ddt
class TestAsyncEventsRouter(TestEventsRouter): # pylint: disable=test-inherits-tests
Expand Down
3 changes: 2 additions & 1 deletion event_routing_backends/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ def plugin_settings(settings):
settings.EVENT_ROUTING_BACKEND_COUNTDOWN = 30
settings.EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_MAX_RETRIES = 3
settings.EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_COUNTDOWN = 1
settings.EVENT_ROUTING_BATCH_SIZE = 10
settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED = True
settings.EVENT_ROUTING_BACKEND_BATCH_SIZE = 10
# .. setting_name: XAPI_AGENT_IFI_TYPE
# .. setting_default: 'external_id'
# .. setting_description: This setting can be used to specify the type of inverse functional identifier
Expand Down
10 changes: 7 additions & 3 deletions event_routing_backends/settings/production.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ def plugin_settings(settings):
'EVENT_ROUTING_BACKEND_COUNTDOWN',
settings.EVENT_ROUTING_BACKEND_COUNTDOWN
)
settings.EVENT_ROUTING_BATCH_SIZE = settings.ENV_TOKENS.get(
'EVENT_ROUTING_BATCH_SIZE',
settings.EVENT_ROUTING_BATCH_SIZE
settings.EVENT_ROUTING_BACKEND_BATCH_SIZE = settings.ENV_TOKENS.get(
'EVENT_ROUTING_BACKEND_BATCH_SIZE',
settings.EVENT_ROUTING_BACKEND_BATCH_SIZE
)
settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED = settings.ENV_TOKENS.get(
'EVENT_ROUTING_BACKEND_BATCHING_ENABLED',
settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED
)
settings.CALIPER_EVENTS_ENABLED = settings.ENV_TOKENS.get(
'CALIPER_EVENTS_ENABLED',
Expand Down
2 changes: 2 additions & 0 deletions test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,7 @@ def root(*args):
RUNNING_WITH_TEST_SETTINGS = True
EVENT_TRACKING_BACKENDS = {}
XAPI_AGENT_IFI_TYPE = 'external_id'
EVENT_ROUTING_BACKEND_BATCHING_ENABLED = False
EVENT_ROUTING_BACKEND_BATCH_SIZE = 1

_mock_third_party_modules()

0 comments on commit 473f3d3

Please sign in to comment.