Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Platform/angela/14116/read hl7 #16859

Merged
merged 26 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
2dc6a1e
[WIP] Changed HL7Reader.getMessages to use the same logic as FHIRConv…
adegolier Dec 6, 2024
d91b689
Merge branch 'main' into platform/angela/14116/ReadHL7
adegolier Dec 10, 2024
de360f6
Merge branch 'main' into platform/angela/14116/ReadHL7
adegolier Dec 11, 2024
7edf98e
[WIP] tests still broken, I think this is touching CP stuff, do we ca…
adegolier Dec 12, 2024
78807d0
[WIP] tests still failing, but fixed the SubmissionReceiver tests by …
adegolier Dec 13, 2024
27bc350
[WIP] Still working on tests. Fixed HL7AckUtilsTest and moved isBatch…
adegolier Dec 16, 2024
e074084
[WIP] Tests still broken, fixed FHIRBundleHelpersTests by using defau…
adegolier Dec 17, 2024
37b41b7
[WIP] Fixed HL7Reader tests and marked a couple things in HL7Reader a…
adegolier Dec 19, 2024
68640e8
[WIP] Added error handling to HL7MessageHelpers for batching files
adegolier Dec 19, 2024
67c8df4
Dealing with merge conflicts
adegolier Dec 19, 2024
cc2b64c
More effort to deal with merge conflicts
adegolier Dec 19, 2024
ad37042
Merge branch 'main' into platform/angela/14116/ReadHL7
adegolier Dec 19, 2024
0360fba
One more little leftover from merge conflicts
adegolier Dec 19, 2024
7bb94b1
Fixed some more references to getMessages() that came in from main an…
adegolier Dec 19, 2024
955f3f8
Minor fix to HL7MessageHelpers
adegolier Dec 20, 2024
97600ba
Merge branch 'main' into platform/angela/14116/ReadHL7
adegolier Dec 20, 2024
0e5b19d
added requested helper function
adegolier Dec 23, 2024
fc64067
removed unnecessary test
adegolier Dec 23, 2024
306d985
Removed MessageType validation from SubmissionReceiver
adegolier Jan 6, 2025
f36a90a
Removed HL7MessageParseAndConvertConfiguration
adegolier Jan 6, 2025
08357af
Modified ProcessFhirCommands to remove things from HL7Reader that wer…
adegolier Jan 7, 2025
f64fe7e
Merge branch 'main' into platform/angela/14116/ReadHL7
adegolier Jan 7, 2025
44c93c8
removed deprecation mark from ProcessFhirCommands
adegolier Jan 7, 2025
7d6422f
Merge branch 'main' into platform/angela/14116/ReadHL7
adegolier Jan 7, 2025
f511958
Merge branch 'main' into platform/angela/14116/ReadHL7
adegolier Jan 7, 2025
e44d58d
Merge branch 'main' into platform/angela/14116/ReadHL7
adegolier Jan 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions prime-router/src/main/kotlin/SettingsProvider.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import com.fasterxml.jackson.annotation.JsonValue
import gov.cdc.prime.router.CustomerStatus.ACTIVE
import gov.cdc.prime.router.CustomerStatus.INACTIVE
import gov.cdc.prime.router.CustomerStatus.TESTING
import gov.cdc.prime.router.fhirengine.utils.HL7Reader
import gov.cdc.prime.router.validation.IItemValidator
import gov.cdc.prime.router.validation.MarsOtcElrOnboardingValidator
import gov.cdc.prime.router.validation.MarsOtcElrValidator
Expand Down Expand Up @@ -55,7 +54,6 @@ enum class Topic(
val isUniversalPipeline: Boolean = true,
val isSendOriginal: Boolean = false,
val validator: IItemValidator = NoopItemValidator(),
val hl7ParseConfiguration: HL7Reader.Companion.HL7MessageParseAndConvertConfiguration? = null,
) {
FULL_ELR("full-elr", true, false),
ETOR_TI("etor-ti", true, false),
Expand Down
17 changes: 7 additions & 10 deletions prime-router/src/main/kotlin/SubmissionReceiver.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import gov.cdc.prime.router.azure.ReportWriter
import gov.cdc.prime.router.azure.WorkflowEngine
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.fhirengine.engine.FhirConvertQueueMessage
import gov.cdc.prime.router.fhirengine.engine.MessageType
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
import gov.cdc.prime.router.fhirengine.utils.HL7MessageHelpers
import gov.cdc.prime.router.fhirengine.utils.HL7Reader

/**
Expand Down Expand Up @@ -268,14 +268,14 @@ class UniversalPipelineReceiver : SubmissionReceiver {

when (sender.format) {
MimeFormat.HL7 -> {
val messages = HL7Reader(actionLogs).getMessages(content)
val isBatch = HL7Reader(actionLogs).isBatch(content, messages.size)
// create a Report for this incoming HL7 message to use for tracking in the database
val messageCount = HL7MessageHelpers.messageCount(content)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in this file look good. However, Long-term, we don't want to be doing this type of validation in the receive step (the submission microservice doesn't, see submissions/src/main/kotlin/gov/cdc/prime/reportstream/submissions/controllers/SubmissionController.kt). I think its fine to leave this as is for now and if we find we are not as close to moving to the microservice as we think we are we can prioritize tickets to align this function with the microservice.

val isBatch = HL7Reader.isBatch(content, messageCount)

// create a Report for this incoming HL7 message to use for tracking in the database
report = Report(
if (isBatch) MimeFormat.HL7_BATCH else MimeFormat.HL7,
sources,
messages.size,
messageCount,
metadata = metadata,
nextAction = TaskAction.convert,
topic = sender.topic,
Expand All @@ -290,11 +290,8 @@ class UniversalPipelineReceiver : SubmissionReceiver {
// actionLogs
// )
// }

// check for valid message type
messages.forEachIndexed {
idx, element ->
MessageType.validateMessageType(element, actionLogs, idx + 1)
if (messageCount == 0 && !actionLogs.hasErrors()) {
actionLogs.error(InvalidReportMessage("Unable to find HL7 messages in provided data."))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import com.google.common.net.HttpHeaders
import com.microsoft.azure.functions.HttpRequestMessage
import com.microsoft.azure.functions.HttpResponseMessage
import com.microsoft.azure.functions.HttpStatus
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.Sender
import gov.cdc.prime.router.azure.HttpUtilities
import gov.cdc.prime.router.azure.HttpUtilities.Companion.isSuccessful
import gov.cdc.prime.router.common.JacksonMapperUtilities
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.HL7ACKUtils
import gov.cdc.prime.router.fhirengine.utils.HL7MessageHelpers
import gov.cdc.prime.router.fhirengine.utils.HL7Reader
import gov.cdc.prime.router.history.DetailedSubmissionHistory
import org.apache.logging.log4j.kotlin.Logging
Expand Down Expand Up @@ -106,12 +106,11 @@ class SubmissionResponseBuilder(
contentType == HttpUtilities.hl7V2MediaType &&
requestBody != null
) {
val hl7Reader = HL7Reader(ActionLogger())
val messages = hl7Reader.getMessages(requestBody)
val isBatch = hl7Reader.isBatch(requestBody, messages.size)
val messageCount = HL7MessageHelpers.messageCount(requestBody)
val isBatch = HL7Reader.isBatch(requestBody, messageCount)

if (!isBatch && messages.size == 1) {
val message = messages.first()
if (!isBatch && messageCount == 1) {
val message = HL7Reader.parseHL7Message(requestBody)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to be careful here. parseHL7Message can return exceptions, so what does that mean for the submission api in general? If there is a failure here, it will except out of the main processRequest function and an HTTP error will be returned in from the caller submitToWaters. The DB state will be updated I think by this point, but the queueMessage for the convert step will not be populated.

I poked around a bit and I think this is fine. This submission method is slated to be deprecated and we should take this issue into consideration when implementing this HL7v2 ACK functionality into the submission microservice.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the DevNotes section of this ticket linking to this comment: #15731

Copy link
Collaborator Author

@adegolier adegolier Jan 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be okay because we're checking the message count with the iterator and it would have gotten angry there and returned with 0 messages if it was bad HL7.

val acceptAcknowledgementType = HL7Reader.getAcceptAcknowledgmentType(message)
val ackResponseRequired = acceptAcknowledgmentTypeRespondValues.contains(acceptAcknowledgementType)
if (ackResponseRequired) {
Expand Down
19 changes: 7 additions & 12 deletions prime-router/src/main/kotlin/cli/ProcessFhirCommands.kt
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ class ProcessFhirCommands : CliktCommand(
(isCli && outputFormat == MimeFormat.HL7.toString()) ||
(
receiver != null &&
(receiver.format == MimeFormat.HL7 || receiver.format == MimeFormat.HL7_BATCH)
)
(receiver.format == MimeFormat.HL7 || receiver.format == MimeFormat.HL7_BATCH)
)
) -> {
val (bundle2, inputMessage) = convertHl7ToFhir(contents, receiver)

Expand Down Expand Up @@ -297,7 +297,7 @@ class ProcessFhirCommands : CliktCommand(
}
}

private fun evaluateReceiverFilters(receiver: Receiver?, messageOrBundle: MessageOrBundle, isCli: Boolean) {
private fun evaluateReceiverFilters(receiver: Receiver?, messageOrBundle: MessageOrBundle, isCli: Boolean) {
if (receiver != null && messageOrBundle.bundle != null) {
val reportStreamFilters = mutableListOf<Pair<String, ReportStreamFilter>>()
reportStreamFilters.add(Pair("Jurisdictional Filter", receiver.jurisdictionalFilter))
Expand Down Expand Up @@ -516,16 +516,15 @@ class ProcessFhirCommands : CliktCommand(
* look like.
* @return a FHIR bundle and the parsed HL7 input that represents the data in the one HL7 message
*/
@Suppress("DEPRECATION")
adegolier marked this conversation as resolved.
Show resolved Hide resolved
private fun convertHl7ToFhir(hl7String: String, receiver: Receiver?): Pair<Bundle, Message> {
val hasFiveEncodingChars = hl7MessageHasFiveEncodingChars(hl7String)
// Some HL7 2.5.1 implementations have adopted the truncation character # that was added in 2.7
// However, the library used to encode the HL7 message throws an error it there are more than 4 encoding
// characters, so this work around exists for that scenario
val stringToEncode = hl7String.replace("MSH|^~\\&#|", "MSH|^~\\&|")
val hl7message = HL7Reader.parseHL7Message(
stringToEncode,
null
)
val hl7message = HL7Reader.parseHL7Message(stringToEncode)

// if a hl7 parsing failure happens, throw error and show the message
if (hl7message.toString().lowercase().contains("failed")) {
throw CliktError("HL7 parser failure. $hl7message")
Expand All @@ -534,12 +533,8 @@ class ProcessFhirCommands : CliktCommand(
val msh = hl7message.get("MSH") as Segment
Terser.set(msh, 2, 0, 1, 1, "^~\\&#")
}
val hl7profile = HL7Reader.getMessageProfile(hl7message.toString())
// search hl7 profile map and create translator with config path if found
var fhirMessage = when (val configPath = HL7Reader.profileDirectoryMap[hl7profile]) {
null -> HL7toFhirTranslator(inputSchema).translate(hl7message)
else -> HL7toFhirTranslator(configPath).translate(hl7message)
}
var fhirMessage = HL7toFhirTranslator(inputSchema).translate(hl7message)

val stamper = ConditionStamper(LookupTableConditionMapper(Metadata.getInstance()))
fhirMessage.getObservations().forEach { observation ->
Expand Down
13 changes: 9 additions & 4 deletions prime-router/src/main/kotlin/cli/ProcessHl7Commands.kt
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package gov.cdc.prime.router.cli

import ca.uhn.hl7v2.util.Hl7InputStreamMessageStringIterator
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.core.CliktError
import com.github.ajalt.clikt.parameters.options.option
import com.github.ajalt.clikt.parameters.options.required
import com.github.ajalt.clikt.parameters.types.file
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.cli.helpers.HL7DiffHelper
import gov.cdc.prime.router.fhirengine.utils.HL7Reader

Expand Down Expand Up @@ -45,9 +45,14 @@ class ProcessHl7Commands : CliktCommand(
val comparisonFile = comparisonFile.inputStream().readBytes().toString(Charsets.UTF_8)
if (comparisonFile.isBlank()) throw CliktError("File ${this.comparisonFile.absolutePath} is empty.")

val actionLogger = ActionLogger()
val starterMessages = HL7Reader(actionLogger).getMessages(starterFile)
val comparisonMessages = HL7Reader(actionLogger).getMessages(comparisonFile)
val starterMessages = Hl7InputStreamMessageStringIterator(starterFile.byteInputStream()).asSequence()
.map { rawItem ->
HL7Reader.parseHL7Message(rawItem)
}.toList()
val comparisonMessages = Hl7InputStreamMessageStringIterator(comparisonFile.byteInputStream()).asSequence()
.map { rawItem ->
HL7Reader.parseHL7Message(rawItem)
}.toList()

starterMessages.forEachIndexed { counter, message ->
val differences = hl7DiffHelper.diffHl7(message, comparisonMessages[counter])
Expand Down
39 changes: 13 additions & 26 deletions prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,9 @@ import gov.cdc.prime.router.fhirengine.translation.hl7.FhirTransformer
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.CustomContext
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.FhirPathUtils
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
import gov.cdc.prime.router.fhirengine.utils.HL7Reader
import gov.cdc.prime.router.fhirengine.utils.HL7Reader.Companion.parseHL7Message
import gov.cdc.prime.router.fhirengine.utils.getObservations
import gov.cdc.prime.router.fhirengine.utils.getRSMessageType
import gov.cdc.prime.router.fhirengine.utils.isElr
import gov.cdc.prime.router.logging.LogMeasuredTime
import gov.cdc.prime.router.report.ReportService
import gov.cdc.prime.router.validation.IItemValidator
Expand Down Expand Up @@ -397,7 +395,7 @@ class FHIRConverter(
)
}

FHIREngineRunResult(
FHIREngineRunResult(
routeEvent,
report,
blobInfo.blobUrl,
Expand Down Expand Up @@ -427,7 +425,7 @@ class FHIRConverter(
report,
TaskAction.convert,
"Submitted report was either empty or could not be parsed into HL7"
) {
) {
parentReportId(input.reportId)
params(
mapOf(
Expand Down Expand Up @@ -479,7 +477,7 @@ class FHIRConverter(
"format" to format.name
)
) {
getBundlesFromRawHL7(rawReport, validator, input.topic.hl7ParseConfiguration)
getBundlesFromRawHL7(rawReport, validator)
}
} catch (ex: ParseFailureError) {
actionLogger.error(
Expand Down Expand Up @@ -571,7 +569,6 @@ class FHIRConverter(
private fun getBundlesFromRawHL7(
rawReport: String,
validator: IItemValidator,
hL7MessageParseAndConvertConfiguration: HL7Reader.Companion.HL7MessageParseAndConvertConfiguration?,
): List<IProcessedItem<Message>> {
val itemStream =
Hl7InputStreamMessageStringIterator(rawReport.byteInputStream()).asSequence()
Expand All @@ -580,17 +577,16 @@ class FHIRConverter(
}.toList()

return maybeParallelize(itemStream.size, itemStream.stream(), "Generating FHIR bundles in").map { item ->
parseHL7Item(item, hL7MessageParseAndConvertConfiguration)
parseHL7Item(item)
}.map { item ->
validateAndConvertHL7Item(item, validator, hL7MessageParseAndConvertConfiguration)
validateAndConvertHL7Item(item, validator)
}.collect(Collectors.toList())
}

private fun parseHL7Item(
item: ProcessedHL7Item,
hL7MessageParseAndConvertConfiguration: HL7Reader.Companion.HL7MessageParseAndConvertConfiguration?,
) = try {
val message = parseHL7Message(item.rawItem, hL7MessageParseAndConvertConfiguration)
val message = parseHL7Message(item.rawItem)
item.updateParsed(message)
} catch (e: HL7Exception) {
item.updateParsed(
Expand All @@ -605,20 +601,11 @@ class FHIRConverter(
private fun validateAndConvertHL7Item(
item: ProcessedHL7Item,
validator: IItemValidator,
hL7MessageParseAndConvertConfiguration: HL7Reader.Companion.HL7MessageParseAndConvertConfiguration?,
): ProcessedHL7Item = if (item.parsedItem != null) {
val validationResult = validator.validate(item.parsedItem)
if (validationResult.isValid()) {
try {
val bundle = when (hL7MessageParseAndConvertConfiguration) {
null -> HL7toFhirTranslator.getHL7ToFhirTranslatorInstance().translate(item.parsedItem)
else ->
HL7toFhirTranslator
.getHL7ToFhirTranslatorInstance(
hL7MessageParseAndConvertConfiguration.hl7toFHIRMappingLocation
)
.translate(item.parsedItem)
}
val bundle = HL7toFhirTranslator.getHL7ToFhirTranslatorInstance().translate(item.parsedItem)
item.setBundle(bundle)
} catch (ex: Exception) {
item.setConversionError(
Expand Down Expand Up @@ -759,13 +746,13 @@ class FHIRConverter(
* transformer in tests.
*/
fun getTransformerFromSchema(schemaName: String): FhirTransformer? = if (schemaName.isNotBlank()) {
withLoggingContext(mapOf("schemaName" to schemaName)) {
logger.info("Apply a sender transform to the items in the report")
}
FhirTransformer(schemaName)
} else {
null
withLoggingContext(mapOf("schemaName" to schemaName)) {
logger.info("Apply a sender transform to the items in the report")
}
FhirTransformer(schemaName)
} else {
null
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package gov.cdc.prime.router.fhirengine.translation

import ca.uhn.hl7v2.util.Hl7InputStreamMessageStringIterator
import com.azure.storage.blob.models.BlobItem
import fhirengine.engine.CustomFhirPathFunctions
import fhirengine.engine.CustomTranslationFunctions
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.Hl7Configuration
import gov.cdc.prime.router.azure.BlobAccess
import gov.cdc.prime.router.fhirengine.config.HL7TranslationConfig
Expand Down Expand Up @@ -41,7 +41,6 @@ class TranslationSchemaManager : Logging {
Regex("/$previousValidBlobName-$timestampRegex")
private val previousPreviousValidBlobNameRegex =
Regex("/$previousPreviousValidBlobName-$timestampRegex")
private val hL7Reader = HL7Reader(ActionLogger())

/**
* Container class that holds the current state for a schema type in a particular azure store.
Expand Down Expand Up @@ -440,7 +439,11 @@ class TranslationSchemaManager : Logging {
)
).validate(
inputBundle,
hL7Reader.getMessages(rawValidationInput.output)[0]
HL7Reader.parseHL7Message(
Hl7InputStreamMessageStringIterator(rawValidationInput.output.byteInputStream())
.asSequence()
.first()
),
)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package gov.cdc.prime.router.fhirengine.utils

import ca.uhn.hl7v2.AbstractHL7Exception
import ca.uhn.hl7v2.model.v251.datatype.DTM
import ca.uhn.hl7v2.util.Hl7InputStreamMessageStringIterator
import ca.uhn.hl7v2.util.Terser
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.Hl7Configuration
Expand All @@ -24,6 +26,8 @@ object HL7MessageHelpers : Logging {
*/
const val hl7SegmentDelimiter = "\r"

val actionLogger = ActionLogger()

/**
* Generate a HL7 Batch file from the list of [hl7RawMsgs] for the given [receiver]. The [hl7RawMsgs] are expected
* to be real HL7 messages at this point, so we will not validate their contents here for performance reasons.
Expand All @@ -34,12 +38,17 @@ object HL7MessageHelpers : Logging {
val useBatchHeaders = receiver.translation.useBatchHeaders
// Grab the first message to extract some data if not set in the settings
val firstMessage = if (hl7RawMsgs.isNotEmpty()) {
val messages = HL7Reader(ActionLogger()).getMessages(hl7RawMsgs[0])
if (messages.isEmpty()) {
try {
val message = HL7Reader.parseHL7Message(hl7RawMsgs[0])
Terser(message)
} catch (exception: Hl7InputStreamMessageStringIterator.ParseFailureError) {
logger.warn("Unable to extract batch header values from HL7: ${hl7RawMsgs[0].take(80)} ...")
HL7Reader.logHL7ParseFailure(exception, actionLogger)
null
} catch (exception: AbstractHL7Exception) {
logger.warn("Unable to extract batch header values from HL7: ${hl7RawMsgs[0].take(80)} ...")
HL7Reader.recordError(exception, actionLogger)
null
} else {
Terser(messages[0])
}
} else {
null
Expand Down Expand Up @@ -94,4 +103,8 @@ object HL7MessageHelpers : Logging {

return builder.toString()
}

fun messageCount(rawHl7: String): Int {
jack-h-wang marked this conversation as resolved.
Show resolved Hide resolved
return Hl7InputStreamMessageStringIterator(rawHl7.byteInputStream()).asSequence().count()
}
}
Loading
Loading