Skip to content

Commit

Permalink
Fix largest
Browse files Browse the repository at this point in the history
  • Loading branch information
psmadbec committed Jan 9, 2025
1 parent 551ac59 commit e42dea5
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 19 deletions.
42 changes: 25 additions & 17 deletions bottom-line/src/main/resources/runLargest.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,27 +59,29 @@ def get_engine(self):
def get_sorted_datasets(self, phenotype, ancestry):
with self.get_engine().connect() as connection:
print(f'Querying db for phenotype {phenotype} for largest {ancestry} dataset')
ancestry_addendum = f'AND ancestry="{ancestry}" ' if ancestry != 'TE' else ''
query = sqlalchemy.text(
f'SELECT name FROM Datasets '
f'SELECT name, ancestry FROM Datasets '
f'WHERE REGEXP_LIKE(phenotypes, "(^|,){phenotype}($|,)") '
f'AND ancestry="{ancestry}" AND tech="GWAS" '
f'{ancestry_addendum}AND tech="GWAS" '
f'ORDER BY subjects DESC'
)
rows = connection.execute(query).all()
print(f'Returned {len(rows)} rows for largest mixed dataset')
return [row[0] for row in rows]
print(f'Returned {len(rows)} rows for largest dataset')
return [(row[0], row[1]) for row in rows]

def check_existence(phenotype, ancestry, dataset):
path = f'{s3_in}/out/metaanalysis/variants/{phenotype}/dataset={dataset}/ancestry={ancestry}/'

def check_existence(phenotype, dataset_ancestry):
path = f'{s3_in}/out/metaanalysis/variants/{phenotype}/dataset={dataset_ancestry[0]}/ancestry={dataset_ancestry[1]}/'
return subprocess.call(['aws', 's3', 'ls', path, '--recursive'])


def get_dataset(phenotype, ancestry):
def get_dataset_ancestry(phenotype, ancestry):
db = BioIndexDB()
datasets = db.get_sorted_datasets(phenotype, ancestry)
for dataset in datasets:
if not check_existence(phenotype, ancestry, dataset):
return dataset
dataset_ancestries = db.get_sorted_datasets(phenotype, ancestry)
for dataset_ancestry in dataset_ancestries:
if not check_existence(phenotype, dataset_ancestry):
return dataset_ancestry


def main():
Expand All @@ -94,14 +96,20 @@ def main():
spark = SparkSession.builder.appName('bottom-line').getOrCreate()

# get the source and output directories
dataset = get_dataset(args.phenotype, args.ancestry)
print(f'Largest GWAS dataset for phenotype {args.phenotype}, ancestry {args.ancestry}: {dataset}')
if dataset is not None:
srcdir = f'{s3_in}/out/metaanalysis/variants/{args.phenotype}/dataset={dataset}/ancestry={args.ancestry}/*/part-*'
outdir = f'{s3_out}/out/metaanalysis/largest/ancestry-specific/{args.phenotype}/ancestry={args.ancestry}/'
dataset_ancestry = get_dataset_ancestry(args.phenotype, args.ancestry)
print(f'Largest GWAS dataset for phenotype {args.phenotype}, ancestry {args.ancestry}: {dataset_ancestry}')
if dataset_ancestry is not None:
dataset, ancestry = dataset_ancestry
srcdir = f'{s3_in}/out/metaanalysis/variants/{args.phenotype}/dataset={dataset}/ancestry={ancestry}/*/part-*'
if args.ancestry == 'TE':
outdir = f'{s3_out}/out/metaanalysis/largest/trans-ethnic/{args.phenotype}/'
else:
outdir = f'{s3_out}/out/metaanalysis/largest/ancestry-specific/{args.phenotype}/ancestry={args.ancestry}/'

columns = [col(field.name) for field in variants_schema]

output_ancestry = args.ancestry if args.ancestry != 'TE' else 'Mixed'

df = spark.read \
.csv(
srcdir,
Expand All @@ -110,7 +118,7 @@ def main():
schema=variants_schema,
) \
.select(*columns) \
.withColumn('ancestry', lit(args.ancestry))
.withColumn('ancestry', lit(output_ancestry))

df.write \
.mode('overwrite') \
Expand Down
4 changes: 2 additions & 2 deletions bottom-line/src/main/scala/LargestStage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class LargestStage(implicit context: Context) extends Stage {
.flatten
// if there are no partitions, then we can ignore it
if (partitions.nonEmpty) {
val partitionNames = partitions.map(_.toLargestOutput.toOutput)
val partitionNames = partitions.flatMap(_.toLargestOutputs.map(_.toOutput))
Outputs.Named(partitionNames: _*)
} else Outputs.Null
}
Expand All @@ -55,7 +55,7 @@ case class LargestPartition(
dataset: String,
ancestry: String
) {
def toLargestOutput: LargestOutput = LargestOutput(phenotype, ancestry)
def toLargestOutputs: Seq[LargestOutput] = Seq(LargestOutput(phenotype, ancestry), LargestOutput(phenotype, "TE"))
}

case class LargestOutput(
Expand Down

0 comments on commit e42dea5

Please sign in to comment.