wip #1

Open · wants to merge 2 commits into main
180 changes: 175 additions & 5 deletions llama_index/vector_stores/deeplake.py
@@ -4,7 +4,7 @@

"""
import logging
import uuid
from typing import Any, List, Optional, cast
from typing import Any, Callable, Dict, Iterable, List, Optional, Union, cast

from llama_index.schema import BaseNode, MetadataMode
from llama_index.vector_stores.types import VectorStore as VectorStoreBase
@@ -18,11 +18,181 @@
)

try:
from deeplake.core.vectorstore import VectorStore
import deeplake

DEEPLAKE_INSTALLED = True
if deeplake.__version__.startswith("3."):
from deeplake.core.vectorstore import VectorStore
else:

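        # Fallback shim for deeplake v4+: reimplements the slice of the v3
        # VectorStore API that this module uses on top of the v4 dataset
        # API (deeplake.open / deeplake.create / ds.query).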
class VectorStore:
def __init__(
self,
path: str,
read_only: bool = False,
token: Optional[str] = None,
exec_option: Optional[str] = None,
verbose: bool = False,
runtime: Optional[Dict] = None,
                index_params: Optional[Dict[str, Union[int, str]]] = None,
                embedding_function: Optional[Callable] = None,
**kwargs: Any,
):
                if not _DEEPLAKE_INSTALLED:
raise ImportError(
"Could not import deeplake python package. "
"Please install it with `pip install deeplake[enterprise]`."
)
self.path = path
self.read_only = read_only
self.token = token
                self.exec_option = exec_option
self.verbose = verbose
self.runtime = runtime
                self.index_params = index_params
                self.embedding_function = embedding_function
self.kwargs = kwargs
if read_only:
self.ds = deeplake.open_read_only(self.path, self.token)
else:
try:
self.ds = deeplake.open(self.path, self.token)
except deeplake.LogNotexistsError:
self.__create_dataset()

            def tensors(self) -> List[str]:
return [c.name for c in self.ds.schema.columns]

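            # Append rows to the dataset, embedding `text` with
            # `embedding_function` when one is given; returns the new row
            # ids if `return_ids` is True.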
def add(
self,
text: List[str],
metadata: Optional[List[dict]],
embedding_data: Iterable[str],
embedding_tensor: str,
embedding_function: Optional[Callable],
return_ids: bool,
**tensors: Any,
            ) -> Optional[List[str]]:
if embedding_function is not None:
embedding_data = embedding_function(text)
                if embedding_tensor is None:
                    embedding_tensor = "embedding"
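                # Reuse caller-supplied ids when present; otherwise mint one per row.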
_id = (
tensors["id"]
if "id" in tensors
else [str(uuid.uuid1()) for _ in range(len(text))]
)
self.ds.append(
{
"text": text,
"metadata": metadata,
"id": _id,
embedding_tensor: embedding_data,
}
)
self.ds.commit()
if return_ids:
return _id
else:
return None

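            # Run a raw TQL query and convert the result view into a dict.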
def search_tql(
self, query: str, exec_options: Optional[str]
) -> Dict[str, Any]:
view = self.ds.query(query)
return self.__view_to_docs(view)

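            # Vector similarity search, compiled down to a TQL query over
            # the embedding column.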
def search(
self,
embedding: Union[str, List[float]],
k: int,
distance_metric: str,
filter: Optional[Dict[str, Any]],
exec_option: Optional[str],
return_tensors: List[str],
deep_memory: Optional[bool],
query: Optional[str] = None,
) -> Dict[str, Any]:
                if query is None and embedding is None:
                    raise ValueError(
                        "Either `embedding` or `query` must be specified."
                    )
if query is not None:
return self.search_tql(query, exec_option)

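                # A string embedding is raw text to embed first; this
                # requires an embedding_function.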
if isinstance(embedding, str):
if self.embedding_function is None:
raise ValueError(
"embedding_function is required when embedding is a string"
)
embedding = self.embedding_function.embed_documents([embedding])[0]
emb_str = ", ".join([str(e) for e in embedding])

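                # Build the TQL query: the requested columns plus a computed
                # score column, ordered by the chosen distance metric.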
                column_list = ", ".join(return_tensors) if return_tensors else " * "

                metric = self.__metric_to_function(distance_metric)
                order_by = " ASC "
                if metric == "cosine_similarity":
                    order_by = " DESC "
                dp = f"(embedding, ARRAY[{emb_str}])"
                column_list += f", {metric}{dp} as score"
                query = f"SELECT {column_list} ORDER BY {metric}{dp} {order_by} LIMIT {k}"
view = self.ds.query(query)
return self.__view_to_docs(view)

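            # Deletion is not supported by this fallback implementation.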
def delete(
self, ids: List[str], filter: Dict[str, Any], delete_all: bool
) -> None:
raise NotImplementedError

def dataset(self) -> Any:
return self.ds

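            # Flatten a query result view into {column_name: list_of_values}.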
def __view_to_docs(self, view: Any) -> Dict[str, Any]:
docs = {}
                tensors = [(c.name, str(c.dtype)) for c in view.schema.columns]
                for name, dtype in tensors:
                    if dtype == "dict":
docs[name] = [i.to_dict() for i in view[name][:]]
else:
try:
docs[name] = view[name][:].tolist()
except AttributeError:
docs[name] = view[name][:]
return docs

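            # Map a user-facing metric name to the corresponding TQL function.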
def __metric_to_function(self, metric: str) -> str:
if (
metric is None
or metric == "cosine"
or metric == "cosine_similarity"
):
return "cosine_similarity"
elif metric == "l2" or metric == "l2_norm":
return "l2_norm"
else:
raise ValueError(
f"Unknown metric: {metric}, should be one of "
"['cosine', 'cosine_similarity', 'l2', 'l2_norm']"
)

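            # Create a new dataset whose embedding column width matches the
            # dimensionality reported by the embedding function.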
def __create_dataset(self) -> None:
if self.embedding_function is None:
raise ValueError(
"embedding_function is required to create a new dataset"
)
emb_size = len(self.embedding_function.embed_documents(["test"])[0])
self.ds = deeplake.create(self.path, self.token)
self.ds.add_column("text", deeplake.types.Text("inverted"))
self.ds.add_column("metadata", deeplake.types.Dict())
self.ds.add_column("embedding", deeplake.types.Embedding(size=emb_size))
self.ds.add_column("id", deeplake.types.Text)
self.ds.commit()

_DEEPLAKE_INSTALLED = True
except ImportError:
DEEPLAKE_INSTALLED = False
_DEEPLAKE_INSTALLED = False

logger = logging.getLogger(__name__)

@@ -95,7 +265,7 @@ def __init__(
self.read_only = read_only
self.dataset_path = dataset_path

if not DEEPLAKE_INSTALLED:
if not _DEEPLAKE_INSTALLED:
raise ImportError(
"Could not import deeplake python package. "
"Please install it with `pip install deeplake`."