Fix Chunk Manager Path misuse (milvus-io#19141)
Signed-off-by: xiaofan-luan <[email protected]>

xiaofan-luan authored Sep 23, 2022
1 parent c9195c5 commit 928a213
Showing 20 changed files with 96 additions and 179 deletions.
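The pattern repeated across these files: each component used to assemble its own chunk-manager factory from individual storage.Option values (address, credentials, bucket, root path) and then call NewVectorStorageChunkManager; after this commit every call site builds the factory from the shared ComponentParam and asks for the persistent-storage chunk manager. A minimal sketch of the new shape, assuming a populated *paramtable.ComponentParam and using only identifiers visible in this diff:

import (
	"context"

	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/internal/util/paramtable"
)

// newPersistentChunkManager shows the post-commit construction path: all
// storage settings (type, address, credentials, bucket, root path) come
// from the shared params rather than per-call storage.Option values.
func newPersistentChunkManager(ctx context.Context, params *paramtable.ComponentParam) (storage.ChunkManager, error) {
	factory := storage.NewChunkManagerFactoryWithParam(params)
	return factory.NewPersistentStorageChunkManager(ctx)
}

Because the root path now travels with the manager itself (see cli.RootPath() in the garbage collector below), callers no longer thread a separate rootPath string through their options.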
23 changes: 4 additions & 19 deletions cmd/milvus/mck.go
@@ -47,7 +47,7 @@ const (
)

type mck struct {
- params paramtable.GrpcServerConfig
+ params *paramtable.ComponentParam
taskKeyMap map[int64]string
taskNameMap map[int64]string
allTaskInfo map[string]string
@@ -232,27 +232,12 @@ func (c *mck) connectEctd() {
}

func (c *mck) connectMinio() {
- useSSL := c.params.MinioCfg.UseSSL
- if c.minioUseSSL == "true" || c.minioUseSSL == "false" {
- minioUseSSL, err := strconv.ParseBool(c.minioUseSSL)
- if err != nil {
- log.Panic("fail to parse the 'minioUseSSL' string to the bool value", zap.String("minioUseSSL", c.minioUseSSL), zap.Error(err))
- }
- useSSL = minioUseSSL
- }
- chunkManagerFactory := storage.NewChunkManagerFactory("local", "minio",
- storage.RootPath(c.params.LocalStorageCfg.Path),
- storage.Address(getConfigValue(c.minioAddress, c.params.MinioCfg.Address, "minio_address")),
- storage.AccessKeyID(getConfigValue(c.minioUsername, c.params.MinioCfg.AccessKeyID, "minio_username")),
- storage.SecretAccessKeyID(getConfigValue(c.minioPassword, c.params.MinioCfg.SecretAccessKey, "minio_password")),
- storage.UseSSL(useSSL),
- storage.BucketName(getConfigValue(c.minioBucketName, c.params.MinioCfg.BucketName, "minio_bucket_name")),
- storage.CreateBucket(true))
+ chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(c.params)

var err error
- c.minioChunkManager, err = chunkManagerFactory.NewVectorStorageChunkManager(context.Background())
+ c.minioChunkManager, err = chunkManagerFactory.NewPersistentStorageChunkManager(context.Background())
if err != nil {
log.Fatal("failed to connect to etcd", zap.Error(err))
log.Fatal("failed to connect to minio", zap.Error(err))
}
}

9 changes: 4 additions & 5 deletions internal/datacoord/garbage_collector.go
@@ -48,7 +48,6 @@ type GcOption struct {
checkInterval time.Duration // each interval
missingTolerance time.Duration // key missing in meta tolerance time
dropTolerance time.Duration // dropped segment related key tolerance time
- rootPath string
}

// garbageCollector handles garbage files in object storage
@@ -130,9 +129,9 @@ func (gc *garbageCollector) scan() {

// walk only data cluster related prefixes
prefixes := make([]string, 0, 3)
- prefixes = append(prefixes, path.Join(gc.option.rootPath, insertLogPrefix))
- prefixes = append(prefixes, path.Join(gc.option.rootPath, statsLogPrefix))
- prefixes = append(prefixes, path.Join(gc.option.rootPath, deltaLogPrefix))
+ prefixes = append(prefixes, path.Join(gc.option.cli.RootPath(), insertLogPrefix))
+ prefixes = append(prefixes, path.Join(gc.option.cli.RootPath(), statsLogPrefix))
+ prefixes = append(prefixes, path.Join(gc.option.cli.RootPath(), deltaLogPrefix))
var removedKeys []string

for _, prefix := range prefixes {
@@ -148,7 +147,7 @@ func (gc *garbageCollector) scan() {
continue
}

- segmentID, err := storage.ParseSegmentIDByBinlog(gc.option.rootPath, infoKey)
+ segmentID, err := storage.ParseSegmentIDByBinlog(gc.option.cli.RootPath(), infoKey)
if err != nil && !common.IsIgnorableError(err) {
log.Error("parse segment id error", zap.String("infoKey", infoKey), zap.Error(err))
continue
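With the rootPath field deleted from GcOption, the collector derives every scanned prefix from the chunk manager's own RootPath(), so the path can no longer drift from the client that actually serves it. An illustrative wiring of the new GcOption — field values mirror the tests below; meta, segRefer, indexCoord, and cli are assumed to be initialized:

// Sketch: GcOption after this change; cli.RootPath() replaces the removed field.
gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{
	cli:              cli, // storage.ChunkManager
	enabled:          true,
	checkInterval:    time.Minute * 30,
	missingTolerance: time.Hour * 24,
	dropTolerance:    time.Hour * 24,
})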
9 changes: 1 addition & 8 deletions internal/datacoord/garbage_collector_test.go
@@ -65,7 +65,6 @@ func Test_garbageCollector_basic(t *testing.T) {
checkInterval: time.Millisecond * 10,
missingTolerance: time.Hour * 24,
dropTolerance: time.Hour * 24,
- rootPath: rootPath,
})
gc.start()

@@ -82,7 +81,6 @@ func Test_garbageCollector_basic(t *testing.T) {
checkInterval: time.Millisecond * 10,
missingTolerance: time.Hour * 24,
dropTolerance: time.Hour * 24,
- rootPath: rootPath,
})
assert.NotPanics(t, func() {
gc.start()
@@ -145,7 +143,6 @@ func Test_garbageCollector_scan(t *testing.T) {
checkInterval: time.Minute * 30,
missingTolerance: time.Hour * 24,
dropTolerance: time.Hour * 24,
- rootPath: rootPath,
})
gc.segRefer = segReferManager
gc.scan()
@@ -167,7 +164,6 @@ func Test_garbageCollector_scan(t *testing.T) {
checkInterval: time.Minute * 30,
missingTolerance: time.Hour * 24,
dropTolerance: time.Hour * 24,
- rootPath: rootPath,
})
gc.scan()

@@ -193,7 +189,6 @@ func Test_garbageCollector_scan(t *testing.T) {
checkInterval: time.Minute * 30,
missingTolerance: time.Hour * 24,
dropTolerance: time.Hour * 24,
- rootPath: rootPath,
})
gc.start()
gc.scan()
@@ -222,7 +217,6 @@ func Test_garbageCollector_scan(t *testing.T) {
checkInterval: time.Minute * 30,
missingTolerance: time.Hour * 24,
dropTolerance: 0,
- rootPath: rootPath,
})
gc.clearEtcd()
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, insertLogPrefix), inserts[1:])
@@ -239,7 +233,6 @@ func Test_garbageCollector_scan(t *testing.T) {
checkInterval: time.Minute * 30,
missingTolerance: 0,
dropTolerance: 0,
- rootPath: rootPath,
})
gc.start()
gc.scan()
@@ -324,7 +317,7 @@ func initUtOSSEnv(bucket, root string, n int) (mcm *storage.MinioChunkManager, i
mcm = &storage.MinioChunkManager{
Client: cli,
}
- mcm.SetVar(context.TODO(), bucket)
+ mcm.SetVar(context.TODO(), bucket, root)
return mcm, inserts, stats, delta, other, nil
}

40 changes: 9 additions & 31 deletions internal/datacoord/server.go
@@ -361,39 +361,17 @@ func (s *Server) stopCompactionTrigger() {
func (s *Server) initGarbageCollection() error {
var cli storage.ChunkManager
var err error
- if Params.CommonCfg.StorageType == "minio" {
- chunkManagerFactory := storage.NewChunkManagerFactory("local", "minio",
- storage.RootPath(Params.LocalStorageCfg.Path),
- storage.Address(Params.MinioCfg.Address),
- storage.AccessKeyID(Params.MinioCfg.AccessKeyID),
- storage.SecretAccessKeyID(Params.MinioCfg.SecretAccessKey),
- storage.UseSSL(Params.MinioCfg.UseSSL),
- storage.BucketName(Params.MinioCfg.BucketName),
- storage.UseIAM(Params.MinioCfg.UseIAM),
- storage.IAMEndpoint(Params.MinioCfg.IAMEndpoint),
- storage.CreateBucket(true))
- cli, err = chunkManagerFactory.NewVectorStorageChunkManager(s.ctx)
- if err != nil {
- log.Error("minio chunk manager init failed", zap.String("error", err.Error()))
- return err
- }
- log.Info("minio chunk manager init success", zap.String("bucketname", Params.MinioCfg.BucketName))
- } else if Params.CommonCfg.StorageType == "local" {
- chunkManagerFactory := storage.NewChunkManagerFactory("local", "local",
- storage.RootPath(Params.LocalStorageCfg.Path))
- cli, err = chunkManagerFactory.NewVectorStorageChunkManager(s.ctx)
- if err != nil {
- log.Error("local chunk manager init failed", zap.String("error", err.Error()))
- return err
- }
- log.Info("local chunk manager init success")
- }

+ chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(&Params)
+ cli, err = chunkManagerFactory.NewPersistentStorageChunkManager(s.ctx)
+ if err != nil {
+ log.Error("chunk manager init failed", zap.Error(err))
+ return err
+ }
+ log.Info("Datacoord garbage collector chunk manager init success")
s.garbageCollector = newGarbageCollector(s.meta, s.segReferManager, s.indexCoord, GcOption{
- cli: cli,
- enabled: Params.DataCoordCfg.EnableGarbageCollection,
- rootPath: Params.MinioCfg.RootPath,
+ cli: cli,
+ enabled: Params.DataCoordCfg.EnableGarbageCollection,
checkInterval: Params.DataCoordCfg.GCInterval,
missingTolerance: Params.DataCoordCfg.GCMissingTolerance,
dropTolerance: Params.DataCoordCfg.GCDropTolerance,
2 changes: 1 addition & 1 deletion internal/datanode/data_node.go
@@ -494,7 +494,7 @@ func (node *DataNode) Start() error {
return errors.New("DataNode fail to connect etcd")
}

- chunkManager, err := node.factory.NewVectorStorageChunkManager(node.ctx)
+ chunkManager, err := node.factory.NewPersistentStorageChunkManager(node.ctx)

if err != nil {
return err
2 changes: 1 addition & 1 deletion internal/indexcoord/index_coord.go
@@ -209,7 +209,7 @@ func (i *IndexCoord) Init() error {
// TODO silverxia add Rewatch logic
i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1, nil)

- chunkManager, err := i.factory.NewVectorStorageChunkManager(i.loopCtx)
+ chunkManager, err := i.factory.NewPersistentStorageChunkManager(i.loopCtx)
if err != nil {
log.Error("IndexCoord new minio chunkManager failed", zap.Error(err))
initErr = err
18 changes: 4 additions & 14 deletions internal/indexnode/chunk_mgr_factory.go
@@ -5,6 +5,7 @@ import (
"fmt"
"sync"

"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
)
@@ -23,24 +24,13 @@ func (m *chunkMgr) NewChunkManager(ctx context.Context, config *indexpb.StorageC
return v.(storage.ChunkManager), nil
}

- opts := make([]storage.Option, 0)
- if config.StorageType == "local" {
- opts = append(opts, storage.RootPath(config.RootPath))
- } else {
- opts = append(opts, storage.Address(config.Address),
- storage.AccessKeyID(config.AccessKeyID),
- storage.SecretAccessKeyID(config.SecretAccessKey),
- storage.UseSSL(config.UseSSL),
- storage.BucketName(config.BucketName),
- storage.UseIAM(config.UseIAM),
- storage.IAMEndpoint(config.IAMEndpoint))
- }
- factory := storage.NewChunkManagerFactory("local", config.StorageType, opts...)
- mgr, err := factory.NewVectorStorageChunkManager(ctx)
+ chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(&Params)
+ mgr, err := chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
if err != nil {
return nil, err
}
v, _ := m.cached.LoadOrStore(key, mgr)
+ log.Ctx(ctx).Info("index node successfully init chunk manager")
return v.(storage.ChunkManager), nil
}

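Note that the IndexNode path keeps its per-config cache but now builds the manager from the node-global Params, using the incoming StorageConfig only as the cache key. The cache relies on sync.Map's LoadOrStore, which keeps the first value stored under a key, so racing goroutines converge on one shared ChunkManager. A standalone sketch of that caching pattern (the string key and Params wiring are assumptions):

var cached sync.Map

// getChunkManager returns one shared ChunkManager per key, building it
// lazily from the global Params on first use.
func getChunkManager(ctx context.Context, key string) (storage.ChunkManager, error) {
	if v, ok := cached.Load(key); ok {
		return v.(storage.ChunkManager), nil
	}
	factory := storage.NewChunkManagerFactoryWithParam(&Params)
	mgr, err := factory.NewPersistentStorageChunkManager(ctx)
	if err != nil {
		return nil, err
	}
	// If another goroutine stored first, discard ours and reuse the cached one.
	v, _ := cached.LoadOrStore(key, mgr)
	return v.(storage.ChunkManager), nil
}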
2 changes: 1 addition & 1 deletion internal/indexnode/chunkmgr_mock.go
@@ -227,7 +227,7 @@ func (f *mockFactory) NewCacheStorageChunkManager(context.Context) (storage.Chun
return nil, errNotImplErr
}

- func (f *mockFactory) NewVectorStorageChunkManager(context.Context) (storage.ChunkManager, error) {
+ func (f *mockFactory) NewPersistentStorageChunkManager(context.Context) (storage.ChunkManager, error) {
if f.chunkMgr != nil {
return f.chunkMgr, nil
}
2 changes: 1 addition & 1 deletion internal/querycoord/query_coord.go
@@ -196,7 +196,7 @@ func (qc *QueryCoord) Init() error {
// we only try best to reload the leader addresses
reloadShardLeaderAddress(qc.meta, qc.cluster)

- qc.chunkManager, initError = qc.factory.NewVectorStorageChunkManager(qc.loopCtx)
+ qc.chunkManager, initError = qc.factory.NewPersistentStorageChunkManager(qc.loopCtx)

if initError != nil {
log.Error("query coordinator init cluster failed", zap.Error(initError))
7 changes: 1 addition & 6 deletions internal/querycoordv2/meta/coordinator_broker.go
@@ -13,7 +13,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"

"go.uber.org/zap"
@@ -37,20 +36,16 @@ type CoordinatorBroker struct {
dataCoord types.DataCoord
rootCoord types.RootCoord
indexCoord types.IndexCoord

- cm storage.ChunkManager
}

func NewCoordinatorBroker(
dataCoord types.DataCoord,
rootCoord types.RootCoord,
- indexCoord types.IndexCoord,
- cm storage.ChunkManager) *CoordinatorBroker {
+ indexCoord types.IndexCoord) *CoordinatorBroker {
return &CoordinatorBroker{
dataCoord,
rootCoord,
indexCoord,
- cm,
}
}

10 changes: 0 additions & 10 deletions internal/querycoordv2/server.go
@@ -35,7 +35,6 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
@@ -60,7 +59,6 @@ type Server struct {
idAllocator func() (int64, error)
factory dependency.Factory
metricsCacheManager *metricsinfo.MetricsCacheManager
- chunkManager storage.ChunkManager

// Coordinators
dataCoord types.DataCoord
@@ -159,13 +157,6 @@ func (s *Server) Init() error {
// Init metrics cache manager
s.metricsCacheManager = metricsinfo.NewMetricsCacheManager()

- // Init chunk manager
- s.chunkManager, err = s.factory.NewVectorStorageChunkManager(s.ctx)
- if err != nil {
- log.Error("failed to init chunk manager", zap.Error(err))
- return err
- }

// Init meta
err = s.initMeta()
if err != nil {
@@ -252,7 +243,6 @@ func (s *Server) initMeta() error {
s.dataCoord,
s.rootCoord,
s.indexCoord,
- s.chunkManager,
)
return nil
}
7 changes: 5 additions & 2 deletions internal/querynode/mock_test.go
@@ -1741,9 +1741,12 @@ func genSimpleQueryNodeWithMQFactory(ctx context.Context, fac dependency.Factory
// init shard cluster service
node.ShardClusterService = newShardClusterService(node.etcdCli, node.session, node)

- node.queryShardService = newQueryShardService(node.queryNodeLoopCtx,
+ node.queryShardService, err = newQueryShardService(node.queryNodeLoopCtx,
node.metaReplica, node.tSafeReplica,
node.ShardClusterService, node.factory, node.scheduler)
+ if err != nil {
+ return nil, err
+ }

node.UpdateStateCode(internalpb.StateCode_Healthy)

@@ -1908,7 +1911,7 @@ func (mm *mockMsgStreamFactory) NewQueryMsgStream(ctx context.Context) (msgstrea
func (mm *mockMsgStreamFactory) NewCacheStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) {
return nil, nil
}
- func (mm *mockMsgStreamFactory) NewVectorStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) {
+ func (mm *mockMsgStreamFactory) NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) {
return nil, nil
}

16 changes: 6 additions & 10 deletions internal/querynode/query_node.go
@@ -115,7 +115,6 @@ type QueryNode struct {
eventCh <-chan *sessionutil.SessionEvent

vectorStorage storage.ChunkManager
- cacheStorage storage.ChunkManager
etcdKV *etcdkv.EtcdKV

// shard cluster service, handle shard leader functions
@@ -245,20 +244,13 @@ func (node *QueryNode) Init() error {
}
log.Info("QueryNode init rateCollector done", zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))

- node.vectorStorage, err = node.factory.NewVectorStorageChunkManager(node.queryNodeLoopCtx)
+ node.vectorStorage, err = node.factory.NewPersistentStorageChunkManager(node.queryNodeLoopCtx)
if err != nil {
log.Error("QueryNode init vector storage failed", zap.Error(err))
initError = err
return
}

- node.cacheStorage, err = node.factory.NewCacheStorageChunkManager(node.queryNodeLoopCtx)
- if err != nil {
- log.Error("QueryNode init cache storage failed", zap.Error(err))
- initError = err
- return
- }

node.etcdKV = etcdkv.NewEtcdKV(node.etcdCli, Params.EtcdCfg.MetaRootPath)
log.Info("queryNode try to connect etcd success", zap.Any("MetaRootPath", Params.EtcdCfg.MetaRootPath))

@@ -320,8 +312,12 @@ func (node *QueryNode) Start() error {
// create shardClusterService for shardLeader functions.
node.ShardClusterService = newShardClusterService(node.etcdCli, node.session, node)
// create shard-level query service
- node.queryShardService = newQueryShardService(node.queryNodeLoopCtx, node.metaReplica, node.tSafeReplica,
+ queryShardService, err := newQueryShardService(node.queryNodeLoopCtx, node.metaReplica, node.tSafeReplica,
node.ShardClusterService, node.factory, node.scheduler)
+ if err != nil {
+ return err
+ }
+ node.queryShardService = queryShardService

Params.QueryNodeCfg.CreatedTime = time.Now()
Params.QueryNodeCfg.UpdatedTime = time.Now()
(Diff truncated: the remaining changed files of the 20 are not shown here.)
