Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: [2.5]Add schema update time verification for insert and upsert to use cache #39382

Open
wants to merge 2 commits into
base: 2.5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.4
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.5-0.20250117031653-5377f6d19da2
github.com/minio/minio-go/v7 v7.0.73
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -630,8 +630,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.4 h1:ByPm7/opli5WnRaMXOiBWbAmEf2KloTjbIBdB1P1NYk=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.4/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.5-0.20250117031653-5377f6d19da2 h1:PBb2s4yFMiZApXoV41AyUbMcAuIhTjI1sz8rMyxTcoM=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.5-0.20250117031653-5377f6d19da2/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
Expand Down
5 changes: 5 additions & 0 deletions internal/metastore/model/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Collection struct {
Properties []*commonpb.KeyValuePair
State pb.CollectionState
EnableDynamicField bool
UpdateTimestamp uint64
}

func (c *Collection) Available() bool {
Expand Down Expand Up @@ -74,6 +75,7 @@ func (c *Collection) ShallowClone() *Collection {
State: c.State,
EnableDynamicField: c.EnableDynamicField,
Functions: c.Functions,
UpdateTimestamp: c.UpdateTimestamp,
}
}

Expand All @@ -99,6 +101,7 @@ func (c *Collection) Clone() *Collection {
State: c.State,
EnableDynamicField: c.EnableDynamicField,
Functions: CloneFunctions(c.Functions),
UpdateTimestamp: c.UpdateTimestamp,
}
}

Expand Down Expand Up @@ -156,6 +159,7 @@ func UnmarshalCollectionModel(coll *pb.CollectionInfo) *Collection {
State: coll.State,
Properties: coll.Properties,
EnableDynamicField: coll.Schema.EnableDynamicField,
UpdateTimestamp: coll.UpdateTimestamp,
}
}

Expand Down Expand Up @@ -218,6 +222,7 @@ func marshalCollectionModelWithConfig(coll *Collection, c *config) *pb.Collectio
StartPositions: coll.StartPositions,
State: coll.State,
Properties: coll.Properties,
UpdateTimestamp: coll.UpdateTimestamp,
}

if c.withPartitions {
Expand Down
18 changes: 10 additions & 8 deletions internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2628,10 +2628,11 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
Version: msgpb.InsertDataVersion_ColumnBased,
},
},
idAllocator: node.rowIDAllocator,
segIDAssigner: node.segAssigner,
chMgr: node.chMgr,
chTicker: node.chTicker,
idAllocator: node.rowIDAllocator,
segIDAssigner: node.segAssigner,
chMgr: node.chMgr,
chTicker: node.chTicker,
schemaTimestamp: request.SchemaTimestamp,
}
var enqueuedTask task = it
if streamingutil.IsStreamingServiceEnabled() {
Expand Down Expand Up @@ -2871,10 +2872,11 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest)
},
},

idAllocator: node.rowIDAllocator,
segIDAssigner: node.segAssigner,
chMgr: node.chMgr,
chTicker: node.chTicker,
idAllocator: node.rowIDAllocator,
segIDAssigner: node.segAssigner,
chMgr: node.chMgr,
chTicker: node.chTicker,
schemaTimestamp: request.SchemaTimestamp,
}
var enqueuedTask task = it
if streamingutil.IsStreamingServiceEnabled() {
Expand Down
3 changes: 3 additions & 0 deletions internal/proxy/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type collectionInfo struct {
consistencyLevel commonpb.ConsistencyLevel
partitionKeyIsolation bool
replicateID string
updateTimestamp uint64
}

type databaseInfo struct {
Expand Down Expand Up @@ -473,6 +474,7 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
createdUtcTimestamp: collection.CreatedUtcTimestamp,
consistencyLevel: collection.ConsistencyLevel,
partitionKeyIsolation: isolation,
updateTimestamp: collection.UpdateTimestamp,
}, nil
}
_, dbOk := m.collInfo[database]
Expand All @@ -490,6 +492,7 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
consistencyLevel: collection.ConsistencyLevel,
partitionKeyIsolation: isolation,
replicateID: replicateID,
updateTimestamp: collection.UpdateTimestamp,
}

