Merge pull request #169 from broadinstitute/ps-single-cell-metadata
[Single Cell] Metadata
psmadbec authored Sep 9, 2024
2 parents dc1e921 + f3d18d6 commit 4a0105a
Showing 5 changed files with 94 additions and 23 deletions.
42 changes: 22 additions & 20 deletions bioindex/src/main/resources/singleCell.py
@@ -13,8 +13,6 @@
 cpus = 8

 metadata_cell_key = 'NAME'
-output_metadata_labels = ['cell_subtype__custom', 'cell_type__custom', 'bmi__group', 'disease__ontology_label', 'sex']
-metadata_labels = [metadata_cell_key] + output_metadata_labels
 coordinate_cell_key = 'NAME'
 coordinate_labels = ['X', 'Y']

@@ -27,21 +25,32 @@
 def fetch_metadata():
     with open('raw/metadata.tsv', 'r') as f:
         header = f.readline().strip().split('\t')
-        label_to_header_idx = {label: header.index(label) for label in metadata_labels}
-        index_lists = {label: [] for label in metadata_labels}
-        set_lists = {label: [] for label in metadata_labels}
-        index_dict = {label: dict() for label in metadata_labels}
+        possible_label_dict = {label: idx for idx, label in enumerate(header)}
+        index_lists = {label: [] for label in possible_label_dict}
+        set_lists = {label: [] for label in possible_label_dict}
+        index_dict = {label: dict() for label in possible_label_dict}
         for line in f:
-            split_line = line.strip().split('\t')
-            for label in metadata_labels:
-                label_value = split_line[label_to_header_idx[label]]
+            split_line = [a.strip() for a in line.split('\t')]  # can have empty cells at the end of the line
+            for label in possible_label_dict:
+                label_value = split_line[possible_label_dict[label]]
                 if label_value not in index_dict[label]:
                     index_dict[label][label_value] = len(set_lists[label])
                     set_lists[label].append(label_value)
                 index_lists[label].append(index_dict[label][label_value])
     return index_lists, set_lists, index_dict


+def filter_metadata(index_lists, set_lists, index_dict):
+    max_numb = 100
+    for label in list(index_lists.keys()):
+        if (len(set_lists[label]) >= max_numb and label != metadata_cell_key) or \
+                (len(set_lists[label]) <= 1 and ''.join(set_lists[label]) == ''):
+            index_lists.pop(label)
+            set_lists.pop(label)
+            index_dict.pop(label)
+    return index_lists, set_lists, index_dict
+
+
 def fetch_coordinates(cell_indexes):
     with open('raw/coordinates.tsv', 'r') as f:
         header = f.readline().strip().split('\t')
@@ -60,12 +69,8 @@ def fetch_coordinates(cell_indexes):
 def output_metadata(set_lists, index_lists):
     fields = {
         metadata_cell_key: set_lists[metadata_cell_key],
-        'metadata_labels': {
-            output_metadata_label: set_lists[output_metadata_label] for output_metadata_label in output_metadata_labels
-        },
-        'metadata': {
-            output_metadata_label: index_lists[output_metadata_label] for output_metadata_label in output_metadata_labels
-        }
+        'metadata_labels': {label: data for label, data in set_lists.items() if label != metadata_cell_key},
+        'metadata': {label: data for label, data in index_lists.items() if label != metadata_cell_key},
     }
     with gzip.open('processed/fields.json.gz', 'wt') as f:
         json.dump(fields, f)
@@ -125,8 +130,6 @@ def fetch_and_output_expression(dataset, cell_indexes, infile, outfile, number_m
 def upload(dataset):
     subprocess.check_call(['aws', 's3', 'cp', 'processed/fields.json.gz', f'{s3_bioindex}/raw/single_cell/{dataset}/'])
     subprocess.check_call(['aws', 's3', 'cp', 'processed/coordinates.tsv.gz', f'{s3_bioindex}/raw/single_cell/{dataset}/'])
-    subprocess.check_call(['aws', 's3', 'rm', f'{s3_bioindex}/single_cell/gene/{dataset}/', '--recursive'])
-    subprocess.check_call(['aws', 's3', 'cp', 'processed/gene/', f'{s3_bioindex}/single_cell/gene/{dataset}/', '--recursive'])
     subprocess.check_call(['aws', 's3', 'rm', f'{s3_bioindex}/single_cell/gene_lognorm/{dataset}/', '--recursive'])
     subprocess.check_call(['aws', 's3', 'cp', 'processed/gene_lognorm/', f'{s3_bioindex}/single_cell/gene_lognorm/{dataset}/', '--recursive'])
     shutil.rmtree('raw')
@@ -143,14 +146,13 @@ def main():

     os.mkdir('processed')
     index_lists, set_lists, index_dict = fetch_metadata()
+    index_lists, set_lists, index_dict = filter_metadata(index_lists, set_lists, index_dict)

     coordinates = fetch_coordinates(index_dict[metadata_cell_key])
     output_metadata(set_lists, index_lists)
     output_coordinates(index_lists, coordinates)

     cells = index_dict[metadata_cell_key]
-    os.mkdir('processed/gene')
-    gene_out = f'processed/gene/part-{{}}.json'
-    fetch_and_output_expression(args.dataset, cells, 'raw/raw_counts.tsv.gz', gene_out, 'int')
     os.mkdir('processed/gene_lognorm')
     gene_lognorm_out = f'processed/gene_lognorm/part-{{}}.json'
     fetch_and_output_expression(args.dataset, cells, 'raw/lognorm_counts.tsv.gz', gene_lognorm_out, 'float')
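With this change, fetch_metadata indexes every column of metadata.tsv instead of a hard-coded label list, and the new filter_metadata drops columns that are not useful for display: any non-cell-key column with 100 or more distinct values, and any column whose only value is the empty string. A minimal sketch of that pruning rule on toy data (the column names here are hypothetical, not from the pipeline):

# Sketch of the filter_metadata rule: drop high-cardinality columns
# (except the cell key) and columns containing only empty values.
metadata_cell_key = 'NAME'
max_numb = 100
set_lists = {
    'NAME': ['cell_1', 'cell_2', 'cell_3'],      # cell key: always kept
    'sex': ['male', 'female'],                   # kept: small category set
    'barcode': [f'bc_{i}' for i in range(200)],  # dropped: >= 100 distinct values
    'notes': [''],                               # dropped: only empty values
}
kept = {
    label: values for label, values in set_lists.items()
    if (len(values) < max_numb or label == metadata_cell_key)
    and not (len(values) <= 1 and ''.join(values) == '')
}
assert sorted(kept) == ['NAME', 'sex']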
41 changes: 41 additions & 0 deletions bioindex/src/main/resources/singleCellMetadata.py
@@ -0,0 +1,41 @@
+#!/usr/bin/python3
+import os
+import glob
+import gzip
+import json
+import shutil
+import subprocess
+
+s3_in = 's3://dig-analysis-data'
+s3_bioindex = 's3://dig-bio-index'
+
+
+def download_data():
+    out = []
+    cmd = 'aws s3 cp s3://dig-analysis-data/single_cell/ ./data/ ' \
+          '--recursive --exclude="*" --include="*dataset_metadata"'
+    subprocess.check_call(cmd, shell=True)
+    for file in glob.glob('data/*/dataset_metadata'):
+        with open(file, 'r') as f:
+            out.append(json.load(f))
+    return out
+
+
+def upload_file(data):
+    file = 'dataset_metadata.json.gz'
+    path = f'{s3_bioindex}/raw/single_cell_metadata/'
+    with gzip.open(file, 'wt') as f:
+        for d in data:
+            f.write(f'{json.dumps(d)}\n')
+    subprocess.check_call(['aws', 's3', 'cp', file, path])
+    os.remove(file)
+
+
+def main():
+    data = download_data()
+    upload_file(data)
+    shutil.rmtree('data')
+
+
+if __name__ == '__main__':
+    main()
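The new singleCellMetadata.py gathers each dataset's dataset_metadata JSON and writes them all as one gzipped JSON-lines file, one object per line. A minimal sketch of consuming that output, assuming only that each line is a standalone JSON object (the keys inside each record depend on the dataset_metadata files themselves):

import gzip
import json

# Each line of dataset_metadata.json.gz is one dataset's metadata object,
# so the file can be streamed record by record.
with gzip.open('dataset_metadata.json.gz', 'rt') as f:
    for line in f:
        record = json.loads(line)
        print(record)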
1 change: 1 addition & 0 deletions bioindex/src/main/scala/BioIndex.scala
@@ -50,5 +50,6 @@ object BioIndex extends Method {
     addStage(new GeneToTranscriptStage)
     addStage(new HugeStage)
     addStage(new SingleCellStage)
+    addStage(new SingleCellMetadataStage)
   }
 }
29 changes: 29 additions & 0 deletions bioindex/src/main/scala/SingleCellMetadataStage.scala
@@ -0,0 +1,29 @@
+package org.broadinstitute.dig.aggregator.methods.bioindex
+
+import org.broadinstitute.dig.aggregator.core._
+import org.broadinstitute.dig.aws._
+import org.broadinstitute.dig.aws.emr._
+
+class SingleCellMetadataStage(implicit context: Context) extends Stage {
+  import MemorySize.Implicits._
+
+  val metadata: Input.Source = Input.Source.Raw("single_cell/*/dataset_metadata")
+
+  override val cluster: ClusterDef = super.cluster.copy(
+    instances = 1,
+    applications = Seq.empty,
+  )
+
+  /** Input sources. */
+  override val sources: Seq[Input.Source] = Seq(metadata)
+
+  /** Rules for mapping input to outputs. */
+  override val rules: PartialFunction[Input, Outputs] = {
+    case metadata(_) => Outputs.Named("metadata")
+  }
+
+  /** Output to Job steps. */
+  override def make(dataset: String): Job = {
+    new Job(Job.Script(resourceUri("singleCellMetadata.py")))
+  }
+}
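SingleCellMetadataStage maps every single_cell/*/dataset_metadata input onto a single output named "metadata", so a change to any one dataset's metadata file reruns one aggregation job over all of them. A rough Python illustration of that grouping, using fnmatch as a stand-in for the aggregator's Raw source matching (an assumption, not the framework's actual mechanism):

from fnmatch import fnmatch

# Hypothetical input keys; the real ones live under s3://dig-analysis-data.
inputs = [
    'single_cell/datasetA/dataset_metadata',
    'single_cell/datasetB/dataset_metadata',
    'single_cell/datasetA/metadata.tsv',  # matched by SingleCellStage, not this stage
]
outputs = {'metadata' for key in inputs if fnmatch(key, 'single_cell/*/dataset_metadata')}
print(outputs)  # {'metadata'} -- both dataset_metadata inputs collapse to one output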
4 changes: 1 addition & 3 deletions bioindex/src/main/scala/SingleCellStage.scala
@@ -12,7 +12,6 @@ class SingleCellStage(implicit context: Context) extends Stage {

   val fields: Input.Source = Input.Source.Raw("single_cell/*/metadata.tsv")
   val coordinates: Input.Source = Input.Source.Raw("single_cell/*/coordinates.tsv")
-  val gene: Input.Source = Input.Source.Raw("single_cell/*/raw_counts.tsv.gz")
   val geneLogNorm: Input.Source = Input.Source.Raw("single_cell/*/lognorm_counts.tsv.gz")

   override val cluster: ClusterDef = super.cluster.copy(
@@ -21,13 +20,12 @@
   )

   /** Input sources. */
-  override val sources: Seq[Input.Source] = Seq(fields, coordinates, gene, geneLogNorm)
+  override val sources: Seq[Input.Source] = Seq(fields, coordinates, geneLogNorm)

   /** Rules for mapping input to outputs. */
   override val rules: PartialFunction[Input, Outputs] = {
     case fields(dataset) => Outputs.Named(dataset)
     case coordinates(dataset) => Outputs.Named(dataset)
-    case gene(dataset) => Outputs.Named(dataset)
     case geneLogNorm(dataset) => Outputs.Named(dataset)
   }
