Skip to content

Commit

Permalink
working full messages stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken Payne committed Aug 13, 2022
1 parent 9c6c70e commit d449a4b
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 31 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,6 @@ dmypy.json

# Pyre type checker
.pyre/

# IDE
.vscode/
9 changes: 5 additions & 4 deletions meltano.yml
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
version: 1
send_anonymous_usage_stats: true
project_id: "tap-gmail"
project_id: tap-gmail
plugins:
extractors:
- name: "tap-gmail"
namespace: "tap_gmail"
- name: tap-gmail
namespace: tap_gmail
pip_url: -e .
capabilities:
- state
- catalog
- discover
config: {}
settings:
- name: oauth_credentials.client_id
- name: oauth_credentials.client_secret
- name: oauth_credentials.refresh_token
- name: user_id
select:
- messages.*
loaders:
- name: target-jsonl
variant: andyh1203
Expand Down
21 changes: 0 additions & 21 deletions tap_gmail/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ class GmailStream(RESTStream):
"""Gmail stream class."""

url_base = "https://gmail.googleapis.com"
records_jsonpath = "$.messages[*]" # Or override `parse_response`.
next_page_token_jsonpath = "$.nextPageToken" # Or override `get_next_page_token`.

@property
@cached
Expand All @@ -38,9 +36,6 @@ def get_next_page_token(
self, response: requests.Response, previous_token: Optional[Any]
) -> Optional[Any]:
"""Return a token for identifying next page or None if no more pages."""
# TODO: If pagination is required, return a token which can be used to get the
# next page. If this is the final page, return "None" to end the
# pagination loop.
if self.next_page_token_jsonpath:
all_matches = extract_jsonpath(
self.next_page_token_jsonpath, response.json()
Expand All @@ -61,22 +56,6 @@ def get_url_params(
params["pageToken"] = next_page_token
return params

def prepare_request_payload(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Optional[dict]:
"""Prepare the data payload for the REST API request.
By default, no payload will be sent (return None).
"""
# TODO: Delete this method if no payload is required. (Most REST APIs.)
return None

def parse_response(self, response: requests.Response) -> Iterable[dict]:
"""Parse the response and return an iterator of result rows."""
# TODO: Parse response body and return a set of records.
yield from extract_jsonpath(self.records_jsonpath, input=response.json())

def post_process(self, row: dict, context: Optional[dict]) -> dict:
"""As needed, append or transform raw data to match expected structure."""
# TODO: Delete this method if not needed.
return row
13 changes: 13 additions & 0 deletions tap_gmail/schemas/message_list.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"type": "object",
"properties": {
"id": {
"type": "string",
"description": "The immutable ID of the message."
},
"threadId": {
"type": "string",
"description": "The ID of the thread the message belongs to."
}
}
}
29 changes: 25 additions & 4 deletions tap_gmail/streams.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,43 @@
"""Stream type classes for tap-gmail."""

from pathlib import Path
from typing import Optional

from tap_gmail.client import GmailStream

SCHEMAS_DIR = Path(__file__).parent / Path("./schemas")


class MessagesStream(GmailStream):
class MessageListStream(GmailStream):
"""Stream of message references listed for the configured Gmail user.

Reads the `messages` array of each list response and hands every record's
`id` to child streams as `message_id`.
"""

name = "messages"
name = "message_list"
primary_keys = ["id"]
replication_key = None
# The stream schema is loaded from a JSON file in the schemas directory.
schema_filepath = SCHEMAS_DIR / "messages.json"
schema_filepath = SCHEMAS_DIR / "message_list.json"
# Records are the items of the response's `messages` array; the token for
# the following page is read from the response's `nextPageToken` field.
records_jsonpath = "$.messages[*]"
next_page_token_jsonpath = "$.nextPageToken"

@property
def path(self):
"""Return the messages-list path for the `user_id` given in the tap config."""
return "/gmail/v1/users/" + self.config["user_id"] + "/messages"

def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
"""Return the context for child streams: the record's `id` as `message_id`.

The incoming `context` is not used.
"""
return {"message_id": record["id"]}


class MessagesStream(GmailStream):
    """Stream of individual Gmail messages, requested one per parent record.

    Declared as a child of `MessageListStream`, whose child context supplies
    the `message_id` referenced in this stream's path.
    """

    # Parent/child wiring.
    parent_stream_type = MessageListStream
    ignore_parent_replication_keys = True
    # NOTE(review): an empty list presumably turns off per-parent state
    # partitioning -- confirm against the Singer SDK documentation.
    state_partitioning_keys = []

    # Stream identity and schema.
    name = "messages"
    replication_key = None
    schema_filepath = SCHEMAS_DIR / "messages.json"

    @property
    def path(self):
        """Return the single-message path for the configured `user_id`.

        The literal `{message_id}` placeholder is left in the returned string;
        presumably the SDK fills it from the parent context -- verify.
        """
        segments = (
            "/gmail/v1/users",
            self.config["user_id"],
            "messages",
            "{message_id}",
        )
        return "/".join(segments)
4 changes: 2 additions & 2 deletions tap_gmail/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
from singer_sdk import Stream, Tap
from singer_sdk import typing as th # JSON schema typing helpers

from tap_gmail.streams import GmailStream, MessagesStream
from tap_gmail.streams import GmailStream, MessageListStream, MessagesStream

STREAM_TYPES = [MessagesStream]
STREAM_TYPES = [MessageListStream, MessagesStream]


class TapGmail(Tap):
Expand Down

0 comments on commit d449a4b

Please sign in to comment.