From 3595189536f89f6bd29ac5e5e78744812eb81071 Mon Sep 17 00:00:00 2001 From: rkorytkowski Date: Wed, 22 Jan 2025 12:45:39 +0100 Subject: [PATCH] OpenConceptLab/ocl_issues#2035 Fix new bulk import queuing --- core/importers/importer.py | 28 +++++++++++++----- core/importers/tests.py | 60 ++++++++++++++++++++------------------ docker-compose.yml | 2 +- requirements.txt | 8 ++--- 4 files changed, 57 insertions(+), 41 deletions(-) diff --git a/core/importers/importer.py b/core/importers/importer.py index 209013bf..ef164053 100644 --- a/core/importers/importer.py +++ b/core/importers/importer.py @@ -9,9 +9,8 @@ import tempfile import zipfile from zipfile import ZipFile - from celery.result import AsyncResult, result_from_tuple -from celery import group, chain, chord +from celery import group, chain import ijson import requests @@ -216,7 +215,7 @@ class Importer: owner_type: str owner: str import_type: str = 'default' - BATCH_SIZE: int = 100 + MIN_BATCH_SIZE: int = 50 IMPORT_CACHE: str = "import_cache/" # pylint: disable=too-many-arguments @@ -376,6 +375,16 @@ def traverse_dependencies(self, package_file, path, resource_types, dependencies def prepare_tasks(self, resource_types, packages, resources): tasks = [] + # Count all items to determine batch size + all_count = 0 + for resource, item in resources.items(): + for filepath, count in item.items(): + all_count += count + if all_count > 50000: + task_batch_size = (all_count / 1000) + else: + task_batch_size = self.MIN_BATCH_SIZE + # Import in groups in order. Resources within groups are imported in parallel. for package in packages: # Import dependencies in order. @@ -383,7 +392,7 @@ def prepare_tasks(self, resource_types, packages, resources): # Import resource types in order. files = [] groups = [] - batch_size = self.BATCH_SIZE + batch_size = task_batch_size for filepath, count in resources.get(resource_type).items(): if not filepath.startswith(package): continue @@ -403,11 +412,16 @@ def prepare_tasks(self, resource_types, packages, resources): batch_size -= end_index - start_index start_index = end_index - if batch_size <= 0 or start_index == end_index: + if batch_size <= 0: groups.append({"path": package, "username": self.username, "owner_type": self.owner_type, "owner": self.owner, "resource_type": resource_type, "files": files}) files = [] - batch_size = self.BATCH_SIZE + batch_size = task_batch_size + + if files: + # Append last task to the group + groups.append({"path": package, "username": self.username, "owner_type": self.owner_type, + "owner": self.owner, "resource_type": resource_type, "files": files}) if groups: tasks.append(groups) @@ -426,7 +440,7 @@ def schedule_tasks(self, tasks): if len(group_tasks) == 1: # Prevent celery from converting group to a single task group_tasks.append(bulk_import_subtask_empty.si().set(queue='concurrent')) - chained_tasks |= chord(group(group_tasks), bulk_import_subtask_empty.si().set(queue='concurrent')) + chained_tasks |= group(group_tasks) chained_tasks |= import_finisher.si(self.task_id).set(queue='concurrent') final_task = chained_tasks.apply_async(queue='concurrent') diff --git a/core/importers/tests.py b/core/importers/tests.py index 7e680d5e..b9e2d6de 100644 --- a/core/importers/tests.py +++ b/core/importers/tests.py @@ -2064,36 +2064,38 @@ def test_prepare_tasks(self): 'CodeSystem': {'/package1/path4:': 10}, 'ConceptMap': {'/package2/path5': 250} }) - self.assertEqual(tasks, [ - # Executed in sequence - [ # Executed in parallel - {'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root', - 'resource_type': 'ValueSet', 'files': [{'filepath': 'path3', 'start_index': 0, 'end_index': 50}]} - ], [ # Executed in parallel - {'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root', - 'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 0, 'end_index': 100}]}, - {'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root', - 'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 100, - 'end_index': 200}]}, - {'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root', - 'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 200, 'end_index': 250}]} - ], [ # Executed in parallel - {'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root', - 'resource_type': 'CodeSystem', 'files': [{'filepath': 'path4:', 'start_index': 0, 'end_index': 10}]} - ], [ # Executed in parallel - {'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root', - 'resource_type': 'ValueSet', 'files': [{'filepath': 'path1', 'start_index': 0, 'end_index': 100}]}, - {'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root', - 'resource_type': 'ValueSet', 'files': [{'filepath': 'path1', 'start_index': 100, 'end_index': 101}]}, - {'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root', - 'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 0, 'end_index': 100}]}, - {'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root', - 'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 100, 'end_index': 200}]}, - {'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root', - 'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 200, - 'end_index': 299}]} - ]]) + [{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root', + 'resource_type': 'ValueSet', 'files': [{'filepath': 'path3', 'start_index': 0, 'end_index': 50}]}], + [{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root', + 'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 0, 'end_index': 50}]}, + {'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root', + 'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 50, 'end_index': 100}]}, + {'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root', + 'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 100, 'end_index': 150}]}, + {'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root', + 'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 150, 'end_index': 200}]}, + {'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root', + 'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 200, 'end_index': 250}]}], + [{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root', + 'resource_type': 'CodeSystem', 'files': [{'filepath': 'path4:', 'start_index': 0, 'end_index': 10}]}], + [{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root', + 'resource_type': 'ValueSet', 'files': [{'filepath': 'path1', 'start_index': 0, 'end_index': 50}]}, + {'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root', + 'resource_type': 'ValueSet', 'files': [{'filepath': 'path1', 'start_index': 50, 'end_index': 100}]}, + {'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root', + 'resource_type': 'ValueSet', 'files': [{'filepath': 'path1', 'start_index': 100, 'end_index': 101}, + {'filepath': 'path2', 'start_index': 0, 'end_index': 49}]}, + {'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root', + 'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 49, 'end_index': 99}]}, + {'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root', + 'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 99, 'end_index': 149}]}, + {'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root', + 'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 149, 'end_index': 199}]}, + {'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root', + 'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 199, 'end_index': 249}]}, + {'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root', + 'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 249, 'end_index': 299}]}]]) hl7_fhir_fr_core_resources = { 'CodeSystem': {'http://fetch/npm/package/package/CodeSystem-fr-core-cs-circonstances-sortie.json': 1, diff --git a/docker-compose.yml b/docker-compose.yml index ce326419..a4ea8be5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,7 +8,7 @@ services: healthcheck: test: "pg_isready -U postgres" volumes: - - postgres-data:/usr/share/postgres/data + - postgres-data:/var/lib/postgresql/data redis: image: bitnami/redis:7.0.12 restart: "always" diff --git a/requirements.txt b/requirements.txt index f24e7976..acde393d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,12 +14,12 @@ python-dateutil==2.8.2 requests==2.32.3 factory_boy==3.3.0 # Celery -celery[redis]==5.3.1 +celery[redis]==5.4.0 celery_once==3.0.1 git+https://github.com/snyaggarwal/flower # use until https://github.com/mher/flower/issues/1231 issue is resolved -redis==4.6.0 -django-redis==5.3.0 # needed for redis sentinel support -kombu==5.3.1 +redis==5.2.1 +django-redis==5.4.0 # needed for redis sentinel support +kombu==5.4.2 django-elasticsearch-dsl==7.3 drf-yasg==1.21.5 git+https://github.com/snyaggarwal/django-queryset-csv