Skip to content

Commit

Permalink
made slurm launcher automatically launch all the dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
guipenedo committed Nov 1, 2023
1 parent 5b1c4b4 commit dd30eac
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 13 deletions.
19 changes: 13 additions & 6 deletions src/datatrove/executor/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,11 @@ def launch_job(self):
os.makedirs(os.path.join(self.logging_dir, "logs"), exist_ok=True)
os.makedirs(os.path.join(self.logging_dir, "completions"), exist_ok=True)
assert not self.depends or (
isinstance(self.depends, SlurmPipelineExecutor) and self.depends.job_ids
), "depends= must be a SlurmPipelineExecutor that was already launched!"
isinstance(self.depends, SlurmPipelineExecutor)
), "depends= must be a SlurmPipelineExecutor"
if self.depends and not self.depends.job_ids:
logger.info(f'Launching dependency job "{self.depends.job_name}"')
self.depends.launch_job()

# pickle
with open(os.path.join(self.logging_dir, "executor.pik"), "wb") as f:
Expand Down Expand Up @@ -160,12 +163,16 @@ def launch_file(self):
def get_launch_file(self, sbatch_args: dict, run_script: str):
args = "\n".join([f"#SBATCH --{k}={v}" for k, v in sbatch_args.items()])

env_command = self.env_command if self.env_command else (
f"""conda init bash
env_command = (
self.env_command
if self.env_command
else (
f"""conda init bash
conda activate {self.condaenv}
source ~/.bashrc"""
if self.condaenv
else (f"source {self.venv_path}" if self.venv_path else "")
if self.condaenv
else (f"source {self.venv_path}" if self.venv_path else "")
)
)

return (
Expand Down
13 changes: 6 additions & 7 deletions src/datatrove/pipeline/dedup/sentence_dedup.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,15 @@ def __init__(self, output_folder: BaseOutputDataFolder, n_sentences: int = 3, **
super().__init__(**kwargs)
self.output_folder = output_folder
self.n_sentences = n_sentences
self.signatures = []

def set_up_dl_locks(self, dl_lock, up_lock):
    """Install multiprocessing locks used to serialize folder access.

    Only the upload lock is registered on the output folder here; ``dl_lock``
    is accepted to satisfy the caller's interface but is not used in this
    method's visible body (this block has no download folder to guard).

    Args:
        dl_lock: download-side lock — unused here; presumably consumed by
            sibling pipeline steps that read input data (TODO confirm against
            the base class / caller).
        up_lock: lock handed to ``self.output_folder`` so concurrent writers
            to the signature output are serialized.
    """
    self.output_folder.set_lock(up_lock)

def save_hashes(self, rank: int):
self.signatures.sort()
def save_hashes(self, rank: int, signatures):
signatures.sort()

f = self.output_folder.open(f"{rank:05d}{ExtensionHelperSD.stage_1_signature}", mode="wb")
for hs in self.signatures:
for hs in signatures:
f.write(struct.pack("<Q", hs.hash_value))
f.write(struct.pack("<I", hs.doc_id))
f.write(struct.pack("<H", hs.sent_id))
Expand Down Expand Up @@ -108,12 +107,12 @@ def __call__(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1):
sentence idx. Before saving them the hashes are sorted.
"""
self.signatures = []
signatures = []
for doc_idx, doc in enumerate(data):
with self.stats.time_manager:
self.stat_update(StatHints.total)
self.signatures.extend(self.get_hashes(doc, doc_idx))
self.save_hashes(rank)
signatures.extend(self.get_hashes(doc, doc_idx))
self.save_hashes(rank, signatures)
self.output_folder.close()


Expand Down

0 comments on commit dd30eac

Please sign in to comment.