log.Ctx(ctx).Info("meta update success", zap.String("database", database), zap.String("collectionName", collectionName),
Expand Down
1 change: 1 addition & 0 deletions internal/proxy/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,7 @@ func (t *describeCollectionTask) Execute(ctx context.Context) error {
t.result.Properties = result.Properties
t.result.DbName = result.GetDbName()
t.result.NumPartitions = result.NumPartitions
t.result.UpdateTimestamp = result.UpdateTimestamp
for _, field := range result.Schema.Fields {
if field.IsDynamic {
continue
Expand Down
37 changes: 28 additions & 9 deletions internal/proxy/task_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@
insertMsg *BaseInsertTask
ctx context.Context

result *milvuspb.MutationResult
idAllocator *allocator.IDAllocator
segIDAssigner *segIDAssigner
chMgr channelsMgr
chTicker channelsTimeTicker
vChannels []vChan
pChannels []pChan
schema *schemapb.CollectionSchema
partitionKeys *schemapb.FieldData
result *milvuspb.MutationResult
idAllocator *allocator.IDAllocator
segIDAssigner *segIDAssigner
chMgr channelsMgr
chTicker channelsTimeTicker
vChannels []vChan
pChannels []pChan
schema *schemapb.CollectionSchema
partitionKeys *schemapb.FieldData
schemaTimestamp uint64
}

// TraceCtx returns insertTask context
Expand Down Expand Up @@ -134,6 +135,24 @@
return merr.WrapErrCollectionReplicateMode("insert")
}

collID, err := globalMetaCache.GetCollectionID(context.Background(), it.insertMsg.GetDbName(), collectionName)
if err != nil {
log.Ctx(ctx).Warn("fail to get collection id", zap.Error(err))
return err
}

Check warning on line 142 in internal/proxy/task_insert.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_insert.go#L140-L142

Added lines #L140 - L142 were not covered by tests
colInfo, err := globalMetaCache.GetCollectionInfo(ctx, it.insertMsg.GetDbName(), collectionName, collID)
if err != nil {
log.Ctx(ctx).Warn("fail to get collection info", zap.Error(err))
return err
}

Check warning on line 147 in internal/proxy/task_insert.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_insert.go#L145-L147

Added lines #L145 - L147 were not covered by tests
if it.schemaTimestamp != 0 {
if it.schemaTimestamp != colInfo.updateTimestamp {
err := merr.WrapErrCollectionSchemaMisMatch(collectionName)
log.Ctx(ctx).Info("collection schema mismatch", zap.String("collectionName", collectionName), zap.Error(err))
return err
}
}

schema, err := globalMetaCache.GetCollectionSchema(ctx, it.insertMsg.GetDbName(), collectionName)
if err != nil {
log.Ctx(ctx).Warn("get collection schema from global meta cache failed", zap.String("collectionName", collectionName), zap.Error(err))
Expand Down
29 changes: 29 additions & 0 deletions internal/proxy/task_insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,3 +308,32 @@ func TestMaxInsertSize(t *testing.T) {
assert.ErrorIs(t, err, merr.ErrParameterTooLarge)
})
}

// TestInsertTaskForSchemaMismatch checks that insertTask.PreExecute fails with
// merr.ErrCollectionSchemaMismatch when the task's schemaTimestamp differs from
// the updateTimestamp reported by the meta cache.
func TestInsertTaskForSchemaMismatch(t *testing.T) {
	// Replace the global meta cache with a mock for the duration of the test
	// and put the original back when the test function returns.
	originalCache := globalMetaCache
	metaMock := NewMockCache(t)
	globalMetaCache = metaMock
	defer func() { globalMetaCache = originalCache }()

	t.Run("schema ts mismatch", func(t *testing.T) {
		// The mocked collection info carries updateTimestamp 100 ...
		cachedInfo := &collectionInfo{
			updateTimestamp: 100,
		}
		metaMock.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
		metaMock.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(cachedInfo, nil)
		metaMock.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)

		// ... while the task is built with schemaTimestamp 99.
		request := &msgpb.InsertRequest{
			DbName:         "hooooooo",
			CollectionName: "fooooo",
		}
		task := insertTask{
			ctx: context.Background(),
			insertMsg: &msgstream.InsertMsg{
				InsertRequest: request,
			},
			schemaTimestamp: 99,
		}

		preExecErr := task.PreExecute(context.Background())
		assert.Error(t, preExecErr)
		assert.ErrorIs(t, preExecErr, merr.ErrCollectionSchemaMismatch)
	})
}
21 changes: 20 additions & 1 deletion internal/proxy/task_upsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@
partitionKeys *schemapb.FieldData
// automatically generate pk as new pk when autoID == true
// delete task need use the oldIds
oldIds *schemapb.IDs
oldIds *schemapb.IDs
schemaTimestamp uint64
}

