Merge branch 'master' into artem1205/source-amazon-SP-update-cdk
brianjlai committed Jan 24, 2025
2 parents e2302a9 + 4e2eea6 commit 2da6092
Showing 1,568 changed files with 77,201 additions and 67,157 deletions.
29 changes: 29 additions & 0 deletions .github/actions/install-java-environment/action.yml
@@ -0,0 +1,29 @@
+name: Install Java Environment
+description: "Installs the Java environment"
+inputs:
+  java_version:
+    description: "Java version"
+    required: false
+    default: "21"
+    type: string
+  gradle_cache_read_only:
+    description: "Whether to use a read-only Gradle cache"
+    required: false
+    default: false
+    type: boolean
+  gradle_cache_write_only:
+    description: "Whether to use a write-only Gradle cache"
+    required: false
+    default: false
+    type: boolean
+runs:
+  using: "composite"
+  steps:
+    - uses: actions/setup-java@v4
+      with:
+        distribution: corretto
+        java-version: ${{ inputs.java_version }}
+    - uses: gradle/actions/setup-gradle@v3
+      with:
+        cache-read-only: ${{ inputs.gradle_cache_read_only }}
+        cache-write-only: ${{ inputs.gradle_cache_write_only }}
4 changes: 3 additions & 1 deletion .github/actions/run-airbyte-ci/action.yml
@@ -115,7 +115,9 @@ runs:
           ls -la
           ls -la airbyte-python-cdk || echo "No airbyte-python-cdk directory"
           ls -laL ../airbyte-python-cdk || echo "No airbyte-python-cdk symlink"
+    - name: Install Java Environment
+      id: install-java-environment
+      uses: ./.github/actions/install-java-environment
     - name: Docker login
       id: docker-login
       uses: docker/login-action@v3
2 changes: 1 addition & 1 deletion .github/workflows/airbyte-ci-release.yml
@@ -56,7 +56,7 @@ jobs:
         working-directory: airbyte-ci/connectors/pipelines/
         run: poetry run poe build-release-binary ${{ env.BINARY_FILE_NAME }}
 
-      - uses: actions/upload-artifact@v3
+      - uses: actions/upload-artifact@v4
         with:
           name: airbyte-ci-${{ matrix.os }}-${{ steps.get_short_sha.outputs.sha }}
           path: airbyte-ci/connectors/pipelines/dist/${{ env.BINARY_FILE_NAME }}
34 changes: 27 additions & 7 deletions .github/workflows/connectors_nightly_build.yml
@@ -5,18 +5,38 @@ on:
     # 0AM UTC is 2AM CEST, 3AM EEST, 5PM PDT.
     - cron: "0 0 * * *"
   workflow_dispatch:
     inputs:
       test-connectors-options:
         default: --concurrency=5 --support-level=certified
         required: true
 
 run-name: "Test connectors: ${{ inputs.test-connectors-options || 'nightly build for Certified connectors' }}"
 
 jobs:
+  generate_matrix:
+    name: Generate matrix
+    runs-on: ubuntu-24.04
+    outputs:
+      generated_matrix: ${{ steps.generate_matrix.outputs.generated_matrix }}
+    steps:
+      - name: Checkout Airbyte
+        uses: actions/checkout@v4
+      - name: Run airbyte-ci connectors list [SCHEDULED TRIGGER]
+        id: airbyte-ci-connectors-list-scheduled
+        uses: ./.github/actions/run-airbyte-ci
+        with:
+          context: "master"
+          subcommand: "connectors --support-level=certified list --output=selected_connectors.json"
+      - name: Generate matrix - 30 connectors per job
+        id: generate_matrix
+        run: |
+          matrix=$(jq -c -r '{include: [.[] | "--name=" + .] | to_entries | group_by(.key / 30 | floor) | map(map(.value) | {"connector_names": join(" ")})}' selected_connectors.json)
+          echo "generated_matrix=$matrix" >> $GITHUB_OUTPUT
   test_connectors:
+    needs: generate_matrix
     name: "Test connectors: ${{ inputs.test-connectors-options || 'nightly build for Certified connectors' }}"
     timeout-minutes: 720 # 12 hours
     runs-on: connector-nightly-xlarge
     continue-on-error: true
+    strategy:
+      matrix: ${{fromJson(needs.generate_matrix.outputs.generated_matrix)}}
 
     steps:
       - name: Checkout Airbyte
         uses: actions/checkout@v3
@@ -32,7 +52,7 @@ jobs:
         with:
           context: "master"
           ci_job_key: "nightly_builds"
-          dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_3 }}
+          # dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_3 }}
           docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
          docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
           gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
@@ -41,4 +61,4 @@
           github_token: ${{ secrets.GITHUB_TOKEN }}
           s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
           s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
-          subcommand: "connectors ${{ inputs.test-connectors-options || '--concurrency=8 --support-level=certified' }} test"
+          subcommand: "connectors ${{ matrix.connector_names}} test"
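
The jq one-liner above is dense, so here is a Kotlin sketch of the same chunking logic; the function name and sample connector names are illustrative, not part of the commit. It prefixes every connector with --name=, groups the list into chunks of 30, and emits a GitHub Actions matrix whose include entries each carry one space-joined connector_names string.

// Illustrative re-implementation of the jq matrix-generation step above.
fun generateMatrix(connectors: List<String>, chunkSize: Int = 30): Map<String, Any> =
    mapOf(
        "include" to connectors
            .map { "--name=$it" }
            .chunked(chunkSize)
            .map { mapOf("connector_names" to it.joinToString(" ")) }
    )

fun main() {
    val matrix = generateMatrix(listOf("source-faker", "source-github"))
    // Prints: {include=[{connector_names=--name=source-faker --name=source-github}]}
    println(matrix)
}

Each matrix entry then becomes one `subcommand: "connectors ${{ matrix.connector_names}} test"` invocation, so a single runner tests at most 30 connectors.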
2 changes: 1 addition & 1 deletion .github/workflows/publish_connectors.yml
@@ -20,7 +20,7 @@ on:
 jobs:
   publish_connectors:
     name: Publish connectors
-    runs-on: connector-publish-large
+    runs-on: ubuntu-24.04-4core
     steps:
       - name: Checkout Airbyte
         uses: actions/checkout@v4
6 changes: 6 additions & 0 deletions .github/workflows/regression_tests.yml
@@ -47,6 +47,12 @@ jobs:
     runs-on: linux-20.04-large # Custom runner, defined in GitHub org settings
     timeout-minutes: 360 # 6 hours
     steps:
+      - name: Install Python
+        id: install_python
+        uses: actions/setup-python@v4
+        with:
+          python-version: "3.10"
+
       - name: Checkout Airbyte
         uses: actions/checkout@v4
       - name: Check PAT rate limits
4 changes: 2 additions & 2 deletions .github/workflows/test-performance-command.yml
@@ -131,7 +131,7 @@ jobs:
         GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }}
       - name: Archive test reports artifacts
         if: github.event.inputs.comment-id && failure()
-        uses: actions/upload-artifact@v3
+        uses: actions/upload-artifact@v4
         with:
           name: test-reports
           path: |
@@ -145,7 +145,7 @@
       - name: Test coverage reports artifacts
         if: github.event.inputs.comment-id && success()
-        uses: actions/upload-artifact@v3
+        uses: actions/upload-artifact@v4
         with:
           name: test-reports
           path: |
(changed file: name truncated)
@@ -33,11 +33,18 @@ import java.util.function.Consumer
 
 /** Emits the [AirbyteMessage] instances produced by the connector. */
 @DefaultImplementation(StdoutOutputConsumer::class)
-interface OutputConsumer : Consumer<AirbyteMessage>, AutoCloseable {
-    val emittedAt: Instant
+abstract class OutputConsumer(private val clock: Clock) : Consumer<AirbyteMessage>, AutoCloseable {
+    /**
+     * The constant emittedAt timestamp we use for record timestamps.
+     *
+     * TODO: use the correct emittedAt time for each record. Ryan: not changing this now as it could
+     * have performance implications for sources given the delicate serialization logic in place
+     * here.
+     */
+    val recordEmittedAt: Instant = Instant.ofEpochMilli(clock.millis())
 
-    fun accept(record: AirbyteRecordMessage) {
-        record.emittedAt = emittedAt.toEpochMilli()
+    open fun accept(record: AirbyteRecordMessage) {
+        record.emittedAt = recordEmittedAt.toEpochMilli()
         accept(AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(record))
     }
 
@@ -66,7 +73,9 @@ interface OutputConsumer : Consumer<AirbyteMessage>, AutoCloseable {
     }
 
     fun accept(trace: AirbyteTraceMessage) {
-        trace.emittedAt = emittedAt.toEpochMilli().toDouble()
+        // Use the correct emittedAt timestamp for trace messages. This allows platform and other
+        // downstream consumers to take emission time into account for error classification.
+        trace.emittedAt = clock.millis().toDouble()
         accept(AirbyteMessage().withType(AirbyteMessage.Type.TRACE).withTrace(trace))
     }
 
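The net effect of the OutputConsumer refactor: record timestamps stay frozen at the value captured when the consumer is constructed (for serialization performance, per the TODO above), while trace timestamps are now sampled at emission time. A toy sketch of that contract, with illustrative names only; this is not the CDK class itself.

import java.time.Clock
import java.time.Instant

// Toy model of the timestamp contract described above.
abstract class ToyOutputConsumer(private val clock: Clock) {
    // Captured once at construction and shared by every record in the sync.
    val recordEmittedAt: Instant = Instant.ofEpochMilli(clock.millis())

    fun recordTimestampMillis(): Long = recordEmittedAt.toEpochMilli()

    // Sampled per call, so downstream error classification sees real emission time.
    fun traceTimestampMillis(): Double = clock.millis().toDouble()
}

fun main() {
    val consumer = object : ToyOutputConsumer(Clock.systemUTC()) {}
    val first = consumer.traceTimestampMillis()
    Thread.sleep(10)
    check(consumer.traceTimestampMillis() > first) // trace time advances
    check(consumer.recordTimestampMillis() == consumer.recordEmittedAt.toEpochMilli()) // record time does not
}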
@@ -107,7 +116,7 @@ const val CONNECTOR_OUTPUT_PREFIX = "airbyte.connector.output"
 @Secondary
 private class StdoutOutputConsumer(
     val stdout: PrintStream,
-    clock: Clock,
+    private val clock: Clock,
     /**
      * [bufferByteSizeThresholdForFlush] triggers flushing the record buffer to stdout once the
      * buffer's size (in bytes) grows past this value.
@@ -132,9 +141,7 @@ private class StdoutOutputConsumer(
      */
     @Value("\${$CONNECTOR_OUTPUT_PREFIX.buffer-byte-size-threshold-for-flush:4096}")
     val bufferByteSizeThresholdForFlush: Int,
-) : OutputConsumer {
-    override val emittedAt: Instant = Instant.now(clock)
-
+) : OutputConsumer(clock) {
     private val buffer = ByteArrayOutputStream() // TODO: replace this with a StringWriter?
     private val jsonGenerator: JsonGenerator = Jsons.createGenerator(buffer)
     private val sequenceWriter: SequenceWriter = Jsons.writer().writeValues(jsonGenerator)
@@ -233,7 +240,7 @@ private class StdoutOutputConsumer(
             namespacedTemplates.getOrPut(namespace) { StreamToTemplateMap() }
         }
         return streamToTemplateMap.getOrPut(stream) {
-            RecordTemplate.create(stream, namespace, emittedAt)
+            RecordTemplate.create(stream, namespace, recordEmittedAt)
         }
     }
 
(changed file: name truncated)
@@ -15,16 +15,14 @@ import io.micronaut.context.annotation.Requires
 import io.micronaut.context.env.Environment
 import jakarta.inject.Singleton
 import java.time.Clock
-import java.time.Instant
 
 /** [OutputConsumer] implementation for unit tests. Collects everything into thread-safe buffers. */
 @Singleton
 @Requires(notEnv = [Environment.CLI])
 @Replaces(OutputConsumer::class)
 class BufferingOutputConsumer(
     clock: Clock,
-) : OutputConsumer {
-    override val emittedAt: Instant = Instant.now(clock)
+) : OutputConsumer(clock) {
 
     private val records = mutableListOf<AirbyteRecordMessage>()
     private val states = mutableListOf<AirbyteStateMessage>()
(changed file: name truncated)
@@ -2,6 +2,7 @@
 package io.airbyte.cdk.discover
 
 import io.airbyte.cdk.data.AirbyteSchemaType
+import io.airbyte.cdk.data.DoubleCodec
 import io.airbyte.cdk.data.JsonDecoder
 import io.airbyte.cdk.data.JsonEncoder
 import io.airbyte.cdk.data.JsonStringCodec
@@ -63,8 +64,8 @@ interface MetaField : FieldOrMetaField {
 enum class CommonMetaField(
     override val type: FieldType,
 ) : MetaField {
-    CDC_UPDATED_AT(CdcOffsetDateTimeMetaFieldType),
-    CDC_DELETED_AT(CdcOffsetDateTimeMetaFieldType),
+    CDC_UPDATED_AT(CdcStringMetaFieldType),
+    CDC_DELETED_AT(CdcStringMetaFieldType),
     ;
 
     override val id: String
@@ -89,3 +90,9 @@ data object CdcOffsetDateTimeMetaFieldType : LosslessFieldType {
     override val jsonEncoder: JsonEncoder<OffsetDateTime> = OffsetDateTimeCodec
     override val jsonDecoder: JsonDecoder<OffsetDateTime> = OffsetDateTimeCodec
 }
+
+data object CdcNumberMetaFieldType : LosslessFieldType {
+    override val airbyteSchemaType: AirbyteSchemaType = LeafAirbyteSchemaType.NUMBER
+    override val jsonEncoder: JsonEncoder<Double> = DoubleCodec
+    override val jsonDecoder: JsonDecoder<Double> = DoubleCodec
+}
(changed file: name truncated)
@@ -108,7 +108,7 @@ sealed class FeedBootstrap<T : Feed>(
         stream.schema.forEach { recordData.putNull(it.id) }
         if (feed is Stream && precedingGlobalFeed != null) {
             metaFieldDecorator.decorateRecordData(
-                timestamp = outputConsumer.emittedAt.atOffset(ZoneOffset.UTC),
+                timestamp = outputConsumer.recordEmittedAt.atOffset(ZoneOffset.UTC),
                 globalStateValue = stateQuerier.current(precedingGlobalFeed),
                 stream,
                 recordData,
@@ -125,7 +125,7 @@
             AirbyteRecordMessage()
                 .withStream(stream.name)
                 .withNamespace(stream.namespace)
-                .withEmittedAt(outputConsumer.emittedAt.toEpochMilli())
+                .withEmittedAt(outputConsumer.recordEmittedAt.toEpochMilli())
                 .withData(reusedRecordData)
         )
 
@@ -138,7 +138,7 @@
             AirbyteRecordMessage()
                 .withStream(stream.name)
                 .withNamespace(stream.namespace)
-                .withEmittedAt(outputConsumer.emittedAt.toEpochMilli())
+                .withEmittedAt(outputConsumer.recordEmittedAt.toEpochMilli())
                 .withData(reusedRecordData)
                 .withMeta(reusedRecordMeta)
         )
(changed file: name truncated)
@@ -70,7 +70,7 @@ class FeedBootstrapTest {
         FeedBootstrap.create(outputConsumer, metaFieldDecorator, stateQuerier, this)
 
     fun expected(vararg data: String): List<String> {
-        val ts = outputConsumer.emittedAt.toEpochMilli()
+        val ts = outputConsumer.recordEmittedAt.toEpochMilli()
         return data.map { """{"namespace":"ns","stream":"tbl","data":$it,"emitted_at":$ts}""" }
     }
 
(changed file: name truncated)
@@ -8,6 +8,8 @@ import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
 import io.airbyte.cdk.load.test.util.NoopNameMapper
 import io.airbyte.cdk.load.test.util.UncoercedExpectedRecordMapper
 import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
+import io.airbyte.cdk.load.write.SchematizedNestedValueBehavior
+import io.airbyte.cdk.load.write.UnionBehavior
 import io.airbyte.cdk.load.write.Untyped
 import org.junit.jupiter.api.Disabled
 import org.junit.jupiter.api.Test
@@ -23,7 +25,9 @@ class MockBasicFunctionalityIntegrationTest :
         isStreamSchemaRetroactive = false,
         supportsDedup = true,
         stringifySchemalessObjects = false,
-        promoteUnionToObject = false,
+        unionBehavior = UnionBehavior.PASS_THROUGH,
+        schematizedObjectBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
+        schematizedArrayBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
         preserveUndeclaredFields = true,
         commitDataIncrementally = false,
         allTypesBehavior = Untyped,
(changed file: name truncated)
@@ -13,9 +13,12 @@ import io.airbyte.cdk.output.ExceptionHandler
 import io.airbyte.cdk.output.OutputConsumer
 import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
 import io.airbyte.protocol.models.v0.AirbyteMessage
+import io.github.oshai.kotlinlogging.KotlinLogging
 import io.micronaut.context.annotation.Requires
 import jakarta.inject.Singleton
 
+private val logger = KotlinLogging.logger {}
+
 @Singleton
 @Requires(property = Operation.PROPERTY, value = "check")
 @Requires(env = ["destination"])
@@ -40,6 +43,7 @@ class CheckOperation<T : ConfigurationSpecification, C : DestinationConfiguratio
             )
             outputConsumer.accept(successMessage)
         } catch (t: Throwable) {
+            logger.warn(t) { "Caught throwable during CHECK" }
             val (traceMessage, statusMessage) = exceptionHandler.handleCheckFailure(t)
             outputConsumer.accept(traceMessage)
             outputConsumer.accept(statusMessage)
(changed file: name truncated)
@@ -81,10 +81,12 @@ abstract class DestinationConfiguration : Configuration {
 
     /**
      * The amount of time given to implementor tasks (e.g. open, processBatch) to complete their
-     * current work after a failure.
+     * current work after a failure. Input consuming will stop right away, so this will give the
+     * tasks time to persist the messages already read.
      */
-    open val gracefulCancellationTimeoutMs: Long = 60 * 1000L // 1 minutes
+    open val gracefulCancellationTimeoutMs: Long = 10 * 60 * 1000L // 10 minutes
 
+    open val numOpenStreamWorkers: Int = 1
     open val numProcessRecordsWorkers: Int = 2
     open val numProcessBatchWorkers: Int = 5
     open val numProcessBatchWorkersForFileTransfer: Int = 3
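The updated doc comment explains the intent: on failure, input consumption halts immediately, and the timeout is purely a flush budget for in-flight tasks. A hedged sketch of what that means for a connector author; the toy class below is illustrative, and the real base class carries many more members.

// Toy model of the graceful-cancellation budget described above.
open class ToyDestinationConfiguration {
    // On failure, input consumption stops immediately; this window is how long
    // in-flight tasks (open, processBatch) get to persist records already read.
    open val gracefulCancellationTimeoutMs: Long = 10 * 60 * 1000L // 10 minutes
}

// A destination with slow batch flushes could presumably lengthen the window.
class SlowFlushConfiguration : ToyDestinationConfiguration() {
    override val gracefulCancellationTimeoutMs: Long = 15 * 60 * 1000L // 15 minutes
}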
(changed file: name truncated)
@@ -6,7 +6,9 @@ package io.airbyte.cdk.load.config
 
 import io.airbyte.cdk.load.command.DestinationCatalog
 import io.airbyte.cdk.load.command.DestinationConfiguration
+import io.airbyte.cdk.load.command.DestinationStream
 import io.airbyte.cdk.load.message.BatchEnvelope
+import io.airbyte.cdk.load.message.ChannelMessageQueue
 import io.airbyte.cdk.load.message.MultiProducerChannel
 import io.airbyte.cdk.load.state.ReservationManager
 import io.airbyte.cdk.load.task.implementor.FileAggregateMessage
@@ -89,4 +91,8 @@ class SyncBeanFactory {
         val channel = Channel<FileTransferQueueMessage>(config.batchQueueDepth)
         return MultiProducerChannel(1, channel, "fileMessageQueue")
     }
+
+    @Singleton
+    @Named("openStreamQueue")
+    class OpenStreamQueue : ChannelMessageQueue<DestinationStream>()
 }
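
The new bean is a Micronaut singleton with a @Named qualifier, so downstream tasks can presumably receive it by name. The consumer below is an illustrative guess at an injection site; only the bean definition above comes from the commit.

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.ChannelMessageQueue
import jakarta.inject.Named
import jakarta.inject.Singleton

// Hypothetical injection site: Micronaut resolves the qualifier to the
// OpenStreamQueue singleton declared in SyncBeanFactory above.
@Singleton
class OpenStreamTaskExample(
    @Named("openStreamQueue") private val queue: ChannelMessageQueue<DestinationStream>,
)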
(remaining changed files truncated)
