Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Platform/angela/14116/read hl7 #16859

Merged
merged 26 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
2dc6a1e
[WIP] Changed HL7Reader.getMessages to use the same logic as FHIRConv…
adegolier Dec 6, 2024
d91b689
Merge branch 'main' into platform/angela/14116/ReadHL7
adegolier Dec 10, 2024
de360f6
Merge branch 'main' into platform/angela/14116/ReadHL7
adegolier Dec 11, 2024
7edf98e
[WIP] tests still broken, I think this is touching CP stuff, do we ca…
adegolier Dec 12, 2024
78807d0
[WIP] tests still failing, but fixed the SubmissionReceiver tests by …
adegolier Dec 13, 2024
27bc350
[WIP] Still working on tests. Fixed HL7AckUtilsTest and moved isBatch…
adegolier Dec 16, 2024
e074084
[WIP] Tests still broken, fixed FHIRBundleHelpersTests by using defau…
adegolier Dec 17, 2024
37b41b7
[WIP] Fixed HL7Reader tests and marked a couple things in HL7Reader a…
adegolier Dec 19, 2024
68640e8
[WIP] Added error handling to HL7MessageHelpers for batching files
adegolier Dec 19, 2024
67c8df4
Dealing with merge conflicts
adegolier Dec 19, 2024
cc2b64c
More effort to deal with merge conflicts
adegolier Dec 19, 2024
ad37042
Merge branch 'main' into platform/angela/14116/ReadHL7
adegolier Dec 19, 2024
0360fba
One more little leftover from merge conflicts
adegolier Dec 19, 2024
7bb94b1
Fixed some more references to getMessages() that came in from main an…
adegolier Dec 19, 2024
955f3f8
Minor fix to HL7MessageHelpers
adegolier Dec 20, 2024
97600ba
Merge branch 'main' into platform/angela/14116/ReadHL7
adegolier Dec 20, 2024
0e5b19d
added requested helper function
adegolier Dec 23, 2024
fc64067
removed unnecessary test
adegolier Dec 23, 2024
306d985
Removed MessageType validation from SubmissionReceiver
adegolier Jan 6, 2025
f36a90a
Removed HL7MessageParseAndConvertConfiguration
adegolier Jan 6, 2025
08357af
Modified ProcessFhirCommands to remove things from HL7Reader that wer…
adegolier Jan 7, 2025
f64fe7e
Merge branch 'main' into platform/angela/14116/ReadHL7
adegolier Jan 7, 2025
44c93c8
removed deprecation mark from ProcessFhirCommands
adegolier Jan 7, 2025
7d6422f
Merge branch 'main' into platform/angela/14116/ReadHL7
adegolier Jan 7, 2025
f511958
Merge branch 'main' into platform/angela/14116/ReadHL7
adegolier Jan 7, 2025
e44d58d
Merge branch 'main' into platform/angela/14116/ReadHL7
adegolier Jan 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions prime-router/src/main/kotlin/SubmissionReceiver.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package gov.cdc.prime.router

