feat: Add --lrs-urls option to transform_tracking_logs command
- Introduced the --lrs-urls flag to specify target Learning Record Stores (LRS) by their route_url.
- Updated documentation to reflect the new usage of the --lrs-urls option.
- Added tests to ensure correct functionality when using the --lrs-urls flag.

close #483
Ali-Salman29 committed Feb 6, 2025
1 parent 54b9067 commit 279dbbb
Showing 6 changed files with 179 additions and 8 deletions.
12 changes: 11 additions & 1 deletion docs/howto/how_to_bulk_transform.rst
@@ -35,7 +35,7 @@ Modes Of Operation

The command can work in a few distinct ways.

**File(s) to learning record store (LRS)** - this will use the existing event-routing-backends configuration to route any log replays to **all** configured LRS backends just like the event was being emitted right now. This can be used to backfill old data, capture old events that didn't previously have transforms, or fix up lost data from downtime issues.
**File(s) to learning record store (LRS)** - this will use the existing event-routing-backends configuration to route any log replays to **all** configured LRS backends by default, just like the event was being emitted right now. This can be used to backfill old data, capture old events that didn't previously have transforms, or fix up lost data from downtime issues. To target specific LRSs, use the ``--lrs-urls`` option to specify one or more LRS endpoints by their ``route_url``. When provided, the command routes the transformed events only to the specified LRSs rather than to all configured ones.
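
To see which ``route_url`` values are currently eligible targets, you can check the ``RouterConfiguration`` model from a Django shell. The snippet below is an illustrative sketch only (the shell invocation is an assumption, not part of the command)::

    # e.g. run inside `python manage.py lms shell`
    from event_routing_backends.models import RouterConfiguration

    # Only enabled routers are considered; values passed to --lrs-urls must appear here.
    print(list(
        RouterConfiguration.objects.filter(enabled=True)
        .values_list("route_url", flat=True)
    ))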

**File(s) to file(s)** - This will perform the same transformations as usual, but instead of routing them to an LRS they can be saved as a file to any libcloud destination. In this mode all events are saved to a single file and no filters are applied.

@@ -65,6 +65,16 @@ Examples
--destination_provider LRS \
--transformer_type xapi

::

# Transform all events in the local file /openedx/data/tracking.log to the specified LRSs
python manage.py lms transform_tracking_logs \
--source_provider LOCAL \
--source_config '{"key": "/openedx/data/", "prefix": "tracking.log", "container": "logs"}' \
--destination_provider LRS \
--transformer_type xapi \
--lrs-urls http://lrs1.example.com http://lrs2.example.com

::

# Transform all events in the local file /openedx/data/tracking.log to all configured LRSs
10 changes: 7 additions & 3 deletions event_routing_backends/backends/events_router.py
@@ -64,11 +64,15 @@ def configure_host(self, host, router):

return host

def prepare_to_send(self, events):
def prepare_to_send(self, events, router_urls=None):
"""
Prepare a list of events to be sent and create a processed, filtered batch for each router.
If router_urls is provided, only the matching routers are used.
"""
routers = RouterConfiguration.get_enabled_routers(self.backend_name)
if router_urls:
routers = routers.filter(route_url__in=router_urls)

business_critical_events = get_business_critical_events()
route_events = {}

@@ -139,7 +143,7 @@ def get_failed_events(self, batch_size):
return []
return [json.loads(event.decode('utf-8')) for event in failed_events]

def bulk_send(self, events):
def bulk_send(self, events, router_urls=None):
"""
Send the event to configured routers after processing it.
@@ -150,7 +154,7 @@ def bulk_send(self, events):
Arguments:
events (list[dict]): list of original event dictionaries
"""
event_routes = self.prepare_to_send(events)
event_routes = self.prepare_to_send(events, router_urls)

for events_for_route in event_routes.values():
prepared_events = []
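
For callers the new router_urls parameter is optional: omitting it keeps the existing behaviour of sending to every enabled router, while passing it narrows delivery to the matching route_urls. A minimal usage sketch (the import locations and the transformed_events list are assumptions for illustration; the construction mirrors the tests below):

    # Import paths are assumed here; adjust them to where these classes live in this repo.
    from event_routing_backends.backends.events_router import AsyncEventsRouter
    from event_routing_backends.models import RouterConfiguration

    router = AsyncEventsRouter(processors=[], backend_name=RouterConfiguration.XAPI_BACKEND)

    # transformed_events: a list of already-transformed event dicts (placeholder name).
    router.bulk_send(transformed_events)                                    # all enabled LRSs
    router.bulk_send(transformed_events, router_urls=["http://test1.com"])  # only the matching router
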
60 changes: 60 additions & 0 deletions event_routing_backends/backends/tests/test_events_router.py
@@ -917,6 +917,66 @@ def test_successful_routing_of_bulk_events(
# test mocked oauth client
mocked_oauth_client.assert_not_called()

@patch("event_routing_backends.tasks.dispatch_bulk_events.delay")
@patch("event_routing_backends.utils.http_client.requests.post")
@patch("event_routing_backends.utils.xapi_lrs_client.RemoteLRS")
def test_bulk_send_routes_events_based_on_configured_urls(
self, mocked_lrs, mocked_post, mock_dispatch_event
):
TieredCache.dangerous_clear_all_tiers()
mocked_oauth_client = MagicMock()
mocked_api_key_client = MagicMock()

MOCKED_MAP = {
"AUTH_HEADERS": HttpClient,
"OAUTH2": mocked_oauth_client,
"API_KEY": mocked_api_key_client,
"XAPI_LRS": LrsClient,
}
RouterConfigurationFactory.create(
backend_name=RouterConfiguration.XAPI_BACKEND,
enabled=True,
route_url="http://test1.com",
auth_scheme=RouterConfiguration.AUTH_BASIC,
auth_key=None,
username="abc",
password="xyz",
configurations=ROUTER_CONFIG_FIXTURE[0],
)
RouterConfigurationFactory.create(
backend_name=RouterConfiguration.XAPI_BACKEND,
enabled=True,
route_url="http://test2.com",
auth_scheme=RouterConfiguration.AUTH_BASIC,
auth_key=None,
username="abc1",
password="xyz1",
configurations=ROUTER_CONFIG_FIXTURE[0],
)

router = AsyncEventsRouter(
processors=[], backend_name=RouterConfiguration.XAPI_BACKEND
)

with patch.dict(
"event_routing_backends.tasks.ROUTER_STRATEGY_MAPPING", MOCKED_MAP
):
router.bulk_send(self.bulk_transformed_events)

assert mock_dispatch_event.call_count == 2

# Reset mock before the next call
mock_dispatch_event.reset_mock()

with patch.dict(
"event_routing_backends.tasks.ROUTER_STRATEGY_MAPPING", MOCKED_MAP
):
router.bulk_send(
self.bulk_transformed_events, router_urls=["http://test1.com"]
)

assert mock_dispatch_event.call_count == 1


@ddt.ddt
class TestSyncEventsRouter(TestEventsRouter): # pylint: disable=test-inherits-tests
event_routing_backends/management/commands/helpers/queued_sender.py
@@ -24,7 +24,8 @@ def __init__( # pylint: disable=too-many-positional-arguments
transformer_type,
max_queue_size=10000,
sleep_between_batches_secs=1.0,
dry_run=False
dry_run=False,
lrs_urls=None
):
self.destination = destination
self.destination_container = destination_container
@@ -34,6 +35,7 @@ def __init__( # pylint: disable=too-many-positional-arguments
self.max_queue_size = max_queue_size
self.sleep_between_batches = sleep_between_batches_secs
self.dry_run = dry_run
self.lrs_urls = lrs_urls or []

# Bookkeeping
self.queued_lines = 0
@@ -101,7 +103,7 @@ def send(self):
"""
if self.destination == "LRS":
print(f"Sending {len(self.event_queue)} events to LRS...")
self.backend.bulk_send(self.event_queue)
self.backend.bulk_send(self.event_queue, self.lrs_urls)
else:
print("Skipping send, we're storing with libcloud instead of an LRS.")

event_routing_backends/management/commands/tests/test_transform_tracking_logs.py
@@ -163,6 +163,24 @@ def command_options():
},
"whitelist": ["problem_check"]
},
# Test with LRS URLs
{
"transformer_type": "xapi",
"source_provider": "MINIO",
"source_config": REMOTE_CONFIG,
"lrs_urls": ["http://lrs1.com", "http://lrs2.com"],
"expected_results": {
"expected_batches_sent": 1,
"log_lines": [
"Looking for log files in test_bucket/xapi_statements/*",
"Finalizing 2 events to LRS",
"Sending to LRS!",
"Sending 2 events to LRS...",
"Queued 2 log lines, could not parse 2 log lines, skipped 8 log lines, sent 1 batches.",
],
},
"whitelist": ["problem_check"],
},
]

for option in options:
@@ -189,7 +207,8 @@ def _get_raw_log_stream(_, start_bytes, chunk_size):


@pytest.mark.parametrize("command_opts", command_options())
def test_transform_command(command_opts, mock_common_calls, caplog, capsys):
@patch("event_routing_backends.management.commands.transform_tracking_logs.RouterConfiguration")
def test_transform_command(MockRouterConfiguration, command_opts, mock_common_calls, caplog, capsys):
"""
Test the command and QueuedSender with a variety of options.
"""
@@ -198,6 +217,12 @@ def test_transform_command(command_opts, mock_common_calls, caplog, capsys):
expected_results = command_opts.pop("expected_results")
transform_tracking_logs.CHUNK_SIZE = command_opts.pop("chunk_size", 1024*1024*2)

# Mock RouterConfiguration to return specific URLs
MockRouterConfiguration.objects.filter.return_value.values_list.return_value = [
"http://lrs1.com",
"http://lrs2.com",
]

mm = MagicMock()

mock_log_object = MagicMock()
@@ -244,6 +269,44 @@ def test_transform_command(command_opts, mock_common_calls, caplog, capsys):
assert line in caplog.text or line in captured.out


@patch("event_routing_backends.management.commands.transform_tracking_logs.RouterConfiguration")
def test_invalid_lrs_urls(MockRouterConfiguration, mock_common_calls, caplog):
"""
Test that a ValueError is raised when invalid LRS URLs are provided.
"""
command_opts = {
"transformer_type": "xapi",
"source_provider": "MINIO",
"source_config": REMOTE_CONFIG,
"lrs_urls": ["http://lrs3-invalid.com"],
}

mock_libcloud_provider, mock_libcloud_get_driver = mock_common_calls

MockRouterConfiguration.objects.filter.return_value.values_list.return_value = [
"http://lrs1.com",
"http://lrs2.com",
]

transform_tracking_logs.CHUNK_SIZE = command_opts.pop("chunk_size", 1024 * 1024 * 2)

mm = MagicMock()

mock_log_object = MagicMock()
mock_log_object.__str__.return_value = "tracking.log"
mock_log_object.name = "tracking.log"
mock_log_object.size = _get_raw_log_size()

# Fake finding one log file in each container, it will be loaded and parsed twice
mm.return_value.iterate_container_objects.return_value = [mock_log_object]
mm.return_value.download_object_range_as_stream = _get_raw_log_stream
mock_libcloud_get_driver.return_value = mm

# Run command with invalid route_urls and assert ValueError is raised
with pytest.raises(ValueError):
call_command("transform_tracking_logs", **command_opts)


def test_queued_sender_store_on_lrs(mock_common_calls, capsys):
"""
Test that we don't attempt to store on an LRS backend.
event_routing_backends/management/commands/transform_tracking_logs.py
@@ -13,6 +13,7 @@
from libcloud.storage.types import Provider

from event_routing_backends.management.commands.helpers.queued_sender import QueuedSender
from event_routing_backends.models import RouterConfiguration

# Number of bytes to download at a time, this is 2 MB
CHUNK_SIZE = 1024 * 1024 * 2
@@ -159,6 +160,25 @@ def validate_destination(driver, container_name, prefix, source_objects):
print(f"Wrote source file list to '{container_name}/{full_path}'")


def validate_lrs_routes(lrs_urls):
"""
Validate that the provided LRS URLs are present and enabled in the RouterConfiguration.
Raises a ValueError if any of the URLs are missing or not enabled.
"""
if lrs_urls:
missing_urls = set(lrs_urls) - set(
RouterConfiguration.objects.filter(
route_url__in=lrs_urls, enabled=True
).values_list("route_url", flat=True)
)
if missing_urls:
raise ValueError(
"The following LRS URLs are not present or not enabled in the ",
f"RouterConfiguration: {', '.join(missing_urls)}"
)
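
Illustrative usage (both URLs below are placeholders): a route_url that matches an enabled RouterConfiguration passes silently, while anything else fails fast before any log parsing begins.

    validate_lrs_routes(["http://configured.example.com"])      # assumed to be enabled -> returns None
    validate_lrs_routes(["http://not-configured.example.com"])  # raises ValueError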


def get_libcloud_drivers(source_provider, source_config, destination_provider, destination_config):
"""
Attempt to configure the libcloud drivers for source and destination.
@@ -256,6 +276,15 @@ def add_arguments(self, parser):
help="Attempt to transform all lines from all files, but do not send to the destination.",
)

parser.add_argument(
'--lrs-urls',
nargs='+',
type=str,
default=None,
help="Specify the LRS route_url(s) to send data to "
"(e.g., --lrs-urls http://lrs1.example.com http://lrs2.example.com).",
)
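
Because argparse normalizes dashes in option names, --lrs-urls surfaces in handle() as options["lrs_urls"], and nargs='+' collects one or more URLs into a list. A standalone sketch of that behaviour (the URLs are placeholders):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--lrs-urls', nargs='+', type=str, default=None)
    opts = parser.parse_args(['--lrs-urls', 'http://lrs1.example.com', 'http://lrs2.example.com'])
    print(opts.lrs_urls)  # ['http://lrs1.example.com', 'http://lrs2.example.com']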

def handle(self, *args, **options):
"""
Configure the command and start the transform process.
@@ -272,11 +301,13 @@ def handle(self, *args, **options):
options["destination_provider"],
dest_config
)
lrs_urls = options.get('lrs_urls')

source_file_list = validate_source_and_files(source_driver, source_container, source_prefix)
if dest_driver != "LRS":
validate_destination(dest_driver, dest_container, dest_prefix, source_file_list)
else:
validate_lrs_routes(lrs_urls)
print(f"Found {len(source_file_list)} source files: ", *source_file_list, sep="\n")

sender = QueuedSender(
Expand All @@ -286,7 +317,8 @@ def handle(self, *args, **options):
options["transformer_type"],
max_queue_size=options["batch_size"],
sleep_between_batches_secs=options["sleep_between_batches_secs"],
dry_run=options["dry_run"]
dry_run=options["dry_run"],
lrs_urls=lrs_urls
)

transform_tracking_logs(
