diff --git a/SemiBin/generate_coverage.py b/SemiBin/generate_coverage.py
index 816cb02..082afbb 100644
--- a/SemiBin/generate_coverage.py
+++ b/SemiBin/generate_coverage.py
@@ -1,5 +1,6 @@
 import os
 import subprocess
+from .generate_kmer import generate_kmer_features_from_fasta
 from .atomicwrite import atomic_write
 
 def calculate_coverage(depth_stream, bam_file, must_link_threshold, edge=75, is_combined=False,
@@ -172,6 +173,144 @@ def combine_cov(cov_dir : str, bam_list, is_combined : bool): # bam_list : list[str]
         data_split_cov = None
     return data_cov, data_split_cov
 
+def combine_sample_cov(sample: str, cov_dir: str, bam_list, suffix: str, is_combined: bool, separator, logger, outcsv):
+    """
+    Combine the per-BAM cov/cov_split files for one sample into a single file.
+
+    Parameters
+    ----------
+    sample : sample name
+    cov_dir : directory where the per-BAM coverage files are stored
+    bam_list : list of BAM files
+    suffix : 'data_cov.csv' or 'data_split_cov.csv'
+    is_combined : whether to process split files
+    separator : separator between sample and contig names
+    logger : logger
+    outcsv : writable path for the output CSV file
+
+    Returns
+    -------
+    sample : sample name
+    """
+    import polars as pl
+    import numpy as np
+    #logger.debug(f"\tCombining coverage information for sample {sample}")
+
+    covs = []
+    count = 0
+    batch = 0
+    for bam_index, bam_file in enumerate(bam_list):
+        bam_fname = os.path.split(bam_file)[-1]
+        #logger.debug(f"\tProcessing #{bam_index}: {bam_fname}_{bam_index}_{suffix}")
+
+        # Lazily read the per-BAM coverage table and keep only this sample's rows
+        data_cov = pl.scan_csv(f'{cov_dir}/{bam_fname}_{bam_index}_{suffix}')\
+                    .rename({"": "contig_name"})\
+                    .filter(pl.col("contig_name").str.contains(sample + separator))\
+                    .collect()
+        covs.append(data_cov)
+
+        # Merge in batches of 100 frames to keep memory usage bounded
+        count += 1
+        if count > 99:
+            batch += 1
+            #logger.debug(f"\tConcatenating batch #{batch}")
+            data_cov_merged = pl.concat(covs, how="align")
+            #logger.debug(f"\tConcatenating batch #{batch} done")
+            covs = [data_cov_merged]
+            count = 0
+
+    if len(covs) > 1:
+        batch += 1
+        #logger.debug(f"\tConcatenating batch #{batch}")
+        sample_cov = pl.concat(covs, how="align")  # has been seen to signal SIGSEGV (address boundary error)
+        #logger.debug(f"\tConcatenating batch #{batch} done")
+    else:
+        sample_cov = covs[0]
+
+    # NOTE: assumes the default ':' separator when stripping the sample prefix
+    contig_names = [i.split(":")[1] for i in sample_cov["contig_name"]]
+    sample_cov = sample_cov.drop("contig_name")
+    headers = ["contig_name"] + list(sample_cov.columns)
+
+    if is_combined:
+        # Scale each coverage column by its mean, rounded up to the nearest 100
+        abun_scale = (sample_cov.mean() / 100).map_rows(np.ceil) * 100
+        divided_columns = [pl.col(col) / abun_scale[0, index] for index, col in enumerate(list(sample_cov.columns))]
+
+        sample_cov = sample_cov.select(divided_columns)
+
+    sample_cov = sample_cov.with_columns(pl.Series("contig_name", contig_names))
+    sample_cov = sample_cov.select(headers)
+
+    #logger.debug(f"\tSaving {outcsv}")
+    sample_cov.write_csv(outcsv)
+    #logger.debug(f"\tSaving {outcsv} done")
+
+    #logger.debug(f"\tCombining coverage information for sample {sample} done")
+    return sample
+
+def generate_sample_cov(sample, bams, output, is_combined, separator, logger, binning_threshold, must_link_threshold):
+    """
+    Generate the per-sample feature files (coverage plus k-mer) for one sample.
+
+    Parameters
+    ----------
+    sample : sample name
+    bams : list of BAM files
+    output : output directory name
+    is_combined : whether to process split files
+    separator : separator between sample and contig names
+    logger : logger
+    binning_threshold : dict of per-sample contig length thresholds for binning
+    must_link_threshold : contig length threshold for must-link constraints
+
+    Returns
+    -------
+    sample : sample name
+    """
+    import pandas as pd
+
+    logger.debug(f"\tGenerating coverage information for sample {sample}")
+
+    output_path = os.path.join(output, 'samples', sample)
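+    # Everything for this sample lands under <output>/samples/<sample>/:
+    # first the combined coverage tables, then k-mer features computed from
+    # the per-sample FASTA, and finally the merged data.csv / data_split.csv.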
+    os.makedirs(output_path, exist_ok=True)
+
+    combine_sample_cov(
+        sample, os.path.join(output, "samples"), bams, "data_cov.csv",
+        is_combined, separator, logger,
+        os.path.join(output_path, "data_cov.csv"))
+
+    if is_combined:
+        combine_sample_cov(
+            sample, os.path.join(output, "samples"), bams, "data_split_cov.csv",
+            is_combined, separator, logger,
+            os.path.join(output_path, 'data_split_cov.csv'))
+
+    sample_contig_fasta = os.path.join(output, f'samples/{sample}.fa')
+    kmer_whole = generate_kmer_features_from_fasta(
+        sample_contig_fasta, binning_threshold[sample], 4)
+    kmer_split = generate_kmer_features_from_fasta(
+        sample_contig_fasta, 1000, 4, split=True, split_threshold=must_link_threshold)
+
+    sample_cov = pd.read_csv(os.path.join(output_path, 'data_cov.csv'), index_col=0, engine="pyarrow")
+    kmer_whole.index = kmer_whole.index.astype(str)
+    sample_cov.index = sample_cov.index.astype(str)
+    data = pd.merge(kmer_whole, sample_cov, how='inner', on=None,
+                    left_index=True, right_index=True, sort=False, copy=True)
+    if is_combined:
+        sample_cov_split = pd.read_csv(os.path.join(output_path, 'data_split_cov.csv'), index_col=0, engine="pyarrow")
+        data_split = pd.merge(kmer_split, sample_cov_split, how='inner', on=None,
+                              left_index=True, right_index=True, sort=False, copy=True)
+    else:
+        data_split = kmer_split
+
+    with atomic_write(os.path.join(output_path, 'data.csv'), overwrite=True) as ofile:
+        data.to_csv(ofile)
+
+    with atomic_write(os.path.join(output_path, 'data_split.csv'), overwrite=True) as ofile:
+        data_split.to_csv(ofile)
+
+    return sample
+
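+# Usage sketch (illustrative): in multi-sample mode, SemiBin/main.py (see the
+# hunk below) calls generate_sample_cov once per sample from a multiprocessing
+# Pool, then checks that each sample's data_cov.csv (and data_split_cov.csv
+# when is_combined) was written.
+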
 def generate_cov_from_abundances(abundances, output, contig_path, contig_threshold=1000, sep=None, contig_threshold_dict=None):
     import pandas as pd
     import numpy as np
diff --git a/SemiBin/main.py b/SemiBin/main.py
index 491846b..fe70413 100644
--- a/SemiBin/main.py
+++ b/SemiBin/main.py
@@ -11,7 +11,7 @@
 from . import utils
 from .utils import validate_normalize_args, get_must_link_threshold, generate_cannot_link, \
     set_random_seed, process_fasta, split_data, get_model_path, extract_bams
-from .generate_coverage import generate_cov, combine_cov, generate_cov_from_abundances
+from .generate_coverage import generate_cov, combine_cov, combine_sample_cov, generate_sample_cov, generate_cov_from_abundances
 from .generate_kmer import generate_kmer_features_from_fasta
 from .fasta import fasta_iter
 
@@ -978,55 +978,36 @@ def fasta_sample_iter(fn):
         sys.exit(1)
 
     # Generate cov features for every sample
-    data_cov, data_split_cov = combine_cov(os.path.join(args.output, 'samples'), args.bams, is_combined)
-    if is_combined:
-        data_split_cov = data_split_cov.reset_index()
-        columns_list = list(data_split_cov.columns)
-        columns_list[0] = 'contig_name'
-        data_split_cov.columns = columns_list
-
-    data_cov = data_cov.reset_index()
-    columns_list = list(data_cov.columns)
-    columns_list[0] = 'contig_name'
-    data_cov.columns = columns_list
+    logger.info('Generating coverage for every sample.')
+    with Pool(min(args.num_process, len(sample_list))) as pool:
+        results = [
+            pool.apply_async(
+                generate_sample_cov,
+                args=(
+                    sample,
+                    args.bams,
+                    args.output,
+                    is_combined,
+                    args.separator,
+                    logger,
+                    binning_threshold,
+                    must_link_threshold
+                ))
+            for sample in sample_list]
+        for r in results:
+            s = r.get()
+            logger.info(f'Generated coverage: {s}')
 
     for sample in sample_list:
-        output_path = os.path.join(args.output, 'samples', sample)
-        os.makedirs(output_path, exist_ok=True)
-
-        part_data = split_data(data_cov, sample, args.separator, is_combined)
-        part_data.to_csv(os.path.join(output_path, 'data_cov.csv'))
-
-        if is_combined:
-            part_data = split_data(data_split_cov, sample, args.separator, is_combined)
-            part_data.to_csv(os.path.join(
-                output_path, 'data_split_cov.csv'))
-
-        sample_contig_fasta = os.path.join(
-            args.output, f'samples/{sample}.fa')
-        kmer_whole = generate_kmer_features_from_fasta(
-            sample_contig_fasta, binning_threshold[sample], 4)
-        kmer_split = generate_kmer_features_from_fasta(
-            sample_contig_fasta, 1000, 4, split=True, split_threshold=must_link_threshold)
-
-        sample_cov = pd.read_csv(os.path.join(output_path, 'data_cov.csv'), index_col=0)
-        kmer_whole.index = kmer_whole.index.astype(str)
-        sample_cov.index = sample_cov.index.astype(str)
-        data = pd.merge(kmer_whole, sample_cov, how='inner', on=None,
-                        left_index=True, right_index=True, sort=False, copy=True)
+        # Fail fast if a worker did not produce its coverage file
+        if not os.path.exists(os.path.join(args.output, 'samples', sample, 'data_cov.csv')):
+            sys.stderr.write(
+                f"Error: generating the coverage file for sample {sample} failed\n")
+            sys.exit(1)
         if is_combined:
-            sample_cov_split = pd.read_csv(os.path.join(
-                output_path, 'data_split_cov.csv'), index_col=0)
-            data_split = pd.merge(kmer_split, sample_cov_split, how='inner', on=None,
-                                  left_index=True, right_index=True, sort=False, copy=True)
-        else:
-            data_split = kmer_split
-
-        with atomic_write(os.path.join(output_path, 'data.csv'), overwrite=True) as ofile:
-            data.to_csv(ofile)
-
-        with atomic_write(os.path.join(output_path, 'data_split.csv'), overwrite=True) as ofile:
-            data_split.to_csv(ofile)
+            if not os.path.exists(os.path.join(args.output, 'samples', sample, 'data_split_cov.csv')):
+                sys.stderr.write(
+                    f"Error: generating the split coverage file for sample {sample} failed\n")
+                sys.exit(1)
 
     if args.abundances:
         logger.info('Reading abundance information from abundance files.')
diff --git a/setup.py b/setup.py
index a1a807e..16ab06a 100644
--- a/setup.py
+++ b/setup.py
@@ -36,6 +36,8 @@
         'torch',
         'python-igraph',
         'pandas',
+        'pyarrow',
+        'polars',
         'scikit-learn',
         'requests',
         'numexpr',