Skip to content

Commit

Permalink
Fix/qdrant tests in CI (#1526)
Browse files Browse the repository at this point in the history
* Run qdrant server in local tests

* Add qdrant to test destination configs

* Fix stringify UUID objects

* Install qdrant deps

* Fix qdrant image version

* Disable httpx logging in tests

* Add index and use order by for fetching state

* Try qdrant local support

* Fix qdrant load stored state

* Disable parallel load in qdrant local

* Test destination config for qdrant local and server

* Fixes

* qdrant example test

* Missing module

* Cleanup

* resolves configuration to get full capabilities in load

* uses embedded qdrant for zendesk example

---------

Co-authored-by: Marcin Rudolf <[email protected]>
  • Loading branch information
steinitzu and rudolfix authored Jul 4, 2024
1 parent e55bb0e commit 48c93f5
Show file tree
Hide file tree
Showing 16 changed files with 348 additions and 154 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test_doc_snippets.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ env:

# Slack hook for chess in production example
RUNTIME__SLACK_INCOMING_HOOK: ${{ secrets.RUNTIME__SLACK_INCOMING_HOOK }}
# Path to local qdrant database
DESTINATION__QDRANT__CREDENTIALS__PATH: zendesk.qdb
# detect if the workflow is executed in a repo fork
IS_FORK: ${{ github.event.pull_request.head.repo.fork }}

Expand Down
10 changes: 8 additions & 2 deletions .github/workflows/test_local_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ env:
RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}
ACTIVE_DESTINATIONS: "[\"duckdb\", \"postgres\", \"filesystem\", \"weaviate\"]"
ACTIVE_DESTINATIONS: "[\"duckdb\", \"postgres\", \"filesystem\", \"weaviate\", \"qdrant\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\"]"

DESTINATION__WEAVIATE__VECTORIZER: text2vec-contextionary
Expand Down Expand Up @@ -63,6 +63,11 @@ jobs:
--health-timeout 5s
--health-retries 5
qdrant:
image: qdrant/qdrant:v1.8.4
ports:
- 6333:6333

steps:
- name: Check out
uses: actions/checkout@master
Expand Down Expand Up @@ -90,7 +95,7 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations

- name: Install dependencies
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate --with sentry-sdk --with pipeline -E deltalake
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant --with sentry-sdk --with pipeline -E deltalake

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
Expand All @@ -100,6 +105,7 @@ jobs:
name: Run tests Linux
env:
DESTINATION__POSTGRES__CREDENTIALS: postgresql://loader:loader@localhost:5432/dlt_data
DESTINATION__QDRANT__CREDENTIALS__location: http://localhost:6333

- name: Stop weaviate
if: always()
Expand Down
45 changes: 45 additions & 0 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,28 @@ class StorageSchemaInfo(NamedTuple):
inserted_at: datetime.datetime
schema: str

@classmethod
def from_normalized_mapping(
    cls, normalized_doc: Dict[str, Any], naming_convention: NamingConvention
) -> "StorageSchemaInfo":
    """Build an instance from a mapping whose keys were normalized with `naming_convention`

    Args:
        normalized_doc: Mapping with normalized keys (e.g. {Version: ..., SchemaName: ...})
        naming_convention: Naming convention that was used to normalize keys

    Returns:
        StorageSchemaInfo: Instance of this class
    """
    normalize = naming_convention.normalize_identifier
    # all fields are looked up with a plain subscript, in declaration order,
    # under the normalized form of the field name
    field_names = (
        "version_hash",
        "schema_name",
        "version",
        "engine_version",
        "inserted_at",
        "schema",
    )
    return cls(**{name: normalized_doc[normalize(name)] for name in field_names})


@dataclasses.dataclass
class StateInfo:
Expand All @@ -82,6 +104,29 @@ def as_doc(self) -> TPipelineStateDoc:
doc.pop("version_hash")
return doc

