diff --git a/.gitignore b/.gitignore index 4b4b38c..01bcb11 100644 --- a/.gitignore +++ b/.gitignore @@ -136,3 +136,6 @@ dmypy.json # Pyre type checker .pyre/ + +# IDE +.vscode/ \ No newline at end of file diff --git a/meltano.yml b/meltano.yml index 5140aa9..de0d496 100644 --- a/meltano.yml +++ b/meltano.yml @@ -1,21 +1,22 @@ version: 1 send_anonymous_usage_stats: true -project_id: "tap-gmail" +project_id: tap-gmail plugins: extractors: - - name: "tap-gmail" - namespace: "tap_gmail" + - name: tap-gmail + namespace: tap_gmail pip_url: -e . capabilities: - state - catalog - discover - config: {} settings: - name: oauth_credentials.client_id - name: oauth_credentials.client_secret - name: oauth_credentials.refresh_token - name: user_id + select: + - messages.* loaders: - name: target-jsonl variant: andyh1203 diff --git a/tap_gmail/client.py b/tap_gmail/client.py index 9d63520..33b8a83 100644 --- a/tap_gmail/client.py +++ b/tap_gmail/client.py @@ -17,8 +17,6 @@ class GmailStream(RESTStream): """Gmail stream class.""" url_base = "https://gmail.googleapis.com" - records_jsonpath = "$.messages[*]" # Or override `parse_response`. - next_page_token_jsonpath = "$.nextPageToken" # Or override `get_next_page_token`. @property @cached @@ -38,9 +36,6 @@ def get_next_page_token( self, response: requests.Response, previous_token: Optional[Any] ) -> Optional[Any]: """Return a token for identifying next page or None if no more pages.""" - # TODO: If pagination is required, return a token which can be used to get the - # next page. If this is the final page, return "None" to end the - # pagination loop. if self.next_page_token_jsonpath: all_matches = extract_jsonpath( self.next_page_token_jsonpath, response.json() @@ -61,22 +56,6 @@ def get_url_params( params["pageToken"] = next_page_token return params - def prepare_request_payload( - self, context: Optional[dict], next_page_token: Optional[Any] - ) -> Optional[dict]: - """Prepare the data payload for the REST API request. - - By default, no payload will be sent (return None). - """ - # TODO: Delete this method if no payload is required. (Most REST APIs.) - return None - def parse_response(self, response: requests.Response) -> Iterable[dict]: """Parse the response and return an iterator of result rows.""" - # TODO: Parse response body and return a set of records. yield from extract_jsonpath(self.records_jsonpath, input=response.json()) - - def post_process(self, row: dict, context: Optional[dict]) -> dict: - """As needed, append or transform raw data to match expected structure.""" - # TODO: Delete this method if not needed. - return row diff --git a/tap_gmail/schemas/message_list.json b/tap_gmail/schemas/message_list.json new file mode 100644 index 0000000..eeb38a8 --- /dev/null +++ b/tap_gmail/schemas/message_list.json @@ -0,0 +1,13 @@ +{ + "type": "object", + "properties": { + "id": { + "type": "string", + "description": "The immutable ID of the message." + }, + "threadId": { + "type": "string", + "description": "The ID of the thread the message belongs to." + } + } +} diff --git a/tap_gmail/streams.py b/tap_gmail/streams.py index 49354af..61a5973 100644 --- a/tap_gmail/streams.py +++ b/tap_gmail/streams.py @@ -1,22 +1,43 @@ """Stream type classes for tap-gmail.""" from pathlib import Path +from typing import Optional from tap_gmail.client import GmailStream SCHEMAS_DIR = Path(__file__).parent / Path("./schemas") -class MessagesStream(GmailStream): +class MessageListStream(GmailStream): """Define custom stream.""" - name = "messages" + name = "message_list" primary_keys = ["id"] replication_key = None - # Optionally, you may also use `schema_filepath` in place of `schema`: - schema_filepath = SCHEMAS_DIR / "messages.json" + schema_filepath = SCHEMAS_DIR / "message_list.json" + records_jsonpath = "$.messages[*]" + next_page_token_jsonpath = "$.nextPageToken" @property def path(self): """Set the path for the stream.""" return "/gmail/v1/users/" + self.config["user_id"] + "/messages" + + def get_child_context(self, record: dict, context: Optional[dict]) -> dict: + """Return a context dictionary for child streams.""" + return {"message_id": record["id"]} + + +class MessagesStream(GmailStream): + + name = "messages" + replication_key = None + schema_filepath = SCHEMAS_DIR / "messages.json" + parent_stream_type = MessageListStream + ignore_parent_replication_keys = True + state_partitioning_keys = [] + + @property + def path(self): + """Set the path for the stream.""" + return "/gmail/v1/users/" + self.config["user_id"] + "/messages/{message_id}" diff --git a/tap_gmail/tap.py b/tap_gmail/tap.py index d4e71b9..3646446 100644 --- a/tap_gmail/tap.py +++ b/tap_gmail/tap.py @@ -5,9 +5,9 @@ from singer_sdk import Stream, Tap from singer_sdk import typing as th # JSON schema typing helpers -from tap_gmail.streams import GmailStream, MessagesStream +from tap_gmail.streams import GmailStream, MessageListStream, MessagesStream -STREAM_TYPES = [MessagesStream] +STREAM_TYPES = [MessageListStream, MessagesStream] class TapGmail(Tap):