diff --git a/kafka-connect-neo4j/pom.xml b/kafka-connect-neo4j/pom.xml index 3c2a975d..fbbd9e68 100644 --- a/kafka-connect-neo4j/pom.xml +++ b/kafka-connect-neo4j/pom.xml @@ -49,6 +49,11 @@ connect-utils ${kafka.connect.utils.version} + + org.hamcrest + hamcrest + test + com.google.guava diff --git a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/source/Neo4jSourceTaskTest.kt b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/source/Neo4jSourceTaskTest.kt index 376f21c4..19e17316 100644 --- a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/source/Neo4jSourceTaskTest.kt +++ b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/source/Neo4jSourceTaskTest.kt @@ -6,12 +6,8 @@ import org.apache.kafka.connect.source.SourceRecord import org.apache.kafka.connect.source.SourceTask import org.apache.kafka.connect.source.SourceTaskContext import org.apache.kafka.connect.storage.OffsetStorageReader -import org.hamcrest.Matchers -import org.junit.After -import org.junit.AfterClass -import org.junit.Before -import org.junit.BeforeClass -import org.junit.Test +import org.hamcrest.Matchers.* +import org.junit.* import org.mockito.Mockito import org.neo4j.driver.Driver import org.neo4j.driver.Session @@ -22,6 +18,10 @@ import streams.kafka.connect.common.Neo4jConnectorConfig import streams.kafka.connect.sink.AuthenticationType import streams.utils.JSONUtils import streams.utils.StreamsUtils +import java.time.Clock +import java.time.Duration +import java.time.Instant +import java.time.ZoneId import java.util.* import java.util.concurrent.TimeUnit @@ -84,103 +84,153 @@ class Neo4jSourceTaskTest { @Test fun `should source data from Neo4j with custom QUERY from NOW`() { - val props = mutableMapOf() - props[Neo4jConnectorConfig.SERVER_URI] = neo4j.boltUrl - props[Neo4jSourceConnectorConfig.TOPIC] = UUID.randomUUID().toString() - props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "1000" - props[Neo4jSourceConnectorConfig.STREAMING_PROPERTY] = "timestamp" - props[Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY] = getSourceQuery() - props[Neo4jConnectorConfig.AUTHENTICATION_TYPE] = AuthenticationType.NONE.toString() + // create data with timestamp set as NOW - 5m + insertRecords( + 100, Clock.fixed(Instant.now().minus(Duration.ofMinutes(5)), ZoneId.systemDefault()), longToInt = true + ) - task.start(props) - val totalRecords = 10 - val expected = insertRecords(totalRecords, true) + // create data with timestamp set as NOW + 5m + val expected = + insertRecords( + 75, Clock.fixed(Instant.now().plus(Duration.ofMinutes(5)), ZoneId.systemDefault()), longToInt = true + ) - val list = mutableListOf() - Assert.assertEventually(ThrowingSupplier { - task.poll()?.let { list.addAll(it) } - val actualList = list.map { JSONUtils.readValue>(it.value()) } - expected.containsAll(actualList) - }, Matchers.equalTo(true), 30, TimeUnit.SECONDS) + task.start( + mapOf( + Neo4jConnectorConfig.SERVER_URI to neo4j.boltUrl, + Neo4jSourceConnectorConfig.TOPIC to UUID.randomUUID().toString(), + Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL to "1000", + Neo4jSourceConnectorConfig.STREAMING_PROPERTY to "timestamp", + Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY to getSourceQuery(), + Neo4jConnectorConfig.AUTHENTICATION_TYPE to AuthenticationType.NONE.toString() + ) + ) + + val list = mutableListOf>() + Assert.assertEventually(ThrowingSupplier, Exception> { + task.poll()?.let { received -> + list.addAll(received.map { JSONUtils.readValue>(it.value()) } + ) + } + + list + }, equalTo(expected), 30, TimeUnit.SECONDS) } @Test fun `should source data from Neo4j with custom QUERY from NOW with Schema`() { - val props = mutableMapOf() - props[Neo4jConnectorConfig.SERVER_URI] = neo4j.boltUrl - props[Neo4jSourceConnectorConfig.TOPIC] = UUID.randomUUID().toString() - props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "1000" - props[Neo4jSourceConnectorConfig.ENFORCE_SCHEMA] = "true" - props[Neo4jSourceConnectorConfig.STREAMING_PROPERTY] = "timestamp" - props[Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY] = getSourceQuery() - props[Neo4jConnectorConfig.AUTHENTICATION_TYPE] = AuthenticationType.NONE.toString() + // create data with timestamp set as NOW - 5m + insertRecords( + 100, Clock.fixed(Instant.now().minus(Duration.ofMinutes(5)), ZoneId.systemDefault()) + ) - task.start(props) - val totalRecords = 10 - val expected = insertRecords(totalRecords) + // create data with timestamp set as NOW + 5m + val expected = + insertRecords( + 75, Clock.fixed(Instant.now().plus(Duration.ofMinutes(5)), ZoneId.systemDefault()) + ) - val list = mutableListOf() - Assert.assertEventually(ThrowingSupplier { - task.poll()?.let { list.addAll(it) } - val actualList = list.map { (it.value() as Struct).toMap() } - expected.containsAll(actualList) - }, Matchers.equalTo(true), 30, TimeUnit.SECONDS) + task.start( + mapOf( + Neo4jConnectorConfig.SERVER_URI to neo4j.boltUrl, + Neo4jSourceConnectorConfig.TOPIC to UUID.randomUUID().toString(), + Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL to "1000", + Neo4jSourceConnectorConfig.ENFORCE_SCHEMA to "true", + Neo4jSourceConnectorConfig.STREAMING_PROPERTY to "timestamp", + Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY to getSourceQuery(), + Neo4jConnectorConfig.AUTHENTICATION_TYPE to AuthenticationType.NONE.toString() + ) + ) + + val list = mutableListOf>() + Assert.assertEventually(ThrowingSupplier, Exception> { + task.poll()?.let { received -> + list.addAll(received.map { (it.value() as Struct).toMap() }) + } + + list + }, equalTo(expected), 30, TimeUnit.SECONDS) } @Test fun `should source data from Neo4j with custom QUERY from ALL`() { - val props = mutableMapOf() - props[Neo4jConnectorConfig.SERVER_URI] = neo4j.boltUrl - props[Neo4jSourceConnectorConfig.TOPIC] = UUID.randomUUID().toString() - props[Neo4jSourceConnectorConfig.STREAMING_FROM] = "ALL" - props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "1000" - props[Neo4jSourceConnectorConfig.STREAMING_PROPERTY] = "timestamp" - props[Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY] = getSourceQuery() - props[Neo4jConnectorConfig.AUTHENTICATION_TYPE] = AuthenticationType.NONE.toString() + // create data with timestamp set as NOW - 5m + insertRecords( + 100, Clock.fixed(Instant.now().minus(Duration.ofMinutes(5)), ZoneId.systemDefault()), longToInt = true + ) - task.start(props) - val totalRecords = 10 - val expected = insertRecords(totalRecords, true) + // create data with timestamp set as NOW + 5m + insertRecords( + 75, Clock.fixed(Instant.now().plus(Duration.ofMinutes(5)), ZoneId.systemDefault()), longToInt = true + ) - val list = mutableListOf() - Assert.assertEventually(ThrowingSupplier { - task.poll()?.let { list.addAll(it) } - val actualList = list.map { JSONUtils.readValue>(it.value()) } - expected == actualList - }, Matchers.equalTo(true), 30, TimeUnit.SECONDS) + task.start( + mapOf( + Neo4jConnectorConfig.SERVER_URI to neo4j.boltUrl, + Neo4jSourceConnectorConfig.TOPIC to UUID.randomUUID().toString(), + Neo4jSourceConnectorConfig.STREAMING_FROM to "ALL", + Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL to "1000", + Neo4jSourceConnectorConfig.STREAMING_PROPERTY to "timestamp", + Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY to getSourceQuery(), + Neo4jConnectorConfig.AUTHENTICATION_TYPE to AuthenticationType.NONE.toString() + ) + ) + + val list = mutableListOf>() + Assert.assertEventually(ThrowingSupplier, Exception> { + task.poll()?.let { received -> + list.addAll(received.map { JSONUtils.readValue>(it.value()) } + ) + } + + list + }, hasSize(175), 30, TimeUnit.SECONDS) } @Test fun `should source data from Neo4j with custom QUERY from ALL with Schema`() { - val props = mutableMapOf() - props[Neo4jConnectorConfig.SERVER_URI] = neo4j.boltUrl - props[Neo4jSourceConnectorConfig.TOPIC] = UUID.randomUUID().toString() - props[Neo4jSourceConnectorConfig.STREAMING_FROM] = "ALL" - props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "1000" - props[Neo4jSourceConnectorConfig.ENFORCE_SCHEMA] = "true" - props[Neo4jSourceConnectorConfig.STREAMING_PROPERTY] = "timestamp" - props[Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY] = getSourceQuery() - props[Neo4jConnectorConfig.AUTHENTICATION_TYPE] = AuthenticationType.NONE.toString() + // create data with timestamp set as 0 + insertRecords( + 100, Clock.fixed(Instant.EPOCH, ZoneId.systemDefault()) + ) - task.start(props) - val totalRecords = 10 - val expected = insertRecords(totalRecords) + // create data with timestamp set as NOW + 5m + insertRecords( + 75, Clock.fixed(Instant.now().plus(Duration.ofMinutes(5)), ZoneId.systemDefault()) + ) - val list = mutableListOf() - Assert.assertEventually(ThrowingSupplier { - task.poll()?.let { list.addAll(it) } - val actualList = list.map { (it.value() as Struct).toMap() } - expected == actualList - }, Matchers.equalTo(true), 30, TimeUnit.SECONDS) + task.start( + mapOf( + Neo4jConnectorConfig.SERVER_URI to neo4j.boltUrl, + Neo4jSourceConnectorConfig.TOPIC to UUID.randomUUID().toString(), + Neo4jSourceConnectorConfig.STREAMING_FROM to "ALL", + Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL to "1000", + Neo4jSourceConnectorConfig.STREAMING_PROPERTY to "timestamp", + Neo4jSourceConnectorConfig.ENFORCE_SCHEMA to "true", + Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY to getSourceQuery(), + Neo4jConnectorConfig.AUTHENTICATION_TYPE to AuthenticationType.NONE.toString() + ) + ) + + val list = mutableListOf>() + Assert.assertEventually(ThrowingSupplier, Exception> { + task.poll()?.let { received -> + list.addAll(received.map { (it.value() as Struct).toMap() } + ) + } + + list + }, hasSize(175), 30, TimeUnit.SECONDS) } - private fun insertRecords(totalRecords: Int, longToInt: Boolean = false) = session.beginTransaction().use { tx -> - val elements = (1..totalRecords).map { - val result = tx.run( - """ + private fun insertRecords(totalRecords: Int, clock: Clock = Clock.systemDefaultZone(), longToInt: Boolean = false) = + session.beginTransaction().use { tx -> + val elements = (1..totalRecords).map { + val result = tx.run( + """ |CREATE (n:Test{ | name: 'Name ' + $it, - | timestamp: timestamp(), + | timestamp: ${'$'}timestamp, | point: point({longitude: 56.7, latitude: 12.78, height: 8}), | array: [1,2,3], | datetime: localdatetime(), @@ -196,28 +246,28 @@ class Neo4jSourceTaskTest { | key2: "value2" | } AS map, | n AS node - """.trimMargin() - ) - val next = result.next() - val map = next.asMap().toMutableMap() - map["array"] = next["array"].asList() - .map { if (longToInt) (it as Long).toInt() else it } - map["point"] = JSONUtils.readValue>(map["point"]!!) - map["datetime"] = next["datetime"].asLocalDateTime().toString() - val node = next["node"].asNode() - val nodeMap = node.asMap().toMutableMap() - nodeMap[""] = if (longToInt) node.id().toInt() else node.id() - nodeMap[""] = node.labels() - // are the same value as above - nodeMap["array"] = map["array"] - nodeMap["point"] = map["point"] - nodeMap["datetime"] = map["datetime"] - map["node"] = nodeMap - map + """.trimMargin(), mapOf("timestamp" to clock.instant().toEpochMilli() + it) + ) + val next = result.next() + val map = next.asMap().toMutableMap() + map["array"] = next["array"].asList() + .map { if (longToInt) (it as Long).toInt() else it } + map["point"] = JSONUtils.readValue>(map["point"]!!) + map["datetime"] = next["datetime"].asLocalDateTime().toString() + val node = next["node"].asNode() + val nodeMap = node.asMap().toMutableMap() + nodeMap[""] = if (longToInt) node.id().toInt() else node.id() + nodeMap[""] = node.labels() + // are the same value as above + nodeMap["array"] = map["array"] + nodeMap["point"] = map["point"] + nodeMap["datetime"] = map["datetime"] + map["node"] = nodeMap.toMap() + map.toMap() + } + tx.commit() + elements } - tx.commit() - elements - } @Test fun `should source data from Neo4j with custom QUERY without streaming property`() { @@ -233,11 +283,10 @@ class Neo4jSourceTaskTest { insertRecords(totalRecords) val list = mutableListOf() - Assert.assertEventually(ThrowingSupplier { + Assert.assertEventually(ThrowingSupplier, Exception> { task.poll()?.let { list.addAll(it) } - val actualList = list.map { JSONUtils.readValue>(it.value()) } - actualList.size >= 2 - }, Matchers.equalTo(true), 30, TimeUnit.SECONDS) + list.map { JSONUtils.readValue>(it.value()) } + }, hasSize(greaterThanOrEqualTo(2)), 30, TimeUnit.SECONDS) } @Test @@ -255,11 +304,10 @@ class Neo4jSourceTaskTest { insertRecords(totalRecords) val list = mutableListOf() - Assert.assertEventually(ThrowingSupplier { + Assert.assertEventually(ThrowingSupplier, Exception> { task.poll()?.let { list.addAll(it) } - val actualList = list.map { (it.value() as Struct).toMap() } - actualList.size >= 2 - }, Matchers.equalTo(true), 30, TimeUnit.SECONDS) + list.map { (it.value() as Struct).toMap() } + }, hasSize(greaterThanOrEqualTo(2)), 30, TimeUnit.SECONDS) } private fun getSourceQuery() = """ @@ -300,7 +348,7 @@ class Neo4jSourceTaskTest { exception = e true } - }, Matchers.equalTo(true), 30, TimeUnit.SECONDS) + }, equalTo(true), 30, TimeUnit.SECONDS) if (exception != null) throw exception as ConnectException } @@ -343,7 +391,7 @@ class Neo4jSourceTaskTest { Assert.assertEventually(ThrowingSupplier { task.poll()?.map { (it.value() as Struct).toMap() }?.first() - }, Matchers.equalTo(expected), 30, TimeUnit.SECONDS) + }, equalTo(expected), 30, TimeUnit.SECONDS) } @Test @@ -385,6 +433,49 @@ class Neo4jSourceTaskTest { Assert.assertEventually(ThrowingSupplier { task.poll()?.map { (it.value() as Struct).toMap() }?.first() - }, Matchers.equalTo(expected), 30, TimeUnit.SECONDS) + }, equalTo(expected), 30, TimeUnit.SECONDS) + } + + @Test + fun `should convert point data`() { + val topic = UUID.randomUUID().toString() + val props = mutableMapOf() + props[Neo4jConnectorConfig.SERVER_URI] = neo4j.boltUrl + props[Neo4jSourceConnectorConfig.TOPIC] = topic + props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "1000" + props[Neo4jSourceConnectorConfig.STREAMING_PROPERTY] = "timestamp" + props[Neo4jSourceConnectorConfig.ENFORCE_SCHEMA] = "true" + props[Neo4jConnectorConfig.AUTHENTICATION_TYPE] = AuthenticationType.NONE.toString() + props[Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY] = """ + MATCH (n:SourceNode) + WHERE n.timestamp > 0 + RETURN n.cartesian2d AS cartesian2d, + n.cartesian3d AS cartesian3d, + n.geo2d AS geo2d, + n.geo3d AS geo3d + """ + task.start(props) + + session.run( + "CREATE (n:SourceNode" + + "{" + + "timestamp: timestamp(), " + + "cartesian2d: point({x: 56.7, y: 12.78}), " + + "cartesian3d: point({x: 56.7, y: 12.78, z: 8}), " + + "geo2d: point({longitude: 56.7, latitude: 12.78}), " + + "geo3d: point({longitude: 56.7, latitude: 12.78, height: 8})" + + "})" + ).consume() + + val expected = mapOf( + "cartesian2d" to mapOf("crs" to "cartesian", "x" to 56.7, "y" to 12.78), + "cartesian3d" to mapOf("crs" to "cartesian-3d", "x" to 56.7, "y" to 12.78, "z" to 8.0), + "geo2d" to mapOf("crs" to "wgs-84", "longitude" to 56.7, "latitude" to 12.78), + "geo3d" to mapOf("crs" to "wgs-84-3d", "longitude" to 56.7, "latitude" to 12.78, "height" to 8.0) + ) + + Assert.assertEventually(ThrowingSupplier { + task.poll()?.map { (it.value() as Struct).toMap() }?.first() + }, equalTo(expected), 30, TimeUnit.SECONDS) } } \ No newline at end of file