// TraceCtx returns upsertTask context
Expand Down Expand Up @@ -301,6 +302,24 @@
return merr.WrapErrCollectionReplicateMode("upsert")
}

collID, err := globalMetaCache.GetCollectionID(context.Background(), it.req.GetDbName(), collectionName)
if err != nil {
log.Warn("fail to get collection id", zap.Error(err))
return err
}

Check warning on line 309 in internal/proxy/task_upsert.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_upsert.go#L307-L309

Added lines #L307 - L309 were not covered by tests
colInfo, err := globalMetaCache.GetCollectionInfo(ctx, it.req.GetDbName(), collectionName, collID)
if err != nil {
log.Warn("fail to get collection info", zap.Error(err))
return err
}

Check warning on line 314 in internal/proxy/task_upsert.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_upsert.go#L312-L314

Added lines #L312 - L314 were not covered by tests
if it.schemaTimestamp != 0 {
if it.schemaTimestamp != colInfo.updateTimestamp {
err := merr.WrapErrCollectionSchemaMisMatch(collectionName)
log.Info("collection schema mismatch", zap.String("collectionName", collectionName), zap.Error(err))
return err
}
}

schema, err := globalMetaCache.GetCollectionSchema(ctx, it.req.GetDbName(), collectionName)
if err != nil {
log.Warn("Failed to get collection schema",
Expand Down
27 changes: 27 additions & 0 deletions internal/proxy/task_upsert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/testutils"
)

Expand Down Expand Up @@ -360,3 +361,29 @@ func TestUpsertTaskForReplicate(t *testing.T) {
assert.Error(t, err)
})
}

// TestUpsertTaskForSchemaMismatch checks that upsertTask.PreExecute fails with
// merr.ErrCollectionSchemaMismatch when the task's schemaTimestamp differs from
// the updateTimestamp reported by the meta cache.
func TestUpsertTaskForSchemaMismatch(t *testing.T) {
	// Replace the global meta cache with a mock for the duration of the test
	// and put the original back when the test function returns.
	originalCache := globalMetaCache
	metaMock := NewMockCache(t)
	globalMetaCache = metaMock
	defer func() { globalMetaCache = originalCache }()

	background := context.Background()

	t.Run("schema ts mismatch", func(t *testing.T) {
		// The mocked collection info carries updateTimestamp 100 ...
		cachedInfo := &collectionInfo{
			updateTimestamp: 100,
		}
		metaMock.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
		metaMock.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(cachedInfo, nil)
		metaMock.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)

		// ... while the task is built with schemaTimestamp 99.
		task := upsertTask{
			ctx: background,
			req: &milvuspb.UpsertRequest{
				CollectionName: "col-0",
			},
			schemaTimestamp: 99,
		}

		preExecErr := task.PreExecute(background)
		assert.Error(t, preExecErr)
		assert.ErrorIs(t, preExecErr, merr.ErrCollectionSchemaMismatch)
	})
}
10 changes: 10 additions & 0 deletions internal/rootcoord/alter_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@
oldColl.Properties = oldProperties
newColl := col.Clone()
newColl.Properties = newProperties
tso, err := core.tsoAllocator.GenerateTSO(1)
if err == nil {
newColl.UpdateTimestamp = tso
}
redoTask := newBaseRedoTask(core.stepExecutor)
redoTask.AddSyncStep(&AlterCollectionStep{
baseStep: baseStep{core: core},
Expand Down Expand Up @@ -280,6 +284,12 @@
if err != nil {
return err
}

tso, err := core.tsoAllocator.GenerateTSO(1)
if err == nil {
newColl.UpdateTimestamp = tso
}

Check warning on line 291 in internal/rootcoord/alter_collection_task.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/alter_collection_task.go#L288-L291

Added lines #L288 - L291 were not covered by tests

redoTask := newBaseRedoTask(core.stepExecutor)
redoTask.AddSyncStep(&AlterCollectionStep{
baseStep: baseStep{core: core},
Expand Down
10 changes: 5 additions & 5 deletions internal/rootcoord/alter_collection_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
).Return(errors.New("err"))
meta.On("ListAliasesByID", mock.Anything, mock.Anything).Return([]string{})

core := newTestCore(withValidProxyManager(), withMeta(meta))
core := newTestCore(withValidProxyManager(), withMeta(meta), withInvalidTsoAllocator())
task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{
Expand Down Expand Up @@ -131,7 +131,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
return errors.New("err")
}

core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker))
core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker), withInvalidTsoAllocator())
task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{
Expand Down Expand Up @@ -166,7 +166,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
return errors.New("err")
}

