Skip to content

Commit

Permalink
Add large and annoying logging to try and isolate fastapi child process death issue
Browse files Browse the repository at this point in the history
  • Loading branch information
jdye64 committed Jan 23, 2025
1 parent de6983c commit 8982e22
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 1 deletion.
4 changes: 3 additions & 1 deletion src/nv_ingest/api/v1/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,10 @@ async def submit_job(request: Request, response: Response, job_spec: MessageWrap
)
async def fetch_job(job_id: str, ingest_service: INGEST_SERVICE_T):
try:
# Attempt to fetch the job from the ingest service
# Attempt to fetch the job from the ingest
logger.info(f"!!!!! Attempting to fetch results for job_id: {job_id}")
job_response = await ingest_service.fetch_job(job_id)
logger.info(f"!!!!! job_id: {job_id} ... job_response: {job_response}")
return job_response
except TimeoutError:
# Return a 202 Accepted if the job is not ready yet
Expand Down
2 changes: 2 additions & 0 deletions src/nv_ingest/service/impl/ingest/redis_ingest_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ async def submit_job(self, job_spec: MessageWrapper, trace_id: str) -> str:

async def fetch_job(self, job_id: str) -> Any:
# Fetch message with a timeout
logger.info(f"!!!! redis_ingest_service:fetch_job(job_id) job_id: {job_id}")
message = self._ingest_client.fetch_message(f"{job_id}", timeout=5)
logger.info(f"!!!! redis_ingest_service:fetch_job(job_id) job_id: {job_id} message reponse: {message}")
if message is None:
raise TimeoutError()

Expand Down
20 changes: 20 additions & 0 deletions src/nv_ingest/util/message_brokers/redis/redis_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,23 +207,38 @@ def fetch_message(self, channel_name: str, timeout: float = 10) -> Optional[Unio
while True:
try:
# Attempt to fetch a message from the Redis queue
logger.info(
f"!!!!! entering redis_client:fetch_message(channel_name: {channel_name}, timeout: {timeout})"
)
message, fragment, fragment_count = self._check_response(channel_name, timeout)
logger.info(
f"!!!!! message len: {len(message)}, fragment: {fragment}, fragment_count: {fragment_count}"
)

if message is not None:
logger.info(f"!!!!! Message is not None ...")
if fragment_count == 1:
# No fragmentation, return the message as is
return message

logger.info(
f"!!!!! collected fragments. Existing collected_fragments len: {len(collected_fragments)}"
)
collected_fragments.append(message)

# If we have collected all fragments, combine and return
if len(collected_fragments) == fragment_count:
logger.info(
f"!!!! Collecting fragments .... len(collected_fragments) {len(collected_fragments)} - len(fragment_count): {len(fragment_count)}"
)
# Sort fragments by the 'fragment' field to ensure correct order
collected_fragments.sort(key=lambda x: x["fragment"])

# Combine fragments (assuming they are part of a larger payload)
logger.info(f"!!!!! combining_fragments to reconstruct message ....")
reconstructed_message = self._combine_fragments(collected_fragments)

logger.info(f"!!!! Returning reconstructed_message")
return reconstructed_message

else:
Expand All @@ -240,6 +255,7 @@ def fetch_message(self, channel_name: str, timeout: float = 10) -> Optional[Unio
logger.error(err_msg)
raise ValueError(err_msg)
else:
logger.info(f"!!!! Hitting this case ...")
raise # This is expected in many cases, so re-raise it

except RedisError as err:
Expand All @@ -255,6 +271,7 @@ def fetch_message(self, channel_name: str, timeout: float = 10) -> Optional[Unio
raise ValueError(f"Failed to fetch message from Redis queue after {retries} attempts: {err}")

# Invalidate client to force reconnection on the next try
logger.info(f"!!!! invalidating redis client")
self._client = None

except Exception as e:
Expand All @@ -279,8 +296,10 @@ def _combine_fragments(fragments: List[Dict[str, Any]]) -> Dict:
The combined message as a JSON string, containing 'status', 'description', and combined 'data'.
"""
if not fragments:
logger.info(f"!!!! Fragments list is empty")
raise ValueError("Fragments list is empty")

logger.info(f"!!!! Fragments len: {len(fragments)}")
# Use 'status' and 'description' from the first fragment
combined_message = {
"status": fragments[0]["status"],
Expand All @@ -293,6 +312,7 @@ def _combine_fragments(fragments: List[Dict[str, Any]]) -> Dict:
for fragment in fragments:
combined_message["data"].extend(fragment["data"])

logger.info(f"!!!! Returning combined_message in _combine_fragments")
return combined_message

def submit_message(self, channel_name: str, message: str) -> None:
Expand Down

0 comments on commit 8982e22

Please sign in to comment.