Switch all IO to fsspec #49

Merged 34 commits into main from io-fsspec on Jan 18, 2024
Commits
5b1d453
batched tokenization
thomwolf Dec 17, 2023
e381c34
revert stats change
guipenedo Dec 18, 2023
4dc7af6
rewrite to only use fsspec
guipenedo Jan 10, 2024
3576bcd
fixed tests
guipenedo Jan 11, 2024
0661ab3
fixed minhash s3 support
guipenedo Jan 11, 2024
e3ea72d
bugfix tests
guipenedo Jan 11, 2024
d6e463d
removed mmaps from tokenizer - now streaming with seek only
guipenedo Jan 12, 2024
5c281e6
try to optimize caching on the streaming a bit
guipenedo Jan 12, 2024
5d517fb
nit
guipenedo Jan 12, 2024
9a612f2
small bugfix
guipenedo Jan 12, 2024
0ea10ad
some qol improvements
guipenedo Jan 12, 2024
244dd06
fixing bugs in slurm with new io
guipenedo Jan 12, 2024
5cf9de5
bugfix fetching list of completed tasks
guipenedo Jan 12, 2024
0cb380e
more executor bugfixes
guipenedo Jan 12, 2024
93bb0e8
change absolute paths of local files
guipenedo Jan 12, 2024
9d59e8b
fix creating log folder
guipenedo Jan 12, 2024
bc4119b
fix creating stats dir
guipenedo Jan 12, 2024
5e8f6b9
more executor io bugfixes
guipenedo Jan 12, 2024
230fc55
bugfix merge_stats
guipenedo Jan 12, 2024
4c57ed8
ensure list_files determinism
guipenedo Jan 16, 2024
e42f152
Update src/datatrove/datafolder.py
guipenedo Jan 16, 2024
ab75d88
address review comments
guipenedo Jan 16, 2024
d1fe562
local_working_dir to improve tokenization shuffling performance
guipenedo Jan 16, 2024
11a2d9a
add (str path, fully initialized fs object) option to `get_datafolder`
guipenedo Jan 17, 2024
11804dc
implement auto_mkdir
guipenedo Jan 17, 2024
5d62aed
changed resolve_paths and added a first io test
guipenedo Jan 17, 2024
e07c67e
added missing test dependency
guipenedo Jan 17, 2024
ca79c08
auto_mkdirs changes and test
guipenedo Jan 17, 2024
de3508a
added some more tests. changed local executor to use forkserver as ot…
guipenedo Jan 18, 2024
5adc77b
don't pin dependencies
guipenedo Jan 18, 2024
65f2041
Merge branch 'main' into io-fsspec
guipenedo Jan 18, 2024
8a4a2d9
some more dependency changes
guipenedo Jan 18, 2024
d9a16ff
add bogus auth credentials
guipenedo Jan 18, 2024
e6c2369
explicitly remove directories from list_files
guipenedo Jan 18, 2024
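
This PR replaces datatrove's custom input/output DataFolder classes (BaseOutputDataFolder and friends) with a single fsspec-backed DataFolder. A rough usage sketch of the new API, pieced together from the commit messages and the diffs below (exact signatures may differ; the bucket name is made up):

import fsspec

from datatrove.io import get_datafolder

# a plain string path, local or remote, is resolved through fsspec
logs = get_datafolder("s3://some-bucket/logs")

# per commit 11a2d9a, a (str path, fully initialized fs object) pair also works
logs = get_datafolder(("some-bucket/logs", fsspec.filesystem("s3")))

# the DataFolder exposes fsspec-style methods; note in the diffs below that
# write mode must now be passed explicitly, since open() defaults to reading
with logs.open("stats/00000.json", "w") as f:
    f.write("{}")
print(logs.isfile("completions/00000"))
print(logs.list_files("completions"))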
33 changes: 17 additions & 16 deletions setup.py
@@ -3,26 +3,25 @@
 
 install_requires = [
     "huggingface-hub>=0.17.0",
-    "boto3==1.28.78",
-    "faust-cchardet==2.1.19",
-    "inscriptis==2.3.2",
-    "loguru==0.7.0",
-    "multiprocess==0.70.14",
-    "nltk==3.8.1",
+    "boto3>=1.33.13",
+    "faust-cchardet>=2.1.19",
+    "inscriptis>=2.3.2",
+    "loguru>=0.7.0",
+    "multiprocess>=0.70.14",
+    "nltk>=3.8.1",
     "numpy>=1.25.0",
-    "python-magic==0.4.27",
+    "python-magic>=0.4.27",
     "readability-lxml @ git+https://github.com/huggingface/python-readability.git@speedup",
-    "trafilatura==1.6.1",
-    "warcio==1.7.4",
-    "zstandard==0.21.0",
+    "trafilatura>=1.6.1",
+    "warcio>=1.7.4",
     "pyarrow>=12.0.1",
     "tokenizers>=0.13.3",
-    "tldextract==3.4.4",
-    "pandas==2.0.3",
-    "backoff==2.2.1",
-    "fsspec==2023.9.2",
-    "humanize==4.8.0",
-    "rich==13.7.0",
+    "tldextract>=3.4.4",
+    "pandas>=2.0.3",
+    "backoff>=2.2.1",
+    "fsspec>=2023.9.2",
+    "humanize>=4.8.0",
+    "rich>=13.7.0",
 ]
 
 extras = {}
@@ -37,6 +36,8 @@
     "pytest-xdist",
     # Optional dependencies
     "fasttext-wheel",
+    "moto[s3,server]",
+    "s3fs>=2023.12.2",
 ]
 
 extras["all"] = extras["quality"] + extras["tests"]
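
The two new test dependencies suggest how the fsspec IO is exercised: moto's threaded server can stand in for S3, with s3fs pointed at its endpoint. A minimal sketch under assumptions (port, bucket, and credentials here are illustrative, not the PR's actual fixtures; cf. commit d9a16ff, "add bogus auth credentials"):

import s3fs
from moto.server import ThreadedMotoServer

# start an in-process fake S3 endpoint (provided by moto[s3,server])
server = ThreadedMotoServer(port=5555)
server.start()

# bogus credentials suffice: moto accepts anything
fs = s3fs.S3FileSystem(key="testing", secret="testing", endpoint_url="http://127.0.0.1:5555")
fs.mkdir("test-bucket")  # at the top level, mkdir creates a bucket
with fs.open("test-bucket/hello.txt", "wt") as f:
    f.write("hello")
print(fs.ls("test-bucket"))

server.stop()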
27 changes: 11 additions & 16 deletions src/datatrove/executor/base.py
@@ -7,7 +7,7 @@
 
 from loguru import logger
 
-from datatrove.io import BaseOutputDataFolder, LocalOutputDataFolder
+from datatrove.io import DataFolderLike, get_datafolder
 from datatrove.pipeline.base import PipelineStep
 from datatrove.utils.logging import add_task_logger, close_task_logger, get_random_str, get_timestamp, log_pipeline
 from datatrove.utils.stats import PipelineStats
@@ -18,7 +18,7 @@ class PipelineExecutor(ABC):
     def __init__(
         self,
         pipeline: list[PipelineStep | Callable],
-        logging_dir: BaseOutputDataFolder = None,
+        logging_dir: DataFolderLike = None,
         skip_completed: bool = True,
     ):
         """
@@ -31,11 +31,7 @@ def __init__(
                 previous runs. default: True
         """
         self.pipeline: list[PipelineStep | Callable] = pipeline
-        if isinstance(logging_dir, str):
-            logging_dir = BaseOutputDataFolder.from_path(logging_dir)
-        self.logging_dir = (
-            logging_dir if logging_dir else LocalOutputDataFolder(f"logs/{get_timestamp()}_{get_random_str()}")
-        )
+        self.logging_dir = get_datafolder(logging_dir if logging_dir else f"logs/{get_timestamp()}_{get_random_str()}")
         self.skip_completed = skip_completed
 
     @abstractmethod
@@ -51,7 +47,7 @@ def _run_for_rank(self, rank: int, local_rank: int = 0) -> PipelineStats:
         if self.is_rank_completed(rank):
             logger.info(f"Skipping {rank=} as it has already been completed.")
             return PipelineStats()
-        add_task_logger(self.logging_dir, rank, local_rank)
+        logfile = add_task_logger(self.logging_dir, rank, local_rank)
         log_pipeline(self.pipeline)
         try:
             # pipe data from one step to the next
@@ -70,27 +66,26 @@ def _run_for_rank(self, rank: int, local_rank: int = 0) -> PipelineStats:
 
             # stats
             stats = PipelineStats(self.pipeline)
-            stats.save_to_disk(self.logging_dir.open(f"stats/{rank:05d}.json"))
+            with self.logging_dir.open(f"stats/{rank:05d}.json", "w") as f:
+                stats.save_to_disk(f)
             logger.info(stats.get_repr(f"Task {rank}"))
             # completed
             self.mark_rank_as_completed(rank)
         except Exception as e:
             logger.exception(e)
             raise e
         finally:
-            close_task_logger(self.logging_dir, rank)
+            close_task_logger(logfile)
         return stats
 
     def is_rank_completed(self, rank: int):
-        return self.skip_completed and self.logging_dir.to_input_folder().file_exists(f"completions/{rank:05d}")
+        return self.skip_completed and self.logging_dir.isfile(f"completions/{rank:05d}")
 
     def mark_rank_as_completed(self, rank: int):
-        self.logging_dir.open(f"completions/{rank:05d}").close()
+        self.logging_dir.open(f"completions/{rank:05d}", "w").close()
 
     def get_incomplete_ranks(self):
-        completed = {
-            file.relative_path for file in self.logging_dir.to_input_folder().list_files(suffix="completions")
-        }
+        completed = set(self.logging_dir.list_files("completions"))
         return list(
             filter(
                 lambda rank: not self.skip_completed or f"completions/{rank:05d}" not in completed,
@@ -104,7 +99,7 @@ def to_json(self, indent=4):
         return json.dumps(data, indent=indent)
 
     def save_executor_as_json(self, indent: int = 4):
-        with self.logging_dir.open("executor.json") as f:
+        with self.logging_dir.open("executor.json", "w") as f:
             json.dump(self, f, cls=ExecutorJSONEncoder, indent=indent)
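
The completion markers used above are zero-byte files, one per finished rank. The same pattern written against plain fsspec, outside datatrove (illustrative paths; it also shows why the PR adds an explicit "w" everywhere, as fsspec's open() defaults to read mode):

import fsspec

# auto_mkdir creates missing parent directories on write (cf. commit 11804dc)
fs = fsspec.filesystem("file", auto_mkdir=True)
logging_dir = "/tmp/logs/run"

def mark_rank_as_completed(rank: int) -> None:
    # an empty write leaves a zero-byte marker file behind
    with fs.open(f"{logging_dir}/completions/{rank:05d}", "w"):
        pass

def is_rank_completed(rank: int) -> bool:
    return fs.isfile(f"{logging_dir}/completions/{rank:05d}")

mark_rank_as_completed(3)
print(is_rank_completed(3))  # True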
82 changes: 33 additions & 49 deletions src/datatrove/executor/local.py
@@ -1,39 +1,25 @@
 from copy import deepcopy
-from multiprocessing import Value
+from functools import partial
 from typing import Callable
 
-import multiprocess.pool
+import multiprocess
 from loguru import logger
-from multiprocess import Queue, Semaphore
 
 from datatrove.executor.base import PipelineExecutor
-from datatrove.io import BaseOutputDataFolder
+from datatrove.io import DataFolderLike
 from datatrove.pipeline.base import PipelineStep
 from datatrove.utils.stats import PipelineStats
 
 
-# multiprocessing vars
-download_semaphore, upload_semaphore, ranks_queue, completed = None, None, None, None
-
-
-def init_pool_processes(dl_sem, up_sem, ranks_q, completed_counter):
-    global download_semaphore, upload_semaphore, ranks_queue, completed
-    download_semaphore = dl_sem
-    upload_semaphore = up_sem
-    ranks_queue = ranks_q
-    completed = completed_counter
-
-
 class LocalPipelineExecutor(PipelineExecutor):
     def __init__(
         self,
         pipeline: list[PipelineStep | Callable],
         tasks: int = 1,
         workers: int = -1,
-        max_concurrent_uploads: int = 20,
-        max_concurrent_downloads: int = 50,
-        logging_dir: BaseOutputDataFolder | str = None,
+        logging_dir: DataFolderLike = None,
         skip_completed: bool = True,
+        start_method: str = "forkserver",
     ):
         """Execute a pipeline locally
 
@@ -44,43 +30,36 @@ def __init__(
             tasks: total number of tasks to run the pipeline on
             workers: how many tasks to run simultaneously. -1 for no
                 limit
-            max_concurrent_uploads: limit the number of files that may
-                be uploaded simultaneously to avoid rate limits
-            max_concurrent_downloads: limit the number of files that may
-                be downloaded simultaneously to avoid rate limits
             logging_dir: where to save logs, stats, etc. Should be an
                 OutputDataFolder or a str. If str, BaseOutputDataFolder.from_path(value) will be used to convert it
             skip_completed: whether to skip tasks that were completed in
                 previous runs. default: True
+            start_method: method to use to spawn a multiprocessing Pool
         """
         super().__init__(pipeline, logging_dir, skip_completed)
         self.tasks = tasks
         self.workers = workers if workers != -1 else tasks
-        self.max_concurrent_uploads = max_concurrent_uploads
-        self.max_concurrent_downloads = max_concurrent_downloads
+        self.start_method = start_method
 
-    def _run_for_rank(self, rank: int, local_rank: int = -1):
-        if self.workers > 1:
-            for pipeline_step in self.pipeline:
-                if isinstance(pipeline_step, PipelineStep):
-                    pipeline_step.set_up_dl_locks(download_semaphore, upload_semaphore)
-        local_rank = ranks_queue.get()
+    def _launch_run_for_rank(self, rank: int, ranks_q, completed=None, completed_lock=None):
+        local_rank = ranks_q.get()
         try:
-            return super()._run_for_rank(rank, local_rank)
+            return self._run_for_rank(rank, local_rank)
         finally:
-            if completed:
-                with completed.get_lock():
+            if completed and completed_lock:
+                with completed_lock:
                     completed.value += 1
                     logger.info(f"{completed.value}/{self.world_size} tasks completed.")
-            ranks_queue.put(local_rank)  # free up used rank
+            ranks_q.put(local_rank)  # free up used rank
 
     def run(self):
         if all(map(self.is_rank_completed, range(self.tasks))):
            logger.info(f"Not doing anything as all {self.tasks} tasks have already been completed.")
            return
 
         self.save_executor_as_json()
-        ranks_q = Queue()
+        mg = multiprocess.Manager()
+        ranks_q = mg.Queue()
         for i in range(self.workers):
             ranks_q.put(i)
 
@@ -89,27 +68,32 @@ def run(self):
             logger.info(f"Skipping {skipped} already completed tasks")
 
         if self.workers == 1:
-            global ranks_queue
-            ranks_queue = ranks_q
             pipeline = self.pipeline
             stats = []
             for rank in ranks_to_run:
                 self.pipeline = deepcopy(pipeline)
-                stats.append(self._run_for_rank(rank))
+                stats.append(self._launch_run_for_rank(rank, ranks_q))
         else:
-            dl_sem = Semaphore(self.max_concurrent_downloads)
-            up_sem = Semaphore(self.max_concurrent_uploads)
-            completed_counter = Value("i", skipped)
-
-            with multiprocess.Pool(
-                self.workers, initializer=init_pool_processes, initargs=(dl_sem, up_sem, ranks_q, completed_counter)
-            ) as pool:
-                stats = list(pool.imap_unordered(self._run_for_rank, ranks_to_run))
+            completed_counter = mg.Value("i", skipped)
+            completed_lock = mg.Lock()
+            ctx = multiprocess.get_context(self.start_method)
+            with ctx.Pool(self.workers) as pool:
+                stats = list(
+                    pool.imap_unordered(
+                        partial(
+                            self._launch_run_for_rank,
+                            ranks_q=ranks_q,
+                            completed=completed_counter,
+                            completed_lock=completed_lock,
+                        ),
+                        ranks_to_run,
+                    )
+                )
         # merged stats
         stats = sum(stats, start=PipelineStats())
-        stats.save_to_disk(self.logging_dir.open("stats.json"))
+        with self.logging_dir.open("stats.json", "wt") as statsfile:
+            stats.save_to_disk(statsfile)
         logger.success(stats.get_repr(f"All {self.tasks} tasks"))
-        self.logging_dir.close()
         return stats
 
     @property
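
The local executor rewrite drops the module-global semaphores and Pool initializer: with a configurable start method such as "forkserver", workers no longer inherit globals, so shared state must travel as picklable Manager proxies; and a Manager Value has no get_lock(), which is why a separate Lock appears. A standalone sketch of the pattern (a stand-in task replaces the real pipeline run):

from functools import partial

import multiprocess  # dill-based multiprocessing fork used by datatrove

def work(rank, ranks_q, completed, completed_lock):
    local_rank = ranks_q.get()  # claim a free worker slot
    try:
        return rank * rank  # stand-in for the real pipeline run
    finally:
        with completed_lock:  # Manager Values have no get_lock()
            completed.value += 1
        ranks_q.put(local_rank)  # free up used rank

if __name__ == "__main__":
    mg = multiprocess.Manager()
    ranks_q = mg.Queue()
    for i in range(4):  # 4 worker slots
        ranks_q.put(i)
    completed, lock = mg.Value("i", 0), mg.Lock()
    ctx = multiprocess.get_context("forkserver")  # use "spawn" where unavailable
    with ctx.Pool(4) as pool:
        results = list(pool.imap_unordered(
            partial(work, ranks_q=ranks_q, completed=completed, completed_lock=lock),
            range(8),
        ))
    print(sorted(results), completed.value)  # [0, 1, 4, 9, 16, 25, 36, 49] 8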
31 changes: 16 additions & 15 deletions src/datatrove/executor/slurm.py
@@ -1,6 +1,5 @@
 from __future__ import annotations
 
-import dataclasses
 import json
 import os
 import random
@@ -13,10 +12,11 @@
 
 import dill
 from dill import CONTENTS_FMODE
+from fsspec.implementations.local import LocalFileSystem
 from loguru import logger
 
 from datatrove.executor.base import PipelineExecutor
-from datatrove.io import BaseOutputDataFolder
+from datatrove.io import DataFolderLike
 from datatrove.pipeline.base import PipelineStep
 from datatrove.utils.logging import get_random_str, get_timestamp
 
@@ -43,7 +43,7 @@ def __init__(
         sbatch_args: dict | None = None,
         max_array_size: int = 1001,
         depends: SlurmPipelineExecutor | None = None,
-        logging_dir: str | BaseOutputDataFolder = None,
+        logging_dir: DataFolderLike = None,
         skip_completed: bool = True,
         slurm_logs_folder: str = None,
         max_array_launch_parallel: bool = False,
@@ -117,30 +117,30 @@ def __init__(
         self.slurm_logs_folder = (
             slurm_logs_folder
             if slurm_logs_folder
-            else f"slurm_logs/{self.job_name}/{get_timestamp()}_{get_random_str()}"
+            else (
+                f"slurm_logs/{self.job_name}/{get_timestamp()}_{get_random_str()}"
+                if not isinstance(self.logging_dir.fs, LocalFileSystem)
+                else self.logging_dir.resolve_paths("slurm_logs")
+            )
         )
 
     def run(self):
         if "SLURM_ARRAY_TASK_ID" in os.environ:
             slurm_rank = int(os.environ["SLURM_ARRAY_TASK_ID"]) + self.max_array_size * int(
                 os.environ.get("RUN_OFFSET", 0)
             )
-            with self.logging_dir.to_input_folder().get_file("ranks_to_run.json").open() as ranks_to_run_file:
+            with self.logging_dir.open("ranks_to_run.json", "r") as ranks_to_run_file:
                 all_ranks = json.load(ranks_to_run_file)
             if slurm_rank >= len(all_ranks):
                 return
             rank = all_ranks[slurm_rank]
             if self.randomize_start:
                 time.sleep(random.randint(0, 60 * 3))
             self._run_for_rank(rank)
-            self.logging_dir.close()  # make sure everything is properly saved (logs etc)
         else:
             self.launch_job()
 
     def launch_merge_stats(self):
-        stats_json_file = self.logging_dir.create_new_file("stats.json")
-        # dump outputfile
-        options = [f"{k}={v}" for k, v in dataclasses.asdict(stats_json_file).items() if not k.startswith("_")]
         launch_slurm_job(
             self.get_launch_file_contents(
                 {
@@ -149,7 +149,8 @@ def launch_merge_stats(self):
                     "mem-per-cpu": "1G",
                     "dependency": f"afterok:{self.job_id}",
                 },
-                f'merge_stats {os.path.join(self.logging_dir.path, "stats")} {" ".join(options)}',
+                f'merge_stats {self.logging_dir.resolve_paths("stats")} '
+                f'-o {self.logging_dir.resolve_paths("stats.json")}',
             )
         )
@@ -187,21 +188,21 @@ def launch_job(self):
             dill.dump(executor, executor_f, fmode=CONTENTS_FMODE)
         self.save_executor_as_json()
 
-        with self.logging_dir.open("ranks_to_run.json") as ranks_to_run_file:
+        with self.logging_dir.open("ranks_to_run.json", "w") as ranks_to_run_file:
             # we actually save this (only once) to avoid race conditions
             json.dump(ranks_to_run, ranks_to_run_file)
 
         max_array = min(len(ranks_to_run), self.max_array_size) if self.max_array_size != -1 else len(ranks_to_run)
 
         launch_file_contents = self.get_launch_file_contents(
             self.get_sbatch_args(max_array),
-            f"srun -l launch_pickled_pipeline {executor_f.path}",
+            f"srun -l launch_pickled_pipeline {self.logging_dir.resolve_paths('executor.pik')}",
         )
-        with self.logging_dir.open("launch_script.slurm") as launchscript_f:
+        with self.logging_dir.open("launch_script.slurm", "w") as launchscript_f:
             launchscript_f.write(launch_file_contents)
         logger.info(
             f"Launching Slurm job {self.job_name} ({len(ranks_to_run)} tasks) with launch script "
-            f'"{launchscript_f.path}"'
+            f'"{self.logging_dir.resolve_paths("launch_script.slurm")}"'
         )
@@ -215,9 +216,9 @@ def launch_job(self):
                 launched_jobs += 1
         logger.info(f"Slurm job launched successfully with (last) id={self.job_id}.")
         self.launch_merge_stats()
-        self.logging_dir.close()
 
     def get_sbatch_args(self, max_array: int = 1) -> dict:
+        # this one we actually have to create as slurm will be writing here
        os.makedirs(self.slurm_logs_folder, exist_ok=True)
        slurm_logfile = os.path.join(self.slurm_logs_folder, "%A_%a.out")
        return {
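
resolve_paths, called repeatedly above, appears to expand a path relative to the DataFolder into a fully qualified one that can be handed to another process or a CLI such as merge_stats. A hedged illustration; the printed forms are inferred from those call sites, not verified:

from datatrove.io import get_datafolder

remote = get_datafolder("s3://some-bucket/logs/run1")  # hypothetical bucket
print(remote.resolve_paths("stats"))  # presumably "s3://some-bucket/logs/run1/stats"

local = get_datafolder("logs/run1")
print(local.resolve_paths("slurm_logs"))  # presumably an absolute local path, usable by sbatch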