From 310c42c2eb2c93f23e041b0a821121ee6b6fde3e Mon Sep 17 00:00:00 2001 From: Violetta Mishechkina Date: Thu, 28 Nov 2024 18:28:02 +0100 Subject: [PATCH] Refactor and fix hubspot source --- sources/hubspot/README.md | 23 +-- sources/hubspot/__init__.py | 213 ++++++++++++++------------- sources/hubspot/helpers.py | 14 +- sources/hubspot/settings.py | 15 +- sources/hubspot/utils.py | 22 ++- sources/hubspot_pipeline.py | 6 +- tests/hubspot/test_hubspot_source.py | 22 ++- 7 files changed, 183 insertions(+), 132 deletions(-) diff --git a/sources/hubspot/README.md b/sources/hubspot/README.md index 17ca44e1d..e55a5ac13 100644 --- a/sources/hubspot/README.md +++ b/sources/hubspot/README.md @@ -4,17 +4,18 @@ HubSpot is a customer relationship management (CRM) software and inbound marketi The `dlt` HubSpot verified source allows you to automatically load data from HubSpot into a [destination](https://dlthub.com/docs/dlt-ecosystem/destinations/) of your choice. It loads data from the following endpoints: -| API | Data | -| --- | --- | -| Contacts | visitors, potential customers, leads | -| Companies | information about organizations | -| Deals | deal records, deal tracking | -| Products | goods, services | -| Tickets | requests for help from customers or users | -| Quotes | pricing information of a product | -| Web analytics | events | -| Owners | information about account managers or users | -| Pipelines | stages and progress tracking | +| API | Data | +| --- |-----------------------------------------------------------------| +| Contacts | visitors, potential customers, leads | +| Companies | information about organizations | +| Deals | deal records, deal tracking | +| Products | goods, services | +| Tickets | requests for help from customers or users | +| Quotes | pricing information of a product | +| Web analytics | events | +| Owners | information about account managers or users | +| Pipelines | stages and progress tracking, separate resource for each object | +| 
Properties | custom labels for properties with multiple choice | ## Initialize the pipeline with Hubspot verified source ```bash diff --git a/sources/hubspot/__init__.py b/sources/hubspot/__init__.py index 0d9af7a61..3097c1efb 100644 --- a/sources/hubspot/__init__.py +++ b/sources/hubspot/__init__.py @@ -1,5 +1,5 @@ """ -This is a module that provides a DLT source to retrieve data from multiple endpoints of the HubSpot API +This is a module that provides a dlt source to retrieve data from multiple endpoints of the HubSpot API using a specified API key. The retrieved data is returned as a tuple of Dlt resources, one for each endpoint. The source retrieves data from the following endpoints: @@ -23,7 +23,7 @@ To retrieve data from all endpoints, use the following code: """ -from typing import Any, Dict, Iterator, List, Literal, Optional, Sequence, Tuple, Union +from typing import Any, Dict, Iterator, List, Literal, Optional, Sequence, Union from urllib.parse import quote import dlt @@ -48,11 +48,13 @@ OBJECT_TYPE_PLURAL, OBJECT_TYPE_SINGULAR, PIPELINES_OBJECTS, + PROPERTIES_WITH_CUSTOM_LABELS, SOFT_DELETE_KEY, STAGE_PROPERTY_PREFIX, STARTDATE, WEB_ANALYTICS_EVENTS_ENDPOINT, ) +from .utils import chunk_properties THubspotObjectType = Literal["company", "contact", "deal", "ticket", "product", "quote"] @@ -90,7 +92,9 @@ def fetch_data_for_properties( """ params: Dict[str, Any] = {"properties": props, "limit": 100} - context: Optional[Dict[str, Any]] = {SOFT_DELETE_KEY: False} if soft_delete else None + context: Optional[Dict[str, Any]] = ( + {SOFT_DELETE_KEY: False} if soft_delete else None + ) yield from fetch_data( CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params, context=context @@ -158,10 +162,10 @@ def crm_object_history( props, ) + def resource_template( entity: THubspotObjectType, api_key: str = dlt.config.value, - props: Optional[Sequence[str]] = None, # Add props as an argument include_custom_props: bool = False, soft_delete: bool = False, ) -> 
Iterator[TDataItems]: @@ -180,7 +184,7 @@ def resource_template( """ # Use provided props or fetch from ENTITY_PROPERTIES if not provided - properties: List[str] = ENTITY_PROPERTIES.get(entity, list(props or [])) + properties: List[str] = ENTITY_PROPERTIES.get(entity, []) # Use these properties to yield the crm_objects yield from crm_objects( @@ -213,38 +217,6 @@ def resource_history_template( ) -@dlt.resource(name="properties", write_disposition="replace") -def hubspot_properties( - properties_list: Optional[List[Dict[str, Any]]] = None, - api_key: str = dlt.secrets.value, -) -> Iterator[TDataItems]: - """ - A DLT resource that retrieves HubSpot properties for a given list of objects. - - Args: - properties_list (Optional[List[Dict[str, Any]]], optional): List of properties to retrieve. - api_key (str, optional): HubSpot API key for authentication. - - Yields: - DltResource: A DLT resource containing properties for HubSpot objects. - """ - - def get_properties_description( - properties_list_inner: List[Dict[str, Any]] - ) -> Iterator[Dict[str, Any]]: - """Fetch properties.""" - for property_info in properties_list_inner: - yield from get_properties_labels( - api_key=api_key, - object_type=property_info["object_type"], - property_name=property_info["property_name"], - ) - - # Ensure properties_list is defined - properties_list_inner: List[Dict[str, Any]] = properties_list or [] - yield from get_properties_description(properties_list_inner) - - def pivot_stages_properties( data: List[Dict[str, Any]], property_prefix: str = STAGE_PROPERTY_PREFIX, @@ -263,7 +235,9 @@ def pivot_stages_properties( """ new_data: List[Dict[str, Any]] = [] for record in data: - record_not_null: Dict[str, Any] = {k: v for k, v in record.items() if v is not None} + record_not_null: Dict[str, Any] = { + k: v for k, v in record.items() if v is not None + } if id_prop not in record_not_null: continue id_val = record_not_null.pop(id_prop) @@ -279,80 +253,50 @@ def stages_timing( object_type: 
str, api_key: str = dlt.config.value, soft_delete: bool = False, - limit: Optional[int] = None, ) -> Iterator[TDataItems]: """ - Fetch stage timing data for a specific object type from the HubSpot API. + Fetch stage timing data for a specific object type from the HubSpot API. Some entities, like + deals and tickets, actually have pipelines with multiple stages, which they can enter and exit. This function fetches + history of entering and exiting different stages for the given object. + + We have to request them separately, because these properties have the pipeline stage_id in the name. + For example, "hs_date_entered_12345678", where 12345678 is the stage_id. + Args: object_type (str): Type of HubSpot object (e.g., 'deal', 'ticket'). api_key (str, optional): HubSpot API key for authentication. soft_delete (bool, optional): Fetch soft-deleted (archived) records. Defaults to False. - limit (Optional[int], optional): Limit the number of properties to fetch. Defaults to None. Yields: Iterator[TDataItems]: Stage timing data. """ + all_properties: List[str] = list(_get_property_names(api_key, object_type)) date_entered_properties: List[str] = [ prop for prop in all_properties if prop.startswith(STAGE_PROPERTY_PREFIX) ] - props: str = ",".join(date_entered_properties) - idx: int = 0 - if limit is None: - limit = len(date_entered_properties) - while idx < limit: - if len(props) - idx < MAX_PROPS_LENGTH: - props_part = ",".join(props[idx: idx + MAX_PROPS_LENGTH].split(",")[:-1]) - else: - props_part = props[idx: idx + MAX_PROPS_LENGTH] - idx += len(props_part) + + # Since the length of request should be less than MAX_PROPS_LENGTH, we cannot request + # data for the whole properties list. Therefore, in the following lines we request + # data iteratively for chunks of the properties list. 
+ for chunk in chunk_properties(date_entered_properties, MAX_PROPS_LENGTH): + props_part = ",".join(chunk) for data in fetch_data_for_properties( props_part, api_key, object_type, soft_delete ): yield pivot_stages_properties(data) -def owners( - api_key: str, - soft_delete: bool = False, -) -> Iterator[TDataItems]: - """ - Fetch HubSpot owners data. - - Args: - api_key (str): HubSpot API key for authentication. - soft_delete (bool, optional): Fetch soft-deleted (archived) owners. Defaults to False. - - Yields: - Iterator[TDataItems]: Owner data. - """ - - # Fetch data for owners - for page in fetch_data(endpoint=CRM_OBJECT_ENDPOINTS["owner"], api_key=api_key): - yield page - - # Fetch soft-deleted owners if requested - if soft_delete: - for page in fetch_data( - endpoint=CRM_OBJECT_ENDPOINTS["owner"], - params=ARCHIVED_PARAM, - api_key=api_key, - context={SOFT_DELETE_KEY: True}, - ): - yield page - - @dlt.source(name="hubspot") def hubspot( api_key: str = dlt.secrets.value, include_history: bool = False, soft_delete: bool = False, include_custom_props: bool = True, - props: Optional[Sequence[str]] = None, # Add props argument here ) -> Iterator[DltResource]: """ - A DLT source that retrieves data from the HubSpot API using the + A dlt source that retrieves data from the HubSpot API using the specified API key. This function retrieves data for several HubSpot API endpoints, @@ -381,17 +325,77 @@ def hubspot( `api_key` argument. """ - def hubspot_pipelines_for_objects( - api_key_inner: str = dlt.secrets.value, + @dlt.resource(name="owners", write_disposition="merge", primary_key="id") + def owners( + api_key: str = api_key, soft_delete: bool = soft_delete + ) -> Iterator[TDataItems]: + """Fetch HubSpot owners data. The owners resource is implemented separately, + because it doesn't have an endpoint for requesting properties. + + Args: + api_key (str): HubSpot API key for authentication. + soft_delete (bool, optional): Fetch soft-deleted (archived) owners. Defaults to False. 
+ + Yields: + Iterator[TDataItems]: Owner data. + """ + + # Fetch data for owners + for page in fetch_data(endpoint=CRM_OBJECT_ENDPOINTS["owner"], api_key=api_key): + yield page + + # Fetch soft-deleted owners if requested + if soft_delete: + for page in fetch_data( + endpoint=CRM_OBJECT_ENDPOINTS["owner"], + params=ARCHIVED_PARAM, + api_key=api_key, + context={SOFT_DELETE_KEY: True}, + ): + yield page + + @dlt.resource(name="properties", write_disposition="replace") + def properties_custom_labels(api_key: str = api_key) -> Iterator[TDataItems]: + """ + A dlt resource that retrieves custom labels for given list of properties. + + Args: + api_key (str, optional): HubSpot API key for authentication. + + Yields: + DltResource: A dlt resource containing properties for HubSpot objects. + """ + + def get_properties_description( + properties_list_inner: List[Dict[str, Any]] + ) -> Iterator[Dict[str, Any]]: + """Fetch properties.""" + for property_info in properties_list_inner: + yield from get_properties_labels( + api_key=api_key, + object_type=property_info["object_type"], + property_name=property_info["property_name"], + ) + + if PROPERTIES_WITH_CUSTOM_LABELS: + yield from get_properties_description(PROPERTIES_WITH_CUSTOM_LABELS) + else: + return + + def pipelines_for_objects( + pipelines_objects: List[str], + api_key_inner: str = api_key, ) -> Iterator[DltResource]: """ - A standalone DLT resource that retrieves pipelines for HubSpot objects. + Function that yields all resources for HubSpot objects, which have pipelines. + (could be deals or/and tickets, specified in PIPELINES_OBJECTS) Args: + pipelines_objects (list of strings): The list of objects, which have pipelines. api_key_inner (str, optional): The API key used to authenticate with the HubSpot API. Defaults to dlt.secrets.value. Yields: - Iterator[DltResource]: DLT resources for pipelines and stages. + Iterator[DltResource]: dlt resources for pipelines and stages. 
""" def get_pipelines(object_type: THubspotObjectType) -> Iterator[TDataItems]: @@ -400,7 +404,8 @@ def get_pipelines(object_type: THubspotObjectType) -> Iterator[TDataItems]: api_key=api_key_inner, ) - for obj_type in PIPELINES_OBJECTS: + # get the pipelines data + for obj_type in pipelines_objects: name = f"pipelines_{obj_type}" yield dlt.resource( get_pipelines, @@ -411,6 +416,7 @@ def get_pipelines(object_type: THubspotObjectType) -> Iterator[TDataItems]: primary_key="id", )(obj_type) + # get the history of entering for pipeline stages name = f"stages_timing_{obj_type}" if obj_type in OBJECT_TYPE_SINGULAR: yield dlt.resource( @@ -420,16 +426,7 @@ def get_pipelines(object_type: THubspotObjectType) -> Iterator[TDataItems]: primary_key=["id", "stage"], )(OBJECT_TYPE_SINGULAR[obj_type], soft_delete=soft_delete) - yield dlt.resource( - owners, - name="owners", - write_disposition="merge", - primary_key="id", - )( - api_key=api_key, # Pass the API key here - soft_delete=soft_delete # Pass the soft_delete flag here - ) - + # resources for all objects for obj in ALL_OBJECTS: yield dlt.resource( resource_template, @@ -438,11 +435,11 @@ def get_pipelines(object_type: THubspotObjectType) -> Iterator[TDataItems]: primary_key="id", )( entity=obj, - props=props, # Pass the props argument here include_custom_props=include_custom_props, soft_delete=soft_delete, ) + # corresponding history resources if include_history: for obj in ALL_OBJECTS: yield dlt.resource( @@ -452,8 +449,14 @@ def get_pipelines(object_type: THubspotObjectType) -> Iterator[TDataItems]: primary_key="object_id", )(entity=obj, include_custom_props=include_custom_props) - yield from hubspot_pipelines_for_objects(api_key) - yield hubspot_properties + # owners resource + yield owners + + # pipelines resources + yield from pipelines_for_objects(PIPELINES_OBJECTS, api_key) + + # custom properties labels resource + yield properties_custom_labels def fetch_props( @@ -486,7 +489,9 @@ def fetch_props( if 
include_custom_props: all_props: List[str] = _get_property_names(api_key, object_type) - custom_props: List[str] = [prop for prop in all_props if not prop.startswith("hs_")] + custom_props: List[str] = [ + prop for prop in all_props if not prop.startswith("hs_") + ] props_list += custom_props props_str = ",".join(sorted(set(props_list))) @@ -510,7 +515,7 @@ def hubspot_events_for_objects( start_date: pendulum.DateTime = STARTDATE, ) -> DltResource: """ - A standalone DLT resource that retrieves web analytics events from the HubSpot API for a particular object type and list of object ids. + A standalone dlt resource that retrieves web analytics events from the HubSpot API for a particular object type and list of object ids. Args: object_type (THubspotObjectType): One of the hubspot object types see definition of THubspotObjectType literal. @@ -519,7 +524,7 @@ def hubspot_events_for_objects( start_date (pendulum.DateTime, optional): The initial date time from which start getting events, default to STARTDATE. Returns: - DltResource: Incremental DLT resource to track events for objects from the list. + DltResource: Incremental dlt resource to track events for objects from the list. 
""" end_date: str = pendulum.now().isoformat() diff --git a/sources/hubspot/helpers.py b/sources/hubspot/helpers.py index 0b5a1c833..f58fe9499 100644 --- a/sources/hubspot/helpers.py +++ b/sources/hubspot/helpers.py @@ -1,7 +1,7 @@ """Hubspot source helpers""" import urllib.parse -from typing import Any, Dict, Iterator, List, Optional, Generator +from typing import Any, Dict, Generator, Iterator, List, Optional from dlt.sources.helpers import requests @@ -31,14 +31,16 @@ def _get_headers(api_key: str) -> Dict[str, str]: return dict(authorization=f"Bearer {api_key}") -def pagination(_data: Dict[str, Any], headers: Dict[str, Any]) -> Optional[Dict[str, Any]]: +def pagination( + _data: Dict[str, Any], headers: Dict[str, Any] +) -> Optional[Dict[str, Any]]: _next = _data.get("paging", {}).get("next", None) # _next = False if _next: next_url = _next["link"] # Get the next page response r = requests.get(next_url, headers=headers) - return r.json() # type: ignore + return r.json() # type: ignore else: return None @@ -148,8 +150,6 @@ def fetch_data( if "results" in _data: _objects: List[Dict[str, Any]] = [] for _result in _data["results"]: - # if _result["properties"]["hs_merged_object_ids"] is not None: - # _result["properties"]["hs_merged_object_ids"] = _result["properties"]["hs_merged_object_ids"].split(";") _obj = _result.get("properties", _result) if "id" not in _obj and "id" in _result: # Move id from properties to top level @@ -201,7 +201,9 @@ def _get_property_names(api_key: str, object_type: str) -> List[str]: return properties -def get_properties_labels(api_key: str, object_type: str, property_name: str) -> Iterator[Dict[str, Any]]: +def get_properties_labels( + api_key: str, object_type: str, property_name: str +) -> Iterator[Dict[str, Any]]: endpoint = f"/crm/v3/properties/{object_type}/{property_name}" url = get_url(endpoint) headers = _get_headers(api_key) diff --git a/sources/hubspot/settings.py b/sources/hubspot/settings.py index e5ab05588..a73529ea5 100644 
--- a/sources/hubspot/settings.py +++ b/sources/hubspot/settings.py @@ -60,14 +60,14 @@ ] DEFAULT_DEAL_PROPS = [ - #"amount", - #"closedate", - #"createdate", + # "amount", + # "closedate", + # "createdate", "dealname", "dealstage", - #"hs_lastmodifieddate", - #"hs_object_id", - #"pipeline", + # "hs_lastmodifieddate", + # "hs_object_id", + # "pipeline", ] DEFAULT_TICKET_PROPS = [ @@ -119,4 +119,5 @@ ARCHIVED_PARAM = {"archived": True} PREPROCESSING = {"split": ["hs_merged_object_ids"]} STAGE_PROPERTY_PREFIX = "hs_date_entered_" -MAX_PROPS_LENGTH = 2000 \ No newline at end of file +MAX_PROPS_LENGTH = 2000 +PROPERTIES_WITH_CUSTOM_LABELS = () diff --git a/sources/hubspot/utils.py b/sources/hubspot/utils.py index 246d06d43..ad8e81794 100644 --- a/sources/hubspot/utils.py +++ b/sources/hubspot/utils.py @@ -1,9 +1,29 @@ -from typing import Dict, Any +from typing import Any, Dict, Iterator, List + from .settings import PREPROCESSING + def split_data(doc: Dict[str, Any]) -> Dict[str, Any]: for prop in PREPROCESSING["split"]: if prop in doc and doc[prop] is not None: if isinstance(doc[prop], str): doc[prop] = doc[prop].split(";") return doc + + +def chunk_properties(properties: List[str], max_length: int) -> Iterator[List[str]]: + """Yield consecutive, non-empty chunks of the properties list, making sure that + for each chunk, len(",".join(chunk)) <= max_length (a single property longer than max_length is yielded alone). 
+ """ + chunk: List[str] = [] + length = 0 + for prop in properties: + prop_len = len(prop) + (1 if chunk else 0) # include comma length if not first + if chunk and length + prop_len > max_length: # flush only a non-empty chunk, never yield [] + yield chunk + chunk, length = [prop], len(prop) + else: + chunk.append(prop) + length += prop_len + if chunk: + yield chunk diff --git a/sources/hubspot_pipeline.py b/sources/hubspot_pipeline.py index ba16d027b..7f5ac0af0 100644 --- a/sources/hubspot_pipeline.py +++ b/sources/hubspot_pipeline.py @@ -27,6 +27,7 @@ def load_crm_data() -> None: # Print information about the pipeline run print(info) + def load_crm_data_with_history() -> None: """ Loads all HubSpot CRM resources and property change history for each entity. @@ -97,7 +98,7 @@ def load_crm_objects_with_custom_properties() -> None: ) load_data = hubspot() - #load_data.contacts.bind(props=["date_of_birth", "degree"], include_custom_props=True) + # load_data.contacts.bind(props=["date_of_birth", "degree"], include_custom_props=True) load_info = pipeline.run(load_data) print(load_info) @@ -122,6 +123,7 @@ def load_pipelines() -> None: load_info = p.run(load_data) print(load_info) + def load_web_analytics_events( object_type: THubspotObjectType, object_ids: List[str] ) -> None: @@ -155,4 +157,4 @@ def load_web_analytics_events( load_crm_objects_with_custom_properties() load_pipelines() load_crm_data_with_soft_delete() - load_web_analytics_events("company", ["7086461639", "7086464459"]) \ No newline at end of file + load_web_analytics_events("company", ["7086461639", "7086464459"]) diff --git a/tests/hubspot/test_hubspot_source.py b/tests/hubspot/test_hubspot_source.py index a0415e01b..7e4c6b36f 100644 --- a/tests/hubspot/test_hubspot_source.py +++ b/tests/hubspot/test_hubspot_source.py @@ -2,7 +2,6 @@ import dlt import pytest -from itertools import chain from typing import Any from urllib.parse import urljoin @@ -20,6 +19,7 @@ CRM_TICKETS_ENDPOINT, CRM_QUOTES_ENDPOINT, ) +from 
sources.hubspot.utils import chunk_properties 
from tests.hubspot.mock_data import ( mock_contacts_data, mock_companies_data, @@ -416,3 +416,23 @@ def test_event_resources(destination_name: str) -> None: ) print(load_info) assert_load_info(load_info) + + +def test_chunk_properties(): + properties = ["prop1", "prop2", "prop3", "prop4"] + max_length = 12 + expected = [["prop1", "prop2"], ["prop3", "prop4"]] + result = list(chunk_properties(properties, max_length)) + assert result == expected + + properties = ["prop1", "prop2"] + max_length = len("prop1,prop2") + expected = [["prop1", "prop2"]] + result = list(chunk_properties(properties, max_length)) + assert result == expected + + properties = ["p1", "p2", "p3", "p4", "p5"] + max_length = 5 # Should accommodate 'p1,p2' + expected = [["p1", "p2"], ["p3", "p4"], ["p5"]] + result = list(chunk_properties(properties, max_length)) + assert result == expected