From fe169574b1abe75f7b6baca71ba685b587c57a30 Mon Sep 17 00:00:00 2001 From: smednick Date: Tue, 30 Jan 2024 11:15:14 -0800 Subject: [PATCH] added whitelist filter to kafka message stream --- .../consumer/ELRRMessageListener.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/deloitte/elrr/elrrconsolidate/consumer/ELRRMessageListener.java b/src/main/java/com/deloitte/elrr/elrrconsolidate/consumer/ELRRMessageListener.java index 708e7ba..4a2651c 100644 --- a/src/main/java/com/deloitte/elrr/elrrconsolidate/consumer/ELRRMessageListener.java +++ b/src/main/java/com/deloitte/elrr/elrrconsolidate/consumer/ELRRMessageListener.java @@ -4,6 +4,7 @@ import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; +import com.deloitte.elrr.elrrconsolidate.InputSanatizer; import com.deloitte.elrr.elrrconsolidate.dto.LearnerChange; import com.deloitte.elrr.elrrconsolidate.dto.MessageVO; import com.deloitte.elrr.elrrconsolidate.entity.ELRRAuditLog; @@ -36,9 +37,14 @@ public class ELRRMessageListener { */ @KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupId}") public void listen(final String message) { - log.info("Received Messasge in group - group-id: " + message); - LearnerChange learnerChange = getLearnerChange(message); - messageService.process(learnerChange); + if (InputSanatizer.isValidInput(message)) { + log.info("Received Messasge in group - group-id: " + message); + LearnerChange learnerChange = getLearnerChange(message); + messageService.process(learnerChange); + } else { + log.warn("Invalid message did not pass whitelist check - " + + message); + } } /**