@classmethod
def from_normalized_mapping(
    cls, normalized_doc: Dict[str, Any], naming_convention: NamingConvention
) -> "StateInfo":
    """Build an instance from a mapping whose keys were normalized with `naming_convention`

    Args:
        normalized_doc: Mapping with normalized keys (e.g. {Version: ..., PipelineName: ...})
        naming_convention: Naming convention that was used to normalize keys

    Returns:
        StateInfo: Instance of this class
    """
    normalize = naming_convention.normalize_identifier
    # these keys are read with a subscript, so they must be present in the mapping
    required = {
        name: normalized_doc[normalize(name)]
        for name in ("version", "engine_version", "pipeline_name", "state", "created_at")
    }
    # these keys are read with `.get`, so a missing key yields None
    optional = {
        name: normalized_doc.get(normalize(name)) for name in ("version_hash", "_dlt_load_id")
    }
    return cls(**required, **optional)


@configspec
class DestinationClientConfiguration(BaseConfiguration):
Expand Down
59 changes: 56 additions & 3 deletions dlt/destinations/impl/qdrant/configuration.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
import dataclasses
from typing import Optional, Final
from typing_extensions import Annotated
from typing import Optional, Final, Any
from typing_extensions import Annotated, TYPE_CHECKING

from dlt.common.configuration import configspec, NotResolved
from dlt.common.configuration.specs.base_configuration import (
BaseConfiguration,
CredentialsConfiguration,
)
from dlt.common.destination.reference import DestinationClientDwhConfiguration
from dlt.destinations.impl.qdrant.exceptions import InvalidInMemoryQdrantCredentials

if TYPE_CHECKING:
from qdrant_client import QdrantClient


@configspec
class QdrantCredentials(CredentialsConfiguration):
# If `:memory:` - use in-memory Qdrant instance.
if TYPE_CHECKING:
_external_client: "QdrantClient"

# If `str` - use it as a `url` parameter.
# If `None` - use default values for `host` and `port`
location: Optional[str] = None
Expand All @@ -21,6 +27,47 @@ class QdrantCredentials(CredentialsConfiguration):
# Persistence path for QdrantLocal. Default: `None`
path: Optional[str] = None

def is_local(self) -> bool:
    """Tell whether a persistence `path` is set on these credentials."""
    path_is_set = self.path is not None
    return path_is_set

def on_resolved(self) -> None:
    """Reject the `:memory:` location.

    Raises:
        InvalidInMemoryQdrantCredentials: when `location` equals ":memory:"
    """
    if self.location != ":memory:":
        return
    raise InvalidInMemoryQdrantCredentials()

def parse_native_representation(self, native_value: Any) -> None:
    """Accept an already instantiated `QdrantClient` as the native value.

    When `native_value` is a `QdrantClient` instance it is kept as `_external_client`
    and `self.resolve()` is called. A `ModuleNotFoundError` raised during that step
    (e.g. `qdrant_client` is not installed) is ignored. In every case the value is then
    passed on to the parent class implementation.

    Args:
        native_value: Value to parse, possibly a `QdrantClient` instance
    """
    try:
        from qdrant_client import QdrantClient

        client_passed = isinstance(native_value, QdrantClient)
        if client_passed:
            self._external_client = native_value
            self.resolve()
    except ModuleNotFoundError:
        # nothing to do here: continue with the parent implementation below
        pass

    super().parse_native_representation(native_value)

def _create_client(self, model: str, **options: Any) -> "QdrantClient":
    """Create a new `QdrantClient` from these credentials and set its model.

    Args:
        model: Model name passed to `client.set_model`
        **options: Extra keyword arguments forwarded to the `QdrantClient` constructor

    Returns:
        QdrantClient: Newly created client
    """
    from qdrant_client import QdrantClient

    client_kwargs = dict(self)
    # when a path is set, `location` is not passed to the client constructor
    if client_kwargs["path"]:
        client_kwargs.pop("location")

    new_client = QdrantClient(**client_kwargs, **options)
    new_client.set_model(model)
    return new_client

