Skip to content

Commit

Permalink
Merge branch 'master' into agarctfi/source-linkedin-ads/creative-upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
agarctfi authored Feb 3, 2025
2 parents bc7deab + 29c3bb5 commit 2e01b91
Show file tree
Hide file tree
Showing 1,266 changed files with 43,405 additions and 25,624 deletions.
1 change: 0 additions & 1 deletion .github/workflows/airbyte-ci-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ jobs:
internal_poetry_packages:
- airbyte-ci/connectors/pipelines/**
- airbyte-ci/connectors/base_images/**
- airbyte-ci/connectors/common_utils/**
- airbyte-ci/connectors/connectors_insights/**
- airbyte-ci/connectors/connector_ops/**
- airbyte-ci/connectors/connectors_qa/**
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/bump-version-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ jobs:
gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
git_repo_url: https://github.com/${{ steps.job-vars.outputs.repo }}.git
subcommand: |
connectors --modified bump-version \
${{ github.event.inputs.type }} \
Expand Down
50 changes: 0 additions & 50 deletions .github/workflows/connector_teams_review_requirements.yml

This file was deleted.

15 changes: 14 additions & 1 deletion .github/workflows/live_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ on:
description: Use the local CDK when building the target connector
default: "false"
type: boolean
disable_proxy:
description: Disable proxy for requests
default: "false"
type: boolean
connection_subset:
description: The subset of connections to select from.
required: true
Expand Down Expand Up @@ -108,6 +112,15 @@ jobs:
echo "READ_WITH_STATE_FLAG=" >> $GITHUB_ENV
fi
- name: Setup Proxy Flag
if: github.event_name == 'workflow_dispatch'
run: |
if ${{ github.event.inputs.disable_proxy }}; then
echo "DISABLE_PROXY_FLAG=--connector_live_tests.disable-proxy" >> $GITHUB_ENV
else
echo "DISABLE_PROXY_FLAG=" >> $GITHUB_ENV
fi
- name: Setup Connection Subset Option
if: github.event_name == 'workflow_dispatch'
run: |
Expand All @@ -129,4 +142,4 @@ jobs:
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OSS }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: connectors ${{ env.USE_LOCAL_CDK_FLAG }} --name ${{ github.event.inputs.connector_name }} test --only-step connector_live_tests --connector_live_tests.test-suite=live --connector_live_tests.connection-id=${{ github.event.inputs.connection_id }} --connector_live_tests.pr-url=${{ github.event.inputs.pr_url }} ${{ env.READ_WITH_STATE_FLAG }} ${{ env.STREAM_PARAMS }} ${{ env.CONNECTION_SUBSET }}
subcommand: connectors ${{ env.USE_LOCAL_CDK_FLAG }} --name ${{ github.event.inputs.connector_name }} test --only-step connector_live_tests --connector_live_tests.test-suite=live --connector_live_tests.connection-id=${{ github.event.inputs.connection_id }} --connector_live_tests.pr-url=${{ github.event.inputs.pr_url }} ${{ env.READ_WITH_STATE_FLAG }} ${{ env.DISABLE_PROXY_FLAG }} ${{ env.STREAM_PARAMS }} ${{ env.CONNECTION_SUBSET }}
15 changes: 14 additions & 1 deletion .github/workflows/regression_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ on:
description: Use the local CDK when building the target connector
default: "false"
type: boolean
disable_proxy:
description: Disable proxy for requests
default: "false"
type: boolean
connection_subset:
description: The subset of connections to select from.
required: true
Expand Down Expand Up @@ -114,6 +118,15 @@ jobs:
echo "READ_WITH_STATE_FLAG=" >> $GITHUB_ENV
fi
- name: Setup Proxy Flag
if: github.event_name == 'workflow_dispatch'
run: |
if ${{ github.event.inputs.disable_proxy }}; then
echo "DISABLE_PROXY_FLAG=--connector_live_tests.disable-proxy" >> $GITHUB_ENV
else
echo "DISABLE_PROXY_FLAG=" >> $GITHUB_ENV
fi
- name: Setup Connection Subset Option
if: github.event_name == 'workflow_dispatch'
run: |
Expand All @@ -135,4 +148,4 @@ jobs:
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OSS }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: connectors ${{ env.USE_LOCAL_CDK_FLAG }} --name ${{ github.event.inputs.connector_name }} test --only-step connector_live_tests --connector_live_tests.test-suite=regression --connector_live_tests.connection-id=${{ github.event.inputs.connection_id }} --connector_live_tests.pr-url=${{ github.event.inputs.pr_url }} ${{ env.READ_WITH_STATE_FLAG }} ${{ env.STREAM_PARAMS }} ${{ env.CONNECTION_SUBSET }} --global-status-check-context="Regression Tests" --global-status-check-description='Running regression tests'
subcommand: connectors ${{ env.USE_LOCAL_CDK_FLAG }} --name ${{ github.event.inputs.connector_name }} test --only-step connector_live_tests --connector_live_tests.test-suite=regression --connector_live_tests.connection-id=${{ github.event.inputs.connection_id }} --connector_live_tests.pr-url=${{ github.event.inputs.pr_url }} ${{ env.READ_WITH_STATE_FLAG }} ${{ env.DISABLE_PROXY_FLAG }} ${{ env.STREAM_PARAMS }} ${{ env.CONNECTION_SUBSET }} --global-status-check-context="Regression Tests" --global-status-check-description='Running regression tests'
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import java.time.OffsetDateTime
interface MetaFieldDecorator {

/** [MetaField] to use as a global cursor, if applicable. */
val globalCursor: MetaField?
val globalCursor: FieldOrMetaField?

