From f96df372545ff0a5bc4675a2b056fd2d0de51f84 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Tue, 21 Nov 2023 12:47:10 -0700 Subject: [PATCH] refactor: Implement a `SingerWriter` class in `singer_sdk.io_base` and use it to emit Singer messages (#2058) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add SingerWriter to io_base * cleanup singer_lib.messages * Removed write_message from singerlib init * Add SingerWriter as a Tap parent class * Update Sink class to use self_tap.write_message * Add SingerWriter to InlineMapper as a parent class * update test to utilize SingerWriter class * write_message placed back in _singerlib * SingerWriter to utilize _singerlib write_message and format_message --------- Co-authored-by: Edgar Ramírez Mondragón <16805946+edgarrmondragon@users.noreply.github.com> --- singer_sdk/io_base.py | 27 ++++++++++++++++++++++++++- singer_sdk/mapper_base.py | 13 +++++++------ singer_sdk/streams/core.py | 8 ++++---- singer_sdk/tap_base.py | 7 ++++--- tests/_singerlib/test_messages.py | 8 +++++--- 5 files changed, 46 insertions(+), 17 deletions(-) diff --git a/singer_sdk/io_base.py b/singer_sdk/io_base.py index d389bfd08..f7c2ed668 100644 --- a/singer_sdk/io_base.py +++ b/singer_sdk/io_base.py @@ -10,7 +10,9 @@ import typing as t from collections import Counter, defaultdict -from singer_sdk._singerlib import SingerMessageType +from singer_sdk._singerlib.messages import Message, SingerMessageType +from singer_sdk._singerlib.messages import format_message as singer_format_message +from singer_sdk._singerlib.messages import write_message as singer_write_message from singer_sdk.helpers._compat import final logger = logging.getLogger(__name__) @@ -143,3 +145,26 @@ def _process_unknown_message(self, message_dict: dict) -> None: def _process_endofpipe(self) -> None: logger.debug("End of pipe reached") + + +class SingerWriter: + """Interface for all plugins writing Singer messages to stdout.""" + + def 
format_message(self, message: Message) -> str: + """Format a message as a JSON string. + + Args: + message: The message to format. + + Returns: + The formatted message. + """ + return singer_format_message(message) + + def write_message(self, message: Message) -> None: + """Write a message to stdout. + + Args: + message: The message to write. + """ + singer_write_message(message) diff --git a/singer_sdk/mapper_base.py b/singer_sdk/mapper_base.py index b0be198bd..2cc943a46 100644 --- a/singer_sdk/mapper_base.py +++ b/singer_sdk/mapper_base.py @@ -7,14 +7,16 @@ import click -import singer_sdk._singerlib as singer from singer_sdk.helpers._classproperty import classproperty from singer_sdk.helpers.capabilities import CapabilitiesEnum, PluginCapabilities -from singer_sdk.io_base import SingerReader +from singer_sdk.io_base import SingerReader, SingerWriter from singer_sdk.plugin_base import PluginBase +if t.TYPE_CHECKING: + import singer_sdk._singerlib as singer -class InlineMapper(PluginBase, SingerReader, metaclass=abc.ABCMeta): + +class InlineMapper(PluginBase, SingerReader, SingerWriter, metaclass=abc.ABCMeta): """Abstract base class for inline mappers.""" @classproperty @@ -28,10 +30,9 @@ def capabilities(self) -> list[CapabilitiesEnum]: PluginCapabilities.STREAM_MAPS, ] - @staticmethod - def _write_messages(messages: t.Iterable[singer.Message]) -> None: + def _write_messages(self, messages: t.Iterable[singer.Message]) -> None: for message in messages: - singer.write_message(message) + self.write_message(message) def _process_schema_message(self, message_dict: dict) -> None: self._write_messages(self.map_schema_message(message_dict)) diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index 9463b2f3f..f306f1ec4 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -769,7 +769,7 @@ def _write_state_message(self) -> None: if (not self._is_state_flushed) and ( self.tap_state != self._last_emitted_state ): - 
singer.write_message(singer.StateMessage(value=self.tap_state)) + self._tap.write_message(singer.StateMessage(value=self.tap_state)) self._last_emitted_state = copy.deepcopy(self.tap_state) self._is_state_flushed = True @@ -797,7 +797,7 @@ def _generate_schema_messages( def _write_schema_message(self) -> None: """Write out a SCHEMA message with the stream schema.""" for schema_message in self._generate_schema_messages(): - singer.write_message(schema_message) + self._tap.write_message(schema_message) @property def mask(self) -> singer.SelectionMask: @@ -849,7 +849,7 @@ def _write_record_message(self, record: dict) -> None: record: A single stream record. """ for record_message in self._generate_record_messages(record): - singer.write_message(record_message) + self._tap.write_message(record_message) self._is_state_flushed = False @@ -864,7 +864,7 @@ def _write_batch_message( encoding: The encoding to use for the batch. manifest: A list of filenames for the batch. """ - singer.write_message( + self._tap.write_message( SDKBatchMessage( stream=self.name, encoding=encoding, diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index f7a31e73c..2f0f5874d 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -11,7 +11,7 @@ import click -from singer_sdk._singerlib import Catalog, StateMessage, write_message +from singer_sdk._singerlib import Catalog, StateMessage from singer_sdk.configuration._dict_config import merge_missing_config_jsonschema from singer_sdk.exceptions import AbortedSyncFailedException, AbortedSyncPausedException from singer_sdk.helpers import _state @@ -25,6 +25,7 @@ PluginCapabilities, TapCapabilities, ) +from singer_sdk.io_base import SingerWriter from singer_sdk.plugin_base import PluginBase if t.TYPE_CHECKING: @@ -45,7 +46,7 @@ class CliTestOptionValue(Enum): Disabled = "disabled" -class Tap(PluginBase, metaclass=abc.ABCMeta): +class Tap(PluginBase, SingerWriter, metaclass=abc.ABCMeta): """Abstract base class for taps. 
The Tap class governs configuration, validation, and stream discovery for tap @@ -439,7 +440,7 @@ def sync_all(self) -> None: """Sync all streams.""" self._reset_state_progress_markers() self._set_compatible_replication_methods() - write_message(StateMessage(value=self.state)) + self.write_message(StateMessage(value=self.state)) stream: Stream for stream in self.streams.values(): diff --git a/tests/_singerlib/test_messages.py b/tests/_singerlib/test_messages.py index b4a33db44..e10259497 100644 --- a/tests/_singerlib/test_messages.py +++ b/tests/_singerlib/test_messages.py @@ -8,7 +8,7 @@ from pytz import timezone import singer_sdk._singerlib as singer -from singer_sdk._singerlib.messages import format_message +from singer_sdk.io_base import SingerWriter UTC = datetime.timezone.utc @@ -19,22 +19,24 @@ def test_exclude_null_dict(): def test_format_message(): + singerwriter = SingerWriter() message = singer.RecordMessage( stream="test", record={"id": 1, "name": "test"}, ) - assert format_message(message) == ( + assert singerwriter.format_message(message) == ( '{"type": "RECORD", "stream": "test", "record": {"id": 1, "name": "test"}}' ) def test_write_message(): + singerwriter = SingerWriter() message = singer.RecordMessage( stream="test", record={"id": 1, "name": "test"}, ) with redirect_stdout(io.StringIO()) as out: - singer.write_message(message) + singerwriter.write_message(message) assert out.getvalue() == ( '{"type": "RECORD", "stream": "test", "record": {"id": 1, "name": "test"}}\n'