-
Notifications
You must be signed in to change notification settings - Fork 199
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
bug: hint
s are retained across different yields
#2109
base: devel
Are you sure you want to change the base?
Conversation
✅ Deploy Preview for dlt-hub-docs canceled.
|
It looks like the table_name of the most recent dlt mark is retained during extraction. I'll need to verify if this is the correct behavior or wether we would consider this a bug. If you explicitely mark every row, then it will work. |
yes! Can you point me to the code where that happens? I searched and couldn't find it.
I most definitely do not expect it and the examples in the documentation don't mention it either.
It doesn't actually, I believe. I have tried this as the first attempt and I remember it not working, however figuring out where that row came from drove me a bit nuts, so I tried a lot of things and not very methodical. I will add a second test case to this pull request with an explicit mark for the parent resource. |
If you replace
you will get the expected result. For where this problem originates this will be in the normalizer or the pipe iterator. If you can have a stab at it. We are currently very busy but should come back to this shortly, but you can always ask for help here :) |
PS: Yes, we consider this a bug. |
just added a second parameter to the test to make sure it works as intended: 3cff1a3 - it does, so \o/
OK, will have a short look if I can find it. |
via diff --git a/dlt/extract/extractors.py b/dlt/extract/extractors.py
index 41d3035a..c62fa44e 100644
--- a/dlt/extract/extractors.py
+++ b/dlt/extract/extractors.py
@@ -135,6 +135,7 @@ class Extractor:
if table_name := self._get_static_table_name(resource, meta):
# write item belonging to table with static name
+ print(table_name, items)
self._write_to_static_table(resource, table_name, items, meta)
else:
# table has name or other hints depending on data items
@@ -148,8 +149,10 @@ class Extractor: shows that in subsequent runs after setting a hint on an item, the resource seems to store it. The reason it stores it, is this call to Line 129 in 61c2ed9
Line 153 in 61c2ed9
I believe this would possibly not only cause issues for table names, but also for all other metadata, if any of it would divert from what's on the resource. There are various fixes that could be done, but I am unsure if there are any backwards-compatible ones if anyone is relying on this not very sane behavior, as someone could have used this as a feature like: yield item1
yield item2
yield dlt.mark.with_hints(item3, hints=dlt.mark.make_hints(table_name="xxx"))
yield item4
yield item5
yield item... whoch would have yielded 2 items into the table defined in the resource and then all other subsequent items into table Generally, I think I'd make sure that yielding an item does not at all overwrite the resource, e.g. we need to get rid of the call to |
Something like this also fails: @dlt.transformer(
table_name=lambda item: "xxxx"
)
def xxxx() -> Iterable[TDataItem]:
yield {"id": 1}
yield dlt.mark.with_hints(
item={"id": 1000},
hints=dlt.mark.make_hints(
table_name="yyyy",
write_disposition="merge",
primary_key="id",
),
) with:
for the same reason - it tries to merge the hitns for the sub-table ( |
@joscha thanks for the investigation, this makes sense! I have assigned the task to myself and hope I'll get to fixing it next week. Keep an eye on the pr :) |
Do you have time to give me guidance towards the right solution? Then I'd give it a stab! |
I found another issue relating to columns, which is the same codepath: @dlt.resource()
def my_resource():
yield dlt.mark.with_hints(
item={"hello": "..."},
hints=dlt.mark.make_hints(
table_name="a",
columns={
"hello": {
"data_type": "text"
}
}
),
)
yield dlt.mark.with_hints(
item={"world": "..."},
hints=dlt.mark.make_hints(
table_name="b",
),
) should create:
but creates: (the column definition is retained on the pipeline for subsequent runs until another |
hint
s are retained across different yields
@sh-rp this bug makes dlt almost unusable when trying to yield data into different tables, I'd love to work towards a fix, can I assume that we agree this is a regression and hints should not be shared across different yields at all, e.g. the pipeline state must not be tainted by whatever hint is passed with a marked yield? |
@joscha we are a little in the area of undefined behavior. try to create table variants if you use dynamic table name (maybe we should set this automatically and for sure we should document it) def with_hints(
item: TDataItems, hints: TResourceHints, create_table_variant: bool = False
) -> DataItemWithMeta:
"""Marks `item` to update the resource with specified `hints`.
Will create a separate variant of hints for a table if `name` is provided in `hints` and `create_table_variant` is set.
Create `TResourceHints` with `make_hints`.
Setting `table_name` will dispatch the `item` to a specified table, like `with_table_name`
"""
return DataItemWithMeta(HintsMeta(hints, create_table_variant), item) |
regarding the state: all the tables are generated by a single resource and will be marked as such. so they share a single state |
I mark per yielded entity though, it is not clear from the API that I mutate a global or resource -level state with the way hints are currently done. If the marking API was on the resource, then maybe, however this would not do well with parallel and async generators as you could not guarantee that a state mutation on the resource will result in a yield being processed with the correct metadata. The only clean way I can see to fix this is to allow both: resource-level hints which are retained throughout a resource execution and item-level hints which are used when processing the yielded entity but do not mutate the resource state by itself. |
@joscha what happens if you do @dlt.resource()
def my_resource():
yield dlt.mark.with_hints(
item={"hello": "..."},
hints=dlt.mark.make_hints(
table_name="a",
create_table_variant=true,
columns={
"hello": {
"data_type": "text"
}
}
),
)
yield dlt.mark.with_hints(
item={"world": "..."},
hints=dlt.mark.make_hints(
table_name="b",
create_table_variant=true,
),
) are you getting tables in desired shape? I think if we always create a variant when state (ie. incremental) is managed on the resource level. table names are just hints and data routing during loading... for users that want to maintain state per table: the resource state dictionary is available for any custom work. note that refreshing multi table resource drops all tables produced by it |
Still running; So far the only thing that I see is a constant stream of:
with
Doesn't seem to be an easy fix, the run failed with:
The diff is: diff --git a/sources/affinity/__init__.py b/sources/affinity/__init__.py
index a76ea177..7d9c9747 100644
--- a/sources/affinity/__init__.py
+++ b/sources/affinity/__init__.py
@@ -65,8 +65,7 @@ def __create_id_resource(entity: ENTITY | LISTS_LITERAL, is_id_generator: bool =
@dlt.resource(
write_disposition="replace",
primary_key="id",
- # can't use this yet, due to https://github.com/dlt-hub/dlt/pull/2109
- # columns=datacls,
+ columns=datacls,
name=name,
parallelized=True
)
@@ -121,18 +120,9 @@ def mark_dropdown_item(dropdown_item: Dropdown | RankedDropdown, field: FieldMod
write_disposition="merge",
primary_key="id",
merge_key="id",
- # can't use this yet, due to https://github.com/dlt-hub/dlt/pull/2109
- # columns={
- # "id": {
- # "primary_key": True,
- # "unique": True,
-
- # },
- # "text": {
- # "data_type": "text"
- # }
- # }
+ columns=type(dropdown_item)
),
+ create_table_variant=True,
)
@@ -147,7 +137,8 @@ def process_and_yield_fields(entity: Company | Person | OpportunityWithFields, r
write_disposition="merge",
primary_key="id",
merge_key="id",
- )
+ ),
+ create_table_variant=True,
)
new_column = f"{field.id}_{field.name}" if field.id.startswith("field-") else field.id
value = field.value.root
@@ -181,6 +172,7 @@ def process_and_yield_fields(entity: Company | Person | OpportunityWithFields, r
write_disposition="merge",
primary_key="id",
),
+ create_table_variant=True,
)
case PersonValue() | CompanyValue():
ret[new_column] = value.data.id if value.data else None a run without this diff succeeds. |
I added another parametrized test in e1cbe19.
so |
I've been running this a while with table variants and I believe it is working as expected.
configurable, so it's usually suppressed. Update: see example here: https://github.com/planet-a-ventures/dlt-source-affinity/blob/1f5ec2b7cf39d8a59f46149899d8818a50e0c123/affinity/__init__.py#L33-L45 on how to hide this message when using variants with Pydantic column definitions. Also, I still think a fix is needed for the standard case. Or at the very very least extensive documentation both markdown and code around the current behavior. |
Hey @joscha, to summarize what the current state of this discussion is.
Is there anything else? |
Only one extra item comes to mind: if we agree that yielding items with hints without |
Description
I noticed in a real-life environment that mixing yields without markers and markers sometimes pushes data into the wrong table.
I was able to create a test case that reproduces it.
The expected outcome is 4 tables, looking like this:
things
:things_a
:things_b
:things_c
:but instead
things_c
receives an additional item intended for the non-marked tablethings
, so it looks like this:things_c
:None
None