From 88490235b5dfb67c33842a9f4d5e0459ec2ca937 Mon Sep 17 00:00:00 2001
From: samia
Date: Tue, 24 Jan 2023 17:04:12 +0100
Subject: [PATCH 1/2] - implement merge sort and replace sort_file_on_disk
 - sort metadata chunks before writing them to disk

---
 mokapot/confidence.py | 38 ++++++++++++++++++++++++------------
 mokapot/utils.py      | 45 +++++++++++++++++++++++++++++++++++++++----
 2 files changed, 67 insertions(+), 16 deletions(-)

diff --git a/mokapot/confidence.py b/mokapot/confidence.py
index 3a565378..0e13b97a 100644
--- a/mokapot/confidence.py
+++ b/mokapot/confidence.py
@@ -16,11 +16,13 @@ confidence estimates, rather than initializing the classes below directly.
 """
 import os
+import glob
 from pathlib import Path
 import logging
 
 import pandas as pd
 import matplotlib.pyplot as plt
 from triqler import qvality
+from joblib import Parallel, delayed
 
 from . import qvalues
 from . import utils
@@ -866,20 +868,21 @@ def assign_confidence(
         scores_slices = utils.create_chunks(
             scores, chunk_size=CONFIDENCE_CHUNK_SIZE
         )
-        scores_metadata_path = "scores_metadata.csv"
-        for chunk_metadata, score_chunk in zip(reader, scores_slices):
-            chunk_metadata["score"] = score_chunk
-            chunk_metadata.to_csv(
-                "scores_metadata.csv",
-                sep=sep,
-                index=False,
-                mode="a",
-                header=None,
+
+        Parallel(n_jobs=-1, require="sharedmem")(
+            delayed(save_sorted_metadata_chunks)(
+                chunk_metadata, score_chunk, i, sep
             )
+            for chunk_metadata, score_chunk, i in zip(
+                reader, scores_slices, range(len(scores_slices))
+            )
+        )
+
         psms_path = "psms.csv"
         peptides_path = "peptides.csv"
-        iterable_sorted = utils.sort_file_on_disk(
-            file_path=scores_metadata_path, sort_key=-1, sep=sep, reverse=True
+        scores_metadata_paths = glob.glob("scores_metadata_*")
+        iterable_sorted = utils.merge_sort(
+            scores_metadata_paths, col_score="score", sep=sep
         )
         LOGGER.info("Assigning confidence...")
         LOGGER.info("Performing target-decoy competition...")
@@ -899,7 +902,7 @@ def assign_confidence(
             out_peptides="peptides.csv",
             sep=sep,
         )
-        os.remove(scores_metadata_path)
+        [os.remove(sc_path) for sc_path in scores_metadata_paths]
 
         LOGGER.info("\t- Found %i PSMs from unique spectra.", unique_psms)
         LOGGER.info("\t- Found %i unique peptides.", unique_peptides)
@@ -933,6 +936,17 @@ def assign_confidence(
     )
 
 
+def save_sorted_metadata_chunks(chunk_metadata, score_chunk, i, sep):
+    chunk_metadata["score"] = score_chunk
+    chunk_metadata.sort_values(by="score", ascending=False, inplace=True)
+    chunk_metadata.to_csv(
+        f"scores_metadata_{i}.csv",
+        sep=sep,
+        index=False,
+        mode="w",
+    )
+
+
 def plot_qvalues(qvalues, threshold=0.1, ax=None, **kwargs):
     """
     Plot the cumulative number of discoveries over range of q-values.
diff --git a/mokapot/utils.py b/mokapot/utils.py
index 933aa0c8..090bfa49 100644
--- a/mokapot/utils.py
+++ b/mokapot/utils.py
@@ -77,20 +77,57 @@ def get_unique_psms_and_peptides(iterable, out_psms, out_peptides, sep):
     seen_peptide = set()
     f_psm = open(out_psms, "a")
     f_peptide = open(out_peptides, "a")
-    for line in iterable:
-        line_list = line.split(sep)
+    for line_list in iterable:
         line_hash_psm = tuple([int(line_list[2]), float(line_list[3])])
         line_hash_peptide = line_list[4]
         if line_hash_psm not in seen_psm:
             seen_psm.add(line_hash_psm)
             f_psm.write(
-                f"{sep.join([line_list[0], line_list[1], line_list[-3], line_list[-2], line_list[-1]])}\n"
+                f"{sep.join([line_list[0], line_list[1], line_list[-3], line_list[-2], line_list[-1]])}"
             )
         if line_hash_peptide not in seen_peptide:
             seen_peptide.add(line_hash_peptide)
             f_peptide.write(
-                f"{sep.join([line_list[0], line_list[1], line_list[-3], line_list[-2], line_list[-1]])}\n"
+                f"{sep.join([line_list[0], line_list[1], line_list[-3], line_list[-2], line_list[-1]])}"
             )
     f_psm.close()
     f_peptide.close()
     return [len(seen_psm), len(seen_peptide)]
+
+
+def get_next_row(file_handles, current_rows, col_index, sep="\t"):
+    max_key = max_row = None
+    max_score = None
+    for key, row in current_rows.items():
+        score = float(row[col_index])
+        if max_score is None or max_score < score:
+            max_score = score
+            max_key = key
+            max_row = row
+
+    try:
+        current_rows[max_key] = next(file_handles[max_key]).split(sep)
+    except StopIteration:
+        file_handles[max_key].close()
+        del current_rows[max_key]
+        del file_handles[max_key]
+        return max_row
+
+    return max_row
+
+
+def merge_sort(paths, col_score, sep="\t"):
+    file_handles = {i: open(path, "r") for i, path in enumerate(paths)}
+    current_rows = {}
+    for key, f in file_handles.items():
+        next(f)
+        first_row = next(f)
+        current_rows[key] = first_row.split(sep)
+
+    with open(paths[0], "r") as f:
+        header = next(f)
+    col_index = header.rstrip().split(sep).index(col_score)
+    while file_handles != {}:
+        row = get_next_row(file_handles, current_rows, col_index, sep)
+        if row is not None:
+            yield row

From 9859bbb0a4d79179528cd91e8ef712d0354af981 Mon Sep 17 00:00:00 2001
From: samia
Date: Tue, 24 Jan 2023 17:29:35 +0100
Subject: [PATCH 2/2] drop duplicates before writing to disk

---
 mokapot/confidence.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/mokapot/confidence.py b/mokapot/confidence.py
index 0e13b97a..2767cb30 100644
--- a/mokapot/confidence.py
+++ b/mokapot/confidence.py
@@ -871,7 +871,7 @@ def assign_confidence(
 
         Parallel(n_jobs=-1, require="sharedmem")(
             delayed(save_sorted_metadata_chunks)(
-                chunk_metadata, score_chunk, i, sep
+                chunk_metadata, score_chunk, psms_info, i, sep
             )
             for chunk_metadata, score_chunk, i in zip(
                 reader, scores_slices, range(len(scores_slices))
@@ -936,9 +936,14 @@ def assign_confidence(
     )
 
 
-def save_sorted_metadata_chunks(chunk_metadata, score_chunk, i, sep):
+def save_sorted_metadata_chunks(
+    chunk_metadata, score_chunk, psms_info, i, sep
+):
     chunk_metadata["score"] = score_chunk
     chunk_metadata.sort_values(by="score", ascending=False, inplace=True)
+    chunk_metadata = chunk_metadata.drop_duplicates(
+        psms_info["spectrum_columns"]
+    )
     chunk_metadata.to_csv(
         f"scores_metadata_{i}.csv",
        sep=sep,
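
Taken together, the first patch replaces the single `sort_file_on_disk` pass with a chunked sort-then-merge: each metadata chunk gets its scores attached, is sorted in memory, and is written to its own `scores_metadata_<i>.csv`, and `utils.merge_sort` then streams the pre-sorted files back in descending score order while buffering only one row per file. Below is a minimal, self-contained sketch of that pattern, not the patched mokapot code itself: the chunk size, file names, and toy columns are invented for illustration, and Python's `heapq.merge` stands in for the hand-rolled `merge_sort`.

```python
# Sketch of the chunked sort-then-merge pattern from the first patch.
# Assumptions: file names, chunk size, and toy data are invented, and
# heapq.merge is used here in place of the hand-rolled utils.merge_sort.
import csv
import heapq

import pandas as pd


def write_sorted_chunks(df, chunk_size, sep="\t"):
    """Sort each chunk by score (descending) and write it to its own file."""
    paths = []
    for i, start in enumerate(range(0, len(df), chunk_size)):
        chunk = df.iloc[start:start + chunk_size]
        chunk = chunk.sort_values(by="score", ascending=False)
        path = f"scores_metadata_{i}.csv"  # mirrors the naming in the patch
        chunk.to_csv(path, sep=sep, index=False)
        paths.append(path)
    return paths


def merge_by_score(paths, sep="\t"):
    """K-way merge of the pre-sorted chunk files, highest score first.

    Like utils.merge_sort, this only buffers one row per file at a time.
    """
    def rows(path):
        with open(path, newline="") as f:
            yield from csv.DictReader(f, delimiter=sep)

    yield from heapq.merge(
        *(rows(p) for p in paths), key=lambda r: -float(r["score"])
    )


if __name__ == "__main__":
    toy = pd.DataFrame(
        {
            "peptide": list("ABCDEFG"),
            "score": [0.2, 0.9, 0.5, 0.1, 0.8, 0.4, 0.7],
        }
    )
    for row in merge_by_score(write_sorted_chunks(toy, chunk_size=3)):
        print(row["peptide"], row["score"])
```

Because every chunk file is already sorted, the merge never holds more than one row per file in memory, which is what lets confidence assignment work on PSM tables larger than RAM.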
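
The second patch then deduplicates each chunk before it is written: because the chunk has just been sorted by score in descending order, `drop_duplicates` on the spectrum columns keeps only the best-scoring PSM per spectrum. A small sketch of that behaviour, with invented column names standing in for `psms_info["spectrum_columns"]`:

```python
# Sketch of why the second patch sorts before dropping duplicates: after a
# descending sort on "score", drop_duplicates keeps the first (best-scoring)
# row per spectrum. Column names are invented, not mokapot's actual schema.
import pandas as pd

chunk = pd.DataFrame(
    {
        "specid": [1, 1, 2, 2, 3],
        "scannr": [10, 10, 11, 11, 12],
        "peptide": ["PEPA", "PEPB", "PEPC", "PEPD", "PEPE"],
        "score": [0.4, 0.9, 0.7, 0.2, 0.5],
    }
)
spectrum_columns = ["specid", "scannr"]  # stand-in for psms_info["spectrum_columns"]

chunk = chunk.sort_values(by="score", ascending=False)
best_per_spectrum = chunk.drop_duplicates(spectrum_columns)
print(best_per_spectrum)
# keeps PEPB (0.9), PEPC (0.7) and PEPE (0.5): one row per spectrum
```

This only removes duplicates within a chunk; rows for the same spectrum that land in different chunks are still resolved later, when `get_unique_psms_and_peptides` walks the merged, score-sorted stream and keeps only the first (highest-scoring) occurrence of each key.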