def get_client(self, model: str, **options: Any) -> "QdrantClient":
    """Return the externally supplied client if there is one, otherwise create a new client.

    Args:
        model: Model name used when a new client has to be created
        **options: Extra keyword arguments used when a new client has to be created

    Returns:
        QdrantClient: The `_external_client` when it is set and truthy, else a new client
    """
    external = getattr(self, "_external_client", None)
    if external:
        return external
    return self._create_client(model, **options)

def close_client(self, client: "QdrantClient") -> None:
    """Close client if not external

    Args:
        client: Client to close; left open when it is the same object as `_external_client`
    """
    created_externally = getattr(self, "_external_client", None) is client
    if not created_externally:
        client.close()

def __str__(self) -> str:
    """Return `location`, or "localhost" when `location` is empty or not set."""
    if self.location:
        return self.location
    return "localhost"

Expand Down Expand Up @@ -81,6 +128,12 @@ class QdrantClientConfiguration(DestinationClientDwhConfiguration):
# Find the list here. https://qdrant.github.io/fastembed/examples/Supported_Models/.
model: str = "BAAI/bge-small-en"

def get_client(self) -> "QdrantClient":
    """Get a client from the credentials, using the configured `model` and `options`.

    Returns:
        QdrantClient: Whatever `credentials.get_client` returns
    """
    client_options = dict(self.options)
    return self.credentials.get_client(self.model, **client_options)

def close_client(self, client: "QdrantClient") -> None:
    """Hand `client` over to the credentials' `close_client`.

    Args:
        client: Client to close
    """
    credentials = self.credentials
    credentials.close_client(client)

def fingerprint(self) -> str:
"""Returns a fingerprint of a connection string"""

Expand Down
11 changes: 11 additions & 0 deletions dlt/destinations/impl/qdrant/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from dlt.common.destination.exceptions import DestinationTerminalException


class InvalidInMemoryQdrantCredentials(DestinationTerminalException):
    """Terminal error explaining how to use an in-memory qdrant instance.

    The message tells the user to instantiate the client first and pass it
    to the destination factory, and includes a short usage snippet.
    """

    def __init__(self) -> None:
        # the snippet in the message must be valid Python: the `dlt.pipeline(` call
        # needs its own closing parenthesis after `qdrant(client)`
        super().__init__(
            "To use in-memory instance of qdrant, "
            "please instantiate it first and then pass to destination factory\n"
            '\nclient = QdrantClient(":memory:")\n'
            'dlt.pipeline(pipeline_name="...", destination=dlt.destinations.qdrant(client))'
        )
16 changes: 16 additions & 0 deletions dlt/destinations/impl/qdrant/factory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import typing as t

from dlt.common.destination import Destination, DestinationCapabilitiesContext
from dlt.common.destination.reference import TDestinationConfig
from dlt.common.normalizers.naming import NamingConvention

from dlt.destinations.impl.qdrant.configuration import QdrantCredentials, QdrantClientConfiguration

Expand All @@ -26,6 +28,20 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:

return caps

@classmethod
def adjust_capabilities(
    cls,
    caps: DestinationCapabilitiesContext,
    config: QdrantClientConfiguration,
    naming: t.Optional[NamingConvention],
) -> DestinationCapabilitiesContext:
    """Adjust capabilities via the parent class, then restrict loading for local qdrant.

    Args:
        caps: Capabilities to adjust
        config: Client configuration; its credentials decide whether qdrant is local
        naming: Naming convention passed through to the parent implementation

    Returns:
        DestinationCapabilitiesContext: The adjusted capabilities
    """
    caps = super(qdrant, cls).adjust_capabilities(caps, config, naming)
    if not config.credentials.is_local():
        return caps

    # Local qdrant can not load in parallel
    caps.loader_parallelism_strategy = "sequential"
    caps.max_parallel_load_jobs = 1
    return caps

@property
def client_class(self) -> t.Type["QdrantClient"]:
from dlt.destinations.impl.qdrant.qdrant_client import QdrantClient
Expand Down
Loading

0 comments on commit 48c93f5

Please sign in to comment.