Skip to content

Commit

Permalink
Archive Events in OpenSearch
Browse files Browse the repository at this point in the history
This commit implements archiving of events in an OpenSearch backend.

It introduces a new subscriber group that reads from the events stream
and stores events in an OpenSearch instance.

Additionally, the worker pool now supports execution on specific task
queues, enabling different workers to process tasks based on their queue.

Signed-off-by: Jose Javier Merchante <[email protected]>
  • Loading branch information
jjmerchante committed Jan 21, 2025
1 parent 2e6fd19 commit 13bd990
Show file tree
Hide file tree
Showing 9 changed files with 935 additions and 19 deletions.
38 changes: 37 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ perceval = {version = "^1.0.2", allow-prereleases = true}
grimoirelab-chronicler = {git = "https://github.com/chaoss/grimoirelab-chronicler.git", allow-prereleases = true}
django-cors-headers = "^4.6.0"
djangorestframework = "^3.15.2"
opensearch-py = "^2.8.0"

[tool.poetry.group.dev.dependencies]
fakeredis = "^2.0.0"
Expand Down
25 changes: 17 additions & 8 deletions src/grimoirelab/core/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@
# https://github.com/rq/django-rq
#

GRIMOIRELAB_Q_EVENTIZER_JOBS = os.environ.get('GRIMOIRELAB_Q_EVENTIZER_JOBS', 'default')
GRIMOIRELAB_Q_EVENTIZER_JOBS = os.environ.get('GRIMOIRELAB_Q_EVENTIZER_JOBS', 'eventizer_jobs')
GRIMOIRELAB_Q_ARCHIVIST_JOBS = os.environ.get('GRIMOIRELAB_Q_ARCHIVIST_JOBS', 'storage_jobs')

