Skip to content

Commit

Permalink
Adds resource field reference syntax to template strings
Browse files Browse the repository at this point in the history
  • Loading branch information
burnash committed Jan 14, 2025
1 parent 488bd24 commit 40d5201
Show file tree
Hide file tree
Showing 9 changed files with 642 additions and 241 deletions.
16 changes: 4 additions & 12 deletions dlt/sources/_core_source_templates/rest_api_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,10 @@ def github_source(access_token: Optional[str] = dlt.secrets.value) -> Any:
{
"name": "issue_comments",
"endpoint": {
# The placeholder {issue_number} will be resolved
# from the parent resource
"path": "issues/{issue_number}/comments",
"params": {
# The value of `issue_number` will be taken
# from the `number` field in the `issues` resource
"issue_number": {
"type": "resolve",
"resource": "issues",
"field": "number",
}
},
# The placeholder `{resources.issues.number}`
# will be replaced with the value of the `number` field
# in the `issues` resource data
"path": "issues/{resources.issues.number}/comments",
},
# Include data from `id` field of the parent resource
# in the child data. The field name in the child data
Expand Down
10 changes: 7 additions & 3 deletions dlt/sources/rest_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,14 +368,18 @@ def paginate_dependent_resource(
)

for item in items:
formatted_path, parent_record = process_parent_data_item(
path, item, resolved_params, include_from_parent
formatted_path, expanded_params, parent_record = process_parent_data_item(
path=path,
item=item,
params=params,
resolved_params=resolved_params,
include_from_parent=include_from_parent,
)

for child_page in client.paginate(
method=method,
path=formatted_path,
params=params,
params=expanded_params,
paginator=paginator,
data_selector=data_selector,
hooks=hooks,
Expand Down
164 changes: 148 additions & 16 deletions dlt/sources/rest_api/config_setup.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
import warnings
from copy import copy
from typing import (
Expand Down Expand Up @@ -96,6 +97,30 @@ class IncrementalParam(NamedTuple):
end: Optional[str]


class AttributeAccessibleDict(Dict[str, Any]):
    """A dict whose keys can also be read as attributes.

    `str.format` resolves dotted replacement fields through attribute access,
    so a template such as "{row.number}" can be rendered with
    `row=AttributeAccessibleDict(number=1)`.
    """

    def __getattr__(self, key: str) -> Any:
        """Returns the value stored under `key`.

        Python calls this hook only after regular attribute lookup has failed,
        so real dict attributes (`keys`, `items`, ...) are never shadowed by
        same-named keys.

        Raises:
            AttributeError: if `key` is not present in the dict.
        """
        try:
            return self[key]
        except KeyError:
            # Report a missing key as a missing attribute. The KeyError is an
            # implementation detail, so it is suppressed to keep tracebacks
            # free of a misleading "During handling of the above exception".
            raise AttributeError(key) from None


class ResourcesContext:
    """Lazily populated namespace of per-resource field values.

    Both `context["issues"]` and `context.issues` return the
    `AttributeAccessibleDict` registered for the resource `issues`, creating an
    empty one on first access. Attribute access is what allows templates such
    as "{resources.issues.number}" to be rendered with `str.format`.
    """

    def __init__(self) -> None:
        self._resources: Dict[str, AttributeAccessibleDict] = {}

    def __getitem__(self, key: str) -> Any:
        """Returns the field container for resource `key`, creating it if missing."""
        if key not in self._resources:
            self._resources[key] = AttributeAccessibleDict()
        return self._resources[key]

    def __getattr__(self, key: str) -> Any:
        """Attribute-style alias for `__getitem__`.

        Only called when regular attribute lookup fails. The internal storage
        name and dunder names are never treated as resource names:

        * `_resources` is missing only on an instance whose `__init__` has not
          run (e.g. while `copy`/`pickle` rebuild the object); forwarding it to
          `__getitem__` would recurse until `RecursionError`.
        * Protocol probes such as `__deepcopy__` or `__setstate__` must fail
          with `AttributeError`; answering them with a fresh empty dict breaks
          `copy.copy`/`copy.deepcopy` and pollutes the context with bogus
          resources.

        Raises:
            AttributeError: for `_resources` and for dunder names.
        """
        if key == "_resources" or (key.startswith("__") and key.endswith("__")):
            raise AttributeError(key)
        return self[key]


def register_paginator(
paginator_name: str,
paginator_class: Type[BasePaginator],
Expand Down Expand Up @@ -285,7 +310,6 @@ def build_resource_dependency_graph(
dependency_graph: graphlib.TopologicalSorter = graphlib.TopologicalSorter() # type: ignore[type-arg]
resolved_param_map: Dict[str, Optional[List[ResolvedParam]]] = {}
endpoint_resource_map = expand_and_index_resources(resource_list, resource_defaults)

# create dependency graph
for resource_name, endpoint_resource in endpoint_resource_map.items():
if isinstance(endpoint_resource, DltResource):
Expand All @@ -296,6 +320,20 @@ def build_resource_dependency_graph(
# find resolved parameters to connect dependent resources
resolved_params = _find_resolved_params(endpoint_resource["endpoint"])

# extract more resolved params from path expressions
path_expressions = _extract_expressions(endpoint_resource["endpoint"]["path"], "resources.")
resolved_params += _expressions_to_resolved_params(path_expressions)

# extract expressions from parameters that are strings
params_expressions = []
for param_value in endpoint_resource["endpoint"].get("params", {}).values():
# If param_value is a plain string (e.g. "{resources.berry.a_property}")
if isinstance(param_value, str):
extracted = _extract_expressions(param_value, "resources.")
params_expressions.extend(extracted)

resolved_params += _expressions_to_resolved_params(params_expressions)

# set of resources in resolved params
named_resources = {rp.resolve_config["resource"] for rp in resolved_params}

Expand All @@ -307,7 +345,7 @@ def build_resource_dependency_graph(
predecessor = first_param.resolve_config["resource"]
if predecessor not in endpoint_resource_map:
raise ValueError(
f"A transformer resource {resource_name} refers to non existing parent resource"
f"A dependent resource {resource_name} refers to non existing parent resource"
f" {predecessor} on {first_param}"
)

Expand Down Expand Up @@ -383,15 +421,46 @@ def _make_endpoint_resource(
return _merge_resource_endpoints(default_config, resource)


def _encode_template_placeholders(
text: str, prefix: str, delimiter_char: str = "||"
) -> Tuple[str, Dict[str, str]]:
"""Encodes substrings starting with prefix in text using delimiter_char."""

# Store original values for restoration
replacements = {}

def replace_match(match: re.Match[str]) -> Any:
content = match.group(1)
if content.startswith(prefix):
# Generate a unique key for this replacement
key = f"{delimiter_char}{content}{delimiter_char}"
replacements[key] = match.group(0)
return key
# Return unchanged for further processing
return match.group(0)

# Find all {...} patterns and selectively replace them
pattern = r"\{\s*([^}]+)\}"
transformed = re.sub(pattern, replace_match, text)
return transformed, replacements


def _decode_special_objects(text: str, replacements: Dict[str, str]) -> str:
for key, value in replacements.items():
text = text.replace(key, value)
return text


def _bind_path_params(resource: EndpointResource) -> None:
"""Binds params declared in path to params available in `params`. Pops the
bound params but. Params of type `resolve` and `incremental` are skipped
bound params. Params of type `resolve` and `incremental` are skipped
and bound later.
"""
path_params: Dict[str, Any] = {}
assert isinstance(resource["endpoint"], dict) # type guard
resolve_params = [r.param_name for r in _find_resolved_params(resource["endpoint"])]
path = resource["endpoint"]["path"]
resolved_params = [r.param_name for r in _find_resolved_params(resource["endpoint"])]
path, replacements = _encode_template_placeholders(resource["endpoint"]["path"], "resources.")

for format_ in string.Formatter().parse(path):
name = format_[1]
if name:
Expand All @@ -401,8 +470,8 @@ def _bind_path_params(resource: EndpointResource) -> None:
f"The path {path} defined in resource {resource['name']} requires param with"
f" name {name} but it is not found in {params}"
)
if name in resolve_params:
resolve_params.remove(name)
if name in resolved_params:
resolved_params.remove(name)
if name in params:
if not isinstance(params[name], dict):
# bind resolved param and pop it from endpoint
Expand All @@ -418,13 +487,13 @@ def _bind_path_params(resource: EndpointResource) -> None:
# resolved params are bound later
path_params[name] = "{" + name + "}"

if len(resolve_params) > 0:
if len(resolved_params) > 0:
raise NotImplementedError(
f"Resource {resource['name']} defines resolve params {resolve_params} that are not"
f"Resource {resource['name']} defines resolve params {resolved_params} that are not"
f" bound in path {path}. Resolve query params not supported yet."
)

resource["endpoint"]["path"] = path.format(**path_params)
resource["endpoint"]["path"] = _decode_special_objects(path.format(**path_params), replacements)


def _setup_single_entity_endpoint(endpoint: Endpoint) -> Endpoint:
Expand Down Expand Up @@ -457,6 +526,49 @@ def _find_resolved_params(endpoint_config: Endpoint) -> List[ResolvedParam]:
]


def _extract_expressions(template_string: str, prefix: str) -> List[str]:
"""Takes a tepmlate string and extracts expressions that start with a prefix.
Args:
template_string (str): A string with expressions to extract
prefix (str): A string that marks the beginning of an expression
Example:
>>> _extract_expressions("blog/{resources.blog.id}/comments", "resources.")
["resources.blog.id"]
"""
expressions = []
for field_parts in string.Formatter().parse(template_string):
field_name = (field_parts[1] or "").strip()
if field_name and field_name.startswith(prefix):
expressions.append(field_name)
return expressions


def _expressions_to_resolved_params(expressions: List[str]) -> List[ResolvedParam]:
resolved_params = []
# We assume that the expressions are in the format 'resources.<resource>.<field>'
# and not more complex expressions
for expression in expressions:
parts = expression.strip().split(".")
if len(parts) != 3:
raise ValueError(
f"Invalid definition of {expression}. Expected format:"
" 'resources.<resource>.<field>'"
)
resolved_params.append(
ResolvedParam(
expression,
{
"type": "resolve",
"resource": parts[1],
"field": parts[2],
},
)
)
return resolved_params


def _action_type_unless_custom_hook(
action_type: Optional[str], custom_hook: Optional[List[Callable[..., Any]]]
) -> Union[Tuple[str, Optional[List[Callable[..., Any]]]], Tuple[None, List[Callable[..., Any]]],]:
Expand Down Expand Up @@ -580,42 +692,62 @@ def remove_field(response: Response, *args, **kwargs) -> Response:
def process_parent_data_item(
path: str,
item: Dict[str, Any],
params: Dict[str, Any],
resolved_params: List[ResolvedParam],
include_from_parent: List[str],
) -> Tuple[str, Dict[str, Any]]:
include_from_parent: Optional[List[str]],
) -> Tuple[str, Dict[str, str], Dict[str, Any]]:
parent_resource_name = resolved_params[0].resolve_config["resource"]

resources_context = ResourcesContext()

param_values = {}

# Collect plain resolved params
for resolved_param in resolved_params:
field_values = jsonpath.find_values(resolved_param.field_path, item)

if not field_values:
field_path = resolved_param.resolve_config["field"]
raise ValueError(
f"Transformer expects a field '{field_path}' to be present in the incoming data"
f"Resource expects a field '{field_path}' to be present in the incoming data"
f" from resource {parent_resource_name} in order to bind it to path param"
f" {resolved_param.param_name}. Available parent fields are"
f" {', '.join(item.keys())}"
)

param_values[resolved_param.param_name] = field_values[0]
# If resolved param was defined as `resources.<resource>.<field>`, then
# add it to the resources context
param_name = resolved_param.param_name
if param_name.startswith("resources."):
resource_name, field_name = param_name.split(".")[1:]
resources_context[resource_name][field_name] = field_values[0]
param_values["resources"] = resources_context
else:
param_values[resolved_param.param_name] = field_values[0]

bound_path = path.format(**param_values)

# Expand params with resolved values
expanded_params = {}
for key, value in params.items():
if isinstance(value, str):
expanded_params[key] = value.format(**param_values)
else:
expanded_params[key] = value

parent_record: Dict[str, Any] = {}
if include_from_parent:
for parent_key in include_from_parent:
child_key = make_parent_key_name(parent_resource_name, parent_key)
if parent_key not in item:
raise ValueError(
f"Transformer expects a field '{parent_key}' to be present in the incoming data"
f"Resource expects a field '{parent_key}' to be present in the incoming data"
f" from resource {parent_resource_name} in order to include it in child records"
f" under {child_key}. Available parent fields are {', '.join(item.keys())}"
)
parent_record[child_key] = item[parent_key]

return bound_path, parent_record
return bound_path, expanded_params, parent_record


def _merge_resource_endpoints(
Expand Down
2 changes: 1 addition & 1 deletion tests/sources/helpers/rest_client/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ def test_paginate_json_body_without_params(self, rest_client) -> None:
posts_skip = (DEFAULT_TOTAL_PAGES - 3) * DEFAULT_PAGE_SIZE

class JSONBodyPageCursorPaginator(BaseReferencePaginator):
def update_state(self, response, data): # type: ignore[override]
def update_state(self, response, data):
self._next_reference = response.json().get("next_page")

def update_request(self, request):
Expand Down
Loading

0 comments on commit 40d5201

Please sign in to comment.