Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG]: Delete collection resource leak (single-node Chroma) #3297

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions chromadb/api/segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def create_collection(
id=model.id,
name=model.name,
configuration=model.get_configuration(),
segments=[], # Passing empty till backend changes are deployed.
segments=[], # Passing empty till backend changes are deployed.
metadata=model.metadata,
dimension=None, # This is lazily populated on the first add
get_or_create=get_or_create,
Expand Down Expand Up @@ -384,10 +384,10 @@ def delete_collection(
)

if existing:
self._manager.delete_segments(collection_id=existing[0].id)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rohitcpbot, this is the actual change as we discussed. rest is just black formatting changes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @tazarov.
If possible leave a note with the following comment or similar -
"""
This call will delete segment related data that is stored locally and cannot be part of the atomic SQL transaction.
It is a NoOp for the distributed sysdb implementation.
Omitting this call will lead to leak of segment related resources.
"""

Can you answer something for me - If the process crashes immediately after self._manager.delete_segments(collection_id=existing[0].id)
Then the actual entries in SQL are not deleted, which means the collection is not deleted.
Now if user issues a Get or Query, will the local manager work correctly ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the fix to make local manager work in the above failure scenario is non trivial then we could leave a note here, and take it up as a separate task. But it will be good to know the state of the Db with above change.

The same scenario would had to be thought through even with your earlier changes of doing the local manager delete after the sysdb delete... where the sql could have gone through but the local manager did not because of a crash.. leading to a leak.

Copy link
Contributor Author

@tazarov tazarov Jan 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rohitcpbot, local manager has two segments for each collection:

  • sqlite - this will actually delete the segment from segments -
    def delete(self) -> None:
  • hnsw - it will delete the directory where the the HNSW is stored but will not delete the segment from segments dir

So here is a diagram to explain the point of failure:

image

The main problem as I see it in the current impl (with possible solutions):

image

self._sysdb.delete_collection(
existing[0].id, tenant=tenant, database=database
)
self._manager.delete_segments(existing[0].id)
else:
raise ValueError(f"Collection {name} does not exist.")

Expand Down Expand Up @@ -894,11 +894,15 @@ def _get_collection(self, collection_id: UUID) -> t.Collection:

@trace_method("SegmentAPI._scan", OpenTelemetryGranularity.ALL)
def _scan(self, collection_id: UUID) -> Scan:
collection_and_segments = self._sysdb.get_collection_with_segments(collection_id)
collection_and_segments = self._sysdb.get_collection_with_segments(
collection_id
)
# For now collection should have exactly one segment per scope:
# - Local scopes: vector, metadata
# - Distributed scopes: vector, metadata, record
scope_to_segment = {segment["scope"]: segment for segment in collection_and_segments["segments"]}
# - Distributed scopes: vector, metadata, record
scope_to_segment = {
segment["scope"]: segment for segment in collection_and_segments["segments"]
}
return Scan(
collection=collection_and_segments["collection"],
knn=scope_to_segment[t.SegmentScope.VECTOR],
Expand Down
4 changes: 3 additions & 1 deletion chromadb/segment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ class SegmentManager(Component):
segments as required"""

@abstractmethod
def prepare_segments_for_new_collection(self, collection: Collection) -> Sequence[Segment]:
def prepare_segments_for_new_collection(
self, collection: Collection
) -> Sequence[Segment]:
"""Return the segments required for a new collection. Returns only segment data,
does not persist to the SysDB"""
pass
Expand Down
11 changes: 8 additions & 3 deletions chromadb/segment/impl/manager/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@
OpenTelemetryGranularity,
trace_method,
)
from chromadb.types import Collection, CollectionAndSegments, Operation, Segment, SegmentScope
from chromadb.types import (
Collection,
CollectionAndSegments,
Operation,
Segment,
SegmentScope,
)


class DistributedSegmentManager(SegmentManager):
Expand Down Expand Up @@ -77,8 +83,7 @@ def prepare_segments_for_new_collection(

@override
def delete_segments(self, collection_id: UUID) -> Sequence[UUID]:
segments = self._sysdb.get_segments(collection=collection_id)
tazarov marked this conversation as resolved.
Show resolved Hide resolved
return [s["id"] for s in segments]
return [] # noop
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HammadB, talked with @rohitcpbot and he mentioned that this should be noop, is this fine or should I revert back to the older version with distributed sysdb query?


@trace_method(
"DistributedSegmentManager.get_endpoint",
Expand Down
5 changes: 4 additions & 1 deletion chromadb/segment/impl/manager/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ def reset_state(self) -> None:
OpenTelemetryGranularity.OPERATION_AND_SEGMENT,
)
@override
def prepare_segments_for_new_collection(self, collection: Collection) -> Sequence[Segment]:
def prepare_segments_for_new_collection(
self, collection: Collection
) -> Sequence[Segment]:
vector_segment = _segment(
self._vector_segment_type, SegmentScope.VECTOR, collection
)
Expand All @@ -154,6 +156,7 @@ def prepare_segments_for_new_collection(self, collection: Collection) -> Sequenc
def delete_segments(self, collection_id: UUID) -> Sequence[UUID]:
segments = self._sysdb.get_segments(collection=collection_id)
for segment in segments:
collection_id = segment["collection"]
if segment["id"] in self._instances:
if segment["type"] == SegmentType.HNSW_LOCAL_PERSISTED.value:
instance = self.get_segment(collection_id, VectorReader)
Expand Down
58 changes: 57 additions & 1 deletion chromadb/test/property/invariants.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
import gc
import math
import os.path
from uuid import UUID
from contextlib import contextmanager

from chromadb.api.segment import SegmentAPI
from chromadb.db.system import SysDB
from chromadb.ingest.impl.utils import create_topic_name

from chromadb.config import System
from chromadb.db.base import get_sql
from chromadb.db.impl.sqlite import SqliteDB
from time import sleep
import psutil

from chromadb.segment import SegmentType
from chromadb.test.property.strategies import NormalizedRecordSet, RecordSet
from typing import Callable, Optional, Tuple, Union, List, TypeVar, cast, Any, Dict
from typing_extensions import Literal
import numpy as np
import numpy.typing as npt
from chromadb.api import types
from chromadb.api import types, ClientAPI
from chromadb.api.models.Collection import Collection
from hypothesis import note
from hypothesis.errors import InvalidArgument
Expand Down Expand Up @@ -457,3 +463,53 @@ def log_size_for_collections_match_expected(

else:
assert _total_embedding_queue_log_size(sqlite) == 0


@contextmanager
def collection_deleted(client: ClientAPI, collection_name: str):
# Invariant checks before deletion
assert collection_name in client.list_collections()
collection = client.get_collection(collection_name)
segments = []
if isinstance(client._server, SegmentAPI): # type: ignore
sysdb: SysDB = client._server._sysdb # type: ignore
segments = sysdb.get_segments(collection=collection.id)
segment_types = {}
should_have_hnsw = False
for segment in segments:
segment_types[segment["type"]] = True
if segment["type"] == SegmentType.HNSW_LOCAL_PERSISTED.value:
sync_threshold = (
collection.metadata["hnsw:sync_threshold"]
if collection.metadata is not None
and "hnsw:sync_threshold" in collection.metadata
else 1000
)
if (
collection.count() > sync_threshold
): # we only check if vector segment dir exists if we've synced at least once
should_have_hnsw = True
assert os.path.exists(
os.path.join(
client.get_settings().persist_directory, str(segment["id"])
)
)
if should_have_hnsw:
assert segment_types[SegmentType.HNSW_LOCAL_PERSISTED.value]
assert segment_types[SegmentType.SQLITE.value]

yield

# Invariant checks after deletion
assert collection_name not in client.list_collections()
if len(segments) > 0:
sysdb: SysDB = client._server._sysdb # type: ignore
segments_after = sysdb.get_segments(collection=collection.id)
assert len(segments_after) == 0
for segment in segments:
if segment["type"] == SegmentType.HNSW_LOCAL_PERSISTED.value:
assert not os.path.exists(
os.path.join(
client.get_settings().persist_directory, str(segment["id"])
)
)
6 changes: 4 additions & 2 deletions chromadb/test/property/test_collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
run_state_machine_as_test,
MultipleResults,
)
import chromadb.test.property.invariants as invariants
from typing import Any, Dict, Mapping, Optional
import numpy
from chromadb.test.property.strategies import hashing_embedding_function
Expand Down Expand Up @@ -75,8 +76,9 @@ def get_coll(self, coll: strategies.ExternalCollection) -> None:
@rule(coll=consumes(collections))
def delete_coll(self, coll: strategies.ExternalCollection) -> None:
if coll.name in self.model:
self.client.delete_collection(name=coll.name)
self.delete_from_model(coll.name)
with invariants.collection_deleted(self.client, coll.name):
self.client.delete_collection(name=coll.name)
self.delete_from_model(coll.name)
else:
with pytest.raises(Exception):
self.client.delete_collection(name=coll.name)
Expand Down
Loading