Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: Use a single source of truth for built-in capabilities
Browse files Browse the repository at this point in the history
edgarrmondragon committed Jul 1, 2024
1 parent 4674b3f commit 93bbe08
Showing 19 changed files with 709 additions and 523 deletions.
17 changes: 17 additions & 0 deletions docs/builtin.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
Built-in Settings and Capabilities
==================================

.. currentmodule:: singer_sdk.helpers.capabilities

The Singer SDK library provides a number of built-in settings and capabilities.

.. autodata:: ADD_RECORD_METADATA
:no-value:

.. autoattribute:: ADD_RECORD_METADATA.schema

.. autodata:: BATCH
:no-value:

.. autoattribute:: BATCH.schema
.. autoattribute:: BATCH.capability
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
@@ -57,6 +57,7 @@ within the `#singer-tap-development`_ and `#singer-target-development`_ Slack ch
implementation/index
typing
capabilities
builtin

.. toctree::
:caption: Advanced Concepts
4 changes: 2 additions & 2 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@
from singer_sdk import typing as th
from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema
from singer_sdk.exceptions import ConfigValidationError
from singer_sdk.helpers.capabilities import TargetLoadMethods
from singer_sdk.helpers import capabilities

if t.TYPE_CHECKING:
from sqlalchemy.engine import Engine
@@ -779,7 +779,7 @@ def prepare_table(
as_temp_table=as_temp_table,
)
return
if self.config["load_method"] == TargetLoadMethods.OVERWRITE:
if self.config["load_method"] == capabilities.TargetLoadMethods.OVERWRITE:
self.get_table(full_table_name=full_table_name).drop(self._engine)
self.create_empty_table(
full_table_name=full_table_name,
375 changes: 0 additions & 375 deletions singer_sdk/helpers/capabilities.py

This file was deleted.

74 changes: 74 additions & 0 deletions singer_sdk/helpers/capabilities/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Module with helpers to declare capabilities and plugin behavior."""

from __future__ import annotations

from singer_sdk.helpers.capabilities import _schema as schema
from singer_sdk.helpers.capabilities._builtin import Builtin
from singer_sdk.helpers.capabilities._config_property import ConfigProperty
from singer_sdk.helpers.capabilities._enum import (
CapabilitiesEnum,
PluginCapabilities,
TapCapabilities,
TargetCapabilities,
TargetLoadMethods,
)

__all__ = [
"ADD_RECORD_METADATA",
"BATCH",
"FLATTENING",
"STREAM_MAPS",
"TARGET_BATCH_SIZE_ROWS",
"TARGET_HARD_DELETE",
"TARGET_LOAD_METHOD",
"TARGET_SCHEMA",
"TARGET_VALIDATE_RECORDS",
"CapabilitiesEnum",
"ConfigProperty",
"PluginCapabilities",
"TapCapabilities",
"TargetCapabilities",
"TargetLoadMethods",
]

#: Add metadata to records.
#:
#: Example:
#:
#: .. code-block:: json
#:
#: {
#: "add_record_metadata": true
#: }
#:
ADD_RECORD_METADATA = Builtin(schema=schema.ADD_RECORD_METADATA_CONFIG)

#: For taps, support emitting BATCH messages. For targets, support consuming BATCH
#: messages.
BATCH = Builtin(
schema=schema.BATCH_CONFIG,
capability=PluginCapabilities.BATCH,
)

FLATTENING = Builtin(
schema=schema.FLATTENING_CONFIG,
capability=PluginCapabilities.FLATTENING,
)
STREAM_MAPS = Builtin(
schema.STREAM_MAPS_CONFIG,
capability=PluginCapabilities.STREAM_MAPS,
)
TARGET_BATCH_SIZE_ROWS = Builtin(schema=schema.TARGET_BATCH_SIZE_ROWS_CONFIG)
TARGET_HARD_DELETE = Builtin(
schema=schema.TARGET_HARD_DELETE_CONFIG,
capability=TargetCapabilities.HARD_DELETE,
)
TARGET_LOAD_METHOD = Builtin(schema=schema.TARGET_LOAD_METHOD_CONFIG)
TARGET_SCHEMA = Builtin(
schema=schema.TARGET_SCHEMA_CONFIG,
capability=TargetCapabilities.TARGET_SCHEMA,
)
TARGET_VALIDATE_RECORDS = Builtin(
schema=schema.TARGET_VALIDATE_RECORDS_CONFIG,
capability=TargetCapabilities.VALIDATE_RECORDS,
)
49 changes: 49 additions & 0 deletions singer_sdk/helpers/capabilities/_builtin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from __future__ import annotations

import typing as t

from ._config_property import ConfigProperty

if t.TYPE_CHECKING:
from ._enum import CapabilitiesEnum

_T = t.TypeVar("_T")


class Builtin:
"""Use this class to define built-in setting(s) for a plugin."""

def __init__(
self,
schema: dict[str, t.Any],
*,
capability: CapabilitiesEnum | None = None,
**kwargs: t.Any,
):
"""Initialize the descriptor.
Args:
schema: The JSON schema for the setting.
capability: The capability that the setting is associated with.
kwargs: Additional keyword arguments.
"""
self.schema = schema
self.capability = capability
self.kwargs = kwargs

def attribute( # noqa: PLR6301
self,
custom_key: str | None = None,
*,
default: _T | None = None,
) -> ConfigProperty[_T]:
"""Generate a class attribute for the setting.
Args:
custom_key: Custom key to use in the config.
default: Default value for the setting.
Returns:
Class attribute for the setting.
"""
return ConfigProperty(custom_key=custom_key, default=default)
41 changes: 41 additions & 0 deletions singer_sdk/helpers/capabilities/_config_property.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from __future__ import annotations

import typing as t

T = t.TypeVar("T")


class ConfigProperty(t.Generic[T]):
"""A descriptor that gets a value from a named key of the config attribute."""

def __init__(self, custom_key: str | None = None, *, default: T | None = None):
"""Initialize the descriptor.
Args:
custom_key: The key to get from the config attribute instead of the
attribute name.
default: The default value if the key is not found.
"""
self.key = custom_key
self.default = default

def __set_name__(self, owner, name: str) -> None: # noqa: ANN001
"""Set the name of the attribute.
Args:
owner: The class of the object.
name: The name of the attribute.
"""
self.key = self.key or name

def __get__(self, instance, owner) -> T | None: # noqa: ANN001
"""Get the value from the instance's config attribute.
Args:
instance: The instance of the object.
owner: The class of the object.
Returns:
The value from the config attribute.
"""
return instance.config.get(self.key, self.default) # type: ignore[no-any-return]
200 changes: 200 additions & 0 deletions singer_sdk/helpers/capabilities/_enum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
from __future__ import annotations

import enum
import typing as t
import warnings

_EnumMemberT = t.TypeVar("_EnumMemberT")


class TargetLoadMethods(str, enum.Enum):
"""Target-specific capabilities."""

# always write all input records whether that records already exists or not
APPEND_ONLY = "append-only"

# update existing records and insert new records
UPSERT = "upsert"

# delete all existing records and insert all input records
OVERWRITE = "overwrite"


class DeprecatedEnum(enum.Enum):
"""Base class for capabilities enumeration."""

def __new__(
cls,
value: _EnumMemberT,
deprecation: str | None = None,
) -> DeprecatedEnum:
"""Create a new enum member.
Args:
value: Enum member value.
deprecation: Deprecation message.
Returns:
An enum member value.
"""
member: DeprecatedEnum = object.__new__(cls)
member._value_ = value
member.deprecation = deprecation
return member

@property
def deprecation_message(self) -> str | None:
"""Get deprecation message.
Returns:
Deprecation message.
"""
self.deprecation: str | None
return self.deprecation

def emit_warning(self) -> None:
"""Emit deprecation warning."""
warnings.warn(
f"{self.name} is deprecated. {self.deprecation_message}",
DeprecationWarning,
stacklevel=3,
)


class DeprecatedEnumMeta(enum.EnumMeta):
"""Metaclass for enumeration with deprecation support."""

def __getitem__(self, name: str) -> t.Any: # noqa: ANN401
"""Retrieve mapping item.
Args:
name: Item name.
Returns:
Enum member.
"""
obj: enum.Enum = super().__getitem__(name)
if isinstance(obj, DeprecatedEnum) and obj.deprecation_message:
obj.emit_warning()
return obj

def __getattribute__(cls, name: str) -> t.Any: # noqa: ANN401, N805
"""Retrieve enum attribute.
Args:
name: Attribute name.
Returns:
Attribute.
"""
obj = super().__getattribute__(name)
if isinstance(obj, DeprecatedEnum) and obj.deprecation_message:
obj.emit_warning()
return obj

def __call__(self, *args: t.Any, **kwargs: t.Any) -> t.Any: # noqa: ANN401
"""Call enum member.
Args:
args: Positional arguments.
kwargs: Keyword arguments.
Returns:
Enum member.
"""
obj = super().__call__(*args, **kwargs)
if isinstance(obj, DeprecatedEnum) and obj.deprecation_message:
obj.emit_warning()
return obj


class CapabilitiesEnum(DeprecatedEnum, metaclass=DeprecatedEnumMeta):
"""Base capabilities enumeration."""

def __str__(self) -> str:
"""String representation.
Returns:
Stringified enum value.
"""
return str(self.value)

def __repr__(self) -> str:
"""String representation.
Returns:
Stringified enum value.
"""
return str(self.value)


class PluginCapabilities(CapabilitiesEnum):
"""Core capabilities which can be supported by taps and targets."""

#: Support plugin capability and setting discovery.
ABOUT = "about"

#: Support :doc:`inline stream map transforms</stream_maps>`.
STREAM_MAPS = "stream-maps"

#: Support schema flattening, aka de-nesting of complex properties.
FLATTENING = "schema-flattening"

#: Support the
#: `ACTIVATE_VERSION <https://hub.meltano.com/singer/docs#activate-version>`_
#: extension.
ACTIVATE_VERSION = "activate-version"

#: Input and output from
#: `batched files <https://hub.meltano.com/singer/docs#batch>`_.
#: A.K.A ``FAST_SYNC``.
BATCH = "batch"


class TapCapabilities(CapabilitiesEnum):
"""Tap-specific capabilities."""

#: Generate a catalog with `--discover`.
DISCOVER = "discover"

#: Accept input catalog, apply metadata and selection rules.
CATALOG = "catalog"

#: Incremental refresh by means of state tracking.
STATE = "state"

#: Automatic connectivity and stream init test via :ref:`--test<Test connectivity>`.
TEST = "test"

#: Support for ``replication_method: LOG_BASED``. You can read more about this
#: feature in `MeltanoHub <https://hub.meltano.com/singer/docs#log-based>`_.
LOG_BASED = "log-based"

#: Deprecated. Please use :attr:`~TapCapabilities.CATALOG` instead.
PROPERTIES = "properties", "Please use CATALOG instead."


class TargetCapabilities(CapabilitiesEnum):
"""Target-specific capabilities."""

#: Allows a ``soft_delete=True`` config option.
#: Requires a tap stream supporting :attr:`PluginCapabilities.ACTIVATE_VERSION`
#: and/or :attr:`TapCapabilities.LOG_BASED`.
SOFT_DELETE = "soft-delete"

#: Allows a ``hard_delete=True`` config option.
#: Requires a tap stream supporting :attr:`PluginCapabilities.ACTIVATE_VERSION`
#: and/or :attr:`TapCapabilities.LOG_BASED`.
HARD_DELETE = "hard-delete"

#: Fail safe for unknown JSON Schema types.
DATATYPE_FAILSAFE = "datatype-failsafe"

#: Allow de-nesting complex properties.
RECORD_FLATTENING = "record-flattening"

#: Allow setting the target schema.
TARGET_SCHEMA = "target-schema"

#: Validate the schema of the incoming records.
VALIDATE_RECORDS = "validate-records"
182 changes: 182 additions & 0 deletions singer_sdk/helpers/capabilities/_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
"""Default JSON Schema to support config for built-in capabilities."""

from __future__ import annotations

from singer_sdk.typing import (
ArrayType,
BooleanType,
IntegerType,
NumberType,
ObjectType,
OneOf,
PropertiesList,
Property,
StringType,
)

from ._enum import TargetLoadMethods

STREAM_MAPS_CONFIG = PropertiesList(
Property(
"stream_maps",
ObjectType(),
description=(
"Config object for stream maps capability. "
"For more information check out "
"[Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html)."
),
),
Property(
"stream_map_config",
ObjectType(),
description="User-defined config values to be used within map expressions.",
),
Property(
"faker_config",
ObjectType(
Property(
"seed",
OneOf(NumberType, StringType, BooleanType),
description=(
"Value to seed the Faker generator for deterministic output: "
"https://faker.readthedocs.io/en/master/#seeding-the-generator"
),
),
Property(
"locale",
OneOf(StringType, ArrayType(StringType)),
description=(
"One or more LCID locale strings to produce localized output for: "
"https://faker.readthedocs.io/en/master/#localization"
),
),
),
description=(
"Config for the [`Faker`](https://faker.readthedocs.io/en/master/) "
"instance variable `fake` used within map expressions. Only applicable if "
"the plugin specifies `faker` as an additional dependency (through the "
"`singer-sdk` `faker` extra or directly)."
),
),
).to_dict()

FLATTENING_CONFIG = PropertiesList(
Property(
"flattening_enabled",
BooleanType(),
description=(
"'True' to enable schema flattening and automatically expand nested "
"properties."
),
),
Property(
"flattening_max_depth",
IntegerType(),
description="The max depth to flatten schemas.",
),
).to_dict()

BATCH_CONFIG = PropertiesList(
Property(
"batch_config",
description="",
wrapped=ObjectType(
Property(
"encoding",
description="Specifies the format and compression of the batch files.",
wrapped=ObjectType(
Property(
"format",
StringType,
allowed_values=["jsonl", "parquet"],
description="Format to use for batch files.",
),
Property(
"compression",
StringType,
allowed_values=["gzip", "none"],
description="Compression format to use for batch files.",
),
),
),
Property(
"storage",
description="Defines the storage layer to use when writing batch files",
wrapped=ObjectType(
Property(
"root",
StringType,
description="Root path to use when writing batch files.",
),
Property(
"prefix",
StringType,
description="Prefix to use when writing batch files.",
),
),
),
),
),
).to_dict()

TARGET_SCHEMA_CONFIG = PropertiesList(
Property(
"default_target_schema",
StringType(),
description="The default target database schema name to use for all streams.",
),
).to_dict()

ADD_RECORD_METADATA_CONFIG = PropertiesList(
Property(
"add_record_metadata",
BooleanType(),
description="Add metadata to records.",
),
).to_dict()

TARGET_HARD_DELETE_CONFIG = PropertiesList(
Property(
"hard_delete",
BooleanType(),
description="Hard delete records.",
default=False,
),
).to_dict()

TARGET_VALIDATE_RECORDS_CONFIG = PropertiesList(
Property(
"validate_records",
BooleanType(),
description="Whether to validate the schema of the incoming streams.",
default=True,
),
).to_dict()

TARGET_BATCH_SIZE_ROWS_CONFIG = PropertiesList(
Property(
"batch_size_rows",
IntegerType,
description="Maximum number of rows in each batch.",
),
).to_dict()

TARGET_LOAD_METHOD_CONFIG = PropertiesList(
Property(
"load_method",
StringType(),
description=(
"The method to use when loading data into the destination. "
"`append-only` will always write all input records whether that records "
"already exists or not. `upsert` will update existing records and insert "
"new records. `overwrite` will delete all existing records and insert all "
"input records."
),
allowed_values=[
TargetLoadMethods.APPEND_ONLY,
TargetLoadMethods.UPSERT,
TargetLoadMethods.OVERWRITE,
],
default=TargetLoadMethods.APPEND_ONLY,
),
).to_dict()
2 changes: 1 addition & 1 deletion singer_sdk/mapper.py
Original file line number Diff line number Diff line change
@@ -626,7 +626,7 @@ def _init_faker_instance(self) -> Faker | None:


class PluginMapper:
"""Inline map tranformer."""
"""Inline map transformer."""

def __init__(
self,
15 changes: 4 additions & 11 deletions singer_sdk/mapper_base.py
Original file line number Diff line number Diff line change
@@ -7,7 +7,6 @@

import click

from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers.capabilities import CapabilitiesEnum, PluginCapabilities
from singer_sdk.io_base import SingerReader, SingerWriter
from singer_sdk.plugin_base import PluginBase
@@ -19,16 +18,10 @@
class InlineMapper(PluginBase, SingerReader, SingerWriter, metaclass=abc.ABCMeta):
"""Abstract base class for inline mappers."""

@classproperty
def capabilities(self) -> list[CapabilitiesEnum]: # noqa: PLR6301
"""Get capabilities.
Returns:
A list of plugin capabilities.
"""
return [
PluginCapabilities.STREAM_MAPS,
]
#: A list of plugin capabilities.
capabilities: t.ClassVar[list[CapabilitiesEnum]] = [
PluginCapabilities.STREAM_MAPS,
]

def _write_messages(self, messages: t.Iterable[singer.Message]) -> None:
for message in messages:
48 changes: 20 additions & 28 deletions singer_sdk/plugin_base.py
Original file line number Diff line number Diff line change
@@ -22,15 +22,10 @@
parse_environment_config,
)
from singer_sdk.exceptions import ConfigValidationError
from singer_sdk.helpers import capabilities
from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers._secrets import SecretString, is_common_secret_key
from singer_sdk.helpers._util import read_json_file
from singer_sdk.helpers.capabilities import (
FLATTENING_CONFIG,
STREAM_MAPS_CONFIG,
CapabilitiesEnum,
PluginCapabilities,
)
from singer_sdk.mapper import PluginMapper
from singer_sdk.typing import extend_validator_with_defaults

@@ -84,7 +79,7 @@ def invoke(self, ctx: click.Context) -> t.Any: # noqa: ANN401
sys.exit(1)


class PluginBase(metaclass=abc.ABCMeta): # noqa: PLR0904
class PluginBase(metaclass=abc.ABCMeta):
"""Abstract base class for taps."""

#: The executable name of the tap or target plugin. e.g. tap-foo
@@ -98,6 +93,14 @@ class PluginBase(metaclass=abc.ABCMeta): # noqa: PLR0904

_config: dict

#: Advertised built-in plugin capabilities. Developers may override this property
#: in order to add or remove advertised capabilities for this plugin.
capabilities: t.ClassVar[list[capabilities.CapabilitiesEnum]] = [
capabilities.PluginCapabilities.STREAM_MAPS,
capabilities.PluginCapabilities.FLATTENING,
capabilities.PluginCapabilities.BATCH,
]

@classproperty
def logger(cls) -> logging.Logger: # noqa: N805
"""Get logger.
@@ -210,22 +213,6 @@ def initialized_at(self) -> int:
"""
return self.__initialized_at

@classproperty
def capabilities(self) -> list[CapabilitiesEnum]: # noqa: PLR6301
"""Get capabilities.
Developers may override this property in oder to add or remove
advertised capabilities for this plugin.
Returns:
A list of plugin capabilities.
"""
return [
PluginCapabilities.STREAM_MAPS,
PluginCapabilities.FLATTENING,
PluginCapabilities.BATCH,
]

@classproperty
def _env_var_config(cls) -> dict[str, t.Any]: # noqa: N805
"""Return any config specified in environment variables.
@@ -443,12 +430,17 @@ def append_builtin_config(cls: type[PluginBase], config_jsonschema: dict) -> Non
Args:
config_jsonschema: [description]
"""
capabilities = cls.capabilities
if PluginCapabilities.STREAM_MAPS in capabilities:
merge_missing_config_jsonschema(STREAM_MAPS_CONFIG, config_jsonschema)
if capabilities.STREAM_MAPS.capability in cls.capabilities:
merge_missing_config_jsonschema(
capabilities.STREAM_MAPS.schema,
config_jsonschema,
)

if PluginCapabilities.FLATTENING in capabilities:
merge_missing_config_jsonschema(FLATTENING_CONFIG, config_jsonschema)
if capabilities.FLATTENING.capability in cls.capabilities:
merge_missing_config_jsonschema(
capabilities.FLATTENING.schema,
config_jsonschema,
)

@classmethod
def print_about(
31 changes: 12 additions & 19 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
@@ -9,7 +9,6 @@
import json
import time
import typing as t
from functools import cached_property
from gzip import GzipFile
from gzip import open as gzip_open
from types import MappingProxyType
@@ -22,6 +21,7 @@
InvalidRecord,
MissingKeyPropertiesError,
)
from singer_sdk.helpers import capabilities
from singer_sdk.helpers._batch import (
BaseBatchFileEncoding,
BatchConfig,
@@ -138,6 +138,17 @@ class Sink(metaclass=abc.ABCMeta): # noqa: PLR0904
fail_on_record_validation_exception: bool = True
"""Interrupt the target execution when a record fails schema validation."""

#: Enable JSON schema record validation.
validate_schema = capabilities.TARGET_VALIDATE_RECORDS.attribute(
"validate_records",
default=True,
)

include_sdc_metadata_properties = capabilities.ADD_RECORD_METADATA.attribute(
"add_record_metadata",
default=False,
)

def __init__(
self,
target: Target,
@@ -189,15 +200,6 @@ def __init__(

self._validator: BaseJSONSchemaValidator | None = self.get_validator()

@cached_property
def validate_schema(self) -> bool:
"""Enable JSON schema record validation.
Returns:
True if JSON schema validation is enabled.
"""
return self.config.get("validate_records", True)

def get_validator(self) -> BaseJSONSchemaValidator | None:
"""Get a record validator for this sink.
@@ -359,15 +361,6 @@ def batch_config(self) -> BatchConfig | None:
raw = self.config.get("batch_config")
return BatchConfig.from_dict(raw) if raw else None

@property
def include_sdc_metadata_properties(self) -> bool:
"""Check if metadata columns should be added.
Returns:
True if metadata columns should be added.
"""
return self.config.get("add_record_metadata", False)

@property
def datetime_error_treatment(self) -> DatetimeErrorTreatmentEnum:
"""Return a treatment to use for datetime parse errors: ERROR. MAX, or NULL.
45 changes: 17 additions & 28 deletions singer_sdk/tap_base.py
Original file line number Diff line number Diff line change
@@ -17,16 +17,9 @@
AbortedSyncPausedException,
ConfigValidationError,
)
from singer_sdk.helpers import _state
from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers import _state, capabilities
from singer_sdk.helpers._state import write_stream_state
from singer_sdk.helpers._util import read_json_file
from singer_sdk.helpers.capabilities import (
BATCH_CONFIG,
CapabilitiesEnum,
PluginCapabilities,
TapCapabilities,
)
from singer_sdk.io_base import SingerWriter
from singer_sdk.plugin_base import PluginBase

@@ -59,6 +52,17 @@ class Tap(PluginBase, SingerWriter, metaclass=abc.ABCMeta): # noqa: PLR0904
"""Whether the tap's catalog is dynamic. Set to True if the catalog is
generated dynamically (e.g. by querying a database's system tables)."""

#: A list of capabilities supported by this tap.
capabilities: t.ClassVar[list[capabilities.CapabilitiesEnum]] = [
capabilities.TapCapabilities.CATALOG,
capabilities.TapCapabilities.STATE,
capabilities.TapCapabilities.DISCOVER,
capabilities.PluginCapabilities.ABOUT,
capabilities.PluginCapabilities.STREAM_MAPS,
capabilities.PluginCapabilities.FLATTENING,
capabilities.PluginCapabilities.BATCH,
]

# Constructor

def __init__(
@@ -179,23 +183,6 @@ def setup_mapper(self) -> None:
super().setup_mapper()
self.mapper.register_raw_streams_from_catalog(self.catalog)

@classproperty
def capabilities(self) -> list[CapabilitiesEnum]: # noqa: PLR6301
"""Get tap capabilities.
Returns:
A list of capabilities supported by this tap.
"""
return [
TapCapabilities.CATALOG,
TapCapabilities.STATE,
TapCapabilities.DISCOVER,
PluginCapabilities.ABOUT,
PluginCapabilities.STREAM_MAPS,
PluginCapabilities.FLATTENING,
PluginCapabilities.BATCH,
]

@classmethod
def append_builtin_config(cls: type[PluginBase], config_jsonschema: dict) -> None:
"""Appends built-in config to `config_jsonschema` if not already set.
@@ -214,9 +201,11 @@ def append_builtin_config(cls: type[PluginBase], config_jsonschema: dict) -> Non
"""
PluginBase.append_builtin_config(config_jsonschema)

capabilities = cls.capabilities
if PluginCapabilities.BATCH in capabilities:
merge_missing_config_jsonschema(BATCH_CONFIG, config_jsonschema)
if capabilities.BATCH.capability in cls.capabilities: # pragma: no branch
merge_missing_config_jsonschema(
capabilities.BATCH.schema,
config_jsonschema,
)

# Connection and sync tests:

89 changes: 30 additions & 59 deletions singer_sdk/target_base.py
Original file line number Diff line number Diff line change
@@ -13,20 +13,8 @@
from joblib import Parallel, delayed, parallel_config

from singer_sdk.exceptions import RecordsWithoutSchemaException
from singer_sdk.helpers import capabilities
from singer_sdk.helpers._batch import BaseBatchFileEncoding
from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers.capabilities import (
ADD_RECORD_METADATA_CONFIG,
BATCH_CONFIG,
TARGET_BATCH_SIZE_ROWS_CONFIG,
TARGET_HARD_DELETE_CONFIG,
TARGET_LOAD_METHOD_CONFIG,
TARGET_SCHEMA_CONFIG,
TARGET_VALIDATE_RECORDS_CONFIG,
CapabilitiesEnum,
PluginCapabilities,
TargetCapabilities,
)
from singer_sdk.io_base import SingerMessageType, SingerReader
from singer_sdk.plugin_base import PluginBase

@@ -56,6 +44,14 @@ class Target(PluginBase, SingerReader, metaclass=abc.ABCMeta):
# Required if `Target.get_sink_class()` is not defined.
default_sink_class: type[Sink]

#: Target capabilities.
capabilities: t.ClassVar[list[capabilities.CapabilitiesEnum]] = [
capabilities.PluginCapabilities.ABOUT,
capabilities.PluginCapabilities.STREAM_MAPS,
capabilities.PluginCapabilities.FLATTENING,
capabilities.TargetCapabilities.VALIDATE_RECORDS,
]

def __init__(
self,
*,
@@ -95,20 +91,6 @@ def __init__(
if setup_mapper:
self.setup_mapper()

@classproperty
def capabilities(self) -> list[CapabilitiesEnum]: # noqa: PLR6301
"""Get target capabilities.
Returns:
A list of capabilities supported by this target.
"""
return [
PluginCapabilities.ABOUT,
PluginCapabilities.STREAM_MAPS,
PluginCapabilities.FLATTENING,
TargetCapabilities.VALIDATE_RECORDS,
]

@property
def max_parallelism(self) -> int:
"""Get max parallel sinks.
@@ -610,17 +592,18 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None:
if k not in target_jsonschema["properties"]:
target_jsonschema["properties"][k] = v

_merge_missing(ADD_RECORD_METADATA_CONFIG, config_jsonschema)
_merge_missing(TARGET_LOAD_METHOD_CONFIG, config_jsonschema)
_merge_missing(TARGET_BATCH_SIZE_ROWS_CONFIG, config_jsonschema)

capabilities = cls.capabilities
_merge_missing(capabilities.ADD_RECORD_METADATA.schema, config_jsonschema)
_merge_missing(capabilities.TARGET_LOAD_METHOD.schema, config_jsonschema)
_merge_missing(capabilities.TARGET_BATCH_SIZE_ROWS.schema, config_jsonschema)

if PluginCapabilities.BATCH in capabilities:
_merge_missing(BATCH_CONFIG, config_jsonschema)
if capabilities.BATCH.capability in cls.capabilities:
_merge_missing(capabilities.BATCH.schema, config_jsonschema)

if TargetCapabilities.VALIDATE_RECORDS in capabilities:
_merge_missing(TARGET_VALIDATE_RECORDS_CONFIG, config_jsonschema)
if capabilities.TARGET_VALIDATE_RECORDS.capability in cls.capabilities:
_merge_missing(
capabilities.TARGET_VALIDATE_RECORDS.schema,
config_jsonschema,
)

super().append_builtin_config(config_jsonschema)

@@ -632,6 +615,13 @@ class SQLTarget(Target):

default_sink_class: type[SQLSink]

#: Target capabilities.
capabilities: t.ClassVar[list[capabilities.CapabilitiesEnum]] = [
*Target.capabilities,
capabilities.TargetCapabilities.TARGET_SCHEMA,
capabilities.TargetCapabilities.HARD_DELETE,
]

@property
def target_connector(self) -> SQLConnector:
"""The connector object.
@@ -645,23 +635,6 @@ def target_connector(self) -> SQLConnector:
)
return self._target_connector

@classproperty
def capabilities(self) -> list[CapabilitiesEnum]:
"""Get target capabilities.
Returns:
A list of capabilities supported by this target.
"""
sql_target_capabilities: list[CapabilitiesEnum] = super().capabilities
sql_target_capabilities.extend(
[
TargetCapabilities.TARGET_SCHEMA,
TargetCapabilities.HARD_DELETE,
]
)

return sql_target_capabilities

@classmethod
def append_builtin_config(cls: type[SQLTarget], config_jsonschema: dict) -> None:
"""Appends built-in config to `config_jsonschema` if not already set.
@@ -685,13 +658,11 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None:
if k not in target_jsonschema["properties"]:
target_jsonschema["properties"][k] = v

capabilities = cls.capabilities

if TargetCapabilities.TARGET_SCHEMA in capabilities:
_merge_missing(TARGET_SCHEMA_CONFIG, config_jsonschema)
if capabilities.TARGET_SCHEMA.capability in cls.capabilities:
_merge_missing(capabilities.TARGET_SCHEMA.schema, config_jsonschema)

if TargetCapabilities.HARD_DELETE in capabilities:
_merge_missing(TARGET_HARD_DELETE_CONFIG, config_jsonschema)
if capabilities.TARGET_HARD_DELETE.capability in cls.capabilities:
_merge_missing(capabilities.TARGET_HARD_DELETE.schema, config_jsonschema)

super().append_builtin_config(config_jsonschema)

Empty file added tests/core/helpers/__init__.py
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -16,10 +16,36 @@ class DummyCapabilitiesEnum(CapabilitiesEnum):


def test_deprecated_capabilities():
# Dictionary access
with warnings.catch_warnings():
warnings.simplefilter("error")
_ = DummyCapabilitiesEnum["MY_SUPPORTED_FEATURE"]

# Call
with warnings.catch_warnings():
warnings.simplefilter("error")
_ = DummyCapabilitiesEnum("supported")

# Attribute access
with warnings.catch_warnings():
warnings.simplefilter("error")
_ = DummyCapabilitiesEnum.MY_SUPPORTED_FEATURE

# Dictionary access
with pytest.warns(
DeprecationWarning,
match="is deprecated. No longer supported",
) as record:
_ = DummyCapabilitiesEnum["MY_DEPRECATED_FEATURE"]

# Call
with pytest.warns(
DeprecationWarning,
match="is deprecated. No longer supported",
) as record:
DummyCapabilitiesEnum("deprecated")

# Attribute access
with pytest.warns(
DeprecationWarning,
match="is deprecated. No longer supported",
33 changes: 33 additions & 0 deletions tests/core/helpers/capabilities/test_config_property.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Test the BuiltinSetting descriptor."""

from __future__ import annotations

from singer_sdk.helpers.capabilities import ConfigProperty


def test_builtin_setting_descriptor():
class ObjWithConfig:
example = ConfigProperty(default=1)

def __init__(self):
self.config = {"example": 1}

obj = ObjWithConfig()
assert obj.example == 1

obj.config["example"] = 2
assert obj.example == 2


def test_builtin_setting_descriptor_custom_key():
class ObjWithConfig:
my_attr = ConfigProperty("example", default=1)

def __init__(self):
self.config = {"example": 1}

obj = ObjWithConfig()
assert obj.my_attr == 1

obj.config["example"] = 2
assert obj.my_attr == 2

0 comments on commit 93bbe08

Please sign in to comment.