_RQ_DATABASE = {
'HOST': os.environ.get('GRIMOIRELAB_REDIS_HOST', '127.0.0.1'),
Expand All @@ -284,15 +285,11 @@

RQ_QUEUES = {
GRIMOIRELAB_Q_EVENTIZER_JOBS: _RQ_DATABASE,
GRIMOIRELAB_Q_ARCHIVIST_JOBS: _RQ_DATABASE,
}

GRIMOIRELAB_EVENTS_STREAM_NAME = os.environ.get('GRIMOIRELAB_EVENTS_STREAM_NAME',
'events')
# Maximum events in Redis stream before dropping. Consumers must process events
# faster than production to avoid loss. Default max size is 1M events (~2.5GB Git events).
# Adjust for memory constraints.
GRIMOIRELAB_EVENTS_STREAM_MAX_LENGTH = int(os.environ.get('GRIMOIRELAB_EVENTS_STREAM_MAX_LENGTH',
1 * 10 ** 6))
GRIMOIRELAB_EVENTS_STREAM_NAME = os.environ.get('GRIMOIRELAB_EVENTS_STREAM_NAME', 'events')
GRIMOIRELAB_STREAM_MAX_LENGTH = int(os.environ.get('GRIMOIRELAB_STREAM_MAX_LENGTH', 2 * 10 ** 6))

RQ = {
'JOB_CLASS': 'grimoirelab.core.scheduler.jobs.GrimoireLabJob',
Expand All @@ -309,3 +306,15 @@
GRIMOIRELAB_JOB_TIMEOUT = int(os.environ.get('GRIMOIRELAB_JOB_TIMEOUT', -1))

GRIMOIRELAB_GIT_STORAGE_PATH = os.environ.get('GRIMOIRELAB_GIT_PATH', '~/.perceval')

#
# Archivist configuration
#
GRIMOIRELAB_ARCHIVIST = {
'WORKERS': int(os.environ.get('GRIMOIRELAB_ARCHIVIST_WORKERS', 10)),
'STORAGE_TYPE': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_TYPE', 'opensearch'),
'STORAGE_URL': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_URL', 'https://admin:admin@localhost:9200'),
'STORAGE_INDEX': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_INDEX', 'events'),
'STORAGE_VERIFY_CERT': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_VERIFY_CERT', 'False').lower() in ('true', '1'),
'EVENTS_PER_JOB': int(os.environ.get('GRIMOIRELAB_ARCHIVIST_EVENTS_PER_JOB', 10000)),
}
92 changes: 82 additions & 10 deletions src/grimoirelab/core/runner/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from __future__ import annotations

import os
import sys
import typing

import click
Expand All @@ -43,9 +44,13 @@ def run(ctx: Context):
is_flag=True,
default=False,
help="Run the service in developer mode.")
@click.option("--clear-tasks",
is_flag=True,
default=False,
help="Clear background tasks.")
@run.command()
@click.pass_context
def server(ctx: Context, devel: bool):
def server(ctx: Context, devel: bool, clear_tasks: bool):
"""Start the GrimoireLab core server.
GrimoireLab server allows to schedule tasks and fetch data from
Expand All @@ -56,6 +61,8 @@ def server(ctx: Context, devel: bool):
should be run with a reverse proxy. If you activate the '--dev' flag,
a HTTP server will be run instead.
"""
create_background_tasks(clear_tasks)

env = os.environ

env["UWSGI_ENV"] = f"DJANGO_SETTINGS_MODULE={ctx.obj['cfg']}"
Expand Down Expand Up @@ -91,24 +98,89 @@ def server(ctx: Context, devel: bool):


@run.command()
@click.argument('task-types', nargs=-1)
@click.option('--workers',
default=10,
show_default=True,
help="Number of workers to run in the pool.")
def eventizers(workers: int):
"""Start a pool of eventizer workers.
def worker_pool(task_types: str, workers: int):
"""Start a pool of workers that run specific tasks.
The workers on the pool will run tasks to fetch data from software
development repositories. Data will be processed in form of events,
and published in the events queue.
If multiple tasks share the same queue, they will run in the same
pool of workers. The tasks to run are defined as arguments to the
command.
The number of workers running in the pool can be defined with the
parameter '--workers'.
Workers get jobs from the GRIMOIRELAB_Q_EVENTIZER_JOBS queue defined
in the configuration file.
"""
from grimoirelab.core.scheduler.models import (get_registered_task_model,
get_all_registered_task_names)

available_tasks = get_all_registered_task_names()

queues = []
for task in task_types:
try:
Task = get_registered_task_model(task)[0]
except KeyError:
click.echo(f"Task '{task}' is not a valid task. "
f"Options: {available_tasks}", err=True)
sys.exit(1)
queues.append(Task().default_job_queue)

if not queues:
click.echo(f"You must define at least one valid task. "
f"Options: {available_tasks}", err=True)
sys.exit(1)

django.core.management.call_command(
'rqworker-pool', settings.GRIMOIRELAB_Q_EVENTIZER_JOBS,
'rqworker-pool', queues,
num_workers=workers
)


def create_background_tasks(clear_tasks: bool):
    """Create the archivist background tasks before starting the server.

    Ensures that exactly ``GRIMOIRELAB_ARCHIVIST['WORKERS']`` non-burst
    storage tasks are scheduled: missing tasks are created, and surplus
    tasks are flagged with ``burst=True`` so they finish their pending
    work and stop.

    :param clear_tasks: remove all existing storage tasks before
        creating new ones.
    """
    from grimoirelab.core.scheduler.scheduler import schedule_task
    from grimoirelab.core.scheduler.tasks.models import StorageTask

    workers = settings.GRIMOIRELAB_ARCHIVIST['WORKERS']
    storage_url = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_URL']
    storage_db_name = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_INDEX']
    storage_type = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_TYPE']
    verify_certs = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_VERIFY_CERT']
    events_per_job = settings.GRIMOIRELAB_ARCHIVIST['EVENTS_PER_JOB']

    if clear_tasks:
        StorageTask.objects.all().delete()
        click.echo("Removing old background tasks.")

    # Only non-burst tasks count as active workers; burst tasks are
    # already winding down.
    current = StorageTask.objects.filter(burst=False).count()
    if workers == current:
        click.echo("Background tasks already created. Skipping.")
        return

    task_args = {
        'storage_url': storage_url,
        'storage_db_name': storage_db_name,
        'storage_verify_certs': verify_certs,
        'redis_group': 'archivist',
        'limit': events_per_job
    }
    if workers > current:
        for _ in range(workers - current):
            schedule_task(
                task_type=StorageTask.TASK_TYPE,
                storage_type=storage_type,
                task_args=task_args,
                job_interval=1,
                job_max_retries=10
            )
        # Report only the tasks created in this call, not the total.
        click.echo(f"Created {workers - current} background tasks.")
    else:
        # QuerySet.update() raises TypeError on a sliced queryset
        # ("Cannot update a query once a slice has been taken"), so
        # collect the surplus task ids first and update them by pk.
        # Slice over the same non-burst set that 'current' was counted on.
        surplus_ids = list(
            StorageTask.objects.filter(burst=False)
                               .values_list('pk', flat=True)[workers:]
        )
        StorageTask.objects.filter(pk__in=surplus_ids).update(burst=True)
        click.echo(f"Set {len(surplus_ids)} background tasks to burst mode.")
8 changes: 8 additions & 0 deletions src/grimoirelab/core/scheduler/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,11 @@ def get_all_registered_task_models() -> Iterator[type[Task], type[Job]]:
job classes.
"""
return iter(GRIMOIRELAB_TASK_MODELS.values())


def get_all_registered_task_names() -> list[str]:
    """Return the names of every registered task.

    :returns: a list with all registered task names.
    """
    # Iterating a dict yields its keys, so list() gives the names directly.
    return list(GRIMOIRELAB_TASK_MODELS)
Loading

0 comments on commit 13bd990

Please sign in to comment.