core := newTestCore(withInvalidProxyManager(), withMeta(meta), withBroker(broker))
core := newTestCore(withInvalidProxyManager(), withMeta(meta), withBroker(broker), withInvalidTsoAllocator())
task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{
Expand Down Expand Up @@ -200,7 +200,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
},
},
}, nil)
core := newTestCore(withValidProxyManager(), withMeta(meta))
core := newTestCore(withValidProxyManager(), withMeta(meta), withInvalidTsoAllocator())
task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{
Expand Down Expand Up @@ -254,7 +254,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
ticker := newChanTimeTickSync(packChan)
ticker.addDmlChannels("by-dev-rootcoord-dml_1")

core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker), withTtSynchronizer(ticker))
core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker), withTtSynchronizer(ticker), withInvalidTsoAllocator())
newPros := append(properties, &commonpb.KeyValuePair{
Key: common.ReplicateEndTSKey,
Value: "10000",
Expand Down
2 changes: 1 addition & 1 deletion internal/rootcoord/create_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,6 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
State: pb.PartitionState_PartitionCreated,
}
}

collInfo := model.Collection{
CollectionID: collID,
DBID: t.dbID,
Expand All @@ -592,6 +591,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
Partitions: partitions,
Properties: t.Req.Properties,
EnableDynamicField: t.schema.EnableDynamicField,
UpdateTimestamp: ts,
}

// We cannot check the idempotency inside meta table when adding collection, since we'll execute duplicate steps
Expand Down
1 change: 1 addition & 0 deletions internal/rootcoord/root_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,7 @@ func convertModelToDesc(collInfo *model.Collection, aliases []string, dbName str
resp.Properties = collInfo.Properties
resp.NumPartitions = int64(len(collInfo.Partitions))
resp.DbId = collInfo.DBID
resp.UpdateTimestamp = collInfo.UpdateTimestamp
return resp
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/json-iterator/go v1.1.12
github.com/klauspost/compress v1.17.7
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.4
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.5-0.20250117031653-5377f6d19da2
github.com/nats-io/nats-server/v2 v2.10.12
github.com/nats-io/nats.go v1.34.1
github.com/panjf2000/ants/v2 v2.7.2
Expand Down
4 changes: 2 additions & 2 deletions pkg/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.4 h1:ByPm7/opli5WnRaMXOiBWbAmEf2KloTjbIBdB1P1NYk=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.4/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.5-0.20250117031653-5377f6d19da2 h1:PBb2s4yFMiZApXoV41AyUbMcAuIhTjI1sz8rMyxTcoM=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.5-0.20250117031653-5377f6d19da2/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
Expand Down
Loading
Loading