Skip to content

Commit

Permalink
feat(backend): allow streaming released data in zstd compression #774
Browse files Browse the repository at this point in the history
  • Loading branch information
fengelniederhammer committed Apr 2, 2024
1 parent 0c34e72 commit 3699857
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 28 deletions.
1 change: 1 addition & 0 deletions backend/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dependencies {
implementation "org.jetbrains.kotlinx:kotlinx-datetime:0.5.0"
implementation "org.hibernate.validator:hibernate-validator:8.0.1.Final"
implementation "org.keycloak:keycloak-admin-client:23.0.7"
implementation 'com.github.luben:zstd-jni:1.5.5-11'

implementation "org.springframework.boot:spring-boot-starter-oauth2-resource-server"
implementation "org.springframework.boot:spring-boot-starter-security"
Expand Down
14 changes: 14 additions & 0 deletions backend/src/main/kotlin/org/loculus/backend/api/SubmissionTypes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import io.swagger.v3.oas.annotations.media.Schema
import org.loculus.backend.utils.Accession
import org.loculus.backend.utils.Version
import org.springframework.core.convert.converter.Converter
import org.springframework.stereotype.Component

data class Accessions(
val accessions: List<Accession>,
Expand Down Expand Up @@ -269,3 +271,15 @@ enum class SiloVersionStatus {
REVISED,
LATEST_VERSION,
}

/**
 * Compression formats that can be requested for a response.
 *
 * [compressionName] is the textual name of the format; `CompressionFormatConverter` matches incoming
 * strings against it (ignoring case) to select a constant.
 */
enum class CompressionFormat(val compressionName: String) {
ZSTD("zstd"),
}

/**
 * Spring [Converter] that resolves the textual name of a compression to its [CompressionFormat].
 *
 * The given string is compared against [CompressionFormat.compressionName], ignoring case.
 *
 * @throws IllegalArgumentException if no [CompressionFormat] carries the given name
 */
@Component
class CompressionFormatConverter : Converter<String, CompressionFormat> {
    override fun convert(source: String): CompressionFormat = requireNotNull(
        CompressionFormat.entries.find { format -> format.compressionName.equals(source, ignoreCase = true) },
    ) { "Unknown compression: $source" }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import jakarta.servlet.http.HttpServletRequest
import jakarta.validation.Valid
import jakarta.validation.constraints.Max
import mu.KotlinLogging
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream
import org.loculus.backend.api.AccessionVersion
import org.loculus.backend.api.AccessionVersionsFilterWithApprovalScope
import org.loculus.backend.api.AccessionVersionsFilterWithDeletionScope
import org.loculus.backend.api.Accessions
import org.loculus.backend.api.CompressionFormat
import org.loculus.backend.api.DataUseTerms
import org.loculus.backend.api.DataUseTermsType
import org.loculus.backend.api.GetSequenceResponse
Expand Down Expand Up @@ -202,13 +204,16 @@ class SubmissionController(
)
@GetMapping("/get-released-data", produces = [MediaType.APPLICATION_NDJSON_VALUE])
fun getReleasedData(
@PathVariable @Valid
organism: Organism,
@PathVariable @Valid organism: Organism,
@RequestParam compression: CompressionFormat?,
): ResponseEntity<StreamingResponseBody> {
val headers = HttpHeaders()
headers.contentType = MediaType.parseMediaType(MediaType.APPLICATION_NDJSON_VALUE)
if (compression != null) {
headers.add(HttpHeaders.CONTENT_ENCODING, compression.compressionName)
}

val streamBody = stream { releasedDataModel.getReleasedData(organism) }
val streamBody = stream(compression) { releasedDataModel.getReleasedData(organism) }

return ResponseEntity(streamBody, headers, HttpStatus.OK)
}
Expand Down Expand Up @@ -320,14 +325,22 @@ class SubmissionController(
body.scope,
)

private fun <T> stream(sequenceProvider: () -> Sequence<T>) = StreamingResponseBody { outputStream ->
try {
iteratorStreamer.streamAsNdjson(sequenceProvider(), outputStream)
} catch (e: Exception) {
log.error(e) { "An unexpected error occurred while streaming, aborting the stream: $e" }
outputStream.write(
"An unexpected error occurred while streaming, aborting the stream: ${e.message}".toByteArray(),
)
/**
 * Builds a [StreamingResponseBody] that writes the sequence returned by [sequenceProvider] as NDJSON.
 *
 * If a [compressionFormat] is given, the response body stream is wrapped in the corresponding compressing
 * stream; otherwise the data is written to the response body stream as is. The stream that was written to
 * is closed when streaming is done.
 *
 * An exception thrown while streaming is logged, and an error message is written to the same
 * (possibly compressing) stream.
 */
private fun <T> stream(compressionFormat: CompressionFormat? = null, sequenceProvider: () -> Sequence<T>) =
    StreamingResponseBody { responseBody ->
        // No `else` branch on purpose: a newly added format must be handled explicitly here.
        val target = when (compressionFormat) {
            null -> responseBody
            CompressionFormat.ZSTD -> ZstdCompressorOutputStream(responseBody)
        }

        target.use { sink ->
            try {
                iteratorStreamer.streamAsNdjson(sequenceProvider(), sink)
            } catch (e: Exception) {
                log.error(e) { "An unexpected error occurred while streaming, aborting the stream: $e" }
                val errorNotice = "An unexpected error occurred while streaming, aborting the stream: ${e.message}"
                sink.write(errorNotice.toByteArray())
            }
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.IntNode
import com.fasterxml.jackson.databind.node.NullNode
import com.fasterxml.jackson.databind.node.TextNode
import com.fasterxml.jackson.module.kotlin.readValue
import com.github.luben.zstd.ZstdInputStream
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlinx.datetime.TimeZone
import kotlinx.datetime.toLocalDateTime
import org.hamcrest.CoreMatchers.`is`
import org.hamcrest.MatcherAssert.assertThat
import org.hamcrest.Matchers.hasSize
import org.hamcrest.Matchers.matchesPattern
import org.hamcrest.Matchers.not
import org.junit.jupiter.api.Test
import org.loculus.backend.api.GeneticSequence
import org.loculus.backend.api.ProcessedData
Expand All @@ -22,11 +26,19 @@ import org.loculus.backend.controller.EndpointTest
import org.loculus.backend.controller.expectForbiddenResponse
import org.loculus.backend.controller.expectNdjsonAndGetContent
import org.loculus.backend.controller.expectUnauthorizedResponse
import org.loculus.backend.controller.jacksonObjectMapper
import org.loculus.backend.controller.jwtForDefaultUser
import org.loculus.backend.controller.submission.SubmitFiles.DefaultFiles
import org.loculus.backend.controller.submission.SubmitFiles.DefaultFiles.NUMBER_OF_SEQUENCES
import org.loculus.backend.utils.Accession
import org.loculus.backend.utils.Version
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.HttpHeaders
import org.springframework.http.MediaType
import org.springframework.test.web.servlet.result.MockMvcResultMatchers.content
import org.springframework.test.web.servlet.result.MockMvcResultMatchers.header
import org.springframework.test.web.servlet.result.MockMvcResultMatchers.status
import org.testcontainers.shaded.org.awaitility.Awaitility.await

private val ADDED_FIELDS_WITH_UNKNOWN_VALUES_FOR_RELEASE = listOf(
"releasedAt",
Expand Down Expand Up @@ -218,6 +230,34 @@ class GetReleasedDataEndpointTest(
assertThat(revocationEntry.aminoAcidInsertions, `is`(expectedAminoAcidInsertions))
}

@Test
fun `WHEN I request zstd compressed data THEN should return zstd compressed data`() {
    convenienceClient.prepareDataTo(Status.APPROVED_FOR_RELEASE)

    val response = submissionControllerClient.getReleasedData(compression = "zstd")
        .andExpect(status().isOk)
        .andExpect(content().contentType(MediaType.APPLICATION_NDJSON_VALUE))
        .andExpect(header().string(HttpHeaders.CONTENT_ENCODING, "zstd"))
        .andReturn()
        .response
    // The body is only read after the response has been committed.
    // NOTE(review): presumably required because the body is written asynchronously - confirm.
    await().until {
        response.isCommitted
    }
    val content = response.contentAsByteArray

    // `use` closes the decompressing stream (and the resources it holds) as soon as all bytes are read,
    // also when reading fails.
    val decompressedContent = ZstdInputStream(content.inputStream())
        .apply { setContinuous(true) }
        .use { it.readAllBytes() }
        .decodeToString()

    // Every non-blank line of the decompressed NDJSON is parsed as one entry.
    val data = decompressedContent.lines()
        .filter { it.isNotBlank() }
        .map { jacksonObjectMapper.readValue<ProcessedData<GeneticSequence>>(it) }

    assertThat(data, hasSize(NUMBER_OF_SEQUENCES))
    assertThat(data[0].metadata, `is`(not(emptyMap())))
}

private fun prepareRevokedAndRevocationAndRevisedVersions(): PreparedVersions {
val preparedSubmissions = convenienceClient.prepareDataTo(Status.APPROVED_FOR_RELEASE)
convenienceClient.reviseAndProcessDefaultSequenceEntries(preparedSubmissions.map { it.accession })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,20 @@ class SubmissionControllerClient(private val mockMvc: MockMvc, private val objec
.withAuth(jwt),
)

fun getReleasedData(organism: String = DEFAULT_ORGANISM, jwt: String? = jwtForGetReleasedData): ResultActions =
mockMvc.perform(
get(addOrganismToPath("/get-released-data", organism = organism))
.withAuth(jwt),
)
/**
 * Performs a GET request against `/get-released-data` for the given organism.
 *
 * @param organism organism that is added to the request path via [addOrganismToPath]
 * @param jwt token that is handed to `withAuth`
 * @param compression value of the `compression` request parameter; the parameter is not sent when `null`
 * @return the [ResultActions] of the performed request
 */
fun getReleasedData(
    organism: String = DEFAULT_ORGANISM,
    jwt: String? = jwtForGetReleasedData,
    compression: String? = null,
): ResultActions = mockMvc.perform(
    get(addOrganismToPath("/get-released-data", organism = organism))
        .apply {
            // Only add the request parameter when a compression was actually requested.
            if (compression != null) {
                param("compression", compression)
            }
        }
        .withAuth(jwt),
)

fun deleteSequenceEntries(
scope: DeleteSequenceScope,
Expand Down
23 changes: 13 additions & 10 deletions kubernetes/loculus/silo_import_job.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ new_input_data_dir="$input_data_dir/$current_timestamp"

old_input_data_dir="$input_data_dir"/$(ls -1 "$input_data_dir" | sort -n | grep -E '^[0-9]+$' | tail -n 1)

new_input_data="$new_input_data_dir/data.ndjson"
old_input_data="$old_input_data_dir/data.ndjson"
silo_input_data="$input_data_dir/data.ndjson"
new_input_data="$new_input_data_dir/data.ndjson.zst"
old_input_data="$old_input_data_dir/data.ndjson.zst"
silo_input_data="$input_data_dir/data.ndjson.zst"

get_token() {
if [ -z "$KEYCLOAK_TOKEN_URL" ]; then
Expand Down Expand Up @@ -57,7 +57,7 @@ download_data() {
mkdir -p "$new_input_data_dir"
echo "created $new_input_data_dir"

released_data_endpoint="$BACKEND_BASE_URL/get-released-data"
released_data_endpoint="$BACKEND_BASE_URL/get-released-data?compression=zstd"
echo "calling $released_data_endpoint"

set +e
Expand All @@ -71,7 +71,8 @@ download_data() {
exit $exit_code
fi

echo "downloaded $(wc -l < "$new_input_data") sequences"
echo "downloaded sequences"
ls -l "$new_input_data_dir"
echo

echo "checking for old input data dir $old_input_data_dir"
Expand All @@ -97,13 +98,15 @@ download_data() {

preprocessing() {
# TODO: #1489 Remove emptiness test once https://github.com/GenSpectrum/LAPIS-SILO/issues/244 fixed
if [ -s "$new_input_data_dir" ]; then
echo "data.ndjson is not empty, starting preprocessing"
rough_size_of_empty_zstd_file="12"
size_of_data=$(stat -c %s "$new_input_data")
if [ "$size_of_data" -gt "$rough_size_of_empty_zstd_file" ]; then
echo "data.ndjson.zst is not empty ($size_of_data bytes), starting preprocessing"

rm -f "$silo_input_data"

# This is necessary because silo preprocessing expects the input data to be in a specific magic location
# At /preprocessing/input/data.ndjson
# This is necessary because the silo preprocessing is configured to expect the input data
# at /preprocessing/input/data.ndjson.zst
cp "$new_input_data" "$silo_input_data"

set +e
Expand All @@ -119,7 +122,7 @@ preprocessing() {

echo "preprocessing for $current_timestamp done"
else
echo "empty data.ndjson, deleting all input"
echo "empty data.ndjson.zst ($size_of_data bytes), deleting all input"
delete_all_input

fi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ data:
{{- end }}

preprocessing_config.yaml: |
ndjsonInputFilename: data.ndjson
ndjsonInputFilename: data.ndjson.zst
pangoLineageDefinitionFilename: pangolineage_alias.json
referenceGenomeFilename: reference_genomes.json
Expand Down

0 comments on commit 3699857

Please sign in to comment.