forked from milvus-io/milvus
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support meta storing on database (milvus-io#17236) (milvus-io#18541)
Signed-off-by: kejiang <[email protected]> Signed-off-by: kejiang <[email protected]> Co-authored-by: kejiang <[email protected]>
- Loading branch information
Showing
59 changed files
with
6,423 additions
and
333 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,7 @@ ignore: | |
- "docs/.*" | ||
- "**/*.pb.go" | ||
- "**/*.proto" | ||
- "internal/metastore/db/dbmodel/mocks/.*" | ||
|
||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
package dao | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
|
||
"github.com/milvus-io/milvus/internal/log" | ||
"github.com/milvus-io/milvus/internal/metastore/db/dbmodel" | ||
"github.com/milvus-io/milvus/internal/util/typeutil" | ||
"go.uber.org/zap" | ||
"gorm.io/gorm" | ||
"gorm.io/gorm/clause" | ||
) | ||
|
||
type collectionDb struct { | ||
db *gorm.DB | ||
} | ||
|
||
func (s *collectionDb) GetCollectionIDTs(tenantID string, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*dbmodel.Collection, error) { | ||
var col dbmodel.Collection | ||
|
||
err := s.db.Model(&dbmodel.Collection{}).Select("collection_id, ts").Where("tenant_id = ? AND collection_id = ? AND ts <= ?", tenantID, collectionID, ts).Order("ts desc").Take(&col).Error | ||
|
||
if errors.Is(err, gorm.ErrRecordNotFound) { | ||
log.Warn("record not found", zap.Int64("collID", collectionID), zap.Uint64("ts", ts), zap.Error(err)) | ||
return nil, fmt.Errorf("record not found, collID=%d, ts=%d", collectionID, ts) | ||
} | ||
if err != nil { | ||
log.Error("get collection ts failed", zap.String("tenant", tenantID), zap.Int64("collID", collectionID), zap.Uint64("ts", ts), zap.Error(err)) | ||
return nil, err | ||
} | ||
|
||
return &col, nil | ||
} | ||
|
||
func (s *collectionDb) ListCollectionIDTs(tenantID string, ts typeutil.Timestamp) ([]*dbmodel.Collection, error) { | ||
var r []*dbmodel.Collection | ||
|
||
err := s.db.Model(&dbmodel.Collection{}).Select("collection_id, MAX(ts) ts").Where("tenant_id = ? AND ts <= ?", tenantID, ts).Group("collection_id").Find(&r).Error | ||
if err != nil { | ||
log.Error("list collection_id & latest ts pairs in collections failed", zap.String("tenant", tenantID), zap.Uint64("ts", ts), zap.Error(err)) | ||
return nil, err | ||
} | ||
|
||
return r, nil | ||
} | ||
|
||
func (s *collectionDb) Get(tenantID string, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*dbmodel.Collection, error) { | ||
var r dbmodel.Collection | ||
|
||
err := s.db.Model(&dbmodel.Collection{}).Where("tenant_id = ? AND collection_id = ? AND ts = ? AND is_deleted = false", tenantID, collectionID, ts).Take(&r).Error | ||
|
||
if errors.Is(err, gorm.ErrRecordNotFound) { | ||
return nil, fmt.Errorf("collection not found, collID=%d, ts=%d", collectionID, ts) | ||
} | ||
if err != nil { | ||
log.Error("get collection by collection_id and ts failed", zap.String("tenant", tenantID), zap.Int64("collID", collectionID), zap.Uint64("ts", ts), zap.Error(err)) | ||
return nil, err | ||
} | ||
|
||
return &r, nil | ||
} | ||
|
||
func (s *collectionDb) GetCollectionIDByName(tenantID string, collectionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) { | ||
var r dbmodel.Collection | ||
|
||
err := s.db.Model(&dbmodel.Collection{}).Select("collection_id").Where("tenant_id = ? AND collection_name = ? AND ts <= ?", tenantID, collectionName, ts).Order("ts desc").Take(&r).Error | ||
|
||
if errors.Is(err, gorm.ErrRecordNotFound) { | ||
return 0, fmt.Errorf("get collection_id by collection_name not found, collName=%s, ts=%d", collectionName, ts) | ||
} | ||
if err != nil { | ||
log.Error("get collection_id by collection_name failed", zap.String("tenant", tenantID), zap.String("collName", collectionName), zap.Uint64("ts", ts), zap.Error(err)) | ||
return 0, err | ||
} | ||
|
||
return r.CollectionID, nil | ||
} | ||
|
||
// Insert used in create & drop collection, needs be an idempotent operation, so we use DoNothing strategy here so it will not throw exception for retry, equivalent to kv catalog | ||
func (s *collectionDb) Insert(in *dbmodel.Collection) error { | ||
err := s.db.Clauses(clause.OnConflict{ | ||
// constraint UNIQUE (tenant_id, collection_id, ts) | ||
DoNothing: true, | ||
}).Create(&in).Error | ||
|
||
if err != nil { | ||
log.Error("insert collection failed", zap.String("tenant", in.TenantID), zap.Int64("collID", in.CollectionID), zap.Uint64("ts", in.Ts), zap.Error(err)) | ||
return err | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
package dao | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
|
||
"github.com/milvus-io/milvus/internal/log" | ||
"github.com/milvus-io/milvus/internal/metastore/db/dbmodel" | ||
"github.com/milvus-io/milvus/internal/util/typeutil" | ||
"go.uber.org/zap" | ||
"gorm.io/gorm" | ||
"gorm.io/gorm/clause" | ||
) | ||
|
||
type collAliasDb struct { | ||
db *gorm.DB | ||
} | ||
|
||
func (s *collAliasDb) Insert(in []*dbmodel.CollectionAlias) error { | ||
err := s.db.Clauses(clause.OnConflict{ | ||
// constraint UNIQUE (tenant_id, collection_alias, ts) | ||
DoNothing: true, | ||
}).Create(&in).Error | ||
|
||
if err != nil { | ||
log.Error("insert collection alias failed", zap.Error(err)) | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (s *collAliasDb) GetCollectionIDByAlias(tenantID string, alias string, ts typeutil.Timestamp) (typeutil.UniqueID, error) { | ||
var r dbmodel.CollectionAlias | ||
|
||
err := s.db.Model(&dbmodel.CollectionAlias{}).Select("collection_id").Where("tenant_id = ? AND collection_alias = ? AND ts <= ?", tenantID, alias, ts).Order("ts desc").Take(&r).Error | ||
|
||
if errors.Is(err, gorm.ErrRecordNotFound) { | ||
return 0, fmt.Errorf("get collection_id by alias not found, alias=%s, ts=%d", alias, ts) | ||
} | ||
if err != nil { | ||
log.Error("get collection_id by alias failed", zap.String("tenant", tenantID), zap.String("alias", alias), zap.Uint64("ts", ts), zap.Error(err)) | ||
return 0, err | ||
} | ||
|
||
return r.CollectionID, nil | ||
} | ||
|
||
func (s *collAliasDb) ListCollectionIDTs(tenantID string, ts typeutil.Timestamp) ([]*dbmodel.CollectionAlias, error) { | ||
var r []*dbmodel.CollectionAlias | ||
|
||
err := s.db.Model(&dbmodel.CollectionAlias{}).Select("collection_id, MAX(ts) ts").Where("tenant_id = ? AND ts <= ?", tenantID, ts).Group("collection_id").Find(&r).Error | ||
if err != nil { | ||
log.Error("list collection_id & latest ts pairs in collection_aliases failed", zap.String("tenant", tenantID), zap.Uint64("ts", ts), zap.Error(err)) | ||
return nil, err | ||
} | ||
|
||
return r, nil | ||
} | ||
|
||
func (s *collAliasDb) List(tenantID string, cidTsPairs []*dbmodel.CollectionAlias) ([]*dbmodel.CollectionAlias, error) { | ||
var collAliases []*dbmodel.CollectionAlias | ||
|
||
inValues := make([][]interface{}, 0, len(cidTsPairs)) | ||
for _, pair := range cidTsPairs { | ||
in := []interface{}{pair.CollectionID, pair.Ts} | ||
inValues = append(inValues, in) | ||
} | ||
|
||
err := s.db.Model(&dbmodel.CollectionAlias{}).Select("collection_id, collection_alias"). | ||
Where("tenant_id = ? AND is_deleted = false AND (collection_id, ts) IN ?", tenantID, inValues).Find(&collAliases).Error | ||
if err != nil { | ||
log.Error("list alias by collection_id and alias pairs failed", zap.String("tenant", tenantID), zap.Any("collIdTs", inValues), zap.Error(err)) | ||
return nil, err | ||
} | ||
|
||
return collAliases, nil | ||
} |
Oops, something went wrong.