Skip to content

Commit

Permalink
Add simple progress bar to synchronous ingestor.ingest() call (#405)
Browse files Browse the repository at this point in the history
  • Loading branch information
drobison00 authored Feb 5, 2025
1 parent bd6c0ac commit 29732eb
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 11 deletions.
27 changes: 17 additions & 10 deletions client/src/nv_ingest_client/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from typing import Any, Type
from typing import Any, Type, Callable
from typing import Dict
from typing import List
from typing import Optional
Expand Down Expand Up @@ -359,19 +359,23 @@ def fetch_job_result(
max_retries: Optional[int] = None,
retry_delay: float = 1,
verbose: bool = False,
completion_callback: Optional[Callable[[Dict, str], None]] = None,
) -> List[Tuple[Optional[Dict], str]]:
"""
Fetches job results for multiple job IDs concurrently with individual timeouts and retry logic.
Args:
job_ids (List[str]): A list of job IDs to fetch results for.
job_ids (Union[str, List[str]]): A job ID or list of job IDs to fetch results for.
timeout (float): Timeout for each fetch operation, in seconds.
max_retries (int): Maximum number of retries for jobs that are not ready yet.
max_retries (Optional[int]): Maximum number of retries for jobs that are not ready yet.
retry_delay (float): Delay between retry attempts, in seconds.
verbose (bool): If True, logs additional information.
completion_callback (Optional[Callable[[Dict, str], None]]): A callback function that is executed each time
a job result is successfully fetched. It receives two arguments: the job result (a dict) and the job ID.
Returns:
List[Tuple[Optional[Dict], str]]: A list of tuples containing the job results and job IDs.
If a timeout or error occurs, the result will be None for that job.
List[Tuple[Optional[Dict], str]]: A list of tuples, each containing the job result (or None on failure) and
the job ID.
Raises:
ValueError: If there is an error in decoding the job result.
Expand All @@ -393,14 +397,12 @@ def fetch_with_retries(job_id: str):
except TimeoutError:
if verbose:
logger.info(
f"Job {job_id} is not ready. "
f"Retrying {retries + 1}/{max_retries if max_retries else '∞'} "
f"Job {job_id} is not ready. Retrying {retries + 1}/{max_retries if max_retries else '∞'} "
f"after {retry_delay} seconds."
)
retries += 1
time.sleep(retry_delay) # Wait before retrying
except (RuntimeError, Exception) as err:
# For any other error, log and break out of the retry loop
logger.error(f"Error while fetching result for job ID {job_id}: {err}")
return None, job_id
logger.error(f"Max retries exceeded for job {job_id}.")
Expand All @@ -415,7 +417,12 @@ def fetch_with_retries(job_id: str):
job_id = futures[future]
try:
result, _ = handle_future_result(future, timeout=timeout)
results.append(result.get("data"))
# Append a tuple of (result data, job_id). (Using result.get("data") if result is valid.)
results.append((result.get("data") if result else None, job_id))
# Run the callback if provided and the result is valid
if completion_callback and result:
completion_callback(result, job_id)
# Clean up the job spec mapping
del self._job_index_to_job_spec[job_id]
except concurrent.futures.TimeoutError:
logger.error(
Expand All @@ -424,7 +431,7 @@ def fetch_with_retries(job_id: str):
)
except json.JSONDecodeError as e:
logger.error(
f"Decoding while processing job ID {job_id}: "
f"Decoding error while processing job ID {job_id}: "
f"{self._job_index_to_job_spec[job_id].source_id}\n{e}"
)
except RuntimeError as e:
Expand Down
20 changes: 19 additions & 1 deletion client/src/nv_ingest_client/client/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import os
import shutil
import tempfile
from tqdm import tqdm
from concurrent.futures import Future
from functools import wraps
from typing import Any, Union
Expand Down Expand Up @@ -201,14 +202,15 @@ def load(self, **kwargs) -> "Ingestor":

return self

def ingest(self, **kwargs: Any) -> List[Dict[str, Any]]:
def ingest(self, show_progress: bool = False, **kwargs: Any) -> List[Dict[str, Any]]:
"""
Synchronously submits jobs to the NvIngestClient and fetches the results.
Parameters
----------
show_progress : bool, optional
    If True, display a tqdm progress bar while fetching results (default: False).
kwargs : dict
    Additional parameters for `submit_job` and `fetch_job_result` methods of NvIngestClient.
Returns
-------
Expand All @@ -222,12 +224,28 @@ def ingest(self, **kwargs: Any) -> List[Dict[str, Any]]:
submit_kwargs = filter_function_kwargs(self._client.submit_job, **kwargs)
self._job_states = self._client.submit_job(self._job_ids, self._job_queue_id, **submit_kwargs)

# Keep only the kwargs accepted by fetch_job_result; show_progress is handled
# separately as an explicit parameter of this method.
fetch_kwargs = filter_function_kwargs(self._client.fetch_job_result, **kwargs)

# If progress display is enabled, create a tqdm progress bar and set a callback to update it.
if show_progress:
pbar = tqdm(total=len(self._job_ids), desc="Processing Documents: ", unit="doc")

def progress_callback(result: Dict, job_id: str) -> None:
pbar.update(1)

fetch_kwargs["completion_callback"] = progress_callback

result = self._client.fetch_job_result(self._job_ids, **fetch_kwargs)

if show_progress and pbar:
pbar.close()

if self._vdb_bulk_upload:
self._vdb_bulk_upload.run(result)
# Only upload when the user specified this action as part of the jobs.
self._vdb_bulk_upload = None

return result

def ingest_async(self, **kwargs: Any) -> Future:
Expand Down
2 changes: 2 additions & 0 deletions src/nv_ingest/util/pipeline/stage_builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ def add_embed_extractions_stage(pipe, morpheus_pipeline_config, ingest_config):
{"api_key": api_key, "embedding_nim_endpoint": embedding_nim_endpoint, "embedding_model": embedding_model},
),
)

embed_extractions_stage = pipe.add_stage(
LinearModulesStage(
morpheus_pipeline_config,
Expand All @@ -437,6 +438,7 @@ def add_embed_extractions_stage(pipe, morpheus_pipeline_config, ingest_config):
output_port_name="output",
)
)

return embed_extractions_stage


Expand Down

0 comments on commit 29732eb

Please sign in to comment.