diff --git a/bioindex/src/main/resources/singleCell.py b/bioindex/src/main/resources/singleCell.py index 303a5379..49f80e91 100644 --- a/bioindex/src/main/resources/singleCell.py +++ b/bioindex/src/main/resources/singleCell.py @@ -13,8 +13,6 @@ cpus = 8 metadata_cell_key = 'NAME' -output_metadata_labels = ['cell_subtype__custom', 'cell_type__custom', 'bmi__group', 'disease__ontology_label', 'sex'] -metadata_labels = [metadata_cell_key] + output_metadata_labels coordinate_cell_key = 'NAME' coordinate_labels = ['X', 'Y'] @@ -27,14 +25,14 @@ def fetch_metadata(): with open('raw/metadata.tsv', 'r') as f: header = f.readline().strip().split('\t') - label_to_header_idx = {label: header.index(label) for label in metadata_labels} - index_lists = {label: [] for label in metadata_labels} - set_lists = {label: [] for label in metadata_labels} - index_dict = {label: dict() for label in metadata_labels} + possible_label_dict = {label: idx for idx, label in enumerate(header)} + index_lists = {label: [] for label in possible_label_dict} + set_lists = {label: [] for label in possible_label_dict} + index_dict = {label: dict() for label in possible_label_dict} for line in f: - split_line = line.strip().split('\t') - for label in metadata_labels: - label_value = split_line[label_to_header_idx[label]] + split_line = [a.strip() for a in line.split('\t')] # can have empty cells at the end of the line + for label in possible_label_dict: + label_value = split_line[possible_label_dict[label]] if label_value not in index_dict[label]: index_dict[label][label_value] = len(set_lists[label]) set_lists[label].append(label_value) @@ -42,6 +40,17 @@ def fetch_metadata(): return index_lists, set_lists, index_dict +def filter_metadata(index_lists, set_lists, index_dict): + max_numb = 100 + for label in list(index_lists.keys()): + if (len(set_lists[label]) >= max_numb and label != metadata_cell_key) or \ + (len(set_lists[label]) <= 1 and ''.join(set_lists[label]) == ''): + index_lists.pop(label) + set_lists.pop(label) + index_dict.pop(label) + return index_lists, set_lists, index_dict + + def fetch_coordinates(cell_indexes): with open('raw/coordinates.tsv', 'r') as f: header = f.readline().strip().split('\t') @@ -60,12 +69,8 @@ def fetch_coordinates(cell_indexes): def output_metadata(set_lists, index_lists): fields = { metadata_cell_key: set_lists[metadata_cell_key], - 'metadata_labels': { - output_metadata_label: set_lists[output_metadata_label] for output_metadata_label in output_metadata_labels - }, - 'metadata': { - output_metadata_label: index_lists[output_metadata_label] for output_metadata_label in output_metadata_labels - } + 'metadata_labels': {label: data for label, data in set_lists.items() if label != metadata_cell_key}, + 'metadata': {label: data for label, data in index_lists.items() if label != metadata_cell_key}, } with gzip.open('processed/fields.json.gz', 'wt') as f: json.dump(fields, f) @@ -125,8 +130,6 @@ def fetch_and_output_expression(dataset, cell_indexes, infile, outfile, number_m def upload(dataset): subprocess.check_call(['aws', 's3', 'cp', 'processed/fields.json.gz', f'{s3_bioindex}/raw/single_cell/{dataset}/']) subprocess.check_call(['aws', 's3', 'cp', 'processed/coordinates.tsv.gz', f'{s3_bioindex}/raw/single_cell/{dataset}/']) - subprocess.check_call(['aws', 's3', 'rm', f'{s3_bioindex}/single_cell/gene/{dataset}/', '--recursive']) - subprocess.check_call(['aws', 's3', 'cp', 'processed/gene/', f'{s3_bioindex}/single_cell/gene/{dataset}/', '--recursive']) subprocess.check_call(['aws', 's3', 'rm', f'{s3_bioindex}/single_cell/gene_lognorm/{dataset}/', '--recursive']) subprocess.check_call(['aws', 's3', 'cp', 'processed/gene_lognorm/', f'{s3_bioindex}/single_cell/gene_lognorm/{dataset}/', '--recursive']) shutil.rmtree('raw') @@ -143,14 +146,13 @@ def main(): os.mkdir('processed') index_lists, set_lists, index_dict = fetch_metadata() + index_lists, set_lists, index_dict = filter_metadata(index_lists, set_lists, index_dict) + coordinates = fetch_coordinates(index_dict[metadata_cell_key]) output_metadata(set_lists, index_lists) output_coordinates(index_lists, coordinates) cells = index_dict[metadata_cell_key] - os.mkdir('processed/gene') - gene_out = f'processed/gene/part-{{}}.json' - fetch_and_output_expression(args.dataset, cells, 'raw/raw_counts.tsv.gz', gene_out, 'int') os.mkdir('processed/gene_lognorm') gene_lognorm_out = f'processed/gene_lognorm/part-{{}}.json' fetch_and_output_expression(args.dataset, cells, 'raw/lognorm_counts.tsv.gz', gene_lognorm_out, 'float') diff --git a/bioindex/src/main/resources/singleCellMetadata.py b/bioindex/src/main/resources/singleCellMetadata.py new file mode 100644 index 00000000..0105ca46 --- /dev/null +++ b/bioindex/src/main/resources/singleCellMetadata.py @@ -0,0 +1,41 @@ +#!/usr/bin/python3 +import os +import glob +import gzip +import json +import shutil +import subprocess + +s3_in = 's3://dig-analysis-data' +s3_bioindex = 's3://dig-bio-index' + + +def download_data(): + out = [] + cmd = 'aws s3 cp s3://dig-analysis-data/single_cell/ ./data/ ' \ + '--recursive --exclude="*" --include="*dataset_metadata"' + subprocess.check_call(cmd, shell=True) + for file in glob.glob('data/*/dataset_metadata'): + with open(file, 'r') as f: + out.append(json.load(f)) + return out + + +def upload_file(data): + file = 'dataset_metadata.json.gz' + path = f'{s3_bioindex}/raw/single_cell_metadata/' + with gzip.open(file, 'wt') as f: + for d in data: + f.write(f'{json.dumps(d)}\n') + subprocess.check_call(['aws', 's3', 'cp', file, path]) + os.remove(file) + + +def main(): + data = download_data() + upload_file(data) + shutil.rmtree('data') + + +if __name__ == '__main__': + main() diff --git a/bioindex/src/main/scala/BioIndex.scala b/bioindex/src/main/scala/BioIndex.scala index 81a20bfb..ff38c717 100644 --- a/bioindex/src/main/scala/BioIndex.scala +++ b/bioindex/src/main/scala/BioIndex.scala @@ -50,5 +50,6 @@ object BioIndex extends Method { addStage(new GeneToTranscriptStage) addStage(new HugeStage) addStage(new SingleCellStage) + addStage(new SingleCellMetadataStage) } } diff --git a/bioindex/src/main/scala/SingleCellMetadataStage.scala b/bioindex/src/main/scala/SingleCellMetadataStage.scala new file mode 100644 index 00000000..2f5aedf9 --- /dev/null +++ b/bioindex/src/main/scala/SingleCellMetadataStage.scala @@ -0,0 +1,29 @@ +package org.broadinstitute.dig.aggregator.methods.bioindex + +import org.broadinstitute.dig.aggregator.core._ +import org.broadinstitute.dig.aws._ +import org.broadinstitute.dig.aws.emr._ + +class SingleCellMetadataStage(implicit context: Context) extends Stage { + import MemorySize.Implicits._ + + val metadata: Input.Source = Input.Source.Raw("single_cell/*/dataset_metadata") + + override val cluster: ClusterDef = super.cluster.copy( + instances = 1, + applications = Seq.empty, + ) + + /** Input sources. */ + override val sources: Seq[Input.Source] = Seq(metadata) + + /** Rules for mapping input to outputs. */ + override val rules: PartialFunction[Input, Outputs] = { + case metadata(_) => Outputs.Named("metadata") + } + + /** Output to Job steps. */ + override def make(dataset: String): Job = { + new Job(Job.Script(resourceUri("singleCellMetadata.py"))) + } +} diff --git a/bioindex/src/main/scala/SingleCellStage.scala b/bioindex/src/main/scala/SingleCellStage.scala index 014adf5b..85596a77 100644 --- a/bioindex/src/main/scala/SingleCellStage.scala +++ b/bioindex/src/main/scala/SingleCellStage.scala @@ -12,7 +12,6 @@ class SingleCellStage(implicit context: Context) extends Stage { val fields: Input.Source = Input.Source.Raw("single_cell/*/metadata.tsv") val coordinates: Input.Source = Input.Source.Raw("single_cell/*/coordinates.tsv") - val gene: Input.Source = Input.Source.Raw("single_cell/*/raw_counts.tsv.gz") val geneLogNorm: Input.Source = Input.Source.Raw("single_cell/*/lognorm_counts.tsv.gz") override val cluster: ClusterDef = super.cluster.copy( @@ -21,13 +20,12 @@ class SingleCellStage(implicit context: Context) extends Stage { ) /** Input sources. */ - override val sources: Seq[Input.Source] = Seq(fields, coordinates, gene, geneLogNorm) + override val sources: Seq[Input.Source] = Seq(fields, coordinates, geneLogNorm) /** Rules for mapping input to outputs. */ override val rules: PartialFunction[Input, Outputs] = { case fields(dataset) => Outputs.Named(dataset) case coordinates(dataset) => Outputs.Named(dataset) - case gene(dataset) => Outputs.Named(dataset) case geneLogNorm(dataset) => Outputs.Named(dataset) }