Skip to content

Commit

Permalink
fix: use incoming object if schema is not defined (#184)
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-ince authored Sep 24, 2024
1 parent 94182fb commit 246bb6d
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.awaitility.Awaitility.await
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInfo
import org.neo4j.connectors.kafka.testing.format.KafkaConverter.AVRO
import org.neo4j.connectors.kafka.testing.format.KafkaConverter.JSON_EMBEDDED
import org.neo4j.connectors.kafka.testing.format.KafkaConverter.JSON_SCHEMA
Expand All @@ -45,20 +44,20 @@ abstract class Neo4jSinkIT {
[
CypherStrategy(
TOPIC,
"MERGE (p:Person {name: event.name, surname: event.surname, executionId: event.executionId})")])
"MERGE (p:Person {name: event.name, surname: event.surname})",
),
],
)
@Test
fun `writes messages to Neo4j via sink connector`(
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
session: Session,
testInfo: TestInfo
) {
val executionId = testInfo.displayName + System.currentTimeMillis()
val value = mapOf("name" to "Jane", "surname" to "Doe", "executionId" to executionId)
val value = mapOf("name" to "Jane", "surname" to "Doe")
val schema =
SchemaBuilder.struct()
.field("name", Schema.STRING_SCHEMA)
.field("surname", Schema.STRING_SCHEMA)
.field("executionId", Schema.STRING_SCHEMA)
.build()
val struct = Struct(schema)
schema.fields().forEach { struct.put(it, value[it.name()]) }
Expand All @@ -68,8 +67,9 @@ abstract class Neo4jSinkIT {
await().atMost(30.seconds.toJavaDuration()).until {
session
.run(
"MATCH (p:Person {name: \$name, surname: \$surname, executionId: \$executionId}) RETURN count(p) = 1 AS result",
mapOf("name" to "Jane", "surname" to "Doe", "executionId" to executionId))
"MATCH (p:Person {name: \$name, surname: \$surname}) RETURN count(p) = 1 AS result",
mapOf("name" to "Jane", "surname" to "Doe"),
)
.single()["result"]
.asBoolean()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.connectors.kafka.sink

import io.kotest.matchers.shouldBe
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.awaitility.Awaitility.await
import org.junit.jupiter.api.Test
import org.neo4j.connectors.kafka.testing.format.KafkaConverter.JSON_RAW
import org.neo4j.connectors.kafka.testing.format.KeyValueConverter
import org.neo4j.connectors.kafka.testing.kafka.ConvertingKafkaProducer
import org.neo4j.connectors.kafka.testing.sink.CypherStrategy
import org.neo4j.connectors.kafka.testing.sink.Neo4jSink
import org.neo4j.connectors.kafka.testing.sink.TopicProducer
import org.neo4j.driver.Session

/**
 * Integration tests that publish messages with the [JSON_RAW] converter for both key and value
 * and verify that the Cypher strategy configured on each test can consume the incoming value via
 * `__value`, whether it is a map, a list, or a bare string / number / boolean.
 */
@KeyValueConverter(key = JSON_RAW, value = JSON_RAW)
class Neo4jSinkRawJsonIT {

  /** A JSON object value is exposed to Cypher as a map whose entries can be read by key. */
  @Neo4jSink(
      cypher =
          [
              CypherStrategy(
                  TOPIC,
                  "WITH __value AS person MERGE (p:Person {name: person.name, surname: person.surname})",
              ),
          ],
  )
  @Test
  fun `should support json map`(
      @TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
      session: Session,
  ) {
    val mapSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).build()
    producer.publish(
        value = mapOf("name" to "Jane", "surname" to "Doe"),
        valueSchema = mapSchema,
    )

    awaitCount(
        session,
        "MATCH (p:Person {name: ${'$'}name, surname: ${'$'}surname}) RETURN count(p) as result",
        mapOf("name" to "Jane", "surname" to "Doe"),
        expected = 1L,
    )
  }

  /** A JSON array value is exposed to Cypher as a list that can be unwound. */
  @Neo4jSink(
      cypher =
          [
              CypherStrategy(
                  TOPIC,
                  "WITH __value AS persons UNWIND persons AS person MERGE (p:Person {name: person.name, surname: person.surname})",
              ),
          ],
  )
  @Test
  fun `should support json list`(
      @TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
      session: Session,
  ) {
    val personSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).build()
    val personsSchema = SchemaBuilder.array(personSchema).build()
    producer.publish(
        value =
            listOf(
                mapOf("name" to "Jane", "surname" to "Doe"),
                mapOf("name" to "John", "surname" to "Doe"),
            ),
        valueSchema = personsSchema,
    )

    awaitCount(
        session,
        "MATCH (p:Person) WHERE [p.name, p.surname] IN ${'$'}names RETURN count(p) as result",
        mapOf("names" to listOf(listOf("Jane", "Doe"), listOf("John", "Doe"))),
        expected = 2L,
    )
  }

  /** A bare string value is bound to `__value` as-is. */
  @Neo4jSink(
      cypher =
          [
              CypherStrategy(
                  TOPIC,
                  "WITH __value AS name MERGE (p:Person {name: name})",
              ),
          ],
  )
  @Test
  fun `should support raw string value`(
      @TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
      session: Session,
  ) {
    producer.publish(value = "John", valueSchema = Schema.STRING_SCHEMA)

    awaitCount(
        session,
        "MATCH (p:Person {name: ${'$'}name}) RETURN count(p) as result",
        mapOf("name" to "John"),
        expected = 1L,
    )
  }

  /** A bare 64-bit integer value is bound to `__value` as-is. */
  @Neo4jSink(
      cypher =
          [
              CypherStrategy(
                  TOPIC,
                  "WITH __value AS age MERGE (p:Person {age: age})",
              ),
          ],
  )
  @Test
  fun `should support raw numeric value`(
      @TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
      session: Session,
  ) {
    producer.publish(value = 25L, valueSchema = Schema.INT64_SCHEMA)

    awaitCount(
        session,
        "MATCH (p:Person {age: ${'$'}age}) RETURN count(p) as result",
        mapOf("age" to 25L),
        expected = 1L,
    )
  }

  /** A bare boolean value is bound to `__value` as-is. */
  @Neo4jSink(
      cypher =
          [
              CypherStrategy(
                  TOPIC,
                  "WITH __value AS status MERGE (p:Person {single: status})",
              ),
          ],
  )
  @Test
  fun `should support raw boolean value`(
      @TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
      session: Session,
  ) {
    producer.publish(value = true, valueSchema = Schema.BOOLEAN_SCHEMA)

    awaitCount(
        session,
        "MATCH (p:Person {single: ${'$'}single}) RETURN count(p) as result",
        mapOf("single" to true),
        expected = 1L,
    )
  }

  /**
   * Repeatedly runs [query] with [parameters] for up to 30 seconds until the `result` column of
   * its single row equals [expected].
   */
  private fun awaitCount(
      session: Session,
      query: String,
      parameters: Map<String, Any>,
      expected: Long,
  ) {
    await().atMost(30.seconds.toJavaDuration()).untilAsserted {
      val count = session.run(query, parameters).single()["result"].asLong()
      count shouldBe expected
    }
  }

  companion object {
    private const val TOPIC = "persons"
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ data class SinkMessage(val record: SinkRecord) {
}
}
converted
}
} ?: value
}

