Skip to content

Commit

Permalink
test: update tests with more config inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
dhrudevalia committed Jul 26, 2024
1 parent ea373d8 commit 760fe11
Showing 1 changed file with 23 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package streams.kafka.connect.sink
import org.junit.Assert.assertEquals
import org.junit.Test
import streams.kafka.connect.common.ConfigurationMigrator
import streams.kafka.connect.common.Neo4jConnectorConfig.Companion.ENCRYPTION_CA_CERTIFICATE_PATH
import streams.kafka.connect.source.Neo4jSourceConnectorConfig.Companion.ENFORCE_SCHEMA
import streams.kafka.connect.source.Neo4jSourceConnectorConfig.Companion.SOURCE_TYPE
import streams.kafka.connect.source.SourceType

class ConfigurationMigratorTest {
Expand All @@ -16,27 +13,25 @@ class ConfigurationMigratorTest {
val originals =
mapOf(
"neo4j.topic.pattern.merge.node.properties.enabled" to "true",
"${Neo4jSinkConnectorConfig.TOPIC_CYPHER_PREFIX}foo" to
"CREATE (p:Person{name: event.firstName})",
"neo4j.server.uri" to "neo4j+s://x.x.x.x",
"neo4j.retry.max.attemps" to "1"
)

// When the configuration is migrated
val migratedConfig = ConfigurationMigrator(originals).migrate()

// Then the keys are updated to new key format containing existing value
// Then the keys are updated to new key format containing the original value
assertEquals(migratedConfig["neo4j.pattern.node.merge-properties"], "true")
assertEquals(
migratedConfig["neo4j.cypher.topic.foo"],
"CREATE (p:Person{name: event.firstName})",
)
assertEquals(migratedConfig["neo4j.uri"], "neo4j+s://x.x.x.x")
assertEquals(migratedConfig["neo4j.max-retry-attempts"], "1")
}

@Test fun `should not migrate keys with no matching configuration key`() {
// Given a configuration which has no equivalent in the updated connector
val originals = mapOf(
ENCRYPTION_CA_CERTIFICATE_PATH to "./cert.pem",
SOURCE_TYPE to SourceType.QUERY.toString(),
ENFORCE_SCHEMA to "true"
"neo4j.encryption.ca.certificate.path" to "./cert.pem",
"neo4j.source.type" to SourceType.QUERY.toString(),
"neo4j.enforce.schema" to "true"
)

// When the configuration is migrated
Expand All @@ -49,22 +44,30 @@ class ConfigurationMigratorTest {
@Test
fun `should migrate time-based keys to new configuration format`() {
// Given a configuration originally defined in milliseconds
var originals = mapOf("neo4j.retry.backoff.msecs" to "1200")
var originals = mapOf(
"neo4j.retry.backoff.msecs" to "1200",
"neo4j.connection.max.lifetime.msecs" to "1000",
"neo4j.batch.timeout.msecs" to "500",
"neo4j.streaming.poll.interval.msecs" to "800"
)

// When the configuration is migrated
val migratedConfig = ConfigurationMigrator(originals).migrate()

// Then the new configuration should be labelled with its units
assertEquals(migratedConfig["neo4j.max-retry-time"], "1200ms")
assertEquals(migratedConfig["neo4j.connection-timeout"], "1000ms")
assertEquals(migratedConfig["neo4j.batch-timeout"], "500ms")
assertEquals(migratedConfig["neo4j.query.poll-interval"], "800ms")
}

@Test
fun `should migrate prefix based keys to new configuration`() {
// Given a configuration containing prefix/user-defined keys
val originals =
mapOf(
"${Neo4jSinkConnectorConfig.TOPIC_CYPHER_PREFIX}foo" to
"CREATE (p:Person{name: event.firstName})"
"neo4j.topic.cypher.foo" to "CREATE (p:Person{name: event.firstName})",
"neo4j.topic.pattern.node.bar" to "(:Bar{!barId,barName})"
)

// When the configuration is migrated
Expand All @@ -75,5 +78,9 @@ class ConfigurationMigratorTest {
migratedConfig["neo4j.cypher.topic.foo"],
"CREATE (p:Person{name: event.firstName})",
)
assertEquals(
migratedConfig["neo4j.pattern.node.topic.bar"],
"(:Bar{!barId,barName})",
)
}
}

0 comments on commit 760fe11

Please sign in to comment.