From dc111d8391c2b3c53f08cb55fea9b6d28896c510 Mon Sep 17 00:00:00 2001 From: guipenedo Date: Fri, 10 Nov 2023 16:55:28 +0000 Subject: [PATCH] bugfixes with logging + save a dump of the executor in json format --- examples/other sources/tokenize_the_pile.py | 32 +++++------- src/datatrove/executor/base.py | 57 ++++++++++++++++----- src/datatrove/executor/local.py | 29 +++++++---- src/datatrove/executor/slurm.py | 4 +- src/datatrove/io/base.py | 15 ++---- src/datatrove/io/fsspec.py | 4 +- src/datatrove/io/local.py | 4 +- src/datatrove/io/s3.py | 21 ++++---- src/datatrove/utils/logging.py | 5 ++ 9 files changed, 105 insertions(+), 66 deletions(-) diff --git a/examples/other sources/tokenize_the_pile.py b/examples/other sources/tokenize_the_pile.py index 72e364de..c4292035 100644 --- a/examples/other sources/tokenize_the_pile.py +++ b/examples/other sources/tokenize_the_pile.py @@ -1,36 +1,32 @@ -import os.path - from datatrove.executor.base import PipelineExecutor -from datatrove.executor.slurm import SlurmPipelineExecutor -from datatrove.io import LocalOutputDataFolder, S3InputDataFolder +from datatrove.executor.local import LocalPipelineExecutor +from datatrove.io import LocalOutputDataFolder, S3InputDataFolder, S3OutputDataFolder from datatrove.pipeline.readers import JsonlReader from datatrove.pipeline.tokens.tokenizer import DocumentTokenizer -def format_adapter(d: dict, path: str, li: int): - return { - "content": d["text"], - "data_id": f"{os.path.splitext(os.path.basename(path))[0]}_{li}", - "metadata": d["meta"], - } - - pipeline = [ JsonlReader( S3InputDataFolder("s3://bigcode-experiments/the-pile-sharded/", stream=True), gzip=False, - adapter=format_adapter, + content_key="text", + limit=100, ), DocumentTokenizer( - LocalOutputDataFolder(path="/fsx/guilherme/the-pile/tokenized"), save_filename="the_pile_tokenized" + LocalOutputDataFolder(path="/home/gui/hf_dev/datatrove/examples_test/piletokenized-test/tokenized"), + save_filename="the_pile_tokenized", ), ] -executor: PipelineExecutor = SlurmPipelineExecutor( +executor: PipelineExecutor = LocalPipelineExecutor( pipeline=pipeline, tasks=20, - time="12:00:00", - partition="dev-cluster", - logging_dir="/fsx/guilherme/logs/tokenize_the_pile", + workers=20, + skip_completed=False, + logging_dir=S3OutputDataFolder( + "s3://extreme-scale-dp-temp/logs/tests/piletokenized/tokenized", + local_path="/home/gui/hf_dev/datatrove/examples_test/piletokenized-test/logs", + cleanup=False, + ), ) executor.run() diff --git a/src/datatrove/executor/base.py b/src/datatrove/executor/base.py index 16e5e0b2..5b08df27 100644 --- a/src/datatrove/executor/base.py +++ b/src/datatrove/executor/base.py @@ -1,3 +1,5 @@ +import dataclasses +import json from abc import ABC, abstractmethod from collections import deque from collections.abc import Sequence @@ -7,20 +9,26 @@ from datatrove.io import BaseOutputDataFolder, LocalOutputDataFolder from datatrove.pipeline.base import PipelineStep -from datatrove.utils.logging import add_task_logger, get_random_str, get_timestamp +from datatrove.utils.logging import add_task_logger, close_task_logger, get_random_str, get_timestamp from datatrove.utils.stats import PipelineStats class PipelineExecutor(ABC): @abstractmethod - def __init__(self, pipeline: list[PipelineStep | Callable], logging_dir: BaseOutputDataFolder = None): + def __init__( + self, + pipeline: list[PipelineStep | Callable], + logging_dir: BaseOutputDataFolder = None, + skip_completed: bool = True, + ): self.pipeline: list[PipelineStep | Callable] = pipeline 
self.logging_dir = ( logging_dir if logging_dir else LocalOutputDataFolder(f"logs/{get_timestamp()}_{get_random_str()}") ) + self.skip_completed = skip_completed - pipeline = "\n".join([pipe.__repr__() if callable(pipe) else "Sequence..." for pipe in self.pipeline]) - print(f"--- 🛠️PIPELINE 🛠\n{pipeline}") + # pipeline = "\n".join([pipe.__repr__() if callable(pipe) else "Sequence..." for pipe in self.pipeline]) + # print(f"--- 🛠️PIPELINE 🛠\n{pipeline}") @abstractmethod def run(self): @@ -36,8 +44,8 @@ def _run_for_rank(self, rank: int, local_rank: int = 0) -> PipelineStats: logger.info(f"Skipping {rank=} as it has already been completed.") return PipelineStats() # todo: fetch the original stats file (?) add_task_logger(self.logging_dir, rank, local_rank) - # pipe data from one step to the next try: + # pipe data from one step to the next pipelined_data = None for pipeline_step in self.pipeline: if callable(pipeline_step): @@ -48,19 +56,44 @@ def _run_for_rank(self, rank: int, local_rank: int = 0) -> PipelineStats: raise ValueError if pipelined_data: deque(pipelined_data, maxlen=0) + + logger.info(f"Processing done for {rank=}") + # stats + stats = PipelineStats( + [pipeline_step.stats for pipeline_step in self.pipeline if isinstance(pipeline_step, PipelineStep)] + ) + stats.save_to_disk(self.logging_dir.open(f"stats/{rank:05d}.json")) + # completed + self.mark_rank_as_completed(rank) except Exception as e: logger.exception(e) raise e - logger.info(f"Processing done for {rank=}") - stats = PipelineStats( - [pipeline_step.stats for pipeline_step in self.pipeline if isinstance(pipeline_step, PipelineStep)] - ) - stats.save_to_disk(self.logging_dir.open(f"stats/{rank:05d}.json")) - self.mark_rank_as_completed(rank) + finally: + close_task_logger(self.logging_dir, rank) return stats def is_rank_completed(self, rank: int): - return self.logging_dir.to_input_folder().file_exists(f"completions/{rank:05d}") + return self.skip_completed and self.logging_dir.to_input_folder().file_exists(f"completions/{rank:05d}") def mark_rank_as_completed(self, rank: int): self.logging_dir.open(f"completions/{rank:05d}").close() + + def to_json(self, indent=4): + data = self.__dict__ + data["pipeline"] = [{a: b for a, b in x.__dict__.items() if a != "stats"} for x in data["pipeline"]] + return json.dumps(data, indent=indent) + + def save_executor_as_json(self, indent: int = 4): + with self.logging_dir.open("executor.json") as f: + json.dump(self, f, cls=ExecutorJSONEncoder, indent=indent) + + +class ExecutorJSONEncoder(json.JSONEncoder): + def default(self, o): + if dataclasses.is_dataclass(o): + return dataclasses.asdict(o) + if isinstance(o, PipelineExecutor): + return o.__dict__ + if isinstance(o, PipelineStep): + return {a: b for a, b in o.__dict__.items() if a != "stats"} + return str(o) diff --git a/src/datatrove/executor/local.py b/src/datatrove/executor/local.py index b162ec31..f53c8a6c 100644 --- a/src/datatrove/executor/local.py +++ b/src/datatrove/executor/local.py @@ -9,13 +9,14 @@ from datatrove.pipeline.base import PipelineStep -download_semaphore, upload_semaphore = None, None +download_semaphore, upload_semaphore, ranks_queue = None, None, None -def init_pool_processes(dl_sem, up_sem): - global download_semaphore, upload_semaphore +def init_pool_processes(dl_sem, up_sem, ranks_q): + global download_semaphore, upload_semaphore, ranks_queue download_semaphore = dl_sem upload_semaphore = up_sem + ranks_queue = ranks_q class LocalPipelineExecutor(PipelineExecutor): @@ -27,9 +28,9 @@ def __init__( 
max_concurrent_uploads: int = 20, max_concurrent_downloads: int = 50, logging_dir: BaseOutputDataFolder = None, + skip_completed: bool = True, ): - super().__init__(pipeline, logging_dir) - self.local_ranks = None + super().__init__(pipeline, logging_dir, skip_completed) self.tasks = tasks self.workers = workers if workers != -1 else tasks self.max_concurrent_uploads = max_concurrent_uploads @@ -40,16 +41,17 @@ def _run_for_rank(self, rank: int, local_rank: int = -1): for pipeline_step in self.pipeline: if isinstance(pipeline_step, PipelineStep): pipeline_step.set_up_dl_locks(download_semaphore, upload_semaphore) - local_rank = self.local_ranks.get() + local_rank = ranks_queue.get() try: return super()._run_for_rank(rank, local_rank) finally: - self.local_ranks.put(local_rank) # free up used rank + ranks_queue.put(local_rank) # free up used rank def run(self): - self.local_ranks = Queue() + self.save_executor_as_json() + ranks_q = Queue() for i in range(self.workers): - self.local_ranks.put(i) + ranks_q.put(i) if self.workers == 1: pipeline = self.pipeline @@ -61,9 +63,14 @@ def run(self): dl_sem = Semaphore(self.max_concurrent_downloads) up_sem = Semaphore(self.max_concurrent_uploads) - with multiprocess.Pool(self.workers, initializer=init_pool_processes, initargs=(dl_sem, up_sem)) as pool: + with multiprocess.Pool( + self.workers, initializer=init_pool_processes, initargs=(dl_sem, up_sem, ranks_q) + ) as pool: stats = list(pool.map(self._run_for_rank, range(self.tasks))) - return sum(stats) + stats = sum(stats) + stats.save_to_disk(self.logging_dir.open("stats.json")) + self.logging_dir.close() + return stats @property def world_size(self): diff --git a/src/datatrove/executor/slurm.py b/src/datatrove/executor/slurm.py index d4d9c448..0963ce6d 100644 --- a/src/datatrove/executor/slurm.py +++ b/src/datatrove/executor/slurm.py @@ -38,6 +38,7 @@ def __init__( max_array_size: int = 1001, depends: SlurmPipelineExecutor | None = None, logging_dir: BaseOutputDataFolder = None, + skip_completed: bool = True, ): """ :param tasks: total number of tasks to run @@ -51,7 +52,7 @@ def __init__( """ if isinstance(logging_dir, S3OutputDataFolder): logging_dir.cleanup = False # if the files are removed from disk job launch will fail - super().__init__(pipeline, logging_dir) + super().__init__(pipeline, logging_dir, skip_completed) self.tasks = tasks self.workers = workers self.partition = partition @@ -125,6 +126,7 @@ def launch_job(self): # pickle with self.logging_dir.open("executor.pik", "wb") as executor_f: dill.dump(self, executor_f, fmode=CONTENTS_FMODE) + self.save_executor_as_json() with self.logging_dir.open("launch_script.slurm") as launchscript_f: launchscript_f.write( diff --git a/src/datatrove/io/base.py b/src/datatrove/io/base.py index 384cfee7..37b5431e 100644 --- a/src/datatrove/io/base.py +++ b/src/datatrove/io/base.py @@ -11,6 +11,7 @@ import numpy as np import zstandard from loguru import logger +from multiprocess.synchronize import SemLock from datatrove.io.utils.fsspec import valid_fsspec_path @@ -28,7 +29,7 @@ class BaseInputDataFile: path: str relative_path: str - folder: "BaseInputDataFolder" = None + _lock: SemLock | contextlib.nullcontext = field(default_factory=contextlib.nullcontext) @classmethod def from_path(cls, path: str, **kwargs): @@ -133,7 +134,7 @@ class BaseInputDataFolder(ABC): extension: str | list[str] = None recursive: bool = True match_pattern: str = None - _lock = contextlib.nullcontext() + _lock: SemLock | contextlib.nullcontext = 
field(default_factory=contextlib.nullcontext) @classmethod def from_path(cls, path, **kwargs): @@ -284,9 +285,9 @@ class BaseOutputDataFile(ABC): path: str relative_path: str - folder: "BaseOutputDataFolder" = None _file_handler = None _mode: str = None + _lock: SemLock | contextlib.nullcontext = field(default_factory=contextlib.nullcontext) @classmethod def from_path(cls, path: str, **kwargs): @@ -371,8 +372,6 @@ def delete(self): Does not call close() on the OutputDataFile directly to avoid uploading/saving it externally. :return: """ - if self.folder: - self.folder.pop_file(self.relative_path) if self._file_handler: self._file_handler.close() self._file_handler = None @@ -395,7 +394,7 @@ class BaseOutputDataFolder(ABC): path: str _output_files: dict[str, BaseOutputDataFile] = field(default_factory=dict) - _lock = contextlib.nullcontext() + _lock: SemLock | contextlib.nullcontext = field(default_factory=contextlib.nullcontext) def close(self): """ @@ -454,10 +453,6 @@ def open(self, relative_path: str, mode: str = "w", gzip: bool = False) -> BaseO self._output_files[relative_path] = self._create_new_file(relative_path) return self._output_files[relative_path].open(mode, gzip) - def pop_file(self, relative_path): - if relative_path in self._output_files: - self._output_files.pop(relative_path) - @abstractmethod def to_input_folder(self) -> BaseInputDataFolder: raise NotImplementedError diff --git a/src/datatrove/io/fsspec.py b/src/datatrove/io/fsspec.py index ab4d6d24..dc19bde3 100644 --- a/src/datatrove/io/fsspec.py +++ b/src/datatrove/io/fsspec.py @@ -51,7 +51,7 @@ def __post_init__(self): def _create_new_file(self, relative_path: str): return FSSpecOutputDataFile( - path=os.path.join(self.path, relative_path), relative_path=relative_path, _fs=self._fs, folder=self + path=os.path.join(self.path, relative_path), relative_path=relative_path, _fs=self._fs, _lock=self._lock ) def to_input_folder(self) -> BaseInputDataFolder: @@ -88,7 +88,7 @@ def list_files(self, extension: str | list[str] = None, suffix: str = "") -> lis def _unchecked_get_file(self, relative_path: str): return FSSpecInputDataFile( - path=os.path.join(self.path, relative_path), relative_path=relative_path, _fs=self._fs, folder=self + path=os.path.join(self.path, relative_path), relative_path=relative_path, _fs=self._fs, _lock=self._lock ) def file_exists(self, relative_path: str) -> bool: diff --git a/src/datatrove/io/local.py b/src/datatrove/io/local.py index 505659d4..726554ae 100644 --- a/src/datatrove/io/local.py +++ b/src/datatrove/io/local.py @@ -38,7 +38,7 @@ class LocalOutputDataFolder(BaseOutputDataFolder): def _create_new_file(self, relative_path: str) -> LocalOutputDataFile: return LocalOutputDataFile( - path=os.path.join(self.path, relative_path), relative_path=relative_path, folder=self + path=os.path.join(self.path, relative_path), relative_path=relative_path, _lock=self._lock ) def to_input_folder(self) -> BaseInputDataFolder: @@ -83,7 +83,7 @@ def _unchecked_get_file(self, relative_path: str) -> LocalInputDataFile: :return: an input file """ return LocalInputDataFile( - path=os.path.join(self.path, relative_path), relative_path=relative_path, folder=self + path=os.path.join(self.path, relative_path), relative_path=relative_path, _lock=self._lock ) diff --git a/src/datatrove/io/s3.py b/src/datatrove/io/s3.py index 0c3924a0..987ea9b5 100644 --- a/src/datatrove/io/s3.py +++ b/src/datatrove/io/s3.py @@ -1,4 +1,3 @@ -import contextlib import gzip as gzip_lib import os.path import tempfile @@ -49,7 +48,8 @@ def 
_create_new_file(self, relative_path: str): local_path=os.path.join(self.local_path, relative_path), path=os.path.join(self.path, relative_path), relative_path=relative_path, - folder=self, + cleanup=self.cleanup, + _lock=self._lock, ) def to_input_folder(self) -> BaseInputDataFolder: @@ -63,7 +63,7 @@ class S3OutputDataFile(BaseOutputDataFile): """ local_path: str = None - folder: "S3OutputDataFolder" = None + cleanup: bool = True def __post_init__(self): if not self.path.startswith("s3://"): @@ -73,11 +73,11 @@ def __post_init__(self): def close(self): super().close() if self.local_path: - with contextlib.nullcontext() if not self.folder else self.folder._lock: + with self._lock: logger.info(f'Uploading "{self.local_path}" to "{self.path}"...') s3_upload_file(self.local_path, self.path) logger.info(f'Uploaded "{self.local_path}" to "{self.path}".') - if self.folder and self.folder.cleanup: + if self.cleanup: os.remove(self.local_path) self.local_path = None @@ -92,7 +92,7 @@ def delete(self): @property def persistent_local_path(self): - assert not self.folder or (not self.folder._tmpdir and not self.folder.cleanup) + assert not self.cleanup return self.local_path @@ -107,7 +107,7 @@ class S3InputDataFile(BaseInputDataFile): local_path: str = None stream: bool = False - folder: "S3InputDataFolder" = None + cleanup: bool = True @contextmanager def open_binary(self): @@ -121,7 +121,7 @@ def open_binary(self): else: # download if not os.path.isfile(self.local_path): - with contextlib.nullcontext() if not self.folder else self.folder._lock: + with self._lock: logger.info(f'Downloading "{self.path}" to "{self.local_path}"...') s3_download_file(self.path, self.local_path) logger.info(f'Downloaded "{self.path}" to "{self.local_path}".') @@ -129,7 +129,7 @@ def open_binary(self): try: yield f finally: - if self.folder and self.folder.cleanup: + if self.cleanup: os.remove(self.local_path) @@ -161,8 +161,9 @@ def _unchecked_get_file(self, relative_path: str): path=os.path.join(self.path, relative_path), local_path=os.path.join(self.local_path, relative_path), relative_path=relative_path, - folder=self, stream=self.stream, + cleanup=self.cleanup, + _lock=self._lock, ) def file_exists(self, relative_path: str) -> bool: diff --git a/src/datatrove/utils/logging.py b/src/datatrove/utils/logging.py index b5edca0a..010ab2d4 100644 --- a/src/datatrove/utils/logging.py +++ b/src/datatrove/utils/logging.py @@ -21,3 +21,8 @@ def add_task_logger(logging_dir: BaseOutputDataFolder, rank: int, local_rank: in logger.add(sys.stderr, level="INFO" if local_rank == 0 else "ERROR") logger.add(logging_dir.open(f"logs/task_{rank:05d}.log"), colorize=True, level="DEBUG") logger.info(f"Launching pipeline for {rank=}") + + +def close_task_logger(logging_dir: BaseOutputDataFolder, rank: int): + logger.remove() + logging_dir.open(f"logs/task_{rank:05d}.log").close()
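
Usage sketch (a minimal illustration; the bucket and local paths below are placeholders): with this change, LocalPipelineExecutor accepts a skip_completed flag, and run() saves an executor.json dump of the executor configuration plus a merged stats.json to logging_dir, mirroring the updated example above.

    from datatrove.executor.local import LocalPipelineExecutor
    from datatrove.io import LocalOutputDataFolder, S3InputDataFolder
    from datatrove.pipeline.readers import JsonlReader
    from datatrove.pipeline.tokens.tokenizer import DocumentTokenizer

    executor = LocalPipelineExecutor(
        pipeline=[
            # reader and tokenizer configured as in the example above; paths are placeholders
            JsonlReader(S3InputDataFolder("s3://my-bucket/my-dataset/", stream=True), gzip=False, content_key="text", limit=100),
            DocumentTokenizer(LocalOutputDataFolder(path="/tmp/tokenized"), save_filename="tokenized"),
        ],
        tasks=20,
        workers=20,
        # ranks with a completions/{rank:05d} marker in logging_dir are skipped on
        # re-runs; set to False to force reprocessing, as the example above does
        skip_completed=True,
        logging_dir=LocalOutputDataFolder("/tmp/logs/tokenize"),
    )

    # run() now writes logging_dir/executor.json (save_executor_as_json), per-task
    # logs and stats, completion markers, and a merged stats.json
    executor.run()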