diff --git a/.vscode/settings.json b/.vscode/settings.json index f033cb881e892..9ba677ef55a9a 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -32,5 +32,6 @@ "[json]": { "editor.formatOnSave": true, "editor.defaultFormatter": "esbenp.prettier-vscode" - } + }, + "python.formatting.provider": "black" } diff --git a/airbyte-integrations/connectors/destination-deeplake/.dockerignore b/airbyte-integrations/connectors/destination-deeplake/.dockerignore new file mode 100644 index 0000000000000..d148c9e5a512a --- /dev/null +++ b/airbyte-integrations/connectors/destination-deeplake/.dockerignore @@ -0,0 +1,5 @@ +* +!Dockerfile +!main.py +!destination_deeplake +!setup.py diff --git a/airbyte-integrations/connectors/destination-deeplake/Dockerfile b/airbyte-integrations/connectors/destination-deeplake/Dockerfile new file mode 100644 index 0000000000000..b127ed1ad8583 --- /dev/null +++ b/airbyte-integrations/connectors/destination-deeplake/Dockerfile @@ -0,0 +1,38 @@ +FROM python:3.9.11-alpine3.15 as base + +# build and load all requirements +FROM base as builder +WORKDIR /airbyte/integration_code + +# upgrade pip to the latest version +RUN apk --no-cache upgrade \ + && pip install --upgrade pip \ + && apk --no-cache add tzdata build-base + + +COPY setup.py ./ +# install necessary packages to a temporary folder +RUN pip install --prefix=/install . + +# build a clean environment +FROM base +WORKDIR /airbyte/integration_code + +# copy all loaded and built libraries to a pure basic image +COPY --from=builder /install /usr/local +# add default timezone settings +COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime +RUN echo "Etc/UTC" > /etc/timezone + +# bash is installed for more convenient debugging. +RUN apk --no-cache add bash + +# copy payload code only +COPY main.py ./ +COPY destination_deeplake ./destination_deeplake + +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/destination-deeplake diff --git a/airbyte-integrations/connectors/destination-deeplake/README.md b/airbyte-integrations/connectors/destination-deeplake/README.md new file mode 100644 index 0000000000000..65720ffda791b --- /dev/null +++ b/airbyte-integrations/connectors/destination-deeplake/README.md @@ -0,0 +1,132 @@ +# Deeplake Destination + +This is the repository for the Deeplake destination connector, written in Python. +For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/destinations/deeplake). + +## Local development + +### Prerequisites +**To iterate on this connector, make sure to complete this prerequisites section.** + +#### Minimum Python version required `= 3.7.0` + +#### Build & Activate Virtual Environment and install dependencies +From this connector directory, create a virtual environment: +``` +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your +development environment of choice. To activate it from the terminal, run: +``` +source .venv/bin/activate +pip install -r requirements.txt +``` +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. + +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. 
`requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. +If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything +should work as you expect. + +#### Building via Gradle +From the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:destination-deeplake:build +``` + +#### Create credentials +**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/destinations/deeplake) +to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `destination_deeplake/spec.json` file. +Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information. +See `integration_tests/sample_config.json` for a sample config file. + +**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination deeplake test creds` +and place them into `secrets/config.json`. + +### Locally running the connector +``` +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json +``` + +### Locally running the connector docker image + +#### Build +First, make sure you build the latest Docker image: +``` +docker build . -t airbyte/destination-deeplake:dev +``` + +You can also build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:destination-deeplake:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/destination-deeplake:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-deeplake:dev check --config /secrets/config.json +# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages +cat messages.jsonl | docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-deeplake:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` +## Testing + Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. +First install test dependencies into your virtual environment: +``` +pip install .[tests] +``` +### Unit Tests +To run unit tests locally, from the connector directory run: +``` +python -m pytest unit_tests +``` + +### Integration Tests +There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all destination connectors) and custom integration tests (which are specific to this connector). +#### Custom Integration tests +Place custom tests inside `integration_tests/` folder, then, from the connector root, run +``` +python -m pytest integration_tests +``` +#### Acceptance Tests +Coming soon: + +### Using gradle to run tests +All commands should be run from airbyte project root. 
+To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:destination-deeplake:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:destination-deeplake:integrationTest +``` + +## Dependency Management +All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development. +We split dependencies between two groups, dependencies that are: +* required for your connector to work need to go to `MAIN_REQUIREMENTS` list. +* required for the testing need to go to `TEST_REQUIREMENTS` list + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. + +### Local tests + +``` +python main.py spec +python main.py check --config secrets/config.json +cat integration_tests/messages.jsonl | python main.py write --config secrets/config.json --catalog integration_tests/configured_catalog.json +docker build . -t airbyte/destination-deeplake:dev +``` \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-deeplake/build.gradle b/airbyte-integrations/connectors/destination-deeplake/build.gradle new file mode 100644 index 0000000000000..57630d070365b --- /dev/null +++ b/airbyte-integrations/connectors/destination-deeplake/build.gradle @@ -0,0 +1,8 @@ +plugins { + id 'airbyte-python' + id 'airbyte-docker' +} + +airbytePython { + moduleDirectory 'destination_deeplake' +} diff --git a/airbyte-integrations/connectors/destination-deeplake/destination_deeplake/__init__.py b/airbyte-integrations/connectors/destination-deeplake/destination_deeplake/__init__.py new file mode 100644 index 0000000000000..4f121655ecbc7 --- /dev/null +++ b/airbyte-integrations/connectors/destination-deeplake/destination_deeplake/__init__.py @@ -0,0 +1,8 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from .destination import DestinationDeeplake + +__all__ = ["DestinationDeeplake"] diff --git a/airbyte-integrations/connectors/destination-deeplake/destination_deeplake/destination.py b/airbyte-integrations/connectors/destination-deeplake/destination_deeplake/destination.py new file mode 100644 index 0000000000000..d0454866e0a59 --- /dev/null +++ b/airbyte-integrations/connectors/destination-deeplake/destination_deeplake/destination.py @@ -0,0 +1,282 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+#
+
+
+import json
+import logging
+from typing import Any, Dict, Iterable, List, Mapping, Union
+
+import hub
+import numpy as np
+from airbyte_cdk import AirbyteLogger
+from airbyte_cdk.destinations import Destination
+from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type
+
+logger = logging.getLogger("airbyte")
+
+
+class DestinationDeeplake(Destination):
+
+    type_map = {
+        "integer": hub.htype.DEFAULT,
+        "number": hub.htype.DEFAULT,
+        "string": hub.htype.TEXT,
+        "array": hub.htype.DEFAULT,
+        "json": hub.htype.JSON,
+        "null": hub.htype.TEXT,
+        "object": hub.htype.JSON,
+        "boolean": hub.htype.DEFAULT,
+    }
+
+    def map_types(self, dtype: Union[str, List]):
+        """
+        Translate a JSON schema type into a hub htype; complex types fall back to json.
+
+        Args:
+            dtype (Union[str, List]): the type specified by the schema
+
+        Returns:
+            str: hub htype
+        """
+        if str(dtype) in hub.HTYPE_CONFIGURATIONS.keys():
+            return str(dtype)
+
+        if isinstance(dtype, (list, dict)) or dtype not in self.type_map:
+            dtype = "json"
+        return self.type_map[dtype]
+
+    def denulify(self, element: Union[int, list, str], dtype: dict):
+        """
+        Replace top-level Nones with empty values and recursively drop Nones from arrays.
+
+        Args:
+            element (Union[int, list, str]): element or cell
+            dtype (dict): specifies the type of the element
+
+        Returns:
+            Union[int, list, str]: the cleaned element
+        """
+        if dtype["type"] == "number" and element is None:
+            return np.array([])
+        elif dtype["type"] == "string" and element is None:
+            return ""
+        elif dtype["type"] == "array":
+            return [self.denulify(x, dtype["items"]) for x in element if x is not None]
+        return element
+
+    def process_row(self, data: dict, schema: Mapping[str, Any], transform=None) -> Iterable[Dict]:
+        """
+        Fill in missing columns, apply the custom transformation if one is defined and clean up nulls.
+
+        Args:
+            data (dict): record.data
+            schema (Mapping[str, Any]): structure of the data
+            transform (code, optional): compiled custom transformation expression
+
+        Returns:
+            dict: cleaned sample to append into the hub dataset
+        """
+        tran_sample = eval(transform, {"row": data, "hub": hub, "np": np}) if transform is not None else {}
+
+        sample = {}
+        for column, definition in schema:
+            if transform is not None and column in tran_sample.keys():
+                sample[column] = self.denulify(tran_sample[column], definition)
+            elif column in data:
+                sample[column] = self.denulify(data[column], definition)
+            else:
+                sample[column] = np.array([])
+
+        return sample
+
+    def construct_schema(self, schema: Mapping[str, Any], config: Mapping[str, Any]) -> Iterable[Dict]:
+        """
+        Construct the schema from the existing schema and the provided configuration,
+        honoring overwrite_tensor_definitions and neglected_tensors.
+
+        Args:
+            schema (Mapping[str, Any]): structure of the data
+            config (Mapping[str, Any]): destination configuration
+
+        Returns:
+            dict items: restructured schema
+        """
+        new_schema = {}
+        for column_name, items in schema:
+            if "neglected_tensors" in config and column_name in config["neglected_tensors"]:
+                continue
+            new_schema[column_name] = items
+
+        if "overwrite_tensor_definitions" in config:
+            tensor_defs = str(config["overwrite_tensor_definitions"]).replace("'", '"')
+            tensor_defs = json.loads(tensor_defs)
+            for column_name, value in tensor_defs.items():
+                new_schema[column_name] = {"type": value["htype"], "kwargs": value}
+
+        return new_schema.items()
+
+    def load_datasets(self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog) -> Iterable[Dict]:
+        """
+        Create or load the datasets.
+
+        Args:
+            config (Mapping[str, Any]): configuration of the datasets
+            configured_catalog (ConfiguredAirbyteCatalog): configuration of the streams
+
+        Returns:
+            Iterable[Dict]: per-stream dictionaries containing schema, sync_mode, dataset and cache
+        """
+        streams = {
+            s.stream.name: {"schema": s.stream.json_schema["properties"].items(), "sync_mode": s.destination_sync_mode}
+            for s in configured_catalog.streams
+        }
+
+        for name, stream in streams.items():
+            print(f"Creating dataset at {config['path']}/{name} in sync={stream['sync_mode']}")
+            overwrite = stream["sync_mode"] == DestinationSyncMode.overwrite
+            token = config["token"] if "token" in config else None
+
+            if hub.exists(f"{config['path']}/{name}", token=token):
+                ds = hub.load(f"{config['path']}/{name}", token=token, read_only=False)
+            else:
+                ds = hub.empty(f"{config['path']}/{name}", overwrite=overwrite, token=token)
+
+            self.setup_activeloop_creds(ds, config)
+
+            # construct the schema, applying the overwrite/neglect configuration
+            stream["schema"] = self.construct_schema(stream["schema"], config)
+
+            with ds:
+                for column_name, definition in stream["schema"]:
+                    htype = self.map_types(definition["type"])
+                    if overwrite and column_name in ds.tensors:
+                        ds.delete_tensor(column_name, large_ok=True)
+
+                    if column_name not in ds.tensors:
+                        if "kwargs" in definition:
+                            kwargs = definition["kwargs"]
+                            ds.create_tensor(column_name, **kwargs)
+                            logger.info(f"Created tensor {column_name} with overwritten arguments {kwargs}")
+                        else:
+                            ds.create_tensor(
+                                column_name,
+                                htype=htype,
+                                exist_ok=True,
+                                create_shape_tensor=False,
+                                create_sample_info_tensor=False,
+                                create_id_tensor=False,
+                            )
+                            logger.info(f"Created tensor {column_name} with htype {htype}")
+
+            print(f"Loaded {name} dataset")
+            streams[name]["ds"] = ds
+            streams[name]["cache"] = []
+
+        return streams
+
+    def flush(self, streams: Iterable[Dict]):
+        """
+        Flush the cached rows into the datasets.
+
+        Args:
+            streams (Iterable[Dict]): per-stream dictionaries containing schema, dataset and cache
+        """
+        for name, stream in streams.items():
+            length = len(stream["cache"])
+
+            if length == 0:
+                continue
+            cache = {column_name: [row[column_name] for row in stream["cache"]] for column_name, _ in stream["schema"]}
+
+            with stream["ds"] as ds:
+                for column_name, column in cache.items():
+                    ds[column_name].extend(column)
+                ds.commit(f"appended {length} rows", allow_empty=True)
+
+            print(f"Appended {length} rows into {name}")
+            stream["cache"] = []
+
+    def write(
+        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
+    ) -> Iterable[AirbyteMessage]:
+        """
+        Reads the input stream of messages, config, and catalog to write data to the destination.
+
+        This method returns an iterable (typically a generator of AirbyteMessages via yield) containing state messages received
+        in the input message stream. Outputting a state message means that every AirbyteRecordMessage which came before it has been
+        successfully persisted to the destination. This is used to ensure fault tolerance in the case that a sync fails before fully completing,
+        then the source is given the last state message output from this method as the starting point of the next sync.
+ + :param config: dict of JSON configuration matching the configuration declared in spec.json + :param configured_catalog: The Configured Catalog describing the schema of the data being received and how it should be persisted in the + destination + :param input_messages: The stream of input messages received from the source + :return: Iterable of AirbyteStateMessages wrapped in AirbyteMessage structs + """ + streams = self.load_datasets(config, configured_catalog) + + transform = compile(config["custom_transform"], "", "eval") if "custom_transform" in config else None + + for message in input_messages: + if message.type == Type.STATE: + self.flush(streams) + yield message + + elif message.type == Type.RECORD: + record = message.record + sample = self.process_row(record.data, streams[record.stream]["schema"], transform) + streams[record.stream]["cache"].append(sample) + else: + # ignore other message types for now + continue + + self.flush(streams) + + def setup_activeloop_creds(self, ds: hub.Dataset, config: Mapping[str, Any], managed=False): + if "managed_creds" in config: + existing_keys = ds.get_creds_keys() + for creds_name in config["managed_creds"]: + if creds_name not in existing_keys: + ds.add_creds_key(creds_name, managed=managed) + + def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: + """ + Tests if the input configuration can be used to successfully connect to the destination with the needed permissions + e.g: if a provided API token or password can be used to connect and write to the destination. + + :param logger: Logging object to display debug/info/error to the logs + (logs will not be accessible via airbyte UI if they are not passed to this logger) + :param config: Json object containing the configuration of this destination, content of this json is as specified in + the properties of the spec.json file + + :return: AirbyteConnectionStatus indicating a Success or Failure + """ + try: + token = config["token"] if "token" in config else None + path = f"{config['path']}/_airbyte_test" + ds = hub.empty(path, token=token, overwrite=True) + + self.setup_activeloop_creds(ds, config) + + if "overwrite_tensor_definitions" in config: + tensor_defs = str(config["overwrite_tensor_definitions"]).replace("'", '"') + tensor_defs = json.loads(tensor_defs) + for tensor_name, kwargs in tensor_defs.items(): + ds.create_tensor(tensor_name, **kwargs) + logger.info(msg=f"Tensor named '{tensor_name}' with args {kwargs} will be updated") + + if "neglected_tensors" in config: + for tensor_name in config["neglected_tensors"]: + logger.info(msg=f"Tensor named '{tensor_name}' will be neglected") + + if "custom_transform" in config: + # parse python code here + compile(config["custom_transform"], "", "eval") + logger.info("successfully compiled the transformation") + ds.delete() + + # TODO add more exhaustive checks for parameters + return AirbyteConnectionStatus(status=Status.SUCCEEDED) + except Exception as e: + return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {repr(e)}") diff --git a/airbyte-integrations/connectors/destination-deeplake/destination_deeplake/spec.json b/airbyte-integrations/connectors/destination-deeplake/destination_deeplake/spec.json new file mode 100644 index 0000000000000..78f94bda3b180 --- /dev/null +++ b/airbyte-integrations/connectors/destination-deeplake/destination_deeplake/spec.json @@ -0,0 +1,63 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/deeplake", + 
"supported_destination_sync_modes": ["overwrite", "append"], + "supportsIncremental": true, + "supportsDBT": false, + "supportsNormalization": false, + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Destination Deeplake", + "type": "object", + "required": ["path"], + "additionalProperties": false, + "properties": { + "path": { + "title": "Path", + "type": "string", + "description": "Destination", + "order": 1, + "examples": ["hub://{USERNAME}", "s3://path/to/dataset"] + }, + "token": { + "title": "Token", + "type": "string", + "description": "token provided from app.activeloop.ai", + "order": 2, + "airbyte_secret": true, + "examples": ["{API TOKEN}"] + }, + "managed_creds": { + "title": "Managed Credentials in Activeloop (optional, advanced)", + "type": "array", + "description": "Credentials that would be used for accessing remote images", + "order": 5, + "examples": [["creds_1", "creds_2"]] + }, + "overwrite_tensor_definitions": { + "title": "Overwrite tensor definitions (optional)", + "type": "string", + "description": "If tensor definition written, then it would overwrite upon creation", + "order": 4, + "multiline": true, + "examples": ["{\n'images': \n { \n 'htype': 'link[images]',\n 'sample_compression': 'jpeg'\n },\n}"] + }, + "neglected_tensors": { + "title": "Neglected Tensors (optional)", + "type": "array", + "description": "Tensor names specified here will be neglected after transformation happens", + "order": 5, + "examples": [["column_name_1", "column_name_2"]] + }, + "custom_transform": { + "title": "Custom Transformation (optional, advanced)", + "type": "string", + "multiline": true, + "description": "Custom python row-wise transformation as a dictionary. If column name is not specified then it will assume 1:1 mapping. 
For each tensor added make sure to have definition overwritten.", + "order": 6, + "examples": ["{\n 'column_name': int(row['column_name'])+1, \n 'images': hub.link('https://localhost:8000?img_id=' + row['image_url'], 'creds')}"] + } + } + } +} + + diff --git a/airbyte-integrations/connectors/destination-deeplake/integration_tests/config.json b/airbyte-integrations/connectors/destination-deeplake/integration_tests/config.json new file mode 100644 index 0000000000000..9d1ef3e87dd94 --- /dev/null +++ b/airbyte-integrations/connectors/destination-deeplake/integration_tests/config.json @@ -0,0 +1,7 @@ +{ + "path": "mem://username", + "managed_creds": ["cred_1", "creds_2"], + "overwrite_tensor_definitions": "{'images': {'htype': 'link[image]', 'sample_compression': 'jpeg'}}", + "neglected_tensors": ["labels", "column5"], + "custom_transform": "{\n 'column1': str(row['column1'])+'1' if 'column1' in row else None, \n 'images': hub.link('https://filesamples.com/samples/image/jpeg/sample_640%C3%97426.jpeg')}" +} diff --git a/airbyte-integrations/connectors/destination-deeplake/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/destination-deeplake/integration_tests/configured_catalog.json new file mode 100644 index 0000000000000..a6f78b6ff00ca --- /dev/null +++ b/airbyte-integrations/connectors/destination-deeplake/integration_tests/configured_catalog.json @@ -0,0 +1,345 @@ +{ + "streams": [ + { + "stream": { + "name": "airbyte_acceptance_table", + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": false, + "json_schema": { + "type": "object", + "properties": { + "column1": { + "type": "string" + }, + "column2": { + "type": "number" + }, + "column3": { + "type": "string", + "format": "datetime", + "airbyte_type": "timestamp_without_timezone" + }, + "column4": { + "type": "number" + }, + "column5": { + "type": "array", + "items": { + "type": "integer" + } + } + } + } + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "nested_stream_with_complex_columns_resulting_into_long_names", + "json_schema": { + "type": ["null", "object"], + "properties": { + "id": { + "type": ["null", "number", "string"] + }, + "date": { + "type": ["null", "string"] + }, + "partition": { + "type": ["null", "object"], + "properties": { + "double_array_data": { + "type": ["null", "array"], + "items": { + "type": ["null", "array"], + "items": { + "properties": { + "id": { + "type": ["null", "string"] + } + } + } + } + }, + "DATA": { + "type": ["null", "array"], + "items": { + "properties": { + "currency": { + "type": ["null", "string"] + } + } + } + } + } + } + } + }, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, + "default_cursor_field": [] + }, + "sync_mode": "incremental", + "cursor_field": ["date"], + "destination_sync_mode": "append_dedup", + "primary_key": [["id"]] + }, + { + "stream": { + "name": "non_nested_stream_without_namespace_resulting_into_long_names", + "json_schema": { + "type": ["null", "object"], + "properties": { + "id": { + "type": ["null", "number", "string"] + }, + "date": { + "type": ["null", "string"] + } + } + }, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": true, + "default_cursor_field": [] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + "primary_key": [] + }, + { + "stream": { + "name": "some_stream_that_was_empty", + "json_schema": { + "type": ["null", "object"], + "properties": { + "id": { + "type": ["null", "number", 
"string"] + }, + "date": { + "type": ["null", "string"] + } + } + }, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": false, + "default_cursor_field": [] + }, + "sync_mode": "incremental", + "cursor_field": ["date"], + "destination_sync_mode": "append_dedup", + "primary_key": [["id"]] + }, + { + "stream": { + "name": "simple_stream_with_namespace_resulting_into_long_names", + "namespace": "test_normalization_namespace", + "json_schema": { + "type": ["null", "object"], + "properties": { + "id": { + "type": ["null", "number", "string"] + }, + "date": { + "type": ["null", "string"] + } + } + }, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": true, + "default_cursor_field": [] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "append", + "primary_key": [] + }, + { + "stream": { + "name": "conflict_stream_name", + "json_schema": { + "type": ["null", "object"], + "properties": { + "id": { + "type": ["null", "number", "string"] + }, + "conflict_stream_name": { + "type": ["null", "object"], + "properties": { + "conflict_stream_name": { + "type": "object", + "items": { + "type": "object", + "properties": { + "groups": { + "type": "string" + } + }, + "custom_fields": { + "items": { + "properties": { + "id": { + "type": ["null", "integer"] + }, + "value": {} + }, + "type": ["null", "object"] + }, + "type": ["null", "array"] + }, + "conflict_stream_name": { + "type": "integer" + } + } + } + } + } + } + }, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": true, + "default_cursor_field": [] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + "primary_key": [] + }, + { + "stream": { + "name": "conflict_stream_scalar", + "json_schema": { + "type": ["null", "object"], + "properties": { + "id": { + "type": ["null", "number", "string"] + }, + "conflict_stream_scalar": { + "type": "integer" + } + } + }, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": true, + "default_cursor_field": [] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + "primary_key": [] + }, + { + "stream": { + "name": "conflict_stream_array", + "json_schema": { + "type": ["null", "object"], + "properties": { + "id": { + "type": ["null", "number", "string"] + }, + "conflict_stream_array": { + "type": ["null", "array"], + "properties": { + "conflict_stream_name": { + "type": ["null", "array"], + "items": { + "properties": { + "id": { + "type": ["null", "integer"] + } + } + } + } + } + } + } + }, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": true, + "default_cursor_field": [] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + "primary_key": [] + }, + { + "stream": { + "name": "unnest_alias", + "json_schema": { + "type": ["null", "object"], + "properties": { + "id": { + "type": "integer" + }, + "children": { + "type": ["null", "array"], + "items": { + "type": "object", + "properties": { + "ab_id": { + "type": ["null", "integer"] + }, + "owner": { + "type": ["null", "object"], + "properties": { + "owner_id": { + "type": ["null", "integer"] + }, + "column`_'with\"_quotes": { + "type": ["null", "array"], + "items": { + "properties": { + "currency": { + "type": ["null", "string"] + } + } + } + } + } + } + } + } + } + } + }, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, + "default_cursor_field": [] + }, + "sync_mode": "incremental", + "cursor_field": [], + "destination_sync_mode": "overwrite" + }, + { + 
"stream": { + "name": "arrays", + "json_schema": { + "type": ["null", "object"], + "properties": { + "array_of_strings": { + "type": ["null", "array"], + "items": { + "type": ["null", "string"] + } + }, + "nested_array_parent": { + "type": ["null", "object"], + "properties": { + "nested_array": { + "type": ["null", "array"], + "items": { + "type": ["null", "string"] + } + } + } + } + } + }, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": true, + "default_cursor_field": [] + }, + "sync_mode": "full_refresh", + "cursor_field": [], + "destination_sync_mode": "overwrite", + "primary_key": [] + } + ] +} diff --git a/airbyte-integrations/connectors/destination-deeplake/integration_tests/integration_test.py b/airbyte-integrations/connectors/destination-deeplake/integration_tests/integration_test.py new file mode 100644 index 0000000000000..cc8e8791a8789 --- /dev/null +++ b/airbyte-integrations/connectors/destination-deeplake/integration_tests/integration_test.py @@ -0,0 +1,119 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import json +import random +import string +import tempfile +from datetime import datetime +from typing import Dict +from unittest.mock import MagicMock + +import pytest +from airbyte_cdk.models import ( + AirbyteMessage, + AirbyteRecordMessage, + AirbyteStream, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + DestinationSyncMode, + Status, + SyncMode, + Type, +) +from destination_deeplake import DestinationDeeplake + +import hub + +@pytest.fixture(scope="module") +def local_file_config() -> Dict[str, str]: + path = tempfile.TemporaryDirectory() + yield {"path": f"{path.name}"} + path.cleanup() + + +@pytest.fixture(scope="module") +def test_table_name() -> str: + letters = string.ascii_lowercase + rand_string = "".join(random.choice(letters) for _ in range(10)) + return f"airbyte_integration_{rand_string}" + + +@pytest.fixture +def table_schema() -> str: + schema = {"type": "object", "properties": {"column1": {"type": ["null", "string"]}}} + return schema + + +@pytest.fixture +def configured_catalogue(test_table_name: str, table_schema: str) -> ConfiguredAirbyteCatalog: + append_stream = ConfiguredAirbyteStream( + stream=AirbyteStream(name=test_table_name, json_schema=table_schema), + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.append, + ) + return ConfiguredAirbyteCatalog(streams=[append_stream]) + + +@pytest.fixture +def invalid_config() -> Dict[str, str]: + return {"path": "/sqlite.db"} + + +@pytest.fixture +def airbyte_message1(test_table_name: str): + return AirbyteMessage( + type=Type.RECORD, + record=AirbyteRecordMessage(stream=test_table_name, data={"column1": 1}, emitted_at=int(datetime.now().timestamp()) * 1000), + ) + + +@pytest.fixture +def airbyte_message2(test_table_name: str): + return AirbyteMessage( + type=Type.RECORD, + record=AirbyteRecordMessage(stream=test_table_name, data={"column1": 2}, emitted_at=int(datetime.now().timestamp()) * 1000), + ) + + +@pytest.mark.parametrize("config", ["invalid_config"]) +@pytest.mark.disable_autouse +def test_check_fails(config, request): + config = request.getfixturevalue(config) + destination = DestinationDeeplake() + status = destination.check(logger=MagicMock(), config=config) + assert status.status == Status.FAILED + + +@pytest.mark.parametrize("config", ["local_file_config"]) +def test_check_succeeds(config, request): + config = request.getfixturevalue(config) + destination = DestinationDeeplake() + status = 
destination.check(logger=MagicMock(), config=config) + assert status.status == Status.SUCCEEDED + + +@pytest.mark.parametrize("config", ["local_file_config"]) +def test_write( + config: Dict[str, str], + request, + configured_catalogue: ConfiguredAirbyteCatalog, + airbyte_message1: AirbyteMessage, + airbyte_message2: AirbyteMessage, + test_table_name: str, +): + config = request.getfixturevalue(config) + destination = DestinationDeeplake() + generator = destination.write( + config=config, configured_catalog=configured_catalogue, input_messages=[airbyte_message1, airbyte_message2] + ) + + result = list(generator) + assert len(result) == 0 + ds = hub.load(f'{config.get("path")}/{test_table_name}') + + column1 = ds["column1"].numpy() + assert len(ds) == 2 + assert column1[0] == airbyte_message1.record.data["column1"] + assert column1[1] == airbyte_message2.record.data["column1"] diff --git a/airbyte-integrations/connectors/destination-deeplake/integration_tests/invalid_config.json b/airbyte-integrations/connectors/destination-deeplake/integration_tests/invalid_config.json new file mode 100644 index 0000000000000..0049db5ecd7e5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-deeplake/integration_tests/invalid_config.json @@ -0,0 +1,3 @@ +{ + "path": "//something-not-valid" +} diff --git a/airbyte-integrations/connectors/destination-deeplake/integration_tests/messages.jsonl b/airbyte-integrations/connectors/destination-deeplake/integration_tests/messages.jsonl new file mode 100644 index 0000000000000..d6cdb85d88282 --- /dev/null +++ b/airbyte-integrations/connectors/destination-deeplake/integration_tests/messages.jsonl @@ -0,0 +1,26 @@ +{"type": "RECORD", "record": {"stream": "airbyte_acceptance_table", "data": {"column1": "my_value", "column2": 221, "column3": "2021-01-01T20:10:22", "column4": 1.214, "column5": [1,2,3]}, "emitted_at": 1626172757000}} +{"type": "RECORD", "record": {"stream": "airbyte_acceptance_table", "data": {"column1": "my_value2", "column2": 222, "column3": "2021-01-02T22:10:22", "column5": [1,2,null]}, "emitted_at": 1626172757000}} +{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}} + +{"type": "RECORD", "record": {"stream": "nested_stream_with_complex_columns_resulting_into_long_names", "emitted_at": 1602638599000, "data": { "id": 4.2, "date": "2020-08-29T00:00:00Z", "partition": { "double_array_data": [[ { "id": "EUR" } ]], "DATA": [ {"currency": "EUR" } ], "column`_'with\"_quotes": [ {"currency": "EUR" } ] } }}} +{"type": "RECORD", "record": {"stream": "nested_stream_with_complex_columns_resulting_into_long_names", "emitted_at": 1602638599100, "data": { "id": "test record", "date": "2020-08-31T00:00:00Z", "partition": { "double_array_data": [[ { "id": "USD" } ], [ { "id": "GBP" } ]], "DATA": [ {"currency": "EUR" } ], "column`_'with\"_quotes": [ {"currency": "EUR" } ] } }}} +{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}} + +{"type":"RECORD","record":{"stream":"conflict_stream_name","data":{"id":1,"conflict_stream_name":{"conflict_stream_name": {"groups": "1", "custom_fields": [{"id":1, "value":3}, {"id":2, "value":4}], "conflict_stream_name": 3}}},"emitted_at":1623861660}} +{"type":"RECORD","record":{"stream":"conflict_stream_name","data":{"id":2,"conflict_stream_name":{"conflict_stream_name": {"groups": "2", "custom_fields": [{"id":1, "value":3}, {"id":2, "value":4}], "conflict_stream_name": 3}}},"emitted_at":1623861660}} +{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}} + 
+{"type":"RECORD","record":{"stream":"conflict_stream_scalar","data":{"id":1,"conflict_stream_scalar": 2},"emitted_at":1623861660}} +{"type":"RECORD","record":{"stream":"conflict_stream_scalar","data":{"id":2,"conflict_stream_scalar": 2},"emitted_at":1623861660}} +{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}} + +{"type":"RECORD","record":{"stream":"conflict_stream_array","data":{"id":1, "conflict_stream_array": {"conflict_stream_array": [{"id": 1}, {"id": 2}, {"id": 3}]}}, "emitted_at":1623861660}} +{"type":"RECORD","record":{"stream":"conflict_stream_array","data":{"id":2, "conflict_stream_array": {"conflict_stream_array": [{"id": 4}, {"id": 5}, {"id": 6}]}}, "emitted_at":1623861860}} + +{"type":"RECORD","record":{"stream":"conflict_stream_scalar","data":{"id":1,"conflict_stream_scalar": 2},"emitted_at":1623861660}} +{"type":"RECORD","record":{"stream":"conflict_stream_scalar","data":{"id":2,"conflict_stream_scalar": 2},"emitted_at":1623861660}} + +{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":1, "children": [{"ab_id": 1, "owner": {"owner_id": 1, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}},{"ab_id": 2, "owner": {"owner_id": 2, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}}]},"emitted_at":1623861660}} +{"type":"RECORD","record":{"stream":"unnest_alias","data":{"id":2, "children": [{"ab_id": 3, "owner": {"owner_id": 3, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}},{"ab_id": 4, "owner": {"owner_id": 4, "column`_'with\"_quotes": [ {"currency": "EUR" } ]}}]},"emitted_at":1623861660}} +{"type":"RECORD","record":{"stream":"arrays","emitted_at":1602638599000,"data":{"array_of_strings":["string1",null,"string2","string3"],"nested_array_parent":{"nested_array":["string1",null,"string2"]}}}} +{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-deeplake/main.py b/airbyte-integrations/connectors/destination-deeplake/main.py new file mode 100644 index 0000000000000..441621f6affd1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-deeplake/main.py @@ -0,0 +1,11 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +import sys + +from destination_deeplake import DestinationDeeplake + +if __name__ == "__main__": + DestinationDeeplake().run(sys.argv[1:]) diff --git a/airbyte-integrations/connectors/destination-deeplake/requirements.txt b/airbyte-integrations/connectors/destination-deeplake/requirements.txt new file mode 100644 index 0000000000000..d6e1198b1ab1f --- /dev/null +++ b/airbyte-integrations/connectors/destination-deeplake/requirements.txt @@ -0,0 +1 @@ +-e . diff --git a/airbyte-integrations/connectors/destination-deeplake/setup.py b/airbyte-integrations/connectors/destination-deeplake/setup.py new file mode 100644 index 0000000000000..8c2ad7c58746c --- /dev/null +++ b/airbyte-integrations/connectors/destination-deeplake/setup.py @@ -0,0 +1,26 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+#
+
+
+from setuptools import find_packages, setup
+
+MAIN_REQUIREMENTS = [
+    "hub",
+    "airbyte-cdk",
+]
+
+TEST_REQUIREMENTS = ["pytest~=6.1"]
+
+setup(
+    name="destination_deeplake",
+    description="Destination implementation for Deeplake.",
+    author="Airbyte",
+    author_email="contact@airbyte.io",
+    packages=find_packages(),
+    install_requires=MAIN_REQUIREMENTS,
+    package_data={"": ["*.json"]},
+    extras_require={
+        "tests": TEST_REQUIREMENTS,
+    },
+)
diff --git a/airbyte-integrations/connectors/destination-deeplake/unit_tests/unit_test.py b/airbyte-integrations/connectors/destination-deeplake/unit_tests/unit_test.py
new file mode 100644
index 0000000000000..dddaea0060fa1
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-deeplake/unit_tests/unit_test.py
@@ -0,0 +1,7 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+#
+
+
+def test_example_method():
+    assert True
diff --git a/docs/integrations/destinations/deeplake.md b/docs/integrations/destinations/deeplake.md
new file mode 100644
index 0000000000000..b4db27f773240
--- /dev/null
+++ b/docs/integrations/destinations/deeplake.md
@@ -0,0 +1,32 @@
+# Activeloop Lakehouse
+
+## Overview
+This destination syncs data to Deep Lake on Activeloop. Each stream is written to its own Deep Lake dataset.
+
+## Sync Mode
+
+| Feature | Support | Notes |
+| :--- | :---: | :--- |
+| Full Refresh Sync | ✅ | Warning: this mode deletes all previously synced data in the path. |
+| Incremental - Append Sync | ✅ | |
+| Incremental - Deduped History | ❌ | |
+| Namespaces | ❌ | |
+
+## Configuration
+
+| Category | Parameter | Type | Notes |
+| :--- | :--- | :---: | :--- |
+| Path | destination | string | Deep Lake dataset path, including hub://... and s3://... paths |
+| Token | API token | string | API token generated at app.activeloop.ai |
+
+## Output Schema
+
+Each dataset will have the following columns as tensors:
+
+| Tensor | Type | Notes |
+| :--- | :---: | :--- |
+| Data fields from the source stream | various | |
+
+Under the hood, null values are converted into hub-compatible empty values: top-level nulls become empty arrays or empty strings, while nulls inside nested arrays are dropped (see the example at the end of this page).
+
+## ChangeLog
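+
+## Null handling example
+
+The behaviour described above can be summarised with a small sketch adapted from the connector's `denulify` helper (illustrative only, assuming `numpy` is installed):
+
+```python
+import numpy as np
+
+
+def denulify(element, dtype):
+    """Mirror of the connector's per-cell null handling."""
+    if dtype["type"] == "number" and element is None:
+        return np.array([])  # numeric nulls become empty arrays
+    if dtype["type"] == "string" and element is None:
+        return ""  # string nulls become empty strings
+    if dtype["type"] == "array":
+        # nulls inside arrays are dropped, cleaning the remaining items recursively
+        return [denulify(x, dtype["items"]) for x in element if x is not None]
+    return element
+
+
+print(denulify(None, {"type": "string"}))  # -> ""
+print(denulify([1, None, 2], {"type": "array", "items": {"type": "number"}}))  # -> [1, 2]
+```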