override fun toString(): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@ import org.junit.jupiter.api.extension.ExtensionContext
import org.neo4j.connectors.kafka.testing.AnnotationSupport
import org.neo4j.connectors.kafka.testing.format.avro.AvroSerializer
import org.neo4j.connectors.kafka.testing.format.json.JsonEmbeddedSerializer
import org.neo4j.connectors.kafka.testing.format.json.JsonRawSerializer
import org.neo4j.connectors.kafka.testing.format.json.JsonSchemaSerializer
import org.neo4j.connectors.kafka.testing.format.protobuf.ProtobufSerializer
import org.neo4j.connectors.kafka.testing.format.string.StringSerializer

// Options shared by the PROTOBUF entry below: handed both to its test shim serializer and to the
// converter as additional properties.
private val PROTOBUF_OPTIONS =
mapOf("enhanced.protobuf.schema.support" to "true", "optional.for.nullables" to "true")

// Additional properties for the JSON_RAW entry below.
// NOTE(review): presumably makes JsonConverter read/write plain JSON without the schema envelope —
// confirm against the Kafka Connect JsonConverter documentation.
private val JSON_RAW_OPTIONS = mapOf("schemas.enable" to "false")

enum class KafkaConverter(
val className: String,
val converterProvider: () -> Converter,
Expand All @@ -50,28 +53,40 @@ enum class KafkaConverter(
className = "io.confluent.connect.avro.AvroConverter",
converterProvider = { AvroConverter() },
serializerClass = KafkaAvroSerializer::class.java,
testShimSerializer = AvroSerializer),
testShimSerializer = AvroSerializer,
),
JSON_SCHEMA(
className = "io.confluent.connect.json.JsonSchemaConverter",
converterProvider = { JsonSchemaConverter() },
serializerClass = KafkaJsonSchemaSerializer::class.java,
testShimSerializer = JsonSchemaSerializer),
testShimSerializer = JsonSchemaSerializer,
),
JSON_EMBEDDED(
className = "org.apache.kafka.connect.json.JsonConverter",
converterProvider = { JsonConverter() },
serializerClass = KafkaJsonSerializer::class.java,
testShimSerializer = JsonEmbeddedSerializer),
testShimSerializer = JsonEmbeddedSerializer,
),
JSON_RAW(
className = "org.apache.kafka.connect.json.JsonConverter",
converterProvider = { JsonConverter() },
serializerClass = KafkaJsonSerializer::class.java,
testShimSerializer = JsonRawSerializer,
additionalProperties = JSON_RAW_OPTIONS,
),
PROTOBUF(
className = "io.confluent.connect.protobuf.ProtobufConverter",
converterProvider = { ProtobufConverter() },
serializerClass = KafkaProtobufSerializer::class.java,
testShimSerializer = ProtobufSerializer(PROTOBUF_OPTIONS),
additionalProperties = PROTOBUF_OPTIONS),
additionalProperties = PROTOBUF_OPTIONS,
),
STRING(
className = "org.apache.kafka.connect.storage.StringConverter",
converterProvider = { StringConverter() },
serializerClass = org.apache.kafka.common.serialization.StringSerializer::class.java,
testShimSerializer = StringSerializer)
testShimSerializer = StringSerializer,
)
}

