diff --git a/event_routing_backends/backends/events_router.py b/event_routing_backends/backends/events_router.py index a43b0f9f..89cf7d77 100644 --- a/event_routing_backends/backends/events_router.py +++ b/event_routing_backends/backends/events_router.py @@ -161,11 +161,30 @@ def send(self, event): Arguments: event (dict): the original event dictionary """ - events = self.queue_event(event) - if not events: - return + if settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED: + events = self.queue_event(event) + if not events: + return - self.bulk_send(events) + return self.bulk_send(events) + + event_routes = self.prepare_to_send([event]) + for events_for_route in event_routes.values(): + for event_name, updated_event, host, is_business_critical in events_for_route: + if is_business_critical: + self.dispatch_event_persistent( + event_name, + updated_event, + host['router_type'], + host['host_configurations'], + ) + else: + self.dispatch_event( + event_name, + updated_event, + host['router_type'], + host['host_configurations'], + ) def queue_event(self, event): """ @@ -178,8 +197,8 @@ def queue_event(self, event): redis.lpush(self.queue_name, json.dumps(event)) queue_size = redis.llen(self.queue_name) - logger.info(f"Event pushed to the queue: {event['name']}. Queue size: {queue_size}") - if queue_size < settings.EVENT_ROUTING_BATCH_SIZE: + logger.info(f'Event {event["name"]} has been queued for batching.') + if queue_size < settings.EVENT_ROUTING_BACKEND_BATCH_SIZE: return None batch = redis.rpop(self.queue_name, queue_size) events = [json.loads(queued_event.decode('utf-8')) for queued_event in batch] diff --git a/event_routing_backends/backends/tests/test_events_router.py b/event_routing_backends/backends/tests/test_events_router.py index 45cc35ff..635e7089 100644 --- a/event_routing_backends/backends/tests/test_events_router.py +++ b/event_routing_backends/backends/tests/test_events_router.py @@ -1,11 +1,14 @@ """ Test the EventsRouter """ +import json +import datetime +from copy import copy from unittest.mock import MagicMock, call, patch, sentinel import ddt from django.conf import settings -from django.test import TestCase +from django.test import TestCase, override_settings from edx_django_utils.cache.utils import TieredCache from eventtracking.processors.exceptions import EventEmissionExit from tincan.statement import Statement @@ -257,6 +260,44 @@ def test_duplicate_xapi_event_id(self, mocked_logger): mocked_logger.info.mock_calls ) + @override_settings( + EVENT_ROUTING_BACKEND_BATCHING_ENABLED=True, + EVENT_ROUTING_BACKEND_BATCH_SIZE=2 + ) + @patch('event_routing_backends.backends.events_router.get_redis_connection') + @patch('event_routing_backends.backends.events_router.logger') + @patch('event_routing_backends.backends.events_router.EventsRouter.bulk_send') + def test_queue_event(self, mock_bulk_send, mock_logger, mock_get_redis_connection): + router = EventsRouter(processors=[], backend_name='test') + redis_mock = MagicMock() + mock_get_redis_connection.return_value = redis_mock + redis_mock.lpush.return_value = None + event1 = copy(self.transformed_event) + event1["timestamp"] = datetime.datetime.now() + event2 = copy(self.transformed_event) + event2["timestamp"] = datetime.datetime.now() + events = [event1, event2] + formatted_events = [] + for event in events: + formatted_event = copy(event) + formatted_event["timestamp"] = formatted_event["timestamp"].isoformat() + formatted_events.append(json.dumps(formatted_event).encode('utf-8')) + + redis_mock.rpop.return_value = formatted_events + redis_mock.llen.return_value = 1 + + router.send(event1) + redis_mock.llen.return_value = 2 + router.send(event2) + + redis_mock.lpush.assert_any_call(router.queue_name, json.dumps(event1)) + redis_mock.llen.assert_any_call(router.queue_name) + redis_mock.rpop.assert_any_call(router.queue_name, settings.EVENT_ROUTING_BACKEND_BATCH_SIZE) + mock_logger.info.assert_any_call( + f"Event {self.transformed_event['name']} has been queued for batching." + ) + mock_bulk_send.assert_any_call(events) + @ddt.ddt class TestAsyncEventsRouter(TestEventsRouter): # pylint: disable=test-inherits-tests diff --git a/event_routing_backends/settings/common.py b/event_routing_backends/settings/common.py index b048fb25..7042e41e 100644 --- a/event_routing_backends/settings/common.py +++ b/event_routing_backends/settings/common.py @@ -15,7 +15,8 @@ def plugin_settings(settings): settings.EVENT_ROUTING_BACKEND_COUNTDOWN = 30 settings.EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_MAX_RETRIES = 3 settings.EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_COUNTDOWN = 1 - settings.EVENT_ROUTING_BATCH_SIZE = 10 + settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED = True + settings.EVENT_ROUTING_BACKEND_BATCH_SIZE = 10 # .. setting_name: XAPI_AGENT_IFI_TYPE # .. setting_default: 'external_id' # .. setting_description: This setting can be used to specify the type of inverse functional identifier diff --git a/event_routing_backends/settings/production.py b/event_routing_backends/settings/production.py index b2c0946c..3c84fdd4 100644 --- a/event_routing_backends/settings/production.py +++ b/event_routing_backends/settings/production.py @@ -15,9 +15,13 @@ def plugin_settings(settings): 'EVENT_ROUTING_BACKEND_COUNTDOWN', settings.EVENT_ROUTING_BACKEND_COUNTDOWN ) - settings.EVENT_ROUTING_BATCH_SIZE = settings.ENV_TOKENS.get( - 'EVENT_ROUTING_BATCH_SIZE', - settings.EVENT_ROUTING_BATCH_SIZE + settings.EVENT_ROUTING_BACKEND_BATCH_SIZE = settings.ENV_TOKENS.get( + 'EVENT_ROUTING_BACKEND_BATCH_SIZE', + settings.EVENT_ROUTING_BACKEND_BATCH_SIZE + ) + settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED = settings.ENV_TOKENS.get( + 'EVENT_ROUTING_BACKEND_BATCHING_ENABLED', + settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED ) settings.CALIPER_EVENTS_ENABLED = settings.ENV_TOKENS.get( 'CALIPER_EVENTS_ENABLED', diff --git a/test_settings.py b/test_settings.py index 23efece7..34b2b797 100644 --- a/test_settings.py +++ b/test_settings.py @@ -47,5 +47,7 @@ def root(*args): RUNNING_WITH_TEST_SETTINGS = True EVENT_TRACKING_BACKENDS = {} XAPI_AGENT_IFI_TYPE = 'external_id' +EVENT_ROUTING_BACKEND_BATCHING_ENABLED = False +EVENT_ROUTING_BACKEND_BATCH_SIZE = 1 _mock_third_party_modules()