Skip to content

Commit

Permalink
add writeback feature for refresh tokens to the config.json
Browse files Browse the repository at this point in the history
  • Loading branch information
pnadolny13 committed Mar 20, 2024
1 parent d93bfd9 commit 2b919c4
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 15 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ environment variable is set either in the terminal context or in the `.env` file

See the docs for more details https://highlevel.stoplight.io/docs/integrations/0443d7d1a4bd0-overview.

## Write Back Feature

The GoHighlevel API uses OAuth credentials but each time the refresh token is used to get a new access token, the refresh token is invalidated.
This means that we need to store the new refresh token provided in the access token response, otherwise we need to go through the authorization flow again to get a new valid refresh token.
This is a pain and isn't directly solved by the SDK or Meltano, see https://github.com/meltano/sdk/issues/106 and https://github.com/meltano/meltano/issues/2660.

To solve this, the tap implements a writeback feature where the new refresh token is set in the input config.json file every time it changes.
As a result an exception will be thrown unless exactly one config file is provided.
Also the config file will need have write access so the tap can edit it.

## Usage

You can easily run `tap-gohighlevel` by itself or in a pipeline using [Meltano](https://meltano.com/).
Expand Down
36 changes: 32 additions & 4 deletions tap_gohighlevel/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from __future__ import annotations

import json

import requests
from singer_sdk.authenticators import OAuthAuthenticator, SingletonMeta
from singer_sdk.helpers._util import utc_now
Expand All @@ -12,7 +14,14 @@
class GoHighLevelAuthenticator(OAuthAuthenticator, metaclass=SingletonMeta):
"""Authenticator class for GoHighLevel."""

def __init__(self, *args, **kwargs):
def __init__(self, *args, **kwargs) -> None:
"""Create a new authenticator.
Args:
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
"""
self.write_back_config_path = kwargs.pop("write_back_config_path")
super().__init__(*args, **kwargs)
self.refresh_token = str(self.config["refresh_token"])

Expand All @@ -27,7 +36,7 @@ def oauth_request_body(self) -> dict:
"client_id": self.config["client_id"],
"client_secret": self.config["client_secret"],
"grant_type": "refresh_token",
"refresh_token": str(self.config["refresh_token"]),
"refresh_token": self.refresh_token,
"user_type": "Location",
}

Expand All @@ -48,9 +57,27 @@ def create_for_stream(cls, stream) -> GoHighLevelAuthenticator: # noqa: ANN001
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
},
default_expiration=86400 # 24 hours
default_expiration=86400, # 24 hours,
write_back_config_path=stream._tap._write_back_config_path,
)

def _write_back_to_config(self, key: str, value: str) -> None:
"""Write back the value to the config file.
Args:
key: The key to write back.
value: The value to write back.
"""
# Read the JSON file
with open(self.write_back_config_path) as file:
data = json.load(file)

# Update the value for the specified key
data[key] = value

with open(self.write_back_config_path, "w") as file:
json.dump(data, file, indent=4)

def update_access_token(self) -> None:
"""Update `access_token` along with: `last_refreshed` and `expires_in`.
Expand All @@ -75,8 +102,9 @@ def update_access_token(self) -> None:

token_json = token_response.json()
self.access_token = token_json["access_token"]
# TODO: store refresh token

self.refresh_token = token_json["refresh_token"]
self._write_back_to_config("refresh_token", self.refresh_token)
self.logger.info("OAuth refresh_token: %s", self.refresh_token)
expiration = token_json.get("expires_in", self._default_expiration)
self.expires_in = int(expiration) if expiration else None
Expand Down
4 changes: 2 additions & 2 deletions tap_gohighlevel/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,6 @@ def get_url_params(
}
if next_page_token:
params.update(parse_qsl(next_page_token.query))
if self.replication_key:
params["startAfter"] = self.replication_key
elif self.replication_key and self.get_starting_replication_key_value(context):
params["startAfter"] = self.get_starting_replication_key_value(context)
return params
8 changes: 0 additions & 8 deletions tap_gohighlevel/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,12 @@

from __future__ import annotations

import sys
import typing as t

from singer_sdk import typing as th # JSON Schema typing helpers

from tap_gohighlevel.client import GoHighLevelStream

if sys.version_info >= (3, 9):
import importlib.resources as importlib_resources
else:
import importlib_resources


SCHEMAS_DIR = importlib_resources.files(__package__) / "schemas"

class ContactsStream(GoHighLevelStream):
"""Contacts stream."""
Expand Down
19 changes: 18 additions & 1 deletion tap_gohighlevel/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

from __future__ import annotations

from pathlib import Path, PurePath

from singer_sdk import Tap
from singer_sdk import typing as th # JSON schema typing helpers
from singer_sdk.helpers._util import read_json_file

from tap_gohighlevel import streams

Expand Down Expand Up @@ -37,10 +40,24 @@ class TapGoHighLevel(Tap):
th.StringType,
required=True,
description="The Location Id to request data",
examples=["ve9EPM428h8vShlRW1KT"]
examples=["ve9EPM428h8vShlRW1KT"],
),
).to_dict()

def __init__(self, *args, **kwargs):
config = kwargs.get("config")
if isinstance(config, (str, PurePath)):
self._write_back_config_path = Path(config)
elif isinstance(config, list):
if len(config) > 1:
msg = "Multiple config files not supported due to OAuth refresh tokens write back."
raise Exception(msg)
self._write_back_config_path = Path(config[0])
else:
msg = "Config file must be provided, OAuth refresh tokens need to be written back."
raise Exception(msg)
super().__init__(*args, **kwargs)

def discover_streams(self) -> list[streams.GoHighLevelStream]:
"""Return a list of discovered streams.
Expand Down

0 comments on commit 2b919c4

Please sign in to comment.