Skip to content

Commit

Permalink
Extract segments info into a new struct. (milvus-io#15537)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>

Co-authored-by: sunby <[email protected]>
  • Loading branch information
sunby and sunby authored Feb 18, 2022
1 parent 5751759 commit 2e676a4
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 95 deletions.
119 changes: 31 additions & 88 deletions internal/querycoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ type MetaReplica struct {
//sync.RWMutex
collectionInfos map[UniqueID]*querypb.CollectionInfo
collectionMu sync.RWMutex
segmentInfos map[UniqueID]*querypb.SegmentInfo
segmentMu sync.RWMutex
queryChannelInfos map[UniqueID]*querypb.QueryChannelInfo
channelMu sync.RWMutex
deltaChannelInfos map[UniqueID][]*datapb.VchannelInfo
Expand All @@ -113,13 +111,13 @@ type MetaReplica struct {
queryStreams map[UniqueID]msgstream.MsgStream
streamMu sync.RWMutex

segmentsInfo *segmentsInfo
//partitionStates map[UniqueID]*querypb.PartitionStates
}

func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAllocator func() (UniqueID, error)) (Meta, error) {
childCtx, cancel := context.WithCancel(ctx)
collectionInfos := make(map[UniqueID]*querypb.CollectionInfo)
segmentInfos := make(map[UniqueID]*querypb.SegmentInfo)
queryChannelInfos := make(map[UniqueID]*querypb.QueryChannelInfo)
deltaChannelInfos := make(map[UniqueID][]*datapb.VchannelInfo)
dmChannelInfos := make(map[string]*querypb.DmChannelWatchInfo)
Expand All @@ -133,11 +131,12 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAll
idAllocator: idAllocator,

collectionInfos: collectionInfos,
segmentInfos: segmentInfos,
queryChannelInfos: queryChannelInfos,
deltaChannelInfos: deltaChannelInfos,
dmChannelInfos: dmChannelInfos,
queryStreams: queryMsgStream,

segmentsInfo: newSegmentsInfo(kv),
}

err := m.reloadFromKV()
Expand Down Expand Up @@ -167,22 +166,9 @@ func (m *MetaReplica) reloadFromKV() error {
m.collectionInfos[collectionID] = collectionInfo
}

segmentKeys, segmentValues, err := m.client.LoadWithPrefix(util.SegmentMetaPrefix)
if err != nil {
if err := m.segmentsInfo.loadSegments(); err != nil {
return err
}
for index := range segmentKeys {
segmentID, err := strconv.ParseInt(filepath.Base(segmentKeys[index]), 10, 64)
if err != nil {
return err
}
segmentInfo := &querypb.SegmentInfo{}
err = proto.Unmarshal([]byte(segmentValues[index]), segmentInfo)
if err != nil {
return err
}
m.segmentInfos[segmentID] = segmentInfo
}

deltaChannelKeys, deltaChannelValues, err := m.client.LoadWithPrefix(deltaChannelMetaPrefix)
if err != nil {
Expand Down Expand Up @@ -522,29 +508,17 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
}

// save segmentInfo to etcd
segmentInfoKvs := make(map[string]string)
for _, infos := range saves {
for _, info := range infos {
segmentInfoBytes, err := proto.Marshal(info)
if err != nil {
return col2SegmentChangeInfos, err
if err := m.segmentsInfo.saveSegment(info); err != nil {
panic(err)
}
segmentKey := fmt.Sprintf("%s/%d/%d/%d", util.SegmentMetaPrefix, info.CollectionID, info.PartitionID, info.SegmentID)
segmentInfoKvs[segmentKey] = string(segmentInfoBytes)
}
}
for key, value := range segmentInfoKvs {
err := m.client.Save(key, value)
if err != nil {
panic(err)
}
}

// remove compacted segment info from etcd
for _, segmentInfo := range segmentsCompactionFrom {
segmentKey := fmt.Sprintf("%s/%d/%d/%d", util.SegmentMetaPrefix, segmentInfo.CollectionID, segmentInfo.PartitionID, segmentInfo.SegmentID)
err := m.client.Remove(segmentKey)
if err != nil {
if err := m.segmentsInfo.removeSegment(segmentInfo); err != nil {
panic(err)
}
}
Expand All @@ -569,18 +543,6 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
panic(err)
}

m.segmentMu.Lock()
for _, segmentInfos := range saves {
for _, info := range segmentInfos {
segmentID := info.SegmentID
m.segmentInfos[segmentID] = info
}
}
for _, segmentInfo := range segmentsCompactionFrom {
delete(m.segmentInfos, segmentInfo.SegmentID)
}
m.segmentMu.Unlock()

m.channelMu.Lock()
for collectionID, channelInfo := range queryChannelInfosMap {
m.queryChannelInfos[collectionID] = channelInfo
Expand Down Expand Up @@ -640,9 +602,7 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio

// remove meta from etcd
for _, info := range removes {
segmentKey := fmt.Sprintf("%s/%d/%d/%d", util.SegmentMetaPrefix, info.CollectionID, info.PartitionID, info.SegmentID)
err = m.client.Remove(segmentKey)
if err != nil {
if err = m.segmentsInfo.removeSegment(info); err != nil {
panic(err)
}
}
Expand All @@ -664,12 +624,6 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio
panic(err)
}

m.segmentMu.Lock()
for _, info := range removes {
delete(m.segmentInfos, info.SegmentID)
}
m.segmentMu.Unlock()

m.channelMu.Lock()
m.queryChannelInfos[collectionID] = queryChannelInfo
m.channelMu.Unlock()
Expand Down Expand Up @@ -727,52 +681,41 @@ func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, queryC
}

func (m *MetaReplica) showSegmentInfos(collectionID UniqueID, partitionIDs []UniqueID) []*querypb.SegmentInfo {
m.segmentMu.RLock()
defer m.segmentMu.RUnlock()

results := make([]*querypb.SegmentInfo, 0)
segmentInfos := make([]*querypb.SegmentInfo, 0)
for _, info := range m.segmentInfos {
if info.CollectionID == collectionID {
segmentInfos = append(segmentInfos, proto.Clone(info).(*querypb.SegmentInfo))
ignorePartitionCmp := len(partitionIDs) == 0
partitionFilter := make(map[int64]struct{})
for _, pid := range partitionIDs {
partitionFilter[pid] = struct{}{}
}

segments := m.segmentsInfo.getSegments()
var res []*querypb.SegmentInfo
for _, segment := range segments {
_, ok := partitionFilter[segment.GetPartitionID()]
if (ignorePartitionCmp || ok) && segment.GetCollectionID() == collectionID {
res = append(res, segment)
}
}
if len(partitionIDs) == 0 {
return segmentInfos
}

partitionIDMap := getCompareMapFromSlice(partitionIDs)
for _, info := range segmentInfos {
partitionID := info.PartitionID
if _, ok := partitionIDMap[partitionID]; ok {
results = append(results, info)
}
}
return results
return res
}

func (m *MetaReplica) getSegmentInfoByID(segmentID UniqueID) (*querypb.SegmentInfo, error) {
m.segmentMu.RLock()
defer m.segmentMu.RUnlock()

if info, ok := m.segmentInfos[segmentID]; ok {
return proto.Clone(info).(*querypb.SegmentInfo), nil
segment := m.segmentsInfo.getSegment(segmentID)
if segment != nil {
return segment, nil
}

return nil, errors.New("getSegmentInfoByID: can't find segmentID in segmentInfos")
}
func (m *MetaReplica) getSegmentInfosByNode(nodeID int64) []*querypb.SegmentInfo {
m.segmentMu.RLock()
defer m.segmentMu.RUnlock()

segmentInfos := make([]*querypb.SegmentInfo, 0)
for _, info := range m.segmentInfos {
if info.NodeID == nodeID {
segmentInfos = append(segmentInfos, proto.Clone(info).(*querypb.SegmentInfo))
func (m *MetaReplica) getSegmentInfosByNode(nodeID int64) []*querypb.SegmentInfo {
var res []*querypb.SegmentInfo
segments := m.segmentsInfo.getSegments()
for _, segment := range segments {
if segment.GetNodeID() == nodeID {
res = append(res, segment)
}
}

return segmentInfos
return res
}

func (m *MetaReplica) getCollectionInfoByID(collectionID UniqueID) (*querypb.CollectionInfo, error) {
Expand Down
14 changes: 7 additions & 7 deletions internal/querycoord/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ func TestMetaFunc(t *testing.T) {
kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)

nodeID := defaultQueryNodeID
segmentInfos := make(map[UniqueID]*querypb.SegmentInfo)
segmentInfos[defaultSegmentID] = &querypb.SegmentInfo{
segmentsInfo := newSegmentsInfo(kv)
segmentsInfo.segmentIDMap[defaultSegmentID] = &querypb.SegmentInfo{
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID,
SegmentID: defaultSegmentID,
Expand All @@ -112,9 +112,9 @@ func TestMetaFunc(t *testing.T) {
meta := &MetaReplica{
client: kv,
collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
segmentInfos: segmentInfos,
queryChannelInfos: map[UniqueID]*querypb.QueryChannelInfo{},
dmChannelInfos: map[string]*querypb.DmChannelWatchInfo{},
segmentsInfo: segmentsInfo,
}

dmChannels := []string{"testDm1", "testDm2"}
Expand Down Expand Up @@ -297,10 +297,10 @@ func TestReloadMetaFromKV(t *testing.T) {
meta := &MetaReplica{
client: kv,
collectionInfos: map[UniqueID]*querypb.CollectionInfo{},
segmentInfos: map[UniqueID]*querypb.SegmentInfo{},
queryChannelInfos: map[UniqueID]*querypb.QueryChannelInfo{},
dmChannelInfos: map[string]*querypb.DmChannelWatchInfo{},
deltaChannelInfos: map[UniqueID][]*datapb.VchannelInfo{},
segmentsInfo: newSegmentsInfo(kv),
}

kvs := make(map[string]string)
Expand Down Expand Up @@ -348,9 +348,9 @@ func TestReloadMetaFromKV(t *testing.T) {
assert.Nil(t, err)

assert.Equal(t, 1, len(meta.collectionInfos))
assert.Equal(t, 1, len(meta.segmentInfos))
assert.Equal(t, 1, len(meta.segmentsInfo.getSegments()))
_, ok := meta.collectionInfos[defaultCollectionID]
assert.Equal(t, true, ok)
_, ok = meta.segmentInfos[defaultSegmentID]
assert.Equal(t, true, ok)
segment := meta.segmentsInfo.getSegment(defaultSegmentID)
assert.NotNil(t, segment)
}
111 changes: 111 additions & 0 deletions internal/querycoord/segments_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package querycoord

import (
"fmt"
"sync"

"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util"
)

// segmentsInfo provides interfaces to do persistence/retrieve for segments with an in-memory cache
type segmentsInfo struct {
mu sync.RWMutex
loadOnce sync.Once
segmentIDMap map[int64]*querypb.SegmentInfo
kv kv.TxnKV
}

func newSegmentsInfo(kv kv.TxnKV) *segmentsInfo {
return &segmentsInfo{
kv: kv,
segmentIDMap: make(map[int64]*querypb.SegmentInfo),
}
}

func (s *segmentsInfo) loadSegments() error {
var err error
s.loadOnce.Do(func() {
s.mu.Lock()
defer s.mu.Unlock()
var values []string
_, values, err = s.kv.LoadWithPrefix(util.SegmentMetaPrefix)
if err != nil {
return
}
for _, v := range values {
segment := &querypb.SegmentInfo{}
if err = proto.Unmarshal([]byte(v), segment); err != nil {
return
}
s.segmentIDMap[segment.GetSegmentID()] = segment
}
})
return err
}

func (s *segmentsInfo) saveSegment(segment *querypb.SegmentInfo) error {
s.mu.Lock()
defer s.mu.Unlock()

k := getSegmentKey(segment)
v, err := proto.Marshal(segment)
if err != nil {
return err
}
if err = s.kv.Save(k, string(v)); err != nil {
return err
}
s.segmentIDMap[segment.GetSegmentID()] = segment
return nil
}

func (s *segmentsInfo) removeSegment(segment *querypb.SegmentInfo) error {
s.mu.Lock()
defer s.mu.Unlock()
k := getSegmentKey(segment)
if err := s.kv.Remove(k); err != nil {
return err
}
delete(s.segmentIDMap, segment.GetSegmentID())
return nil
}

func (s *segmentsInfo) getSegment(ID int64) *querypb.SegmentInfo {
s.mu.RLock()
defer s.mu.RUnlock()
return s.segmentIDMap[ID]
}

func (s *segmentsInfo) getSegments() []*querypb.SegmentInfo {
s.mu.RLock()
defer s.mu.RUnlock()
res := make([]*querypb.SegmentInfo, 0, len(s.segmentIDMap))
for _, segment := range s.segmentIDMap {
res = append(res, segment)
}
return res
}

func getSegmentKey(segment *querypb.SegmentInfo) string {
return fmt.Sprintf("%s/%d/%d/%d", util.SegmentMetaPrefix, segment.GetCollectionID(), segment.GetPartitionID(),
segment.GetSegmentID())
}
Loading

0 comments on commit 2e676a4

Please sign in to comment.