Skip to content

Commit

Permalink
LITE-30582 Increase backoff factor between task generations
Browse files Browse the repository at this point in the history
  • Loading branch information
jonatrios committed Oct 15, 2024
1 parent 3843454 commit 7a8ae26
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 7 deletions.
2 changes: 1 addition & 1 deletion connect_bi_reporter/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# Delay in seconds for schedule to process Upload task
SECONDS_DELAY = 120
# Backoff factor in seconds between Upload tasks creation
SECONDS_BACKOFF_FACTOR = 10
SECONDS_BACKOFF_FACTOR = 120
CREATE_UPLOADS_METHOD_NAME = 'create_uploads'
PROCESS_UPLOADS_METHOD_NAME = 'process_upload'
PROCESS_UPLOAD_TAKS_BASE_METHOD_PAYLOAD = {
Expand Down
2 changes: 1 addition & 1 deletion connect_bi_reporter/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datetime import datetime, timedelta
from logging import Logger
import enum
from logging import Logger
from typing import Any, Dict, Optional

from connect.client import ClientError, ConnectClient
Expand Down
3 changes: 2 additions & 1 deletion connect_bi_reporter/uploads/services.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime, timedelta
import copy

from sqlalchemy import util
from connect.client import ClientError
Expand Down Expand Up @@ -161,7 +162,7 @@ def create_uploads(db, client, logger, feeds):


def get_process_upload_task_payload(installation_id, upload_id, account_id):
payload = PROCESS_UPLOAD_TAKS_BASE_METHOD_PAYLOAD
payload = copy.deepcopy(PROCESS_UPLOAD_TAKS_BASE_METHOD_PAYLOAD)
payload.update({'name': f'Process Uploads - {account_id}'})
parameters = {
'installation_id': installation_id,
Expand Down
8 changes: 4 additions & 4 deletions connect_bi_reporter/uploads/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ def process_upload(self, schedule):
if not upload:
return ScheduledExecutionResponse.fail(output=f'Invalid upload `{upload_id}`.')

if upload.status != 'pending':
if upload.status != Upload.STATUSES.pending:
return ScheduledExecutionResponse.fail(
output=f'Cannot process upload in status `{upload.status}`.',
)

upload.status = 'processing'
upload.status = Upload.STATUSES.processing
db.add(upload)
db.commit()

Expand All @@ -112,14 +112,14 @@ def process_upload(self, schedule):
)
upload.size = uploaded_file_props.get('size', 0)

upload.status = 'uploaded'
upload.status = Upload.STATUSES.uploaded
upload.name = file_name
db.add(upload)
db.commit()
return ScheduledExecutionResponse.done()
except Exception:
self.logger.exception(msg='Error processing upload')
upload.status = 'failed'
upload.status = Upload.STATUSES.failed
db.add(upload)
db.commit()
return ScheduledExecutionResponse.fail()
50 changes: 50 additions & 0 deletions tests/uploads/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime, timedelta, timezone
import re
from unittest.mock import call

Expand All @@ -7,6 +8,7 @@
from connect.eaas.core.inject.models import Context
from sqlalchemy.exc import DBAPIError

from connect_bi_reporter.constants import SECONDS_BACKOFF_FACTOR, SECONDS_DELAY
from connect_bi_reporter.events import ConnectBiReporterEventsApplication


Expand Down Expand Up @@ -229,6 +231,11 @@ def test_create_upload_schedule_task(
),
)
ext.get_installation_admin_client = lambda self: connect_client

_now = datetime(2024, 10, 15, 10, 0, 0, tzinfo=timezone.utc)
p_datetime = mocker.patch('connect_bi_reporter.uploads.services.datetime')
p_datetime.utcnow = lambda: _now

mocker.patch(
'connect_bi_reporter.uploads.tasks.get_extension_owner_client',
return_value=connect_client,
Expand All @@ -245,6 +252,9 @@ def test_create_upload_schedule_task(
'connect_bi_reporter.scheduler.create_schedule_task',
return_value=eaas_schedule_task,
)
p_get_task_payload = mocker.patch(
'connect_bi_reporter.scheduler.EaasScheduleTask.get_task_payload',
)
feed1 = feed_factory(
schedule_id=report_schedule['id'],
account_id=installation['owner']['id'],
Expand Down Expand Up @@ -274,6 +284,44 @@ def test_create_upload_schedule_task(
),
],
)
delay = SECONDS_DELAY
new_delay = SECONDS_DELAY + SECONDS_BACKOFF_FACTOR
p_get_task_payload.assert_has_calls(
[
call(
trigger_type='onetime',
trigger_data={
'date': (_now + timedelta(seconds=delay)).isoformat(),
},
method_payload={
'method': 'process_upload',
'description': 'This task will download the report from'
' connect and published it in the respective storage.',
'parameter': {
'installation_id': 'EIN-8436-7221-8308',
'upload_id': f'ULF-{feed1.id.split("-", 1)[-1]}-000',
},
'name': 'Process Uploads - PA-000-000',
},
),
call(
trigger_type='onetime',
trigger_data={
'date': (_now + timedelta(seconds=new_delay)).isoformat(),
},
method_payload={
'method': 'process_upload',
'description': 'This task will download the report from'
' connect and published it in the respective storage.',
'parameter': {
'installation_id': 'EIN-8436-7221-8308',
'upload_id': f'ULF-{feed2.id.split("-", 1)[-1]}-000',
},
'name': 'Process Uploads - PA-000-000',
},
),
],
)
for idx, zipped in enumerate(zip(uploads, [feed1, feed2])):
upload, feed = zipped
assert result.status == 'success'
Expand All @@ -298,6 +346,8 @@ def test_create_upload_schedule_task(
f' created for Upload `{uploads[1].id}`: '
f'Will process Report File `{report_file[1]["id"]}`'
)
assert delay == 120
assert new_delay == 240


def test_create_upload_schedule_task_no_feeds(
Expand Down

0 comments on commit 7a8ae26

Please sign in to comment.