diff --git a/common/src/main/kotlin/org/neo4j/connectors/kafka/data/ChangeEventExtensions.kt b/common/src/main/kotlin/org/neo4j/connectors/kafka/data/ChangeEventExtensions.kt index c2643afc..ddc1390a 100644 --- a/common/src/main/kotlin/org/neo4j/connectors/kafka/data/ChangeEventExtensions.kt +++ b/common/src/main/kotlin/org/neo4j/connectors/kafka/data/ChangeEventExtensions.kt @@ -202,6 +202,8 @@ class ChangeEventConverter() { } private fun schemaForKeys(keys: List>?): Schema { + val addedFields = mutableSetOf() + return SchemaBuilder.array( // We need to define a uniform structure of key array elements. Because all elements // must have identical structure, we list all available keys as optional fields. @@ -209,10 +211,11 @@ class ChangeEventConverter() { .apply { keys?.forEach { key -> key.forEach { - field( - it.key, - DynamicTypes.toConnectSchema( - it.value, optional = true, forceMapsAsStruct = true)) + if (addedFields.add(it.key)) { + field( + it.key, + toConnectSchema(it.value, optional = true, forceMapsAsStruct = true)) + } } } } diff --git a/source-connector/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jCdcSourceNodesIT.kt b/source-connector/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jCdcSourceNodesIT.kt index 52f8cca0..b7775b74 100644 --- a/source-connector/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jCdcSourceNodesIT.kt +++ b/source-connector/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jCdcSourceNodesIT.kt @@ -614,6 +614,46 @@ abstract class Neo4jCdcSourceNodesIT { } .verifyWithin(Duration.ofSeconds(30)) } + + @Neo4jSource( + startFrom = "EARLIEST", + strategy = CDC, + cdc = + CdcSource( + topics = + arrayOf( + CdcSourceTopic( + topic = "cdc", patterns = arrayOf(CdcSourceParam("(:TestSource)")))))) + @Test + fun `should publish with multiple keys on the same property`( + @TopicConsumer(topic = "cdc", offset = "earliest") consumer: ConvertingKafkaConsumer, + session: Session + ) { + session.run("CREATE CONSTRAINT FOR (n:TestSource) REQUIRE (n.prop1, n.prop2) IS KEY").consume() + session.run("CREATE CONSTRAINT FOR (n:TestSource) REQUIRE n.prop1 IS KEY").consume() + session + .run( + "CREATE (n:TestSource) SET n = ${'$'}props", + mapOf("props" to mapOf("prop1" to "value1", "prop2" to "value2"))) + .consume() + + TopicVerifier.create(consumer) + .assertMessageValue { value -> + assertThat(value) + .hasEventType(NODE) + .hasOperation(CREATE) + .hasNodeKeys( + mapOf( + "TestSource" to + listOf( + mapOf("prop1" to "value1", "prop2" to "value2"), + mapOf("prop1" to "value1")))) + .labelledAs("TestSource") + .hasNoBeforeState() + .hasAfterStateProperties(mapOf("prop1" to "value1", "prop2" to "value2")) + } + .verifyWithin(Duration.ofSeconds(30)) + } } @KeyValueConverter(key = AVRO, value = AVRO) diff --git a/source-connector/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jCdcSourceRelationshipsIT.kt b/source-connector/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jCdcSourceRelationshipsIT.kt index b0f068ec..88667413 100644 --- a/source-connector/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jCdcSourceRelationshipsIT.kt +++ b/source-connector/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jCdcSourceRelationshipsIT.kt @@ -611,6 +611,49 @@ abstract class Neo4jCdcSourceRelationshipsIT { } .verifyWithin(Duration.ofSeconds(30)) } + + @Neo4jSource( + startFrom = "EARLIEST", + strategy = CDC, + cdc = + CdcSource( + topics = + arrayOf( + CdcSourceTopic( + topic = "neo4j-cdc-keys-rel", + patterns = + arrayOf(CdcSourceParam("(:Person)-[:EMPLOYED]->(:Company)")))))) + @Test + fun `should publish with multiple keys on the same property`( + @TopicConsumer(topic = "neo4j-cdc-keys-rel", offset = "earliest") + consumer: ConvertingKafkaConsumer, + session: Session + ) { + session + .run( + "CREATE CONSTRAINT employedId FOR ()-[r:EMPLOYED]->() REQUIRE (r.id, r.role) IS RELATIONSHIP KEY") + .consume() + session + .run( + "CREATE CONSTRAINT employedRole FOR ()-[r:EMPLOYED]->() REQUIRE r.id IS RELATIONSHIP KEY") + .consume() + + session.run("CREATE (:Person)-[:EMPLOYED {id: 1, role: 'SWE'}]->(:Company)").consume() + + TopicVerifier.create(consumer) + .assertMessageValue { value -> + assertThat(value) + .hasEventType(RELATIONSHIP) + .hasOperation(CREATE) + .hasType("EMPLOYED") + .startLabelledAs("Person") + .endLabelledAs("Company") + .hasNoBeforeState() + .hasAfterStateProperties(mapOf("id" to 1L, "role" to "SWE")) + .hasRelationshipKeys(listOf(mapOf("id" to 1L, "role" to "SWE"), mapOf("id" to 1L))) + } + .verifyWithin(Duration.ofSeconds(30)) + } } @KeyValueConverter(key = AVRO, value = AVRO)