Skip to content

Commit

Permalink
map CachedArtifactNotFoundError to PreviousStepStillProcessingError (#…
Browse files Browse the repository at this point in the history
…2093)

map CachedArtifactNotFoundError to customError
  • Loading branch information
AndreaFrancis authored Nov 10, 2023
1 parent da54e3f commit 67459d2
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 0 deletions.
8 changes: 8 additions & 0 deletions libs/libcommon/src/libcommon/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def as_response(self) -> ErrorResponse:
"ParquetResponseEmptyError",
"PreviousStepFormatError",
"PreviousStepStatusError",
"PreviousStepStillProcessingError",
"ResponseAlreadyComputedError",
"RowsPostProcessingError",
"SplitsNamesError",
Expand Down Expand Up @@ -441,6 +442,13 @@ def __init__(self, message: str, cause: Optional[BaseException] = None):
super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "PreviousStepStatusError", cause, False)


class PreviousStepStillProcessingError(CacheableError):
"""The previous steps are still being processed."""

def __init__(self, message: str, cause: Optional[BaseException] = None):
super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "PreviousStepStillProcessingError", cause, False)


class ResponseAlreadyComputedError(CacheableError):
"""The response has been already computed by another job runner."""

Expand Down
6 changes: 6 additions & 0 deletions services/worker/src/worker/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
DatasetScriptError,
JobManagerCrashedError,
JobManagerExceededMaximumDurationError,
PreviousStepStillProcessingError,
ResponseAlreadyComputedError,
TooBigContentError,
UnexpectedError,
Expand All @@ -21,6 +22,7 @@
from libcommon.processing_graph import ProcessingGraph, ProcessingStep
from libcommon.simple_cache import (
CachedArtifactError,
CachedArtifactNotFoundError,
CacheEntryDoesNotExistError,
get_response_without_content_params,
)
Expand Down Expand Up @@ -174,6 +176,10 @@ def process(
"The computed response content exceeds the supported size in bytes"
f" ({self.worker_config.content_max_bytes})."
)
except CachedArtifactNotFoundError as err:
raise PreviousStepStillProcessingError(
message="The previous steps are still being processed", cause=err
)
finally:
# ensure the post_compute hook is called even if the compute raises an exception
self.job_runner.post_compute()
Expand Down

0 comments on commit 67459d2

Please sign in to comment.