From 13bd990e2cdb6384b58a30fc9a6cafa121c4aa9c Mon Sep 17 00:00:00 2001 From: Jose Javier Merchante Date: Wed, 4 Dec 2024 09:33:20 +0100 Subject: [PATCH] Archive Events in OpenSearch This commit implements archiving of events in an OpenSearch backend. It introduces a new subscriber group that reads from the events stream and stores events in an OpenSearch instance. Additionally, the worker pool now supports execution on specific task queues, enabling different workers to process tasks based on their queue. Signed-off-by: Jose Javier Merchante --- poetry.lock | 38 +- pyproject.toml | 1 + src/grimoirelab/core/config/settings.py | 25 +- src/grimoirelab/core/runner/commands/run.py | 92 ++++- src/grimoirelab/core/scheduler/models.py | 8 + .../core/scheduler/tasks/archivist.py | 385 ++++++++++++++++++ .../migrations/0002_storagetask_storagejob.py | 61 +++ .../core/scheduler/tasks/models.py | 93 +++++ tests/scheduler/test_task_archivist.py | 251 ++++++++++++ 9 files changed, 935 insertions(+), 19 deletions(-) create mode 100644 src/grimoirelab/core/scheduler/tasks/archivist.py create mode 100644 src/grimoirelab/core/scheduler/tasks/migrations/0002_storagetask_storagejob.py create mode 100644 tests/scheduler/test_task_archivist.py diff --git a/poetry.lock b/poetry.lock index 8466c83..7f89dbe 100644 --- a/poetry.lock +++ b/poetry.lock @@ -566,6 +566,17 @@ https = ["urllib3 (>=1.24.1)"] paramiko = ["paramiko"] pgp = ["gpg"] +[[package]] +name = "events" +version = "0.5" +description = "Bringing the elegance of C# EventHandler to Python" +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "Events-0.5-py3-none-any.whl", hash = "sha256:a7286af378ba3e46640ac9825156c93bdba7502174dd696090fdfcd4d80a1abd"}, +] + [[package]] name = "fakeredis" version = "2.26.2" @@ -710,6 +721,31 @@ files = [ {file = "mysqlclient-2.0.3.tar.gz", hash = "sha256:f6ebea7c008f155baeefe16c56cd3ee6239f7a5a9ae42396c2f1860f08a7c432"}, ] +[[package]] +name = "opensearch-py" +version = "2.8.0" +description = "Python client for OpenSearch" +optional = false +python-versions = "<4,>=3.8" +groups = ["main"] +files = [ + {file = "opensearch_py-2.8.0-py3-none-any.whl", hash = "sha256:52c60fdb5d4dcf6cce3ee746c13b194529b0161e0f41268b98ab8f1624abe2fa"}, + {file = "opensearch_py-2.8.0.tar.gz", hash = "sha256:6598df0bc7a003294edd0ba88a331e0793acbb8c910c43edf398791e3b2eccda"}, +] + +[package.dependencies] +certifi = ">=2024.07.04" +Events = "*" +python-dateutil = "*" +requests = ">=2.32.0,<3.0.0" +urllib3 = {version = ">=1.26.19,<2.2.0 || >2.2.0,<2.2.1 || >2.2.1,<3", markers = "python_version >= \"3.10\""} + +[package.extras] +async = ["aiohttp (>=3.9.4,<4)"] +develop = ["black (>=24.3.0)", "botocore", "coverage (<8.0.0)", "jinja2", "myst_parser", "pytest (>=3.0.0)", "pytest-cov", "pytest-mock (<4.0.0)", "pytz", "pyyaml", "requests (>=2.0.0,<3.0.0)", "sphinx", "sphinx_copybutton", "sphinx_rtd_theme"] +docs = ["aiohttp (>=3.9.4,<4)", "myst_parser", "sphinx", "sphinx_copybutton", "sphinx_rtd_theme"] +kerberos = ["requests_kerberos"] + [[package]] name = "packaging" version = "24.2" @@ -983,4 +1019,4 @@ files = [ [metadata] lock-version = "2.1" python-versions = "^3.11" -content-hash = "c749b91b2e5e4c062c89b4d87d5b5d90ba78a790a44c8c251480ac4e1e4cfabe" +content-hash = "72a4fd698a93f0afd66cd100b4f2e4fb01ba6a653362d5455d3c16e40deaf7d1" diff --git a/pyproject.toml b/pyproject.toml index 9046b4d..e8412fc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,6 +54,7 @@ perceval = {version = "^1.0.2", allow-prereleases = true} 
grimoirelab-chronicler = {git = "https://github.com/chaoss/grimoirelab-chronicler.git", allow-prereleases = true} django-cors-headers = "^4.6.0" djangorestframework = "^3.15.2" +opensearch-py = "^2.8.0" [tool.poetry.group.dev.dependencies] fakeredis = "^2.0.0" diff --git a/src/grimoirelab/core/config/settings.py b/src/grimoirelab/core/config/settings.py index ca8c44e..589fb21 100644 --- a/src/grimoirelab/core/config/settings.py +++ b/src/grimoirelab/core/config/settings.py @@ -273,7 +273,8 @@ # https://github.com/rq/django-rq # -GRIMOIRELAB_Q_EVENTIZER_JOBS = os.environ.get('GRIMOIRELAB_Q_EVENTIZER_JOBS', 'default') +GRIMOIRELAB_Q_EVENTIZER_JOBS = os.environ.get('GRIMOIRELAB_Q_EVENTIZER_JOBS', 'eventizer_jobs') +GRIMOIRELAB_Q_ARCHIVIST_JOBS = os.environ.get('GRIMOIRELAB_Q_ARCHIVIST_JOBS', 'storage_jobs') _RQ_DATABASE = { 'HOST': os.environ.get('GRIMOIRELAB_REDIS_HOST', '127.0.0.1'), @@ -284,15 +285,11 @@ RQ_QUEUES = { GRIMOIRELAB_Q_EVENTIZER_JOBS: _RQ_DATABASE, + GRIMOIRELAB_Q_ARCHIVIST_JOBS: _RQ_DATABASE, } -GRIMOIRELAB_EVENTS_STREAM_NAME = os.environ.get('GRIMOIRELAB_EVENTS_STREAM_NAME', - 'events') -# Maximum events in Redis stream before dropping. Consumers must process events -# faster than production to avoid loss. Default max size is 1M events (~2.5GB Git events). -# Adjust for memory constraints. -GRIMOIRELAB_EVENTS_STREAM_MAX_LENGTH = int(os.environ.get('GRIMOIRELAB_EVENTS_STREAM_MAX_LENGTH', - 1 * 10 ** 6)) +GRIMOIRELAB_EVENTS_STREAM_NAME = os.environ.get('GRIMOIRELAB_EVENTS_STREAM_NAME', 'events') +GRIMOIRELAB_STREAM_MAX_LENGTH = int(os.environ.get('GRIMOIRELAB_STREAM_MAX_LENGTH', 2 * 10 ** 6)) RQ = { 'JOB_CLASS': 'grimoirelab.core.scheduler.jobs.GrimoireLabJob', @@ -309,3 +306,15 @@ GRIMOIRELAB_JOB_TIMEOUT = int(os.environ.get('GRIMOIRELAB_JOB_TIMEOUT', -1)) GRIMOIRELAB_GIT_STORAGE_PATH = os.environ.get('GRIMOIRELAB_GIT_PATH', '~/.perceval') + +# +# Archivist configuration +# +GRIMOIRELAB_ARCHIVIST = { + 'WORKERS': int(os.environ.get('GRIMOIRELAB_ARCHIVIST_WORKERS', 10)), + 'STORAGE_TYPE': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_TYPE', 'opensearch'), + 'STORAGE_URL': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_URL', 'https://admin:admin@localhost:9200'), + 'STORAGE_INDEX': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_INDEX', 'events'), + 'STORAGE_VERIFY_CERT': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_VERIFY_CERT', 'False').lower() in ('true', '1'), + 'EVENTS_PER_JOB': int(os.environ.get('GRIMOIRELAB_ARCHIVIST_EVENTS_PER_JOB', 10000)), +} diff --git a/src/grimoirelab/core/runner/commands/run.py b/src/grimoirelab/core/runner/commands/run.py index 59579a3..a6c4033 100644 --- a/src/grimoirelab/core/runner/commands/run.py +++ b/src/grimoirelab/core/runner/commands/run.py @@ -19,6 +19,7 @@ from __future__ import annotations import os +import sys import typing import click @@ -43,9 +44,13 @@ def run(ctx: Context): is_flag=True, default=False, help="Run the service in developer mode.") +@click.option("--clear-tasks", + is_flag=True, + default=False, + help="Clear background tasks.") @run.command() @click.pass_context -def server(ctx: Context, devel: bool): +def server(ctx: Context, devel: bool, clear_tasks: bool): """Start the GrimoireLab core server. GrimoireLab server allows to schedule tasks and fetch data from @@ -56,6 +61,8 @@ def server(ctx: Context, devel: bool): should be run with a reverse proxy. If you activate the '--dev' flag, a HTTP server will be run instead. 
""" + create_background_tasks(clear_tasks) + env = os.environ env["UWSGI_ENV"] = f"DJANGO_SETTINGS_MODULE={ctx.obj['cfg']}" @@ -91,24 +98,89 @@ def server(ctx: Context, devel: bool): @run.command() +@click.argument('task-types', nargs=-1) @click.option('--workers', default=10, show_default=True, help="Number of workers to run in the pool.") -def eventizers(workers: int): - """Start a pool of eventizer workers. +def worker_pool(task_types: str, workers: int): + """Start a pool of workers that run specific tasks. - The workers on the pool will run tasks to fetch data from software - development repositories. Data will be processed in form of events, - and published in the events queue. + If multiple tasks share the same queue, they will run in the same + pool of workers. The tasks to run are defined as arguments to the + command. The number of workers running in the pool can be defined with the parameter '--workers'. - - Workers get jobs from the GRIMOIRELAB_Q_EVENTIZER_JOBS queue defined - in the configuration file. """ + from grimoirelab.core.scheduler.models import (get_registered_task_model, + get_all_registered_task_names) + + available_tasks = get_all_registered_task_names() + + queues = [] + for task in task_types: + try: + Task = get_registered_task_model(task)[0] + except KeyError: + click.echo(f"Task '{task}' is not a valid task. " + f"Options: {available_tasks}", err=True) + sys.exit(1) + queues.append(Task().default_job_queue) + + if not queues: + click.echo(f"You must define at least one valid task. " + f"Options: {available_tasks}", err=True) + sys.exit(1) + django.core.management.call_command( - 'rqworker-pool', settings.GRIMOIRELAB_Q_EVENTIZER_JOBS, + 'rqworker-pool', queues, num_workers=workers ) + + +def create_background_tasks(clear_tasks: bool): + """ + Create background tasks before starting the server. + :param clear_tasks: clear tasks before creating new ones. + :return: + """ + from grimoirelab.core.scheduler.scheduler import schedule_task + from grimoirelab.core.scheduler.tasks.models import StorageTask + + workers = settings.GRIMOIRELAB_ARCHIVIST['WORKERS'] + storage_url = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_URL'] + storage_db_name = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_INDEX'] + storage_type = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_TYPE'] + verify_certs = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_VERIFY_CERT'] + events_per_job = settings.GRIMOIRELAB_ARCHIVIST['EVENTS_PER_JOB'] + + if clear_tasks: + StorageTask.objects.all().delete() + click.echo("Removing old background tasks.") + + current = StorageTask.objects.filter(burst=False).count() + if workers == current: + click.echo("Background tasks already created. 
Skipping.") + return + + task_args = { + 'storage_url': storage_url, + 'storage_db_name': storage_db_name, + 'storage_verify_certs': verify_certs, + 'redis_group': 'archivist', + 'limit': events_per_job + } + if workers > current: + for _ in range(workers - current): + schedule_task( + task_type=StorageTask.TASK_TYPE, + storage_type=storage_type, + task_args=task_args, + job_interval=1, + job_max_retries=10 + ) + click.echo(f"Created {workers} background tasks.") + elif workers < current: + tasks = StorageTask.objects.all()[workers:] + tasks.update(burst=True) diff --git a/src/grimoirelab/core/scheduler/models.py b/src/grimoirelab/core/scheduler/models.py index cb4dcac..bce85b1 100644 --- a/src/grimoirelab/core/scheduler/models.py +++ b/src/grimoirelab/core/scheduler/models.py @@ -357,3 +357,11 @@ def get_all_registered_task_models() -> Iterator[type[Task], type[Job]]: job classes. """ return iter(GRIMOIRELAB_TASK_MODELS.values()) + + +def get_all_registered_task_names() -> list[str]: + """Return all registered task names. + + :returns: a list with all registered task names. + """ + return list(GRIMOIRELAB_TASK_MODELS.keys()) diff --git a/src/grimoirelab/core/scheduler/tasks/archivist.py b/src/grimoirelab/core/scheduler/tasks/archivist.py new file mode 100644 index 0000000..5cd5958 --- /dev/null +++ b/src/grimoirelab/core/scheduler/tasks/archivist.py @@ -0,0 +1,385 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) GrimoireLab Contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + + +from __future__ import annotations + +import json +import logging +import typing +import urllib3 + +import redis +import rq.job +from opensearchpy import OpenSearch, RequestError + + +if typing.TYPE_CHECKING: + from typing import Any + + +MAX_EVENTS_PER_JOB = 5000 +BLOCK_TIMEOUT = 60000 # seconds + + +logger = logging.getLogger(__name__) + + +def archivist_job( + storage_type: str, + storage_url: str, + storage_db_name: str, + storage_verify_certs: bool, + redis_group: str, + consumer_name: str, + events_queue: str, + limit: int = MAX_EVENTS_PER_JOB, + block_timeout: int = BLOCK_TIMEOUT +) -> ArchivistProgress: + """Fetch and archive events. + + It will fetch events from a Redis stream and store them in a + storage system. + + :param storage_type: type of the storage system (e.g., 'opensearch') + :param storage_url: URL of the storage system + :param storage_db_name: Name of the database to use + :param storage_verify_certs: Verify certificates when connecting to the storage system + :param redis_group: Redis group name to use for fetching events + :param consumer_name: Name of the consumer + :param events_queue: Redis stream where the events are fetched + :param limit: Maximum number of events to fetch and store + :param block_timeout: Time to block when fetching events, None for not blocking, + 0 for blocking indefinitely. 
+ """ + rq_job = rq.get_current_job() + + progress = ArchivistProgress( + rq_job.get_id(), + storage_type, + redis_group, + consumer_name + ) + rq_job.progress = progress + + Storage = get_storage_backend(storage_type) + storage = Storage(url=storage_url, + db_name=storage_db_name, + verify_certs=storage_verify_certs) + events = events_consumer(rq_job.connection, + consumer_name, + events_queue, + redis_group, + limit, + block_timeout) + + progress.total = storage.store(events) + + return progress + + +def _create_consumer_group( + connection: redis.Redis, + stream_name: str, + group_name: str +) -> None: + """Create a consumer group if it does not exist + + :param connection: Redis connection + :param stream_name: Name of the stream + :param group_name: Name of the group + """ + try: + connection.xgroup_create(stream_name, group_name, id='0', mkstream=True) + except redis.exceptions.ResponseError as e: + if str(e) != 'BUSYGROUP Consumer Group name already exists': + raise + + +def _recover_stream_entries( + connection: redis.Redis, + consumer_name: str, + stream_name: str, + group_name: str +) -> dict: + """ + Transfers ownership of pending stream entries idle + for 5m that match the specified criteria + + :param connection: Redis connection + :param consumer_name: Name of the consumer + :param stream_name: Name of the stream + """ + logging.info(f"Recovering events from '{stream_name}' group '{group_name}'") + + while True: + response = connection.xautoclaim(name=stream_name, + groupname=group_name, + consumername=consumer_name, + min_idle_time=5 * 60 * 1000, + count=10) + + # The response contains an array with the following contents + # 1) "0-0" (stream ID to be used as the start argument for the next call) + # 2) 1) 1) "1609338752495-0" (successfully claimed messages) + # 2) 1) "field" + # 2) "value" + # 3) (empty array) (message IDs that no longer exist in the stream) + messages = response[1] + for message in messages: + message_id = message[0] + message_data = message[1][b'data'] + + yield json.loads(message_data) + + connection.xack(stream_name, group_name, message_id) + + if response[0] == b"0-0": + break + + +def events_consumer( + connection: redis.Redis, + consumer_name: str, + stream_name: str, + group_name: str, + limit: int = MAX_EVENTS_PER_JOB, + block_timeout: int = BLOCK_TIMEOUT, +) -> iter(dict): + """Get items from a Redis stream given a group and a consumer name + + :param connection: Redis connection + :param consumer_name: Name of the consumer + :param stream_name: Name of the stream + :param group_name: Name of the group + :param limit: Maximum number of items to fetch + :param block_timeout: Time to block when fetching events, None for not blocking, + 0 for blocking indefinitely + """ + _create_consumer_group(connection, stream_name, group_name) + + yield from _recover_stream_entries(connection=connection, + consumer_name=consumer_name, + group_name=group_name, + stream_name=stream_name) + + logging.info(f"Fetching events from '{stream_name}' group " + f"'{group_name}' as '{consumer_name}'") + + total = 0 + while True: + try: + response = connection.xreadgroup(groupname=group_name, + consumername=consumer_name, + streams={stream_name: '>'}, + count=10, + block=block_timeout) + + # The response contains an array with the following contents + # 1) 1) "mystream" (name of the stream) + # 2) 1) 1) "1-0" (array of arrays containing the key and the entries) + # 2) 1) "field" + # 2) "value" + if response: + messages = response[0][1] + for message in messages: + total += 1 
+ message_id = message[0] + message_data = message[1][b'data'] + + yield json.loads(message_data) + + connection.xack(stream_name, group_name, message_id) + + else: + logging.info(f"No new messages for '{stream_name}:{group_name}:{consumer_name}'.") + break + + if total >= limit: + logging.info(f"{total} items inserted. Stop the job.") + break + + except Exception as e: + logging.error(f"Error consuming messages: {e}") + raise e + + +class ArchivistProgress: + """Class to store the progress of an Archivist job. + + It stores the summary of the job. + + :param job_id: job identifier + :param backend: backend used to store the events + :param group: group used to fetch the events + """ + def __init__(self, + job_id: str, + backend: str, + group: str, + consumer_name: str, + total: int = 0) -> None: + self.job_id = job_id + self.backend = backend + self.group = group + self.consumer_name = consumer_name + self.total = total + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> ArchivistProgress: + """Create a new instance from a dictionary.""" + + return cls( + data['job_id'], + data['backend'], + data['group'], + data['consumer_name'], + data['total'] + ) + + def to_dict(self) -> dict[str, str | int]: + """Convert object to a dict.""" + + result = { + 'job_id': self.job_id, + 'backend': self.backend, + 'group': self.group, + 'consumer_name': self.consumer_name, + 'total': self.total + } + + return result + + +class StorageBackend: + """Base class for storage backends. + + This class defines the methods that should be implemented by + a storage backend. + + :param url: URL of the storage backend + """ + def __init__(self, url: str, db_name: str, verify_certs: bool = False) -> None: + self.url = url + self.db_name = db_name + self.verify_certs = verify_certs + + def store(self, data: dict[str, Any]) -> int: + """Store data in the storage backend. + + :param data: Data to store + + :return: Number of items stored + """ + raise NotImplementedError + + def close(self) -> None: + """Close the connection to the storage backend.""" + + pass + + +def get_storage_backend(storage_type: str) -> typing.Type[StorageBackend]: + """Get the storage backend based on the type. + + :param storage_type: Type of the storage backend + """ + if storage_type == 'opensearch': + return OpenSearchStorage + else: + raise ValueError(f"Storage type '{storage_type}' not supported") + + +class OpenSearchStorage(StorageBackend): + """Storage backend for OpenSearch. + + This class implements the methods to store data in an OpenSearch + instance. + + :param url: URL of the OpenSearch instance + """ + def __init__(self, url: str, db_name: str, verify_certs: bool = False) -> None: + super().__init__(url, db_name, verify_certs) + + if not verify_certs: + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + self.client = OpenSearch([url], verify_certs=self.verify_certs) + self._create_index(db_name) + self.max_items_bulk = 100 + + def _create_index(self, index_name: str) -> None: + """Create an index in the OpenSearch instance. + + :param index_name: Name of the index + """ + try: + self.client.indices.create(index_name) + except RequestError as e: + if e.error == 'resource_already_exists_exception': + pass + else: + raise + + def _bulk(self, body: str, index: str) -> int: + """Store data in the OpenSearch instance. 
+ + :param body: Data to store + :param index: Name of the index + """ + failed_items = [] + + response = self.client.bulk(body=body, index=index) + + if response['errors']: + # Due to multiple errors that may be thrown when inserting bulk data, only the first error is returned + failed_items = [item['index'] for item in response['items'] if 'error' in item['index']] + error = str(failed_items[0]['error']) + + logger.error(f"Failed to insert data to ES: {error}") + + inserted_items = len(response['items']) - len(failed_items) + + logger.info(f"{inserted_items} items uploaded to ES") + + return inserted_items + + def store(self, events: iter) -> None: + """Store data in the OpenSearch instance. + + :param events: Events to store + """ + current = 0 + new_items = 0 + + bulk_json = "" + for event in events: + data_json = json.dumps(event) + bulk_json += '{{"index" : {{"_id" : "{}" }} }}\n'.format(event['id']) + bulk_json += data_json + "\n" + current += 1 + + if current >= self.max_items_bulk: + new_items += self._bulk(body=bulk_json, index=self.db_name) + current = 0 + bulk_json = "" + + if current > 0: + new_items += self._bulk(body=bulk_json, index=self.db_name) + + return new_items diff --git a/src/grimoirelab/core/scheduler/tasks/migrations/0002_storagetask_storagejob.py b/src/grimoirelab/core/scheduler/tasks/migrations/0002_storagetask_storagejob.py new file mode 100644 index 0000000..b57e2ca --- /dev/null +++ b/src/grimoirelab/core/scheduler/tasks/migrations/0002_storagetask_storagejob.py @@ -0,0 +1,61 @@ +# Generated by Django 4.2.14 on 2024-12-04 08:07 + +from django.db import migrations, models +import django.db.models.deletion +import grimoirelab.core.models +import grimoirelab.core.scheduler.models +import grimoirelab_toolkit.datetime + + +class Migration(migrations.Migration): + + dependencies = [ + ('tasks', '0001_initial'), + ] + + operations = [ + migrations.CreateModel( + name='StorageTask', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', grimoirelab.core.models.CreationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('last_modified', grimoirelab.core.models.LastModificationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('uuid', models.CharField(max_length=191, unique=True)), + ('task_type', models.CharField(max_length=128)), + ('task_args', models.JSONField(default=None, null=True)), + ('status', models.IntegerField(choices=[(1, 'new'), (2, 'enqueued'), (3, 'running'), (4, 'completed'), (5, 'failed'), (6, 'recovery')], default=1)), + ('runs', models.PositiveIntegerField(default=0)), + ('failures', models.PositiveIntegerField(default=0)), + ('last_run', models.DateTimeField(default=None, null=True)), + ('scheduled_at', models.DateTimeField(default=None, null=True)), + ('job_interval', models.PositiveIntegerField(default=7200)), + ('job_max_retries', models.PositiveIntegerField(default=5, null=True)), + ('burst', models.BooleanField(default=False)), + ('storage_type', models.CharField(max_length=128)), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='StorageJob', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', grimoirelab.core.models.CreationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('last_modified', 
grimoirelab.core.models.LastModificationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('uuid', models.CharField(max_length=191, unique=True)), + ('job_num', models.PositiveIntegerField()), + ('job_args', models.JSONField(default=None, null=True)), + ('status', models.IntegerField(choices=[(1, 'new'), (2, 'enqueued'), (3, 'running'), (4, 'completed'), (5, 'failed'), (6, 'recovery')], default=2)), + ('progress', models.JSONField(default=None, encoder=grimoirelab.core.scheduler.models.JobResultEncoder, null=True)), + ('logs', models.JSONField(default=None, null=True)), + ('queue', models.CharField(default=None, max_length=128, null=True)), + ('scheduled_at', models.DateTimeField(default=None, null=True)), + ('finished_at', models.DateTimeField(default=None, null=True)), + ('task', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='jobs', to='tasks.storagetask')), + ], + options={ + 'abstract': False, + }, + ), + ] diff --git a/src/grimoirelab/core/scheduler/tasks/models.py b/src/grimoirelab/core/scheduler/tasks/models.py index d42b14c..7ad6b13 100644 --- a/src/grimoirelab/core/scheduler/tasks/models.py +++ b/src/grimoirelab/core/scheduler/tasks/models.py @@ -38,6 +38,7 @@ ChroniclerProgress, get_chronicler_argument_generator ) +from .archivist import archivist_job if typing.TYPE_CHECKING: from typing import Any, Self @@ -163,4 +164,96 @@ def on_failure_callback(*args, **kwargs): return _on_failure_callback(*args, **kwargs) +class StorageTask(Task): + """Task to store events in a database. + + This task will store events in a database. Events will be fetched + from a Redis stream. The progress of the task can be accessed through + the property `progress`. The result of the task can be obtained + accessing to the property `result` of the object. + """ + + storage_type = CharField(max_length=MAX_SIZE_CHAR_FIELD) + + TASK_TYPE = 'storager' + + @classmethod + def create_task( + cls, + task_args: dict[str, Any], + job_interval: int, + job_max_retries: int, + storage_type: str, + burst: bool = False, + *args, **kwargs + ) -> Self: + """Create a new task to store events in a database. + + This method will create a new task to store events in a database. + Besides the common arguments to create a task, this method requires + the name of the Redis stream where events are published. + + :param task_args: arguments to pass to the task + :param job_interval: interval in seconds between each task execution. + :param job_max_retries: maximum number of retries before the task is + considered failed. + :param storage_type: type of storage to use. + :param burst: flag to indicate if the task will only run once. + :param args: additional arguments. + :param kwargs: additional keyword arguments. + + :return: the new task created. + """ + task = super().create_task( + task_args, job_interval, job_max_retries, burst=burst, + *args, **kwargs + ) + task.storage_type = storage_type + task.save() + + return task + + def prepare_job_parameters(self): + """Generate the parameters for a new job. + + This method will generate the parameters for a new job + based on the original parameters set for the task plus + the latest job parameters used. Depending on the status + of the task, new parameters will be generated. 
+ """ + + task_args = { + 'storage_type': self.storage_type, + 'storage_url': self.task_args.get('storage_url'), + 'storage_db_name': self.task_args.get('storage_db_name'), + 'storage_verify_certs': self.task_args.get('storage_verify_certs'), + 'redis_group': self.task_args.get('redis_group'), + 'consumer_name': self.task_id, + 'events_queue': settings.EVENTS_STREAM, + 'limit': self.task_args.get('limit', 5000) + } + + return task_args + + def can_be_retried(self): + return True + + @property + def default_job_queue(self): + return settings.Q_ARCHIVIST_JOBS + + @staticmethod + def job_function(*args, **kwargs): + return archivist_job(*args, **kwargs) + + @staticmethod + def on_success_callback(*args, **kwargs): + return _on_success_callback(*args, **kwargs) + + @staticmethod + def on_failure_callback(*args, **kwargs): + return _on_failure_callback(*args, **kwargs) + + register_task_model(EventizerTask.TASK_TYPE, EventizerTask) +register_task_model(StorageTask.TASK_TYPE, StorageTask) diff --git a/tests/scheduler/test_task_archivist.py b/tests/scheduler/test_task_archivist.py new file mode 100644 index 0000000..9584c85 --- /dev/null +++ b/tests/scheduler/test_task_archivist.py @@ -0,0 +1,251 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) GrimoireLab Contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . 
+# + +import json +from unittest.mock import patch + +import rq + +from grimoirelab.core.scheduler.jobs import GrimoireLabJob +from grimoirelab.core.scheduler.tasks.archivist import ( + ArchivistProgress, + StorageBackend, + archivist_job +) + +from ..base import GrimoireLabTestCase + + +class MockStorageBackend(StorageBackend): + """Class to store events in the class itself for later inspection""" + + events = [] + + def store(self, events: iter) -> int: + events = [e for e in events] + MockStorageBackend.events.extend(events) + return len(events) + + @classmethod + def get_events(cls): + return cls.events + + @classmethod + def clear_events(cls): + cls.events = [] + + +class TestArchivistJob(GrimoireLabTestCase): + """Unit tests for archivist_job function""" + + def setUp(self): + MockStorageBackend.clear_events() + super().setUp() + + def tearDown(self): + MockStorageBackend.clear_events() + super().tearDown() + + def test_job(self): + """Test if events are stored correctly""" + + # Add some sample events to the stream + events = [ + {'uuid': '1', 'timestamp': '2021-01-01T00:00:00Z', 'data': 'event 1'}, + {'uuid': '2', 'timestamp': '2021-01-02T00:00:00Z', 'data': 'event 2'}, + {'uuid': '3', 'timestamp': '2021-01-03T00:00:00Z', 'data': 'event 3'}, + {'uuid': '4', 'timestamp': '2021-01-04T00:00:00Z', 'data': 'event 4'}, + {'uuid': '5', 'timestamp': '2021-01-05T00:00:00Z', 'data': 'event 5'}, + {'uuid': '6', 'timestamp': '2021-01-06T00:00:00Z', 'data': 'event 6'}, + {'uuid': '7', 'timestamp': '2021-01-07T00:00:00Z', 'data': 'event 7'}, + {'uuid': '8', 'timestamp': '2021-01-08T00:00:00Z', 'data': 'event 8'}, + {'uuid': '9', 'timestamp': '2021-01-09T00:00:00Z', 'data': 'event 9'}, + {'uuid': '10', 'timestamp': '2021-01-10T00:00:00Z', 'data': 'event 10'}, + ] + + for e in events: + message = { + 'data': json.dumps(e) + } + self.conn.xadd('test-events', message, maxlen=len(events)) + + # Create the job + job_args = { + 'storage_type': 'mock_storage', + 'storage_url': 'example.com', + 'storage_db_name': 'mock_db', + 'storage_verify_certs': True, + 'redis_group': 'archivist', + 'consumer_name': 'consumer_1', + 'events_queue': 'test-events', + 'limit': 10 + } + with patch('grimoirelab.core.scheduler.tasks.archivist.get_storage_backend', + return_value=MockStorageBackend): + q = rq.Queue( + 'test-queue', + job_class=GrimoireLabJob, + connection=self.conn, + is_async=False + ) + job = q.enqueue(f=archivist_job, + result_ttl=100, + job_timeout=120, + job_id='archive-events', + **job_args) + result = job.return_value() + + # Check job result + self.assertEqual(result.job_id, job.get_id()) + self.assertEqual(result.backend, 'mock_storage') + self.assertEqual(result.group, 'archivist') + self.assertEqual(result.consumer_name, 'consumer_1') + self.assertEqual(result.total, len(events)) + + result_events = MockStorageBackend.get_events() + for result_event, event in zip(result_events, events): + self.assertDictEqual(result_event, event) + + def test_job_no_result(self): + """Execute a job that will not produce any results""" + + job_args = { + 'storage_type': 'mock_storage', + 'storage_url': 'example.com', + 'storage_db_name': 'mock_db', + 'storage_verify_certs': True, + 'redis_group': 'archivist', + 'consumer_name': 'consumer_1', + 'events_queue': 'test-events', + 'block_timeout': None, + 'limit': 10 + } + with patch('grimoirelab.core.scheduler.tasks.archivist.get_storage_backend', + return_value=MockStorageBackend): + q = rq.Queue( + 'test-queue', + job_class=GrimoireLabJob, + connection=self.conn, + 
is_async=False + ) + job = q.enqueue(f=archivist_job, + result_ttl=100, + job_timeout=120, + job_id='archive-events', + **job_args) + + result = job.return_value() + + # Check job result + self.assertEqual(result.job_id, job.get_id()) + self.assertEqual(result.backend, 'mock_storage') + self.assertEqual(result.group, 'archivist') + self.assertEqual(result.consumer_name, 'consumer_1') + self.assertEqual(result.total, 0) + + result_events = MockStorageBackend.get_events() + self.assertEqual(len(result_events), 0) + + def test_backend_not_found(self): + """Test if it fails when a storage_type is not found""" + + job_args = { + 'storage_type': 'nobackend', + 'storage_url': 'example.com', + 'storage_db_name': 'mock_db', + 'storage_verify_certs': True, + 'redis_group': 'archivist', + 'consumer_name': 'consumer_1', + 'events_queue': 'test-events', + } + q = rq.Queue( + 'test-queue', + job_class=GrimoireLabJob, + connection=self.conn, + is_async=False + ) + job = q.enqueue(f=archivist_job, + result_ttl=100, + job_timeout=120, + job_id='archive-events', + **job_args) + self.assertTrue(job.is_failed) + + +class TestArchivistProgress(GrimoireLabTestCase): + """Unit tests for ArchivistProgress class""" + + def test_init(self): + """Tests whether the ArchivistProgress initialization is correct""" + + progress = ArchivistProgress(job_id='1234567890', backend='opensearch', + group='storage_grp', consumer_name='consumer_1', + total=100) + + self.assertEqual(progress.job_id, '1234567890') + self.assertEqual(progress.backend, 'opensearch') + self.assertEqual(progress.group, 'storage_grp') + self.assertEqual(progress.consumer_name, 'consumer_1') + self.assertEqual(progress.total, 100) + + def test_from_dict(self): + """Tests whether the ArchivistProgress object is created from a dict""" + + job_id = '1234567890' + backend = 'opensearch' + group = 'storage_grp' + consumer_name = 'consumer_1' + total = 100 + + data = { + 'job_id': job_id, + 'backend': backend, + 'group': group, + 'consumer_name': consumer_name, + 'total': total + } + + progress = ArchivistProgress.from_dict(data) + + self.assertEqual(progress.job_id, job_id) + self.assertEqual(progress.backend, backend) + self.assertEqual(progress.group, group) + self.assertEqual(progress.consumer_name, consumer_name) + self.assertEqual(progress.total, total) + + def test_to_dict(self): + """Tests whether the ArchivistProgress object is converted to a dict""" + + job_id = '12345678' + backend = 'git' + group = 'storage_grp' + consumer_name = 'consumer_1' + total = 100 + + progress = ArchivistProgress(job_id, backend, group, consumer_name, total) + + expected = { + 'job_id': job_id, + 'backend': backend, + 'group': group, + 'consumer_name': consumer_name, + 'total': total + } + + d = progress.to_dict() + self.assertEqual(d, expected)
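
The archivist job consumes events through a Redis Streams consumer group rather than a plain queue: several StorageTask workers can share one stream, each entry is delivered to exactly one consumer in the group, and it is acknowledged only after it has been handed to the storage backend. The sketch below exercises that round trip on its own. It is a minimal sketch, assuming a local Redis instance on the default port; the stream, group, and consumer names are illustrative and only echo the defaults used elsewhere in this patch.

    # Minimal round trip over a Redis Stream with a consumer group, mirroring
    # the flow used by events_consumer(). Names are illustrative; a local
    # Redis on the default port is assumed.
    import json

    import redis

    connection = redis.Redis(host="localhost", port=6379)

    # Producer side: append events, trimming the stream roughly to a maximum
    # length (GRIMOIRELAB_STREAM_MAX_LENGTH plays this role in settings.py).
    for i in range(3):
        event = {"id": str(i), "data": f"event {i}"}
        connection.xadd("events", {"data": json.dumps(event)},
                        maxlen=1000, approximate=True)

    # Create the consumer group once; ignore the error if it already exists.
    try:
        connection.xgroup_create("events", "archivist", id="0", mkstream=True)
    except redis.exceptions.ResponseError as exc:
        if "BUSYGROUP" not in str(exc):
            raise

    # Consumer side: '>' asks only for entries never delivered to this group.
    # 'block' is expressed in milliseconds (60000 blocks for one minute).
    response = connection.xreadgroup(groupname="archivist",
                                     consumername="consumer_1",
                                     streams={"events": ">"},
                                     count=10,
                                     block=1000)

    for _, messages in response:
        for message_id, fields in messages:
            print(json.loads(fields[b"data"]))
            # Acknowledge so the entry leaves the group's pending entries list.
            connection.xack("events", "archivist", message_id)

Entries that are read but never acknowledged remain in the group's pending entries list, which is what the recovery step shown next relies on.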
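
Before asking for new entries, the archivist consumer first reclaims entries that another consumer read but never acknowledged, so a worker that dies mid-batch does not lose events. This is a sketch of that recovery step in isolation, using XAUTOCLAIM with the same illustrative names as above; the five-minute idle threshold mirrors the value hard-coded in _recover_stream_entries().

    # Sketch of reclaiming pending entries with XAUTOCLAIM: entries read by a
    # consumer that crashed before acknowledging them are re-delivered to
    # another consumer once they have been idle long enough.
    import json

    import redis

    connection = redis.Redis(host="localhost", port=6379)

    start_id = "0-0"
    while True:
        response = connection.xautoclaim(name="events",
                                         groupname="archivist",
                                         consumername="consumer_2",
                                         min_idle_time=5 * 60 * 1000,  # 5 minutes, in ms
                                         start_id=start_id,
                                         count=10)
        start_id, messages = response[0], response[1]
        for message_id, fields in messages:
            print(json.loads(fields[b"data"]))
            connection.xack("events", "archivist", message_id)
        # XAUTOCLAIM returns "0-0" as the next cursor once the whole pending
        # entries list has been scanned.
        if start_id == b"0-0":
            break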
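
Finally, the OpenSearch backend writes events with the _bulk API, which takes newline-delimited JSON where each document line is preceded by an action line carrying its _id. The sketch below builds that payload by hand, the same way OpenSearchStorage.store() and _bulk() do; the URL, credentials, and index name simply mirror the GRIMOIRELAB_ARCHIVIST defaults in settings.py and are assumptions for illustration, not requirements.

    # Sketch of the NDJSON payload sent to the OpenSearch bulk endpoint:
    # one action line ({"index": {"_id": ...}}) followed by one document
    # line per event.
    import json

    import urllib3
    from opensearchpy import OpenSearch, RequestError

    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    client = OpenSearch(["https://admin:admin@localhost:9200"], verify_certs=False)

    try:
        client.indices.create(index="events")
    except RequestError as exc:
        if exc.error != "resource_already_exists_exception":
            raise

    events = [
        {"id": "1", "data": "event 1"},
        {"id": "2", "data": "event 2"},
    ]

    bulk_body = ""
    for event in events:
        bulk_body += json.dumps({"index": {"_id": event["id"]}}) + "\n"
        bulk_body += json.dumps(event) + "\n"

    response = client.bulk(body=bulk_body, index="events")
    failed = [item["index"] for item in response["items"] if "error" in item["index"]]
    print(f"{len(response['items']) - len(failed)} events indexed")

Using the event identifier as the document _id keeps re-archiving idempotent: replaying the same events overwrites the corresponding documents instead of duplicating them.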