From c51fa32230800db87f15c71517071784fb883efb Mon Sep 17 00:00:00 2001 From: Rita Filatova Date: Fri, 6 Dec 2024 14:32:04 +0300 Subject: [PATCH 1/3] add bundle selections schema, add bundle selection stream to the manifest --- .../source_recharge/manifest.yaml | 9 +++ .../schemas/bundle_selections.json | 63 +++++++++++++++++++ 2 files changed, 72 insertions(+) create mode 100644 airbyte-integrations/connectors/source-recharge/source_recharge/schemas/bundle_selections.json diff --git a/airbyte-integrations/connectors/source-recharge/source_recharge/manifest.yaml b/airbyte-integrations/connectors/source-recharge/source_recharge/manifest.yaml index b170940e9a90a..2f6762ec4af92 100644 --- a/airbyte-integrations/connectors/source-recharge/source_recharge/manifest.yaml +++ b/airbyte-integrations/connectors/source-recharge/source_recharge/manifest.yaml @@ -280,6 +280,14 @@ definitions: $parameters: name: "events" data_path: "events" + # BUNDLE SELECTIONS + bundle_selections_stream: + description: >- + Bundle Selections Stream: https://developer.rechargepayments.com/2021-11/bundle_selections/bundle_selection_list + $ref: "#/definitions/base_incremental_stream" + $parameters: + name: "bundle_selections" + data_path: "bundle_selections" streams: - "#/definitions/addresses_stream" @@ -293,6 +301,7 @@ streams: - "#/definitions/shop_stream" - "#/definitions/subscriptions_stream" - "#/definitions/events_stream" + - "#/definitions/bundle_selections_stream" # The `orders` stream remains implemented in `streams.py` due to: # 1. Inability to resolve `$ref` conditionally # 2. Inability to dynamically switch between paginators (diff api versions, require diff pagination approach) (or create the CustomPaginator component) diff --git a/airbyte-integrations/connectors/source-recharge/source_recharge/schemas/bundle_selections.json b/airbyte-integrations/connectors/source-recharge/source_recharge/schemas/bundle_selections.json new file mode 100644 index 0000000000000..f027a83438735 --- /dev/null +++ b/airbyte-integrations/connectors/source-recharge/source_recharge/schemas/bundle_selections.json @@ -0,0 +1,63 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": ["integer"] + }, + "bundle_variant": { + "type": ["integer"] + }, + "purchase_item_id": { + "type": ["integer"] + }, + "created_at": { + "type": ["string"], + "format": "date-time" + }, + "external_product_id": { + "type": ["string"] + }, + "external_variant_id": { + "type": ["string"] + }, + "items": { + "type": ["array"], + "items": { + "type": ["object"], + "properties": { + "id": { + "type": ["integer"] + }, + "collection_id": { + "type": ["string"] + }, + "collection_source": { + "type": ["string"] + }, + "created_at": { + "type": ["string"], + "format": "date-time" + }, + "external_product_id": { + "type": ["string"] + }, + "external_variant_id": { + "type": ["string"] + }, + "quantity": { + "type": ["integer"] + }, + "updated_at": { + "type": ["string"], + "format": "date-time" + } + } + } + }, + "updated_at": { + "type": ["string"], + "format": "date-time" + } + } +} From 72576131283d66339b0f174db09412b17916e9bc Mon Sep 17 00:00:00 2001 From: Rita Filatova Date: Mon, 9 Dec 2024 11:43:57 +0300 Subject: [PATCH 2/3] add tests --- .../acceptance-test-config.yml | 4 + .../integration_tests/abnormal_state.json | 7 ++ .../integration_tests/configured_catalog.json | 13 +++ .../streams_with_output_records_catalog.json | 13 +++ .../sample_files/sample_state.json | 3 + .../streams/test_bundle_selections.py | 87 ++++++++++++++++++ .../http/response/bundle_selections.json | 89 +++++++++++++++++++ .../unit_tests/test_streams.py | 2 +- 8 files changed, 217 insertions(+), 1 deletion(-) create mode 100644 airbyte-integrations/connectors/source-recharge/unit_tests/integration/streams/test_bundle_selections.py create mode 100644 airbyte-integrations/connectors/source-recharge/unit_tests/resource/http/response/bundle_selections.json diff --git a/airbyte-integrations/connectors/source-recharge/acceptance-test-config.yml b/airbyte-integrations/connectors/source-recharge/acceptance-test-config.yml index e749ad84372ec..cbaea31448498 100644 --- a/airbyte-integrations/connectors/source-recharge/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-recharge/acceptance-test-config.yml @@ -11,6 +11,8 @@ acceptance_tests: bypass_reason: "The stream is tested with `Integration Tests`, since no data is available" - name: events bypass_reason: "The stream is tested with `Integration Tests`, since no data is available" + - name: bundle_selections + bypass_reason: "The stream is tested with `Integration Tests`, since no data is available" - name: shop bypass_reason: "The stream is not empty, but the primary key is the entire record, so it is constantly changing" expect_records: @@ -25,6 +27,8 @@ acceptance_tests: bypass_reason: "The stream is tested with `Integration Tests`, since no data is available" - name: events bypass_reason: "The stream is tested with `Integration Tests`, since no data is available" + - name: bundle_selections + bypass_reason: "The stream is tested with `Integration Tests`, since no data is available" - name: shop bypass_reason: "The stream is not empty, but the primary key is the entire record, so it is constantly changing" expect_records: diff --git a/airbyte-integrations/connectors/source-recharge/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-recharge/integration_tests/abnormal_state.json index 47519c9adc8b8..bef7bce52ee08 100644 --- a/airbyte-integrations/connectors/source-recharge/integration_tests/abnormal_state.json +++ b/airbyte-integrations/connectors/source-recharge/integration_tests/abnormal_state.json @@ -54,5 +54,12 @@ "stream_state": { "created_at": "2050-05-18T00:00:00Z" }, "stream_descriptor": { "name": "events" } } + }, + { + "type": "STREAM", + "stream": { + "stream_state": { "updated_at": "2050-05-18T00:00:00Z" }, + "stream_descriptor": { "name": "bundle_selections" } + } } ] diff --git a/airbyte-integrations/connectors/source-recharge/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-recharge/integration_tests/configured_catalog.json index 5132b71311ba0..d30546acb6174 100644 --- a/airbyte-integrations/connectors/source-recharge/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-recharge/integration_tests/configured_catalog.json @@ -143,6 +143,19 @@ }, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "bundle_selections", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["updated_at"], + "source_defined_primary_key": [["id"]] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append", + "cursor_field": ["updated_at"] } ] } diff --git a/airbyte-integrations/connectors/source-recharge/integration_tests/streams_with_output_records_catalog.json b/airbyte-integrations/connectors/source-recharge/integration_tests/streams_with_output_records_catalog.json index 9ff989eb5e0ba..41696ba5b2f77 100644 --- a/airbyte-integrations/connectors/source-recharge/integration_tests/streams_with_output_records_catalog.json +++ b/airbyte-integrations/connectors/source-recharge/integration_tests/streams_with_output_records_catalog.json @@ -130,6 +130,19 @@ }, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "bundle_selections", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["updated_at"], + "source_defined_primary_key": [["id"]] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append", + "cursor_field": ["updated_at"] } ] } diff --git a/airbyte-integrations/connectors/source-recharge/sample_files/sample_state.json b/airbyte-integrations/connectors/source-recharge/sample_files/sample_state.json index 6ece9affd1eb5..7993da59a6a27 100644 --- a/airbyte-integrations/connectors/source-recharge/sample_files/sample_state.json +++ b/airbyte-integrations/connectors/source-recharge/sample_files/sample_state.json @@ -22,5 +22,8 @@ }, "events": { "created_at": "2021-04-02T00:00:00" + }, + "bundle_selections": { + "updated_at": "2021-04-02T00:00:00" } } diff --git a/airbyte-integrations/connectors/source-recharge/unit_tests/integration/streams/test_bundle_selections.py b/airbyte-integrations/connectors/source-recharge/unit_tests/integration/streams/test_bundle_selections.py new file mode 100644 index 0000000000000..706818143e74f --- /dev/null +++ b/airbyte-integrations/connectors/source-recharge/unit_tests/integration/streams/test_bundle_selections.py @@ -0,0 +1,87 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + + +from unittest import TestCase + +import freezegun +from airbyte_cdk.test.mock_http import HttpMocker + +from ..config import NOW, START_DATE +from ..response_builder import NEXT_PAGE_TOKEN, get_stream_record, get_stream_response +from ..utils import StreamTestCase, config, get_cursor_value_from_state_message, read_full_refresh, read_incremental + +_STREAM_NAME = "bundle_selections" +_CURSOR_FIELD = "updated_at" + + +@freezegun.freeze_time(NOW.isoformat()) +class TestFullRefresh(StreamTestCase): + _STREAM_NAME = "bundle_selections" + + @HttpMocker() + def test_given_one_page_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + self.stream_request().with_limit(250).with_updated_at_min(START_DATE).build(), + get_stream_response(_STREAM_NAME).with_record(get_stream_record(_STREAM_NAME, "id", _CURSOR_FIELD)).build(), + ) + output = read_full_refresh(self._config, _STREAM_NAME) + assert len(output.records) == 1 + + @HttpMocker() + def test_given_multiple_pages_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: + + http_mocker.get( + self.stream_request().with_limit(250).with_next_page_token(NEXT_PAGE_TOKEN).build(), + get_stream_response(_STREAM_NAME).with_record(get_stream_record(_STREAM_NAME, "id", _CURSOR_FIELD)).build(), + ) + http_mocker.get( + self.stream_request().with_limit(250).with_updated_at_min(START_DATE).build(), + get_stream_response(_STREAM_NAME).with_pagination().with_record(get_stream_record(_STREAM_NAME, "id", _CURSOR_FIELD)).build(), + ) + + output = read_full_refresh(self._config, _STREAM_NAME) + assert len(output.records) == 2 + + +@freezegun.freeze_time(NOW.isoformat()) +class TestIncremental(StreamTestCase): + _STREAM_NAME = "bundle_selections" + + @HttpMocker() + def test_state_message_produced_while_read_and_state_match_latest_record(self, http_mocker: HttpMocker) -> None: + min_cursor_value = "2024-01-01T00:00:00+00:00" + max_cursor_value = "2024-02-01T00:00:00+00:00" + + http_mocker.get( + self.stream_request().with_limit(250).with_updated_at_min(START_DATE).build(), + get_stream_response(_STREAM_NAME) + .with_record(get_stream_record(_STREAM_NAME, "id", _CURSOR_FIELD).with_cursor(min_cursor_value)) + .with_record(get_stream_record(_STREAM_NAME, "id", _CURSOR_FIELD).with_cursor(max_cursor_value)) + .build(), + ) + + output = read_incremental(self._config, _STREAM_NAME) + test_cursor_value = get_cursor_value_from_state_message(output, _CURSOR_FIELD) + assert test_cursor_value == max_cursor_value + + @HttpMocker() + def test_given_multiple_pages_when_read_then_return_records_with_state(self, http_mocker: HttpMocker) -> None: + min_cursor_value = "2024-01-01T00:00:00+00:00" + max_cursor_value = "2024-02-01T00:00:00+00:00" + http_mocker.get( + self.stream_request().with_limit(250).with_next_page_token(NEXT_PAGE_TOKEN).build(), + get_stream_response(_STREAM_NAME).with_record(get_stream_record(_STREAM_NAME, "id", _CURSOR_FIELD)).build(), + ) + http_mocker.get( + self.stream_request().with_limit(250).with_updated_at_min(START_DATE).build(), + get_stream_response(_STREAM_NAME) + .with_pagination() + .with_record(get_stream_record(_STREAM_NAME, "id", _CURSOR_FIELD).with_cursor(min_cursor_value)) + .with_record(get_stream_record(_STREAM_NAME, "id", _CURSOR_FIELD).with_cursor(max_cursor_value)) + .build(), + ) + + output = read_incremental(self._config, _STREAM_NAME) + assert len(output.records) == 3 diff --git a/airbyte-integrations/connectors/source-recharge/unit_tests/resource/http/response/bundle_selections.json b/airbyte-integrations/connectors/source-recharge/unit_tests/resource/http/response/bundle_selections.json new file mode 100644 index 0000000000000..183beb4ac9438 --- /dev/null +++ b/airbyte-integrations/connectors/source-recharge/unit_tests/resource/http/response/bundle_selections.json @@ -0,0 +1,89 @@ +{ + "next_cursor": null, + "previous_cursor": null, + "bundle_selections": [ + { + "id": 1, + "bundle_variant_id": 1, + "charge_id": null, + "order_id": null, + "purchase_item_id": 1, + "created_at": "2024-12-09T08:11:01+00:00", + "external_product_id": "1234567890123", + "external_variant_id": "12345", + "is_fallback": false, + "items": [ + { + "id": 1, + "collection_id": "2", + "collection_source": "test", + "created_at": "2024-12-09T08:11:01+00:00", + "external_product_id": "1", + "external_variant_id": "2", + "price": "10.00", + "quantity": 1, + "updated_at": "2024-12-09T08:11:01+00:00" + }, + { + "id": 2, + "collection_id": "2", + "collection_source": "test", + "created_at": "2024-12-09T08:11:01+00:00", + "external_product_id": "7625137913910", + "external_variant_id": "42941000745014", + "price": "11.00", + "quantity": 1, + "updated_at": "2024-12-09T08:11:01+00:00" + }, + { + "id": 3, + "collection_id": "1", + "collection_source": "test", + "created_at": "2024-12-09T08:11:01+00:00", + "external_product_id": "1", + "external_variant_id": "2", + "price": "12.00", + "quantity": 2, + "updated_at": "2024-12-09T08:11:01+00:00" + }, + { + "id": 4, + "collection_id": "3", + "collection_source": "test", + "created_at": "2024-12-09T08:11:01+00:00", + "external_product_id": "4", + "external_variant_id": "2", + "price": "13.00", + "quantity": 2, + "updated_at": "2024-12-09T08:11:01+00:00" + } + ], + "updated_at": "2024-12-09T08:11:01+00:00" + }, + { + "id": 2, + "bundle_variant_id": 3, + "charge_id": null, + "order_id": null, + "purchase_item_id": 4, + "created_at": "2024-12-09T08:08:34+00:00", + "external_product_id": "1", + "external_variant_id": "2", + "is_fallback": false, + "items": [ + { + "id": 6, + "collection_id": "4", + "collection_source": "test", + "created_at": "2024-12-09T08:08:34+00:00", + "external_product_id": "2", + "external_variant_id": "5", + "price": "14.00", + "quantity": 1, + "updated_at": "2024-12-09T08:08:34+00:00" + } + ], + "updated_at": "2024-12-09T08:08:34+00:00" + } + ] + } \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-recharge/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-recharge/unit_tests/test_streams.py index af3ba6d425b82..81ea99bc83140 100644 --- a/airbyte-integrations/connectors/source-recharge/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-recharge/unit_tests/test_streams.py @@ -30,7 +30,7 @@ def test_get_auth_header(config) -> None: def test_streams(config) -> None: streams = SourceRecharge().streams(config) - assert len(streams) == 12 + assert len(streams) == 13 class TestCommon: From ab4c77c18f4c9df686d65e35188bc0af5eac820b Mon Sep 17 00:00:00 2001 From: Rita Filatova Date: Mon, 9 Dec 2024 13:49:01 +0300 Subject: [PATCH 3/3] add price field to schema definition --- .../source_recharge/schemas/bundle_selections.json | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airbyte-integrations/connectors/source-recharge/source_recharge/schemas/bundle_selections.json b/airbyte-integrations/connectors/source-recharge/source_recharge/schemas/bundle_selections.json index f027a83438735..4b802543cc47e 100644 --- a/airbyte-integrations/connectors/source-recharge/source_recharge/schemas/bundle_selections.json +++ b/airbyte-integrations/connectors/source-recharge/source_recharge/schemas/bundle_selections.json @@ -45,6 +45,9 @@ "external_variant_id": { "type": ["string"] }, + "price": { + "type": ["string"] + }, "quantity": { "type": ["integer"] },