Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Archive events in OpenSearch #24

Merged
merged 1 commit into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/settings/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def __call__(self, _, strict):
return self.conn


# Point every RQ queue used during testing at the fake Redis connection
# (_RQ_DATABASE), and use SimpleWorker so jobs run synchronously without
# forking — required for the in-process test Redis.
RQ_QUEUES['default'] = _RQ_DATABASE # noqa: F405
RQ_QUEUES['testing'] = _RQ_DATABASE # noqa: F405
RQ['WORKER_CLASS'] = rq.worker.SimpleWorker

Expand Down
38 changes: 37 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ perceval = {version = "^1.0.2", allow-prereleases = true}
grimoirelab-chronicler = {git = "https://github.com/chaoss/grimoirelab-chronicler.git", allow-prereleases = true}
django-cors-headers = "^4.6.0"
djangorestframework = "^3.15.2"
opensearch-py = "^2.8.0"

[tool.poetry.group.dev.dependencies]
fakeredis = "^2.0.0"
Expand Down
16 changes: 15 additions & 1 deletion src/grimoirelab/core/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@
# https://github.com/rq/django-rq
#

GRIMOIRELAB_Q_EVENTIZER_JOBS = os.environ.get('GRIMOIRELAB_Q_EVENTIZER_JOBS', 'default')
GRIMOIRELAB_Q_EVENTIZER_JOBS = os.environ.get('GRIMOIRELAB_Q_EVENTIZER_JOBS', 'eventizer_jobs')
GRIMOIRELAB_Q_ARCHIVIST_JOBS = os.environ.get('GRIMOIRELAB_Q_ARCHIVIST_JOBS', 'storage_jobs')

_RQ_DATABASE = {
'HOST': os.environ.get('GRIMOIRELAB_REDIS_HOST', '127.0.0.1'),
Expand All @@ -284,6 +285,7 @@

RQ_QUEUES = {
GRIMOIRELAB_Q_EVENTIZER_JOBS: _RQ_DATABASE,
GRIMOIRELAB_Q_ARCHIVIST_JOBS: _RQ_DATABASE,
}

GRIMOIRELAB_EVENTS_STREAM_NAME = os.environ.get('GRIMOIRELAB_EVENTS_STREAM_NAME',
Expand All @@ -309,3 +311,15 @@
GRIMOIRELAB_JOB_TIMEOUT = int(os.environ.get('GRIMOIRELAB_JOB_TIMEOUT', -1))

GRIMOIRELAB_GIT_STORAGE_PATH = os.environ.get('GRIMOIRELAB_GIT_PATH', '~/.perceval')

#
# Archivist configuration
#
GRIMOIRELAB_ARCHIVIST = {
    # Number of archivist background tasks the server keeps scheduled.
    'WORKERS': int(os.environ.get('GRIMOIRELAB_ARCHIVIST_WORKERS', 10)),
    # Backend used to store events; 'opensearch' is the only value set up here.
    'STORAGE_TYPE': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_TYPE', 'opensearch'),
    # Connection URL for the storage backend.
    # NOTE(review): default embeds admin:admin credentials — fine for local
    # development only; deployments must override via the env variable.
    'STORAGE_URL': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_URL', 'https://admin:admin@localhost:9200'),
    # Index (database name) where events are written.
    'STORAGE_INDEX': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_INDEX', 'events'),
    # TLS certificate verification; only 'true'/'1' (case-insensitive) enable it.
    'STORAGE_VERIFY_CERT': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_VERIFY_CERT', 'False').lower() in ('true', '1'),
    # Maximum number of events a single archivist job processes per run.
    'EVENTS_PER_JOB': int(os.environ.get('GRIMOIRELAB_ARCHIVIST_EVENTS_PER_JOB', 10000)),
}
80 changes: 78 additions & 2 deletions src/grimoirelab/core/runner/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,13 @@ def run(ctx: Context):
is_flag=True,
default=False,
help="Run the service in developer mode.")
@click.option("--clear-tasks",
is_flag=True,
default=False,
help="Clear background tasks.")
@run.command()
@click.pass_context
def server(ctx: Context, devel: bool):
def server(ctx: Context, devel: bool, clear_tasks: bool):
"""Start the GrimoireLab core server.

GrimoireLab server allows to schedule tasks and fetch data from
Expand All @@ -56,6 +60,8 @@ def server(ctx: Context, devel: bool):
should be run with a reverse proxy. If you activate the '--dev' flag,
a HTTP server will be run instead.
"""
create_background_tasks(clear_tasks)

env = os.environ

env["UWSGI_ENV"] = f"DJANGO_SETTINGS_MODULE={ctx.obj['cfg']}"
Expand Down Expand Up @@ -92,7 +98,7 @@ def server(ctx: Context, devel: bool):

@run.command()
@click.option('--workers',
default=10,
default=5,
show_default=True,
help="Number of workers to run in the pool.")
def eventizers(workers: int):
Expand All @@ -112,3 +118,73 @@ def eventizers(workers: int):
'rqworker-pool', settings.GRIMOIRELAB_Q_EVENTIZER_JOBS,
num_workers=workers
)


