Skip to content

Commit

Permalink
couchbase implementation works
Browse files Browse the repository at this point in the history
  • Loading branch information
Toby Tobkin committed Jul 17, 2024
1 parent f40d288 commit b3132c0
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 39 deletions.
5 changes: 2 additions & 3 deletions 1 query_qa_rag_agent.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
},
{
"cell_type": "code",
"execution_count": 1,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
Expand Down Expand Up @@ -41,7 +41,6 @@
"├── LICENSE\n",
"├── README.md\n",
"├── cheat_code # Completed, working code you can cheat off of if you get stuck\n",
"├── data # Nothing useful in here for now\n",
"├── requirements.txt # Python packages\n",
"└── workshop_code # The code you will edit in the tutorials\n",
"```\n",
Expand Down Expand Up @@ -90,7 +89,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.3"
"version": "3.11.9"
}
},
"nbformat": 4,
Expand Down
2 changes: 1 addition & 1 deletion 3 retriever.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.3"
"version": "3.11.9"
}
},
"nbformat": 4,
Expand Down
2 changes: 1 addition & 1 deletion 4 generator.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.3"
"version": "3.11.9"
}
},
"nbformat": 4,
Expand Down
2 changes: 1 addition & 1 deletion 5 summary.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.3"
"version": "3.11.9"
}
},
"nbformat": 4,
Expand Down
63 changes: 40 additions & 23 deletions cheat_code/common_components/vectordb_client_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
from couchbase.exceptions import CouchbaseException
import couchbase.search as search
from couchbase.options import SearchOptions
from couchbase.vector_search import VectorQuery, VectorSearch
from pydantic import BaseModel
from pinecone import Pinecone, ServerlessSpec
Expand Down Expand Up @@ -60,14 +62,14 @@ class CouchbaseClientAdapter(VectorDbClientAdapter):
def reset_index(self) -> None:
cluster, collection = self._initialize_cluster()
try:
# FIXME: this delete query doesn't work
# Execute a N1QL query to delete all documents in the collection
query = f"DELETE FROM `{CB_BUCKET_NAME}`"
q_res = cluster.query(query)

status = q_res.metadata().status()
print(f"Query status: {status}")

print("All documents in the collection have been deleted.")
# status = q_res.metadata().status()
# print(f"Query status: {status}")
# print("All documents in the collection have been deleted.")
except CouchbaseException as e:
print("Failed to delete documents:", e)
sys.exit()
Expand Down Expand Up @@ -101,35 +103,50 @@ def insert(self, text_splits: List[str], text_split_vectors: List[List[float]])
_, collection = self._initialize_cluster()
try:
for obj in vector_objs:
collection.insert(obj["id"], obj)
# FIXME: using an upsert instead of an insert because the delete query above doesn't work
# collection.insert(obj["id"], obj)
collection.upsert(obj["id"], obj)
except CouchbaseException as e:
print("Failed to insert document:", e)
sys.exit()

def retrieve(self, query_vector: List[float], k: int) -> List[str]:
def retrieve(self, query_vector: List[float], k: int = 4) -> List[Tuple[Document, float]]:
cluster, collection = self._initialize_cluster()

bucket = cluster.bucket(CB_BUCKET_NAME)
scope = bucket.scope(CB_SCOPE_NAME)

fields = ["*"]

