Skip to content

Commit

Permalink
refactor: Implement a SingerWriter class in singer_sdk.io_base an…
Browse files Browse the repository at this point in the history
…d use it to emit Singer messages (#2058)

* Add SingerWriter to io_base

* cleanup singer_lib.messages

* Removed wirte_message from singerlib init

* Add SingerWriter as a Tap parent class

* Update Sink class to use self_tap.write_message

* Add SingerWriter to InlineMapper as a parent class

* update test to utilize SingerWriter class

* write_message placed back in _singerlib

* SingerWriter to utilize _singerlib write_message and format_message

---------

Co-authored-by: Edgar Ramírez Mondragón <[email protected]>
  • Loading branch information
BuzzCutNorman and edgarrmondragon authored Nov 21, 2023
1 parent 8e6a070 commit f96df37
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 17 deletions.
27 changes: 26 additions & 1 deletion singer_sdk/io_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import typing as t
from collections import Counter, defaultdict

from singer_sdk._singerlib import SingerMessageType
from singer_sdk._singerlib.messages import Message, SingerMessageType
from singer_sdk._singerlib.messages import format_message as singer_format_message
from singer_sdk._singerlib.messages import write_message as singer_write_message
from singer_sdk.helpers._compat import final

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -143,3 +145,26 @@ def _process_unknown_message(self, message_dict: dict) -> None:

def _process_endofpipe(self) -> None:
logger.debug("End of pipe reached")


class SingerWriter:
"""Interface for all plugins writting Singer messages to stdout."""

def format_message(self, message: Message) -> str:
"""Format a message as a JSON string.
Args:
message: The message to format.
Returns:
The formatted message.
"""
return singer_format_message(message)

def write_message(self, message: Message) -> None:
"""Write a message to stdout.
Args:
message: The message to write.
"""
singer_write_message(message)
13 changes: 7 additions & 6 deletions singer_sdk/mapper_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@

import click

import singer_sdk._singerlib as singer
from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers.capabilities import CapabilitiesEnum, PluginCapabilities
from singer_sdk.io_base import SingerReader
from singer_sdk.io_base import SingerReader, SingerWriter
from singer_sdk.plugin_base import PluginBase

if t.TYPE_CHECKING:
import singer_sdk._singerlib as singer

class InlineMapper(PluginBase, SingerReader, metaclass=abc.ABCMeta):

class InlineMapper(PluginBase, SingerReader, SingerWriter, metaclass=abc.ABCMeta):
"""Abstract base class for inline mappers."""

@classproperty
Expand All @@ -28,10 +30,9 @@ def capabilities(self) -> list[CapabilitiesEnum]:
PluginCapabilities.STREAM_MAPS,
]

@staticmethod
def _write_messages(messages: t.Iterable[singer.Message]) -> None:
def _write_messages(self, messages: t.Iterable[singer.Message]) -> None:
for message in messages:
singer.write_message(message)
self.write_message(message)

def _process_schema_message(self, message_dict: dict) -> None:
self._write_messages(self.map_schema_message(message_dict))
Expand Down
8 changes: 4 additions & 4 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,7 @@ def _write_state_message(self) -> None:
if (not self._is_state_flushed) and (
self.tap_state != self._last_emitted_state
):
singer.write_message(singer.StateMessage(value=self.tap_state))
self._tap.write_message(singer.StateMessage(value=self.tap_state))
self._last_emitted_state = copy.deepcopy(self.tap_state)
self._is_state_flushed = True

Expand Down Expand Up @@ -797,7 +797,7 @@ def _generate_schema_messages(
def _write_schema_message(self) -> None:
"""Write out a SCHEMA message with the stream schema."""
for schema_message in self._generate_schema_messages():
singer.write_message(schema_message)
self._tap.write_message(schema_message)

@property
def mask(self) -> singer.SelectionMask:
Expand Down Expand Up @@ -849,7 +849,7 @@ def _write_record_message(self, record: dict) -> None:
record: A single stream record.
"""
for record_message in self._generate_record_messages(record):
singer.write_message(record_message)
self._tap.write_message(record_message)

self._is_state_flushed = False

Expand All @@ -864,7 +864,7 @@ def _write_batch_message(
encoding: The encoding to use for the batch.
manifest: A list of filenames for the batch.
"""
singer.write_message(
self._tap.write_message(
SDKBatchMessage(
stream=self.name,
encoding=encoding,
Expand Down
7 changes: 4 additions & 3 deletions singer_sdk/tap_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import click

from singer_sdk._singerlib import Catalog, StateMessage, write_message
from singer_sdk._singerlib import Catalog, StateMessage
from singer_sdk.configuration._dict_config import merge_missing_config_jsonschema
from singer_sdk.exceptions import AbortedSyncFailedException, AbortedSyncPausedException
from singer_sdk.helpers import _state
Expand All @@ -25,6 +25,7 @@
PluginCapabilities,
TapCapabilities,
)
from singer_sdk.io_base import SingerWriter
from singer_sdk.plugin_base import PluginBase

if t.TYPE_CHECKING:
Expand All @@ -45,7 +46,7 @@ class CliTestOptionValue(Enum):
Disabled = "disabled"


class Tap(PluginBase, metaclass=abc.ABCMeta):
class Tap(PluginBase, SingerWriter, metaclass=abc.ABCMeta):
"""Abstract base class for taps.
The Tap class governs configuration, validation, and stream discovery for tap
Expand Down Expand Up @@ -439,7 +440,7 @@ def sync_all(self) -> None:
"""Sync all streams."""
self._reset_state_progress_markers()
self._set_compatible_replication_methods()
write_message(StateMessage(value=self.state))
self.write_message(StateMessage(value=self.state))

stream: Stream
for stream in self.streams.values():
Expand Down
8 changes: 5 additions & 3 deletions tests/_singerlib/test_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from pytz import timezone

import singer_sdk._singerlib as singer
from singer_sdk._singerlib.messages import format_message
from singer_sdk.io_base import SingerWriter

UTC = datetime.timezone.utc

Expand All @@ -19,22 +19,24 @@ def test_exclude_null_dict():


def test_format_message():
singerwriter = SingerWriter()
message = singer.RecordMessage(
stream="test",
record={"id": 1, "name": "test"},
)
assert format_message(message) == (
assert singerwriter.format_message(message) == (
'{"type": "RECORD", "stream": "test", "record": {"id": 1, "name": "test"}}'
)


def test_write_message():
singerwriter = SingerWriter()
message = singer.RecordMessage(
stream="test",
record={"id": 1, "name": "test"},
)
with redirect_stdout(io.StringIO()) as out:
singer.write_message(message)
singerwriter.write_message(message)

assert out.getvalue() == (
'{"type": "RECORD", "stream": "test", "record": {"id": 1, "name": "test"}}\n'
Expand Down

0 comments on commit f96df37

Please sign in to comment.