@run.command()
@click.option('--workers',
              default=20,
              show_default=True,
              help="Number of workers to run in the pool.")
def archivists(workers: int):
    """Start a pool of archivists workers.

    The workers on the pool will run tasks to fetch events from redis.
    Data will be stored in the defined data source.

    The number of workers running in the pool can be defined with the
    parameter '--workers'.

    Workers get jobs from the GRIMOIRELAB_Q_ARCHIVIST_JOBS queue defined
    in the configuration file.
    """
    # Delegate to django-rq's worker pool command, pointing it at the
    # archivist jobs queue configured in the settings module.
    queue_name = settings.GRIMOIRELAB_Q_ARCHIVIST_JOBS
    django.core.management.call_command('rqworker-pool',
                                        queue_name,
                                        num_workers=workers)


def create_background_tasks(clear_tasks: bool):
    """Create the archivist background tasks before starting the server.

    Reconciles the number of non-burst storage tasks with the
    ``GRIMOIRELAB_ARCHIVIST['WORKERS']`` setting: missing tasks are
    scheduled, and surplus tasks are flagged as burst so they stop once
    their queue is drained.

    :param clear_tasks: remove all existing storage tasks before
        creating new ones.
    """
    from grimoirelab.core.scheduler.scheduler import schedule_task
    from grimoirelab.core.scheduler.tasks.models import StorageTask

    workers = settings.GRIMOIRELAB_ARCHIVIST['WORKERS']
    storage_url = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_URL']
    storage_db_name = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_INDEX']
    storage_type = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_TYPE']
    verify_certs = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_VERIFY_CERT']
    events_per_job = settings.GRIMOIRELAB_ARCHIVIST['EVENTS_PER_JOB']

    if clear_tasks:
        StorageTask.objects.all().delete()
        click.echo("Removing old background tasks.")

    # Only the tasks still meant to run forever (burst=False) count
    # towards the configured pool size.
    current = StorageTask.objects.filter(burst=False).count()
    if workers == current:
        click.echo("Background tasks already created. Skipping.")
        return

    task_args = {
        'storage_url': storage_url,
        'storage_db_name': storage_db_name,
        'storage_verify_certs': verify_certs,
        'redis_group': 'archivist',
        'limit': events_per_job
    }
    if workers > current:
        for _ in range(workers - current):
            schedule_task(
                task_type=StorageTask.TASK_TYPE,
                storage_type=storage_type,
                task_args=task_args,
                job_interval=1,
                job_max_retries=10
            )
        # Report only the tasks actually scheduled in this call, not the
        # total pool size.
        click.echo(f"Created {workers - current} background tasks.")
    elif workers < current:
        # Django forbids .update() on a sliced queryset ("Cannot update a
        # query once a slice has been taken"), so collect the surplus
        # primary keys first and update them with a fresh queryset.
        # Slice the same set that was counted (burst=False) so tasks
        # already in burst mode are not selected again.
        surplus_pks = list(
            StorageTask.objects.filter(burst=False)
            .order_by('pk')
            .values_list('pk', flat=True)[workers:]
        )
        StorageTask.objects.filter(pk__in=surplus_pks).update(burst=True)
2 changes: 1 addition & 1 deletion src/grimoirelab/core/scheduler/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class GrimoireLabJob(rq.job.Job):
"""

# Default packages to log
PACKAGES_TO_LOG = [__name__, 'chronicler', 'perceval', 'rq']
PACKAGES_TO_LOG = [__name__, 'chronicler', 'archivist', 'perceval', 'rq']

def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
Expand Down
8 changes: 8 additions & 0 deletions src/grimoirelab/core/scheduler/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,11 @@ def get_all_registered_task_models() -> Iterator[type[Task], type[Job]]:
job classes.
"""
return iter(GRIMOIRELAB_TASK_MODELS.values())


def get_all_registered_task_names() -> list[str]:
    """Return the names of every registered task type.

    :returns: a list with all registered task names.
    """
    return [name for name in GRIMOIRELAB_TASK_MODELS]
4 changes: 2 additions & 2 deletions src/grimoirelab/core/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def cancel_task(task_uuid: str) -> None:

jobs = job_class.objects.filter(task=task).all()
for job in jobs:
job_rq = rq.job.Job.fetch(job.uuid, connection=django_rq.get_connection())
job_rq = rq.job.Job.fetch(job.uuid, connection=django_rq.get_connection(task.default_job_queue))
job_rq.delete()

task.delete()
Expand All @@ -125,7 +125,7 @@ def maintain_tasks() -> None:
job_db = task.jobs.order_by('scheduled_at').first()

try:
rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection())
rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection(task.default_job_queue))
continue
except rq.exceptions.NoSuchJobError:
logger.debug(
Expand Down
Loading
Loading