search_req = search.SearchRequest.create(
VectorSearch.from_vector_query(
VectorQuery(
'vector_1536_text_embedding_3_small',
query_vector,
k,
)
)
)
try:
# Create a VectorQuery
vector_query = VectorQuery.create(
field_name="vector_1536_text_embedding_3_small",
vector=query_vector,
num_candidates=k
search_iter = scope.search(
index=INDEX_NAME,
request=search_req,
options=SearchOptions(
limit=k,
fields=fields,
),
)

# Create a VectorSearch from the VectorQuery
vector_search = VectorSearch.from_vector_query(vector_query)
text_splits = []

# Execute the search
query = f"SELECT text_split FROM `{CB_BUCKET_NAME}` WHERE SEARCH({vector_search})"
result = cluster.query(query).rows()
# Parse the results
for row in search_iter.rows():
key = row.id
text_split = collection.get(key).value.get('text_split')
text_splits.append(text_split)
except Exception as e:
raise ValueError(f"Search failed with error: {e}")

# Extract and return the text_splits from the results
text_splits = [row['text_split'] for row in result]
return text_splits
except CouchbaseException as e:
print("Failed to retrieve documents:", e)
sys.exit()
return text_splits

def count_entries(self) -> int:
cluster, collection = self._initialize_cluster()
Expand Down
32 changes: 29 additions & 3 deletions couchbase-index-definition.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"type": "fulltext-index",
"name": "textsplits",
"name": "rag-workshop._default.textsplits",
"sourceType": "gocbcore",
"sourceName": "rag-workshop",
"planParams": {
Expand All @@ -20,7 +20,7 @@
"default_datetime_parser": "dateTimeOptional",
"default_field": "_all",
"default_mapping": {
"dynamic": false,
"dynamic": true,
"enabled": false
},
"default_type": "_default",
Expand All @@ -30,9 +30,35 @@
"type_field": "_type",
"types": {
"_default._default": {
"dynamic": false,
"dynamic": true,
"enabled": true,
"properties": {
"id": {
"dynamic": false,
"enabled": true,
"fields": [
{
"analyzer": "en",
"index": true,
"name": "id",
"store": true,
"type": "text"
}
]
},
"text_split": {
"dynamic": false,
"enabled": true,
"fields": [
{
"analyzer": "en",
"index": true,
"name": "text_split",
"store": true,
"type": "text"
}
]
},
"vector_1536_text_embedding_3_small": {
"dynamic": false,
"enabled": true,
Expand Down
144 changes: 137 additions & 7 deletions workshop_code/common_components/vectordb_client_adapters.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,44 @@
from __future__ import annotations

import os
import sys
import traceback
from abc import ABC, abstractmethod
from typing import List, Any
from datetime import timedelta
from typing import Any, List

from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
from couchbase.exceptions import CouchbaseException
import couchbase.search as search
from couchbase.options import SearchOptions
from couchbase.vector_search import VectorQuery, VectorSearch
from pydantic import BaseModel
import os
from pinecone import Pinecone, ServerlessSpec
import weaviate
from weaviate.auth import AuthApiKey
# from weaviate.classes import config, data
from pinecone import Pinecone
from pinecone import ServerlessSpec

# Couchbase
CB_ENDPOINT=os.getenv("CB_ENDPOINT")
CB_USERNAME=os.getenv("CB_USERNAME")
CB_PASSWORD=os.getenv("CB_PASSWORD")
CB_BUCKET_NAME = "rag-workshop"
CB_SCOPE_NAME = "_default"
CB_COLLECTION_NAME = "_default"
# Weaviate
WCS_URL = os.getenv("WCS_URL")
WCS_API_KEY = os.getenv("WCS_API_KEY")
# Pinecone
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Common
INDEX_NAME = "textsplits"

class VectorDbClientAdapter(ABC):
@abstractmethod
def setup_index(self) -> None:
def reset_index(self) -> None:
pass

@abstractmethod
Expand All @@ -32,9 +53,118 @@ def retrieve(self, query_vector: List[float], k: int) -> List[str]:
def count_entries(self) -> int:
pass

# SDK Docs: https://docs.couchbase.com/sdk-api/couchbase-python-client/couchbase_api/couchbase_search.html#
class CouchbaseClientAdapter(VectorDbClientAdapter):
    """Vector-DB adapter backed by a Couchbase (Capella) cluster.

    Documents are stored in the ``CB_BUCKET_NAME``/``CB_SCOPE_NAME``/
    ``CB_COLLECTION_NAME`` keyspace and retrieved via the pre-created
    search index named ``INDEX_NAME`` using the SDK's vector search API.
    """

    # Capella only supports setting up the index via the cloud console,
    # so unlike other instances of this method, it does not delete and
    # re-setup the index. It only clears the documents in the index.
    def reset_index(self) -> None:
        """Delete every document in the target collection.

        Exits the process on a Couchbase error, matching the error style
        of the other methods in this class.
        """
        cluster, _ = self._initialize_cluster()
        try:
            # Fully qualify the keyspace (consistent with count_entries) and
            # call .execute(): cluster.query() is lazy in the Python SDK, so
            # without consuming the result the DELETE never actually runs —
            # that silent no-op was the original "delete query doesn't work"
            # FIXME. Any server-side error (e.g. a missing primary index)
            # now surfaces through the except clause instead of vanishing.
            query = (
                f"DELETE FROM "
                f"`{CB_BUCKET_NAME}`.`{CB_SCOPE_NAME}`.`{CB_COLLECTION_NAME}`"
            )
            cluster.query(query).execute()
            print("All documents in the collection have been deleted.")
        except CouchbaseException as e:
            print("Failed to delete documents:", e)
            sys.exit()

    def _initialize_cluster(self):
        """Connect to the cluster and return ``(cluster, collection)``.

        Uses the WAN development profile (longer timeouts suitable for a
        remote Capella endpoint) and waits up to 5 s for the cluster to be
        ready. Exits the process if the connection cannot be established.
        """
        try:
            auth = PasswordAuthenticator(CB_USERNAME, CB_PASSWORD)
            options = ClusterOptions(auth)
            options.apply_profile("wan_development")

            cluster = Cluster(CB_ENDPOINT, options)
            cluster.wait_until_ready(timedelta(seconds=5))

            bucket = cluster.bucket(CB_BUCKET_NAME)
            collection = bucket.scope(CB_SCOPE_NAME).collection(CB_COLLECTION_NAME)
            return cluster, collection
        except Exception:
            traceback.print_exc()
            sys.exit("Failed to initialize the cluster or collection")

    def insert(self, text_splits: List[str], text_split_vectors: List[List[float]]) -> None:
        """Store each text split with its embedding vector.

        Each document gets its list index as the key, the raw text under
        ``text_split``, and the embedding under the field the search index
        expects (``vector_1536_text_embedding_3_small``).
        """
        vector_objs = []
        for i, text_split in enumerate(text_splits):
            obj = {
                "id": f"{i}",
                "vector_1536_text_embedding_3_small": text_split_vectors[i],
                "text_split": text_split,
            }
            vector_objs.append(obj)

        _, collection = self._initialize_cluster()
        try:
            for obj in vector_objs:
                # upsert (not insert) keeps this idempotent: re-running the
                # workshop against a non-empty collection overwrites the same
                # keys instead of raising DocumentExistsException.
                collection.upsert(obj["id"], obj)
        except CouchbaseException as e:
            print("Failed to insert document:", e)
            sys.exit()

    def retrieve(self, query_vector: List[float], k: int = 4) -> List[str]:
        """Return the ``k`` stored text splits nearest to ``query_vector``.

        Runs a vector search against the scope-level index, then fetches
        each hit's full document by key to read its ``text_split`` field.

        Raises:
            ValueError: if the search request fails.
        """
        cluster, collection = self._initialize_cluster()
        bucket = cluster.bucket(CB_BUCKET_NAME)
        scope = bucket.scope(CB_SCOPE_NAME)

        fields = ["*"]

        search_req = search.SearchRequest.create(
            VectorSearch.from_vector_query(
                VectorQuery(
                    "vector_1536_text_embedding_3_small",
                    query_vector,
                    k,
                )
            )
        )
        try:
            search_iter = scope.search(
                index=INDEX_NAME,
                request=search_req,
                options=SearchOptions(
                    limit=k,
                    fields=fields,
                ),
            )

            text_splits = []

            # The search result only carries document ids; fetch each
            # document to get its text content.
            for row in search_iter.rows():
                key = row.id
                text_split = collection.get(key).value.get("text_split")
                text_splits.append(text_split)
        except Exception as e:
            raise ValueError(f"Search failed with error: {e}")

        return text_splits

    def count_entries(self) -> int:
        """Return the number of documents in the collection (0 on error).

        Iterating ``result.rows()`` is what actually executes the lazy
        N1QL query.
        """
        cluster, collection = self._initialize_cluster()
        try:
            query = (
                f"SELECT COUNT(*) AS count FROM "
                f"`{CB_BUCKET_NAME}`.`{CB_SCOPE_NAME}`.`{CB_COLLECTION_NAME}`"
            )
            result = cluster.query(query)
            count = 0
            for row in result.rows():
                count = row["count"]
            return count
        except Exception as e:
            print("Failed to count documents:", e)
            return 0


class PineconeClientAdapter(VectorDbClientAdapter):

def setup_index(self) -> None:
def reset_index(self) -> None:
client = Pinecone(api_key=PINECONE_API_KEY)

if self._index_exists(INDEX_NAME):
Expand Down Expand Up @@ -90,7 +220,7 @@ def _index_exists(self, index_name: str) -> bool:

class WcsClientAdapter(VectorDbClientAdapter):

def setup_index(self) -> None:
def reset_index(self) -> None:
client = self._get_wcs_client()
try:
if client.collections.exists(INDEX_NAME):
Expand Down

0 comments on commit b3132c0

Please sign in to comment.