@Target(AnnotationTarget.FUNCTION, AnnotationTarget.CLASS)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.connectors.kafka.testing.format.json

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.json.JsonConverter
import org.neo4j.connectors.kafka.testing.format.KafkaRecordSerializer

/**
 * [KafkaRecordSerializer] that turns a Connect value into a Jackson JSON tree using a
 * [JsonConverter] configured with `schemas.enable = false`.
 */
object JsonRawSerializer : KafkaRecordSerializer {

  private val objectMapper = ObjectMapper()

  // One converter per role, configured exactly once on first use. Previously a single shared
  // converter was re-configured on every serialize() call, which rebuilt its internal state each
  // time and flipped shared configuration depending on the isKey flag of the latest caller.
  private val keyConverter: JsonConverter by lazy { newConverter(isKey = true) }
  private val valueConverter: JsonConverter by lazy { newConverter(isKey = false) }

  /**
   * Serializes [value] according to [schema] and parses the resulting bytes back into a JSON tree.
   *
   * @param value the Connect value to serialize
   * @param schema the Connect schema describing [value]
   * @param isKey whether the value is a record key; selects the key- or value-configured converter
   * @return the result of [ObjectMapper.readTree] applied to the converter output
   */
  override fun serialize(value: Any, schema: Schema, isKey: Boolean): Any {
    val converter = if (isKey) keyConverter else valueConverter
    // No topic is passed to the converter.
    val data = converter.fromConnectData(null, schema, value)
    return objectMapper.readTree(data)
  }

  /** Creates a [JsonConverter] with schemas disabled for the given role. */
  private fun newConverter(isKey: Boolean): JsonConverter =
      JsonConverter().also { it.configure(mapOf("schemas.enable" to false), isKey) }
}

0 comments on commit 246bb6d

Please sign in to comment.