Skip to content

Commit

Permalink
Checkpointing: Worker use destination (instead of source) for state (a…
Browse files Browse the repository at this point in the history
…irbytehq#3290)

* Migrate BufferedStreamConsumer users (e.g. all JDBC destinations, MeiliSearch) (airbytehq#3473)

* Add checkpointing test cases in Acceptance Tests (airbytehq#3473)

* Add testing for emitting state in Destination Standard Test (airbytehq#3546)

* Migrate BQ to support checkpointing (airbytehq#3546)

* Migrate copy destinations support checkpointing (airbytehq#3547)

* Checkpointing: Migrate CSV and JSON destinations (airbytehq#3551)
  • Loading branch information
cgardens authored May 25, 2021
1 parent 5647c09 commit aa6afb7
Show file tree
Hide file tree
Showing 88 changed files with 3,013 additions and 639 deletions.
3 changes: 3 additions & 0 deletions airbyte-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ task generateApiServer(type: GenerateTask) {
'DestinationDefinitionSpecification': 'com.fasterxml.jackson.databind.JsonNode',
'DestinationConfiguration' : 'com.fasterxml.jackson.databind.JsonNode',
'StreamJsonSchema' : 'com.fasterxml.jackson.databind.JsonNode',
'ConnectionStateObject' : 'com.fasterxml.jackson.databind.JsonNode',
]

generateApiDocumentation = false
Expand Down Expand Up @@ -59,6 +60,7 @@ task generateApiClient(type: GenerateTask) {
'DestinationDefinitionSpecification': 'com.fasterxml.jackson.databind.JsonNode',
'DestinationConfiguration' : 'com.fasterxml.jackson.databind.JsonNode',
'StreamJsonSchema' : 'com.fasterxml.jackson.databind.JsonNode',
'ConnectionStateObject' : 'com.fasterxml.jackson.databind.JsonNode',
]

library = "native"
Expand Down Expand Up @@ -90,6 +92,7 @@ task generateApiDocs(type: GenerateTask) {
'DestinationDefinitionSpecification': 'com.fasterxml.jackson.databind.JsonNode',
'DestinationConfiguration' : 'com.fasterxml.jackson.databind.JsonNode',
'StreamJsonSchema' : 'com.fasterxml.jackson.databind.JsonNode',
'ConnectionStateObject' : 'com.fasterxml.jackson.databind.JsonNode',
]

generateApiDocumentation = false
Expand Down
34 changes: 34 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,29 @@ paths:
description: Connection not found
"422":
$ref: "#/components/responses/InvalidInput"
/v1/state/get:
post:
tags:
- connection
summary: Fetch the current state for a connection.
operationId: getState
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/ConnectionIdRequestBody"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/ConnectionState"
"404":
description: Connection not found
"422":
$ref: "#/components/responses/InvalidInput"
/v1/connections/delete:
post:
tags:
Expand Down Expand Up @@ -2426,6 +2449,17 @@ components:
type: string
jobInfo:
$ref: "#/components/schemas/SynchronousJobRead"
ConnectionState:
type: object
required:
- connectionId
properties:
connectionId:
$ref: "#/components/schemas/ConnectionId"
state:
$ref: "#/components/schemas/ConnectionStateObject"
ConnectionStateObject:
type: object
# Web Backend
WbConnectionRead:
type: object
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.commons.util;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class MoreLists {

  /**
   * Looks up the final element of a list.
   *
   * @param list list to inspect
   * @param <T> element type
   * @return the last element wrapped in an optional; an empty optional when the list has no
   *         elements or when its last element is null.
   */
  public static <T> Optional<T> last(List<T> list) {
    return list.isEmpty()
        ? Optional.empty()
        : Optional.ofNullable(list.get(list.size() - 1));
  }

  /**
   * Produces a reversed copy of a list. The elements are copied into a fresh list which is then
   * reversed in place, so the list passed in is never modified.
   *
   * @param list to reverse
   * @param <T> type
   * @return new list holding the elements of the original in reverse order.
   */
  public static <T> List<T> reversed(List<T> list) {
    final List<T> copy = new ArrayList<>(list.size());
    copy.addAll(list);
    Collections.reverse(copy);
    return copy;
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.commons.util;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.junit.jupiter.api.Test;

class MoreListsTest {

  /**
   * Covers the three cases of {@link MoreLists#last(List)}: a populated list, an empty list and a
   * list whose final element is null.
   */
  @Test
  void testLast() {
    final Optional<Integer> lastOfPopulated = MoreLists.last(List.of(1, 2, 3));
    assertEquals(Optional.of(3), lastOfPopulated);

    assertEquals(Optional.empty(), MoreLists.last(List.of()));

    // List.of rejects nulls, so a mutable list is needed to place a null in the last slot.
    final List<Integer> endsWithNull = Lists.newArrayList(1, 2, null);
    assertEquals(Optional.empty(), MoreLists.last(endsWithNull));
  }

  /**
   * Checks that {@link MoreLists#reversed(List)} returns the elements in reverse order and leaves
   * the list it was given unchanged.
   */
  @Test
  void testReverse() {
    final List<Integer> input = new ArrayList<>(List.of(1, 2, 3));

    final List<Integer> output = MoreLists.reversed(input);

    assertEquals(List.of(3, 2, 1), output);
    assertEquals(List.of(1, 2, 3), input);
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
"name": "BigQuery",
"dockerRepository": "airbyte/destination-bigquery",
"dockerImageTag": "0.3.2",
"dockerImageTag": "0.3.3",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"destinationDefinitionId": "25c5221d-dce2-4163-ade9-739ef790f503",
"name": "Postgres",
"dockerRepository": "airbyte/destination-postgres",
"dockerImageTag": "0.3.4",
"dockerImageTag": "0.3.5",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/postgres",
"icon": "postgresql.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "8be1cf83-fde1-477f-a4ad-318d23c9f3c6",
"name": "Local CSV",
"dockerRepository": "airbyte/destination-csv",
"dockerImageTag": "0.2.5",
"dockerImageTag": "0.2.6",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-csv"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "a625d593-bba5-4a1c-a53d-2d246268a816",
"name": "Local JSON",
"dockerRepository": "airbyte/destination-local-json",
"dockerImageTag": "0.2.5",
"dockerImageTag": "0.2.6",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-json"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "af7c921e-5892-4ff2-b6c1-4a5ab258fb7e",
"name": "MeiliSearch",
"dockerRepository": "airbyte/destination-meilisearch",
"dockerImageTag": "0.2.5",
"dockerImageTag": "0.2.6",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/meilisearch"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "ca81ee7c-3163-4246-af40-094cc31e5e42",
"name": "MySQL",
"dockerRepository": "airbyte/destination-mysql",
"dockerImageTag": "0.1.3",
"dockerImageTag": "0.1.4",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/mysql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "d4353156-9217-4cad-8dd7-c108fd4f74cf",
"name": "MS SQL Server",
"dockerRepository": "airbyte/destination-mssql",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/mssql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"destinationDefinitionId": "f7a7d195-377f-cf5b-70a5-be6b819019dc",
"name": "Redshift",
"dockerRepository": "airbyte/destination-redshift",
"dockerImageTag": "0.3.7",
"dockerImageTag": "0.3.8",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/redshift",
"icon": "redshift.svg"
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
- destinationDefinitionId: a625d593-bba5-4a1c-a53d-2d246268a816
name: Local JSON
dockerRepository: airbyte/destination-local-json
dockerImageTag: 0.2.5
dockerImageTag: 0.2.6
documentationUrl: https://docs.airbyte.io/integrations/destinations/local-json
- destinationDefinitionId: 8be1cf83-fde1-477f-a4ad-318d23c9f3c6
name: Local CSV
dockerRepository: airbyte/destination-csv
dockerImageTag: 0.2.5
dockerImageTag: 0.2.6
documentationUrl: https://docs.airbyte.io/integrations/destinations/local-csv
- destinationDefinitionId: 25c5221d-dce2-4163-ade9-739ef790f503
name: Postgres
dockerRepository: airbyte/destination-postgres
dockerImageTag: 0.3.4
dockerImageTag: 0.3.5
documentationUrl: https://docs.airbyte.io/integrations/destinations/postgres
icon: postgresql.svg
- destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
name: BigQuery
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 0.3.2
dockerImageTag: 0.3.3
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
name: Snowflake
Expand All @@ -27,21 +27,21 @@
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.3.7
dockerImageTag: 0.3.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
icon: redshift.svg
- destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e
name: MeiliSearch
dockerRepository: airbyte/destination-meilisearch
dockerImageTag: 0.2.5
dockerImageTag: 0.2.6
documentationUrl: https://docs.airbyte.io/integrations/destinations/meilisearch
- destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
name: MySQL
dockerRepository: airbyte/destination-mysql
dockerImageTag: 0.1.3
dockerImageTag: 0.1.4
documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql
- destinationDefinitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf
name: MS SQL Server
dockerRepository: airbyte/destination-mssql
dockerImageTag: 0.1.0
dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.io/integrations/destinations/mssql
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public enum ConfigSchema {
OPERATOR_DBT_INPUT("OperatorDbtInput.yaml"),

STANDARD_SYNC_OUTPUT("StandardSyncOutput.yaml"),
REPLICATION_OUTPUT("ReplicationOutput.yaml"),

STATE("State.yaml");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
# Schema for the summary of one replication attempt: its status, the record and byte
# counters, and the start and end times. All five properties are required.
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/ReplicationAttemptSummary.yaml
title: ReplicationAttemptSummary
type: object
required:
  - status
  - recordsSynced
  - bytesSynced
  - startTime
  - endTime
additionalProperties: false
properties:
  status:
    "$ref": ReplicationStatus.yaml
  recordsSynced:
    type: integer
    # `minimum` is the draft-07 keyword for an inclusive lower bound. The previous
    # `minValue` is not a JSON Schema keyword, so the bound was never enforced.
    minimum: 0
  bytesSynced:
    type: integer
    # Same inclusive lower bound as recordsSynced.
    minimum: 0
  startTime:
    type: integer
  endTime:
    type: integer
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
# Schema for the output of a replication attempt. It bundles three required parts:
# the attempt summary, the state, and the output catalog.
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/ReplicationOutput.yaml
title: ReplicationOutput
description: metadata summary of a replication attempt
type: object
additionalProperties: false
required:
- replicationAttemptSummary
- state
- output_catalog
properties:
# Defined in ReplicationAttemptSummary.yaml.
replicationAttemptSummary:
"$ref": ReplicationAttemptSummary.yaml
# Defined in State.yaml.
state:
"$ref": State.yaml
# Not described by a schema here; bound to an already existing Java class instead.
output_catalog:
existingJavaType: io.airbyte.protocol.models.ConfiguredAirbyteCatalog
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
# Schema for the status of a replication: a string restricted to the values listed
# under `enum`. Other schemas pull it in through "$ref": ReplicationStatus.yaml.
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/ReplicationStatus.yaml
title: ReplicationStatus
# NOTE(review): additionalProperties only constrains object instances in draft-07, so it
# presumably has no effect on this string schema - confirm before removing.
additionalProperties: false
type: string
enum:
- completed
- failed
- cancelled
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@ required:
additionalProperties: false
properties:
status:
type: string
enum:
- completed
- failed
- cancelled
"$ref": ReplicationStatus.yaml
recordsSynced:
type: integer
minValue: 0
Expand Down
Loading

0 comments on commit aa6afb7

Please sign in to comment.