diff --git a/client/src/nv_ingest_client/client/client.py b/client/src/nv_ingest_client/client/client.py
index 0b2f3017..dadd6809 100644
--- a/client/src/nv_ingest_client/client/client.py
+++ b/client/src/nv_ingest_client/client/client.py
@@ -14,7 +14,7 @@
 from concurrent.futures import Future
 from concurrent.futures import ThreadPoolExecutor
 from concurrent.futures import as_completed
-from typing import Any, Type
+from typing import Any, Type, Callable
 from typing import Dict
 from typing import List
 from typing import Optional
@@ -359,19 +359,23 @@ def fetch_job_result(
         max_retries: Optional[int] = None,
         retry_delay: float = 1,
         verbose: bool = False,
+        completion_callback: Optional[Callable[[Dict, str], None]] = None,
     ) -> List[Tuple[Optional[Dict], str]]:
         """
         Fetches job results for multiple job IDs concurrently with individual timeouts and retry logic.
 
         Args:
-            job_ids (List[str]): A list of job IDs to fetch results for.
+            job_ids (Union[str, List[str]]): A job ID or list of job IDs to fetch results for.
             timeout (float): Timeout for each fetch operation, in seconds.
-            max_retries (int): Maximum number of retries for jobs that are not ready yet.
+            max_retries (Optional[int]): Maximum number of retries for jobs that are not ready yet.
             retry_delay (float): Delay between retry attempts, in seconds.
+            verbose (bool): If True, logs additional information.
+            completion_callback (Optional[Callable[[Dict, str], None]]): A callback function that is executed each time
+                a job result is successfully fetched. It receives two arguments: the job result (a dict) and the job ID.
 
         Returns:
-            List[Tuple[Optional[Dict], str]]: A list of tuples containing the job results and job IDs.
-            If a timeout or error occurs, the result will be None for that job.
+            List[Tuple[Optional[Dict], str]]: A list of tuples, each containing the job result (or None on failure) and
+                the job ID.
 
         Raises:
             ValueError: If there is an error in decoding the job result.
@@ -393,14 +397,12 @@ def fetch_with_retries(job_id: str):
             except TimeoutError:
                 if verbose:
                     logger.info(
-                        f"Job {job_id} is not ready. "
-                        f"Retrying {retries + 1}/{max_retries if max_retries else '∞'} "
+                        f"Job {job_id} is not ready. Retrying {retries + 1}/{max_retries if max_retries else '∞'} "
                         f"after {retry_delay} seconds."
                     )
                 retries += 1
                 time.sleep(retry_delay)  # Wait before retrying
             except (RuntimeError, Exception) as err:
-                # For any other error, log and break out of the retry loop
                 logger.error(f"Error while fetching result for job ID {job_id}: {err}")
                 return None, job_id
         logger.error(f"Max retries exceeded for job {job_id}.")
@@ -415,7 +417,12 @@
             job_id = futures[future]
             try:
                 result, _ = handle_future_result(future, timeout=timeout)
-                results.append(result.get("data"))
+                # Append a (result data, job_id) tuple, using result.get("data") when the result is valid.
+                results.append((result.get("data") if result else None, job_id))
+                # Run the callback if provided and the result is valid.
+                if completion_callback and result:
+                    completion_callback(result, job_id)
+                # Clean up the job spec mapping.
                 del self._job_index_to_job_spec[job_id]
             except concurrent.futures.TimeoutError:
                 logger.error(
@@ -424,7 +431,7 @@
                 )
             except json.JSONDecodeError as e:
                 logger.error(
-                    f"Decoding while processing job ID {job_id}: "
+                    f"Decoding error while processing job ID {job_id}: "
                     f"{self._job_index_to_job_spec[job_id].source_id}\n{e}"
                 )
             except RuntimeError as e:
diff --git a/client/src/nv_ingest_client/client/interface.py b/client/src/nv_ingest_client/client/interface.py
index 0d4e3b0d..0ca04fa1 100644
--- a/client/src/nv_ingest_client/client/interface.py
+++ b/client/src/nv_ingest_client/client/interface.py
@@ -8,6 +8,7 @@
 import os
 import shutil
 import tempfile
+from tqdm import tqdm
 from concurrent.futures import Future
 from functools import wraps
 from typing import Any, Union
@@ -201,7 +202,7 @@ def load(self, **kwargs) -> "Ingestor":
 
         return self
 
-    def ingest(self, **kwargs: Any) -> List[Dict[str, Any]]:
+    def ingest(self, show_progress: bool = False, **kwargs: Any) -> List[Dict[str, Any]]:
         """
         Synchronously submits jobs to the NvIngestClient and fetches the results.
 
@@ -209,6 +210,7 @@ def ingest(self, **kwargs: Any) -> List[Dict[str, Any]]:
         ----------
         kwargs : dict
             Additional parameters for `submit_job` and `fetch_job_result` methods of NvIngestClient.
+            A 'show_progress' flag (bool, default False) may also be passed to display a progress bar while fetching results.
 
         Returns
         -------
@@ -222,12 +224,28 @@ def ingest(self, **kwargs: Any) -> List[Dict[str, Any]]:
         submit_kwargs = filter_function_kwargs(self._client.submit_job, **kwargs)
         self._job_states = self._client.submit_job(self._job_ids, self._job_queue_id, **submit_kwargs)
 
+        # Keep only the kwargs accepted by fetch_job_result.
         fetch_kwargs = filter_function_kwargs(self._client.fetch_job_result, **kwargs)
+
+        # If progress display is enabled, create a tqdm progress bar and set a callback to update it.
+        if show_progress:
+            pbar = tqdm(total=len(self._job_ids), desc="Processing Documents: ", unit="doc")
+
+            def progress_callback(result: Dict, job_id: str) -> None:
+                pbar.update(1)
+
+            fetch_kwargs["completion_callback"] = progress_callback
+
         result = self._client.fetch_job_result(self._job_ids, **fetch_kwargs)
+
+        if show_progress and pbar:
+            pbar.close()
+
         if self._vdb_bulk_upload:
             self._vdb_bulk_upload.run(result)
             # only upload as part of jobs user specified this action
             self._vdb_bulk_upload = None
+
         return result
 
     def ingest_async(self, **kwargs: Any) -> Future:
diff --git a/src/nv_ingest/util/pipeline/stage_builders.py b/src/nv_ingest/util/pipeline/stage_builders.py
index 4f9f152d..6824c431 100644
--- a/src/nv_ingest/util/pipeline/stage_builders.py
+++ b/src/nv_ingest/util/pipeline/stage_builders.py
@@ -427,6 +427,7 @@ def add_embed_extractions_stage(pipe, morpheus_pipeline_config, ingest_config):
             {"api_key": api_key, "embedding_nim_endpoint": embedding_nim_endpoint, "embedding_model": embedding_model},
         ),
     )
+
     embed_extractions_stage = pipe.add_stage(
         LinearModulesStage(
             morpheus_pipeline_config,
@@ -437,6 +438,7 @@
             output_port_name="output",
         )
     )
+
     return embed_extractions_stage
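
Usage sketch for the new `completion_callback` hook on `NvIngestClient.fetch_job_result` (not part of the diff; the default-constructed client and the placeholder job IDs are assumptions for illustration):

```python
from nv_ingest_client.client import NvIngestClient

# Assumptions: a reachable ingest service with default connection settings, and
# job IDs returned by an earlier submit_job call (placeholders here).
client = NvIngestClient()
job_ids = ["job-0", "job-1"]


def log_completion(result: dict, job_id: str) -> None:
    # Invoked once per job, as soon as its result has been fetched successfully.
    print(f"fetched result for {job_id}")


# Each entry in results is a (result_data_or_None, job_id) tuple; the callback
# fires only for jobs whose results were fetched successfully.
results = client.fetch_job_result(
    job_ids,
    timeout=120,
    retry_delay=1,
    completion_callback=log_completion,
)
```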
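
And the progress bar from the `Ingestor` side, driven by the same callback (a sketch; the fluent `files().extract()` chain and the glob path are assumptions for illustration):

```python
from nv_ingest_client.client.interface import Ingestor

# Assumed document set and task chain; show_progress=True is wired into
# fetch_job_result as a tqdm-updating completion_callback.
ingestor = Ingestor().files("data/*.pdf").extract()

results = ingestor.ingest(show_progress=True)  # one tick per fetched document
```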