Skip to content

Commit

Permalink
OpenConceptLab/ocl_issues#2035 Fix new bulk import queuing
Browse files Browse the repository at this point in the history
  • Loading branch information
rkorytkowski committed Jan 22, 2025
1 parent f10e0f9 commit 3595189
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 41 deletions.
28 changes: 21 additions & 7 deletions core/importers/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@
import tempfile
import zipfile
from zipfile import ZipFile

from celery.result import AsyncResult, result_from_tuple
from celery import group, chain, chord
from celery import group, chain

import ijson
import requests
Expand Down Expand Up @@ -216,7 +215,7 @@ class Importer:
owner_type: str
owner: str
import_type: str = 'default'
BATCH_SIZE: int = 100
MIN_BATCH_SIZE: int = 50
IMPORT_CACHE: str = "import_cache/"

# pylint: disable=too-many-arguments
Expand Down Expand Up @@ -376,14 +375,24 @@ def traverse_dependencies(self, package_file, path, resource_types, dependencies

def prepare_tasks(self, resource_types, packages, resources):
tasks = []
# Count all items to determine batch size
all_count = 0
for resource, item in resources.items():
for filepath, count in item.items():
all_count += count
if all_count > 50000:
task_batch_size = (all_count / 1000)
else:
task_batch_size = self.MIN_BATCH_SIZE

# Import in groups in order. Resources within groups are imported in parallel.
for package in packages:
# Import dependencies in order.
for resource_type in resource_types:
# Import resource types in order.
files = []
groups = []
batch_size = self.BATCH_SIZE
batch_size = task_batch_size
for filepath, count in resources.get(resource_type).items():
if not filepath.startswith(package):
continue
Expand All @@ -403,11 +412,16 @@ def prepare_tasks(self, resource_types, packages, resources):
batch_size -= end_index - start_index
start_index = end_index

if batch_size <= 0 or start_index == end_index:
if batch_size <= 0:
groups.append({"path": package, "username": self.username, "owner_type": self.owner_type,
"owner": self.owner, "resource_type": resource_type, "files": files})
files = []
batch_size = self.BATCH_SIZE
batch_size = task_batch_size

if files:
# Append last task to the group
groups.append({"path": package, "username": self.username, "owner_type": self.owner_type,
"owner": self.owner, "resource_type": resource_type, "files": files})

if groups:
tasks.append(groups)
Expand All @@ -426,7 +440,7 @@ def schedule_tasks(self, tasks):
if len(group_tasks) == 1: # Prevent celery from converting group to a single task
group_tasks.append(bulk_import_subtask_empty.si().set(queue='concurrent'))

chained_tasks |= chord(group(group_tasks), bulk_import_subtask_empty.si().set(queue='concurrent'))
chained_tasks |= group(group_tasks)
chained_tasks |= import_finisher.si(self.task_id).set(queue='concurrent')

final_task = chained_tasks.apply_async(queue='concurrent')
Expand Down
60 changes: 31 additions & 29 deletions core/importers/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2064,36 +2064,38 @@ def test_prepare_tasks(self):
'CodeSystem': {'/package1/path4:': 10},
'ConceptMap': {'/package2/path5': 250}
})

self.assertEqual(tasks, [
# Executed in sequence
[ # Executed in parallel
{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ValueSet', 'files': [{'filepath': 'path3', 'start_index': 0, 'end_index': 50}]}
], [ # Executed in parallel
{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 0, 'end_index': 100}]},
{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 100,
'end_index': 200}]},
{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 200, 'end_index': 250}]}
], [ # Executed in parallel
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'CodeSystem', 'files': [{'filepath': 'path4:', 'start_index': 0, 'end_index': 10}]}
], [ # Executed in parallel
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ValueSet', 'files': [{'filepath': 'path1', 'start_index': 0, 'end_index': 100}]},
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ValueSet', 'files': [{'filepath': 'path1', 'start_index': 100, 'end_index': 101}]},
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 0, 'end_index': 100}]},
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 100, 'end_index': 200}]},
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 200,
'end_index': 299}]}
]])
[{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ValueSet', 'files': [{'filepath': 'path3', 'start_index': 0, 'end_index': 50}]}],
[{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 0, 'end_index': 50}]},
{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 50, 'end_index': 100}]},
{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 100, 'end_index': 150}]},
{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 150, 'end_index': 200}]},
{'path': '/package2', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ConceptMap', 'files': [{'filepath': 'path5', 'start_index': 200, 'end_index': 250}]}],
[{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'CodeSystem', 'files': [{'filepath': 'path4:', 'start_index': 0, 'end_index': 10}]}],
[{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ValueSet', 'files': [{'filepath': 'path1', 'start_index': 0, 'end_index': 50}]},
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ValueSet', 'files': [{'filepath': 'path1', 'start_index': 50, 'end_index': 100}]},
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ValueSet', 'files': [{'filepath': 'path1', 'start_index': 100, 'end_index': 101},
{'filepath': 'path2', 'start_index': 0, 'end_index': 49}]},
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 49, 'end_index': 99}]},
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 99, 'end_index': 149}]},
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 149, 'end_index': 199}]},
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 199, 'end_index': 249}]},
{'path': '/package1', 'username': 'root', 'owner_type': 'users', 'owner': 'root',
'resource_type': 'ValueSet', 'files': [{'filepath': 'path2', 'start_index': 249, 'end_index': 299}]}]])

hl7_fhir_fr_core_resources = {
'CodeSystem': {'http://fetch/npm/package/package/CodeSystem-fr-core-cs-circonstances-sortie.json': 1,
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ services:
healthcheck:
test: "pg_isready -U postgres"
volumes:
- postgres-data:/usr/share/postgres/data
- postgres-data:/var/lib/postgresql/data

This comment has been minimized.

Copy link
@rkorytkowski

rkorytkowski Jan 22, 2025

Author Contributor

@snyaggarwal heads up, I've fixed the volume location. You will need to recreate DB locally.

redis:
image: bitnami/redis:7.0.12
restart: "always"
Expand Down
8 changes: 4 additions & 4 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ python-dateutil==2.8.2
requests==2.32.3
factory_boy==3.3.0
# Celery
celery[redis]==5.3.1
celery[redis]==5.4.0
celery_once==3.0.1
git+https://github.com/snyaggarwal/flower # use until https://github.com/mher/flower/issues/1231 issue is resolved
redis==4.6.0
django-redis==5.3.0 # needed for redis sentinel support
kombu==5.3.1
redis==5.2.1
django-redis==5.4.0 # needed for redis sentinel support
kombu==5.4.2
django-elasticsearch-dsl==7.3
drf-yasg==1.21.5
git+https://github.com/snyaggarwal/django-queryset-csv
Expand Down

0 comments on commit 3595189

Please sign in to comment.