From 07d2f4e884d940e0ad06b9c874779321f1a43ddf Mon Sep 17 00:00:00 2001 From: joshwilson-dbx <133822502+joshwilson-dbx@users.noreply.github.com> Date: Tue, 20 Feb 2024 17:51:30 +0000 Subject: [PATCH 1/2] Use function's keep_results configuration when storing failed jobs results --- arq/worker.py | 36 ++++++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/arq/worker.py b/arq/worker.py index 398409b5..8f932298 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -486,7 +486,7 @@ async def run_job(self, job_id: str, score: int) -> None: # noqa: C901 args: Tuple[Any, ...] = () kwargs: Dict[Any, Any] = {} - async def job_failed(exc: BaseException) -> None: + async def job_failed(exc: BaseException, function: Optional[Union[Function, CronJob]]) -> None: self.jobs_failed += 1 result_data_ = serialize_result( function=function_name, @@ -502,11 +502,11 @@ async def job_failed(exc: BaseException) -> None: serializer=self.job_serializer, queue_name=self.queue_name, ) - await asyncio.shield(self.finish_failed_job(job_id, result_data_)) + await asyncio.shield(self.finish_failed_job(job_id, result_data_, function)) if not v: logger.warning('job %s expired', job_id) - return await job_failed(JobExecutionFailed('job expired')) + return await job_failed(JobExecutionFailed('job expired'), None) try: function_name, args, kwargs, enqueue_job_try, enqueue_time_ms = deserialize_job_raw( @@ -514,18 +514,20 @@ async def job_failed(exc: BaseException) -> None: ) except SerializationError as e: logger.exception('deserializing job %s failed', job_id) - return await job_failed(e) + return await job_failed(e, None) + + function: Optional[Union[Function, CronJob]] = None + with contextlib.suppress(KeyError): + function = self.functions[function_name] if abort_job: t = (timestamp_ms() - enqueue_time_ms) / 1000 logger.info('%6.2fs ⊘ %s:%s aborted before start', t, job_id, function_name) - return await job_failed(asyncio.CancelledError()) + return await job_failed(asyncio.CancelledError(), function) - try: - function: Union[Function, CronJob] = self.functions[function_name] - except KeyError: + if function is None: logger.warning('job %s, function %r not found', job_id, function_name) - return await job_failed(JobExecutionFailed(f'function {function_name!r} not found')) + return await job_failed(JobExecutionFailed(f'function {function_name!r} not found'), None) if hasattr(function, 'next_run'): # cron_job @@ -558,7 +560,7 @@ async def job_failed(exc: BaseException) -> None: self.queue_name, serializer=self.job_serializer, ) - return await asyncio.shield(self.finish_failed_job(job_id, result_data)) + return await asyncio.shield(self.finish_failed_job(job_id, result_data, function)) result = no_result exc_extra = None @@ -701,7 +703,9 @@ async def finish_job( tr.delete(*delete_keys) # type: ignore[unused-coroutine] await tr.execute() - async def finish_failed_job(self, job_id: str, result_data: Optional[bytes]) -> None: + async def finish_failed_job( + self, job_id: str, result_data: Optional[bytes], function: Optional[Union[Function, CronJob]] + ) -> None: async with self.pool.pipeline(transaction=True) as tr: tr.delete( # type: ignore[unused-coroutine] retry_key_prefix + job_id, @@ -710,8 +714,16 @@ async def finish_failed_job(self, job_id: str, result_data: Optional[bytes]) -> ) tr.zrem(abort_jobs_ss, job_id) # type: ignore[unused-coroutine] tr.zrem(self.queue_name, job_id) # type: ignore[unused-coroutine] + + keep_result_forever = self.keep_result_forever + keep_result_s = self.keep_result_s + if function is not None: + if function.keep_result_forever is not None: + keep_result_forever = function.keep_result_forever + if function.keep_result_s is not None: + keep_result_s = function.keep_result_s + keep_result = keep_result_forever or keep_result_s > 0 # result_data would only be None if serializing the result fails - keep_result = self.keep_result_forever or self.keep_result_s > 0 if result_data is not None and keep_result: # pragma: no branch expire = 0 if self.keep_result_forever else self.keep_result_s tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire)) # type: ignore[unused-coroutine] From a06ea0439697cfbcdfe1a83a5250386d3e17cf5a Mon Sep 17 00:00:00 2001 From: joshwilson-dbx <133822502+joshwilson-dbx@users.noreply.github.com> Date: Tue, 20 Feb 2024 18:17:27 +0000 Subject: [PATCH 2/2] Use closure variable for function, inline with existing code --- arq/worker.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/arq/worker.py b/arq/worker.py index 8f932298..73a6158c 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -483,10 +483,11 @@ async def run_job(self, job_id: str, score: int) -> None: # noqa: C901 abort_job = False function_name, enqueue_time_ms = '', 0 + function: Optional[Union[Function, CronJob]] = None args: Tuple[Any, ...] = () kwargs: Dict[Any, Any] = {} - async def job_failed(exc: BaseException, function: Optional[Union[Function, CronJob]]) -> None: + async def job_failed(exc: BaseException) -> None: self.jobs_failed += 1 result_data_ = serialize_result( function=function_name, @@ -506,7 +507,7 @@ async def job_failed(exc: BaseException, function: Optional[Union[Function, Cron if not v: logger.warning('job %s expired', job_id) - return await job_failed(JobExecutionFailed('job expired'), None) + return await job_failed(JobExecutionFailed('job expired')) try: function_name, args, kwargs, enqueue_job_try, enqueue_time_ms = deserialize_job_raw( @@ -514,20 +515,19 @@ async def job_failed(exc: BaseException, function: Optional[Union[Function, Cron ) except SerializationError as e: logger.exception('deserializing job %s failed', job_id) - return await job_failed(e, None) + return await job_failed(e) - function: Optional[Union[Function, CronJob]] = None with contextlib.suppress(KeyError): function = self.functions[function_name] if abort_job: t = (timestamp_ms() - enqueue_time_ms) / 1000 logger.info('%6.2fs ⊘ %s:%s aborted before start', t, job_id, function_name) - return await job_failed(asyncio.CancelledError(), function) + return await job_failed(asyncio.CancelledError()) if function is None: logger.warning('job %s, function %r not found', job_id, function_name) - return await job_failed(JobExecutionFailed(f'function {function_name!r} not found'), None) + return await job_failed(JobExecutionFailed(f'function {function_name!r} not found')) if hasattr(function, 'next_run'): # cron_job