import ca.uhn.hl7v2.util.Hl7InputStreamMessageStringIterator
import gov.cdc.prime.reportstream.shared.BlobUtils
import gov.cdc.prime.reportstream.shared.QueueMessage
import gov.cdc.prime.router.azure.ActionHistory
Expand Down Expand Up @@ -268,14 +269,14 @@ class UniversalPipelineReceiver : SubmissionReceiver {

when (sender.format) {
MimeFormat.HL7 -> {
val messages = HL7Reader(actionLogs).getMessages(content)
val isBatch = HL7Reader(actionLogs).isBatch(content, messages.size)
// create a Report for this incoming HL7 message to use for tracking in the database
val messageCount = Hl7InputStreamMessageStringIterator(content.byteInputStream()).asSequence().count()
JFisk42 marked this conversation as resolved.
Show resolved Hide resolved
val isBatch = HL7Reader.isBatch(content, messageCount)

// create a Report for this incoming HL7 message to use for tracking in the database
report = Report(
if (isBatch) MimeFormat.HL7_BATCH else MimeFormat.HL7,
sources,
messages.size,
messageCount,
metadata = metadata,
nextAction = TaskAction.convert,
topic = sender.topic,
Expand All @@ -290,12 +291,14 @@ class UniversalPipelineReceiver : SubmissionReceiver {
// actionLogs
// )
// }

// check for valid message type
messages.forEachIndexed {
idx, element ->
MessageType.validateMessageType(element, actionLogs, idx + 1)
if (messageCount == 0 && !actionLogs.hasErrors()) {
actionLogs.error(InvalidReportMessage("Unable to find HL7 messages in provided data."))
}
// check for valid message type
Hl7InputStreamMessageStringIterator(content.byteInputStream()).asSequence()
adegolier marked this conversation as resolved.
Show resolved Hide resolved
.forEachIndexed { index, rawItem ->
MessageType.validateMessageType(HL7Reader.parseHL7Message(rawItem, null), actionLogs, index + 1)
JFisk42 marked this conversation as resolved.
Show resolved Hide resolved
}
}

MimeFormat.FHIR -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package gov.cdc.prime.router.azure.service

import ca.uhn.hl7v2.model.Message
import ca.uhn.hl7v2.util.Hl7InputStreamMessageStringIterator
import com.google.common.net.HttpHeaders
import com.microsoft.azure.functions.HttpRequestMessage
import com.microsoft.azure.functions.HttpResponseMessage
import com.microsoft.azure.functions.HttpStatus
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.Sender
import gov.cdc.prime.router.azure.HttpUtilities
import gov.cdc.prime.router.azure.HttpUtilities.Companion.isSuccessful
Expand Down Expand Up @@ -106,12 +106,11 @@ class SubmissionResponseBuilder(
contentType == HttpUtilities.hl7V2MediaType &&
requestBody != null
) {
val hl7Reader = HL7Reader(ActionLogger())
val messages = hl7Reader.getMessages(requestBody)
val isBatch = hl7Reader.isBatch(requestBody, messages.size)
val messageCount = Hl7InputStreamMessageStringIterator(requestBody.byteInputStream()).asSequence().count()
val isBatch = HL7Reader.isBatch(requestBody, messageCount)

if (!isBatch && messages.size == 1) {
val message = messages.first()
if (!isBatch && messageCount == 1) {
val message = HL7Reader.parseHL7Message(requestBody, null)
JFisk42 marked this conversation as resolved.
Show resolved Hide resolved
val acceptAcknowledgementType = HL7Reader.getAcceptAcknowledgmentType(message)
val ackResponseRequired = acceptAcknowledgmentTypeRespondValues.contains(acceptAcknowledgementType)
if (ackResponseRequired) {
Expand Down
7 changes: 4 additions & 3 deletions prime-router/src/main/kotlin/cli/ProcessFhirCommands.kt
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ class ProcessFhirCommands : CliktCommand(
(isCli && outputFormat == MimeFormat.HL7.toString()) ||
(
receiver != null &&
(receiver.format == MimeFormat.HL7 || receiver.format == MimeFormat.HL7_BATCH)
)
(receiver.format == MimeFormat.HL7 || receiver.format == MimeFormat.HL7_BATCH)
)
) -> {
val (bundle2, inputMessage) = convertHl7ToFhir(contents, receiver)

Expand Down Expand Up @@ -297,7 +297,7 @@ class ProcessFhirCommands : CliktCommand(
}
}

private fun evaluateReceiverFilters(receiver: Receiver?, messageOrBundle: MessageOrBundle, isCli: Boolean) {
private fun evaluateReceiverFilters(receiver: Receiver?, messageOrBundle: MessageOrBundle, isCli: Boolean) {
if (receiver != null && messageOrBundle.bundle != null) {
val reportStreamFilters = mutableListOf<Pair<String, ReportStreamFilter>>()
reportStreamFilters.add(Pair("Jurisdictional Filter", receiver.jurisdictionalFilter))
Expand Down Expand Up @@ -516,6 +516,7 @@ class ProcessFhirCommands : CliktCommand(
* look like.
* @return a FHIR bundle and the parsed HL7 input that represents the data in the one HL7 message
*/
@Suppress("DEPRECATION")
adegolier marked this conversation as resolved.
Show resolved Hide resolved
private fun convertHl7ToFhir(hl7String: String, receiver: Receiver?): Pair<Bundle, Message> {
val hasFiveEncodingChars = hl7MessageHasFiveEncodingChars(hl7String)
// Some HL7 2.5.1 implementations have adopted the truncation character # that was added in 2.7
Expand Down
13 changes: 9 additions & 4 deletions prime-router/src/main/kotlin/cli/ProcessHl7Commands.kt
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package gov.cdc.prime.router.cli

import ca.uhn.hl7v2.util.Hl7InputStreamMessageStringIterator
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.core.CliktError
import com.github.ajalt.clikt.parameters.options.option
import com.github.ajalt.clikt.parameters.options.required
import com.github.ajalt.clikt.parameters.types.file
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.cli.helpers.HL7DiffHelper
import gov.cdc.prime.router.fhirengine.utils.HL7Reader

Expand Down Expand Up @@ -45,9 +45,14 @@ class ProcessHl7Commands : CliktCommand(
val comparisonFile = comparisonFile.inputStream().readBytes().toString(Charsets.UTF_8)
if (comparisonFile.isBlank()) throw CliktError("File ${this.comparisonFile.absolutePath} is empty.")

val actionLogger = ActionLogger()
val starterMessages = HL7Reader(actionLogger).getMessages(starterFile)
val comparisonMessages = HL7Reader(actionLogger).getMessages(comparisonFile)
val starterMessages = Hl7InputStreamMessageStringIterator(starterFile.byteInputStream()).asSequence()
.map { rawItem ->
HL7Reader.parseHL7Message(rawItem, null)
}.toList()
val comparisonMessages = Hl7InputStreamMessageStringIterator(comparisonFile.byteInputStream()).asSequence()
.map { rawItem ->
HL7Reader.parseHL7Message(rawItem, null)
}.toList()

starterMessages.forEachIndexed { counter, message ->
val differences = hl7DiffHelper.diffHl7(message, comparisonMessages[counter])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ class FHIRConverter(
)
}

FHIREngineRunResult(
FHIREngineRunResult(
routeEvent,
report,
blobInfo.blobUrl,
Expand Down Expand Up @@ -427,7 +427,7 @@ class FHIRConverter(
report,
TaskAction.convert,
"Submitted report was either empty or could not be parsed into HL7"
) {
) {
parentReportId(input.reportId)
params(
mapOf(
Expand Down Expand Up @@ -759,13 +759,13 @@ class FHIRConverter(
* transformer in tests.
*/
fun getTransformerFromSchema(schemaName: String): FhirTransformer? = if (schemaName.isNotBlank()) {
withLoggingContext(mapOf("schemaName" to schemaName)) {
logger.info("Apply a sender transform to the items in the report")
}
FhirTransformer(schemaName)
} else {
null
withLoggingContext(mapOf("schemaName" to schemaName)) {
logger.info("Apply a sender transform to the items in the report")
}
FhirTransformer(schemaName)
} else {
null
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package gov.cdc.prime.router.fhirengine.translation

import ca.uhn.hl7v2.util.Hl7InputStreamMessageStringIterator
import com.azure.storage.blob.models.BlobItem
import fhirengine.engine.CustomFhirPathFunctions
import fhirengine.engine.CustomTranslationFunctions
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.Hl7Configuration
import gov.cdc.prime.router.azure.BlobAccess
import gov.cdc.prime.router.fhirengine.config.HL7TranslationConfig
Expand Down Expand Up @@ -41,7 +41,6 @@ class TranslationSchemaManager : Logging {
Regex("/$previousValidBlobName-$timestampRegex")
private val previousPreviousValidBlobNameRegex =
Regex("/$previousPreviousValidBlobName-$timestampRegex")
private val hL7Reader = HL7Reader(ActionLogger())

/**
* Container class that holds the current state for a schema type in a particular azure store.
Expand Down Expand Up @@ -440,7 +439,12 @@ class TranslationSchemaManager : Logging {
)
).validate(
inputBundle,
hL7Reader.getMessages(rawValidationInput.output)[0]
HL7Reader.parseHL7Message(
Hl7InputStreamMessageStringIterator(rawValidationInput.output.byteInputStream())
.asSequence()
.first(),
null
),
)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package gov.cdc.prime.router.fhirengine.utils

import ca.uhn.hl7v2.AbstractHL7Exception
import ca.uhn.hl7v2.model.v251.datatype.DTM
import ca.uhn.hl7v2.util.Hl7InputStreamMessageStringIterator
import ca.uhn.hl7v2.util.Terser
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.Hl7Configuration
Expand All @@ -24,6 +26,8 @@ object HL7MessageHelpers : Logging {
*/
const val hl7SegmentDelimiter = "\r"

val actionLogger = ActionLogger()

/**
* Generate a HL7 Batch file from the list of [hl7RawMsgs] for the given [receiver]. The [hl7RawMsgs] are expected
* to be real HL7 messages at this point, so we will not validate their contents here for performance reasons.
Expand All @@ -34,12 +38,17 @@ object HL7MessageHelpers : Logging {
val useBatchHeaders = receiver.translation.useBatchHeaders
// Grab the first message to extract some data if not set in the settings
val firstMessage = if (hl7RawMsgs.isNotEmpty()) {
val messages = HL7Reader(ActionLogger()).getMessages(hl7RawMsgs[0])
if (messages.isEmpty()) {
try {
val message = HL7Reader.parseHL7Message(hl7RawMsgs[0], null)
Terser(message)
} catch (exception: Hl7InputStreamMessageStringIterator.ParseFailureError) {
logger.warn("Unable to extract batch header values from HL7: ${hl7RawMsgs[0].take(80)} ...")
HL7Reader.logHL7ParseFailure(exception, actionLogger)
null
} catch (exception: AbstractHL7Exception) {
logger.warn("Unable to extract batch header values from HL7: ${hl7RawMsgs[0].take(80)} ...")
HL7Reader.recordError(exception, actionLogger)
null
} else {
Terser(messages[0])
}
} else {
null
Expand Down
Loading
Loading