When using upsert to add entities, the number of entities retrieved from query() gives all the entities and doesn't consider overwrites #249

Open
nisharyan opened this issue Jan 24, 2025 · 18 comments

Comments

@nisharyan

With pymilvus version 2.4.9, I am trying to upsert data to a collection. As a test, I upserted entities with the same primary_key to check for overwrites. When I try to get the number of entities from query(), it returns the total number of entities and doesn't account for the overwrites from the upsert calls.

vector_count = client.query(
    collection_name="test_collection",
    filter="",
    output_fields=["count(*)"]
)

But when I create a dataframe from the output of the following query, the len of the dataframe is correct.

vector_count = client.query(
    collection_name="inline_test_collection_6",
    filter="",
    output_fields=["*"],
    limit=100,
)

print(len(pd.DataFrame(vector_count)))

Is this the expected behaviour?
Thank you for the help!!

@xiaofan-luan

> When I try to get the number of entities from query(), it returns the total number of entities and doesn't consider the overwrites because of the upsert calls. [...] Is this the expected behaviour?

So is this using Milvus Lite? Could you offer a script to reproduce this issue?

@junjiejiangjjj
Collaborator

@xiaofan-luan The query interface deduplicates its results, while count(*) directly returns the result from segcore (current version 2.4.1).

@nisharyan
Author

import pandas as pd
from pymilvus import connections, Collection, CollectionSchema, DataType, FieldSchema, MilvusClient
from uuid import uuid4

# Sample milvus-lite client.
client = MilvusClient(
    "test_upsert.db",
)

# Sample schema for collection.
field_schemas = [
    FieldSchema(
        "id",
        DataType.INT64,
        is_primary=True,
        auto_id=False,
        max_length=100,
    ),
    FieldSchema(
        "chunk_text",
        DataType.VARCHAR,
        max_length=100,
    ),
    FieldSchema(
        "vector", DataType.FLOAT_VECTOR, dim=384
    ),
]
collection_schema = CollectionSchema(fields=field_schemas)

milvus_connection_key_alias = uuid4().hex
connections.connect(
    alias=milvus_connection_key_alias,
    uri="test_upsert.db"
)

collection = Collection(
    name="upsert_test",
    schema=collection_schema,
    using=milvus_connection_key_alias
)

# Create an index.
collection.create_index(
    "vector",
    index_params = {
        "metric_type": "COSINE",
        "index_type": "FLAT",
        "params": {},
    }
)

# Insert sample data.
docs = [
    "Artificial intelligence was founded as an academic discipline in 1956.",
    "Alan Turing was the first person to conduct substantial research in AI.",
    "Born in Maida Vale, London, Turing was raised in southern England.",
]

vectors = [[ 0 for _ in range(384) ] for _ in range(len(docs)) ]
data = [ {"id": 1, "vector": vectors[i], "chunk_text": docs[i]} for i in range(len(vectors)) ]
res = client.insert(
    collection_name="upsert_test",
    data=data
)

# Query_1
vector_count = client.query(
    collection_name="upsert_test",
    filter="",
    output_fields=["count(*)"]
)
print(f"Vector count as per query_1: {vector_count}")

# Query_2
vector_count = client.query(
    collection_name="upsert_test",
    filter="",
    output_fields=["*"],
    limit=100
)
print(f"Vector count as per query_2: {len(pd.DataFrame(vector_count))}")

@xiaofan-luan

> data = [ {"id": 1, "vector": vectors[i], "chunk_text": docs[i]} for i in range(len(vectors)) ]
> [rest of the repro script quoted from the previous comment]

Did you insert only one entity?
What is the difference between query_1 and query_2?
Both should return 1 row.

@nisharyan
Author

There is a minor change to the code above (insert has to be changed to upsert):

res = client.upsert(
    collection_name="upsert_test",
    data=data
)

I have upserted 3 vectors with the same id (primary_key). Shouldn't the upsert call overwrite the ids, so that the total number of entities after querying is 1? (Please correct me if I am wrong.)

This is the output of queries:

Vector count as per query_1: data: ["{'count(*)': 3}"] 
Vector count as per query_2: 1
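The discrepancy can be modeled in plain Python (a toy sketch, not Milvus internals): query_2 effectively deduplicates by primary key, while count(*) reports the raw stored rows.

```python
# Toy model of the observed discrepancy (not Milvus internals): three upserted
# rows sharing primary key 1, as in the repro script above.
rows = [{"id": 1, "chunk_text": t} for t in ("a", "b", "c")]

raw_count = len(rows)                 # what count(*) reported: 3
deduped = {r["id"]: r for r in rows}  # query() deduplicates by primary key
print(raw_count, len(deduped))        # 3 1
```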

@xiaofan-luan

> data = [ {"id": 1, "vector": vectors[i], "chunk_text": docs[i]} for i in range(len(vectors)) ]

So all your data has the same ID?

@xiaofan-luan

For Milvus, each row should have a unique ID.

@nisharyan
Author

I referred to this documentation: https://milvus.io/docs/v2.4.x/insert-update-delete.md#Upsert-entities
It says:

> If the primary key of the entity already exists in the collection, the existing entity will be overwritten.

@xiaofan-luan

Usually this could be your DB primary key, a chunk ID, or an image URL ID.

We did fix some upsert issues to make sure upserts are not duplicated even with the same IDs; you can try 2.5.4 and see.
But again, it doesn't make sense for you to insert duplicate IDs.

@xiaofan-luan

Imagine all the documents have different IDs.

@nisharyan
Author

Let's say I have a use case where the hash of my chunk text acts as the primary key. I want to use upsert so that some level of dedup happens at the Milvus end.

@xiaofan-luan

Please take a look at this line:

> data = [ {"id": 1, "vector": vectors[i], "chunk_text": docs[i]} for i in range(len(vectors)) ]

I think it should be data = [ {"id": i, "vector": vectors[i], "chunk_text": docs[i]} for i in range(len(vectors)) ]

My suggestion is to use an LLM to help you debug; the current code seems to be buggy.
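As a sketch of that fix (using stand-in vectors, since real embeddings aren't needed to show the point), giving each row a distinct primary key looks like:

```python
# Stand-ins for the repro script's docs and vectors.
docs = [
    "Artificial intelligence was founded as an academic discipline in 1956.",
    "Alan Turing was the first person to conduct substantial research in AI.",
    "Born in Maida Vale, London, Turing was raised in southern England.",
]
vectors = [[0.0] * 384 for _ in docs]

# Use the list index as the primary key instead of the constant 1.
data = [{"id": i, "vector": vectors[i], "chunk_text": docs[i]} for i in range(len(vectors))]

ids = [row["id"] for row in data]
print(ids)  # [0, 1, 2] -- three distinct primary keys
```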

@xiaofan-luan

> Let's say I have a use case where the hash of my chunk text acts as the primary key. I want to use upsert so that there is some level of dedup happening at milvus end.

Then your ID should be hashfunction(docs[i]).
Please also upgrade to the latest 2.5.4 or 2.4.21 to avoid unnecessary debugging, since we did fix several data-deduplication bugs.
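A sketch of that hash-based primary key (assuming the pk field is INT64, as in the repro script, so the hash is truncated to 63 bits to fit a signed 64-bit integer; `chunk_text_pk` is a hypothetical helper, not a Milvus API):

```python
import hashlib

def chunk_text_pk(text: str) -> int:
    """Derive a stable primary key from chunk text; fits a signed INT64."""
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    # Keep 63 bits so the value is a valid non-negative signed INT64.
    return int.from_bytes(digest[:8], "big") & 0x7FFF_FFFF_FFFF_FFFF

docs = ["same chunk", "same chunk", "different chunk"]
pks = [chunk_text_pk(d) for d in docs]
print(len(set(pks)))  # 2 -- duplicate chunks collapse to one primary key
```

With this, upserting the same chunk twice hits the same primary key, so Milvus's overwrite-on-upsert gives the dedup behaviour described above.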

@nisharyan
Author

> Imagine all the documents have different IDs.

Agreed. But the docs also say that entries will be overwritten in the case of upsert.

@xiaofan-luan

It will be overwritten with the same IDs, that is true.
Try the latest Milvus and see.

@nisharyan
Author

> please take a look at this line
>
> I think it should be data = [ {"id": i, "vector": vectors[i], "chunk_text": docs[i]} for i in range(len(vectors)) ]

I tried to replicate the scenario where all of the IDs are the same and to verify the query() results.

With distributed Milvus, query() takes the duplicate primary keys into account (verified through query() and Attu).

@xiaofan-luan

query() will dedup PKs across different segments, but count(*) won't.
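Given that, one workaround for an accurate count on versions where count(*) over-counts is to page the primary keys back and count distinct values client-side. `distinct_pk_count` is a hypothetical helper; it assumes a connected pymilvus MilvusClient and a collection small enough to fetch within one `limit` page.

```python
def distinct_pk_count(client, collection_name: str, pk_field: str = "id",
                      limit: int = 16384) -> int:
    """Count distinct primary keys client-side instead of trusting count(*).

    Assumes `client` behaves like a pymilvus MilvusClient and the collection
    fits within a single `limit` page.
    """
    rows = client.query(
        collection_name=collection_name,
        filter="",
        output_fields=[pk_field],
        limit=limit,
    )
    return len({row[pk_field] for row in rows})
```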

@xiaofan-luan

Right now, if you insert two entities with the same PK, we don't guarantee what happens.
However, by doing upsert, the old entity should be deleted and thus should not be counted.
