Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: collect all pango lineage values and output them #7

Merged
merged 3 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions src/main/kotlin/org/genspectrum/ingest/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package org.genspectrum.ingest
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.core.subcommands
import com.github.ajalt.clikt.parameters.arguments.argument
import org.genspectrum.ingest.file.AllPangoLineagesFile
import org.genspectrum.ingest.file.Compression
import org.genspectrum.ingest.file.File
import org.genspectrum.ingest.file.FileType
Expand Down Expand Up @@ -33,18 +34,27 @@ class SC2GisaidIngestCommand : CliktCommand(name = "ingest-sc2-gisaid") {
private val geoLocationRulesFile by argument("geo-location-rules")

override fun run() {
val previousProcessedVersionDir = Path(workdirPath)
.resolve("00_archive")
.resolve(previousProcessedVersion)
runSC2GisaidWorkflow(
Path(workdirPath),
url, user, password,
File(
workdir = Path(workdirPath),
url = url,
user = user,
password = password,
previousProcessed = File(
"provision.$previousProcessedVersion",
Path(workdirPath).resolve("00_archive"),
previousProcessedVersionDir,
false,
FileType.NDJSON,
Compression.ZSTD
),
Path(workdirPath).resolve("00_archive/provision.$previousProcessedVersion.hashes.ndjson.zst"),
Path(geoLocationRulesFile)
previousAllPangoLineagesFile = AllPangoLineagesFile(
previousProcessedVersion,
previousProcessedVersionDir,
),
previousHashes = previousProcessedVersionDir.resolve("provision.$previousProcessedVersion.hashes.ndjson.zst"),
geoLocationRulesFile = Path(geoLocationRulesFile)
)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.genspectrum.ingest.entry


/**
* This function cleans "non-pango lineages" from pango lineages columns
*/
fun MutableEntry.mapPangoLineageToNull(
keys: Collection<String>,
nullValues: Set<String> = setOf("Unassigned", "unclassifiable")
) {
for (key in keys) {
if (nullValues.contains(this.metadata[key])) {
this.metadata[key] = null
}
}
}
38 changes: 38 additions & 0 deletions src/main/kotlin/org/genspectrum/ingest/file/File.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.genspectrum.ingest.file

import org.genspectrum.ingest.util.readFile
import org.genspectrum.ingest.util.writeFile
import java.nio.file.Path

data class File(
Expand Down Expand Up @@ -38,3 +40,39 @@ enum class Compression {
ZSTD,
XZ
}

class AllPangoLineagesFile(
dataVersion: String? = null,
directory: Path
) {
val file = File(
name = when (dataVersion) {
null -> "allPangoLineages"
else -> "allPangoLineages.$dataVersion"
},
directory = directory,
sorted = false,
type = FileType.TSV,
compression = Compression.NONE
)

val path
get() = file.path

fun read(): Set<String> {
return readFile(file.path)
.bufferedReader()
.use { it.readText() }
.lines()
.toSet()
}

fun write(allPangoLineages: Collection<String>) {
writeFile(file.path).bufferedWriter().use {
for (pangoLineage in allPangoLineages) {
it.write(pangoLineage)
it.newLine()
}
}
}
}
29 changes: 24 additions & 5 deletions src/main/kotlin/org/genspectrum/ingest/proc/JoinSC2GisaidData.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package org.genspectrum.ingest.proc
import com.alibaba.fastjson2.to
import org.genspectrum.ingest.AlignedGenome
import org.genspectrum.ingest.entry.*
import org.genspectrum.ingest.file.AllPangoLineagesFile
import org.genspectrum.ingest.file.Compression
import org.genspectrum.ingest.file.File
import org.genspectrum.ingest.file.FileType
Expand All @@ -21,14 +22,16 @@ fun joinSC2GisaidData(
outputDirectory: Path,
outputName: String,
nextcladeDatasetVersion: String,
): File {
): Pair<File, AllPangoLineagesFile> {
val allFiles = listOf(provisionFile, nextcladeFile, alignedFile) + translationFiles.map { it.second }
require(allFiles.all { it.sorted && it.type == FileType.NDJSON })
val outputFile = File(outputName, outputDirectory, true, FileType.NDJSON, Compression.ZSTD)

val translationNames = translationFiles.map { it.first }
val inputStreams = allFiles.map { readFile(it.path) }

val allPangoLineages = HashSet<String>()

val joiner = SortedNdjsonFilesOuterJoiner("id", "seqName", inputStreams)
val writer = writeNdjson<Any>(writeFile(outputFile.path))
for ((_, values) in joiner) {
Expand Down Expand Up @@ -66,17 +69,32 @@ fun joinSC2GisaidData(
clean(provisionEntry)
provisionEntry.metadata["nextcladeDatasetVersion"] = nextcladeDatasetVersion

for (pangoLineageField in pangoLineageNames) {
val pangoLineage = provisionEntry.metadata[pangoLineageField]
if (pangoLineage is String) {
allPangoLineages.add(pangoLineage)
}
}

writer.write(provisionEntry)
}
writer.close()

return outputFile
val allPangoLineagesFile = AllPangoLineagesFile(directory = outputDirectory)
allPangoLineagesFile.write(allPangoLineages)

return outputFile to allPangoLineagesFile
}

private const val pangoLineage = "pangoLineage"
private const val nextcladePangoLineage = "nextcladePangoLineage"

private val pangoLineageNames = listOf(pangoLineage, nextcladePangoLineage)

private val oldToNewMetadataNames = listOf(
"clade_nextstrain" to "nextstrainClade",
"clade_who" to "whoClade",
"Nextclade_pango" to "nextcladePangoLineage",
"Nextclade_pango" to nextcladePangoLineage,
"qc.overallScore" to "nextcladeQcOverallScore",
"qc.missingData.score" to "nextcladeQcMissingDataScore",
"qc.mixedSites.score" to "nextcladeQcMixedSites",
Expand Down Expand Up @@ -111,8 +129,8 @@ private val selectedMetadata = setOf(
"age",
"sex",
"samplingStrategy",
"pangoLineage",
"nextcladePangoLineage",
pangoLineage,
nextcladePangoLineage,
"nextstrainClade",
"whoClade",
"gisaidClade",
Expand Down Expand Up @@ -162,6 +180,7 @@ private fun clean(entry: MutableEntry) {
parseIntegerFields.forEach { parseInteger(it, true) }
parseFloatFields.forEach { parseFloat(it) }
fillInMissingAlignedSequences(fillInMissingAlignedSequencesTemplate)
mapPangoLineageToNull(pangoLineageNames)

metadata["genbankAccession"] = null
metadata["sraAccession"] = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.genspectrum.ingest.proc

import org.genspectrum.ingest.AlignedGenome
import org.genspectrum.ingest.entry.*
import org.genspectrum.ingest.file.AllPangoLineagesFile
import org.genspectrum.ingest.file.Compression
import org.genspectrum.ingest.file.File
import org.genspectrum.ingest.file.FileType
Expand All @@ -21,7 +22,7 @@ fun joinSC2NextstrainOpenData(
outputDirectory: Path,
outputName: String,
outputCompression: Compression = Compression.ZSTD,
): File {
): Pair<File, AllPangoLineagesFile> {
val allInputFiles = listOf(sortedMetadataFile, sortedNextcladeFile, sortedSequencesFile, sortedAlignedFile) +
sortedTranslationFiles.map { it.second }
require(allInputFiles.all { it.sorted && it.type == FileType.NDJSON })
Expand All @@ -30,6 +31,8 @@ fun joinSC2NextstrainOpenData(
val translationNames = sortedTranslationFiles.map { it.first }
val inputStreams = allInputFiles.map { readFile(it.path) }

val allPangoLineages = HashSet<String>()

val joiner = SortedNdjsonFilesOuterJoiner("strain", "seqName", inputStreams)
val writer = writeNdjson<Any>(writeFile(outputFile.path))
for ((_, values) in joiner) {
Expand Down Expand Up @@ -72,12 +75,27 @@ fun joinSC2NextstrainOpenData(
if (joined.metadata["strain"] == null) {
continue
}
for (pangoLineageField in pangoLineageNames) {
val pangoLineage = joined.metadata[pangoLineageField]
if (pangoLineage is String) {
allPangoLineages.add(pangoLineage)
}
}
writer.write(joined)
}
writer.close()
return outputFile

val allPangoLineagesFile = AllPangoLineagesFile(directory = outputDirectory)
allPangoLineagesFile.write(allPangoLineages)

return outputFile to allPangoLineagesFile
}

private const val pangoLineage = "pangoLineage"
private const val nextcladePangoLineage = "nextcladePangoLineage"

private val pangoLineageNames = listOf(pangoLineage, nextcladePangoLineage)

private val oldToNewMetadataNames = listOf(
"gisaid_epi_isl" to "gisaidEpiIsl",
"genbank_accession" to "genbankAccession",
Expand All @@ -87,7 +105,7 @@ private val oldToNewMetadataNames = listOf(
"country_exposure" to "countryExposure",
"division_exposure" to "divisionExposure",
"Nextstrain_clade" to "nextstrainClade",
"pango_lineage" to "pangoLineage",
"pango_lineage" to pangoLineage,
"GISAID_clade" to "gisaidClade",
"originating_lab" to "originatingLab",
"submitting_lab" to "submittingLab",
Expand All @@ -96,7 +114,7 @@ private val oldToNewMetadataNames = listOf(
"sampling_strategy" to "samplingStrategy",
"clade_nextstrain" to "nextstrainClade",
"clade_who" to "whoClade",
"Nextclade_pango" to "nextcladePangoLineage",
"Nextclade_pango" to nextcladePangoLineage,
"immune_escape" to "immuneEscape",
"ace2_binding" to "ace2Binding",
"QC_overall_score" to "nextcladeQcOverallScore",
Expand Down Expand Up @@ -133,8 +151,8 @@ private val selectedMetadata = setOf(
"age",
"sex",
"samplingStrategy",
"pangoLineage",
"nextcladePangoLineage",
pangoLineage,
nextcladePangoLineage,
"nextstrainClade",
"whoClade",
"gisaidClade",
Expand Down Expand Up @@ -184,6 +202,7 @@ private fun clean(entry: MutableEntry) {
parseIntegerFields.forEach { parseInteger(it) }
parseFloatFields.forEach { parseFloat(it) }
fillInMissingAlignedSequences(fillInMissingAlignedSequencesTemplate)
mapPangoLineageToNull(pangoLineageNames)

metadata["died"] = null
metadata["fullyVaccinated"] = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ import org.genspectrum.ingest.entry.mapToNull
import org.genspectrum.ingest.file.Compression
import org.genspectrum.ingest.file.File
import org.genspectrum.ingest.file.FileType
import org.genspectrum.ingest.util.*
import org.genspectrum.ingest.util.GeoLocationMapper
import org.genspectrum.ingest.util.readFile
import org.genspectrum.ingest.util.readNdjson
import org.genspectrum.ingest.util.writeFile
import org.genspectrum.ingest.util.writeNdjson
import java.nio.file.Path

fun transformSC2GisaidBasics(
Expand Down Expand Up @@ -80,7 +84,7 @@ fun transformSC2GisaidBasics(
return TransformSC2GisaidBasicsResult(outputFile, hashOutputFile)
}

data class TransformSC2GisaidBasicsResult (
data class TransformSC2GisaidBasicsResult(
val dataFile: File,
val hashesFile: File
)
4 changes: 2 additions & 2 deletions src/main/kotlin/org/genspectrum/ingest/util/io.kt
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ fun <T> writeNdjson(
}

return WriteNdjsonResponse(
fun (entry: T) {
write = fun (entry: T) {
queue.put(entry)
},
fun () {
close = fun () {
closed = true
writingThread.join()
}
Expand Down
Loading