From 7a8ae26444ccf8fbe227ed85cd74aa9af5827dff Mon Sep 17 00:00:00 2001
From: Jonathan Rios
Date: Tue, 15 Oct 2024 15:47:29 +0200
Subject: [PATCH] LITE-30582 Increase backoff factor between tasks generation

---
 connect_bi_reporter/constants.py        |  2 +-
 connect_bi_reporter/scheduler.py        |  2 +-
 connect_bi_reporter/uploads/services.py |  3 +-
 connect_bi_reporter/uploads/tasks.py    |  8 ++--
 tests/uploads/test_tasks.py             | 50 +++++++++++++++++++++++++
 5 files changed, 58 insertions(+), 7 deletions(-)

diff --git a/connect_bi_reporter/constants.py b/connect_bi_reporter/constants.py
index dea49e2..c0c9406 100644
--- a/connect_bi_reporter/constants.py
+++ b/connect_bi_reporter/constants.py
@@ -9,7 +9,7 @@
 # Delay in seconds for schedule to process Upload task
 SECONDS_DELAY = 120
 # Backoff factor in seconds between Upload tasks creation
-SECONDS_BACKOFF_FACTOR = 10
+SECONDS_BACKOFF_FACTOR = 120
 CREATE_UPLOADS_METHOD_NAME = 'create_uploads'
 PROCESS_UPLOADS_METHOD_NAME = 'process_upload'
 PROCESS_UPLOAD_TAKS_BASE_METHOD_PAYLOAD = {
diff --git a/connect_bi_reporter/scheduler.py b/connect_bi_reporter/scheduler.py
index 3630c27..ea54a2b 100644
--- a/connect_bi_reporter/scheduler.py
+++ b/connect_bi_reporter/scheduler.py
@@ -1,6 +1,6 @@
 from datetime import datetime, timedelta
-from logging import Logger
 import enum
+from logging import Logger
 from typing import Any, Dict, Optional
 
 from connect.client import ClientError, ConnectClient
diff --git a/connect_bi_reporter/uploads/services.py b/connect_bi_reporter/uploads/services.py
index c896275..974c3e2 100644
--- a/connect_bi_reporter/uploads/services.py
+++ b/connect_bi_reporter/uploads/services.py
@@ -1,4 +1,5 @@
 from datetime import datetime, timedelta
+import copy
 
 from sqlalchemy import util
 from connect.client import ClientError
@@ -161,7 +162,7 @@ def create_uploads(db, client, logger, feeds):
 
 
 def get_process_upload_task_payload(installation_id, upload_id, account_id):
-    payload = PROCESS_UPLOAD_TAKS_BASE_METHOD_PAYLOAD
+    payload = copy.deepcopy(PROCESS_UPLOAD_TAKS_BASE_METHOD_PAYLOAD)
     payload.update({'name': f'Process Uploads - {account_id}'})
     parameters = {
         'installation_id': installation_id,
diff --git a/connect_bi_reporter/uploads/tasks.py b/connect_bi_reporter/uploads/tasks.py
index 394839d..8e667d3 100644
--- a/connect_bi_reporter/uploads/tasks.py
+++ b/connect_bi_reporter/uploads/tasks.py
@@ -90,12 +90,12 @@ def process_upload(self, schedule):
         if not upload:
             return ScheduledExecutionResponse.fail(output=f'Invalid upload `{upload_id}`.')
 
-        if upload.status != 'pending':
+        if upload.status != Upload.STATUSES.pending:
             return ScheduledExecutionResponse.fail(
                 output=f'Cannot process upload in status `{upload.status}`.',
             )
 
-        upload.status = 'processing'
+        upload.status = Upload.STATUSES.processing
         db.add(upload)
         db.commit()
 
@@ -112,14 +112,14 @@ def process_upload(self, schedule):
            )
 
            upload.size = uploaded_file_props.get('size', 0)
-           upload.status = 'uploaded'
+           upload.status = Upload.STATUSES.uploaded
            upload.name = file_name
            db.add(upload)
            db.commit()
            return ScheduledExecutionResponse.done()
        except Exception:
            self.logger.exception(msg='Error processing upload')
-           upload.status = 'failed'
+           upload.status = Upload.STATUSES.failed
            db.add(upload)
            db.commit()
            return ScheduledExecutionResponse.fail()
diff --git a/tests/uploads/test_tasks.py b/tests/uploads/test_tasks.py
index 2ae78cb..224707d 100644
--- a/tests/uploads/test_tasks.py
+++ b/tests/uploads/test_tasks.py
@@ -1,3 +1,4 @@
+from datetime import datetime, timedelta, timezone
 import re
 from unittest.mock import call
 
@@ -7,6 +8,7 @@
 from connect.eaas.core.inject.models import Context
 from sqlalchemy.exc import DBAPIError
 
+from connect_bi_reporter.constants import SECONDS_BACKOFF_FACTOR, SECONDS_DELAY
 from connect_bi_reporter.events import ConnectBiReporterEventsApplication
 
 
@@ -229,6 +231,11 @@ def test_create_upload_schedule_task(
        ),
    )
    ext.get_installation_admin_client = lambda self: connect_client
+
+   _now = datetime(2024, 10, 15, 10, 0, 0, tzinfo=timezone.utc)
+   p_datetime = mocker.patch('connect_bi_reporter.uploads.services.datetime')
+   p_datetime.utcnow = lambda: _now
+
    mocker.patch(
        'connect_bi_reporter.uploads.tasks.get_extension_owner_client',
        return_value=connect_client,
@@ -245,6 +252,9 @@
        'connect_bi_reporter.scheduler.create_schedule_task',
        return_value=eaas_schedule_task,
    )
+   p_get_task_payload = mocker.patch(
+       'connect_bi_reporter.scheduler.EaasScheduleTask.get_task_payload',
+   )
    feed1 = feed_factory(
        schedule_id=report_schedule['id'],
        account_id=installation['owner']['id'],
@@ -274,6 +284,44 @@ def test_create_upload_schedule_task(
            ),
        ],
    )
+   delay = SECONDS_DELAY
+   new_delay = SECONDS_DELAY + SECONDS_BACKOFF_FACTOR
+   p_get_task_payload.assert_has_calls(
+       [
+           call(
+               trigger_type='onetime',
+               trigger_data={
+                   'date': (_now + timedelta(seconds=delay)).isoformat(),
+               },
+               method_payload={
+                   'method': 'process_upload',
+                   'description': 'This task will download the report from'
+                   ' connect and published it in the respective storage.',
+                   'parameter': {
+                       'installation_id': 'EIN-8436-7221-8308',
+                       'upload_id': f'ULF-{feed1.id.split("-", 1)[-1]}-000',
+                   },
+                   'name': 'Process Uploads - PA-000-000',
+               },
+           ),
+           call(
+               trigger_type='onetime',
+               trigger_data={
+                   'date': (_now + timedelta(seconds=new_delay)).isoformat(),
+               },
+               method_payload={
+                   'method': 'process_upload',
+                   'description': 'This task will download the report from'
+                   ' connect and published it in the respective storage.',
+                   'parameter': {
+                       'installation_id': 'EIN-8436-7221-8308',
+                       'upload_id': f'ULF-{feed2.id.split("-", 1)[-1]}-000',
+                   },
+                   'name': 'Process Uploads - PA-000-000',
+               },
+           ),
+       ],
+   )
    for idx, zipped in enumerate(zip(uploads, [feed1, feed2])):
        upload, feed = zipped
        assert result.status == 'success'
@@ -298,6 +346,8 @@ def test_create_upload_schedule_task(
        f' created for Upload `{uploads[1].id}`: '
        f'Will process Report File `{report_file[1]["id"]}`'
    )
+   assert delay == 120
+   assert new_delay == 240
 
 
 def test_create_upload_schedule_task_no_feeds(
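
For context, a minimal sketch (not part of the patch) of the scheduling arithmetic the new test asserts: with SECONDS_DELAY = 120 and SECONDS_BACKOFF_FACTOR raised from 10 to 120, the first process_upload task is scheduled 120 seconds from now and each subsequent one a further 120 seconds later (120s, 240s, ...). The standalone trigger_dates helper below is hypothetical and only illustrates that arithmetic; it is not the extension's actual create_uploads implementation.

from datetime import datetime, timedelta, timezone

# Values from connect_bi_reporter/constants.py after this patch.
SECONDS_DELAY = 120
SECONDS_BACKOFF_FACTOR = 120


def trigger_dates(now, n_uploads):
    """Hypothetical helper: yield one ISO trigger date per upload,
    spacing each task SECONDS_BACKOFF_FACTOR seconds after the previous one."""
    delay = SECONDS_DELAY
    for _ in range(n_uploads):
        yield (now + timedelta(seconds=delay)).isoformat()
        delay += SECONDS_BACKOFF_FACTOR


if __name__ == '__main__':
    now = datetime(2024, 10, 15, 10, 0, 0, tzinfo=timezone.utc)
    # Matches the expectations in tests/uploads/test_tasks.py:
    # first task at +120s (delay == 120), second at +240s (new_delay == 240).
    for date in trigger_dates(now, 2):
        print(date)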