/**
* All [MetaField]s to be found in [Global] stream records.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.time.ZoneOffset
/**
* [FeedBootstrap] is the input to a [PartitionsCreatorFactory].
*
* This object conveniently packages the [StateQuerier] singleton with the [feed] for which the
* This object conveniently packages the [StateManager] singleton with the [feed] for which the
* [PartitionsCreatorFactory] is to operate on, eventually causing the emission of Airbyte RECORD
* messages for the [Stream]s in the [feed]. For this purpose, [FeedBootstrap] provides
* [StreamRecordConsumer] instances which essentially provide a layer of caching over
Expand All @@ -34,15 +34,30 @@ sealed class FeedBootstrap<T : Feed>(
* The [MetaFieldDecorator] instance which [StreamRecordConsumer] will use to decorate records.
*/
val metaFieldDecorator: MetaFieldDecorator,
/** [StateQuerier] singleton for use by [PartitionsCreatorFactory]. */
val stateQuerier: StateQuerier,
/** [StateManager] singleton which is encapsulated by this [FeedBootstrap]. */
private val stateManager: StateManager,
/** [Feed] to emit records for. */
val feed: T
) {

/** Convenience getter for the current state value for the [feed]. */
/** Delegates to [StateManager.feeds]. */
val feeds: List<Feed>
get() = stateManager.feeds

/** Delegates to [StateManager] to return the current state value for any [Feed]. */
fun currentState(feed: Feed): OpaqueStateValue? = stateManager.scoped(feed).current()

/** Convenience getter for the current state value for this [feed]. */
val currentState: OpaqueStateValue?
get() = stateQuerier.current(feed)
get() = currentState(feed)

/** Resets the state value of this feed and the streams in it to zero. */
fun resetAll() {
stateManager.scoped(feed).reset()
for (stream in feed.streams) {
stateManager.scoped(stream).reset()
}
}

/** A map of all [StreamRecordConsumer] for this [feed]. */
fun streamRecordConsumers(): Map<StreamIdentifier, StreamRecordConsumer> =
Expand Down Expand Up @@ -98,7 +113,7 @@ sealed class FeedBootstrap<T : Feed>(
}

private val precedingGlobalFeed: Global? =
stateQuerier.feeds
stateManager.feeds
.filterIsInstance<Global>()
.filter { it.streams.contains(stream) }
.firstOrNull()
Expand All @@ -109,7 +124,7 @@ sealed class FeedBootstrap<T : Feed>(
if (feed is Stream && precedingGlobalFeed != null) {
metaFieldDecorator.decorateRecordData(
timestamp = outputConsumer.recordEmittedAt.atOffset(ZoneOffset.UTC),
globalStateValue = stateQuerier.current(precedingGlobalFeed),
globalStateValue = stateManager.scoped(precedingGlobalFeed).current(),
stream,
recordData,
)
Expand Down Expand Up @@ -192,14 +207,14 @@ sealed class FeedBootstrap<T : Feed>(
fun create(
outputConsumer: OutputConsumer,
metaFieldDecorator: MetaFieldDecorator,
stateQuerier: StateQuerier,
stateManager: StateManager,
feed: Feed,
): FeedBootstrap<*> =
when (feed) {
is Global ->
GlobalFeedBootstrap(outputConsumer, metaFieldDecorator, stateQuerier, feed)
GlobalFeedBootstrap(outputConsumer, metaFieldDecorator, stateManager, feed)
is Stream ->
StreamFeedBootstrap(outputConsumer, metaFieldDecorator, stateQuerier, feed)
StreamFeedBootstrap(outputConsumer, metaFieldDecorator, stateManager, feed)
}
}
}
Expand Down Expand Up @@ -241,17 +256,17 @@ enum class FieldValueChange {
class GlobalFeedBootstrap(
outputConsumer: OutputConsumer,
metaFieldDecorator: MetaFieldDecorator,
stateQuerier: StateQuerier,
stateManager: StateManager,
global: Global,
) : FeedBootstrap<Global>(outputConsumer, metaFieldDecorator, stateQuerier, global)
) : FeedBootstrap<Global>(outputConsumer, metaFieldDecorator, stateManager, global)

/** [FeedBootstrap] implementation for [Stream] feeds. */
class StreamFeedBootstrap(
outputConsumer: OutputConsumer,
metaFieldDecorator: MetaFieldDecorator,
stateQuerier: StateQuerier,
stateManager: StateManager,
stream: Stream,
) : FeedBootstrap<Stream>(outputConsumer, metaFieldDecorator, stateQuerier, stream) {
) : FeedBootstrap<Stream>(outputConsumer, metaFieldDecorator, stateManager, stream) {

/** A [StreamRecordConsumer] instance for this [Stream]. */
fun streamRecordConsumer(): StreamRecordConsumer = streamRecordConsumers()[feed.id]!!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import io.airbyte.cdk.read.PartitionsCreator.TryAcquireResourcesStatus
interface PartitionsCreatorFactory {
/**
* Returns a [PartitionsCreator] which will cause the READ to advance for the [Feed] with which
* the [FeedBootstrap] argument is associated. The latter exposes a [StateQuerier] to obtain
* the current [OpaqueStateValue] for this [feed] but may also be used to peek at the state of
* other [Feed]s. This may be useful for synchronizing the READ for this [feed] by waiting for
* other [Feed]s to reach a desired state before proceeding; the waiting may be triggered by
* the [FeedBootstrap] argument is associated. The latter exposes methods to obtain the
* current [OpaqueStateValue] for this [feed] but also to peek at the state of other [Feed]s.
* This may be useful for synchronizing the READ for this [feed] by waiting for other [Feed]s to
* reach a desired state before proceeding; the waiting may be triggered by
* [PartitionsCreator.tryAcquireResources] or [PartitionReader.tryAcquireResources].
*
* Returns null when the factory is unable to generate a [PartitionsCreator]. This causes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,12 @@ import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.AirbyteStateStats
import io.airbyte.protocol.models.v0.AirbyteStreamState

/** A [StateQuerier] is like a read-only [StateManager]. */
interface StateQuerier {
/** [feeds] is all the [Feed]s in the configured catalog passed via the CLI. */
val feeds: List<Feed>

/** Returns the current state value for the given [feed]. */
fun current(feed: Feed): OpaqueStateValue?

/** Rolls back each feed state. This is required when resyncing CDC from scratch */
fun resetFeedStates()
}

/** Singleton object which tracks the state of an ongoing READ operation. */
class StateManager(
global: Global? = null,
initialGlobalState: OpaqueStateValue? = null,
initialStreamStates: Map<Stream, OpaqueStateValue?> = mapOf(),
) : StateQuerier {
) {
private val global: GlobalStateManager?
private val nonGlobal: Map<StreamIdentifier, NonGlobalStreamStateManager>

Expand All @@ -52,16 +40,14 @@ class StateManager(
}
}

override val feeds: List<Feed> =
/** [feeds] is all the [Feed]s in the configured catalog passed via the CLI. */
val feeds: List<Feed> =
listOfNotNull(this.global?.feed) +
(this.global?.streamStateManagers?.values?.map { it.feed } ?: listOf()) +
nonGlobal.values.map { it.feed }

override fun current(feed: Feed): OpaqueStateValue? = scoped(feed).current()

override fun resetFeedStates() {
feeds.forEach { f -> scoped(f).set(Jsons.objectNode(), 0) }
}
/** Returns the current state value for the given [feed]. */
fun current(feed: Feed): OpaqueStateValue? = scoped(feed).current()

/** Returns a [StateManagerScopedToFeed] instance scoped to this [feed]. */
fun scoped(feed: Feed): StateManagerScopedToFeed =
Expand All @@ -86,6 +72,9 @@ class StateManager(
state: OpaqueStateValue,
numRecords: Long,
)

/** Resets the current state value in the [StateManager] for this [feed] to zero. */
fun reset()
}

/**
Expand Down Expand Up @@ -119,6 +108,13 @@ class StateManager(
pendingNumRecords += numRecords
}

@Synchronized
override fun reset() {
currentStateValue = null
pendingStateValue = null
pendingNumRecords = 0L
}

/**
* Called by [StateManager.checkpoint] to generate the Airbyte STATE messages for the
* checkpoint.
Expand Down
Loading

0 comments on commit 2e01b91

Please sign in to comment.