Skip to content

Commit

Permalink
CLI: improve parallel processing (#522)
Browse files (browse the repository at this point in the history)
* CLI fixes: parallel cores and processing

* fix for other OSes

* fix typo

* restore as_completed
  • Loading branch information
adbar authored Mar 18, 2024
1 parent 3d0c934 commit 2c1b606
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 11 deletions.
4 changes: 2 additions & 2 deletions trafilatura/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
file_processing_pipeline, load_blacklist,
load_input_dict, probe_homepage,
url_processing_pipeline, write_result)
from .settings import DOWNLOAD_THREADS
from .settings import PARALLEL_CORES

# fix output encoding on some systems
try:
Expand Down Expand Up @@ -63,7 +63,7 @@ def parse_args(args):

group1.add_argument('--parallel',
help="specify a number of cores/threads for downloads and/or processing",
type=int, default=DOWNLOAD_THREADS)
type=int, default=PARALLEL_CORES)
group1.add_argument('-b', '--blacklist',
help="file containing unwanted URLs to discard during processing",
type=str)
Expand Down
9 changes: 3 additions & 6 deletions trafilatura/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
import string
import sys
import traceback
from concurrent.futures import (ProcessPoolExecutor, ThreadPoolExecutor,
as_completed)
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from functools import partial
from os import makedirs, path, walk

Expand All @@ -29,8 +28,7 @@
from .filters import LANGID_FLAG, language_classifier
from .hashing import generate_hash_filename
from .meta import reset_caches
from .settings import (FILE_PROCESSING_CORES, FILENAME_LEN,
MAX_FILES_PER_DIRECTORY, use_config)
from .settings import FILENAME_LEN, MAX_FILES_PER_DIRECTORY, use_config
from .sitemaps import sitemap_search
from .utils import URL_BLACKLIST_REGEX, make_chunks, uniquify_list

Expand Down Expand Up @@ -369,12 +367,11 @@ def url_processing_pipeline(args, url_store):
def file_processing_pipeline(args):
'''Define batches for parallel file processing and perform the extraction'''
filecounter = None
processing_cores = args.parallel or FILE_PROCESSING_CORES
config = use_config(filename=args.config_file)
timeout = config.getint('DEFAULT', 'EXTRACTION_TIMEOUT') or None

# max_tasks_per_child available in Python >= 3.11
with ProcessPoolExecutor(max_workers=processing_cores) as executor:
with ProcessPoolExecutor(max_workers=args.parallel) as executor:
# chunk input: https://github.com/python/cpython/issues/74028
for filebatch in make_chunks(generate_filelist(args.input_dir), MAX_FILES_PER_DIRECTORY):
if filecounter is None and len(filebatch) >= MAX_FILES_PER_DIRECTORY:
Expand Down
9 changes: 6 additions & 3 deletions trafilatura/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@


from configparser import ConfigParser
from os import cpu_count
try:
from os import sched_getaffinity
except ImportError:
sched_getaffinity = None
from os import cpu_count
from pathlib import Path

from lxml.etree import XPath
Expand All @@ -31,13 +35,12 @@ def use_config(filename=None, config=None):
DEFAULT_CONFIG = use_config()

# Safety checks
DOWNLOAD_THREADS = min(cpu_count(), 16) # 16 processes at most
PARALLEL_CORES = min(len(sched_getaffinity(0)) if sched_getaffinity else cpu_count(), 16) # 16 processes at most
LRU_SIZE = 4096

# Files
MAX_FILES_PER_DIRECTORY = 1000
FILENAME_LEN = 8
FILE_PROCESSING_CORES = min(cpu_count(), 16) # 16 processes at most

# Network
MAX_LINKS = 10**6
Expand Down

0 comments on commit 2c1b606

Please sign in to comment.