Skip to content

Commit

Permalink
Merge pull request #50 from cloudblue/LITE-30582-upload-issue-with-big-files
Browse files Browse the repository at this point in the history

LITE-30582 Increase backoff factor between tasks generation
  • Loading branch information
akodelia authored Oct 17, 2024
2 parents 3843454 + 50f6ab2 commit 133889a
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 10 deletions.
2 changes: 1 addition & 1 deletion connect_bi_reporter/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# Delay in seconds for schedule to process Upload task
SECONDS_DELAY = 120
# Backoff factor in seconds between Upload tasks creation
SECONDS_BACKOFF_FACTOR = 10
SECONDS_BACKOFF_FACTOR = 120
CREATE_UPLOADS_METHOD_NAME = 'create_uploads'
PROCESS_UPLOADS_METHOD_NAME = 'process_upload'
PROCESS_UPLOAD_TAKS_BASE_METHOD_PAYLOAD = {
Expand Down
3 changes: 2 additions & 1 deletion connect_bi_reporter/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datetime import datetime, timedelta
from logging import Logger
import enum
from logging import Logger
from typing import Any, Dict, Optional

from connect.client import ClientError, ConnectClient
Expand All @@ -23,6 +23,7 @@ class TriggerTypeEnum(str, enum.Enum):
class ResponseTypeEnum(str, enum.Enum):
SUCCESS = 'done'
ERROR = 'reschedule'
FAIL = 'fail'


class TriggerType:
Expand Down
3 changes: 2 additions & 1 deletion connect_bi_reporter/uploads/services.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime, timedelta
import copy

from sqlalchemy import util
from connect.client import ClientError
Expand Down Expand Up @@ -161,7 +162,7 @@ def create_uploads(db, client, logger, feeds):


def get_process_upload_task_payload(installation_id, upload_id, account_id):
payload = PROCESS_UPLOAD_TAKS_BASE_METHOD_PAYLOAD
payload = copy.deepcopy(PROCESS_UPLOAD_TAKS_BASE_METHOD_PAYLOAD)
payload.update({'name': f'Process Uploads - {account_id}'})
parameters = {
'installation_id': installation_id,
Expand Down
28 changes: 21 additions & 7 deletions connect_bi_reporter/uploads/tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import io
from datetime import datetime
import time
from zipfile import ZipFile

from connect.client import ClientError
Expand All @@ -18,7 +19,7 @@
from connect_bi_reporter.uploads.models import Upload
from connect_bi_reporter.uploads.services import create_process_upload_tasks, create_uploads
from connect_bi_reporter.uploads.storage_utils import upload_file
from connect_bi_reporter.scheduler import Scheduler
from connect_bi_reporter.scheduler import ResponseTypeEnum, Scheduler


class UploadTaskApplicationMixin:
Expand Down Expand Up @@ -78,6 +79,7 @@ def process_upload(self, schedule):
if 'installation_id' not in schedule['parameter']:
return ScheduledExecutionResponse.fail(output='Parameter installation_id is missing.')

begin_time = time.monotonic()
instalation_client = self.get_installation_admin_client(
schedule['parameter']['installation_id'],
)
Expand All @@ -90,15 +92,16 @@ def process_upload(self, schedule):
if not upload:
return ScheduledExecutionResponse.fail(output=f'Invalid upload `{upload_id}`.')

if upload.status != 'pending':
if upload.status != Upload.STATUSES.pending:
return ScheduledExecutionResponse.fail(
output=f'Cannot process upload in status `{upload.status}`.',
)

upload.status = 'processing'
upload.status = Upload.STATUSES.processing
db.add(upload)
db.commit()

execution_method_result = ResponseTypeEnum.SUCCESS
try:
report_data = download_report(instalation_client, upload.report_id)

Expand All @@ -112,14 +115,25 @@ def process_upload(self, schedule):
)
upload.size = uploaded_file_props.get('size', 0)

upload.status = 'uploaded'
upload.status = Upload.STATUSES.uploaded
upload.name = file_name
db.add(upload)
db.commit()
return ScheduledExecutionResponse.done()
except Exception:
self.logger.exception(msg='Error processing upload')
upload.status = 'failed'
upload.status = Upload.STATUSES.failed
db.add(upload)
db.commit()
return ScheduledExecutionResponse.fail()
execution_method_result = ResponseTypeEnum.FAIL

took = time.monotonic() - begin_time
self.logger.info(
'Execution of `process_upload` task for Upload {0} finished (took "{1}"): '
'Upload status: `{2}`, Taks result: `{3}`.'.format(
upload.id,
took,
upload.status,
execution_method_result,
),
)
return getattr(ScheduledExecutionResponse, execution_method_result)()
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ build-backend = "poetry.core.masonry.api"
[tool.pytest.ini_options]
testpaths = "tests"
addopts = "--cov=connect_bi_reporter --cov-report=term-missing --cov-report=html --cov-report=xml"
filterwarnings = [
"ignore::sqlalchemy.exc.SADeprecationWarning"
]

[tool.coverage.run]
relative_files = true
Expand Down
56 changes: 56 additions & 0 deletions tests/uploads/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime, timedelta, timezone
import re
from unittest.mock import call

Expand All @@ -7,6 +8,7 @@
from connect.eaas.core.inject.models import Context
from sqlalchemy.exc import DBAPIError

from connect_bi_reporter.constants import SECONDS_BACKOFF_FACTOR, SECONDS_DELAY
from connect_bi_reporter.events import ConnectBiReporterEventsApplication


Expand All @@ -16,6 +18,8 @@ def test_process_upload(dbsession, connect_client, installation, logger, mocker,
logger,
config={},
)
p_time = mocker.patch('connect_bi_reporter.uploads.tasks.time')
p_time.monotonic.side_effect = [10, 12]
ext.get_installation_admin_client = lambda self: connect_client

with open('./tests/uploads/test-zip.zip', 'rb') as zf:
Expand All @@ -41,6 +45,10 @@ def test_process_upload(dbsession, connect_client, installation, logger, mocker,
assert re.match(feed.file_name + '_\\d{8} \\d{2}:\\d{2}:\\d{2}.csv', upload.name)
assert upload.size == 1024
assert upload.status == upload.STATUSES.uploaded
assert logger.method_calls[0].args[0] == (
f'Execution of `process_upload` task for Upload {upload.id} '
f'finished (took "2"): Upload status: `uploaded`, Taks result: `done`.'
)


def test_process_upload_report_download_failed(
Expand Down Expand Up @@ -229,6 +237,11 @@ def test_create_upload_schedule_task(
),
)
ext.get_installation_admin_client = lambda self: connect_client

_now = datetime(2024, 10, 15, 10, 0, 0, tzinfo=timezone.utc)
p_datetime = mocker.patch('connect_bi_reporter.uploads.services.datetime')
p_datetime.utcnow = lambda: _now

mocker.patch(
'connect_bi_reporter.uploads.tasks.get_extension_owner_client',
return_value=connect_client,
Expand All @@ -245,6 +258,9 @@ def test_create_upload_schedule_task(
'connect_bi_reporter.scheduler.create_schedule_task',
return_value=eaas_schedule_task,
)
p_get_task_payload = mocker.patch(
'connect_bi_reporter.scheduler.EaasScheduleTask.get_task_payload',
)
feed1 = feed_factory(
schedule_id=report_schedule['id'],
account_id=installation['owner']['id'],
Expand Down Expand Up @@ -274,6 +290,44 @@ def test_create_upload_schedule_task(
),
],
)
delay = SECONDS_DELAY
new_delay = SECONDS_DELAY + SECONDS_BACKOFF_FACTOR
p_get_task_payload.assert_has_calls(
[
call(
trigger_type='onetime',
trigger_data={
'date': (_now + timedelta(seconds=delay)).isoformat(),
},
method_payload={
'method': 'process_upload',
'description': 'This task will download the report from'
' connect and published it in the respective storage.',
'parameter': {
'installation_id': 'EIN-8436-7221-8308',
'upload_id': f'ULF-{feed1.id.split("-", 1)[-1]}-000',
},
'name': 'Process Uploads - PA-000-000',
},
),
call(
trigger_type='onetime',
trigger_data={
'date': (_now + timedelta(seconds=new_delay)).isoformat(),
},
method_payload={
'method': 'process_upload',
'description': 'This task will download the report from'
' connect and published it in the respective storage.',
'parameter': {
'installation_id': 'EIN-8436-7221-8308',
'upload_id': f'ULF-{feed2.id.split("-", 1)[-1]}-000',
},
'name': 'Process Uploads - PA-000-000',
},
),
],
)
for idx, zipped in enumerate(zip(uploads, [feed1, feed2])):
upload, feed = zipped
assert result.status == 'success'
Expand All @@ -298,6 +352,8 @@ def test_create_upload_schedule_task(
f' created for Upload `{uploads[1].id}`: '
f'Will process Report File `{report_file[1]["id"]}`'
)
assert delay == 120
assert new_delay == 240


def test_create_upload_schedule_task_no_feeds(
Expand Down

0 comments on commit 133889a

Please sign in to comment.