Skip to content

Commit

Permalink
switch to processpoolexecutor to be able to properly kill runaway docs
Browse files Browse the repository at this point in the history
  • Loading branch information
guipenedo committed Jan 1, 2025
1 parent 47379fd commit 6e9af63
Showing 1 changed file with 14 additions and 3 deletions.
17 changes: 14 additions & 3 deletions src/datatrove/pipeline/extractors/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import abstractmethod
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor

from datatrove.data import DocumentsPipeline
from datatrove.pipeline.base import PipelineStep
Expand All @@ -21,6 +21,7 @@ def __init__(self, timeout: float = 0.1):
"""
super().__init__()
self.timeout = timeout
self._warned_error = False

@abstractmethod
def extract(self, text: str) -> str:
Expand All @@ -45,19 +46,29 @@ def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> Do
Returns:
"""
with ThreadPoolExecutor() as executor: # more reliable than using signal for timeouts
with ProcessPoolExecutor(
max_workers=1
) as executor: # Single process. We use ProcessPool instead of ThreadPool
# to be able to kill timed-out tasks, lest they continue running for minutes and cause an OOM
for doc in data:
self.stat_update(StatHints.total)
with self.track_time():
future = executor.submit(self.extract, doc.text)
try:
doc.text = future.result(timeout=self.timeout)
except TimeoutError:
future.cancel() # NOTE(review): cancel() only prevents a pending call from starting; it does not stop one that is already running
logger.warning("⏰ Timeout while cleaning record text. Skipping record.")
continue
except Exception as e:
logger.warning(f'❌ Error "{e}" while cleaning record text. Skipping record.')
future.cancel() # Good practice to cancel on other errors too
if not self._warned_error:
logger.warning(
f'❌ Error "{e}" while cleaning record text. Skipping record. This message will only appear once.'
)
self._warned_error = True
continue

if doc.text:
self.stat_update(StatHints.forwarded)
self.update_doc_stats(doc)
Expand Down

0 comments on commit 6e9af63

Please sign in to comment.