Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Destination MSSQL v2 rc. #52096

Draft
wants to merge 32 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
98acc63
feat: implement converters for airbyte type/value to SQL
jdpgrailsdev Dec 18, 2024
0b84aa8
Revert "feat: implement converters for airbyte type/value to SQL"
jdpgrailsdev Dec 18, 2024
b340305
feat: implement converters for airbyte type/value to SQL (#49924)
jdpgrailsdev Dec 19, 2024
338d411
fix: add missing import (#49945)
jdpgrailsdev Dec 19, 2024
2dff567
Remove duplicate class
jdpgrailsdev Dec 19, 2024
d07c175
chore: destination MSSQL v2 add spec (#49947)
gosusnp Dec 19, 2024
4ce3854
Test data source configuration (#49951)
jdpgrailsdev Dec 19, 2024
de855ec
chore: add ssl method to dest mssql v2 spec (#49956)
gosusnp Dec 20, 2024
49e8a63
chore: destination mssqlv2 implement configuration and datasource (#4…
gosusnp Dec 20, 2024
6b733d1
chore: mssqlv2 implement check (#49965)
gosusnp Dec 26, 2024
4f005f0
chore(destination-mssql-v2): add datasource url tests (#50426)
gosusnp Dec 30, 2024
89b6074
refactor: use Kotlin extension function
jdpgrailsdev Dec 30, 2024
6839ac4
feat: use mssql server test container in integration tests (#50869)
jdpgrailsdev Jan 3, 2025
66eb26b
[Destination MSSQLv2] Implement typed insertions (#50434)
gosusnp Jan 15, 2025
fe5bf91
[Destination MSSQL] schema management (#51027)
gosusnp Jan 16, 2025
056ecd2
fix: use prepared statement instead of dynamic string (#51597)
jdpgrailsdev Jan 16, 2025
86d7fa9
[Destination MSSQL] Simple truncate (#51528)
gosusnp Jan 16, 2025
2c25863
[Destination MSSQL] fix statement syntax (#51598)
gosusnp Jan 16, 2025
86f6f95
chore: cleanup/fix warnings
jdpgrailsdev Jan 17, 2025
554722d
fix: use testcontainer for write tests (#51594)
jdpgrailsdev Jan 17, 2025
90149da
chore: update dependencies
jdpgrailsdev Jan 17, 2025
deaa103
fix: use correct type in tests
jdpgrailsdev Jan 17, 2025
9c38694
[Destination MSSQL] add dedup (#51612)
gosusnp Jan 21, 2025
a8d31b4
chore:(destination-mssql-v2): address create schema concurrency (#52055)
gosusnp Jan 21, 2025
b051849
chore: disable known tests with issues (#52070)
gosusnp Jan 22, 2025
565a263
chore(destination-mssql-v2): prep merge to master (#52073)
gosusnp Jan 22, 2025
4d0eb78
Merge branch 'master' into move/destination-mssql-v2
gosusnp Jan 23, 2025
785c642
[Destination MSSQL] Escape column names to avoid issues with reserved…
gosusnp Jan 23, 2025
70c2b97
Merge branch 'master' into move/destination-mssql-v2
gosusnp Jan 23, 2025
9bc0ee9
[Destination MSSQL] fix name escaping and dedup (#52126)
gosusnp Jan 24, 2025
bd5cb83
[Destination MSSQL] re-enable some tests (#52127)
gosusnp Jan 24, 2025
fc8b75e
Merge branch 'master' into move/destination-mssql-v2
gosusnp Jan 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,10 @@ abstract class BasicFunctionalityIntegrationTest(
configUpdater = configUpdater,
envVars = envVars,
) {
val parsedConfig =
ValidatedJsonUtils.parseOne(configSpecClass, configUpdater.update(configContents))

// Update config with any replacements. This may be necessary when using testcontainers.
val configAsString = configUpdater.update(configContents)
val parsedConfig = ValidatedJsonUtils.parseOne(configSpecClass, configAsString)

@Test
open fun testBasicWrite() {
Expand All @@ -209,7 +211,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
val messages =
runSync(
configContents,
configAsString,
stream,
listOf(
InputRecord(
Expand Down Expand Up @@ -260,7 +262,7 @@ abstract class BasicFunctionalityIntegrationTest(
{
if (verifyDataWriting) {
dumpAndDiffRecords(
ValidatedJsonUtils.parseOne(configSpecClass, configContents),
ValidatedJsonUtils.parseOne(configSpecClass, configAsString),
listOf(
OutputRecord(
extractedAt = 1234,
Expand Down Expand Up @@ -321,7 +323,7 @@ abstract class BasicFunctionalityIntegrationTest(

val messages =
runSync(
configContents,
configAsString,
stream,
listOf(
InputFile(
Expand Down Expand Up @@ -359,7 +361,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
})

val config = ValidatedJsonUtils.parseOne(configSpecClass, configContents)
val config = ValidatedJsonUtils.parseOne(configSpecClass, configAsString)
val fileContent = dataDumper.dumpFile(config, stream)

assertEquals(listOf("123"), fileContent)
Expand All @@ -380,7 +382,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
val stateMessage =
runSyncUntilStateAck(
configContents,
this@BasicFunctionalityIntegrationTest.configContents,
stream,
listOf(
InputRecord(
Expand All @@ -398,7 +400,7 @@ abstract class BasicFunctionalityIntegrationTest(
),
allowGracefulShutdown = false,
)
runSync(configContents, stream, emptyList())
runSync(this@BasicFunctionalityIntegrationTest.configContents, stream, emptyList())

val streamName = stateMessage.stream.streamDescriptor.name
val streamNamespace = stateMessage.stream.streamDescriptor.namespace
Expand Down Expand Up @@ -461,7 +463,7 @@ abstract class BasicFunctionalityIntegrationTest(
val stream1 = makeStream(randomizedNamespace + "_1")
val stream2 = makeStream(randomizedNamespace + "_2")
runSync(
configContents,
configAsString,
DestinationCatalog(
listOf(
stream1,
Expand Down Expand Up @@ -590,7 +592,7 @@ abstract class BasicFunctionalityIntegrationTest(
serialized = "",
)
}
runSync(configContents, catalog, messages)
runSync(configAsString, catalog, messages)
assertAll(
catalog.streams.map { stream ->
{
Expand Down Expand Up @@ -630,7 +632,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId,
)
runSync(
configContents,
configAsString,
makeStream(generationId = 12, minimumGenerationId = 0, syncId = 42),
listOf(
InputRecord(
Expand All @@ -643,7 +645,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
val finalStream = makeStream(generationId = 13, minimumGenerationId = 13, syncId = 43)
runSync(
configContents,
configAsString,
finalStream,
listOf(
InputRecord(
Expand Down Expand Up @@ -725,7 +727,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 41,
)
runSync(
configContents,
configAsString,
stream1,
listOf(
makeInputRecord(1, "2024-01-23T01:00:00Z", 100),
Expand Down Expand Up @@ -764,7 +766,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
// Run a sync, but emit a status incomplete. This should not delete any existing data.
runSyncUntilStateAck(
configContents,
configAsString,
stream2,
listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)),
StreamCheckpoint(
Expand Down Expand Up @@ -813,7 +815,7 @@ abstract class BasicFunctionalityIntegrationTest(
// Run a third sync, this time with a successful status.
// This should delete the first sync's data, and retain the second+third syncs' data.
runSync(
configContents,
configAsString,
stream2,
listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)),
)
Expand Down Expand Up @@ -894,7 +896,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
// Run a sync, but emit a stream status incomplete.
runSyncUntilStateAck(
configContents,
configAsString,
stream,
listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)),
StreamCheckpoint(
Expand Down Expand Up @@ -929,7 +931,7 @@ abstract class BasicFunctionalityIntegrationTest(
// Run a second sync, this time with a successful status.
// This should retain the first syncs' data.
runSync(
configContents,
configAsString,
stream,
listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)),
)
Expand Down Expand Up @@ -1015,7 +1017,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 41,
)
runSync(
configContents,
configAsString,
stream1,
listOf(
makeInputRecord(1, "2024-01-23T01:00:00Z", 100),
Expand Down Expand Up @@ -1055,7 +1057,7 @@ abstract class BasicFunctionalityIntegrationTest(
// Run a sync, but emit a stream status incomplete. This should not delete any existing
// data.
runSyncUntilStateAck(
configContents,
configAsString,
stream2,
listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)),
StreamCheckpoint(
Expand Down Expand Up @@ -1110,7 +1112,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 43,
)
runSync(
configContents,
configAsString,
stream3,
listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)),
)
Expand Down Expand Up @@ -1170,7 +1172,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId,
)
runSync(
configContents,
configAsString,
makeStream(syncId = 42),
listOf(
InputRecord(
Expand All @@ -1183,7 +1185,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
val finalStream = makeStream(syncId = 43)
runSync(
configContents,
configAsString,
finalStream,
listOf(
InputRecord(
Expand Down Expand Up @@ -1236,7 +1238,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId,
)
runSync(
configContents,
configAsString,
makeStream(
syncId = 42,
linkedMapOf("id" to intType, "to_drop" to stringType, "to_change" to intType)
Expand All @@ -1256,7 +1258,7 @@ abstract class BasicFunctionalityIntegrationTest(
linkedMapOf("id" to intType, "to_change" to stringType, "to_add" to stringType)
)
runSync(
configContents,
configAsString,
finalStream,
listOf(
InputRecord(
Expand Down Expand Up @@ -1330,7 +1332,7 @@ abstract class BasicFunctionalityIntegrationTest(

val sync1Stream = makeStream(syncId = 42)
runSync(
configContents,
configAsString,
sync1Stream,
listOf(
// emitted_at:1000 is equal to 1970-01-01 00:00:01Z.
Expand Down Expand Up @@ -1391,7 +1393,7 @@ abstract class BasicFunctionalityIntegrationTest(

val sync2Stream = makeStream(syncId = 43)
runSync(
configContents,
configAsString,
sync2Stream,
listOf(
// Update both Alice and Bob
Expand Down Expand Up @@ -1475,9 +1477,9 @@ abstract class BasicFunctionalityIntegrationTest(
// instead of being able to fallback onto extractedAt.
emittedAtMs = 100,
)
runSync(configContents, makeStream("cursor1"), listOf(makeRecord("cursor1")))
runSync(configAsString, makeStream("cursor1"), listOf(makeRecord("cursor1")))
val stream2 = makeStream("cursor2")
runSync(configContents, stream2, listOf(makeRecord("cursor2")))
runSync(configAsString, stream2, listOf(makeRecord("cursor2")))
dumpAndDiffRecords(
parsedConfig,
listOf(
Expand Down Expand Up @@ -1534,7 +1536,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
}
// Just verify that we don't crash.
assertDoesNotThrow { runSync(configContents, DestinationCatalog(streams), messages) }
assertDoesNotThrow { runSync(configAsString, DestinationCatalog(streams), messages) }
}

/**
Expand Down Expand Up @@ -1587,7 +1589,7 @@ abstract class BasicFunctionalityIntegrationTest(
emittedAtMs = 100,
)
runSync(
configContents,
configAsString,
stream,
listOf(
// A record with valid values for all fields
Expand Down Expand Up @@ -1871,7 +1873,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 42,
)
runSync(
configContents,
configAsString,
stream,
listOf(
InputRecord(
Expand Down Expand Up @@ -2050,7 +2052,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 42,
)
runSync(
configContents,
configAsString,
stream,
listOf(
InputRecord(
Expand Down Expand Up @@ -2221,7 +2223,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 42,
)
runSync(
configContents,
configAsString,
stream,
listOf(
InputRecord(
Expand Down Expand Up @@ -2413,7 +2415,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 42,
)
runSync(
configContents,
configAsString,
stream,
listOf(
InputRecord(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ application {
//applicationDefaultJvmArgs = listOf("-XX:+ExitOnOutOfMemoryError", "-XX:MaxRAMPercentage=75.0", "--add-opens", "java.base/sun.nio.ch=ALL-UNNAMED", "--add-opens", "java.base/sun.security.action=ALL-UNNAMED", "--add-opens", "java.base/java.lang=ALL-UNNAMED")
}

val junitVersion = "5.11.3"
val junitVersion = "5.11.4"

configurations.configureEach {
// Exclude additional SLF4J providers from all classpaths
Expand All @@ -38,13 +38,16 @@ dependencies {
implementation("com.microsoft.sqlserver:mssql-jdbc:12.8.1.jre11")
implementation("io.github.oshai:kotlin-logging-jvm:7.0.0")
implementation("jakarta.inject:jakarta.inject-api:2.0.1")
implementation("com.github.spotbugs:spotbugs-annotations:4.8.6")
implementation("io.micronaut:micronaut-inject:4.6.1")
implementation("com.github.spotbugs:spotbugs-annotations:4.9.0")
implementation("io.micronaut:micronaut-inject:4.7.12")
implementation("com.zaxxer:HikariCP:6.2.1")

testImplementation("io.mockk:mockk:1.13.13")
testImplementation("io.mockk:mockk:1.13.16")
testImplementation("org.junit.jupiter:junit-jupiter-api:$junitVersion")
testImplementation("org.junit.jupiter:junit-jupiter-params:$junitVersion")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitVersion")

integrationTestImplementation("org.testcontainers:mssqlserver:1.20.4")
}

tasks.named<Test>("test") {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
services:
sql-server:
image: mcr.microsoft.com/mssql/server:2022-latest
ports:
- "1433:1433"
environment:
- ACCEPT_EULA=Y
- MSSQL_SA_PASSWORD=A_Str0ng_Required_Password
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ data:
type: GSM
connectorType: destination
definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
dockerRepository: airbyte/destination-mssql-v2
documentationUrl: https://docs.airbyte.com/integrations/destinations/mssql-v2
githubIssueLabel: destination-mssql-v2
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.mssql.v2

import io.airbyte.cdk.load.check.DestinationChecker
import io.airbyte.integrations.destination.mssql.v2.config.MSSQLConfiguration
import io.airbyte.integrations.destination.mssql.v2.config.MSSQLDataSourceFactory
import jakarta.inject.Singleton
import java.util.UUID
import javax.sql.DataSource

@Singleton
class MSSQLChecker(private val dataSourceFactory: MSSQLDataSourceFactory) :
DestinationChecker<MSSQLConfiguration> {
override fun check(config: MSSQLConfiguration) {
val dataSource: DataSource = dataSourceFactory.getDataSource(config)
val testTableName = "check_test_${UUID.randomUUID()}"
val fullyQualifiedTableName = "[${config.rawDataSchema}].[${testTableName}]"
dataSource.connection.use { connection ->
connection.createStatement().use { statement ->
statement.executeUpdate(
"""
CREATE TABLE ${fullyQualifiedTableName} (test int);
DROP TABLE ${fullyQualifiedTableName};
""".trimIndent(),
)
}
}
}
}
Loading
Loading