
dlt.destinations.exceptions.DatabaseUndefinedRelation sql server destination #2088

Open
axellpadilla opened this issue Nov 22, 2024 · 10 comments

@axellpadilla
Contributor

axellpadilla commented Nov 22, 2024

dlt version

1.4

Describe the problem

While processing documents using source.incremental, some of them cause many errors of this type; this shouldn't happen.
It works without the incremental config.

Edit: Using a filter instead of incremental still causes errors, so this could be an underlying problem with handling data quality issues.
Edit 2: max_table_nesting doesn't work with the verified source; it still fails at nesting levels deeper than the specified parameter. But adding mongodb.resources["orders"].max_table_nesting = 2 does work partially, for example, I still receive errors for orders__shopping_cart__products__key.
Edit 3: The problem continued even without the drop_sources option.

I'm using the verified mongodb source and have tested many variations; some collections load if their data is cleaner.
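For reference, the per-resource nesting limit from Edit 2 is applied roughly like this (a minimal sketch; the import path of the verified source and the pipeline/dataset names are assumptions, the real connection settings live in .dlt/secrets.toml):

import dlt
from mongodb import mongodb  # verified source; import path may differ in your project

mongodb_source = mongodb()  # connection and database come from .dlt config/secrets
# Limiting nesting on the whole source did not help in my runs,
# but limiting it on the individual resource partially worked:
mongodb_source.resources["orders"].max_table_nesting = 2

pipeline = dlt.pipeline(pipeline_name="mongo_orders", destination="mssql", dataset_name="ecommerce")
pipeline.run(mongodb_source)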

Expected behavior

No errors, documents loaded.

Steps to reproduce

Load documents that have nested documents in their fields, where some values aren't documents, for example an array mixed with undefined values.
Use refresh = "drop_sources".
The error then appears, for example:
Cannot find the object "orders__shopping_cart" because it does not exist or you do not have permissions.
The table doesn't exist in any schema.
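A condensed sketch of the runs that trigger it (assuming the verified mongodb source configured through .dlt/secrets.toml; the pipeline and dataset names are placeholders):

import dlt
from mongodb import mongodb  # verified source; import path may differ

pipeline = dlt.pipeline(
    pipeline_name="mongo_repro",
    destination="mssql",
    dataset_name="ecommerce",
)
# The first run creates the nested tables (orders, orders__shopping_cart, ...).
pipeline.run(mongodb().with_resources("orders"))
# A later run with drop_sources fails with DatabaseUndefinedRelation
# on one of the nested tables.
pipeline.run(mongodb().with_resources("orders"), refresh="drop_sources")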

Operating system

Linux, Windows

Runtime environment

Local

Python version

3.11

dlt data source

dlt verified source mongodb

dlt destination

No response

Other deployment details

sql server destination

Additional information

Consistent error; version 1.3 has the same problem.

@sh-rp
Collaborator

sh-rp commented Nov 27, 2024

Hey @axellpadilla, can you provide a script with example data (you can just extract a dictionary that produces this error from your mongo and remove sensitive data) so I can reproduce this? Otherwise it will be very hard to debug. Thanks.

@axellpadilla
Contributor Author

Yeah @sh-rp, the data is indeed sensitive and it will be hard to reproduce manually (but I will try to find time to build a test case). The schema of this collection is about 1,900 lines. Using the modified schema as an import schema for the pipeline helped with the reference issue when NOT using refresh = drop_resources on pipeline execution. After a couple of tries with write_disposition=replace, the table name in the reference error changed on each execution (random sort order for the creation of the tables?).

After this sequence (not sure if it was just good luck) it worked, but raised a different error about data types or decimals that are too long for SQL Server:

  1. use an imported schema
  2. execute refresh = drop_resources + write_disposition=replace
  3. write_disposition=replace
  4. write_disposition=merge
  5. sadly, a data type error on the SQL Server side (for example, it couldn't convert nvarchar to bigint; shouldn't dlt check whether the data types are convertible before loading to any destination when an import schema is provided?)
    Example:
Checking failed jobs in load id '1732580035.753799'
JOB: orders.ab71c490bb.insert_values(orders)
JOB file type: insert_values
JOB file path: C:\Users\bpadilla\.dlt\pipelines\local\mongodb_ecommerce_hn_orders\load\loaded\1732580035.753799\failed_jobs\orders.ab71c490bb.0.insert_values
('22003', "[22003] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]The number '1288.30999999999994543031789362430572509765625' is out of the range for numeric representation (maximum precision 38). (1007) (SQLExecDirectW); [22003] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]The number '353.26999999999998181010596454143524169921875' is out of the range for numeric representation (maximum precision 38). (1007); [22003] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]The number '4.71999999999999975131004248396493494510650634765625' is out of the range for numeric representation (maximum precision 38). (1007); [22003] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]The number '1497.920000000000072759576141834259033203125' is out of the range for numeric representation (maximum precision 38). (1007); [22003] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]The number '4.71999999999999975131004248396493494510650634765625' is out of the range for numeric representation (maximum precision 38). (1007); [22003] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]The number '620.73000000000001818989403545856475830078125' is out of the range for numeric representation (maximum precision 38). (1007)")

Another thing: there is a "root_key" creation error when using the merge write_disposition on existing data (this shouldn't happen even when changing write_disposition if "root_key" is always enabled; previous runs should have created it).

Probably, in the first case, some of the SQL is not being executed in the correct order, considering also that the schema can be different for each load, with text in one load and a document in another for the same field. In the second case, I even checked the dlt code and can't find a reason why the root_key isn't created at first use, but maybe the reason is the same: a change from nested to non-nested.
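Roughly, the sequence above corresponds to runs like these (a sketch only; the source, schema path, pipeline name and dataset name are placeholders for my real setup):

import dlt
from mongodb import mongodb  # verified source; placeholder import path

pipeline = dlt.pipeline(
    pipeline_name="mongo_orders",
    destination="mssql",
    dataset_name="ecommerce",
    import_schema_path="schemas/import",  # step 1: start from the edited imported schema
)

# step 2: drop resource tables and state, then fully replace
pipeline.run(mongodb(), refresh="drop_resources", write_disposition="replace")
# step 3: plain replace
pipeline.run(mongodb(), write_disposition="replace")
# step 4: switch to merge
pipeline.run(mongodb(), write_disposition="merge")
# step 5: this is where the data type / decimal precision errors showed up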

@sh-rp
Collaborator

sh-rp commented Dec 16, 2024

Hey @axellpadilla, I'll close this since I don't have a test to reproduce this.

@sh-rp sh-rp closed this as completed Dec 16, 2024
@github-project-automation github-project-automation bot moved this from Planned to Done in dlt core library Dec 16, 2024

@rudolfix
Collaborator

@axellpadilla did you try to run this outside of dagster? I'm not getting any errors on duckdb and mssql destinations. I'm on the current devel branch of dlt.

In your code there's one suspicious thing: you set import_schema_path=import_schema_path only on the second pipeline run. The import schema is the source of truth and is applied over the existing pipeline schema. The information on all tables will be erased (if you use an empty import schema) before they are dropped. But tbh I'm not getting errors in either case.

The system cannot find the path specified: 'C:\\Users\\userhome\\.dlt\\pipelines\\local\\tests_pipeline\\normalize\\extracted\\1736406440.0727198\\new_jobs'

This is most probably related to dagster. Are you running several pipelines in parallel?

@axellpadilla
Contributor Author

axellpadilla commented Jan 21, 2025

@rudolfix It failed using that code with multiple variations. It doesn't always fail, but once it starts failing it keeps failing with the same error. That dagster import just provides the pipeline with the destination already created, but it also fails when run directly. I'm using SQL Server 2017.

I hope to have more time to reproduce it consistently in a fully featured test in the repository itself (dlt works perfectly with multiple pipelines and collections, but complex ones with multiple nested tables always hit several different problems; maybe at a specific point in memory something executes in the wrong order?).

@sh-rp
Collaborator

sh-rp commented Jan 27, 2025

@axellpadilla it just occurred to me that we fixed a similar problem not so long ago, but I cannot find a note of the fix in the release docs. Could you try to run this on dlt 1.5.0 and see if the problem persists?

@axellpadilla
Contributor Author

axellpadilla commented Jan 27, 2025

@axellpadilla it just occurred to me that we fixed a similar problem not so long ago, but I cannot find the note of this fix in the release docs. Could you try to run this on dlt 1.5.0 and see if the problem persists?

Sadly, yes, I'm already updated. I'm currently trying to make a test in my local repo; I will generate enough random data with a changing structure.

I realized the pokeapi test is failing, but it should be due to a change in the API:

>       assert table_counts["pokemon_list"] == 1302
E       assert 1304 == 1302

@axellpadilla
Contributor Author

Hi, I've got a test that is sometimes non-deterministic (not always the same error). It seems that create_indexes=True is the problem; I hope it also reproduces for you @sh-rp:

import hashlib
from typing import Dict, List, Any, Literal
import random

from dlt.common import pendulum
from dlt import pipeline
import logging
from dlt.destinations import mssql

import dlt
from dlt.common.utils import uniq_id

from tests.pipeline.utils import (
    load_table_counts,
)



def generate_nested_json_structure(depth: int = 0, max_depth: int = 2) -> Dict[str, Any]:
    if depth >= max_depth:
        return {
            "leaf_value": random.randint(1, 1000),
            "leaf_date": pendulum.now()
        }
    
    return {
        f"level_{depth}_field": {
            "nested_doc": generate_nested_json_structure(depth + 1, max_depth),
            "nested_array": [
                generate_nested_json_structure(depth + 1, max_depth)
                for _ in range(random.randint(1, 2))
            ]
        }
    }

def generate_deterministic_id(index: int, stage: str) -> str:
    """Generates a deterministic but unique id based on index and stage"""
    return hashlib.sha256(f"{stage}_{index}".encode()).hexdigest()[:24]

def generate_json_like_data(stage: Literal["s1", "s2", "s3"], size: int = 100, seed: int = 42) -> List[Dict[str, Any]]:
    random.seed(seed)
    data = []
    
    # Pre-generate common values to avoid repeated random calls
    names = ["Alice", "Bob", "Charlie", "David", None, None]  # Added more Nones
    ages = [random.randint(18, 80) for _ in range(10)] + [None, None]  # Added Nones to ages
    timestamps = [pendulum.now().subtract(days=d) for d in range(365)] + [None]
    versions = list(range(1, 6)) + [None]
    
    # Pre-generate a pool of nested structures with some None values
    nested_pool = {
        'small': [generate_nested_json_structure(0, 2) for _ in range(5)] + [None],
        'medium': [generate_nested_json_structure(0, 3) for _ in range(5)] + [None],
        'large': [generate_nested_json_structure(0, 4) for _ in range(5)] + [None]
    }

    nested_tables = {
        "users": (lambda: {
            "name": random.choice(names),
            "age": random.choice(ages),
            "preferences": random.choice(nested_pool['small']) if random.random() > 0.2 else None
        }, 1.0),
        "orders": (lambda: {
            "order_date": random.choice([pendulum.now(), None, None]),  # Increased chance of None
            "items": random.sample(nested_pool['medium'], k=random.randint(0, 3)) if random.random() > 0.3 else None,
            "shipping": random.choice(nested_pool['small'])
        }, 0.8),
        "products": (lambda: {
            "details": random.choice(nested_pool['large']) if random.random() > 0.2 else None,
            "categories": random.choice([
                random.sample(nested_pool['small'], k=2),
                None,
                [],
                None  # Added another None option
            ]),
            "specs": random.choice(nested_pool['medium']) if random.random() > 0.25 else None
        }, 0.7)
    }

    for i in range(size):
        base_doc = {
            "_id": generate_deterministic_id(i, stage),
            "timestamp": random.choice(timestamps),  # Now can be None
            "version": random.choice(versions)  # Now can be None
        }

        if stage == "s1":
            # Most consistent stage, but still with some missing fields
            doc = {
                **base_doc,
                **{k: v[0]() for k, v in nested_tables.items() if random.random() < v[1]}
            }
        elif stage == "s2":
            # Even more variable with missing fields
            doc = {
                **base_doc,
                **{k: v[0]() for k, v in nested_tables.items() if random.random() < v[1] * 0.9}
            }
            # Add some completely new fields, sometimes null
            if random.random() > 0.5:
                doc["dynamic_field"] = random.choice([
                    generate_nested_json_structure(0, random.randint(1, 4)),
                    None
                ])
        else:  # s3
            # Most chaotic stage with more nulls and missing fields
            doc = {
                **base_doc,
                **{
                    k: random.choice([
                        v[0](),
                        None,
                        None,  # Increased chance of None
                        {"modified": True},
                        [1, 2, 3],
                        "converted_to_string"
                    ]) for k, v in nested_tables.items() if random.random() < v[1] * 0.7
                }
            }
            # Add stage specific fields, sometimes null
            if random.random() > 0.3:
                doc["s3_specific"] = random.choice([
                    generate_nested_json_structure(0, 4),
                    None
                ])

            # Randomly remove some fields
            if random.random() > 0.8:
                keys_to_remove = random.sample(list(doc.keys() - {"_id"}), k=random.randint(1, 2))
                for key in keys_to_remove:
                    doc.pop(key)

        data.append(doc)
    
    return data




def test_generate_json_like_data():
    logging.basicConfig(level=logging.DEBUG)
    logging.debug("Starting test_generate_json_like_data")

    data = generate_json_like_data("s1", 10)
    assert len(data) == 10

    logging.debug("Finished test_generate_json_like_data")



def test_mssql_json_loading() -> None:
    """Test loading jsonDB-like data into MSSQL with different sequences"""
    dest = mssql(create_indexes=True)
    
    @dlt.source(name="json_source", parallelized=True)
    def source(stage: str):
        @dlt.resource(name="documents", write_disposition="merge", primary_key="_id")
        def documents():
            yield from generate_json_like_data(stage)
        return documents

    # Test sequence 1: s1 -> s2 -> s3
    p1 = pipeline(
        "json_test_1",
        dataset_name="json_test_1_" + uniq_id()
    )
    
    p1.run(source("s1"), destination=dest)
    p1.run(source("s2"), destination=dest)
    p1.run(source("s3"), destination=dest)

    # Test sequence 2: s2 -> s1 -> s3
    p2 = pipeline(
        "json_test_2",
        dataset_name="json_test_2_" + uniq_id()
    )
    
    p2.run(source("s2"), destination=dest)
    p2.run(source("s1"), destination=dest)
    p2.run(source("s3"), destination=dest)

    # Test sequence 3: s3 -> s2 -> s1
    p3 = pipeline(
        "json_test_3",
        dataset_name="json_test_3_" + uniq_id()
    )
    
    p3.run(source("s3"), destination=dest)
    p3.run(source("s2"), destination=dest)
    p3.run(source("s1"), destination=dest)

    # Test sequence 4: s2 with root_id -> s1 -> s3
    p4 = pipeline(
        "json_test_4",
        dataset_name="json_test_4_" + uniq_id()
    )
    
    @dlt.source(name="json_source_root_id", root_key=True)
    def source_with_root_id(stage: str):
        @dlt.resource(name="documents", write_disposition="replace", primary_key="_id")
        def documents():
            yield from generate_json_like_data(stage)
        return documents

    p4.run(source_with_root_id("s2"), destination=dest)
    p4.run(source_with_root_id("s1"), destination=dest, write_disposition="merge")
    p4.run(source_with_root_id("s3"), destination=dest, write_disposition="merge")

    # Verify data for each pipeline
    for p in [p1, p2, p3, p4]:
        assert load_table_counts(p, "documents")["documents"] == 300

@axellpadilla
Contributor Author

@sh-rp Were you able to reproduce it?
