diff --git a/site/content/includes/m3dbnode/annotated_config.yaml b/site/content/includes/m3dbnode/annotated_config.yaml index 87957991b7..8045a28f9d 100644 --- a/site/content/includes/m3dbnode/annotated_config.yaml +++ b/site/content/includes/m3dbnode/annotated_config.yaml @@ -362,7 +362,14 @@ db: writeShardsInitializing: # Whether or not writes to leaving shards count towards consistency # Default = false + # NOTE: shardsLeavingCountTowardsConsistency and shardsLeavingAndInitializingCountTowardsConsistency both + # cannot be true shardsLeavingCountTowardsConsistency: + # Whether or not writes to both leaving and initializing shards as pair counts toward consistency + # Default = false + # NOTE: shardsLeavingCountTowardsConsistency and shardsLeavingAndInitializingCountTowardsConsistency both + # cannot be true + shardsLeavingAndInitializingCountTowardsConsistency: # Specifies the pooling policy pooling: # Initial alloc size for a block diff --git a/src/cluster/placement/algo/sharded_helper.go b/src/cluster/placement/algo/sharded_helper.go index 79a06d69db..74dc36b834 100644 --- a/src/cluster/placement/algo/sharded_helper.go +++ b/src/cluster/placement/algo/sharded_helper.go @@ -827,7 +827,6 @@ func markShardsAvailable(p placement.Placement, instanceID string, shardIDs []ui return nil, err } } - sourceShards.Remove(shardID) if sourceShards.NumShards() == 0 { p = p.SetInstances(removeInstanceFromList(p.Instances(), sourceInstance.ID())) diff --git a/src/cluster/shard/shard.go b/src/cluster/shard/shard.go index aa65ed335e..63bca13bc6 100644 --- a/src/cluster/shard/shard.go +++ b/src/cluster/shard/shard.go @@ -108,7 +108,6 @@ func (s *shard) State() State { return s.state } func (s *shard) SetState(state State) Shard { s.state = state; return s } func (s *shard) SourceID() string { return s.sourceID } func (s *shard) SetSourceID(sourceID string) Shard { s.sourceID = sourceID; return s } - func (s *shard) CutoverNanos() int64 { if s.cutoverNanos != UnInitializedValue { return s.cutoverNanos diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go index b72022d54f..45b15a41bd 100644 --- a/src/cmd/services/m3dbnode/config/config_test.go +++ b/src/cmd/services/m3dbnode/config/config_test.go @@ -414,6 +414,7 @@ func TestConfiguration(t *testing.T) { fetchSeriesBlocksBatchSize: null writeShardsInitializing: null shardsLeavingCountTowardsConsistency: null + shardsLeavingAndInitializingCountTowardsConsistency: null iterateEqualTimestampStrategy: null gcPercentage: 100 tick: null diff --git a/src/cmd/tools/dtest/util/seed/generator.go b/src/cmd/tools/dtest/util/seed/generator.go index 6a00461be7..fa346564c5 100644 --- a/src/cmd/tools/dtest/util/seed/generator.go +++ b/src/cmd/tools/dtest/util/seed/generator.go @@ -156,6 +156,10 @@ type fakeShardSet struct { shardID uint32 } +func (f *fakeShardSet) LookupShard(id uint32) (shard.Shard, error) { + return nil, fmt.Errorf("not implemented") +} + func (f *fakeShardSet) All() []shard.Shard { sh := shard.NewShard(f.shardID) return []shard.Shard{sh} diff --git a/src/dbnode/client/client_mock.go b/src/dbnode/client/client_mock.go index a997fa514b..04c1e7c062 100644 --- a/src/dbnode/client/client_mock.go +++ b/src/dbnode/client/client_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: ../../client/types.go -// Copyright (c) 2022 Uber Technologies, Inc. +// Copyright (c) 2023 Uber Technologies, Inc. 
// // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -2361,6 +2361,20 @@ func (mr *MockOptionsMockRecorder) SetSeriesIteratorPoolSize(value interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetSeriesIteratorPoolSize", reflect.TypeOf((*MockOptions)(nil).SetSeriesIteratorPoolSize), value) } +// SetShardsLeavingAndInitializingCountTowardsConsistency mocks base method. +func (m *MockOptions) SetShardsLeavingAndInitializingCountTowardsConsistency(value bool) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetShardsLeavingAndInitializingCountTowardsConsistency", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetShardsLeavingAndInitializingCountTowardsConsistency indicates an expected call of SetShardsLeavingAndInitializingCountTowardsConsistency. +func (mr *MockOptionsMockRecorder) SetShardsLeavingAndInitializingCountTowardsConsistency(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetShardsLeavingAndInitializingCountTowardsConsistency", reflect.TypeOf((*MockOptions)(nil).SetShardsLeavingAndInitializingCountTowardsConsistency), value) +} + // SetShardsLeavingCountTowardsConsistency mocks base method. func (m *MockOptions) SetShardsLeavingCountTowardsConsistency(value bool) Options { m.ctrl.T.Helper() @@ -2599,6 +2613,20 @@ func (mr *MockOptionsMockRecorder) SetWriteTimestampOffset(value interface{}) *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWriteTimestampOffset", reflect.TypeOf((*MockOptions)(nil).SetWriteTimestampOffset), value) } +// ShardsLeavingAndInitializingCountTowardsConsistency mocks base method. +func (m *MockOptions) ShardsLeavingAndInitializingCountTowardsConsistency() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ShardsLeavingAndInitializingCountTowardsConsistency") + ret0, _ := ret[0].(bool) + return ret0 +} + +// ShardsLeavingAndInitializingCountTowardsConsistency indicates an expected call of ShardsLeavingAndInitializingCountTowardsConsistency. +func (mr *MockOptionsMockRecorder) ShardsLeavingAndInitializingCountTowardsConsistency() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ShardsLeavingAndInitializingCountTowardsConsistency", reflect.TypeOf((*MockOptions)(nil).ShardsLeavingAndInitializingCountTowardsConsistency)) +} + // ShardsLeavingCountTowardsConsistency mocks base method. func (m *MockOptions) ShardsLeavingCountTowardsConsistency() bool { m.ctrl.T.Helper() @@ -4260,6 +4288,20 @@ func (mr *MockAdminOptionsMockRecorder) SetSeriesIteratorPoolSize(value interfac return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetSeriesIteratorPoolSize", reflect.TypeOf((*MockAdminOptions)(nil).SetSeriesIteratorPoolSize), value) } +// SetShardsLeavingAndInitializingCountTowardsConsistency mocks base method. +func (m *MockAdminOptions) SetShardsLeavingAndInitializingCountTowardsConsistency(value bool) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetShardsLeavingAndInitializingCountTowardsConsistency", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetShardsLeavingAndInitializingCountTowardsConsistency indicates an expected call of SetShardsLeavingAndInitializingCountTowardsConsistency. 
+func (mr *MockAdminOptionsMockRecorder) SetShardsLeavingAndInitializingCountTowardsConsistency(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetShardsLeavingAndInitializingCountTowardsConsistency", reflect.TypeOf((*MockAdminOptions)(nil).SetShardsLeavingAndInitializingCountTowardsConsistency), value) +} + // SetShardsLeavingCountTowardsConsistency mocks base method. func (m *MockAdminOptions) SetShardsLeavingCountTowardsConsistency(value bool) Options { m.ctrl.T.Helper() @@ -4512,6 +4554,20 @@ func (mr *MockAdminOptionsMockRecorder) SetWriteTimestampOffset(value interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWriteTimestampOffset", reflect.TypeOf((*MockAdminOptions)(nil).SetWriteTimestampOffset), value) } +// ShardsLeavingAndInitializingCountTowardsConsistency mocks base method. +func (m *MockAdminOptions) ShardsLeavingAndInitializingCountTowardsConsistency() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ShardsLeavingAndInitializingCountTowardsConsistency") + ret0, _ := ret[0].(bool) + return ret0 +} + +// ShardsLeavingAndInitializingCountTowardsConsistency indicates an expected call of ShardsLeavingAndInitializingCountTowardsConsistency. +func (mr *MockAdminOptionsMockRecorder) ShardsLeavingAndInitializingCountTowardsConsistency() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ShardsLeavingAndInitializingCountTowardsConsistency", reflect.TypeOf((*MockAdminOptions)(nil).ShardsLeavingAndInitializingCountTowardsConsistency)) +} + // ShardsLeavingCountTowardsConsistency mocks base method. func (m *MockAdminOptions) ShardsLeavingCountTowardsConsistency() bool { m.ctrl.T.Helper() diff --git a/src/dbnode/client/config.go b/src/dbnode/client/config.go index 2e1a9cb6ae..5e009cb8a1 100644 --- a/src/dbnode/client/config.go +++ b/src/dbnode/client/config.go @@ -124,6 +124,10 @@ type Configuration struct { // count towards consistency, by default they do not. ShardsLeavingCountTowardsConsistency *bool `yaml:"shardsLeavingCountTowardsConsistency"` + // ShardsLeavingAndInitializingCountTowardsConsistency sets whether or not writes to leaving and initializing shards + // count towards consistency, by default they do not. + ShardsLeavingAndInitializingCountTowardsConsistency *bool `yaml:"shardsLeavingAndInitializingCountTowardsConsistency"` + // IterateEqualTimestampStrategy specifies the iterate equal timestamp strategy. 
IterateEqualTimestampStrategy *encoding.IterateEqualTimestampStrategy `yaml:"iterateEqualTimestampStrategy"` } @@ -213,6 +217,12 @@ func (c *Configuration) Validate() error { *c.AsyncWriteMaxConcurrency) } + if c.ShardsLeavingCountTowardsConsistency != nil && c.ShardsLeavingAndInitializingCountTowardsConsistency != nil && + *c.ShardsLeavingCountTowardsConsistency && *c.ShardsLeavingAndInitializingCountTowardsConsistency { + return fmt.Errorf("m3db client cannot have both shardsLeavingCountTowardsConsistency and " + + "shardsLeavingAndInitializingCountTowardsConsistency as true") + } + if err := c.Proto.Validate(); err != nil { return fmt.Errorf("error validating M3DB client proto configuration: %v", err) } @@ -451,6 +461,10 @@ func (c Configuration) NewAdminClient( if c.WriteShardsInitializing != nil { v = v.SetWriteShardsInitializing(*c.WriteShardsInitializing) } + if c.ShardsLeavingAndInitializingCountTowardsConsistency != nil { + v = v.SetShardsLeavingAndInitializingCountTowardsConsistency(*c.ShardsLeavingAndInitializingCountTowardsConsistency) + } + if c.ShardsLeavingCountTowardsConsistency != nil { v = v.SetShardsLeavingCountTowardsConsistency(*c.ShardsLeavingCountTowardsConsistency) } diff --git a/src/dbnode/client/config_test.go b/src/dbnode/client/config_test.go index 651792ee14..3fd21e5dad 100644 --- a/src/dbnode/client/config_test.go +++ b/src/dbnode/client/config_test.go @@ -21,6 +21,7 @@ package client import ( + "fmt" "io/ioutil" "os" "testing" @@ -132,3 +133,18 @@ proto: assert.Equal(t, expected, cfg) } + +func TestValidateConfig(t *testing.T) { + var ( + boolTrue = true + ) + + config := Configuration{ + ShardsLeavingCountTowardsConsistency: &boolTrue, + ShardsLeavingAndInitializingCountTowardsConsistency: &boolTrue, + } + err := config.Validate() + require.Error(t, err) + require.Equal(t, err, fmt.Errorf("m3db client cannot have both shardsLeavingCountTowardsConsistency and "+ + "shardsLeavingAndInitializingCountTowardsConsistency as true")) +} diff --git a/src/dbnode/client/options.go b/src/dbnode/client/options.go index 402336d278..614ffeef4e 100644 --- a/src/dbnode/client/options.go +++ b/src/dbnode/client/options.go @@ -94,6 +94,10 @@ const ( // defaultShardsLeavingCountTowardsConsistency is the default shards leaving count towards consistency defaultShardsLeavingCountTowardsConsistency = false + // defaultShardsLeavingAndInitializingCountTowardsConsistency is the default shard in leaving and initializing + // as pair count towards consistency + defaultShardsLeavingAndInitializingCountTowardsConsistency = false + // defaultWriteOpPoolSize is the default write op pool size defaultWriteOpPoolSize = 65536 @@ -229,73 +233,74 @@ var ( ) type options struct { - runtimeOptsMgr m3dbruntime.OptionsManager - clockOpts clock.Options - instrumentOpts instrument.Options - logHostWriteErrorSampleRate sampler.Rate - logHostFetchErrorSampleRate sampler.Rate - logErrorSampleRate sampler.Rate - topologyInitializer topology.Initializer - readConsistencyLevel topology.ReadConsistencyLevel - writeConsistencyLevel topology.ConsistencyLevel - bootstrapConsistencyLevel topology.ReadConsistencyLevel - channelOptions *tchannel.ChannelOptions - maxConnectionCount int - minConnectionCount int - hostConnectTimeout time.Duration - clusterConnectTimeout time.Duration - clusterConnectConsistencyLevel topology.ConnectConsistencyLevel - writeRequestTimeout time.Duration - fetchRequestTimeout time.Duration - truncateRequestTimeout time.Duration - backgroundConnectInterval time.Duration - 
backgroundConnectStutter time.Duration - backgroundHealthCheckInterval time.Duration - backgroundHealthCheckStutter time.Duration - backgroundHealthCheckFailLimit int - backgroundHealthCheckFailThrottleFactor float64 - tagEncoderOpts serialize.TagEncoderOptions - tagEncoderPoolSize pool.Size - tagDecoderOpts serialize.TagDecoderOptions - tagDecoderPoolSize pool.Size - writeRetrier xretry.Retrier - fetchRetrier xretry.Retrier - streamBlocksRetrier xretry.Retrier - writeShardsInitializing bool - shardsLeavingCountTowardsConsistency bool - newConnectionFn NewConnectionFn - readerIteratorAllocate encoding.ReaderIteratorAllocate - writeOperationPoolSize pool.Size - writeTaggedOperationPoolSize pool.Size - fetchBatchOpPoolSize pool.Size - writeBatchSize int - fetchBatchSize int - checkedBytesPool pool.CheckedBytesPool - identifierPool ident.Pool - hostQueueOpsFlushSize int - hostQueueOpsFlushInterval time.Duration - hostQueueOpsArrayPoolSize pool.Size - hostQueueNewPooledWorkerFn xsync.NewPooledWorkerFn - hostQueueEmitsHealthStatus bool - seriesIteratorPoolSize pool.Size - checkedBytesWrapperPoolSize pool.Size - contextPool context.Pool - origin topology.Host - fetchSeriesBlocksMaxBlockRetries int - fetchSeriesBlocksBatchSize int - fetchSeriesBlocksMetadataBatchTimeout time.Duration - fetchSeriesBlocksBatchTimeout time.Duration - fetchSeriesBlocksBatchConcurrency int - schemaRegistry namespace.SchemaRegistry - isProtoEnabled bool - asyncTopologyInitializers []topology.Initializer - asyncWriteWorkerPool xsync.PooledWorkerPool - asyncWriteMaxConcurrency int - useV2BatchAPIs bool - iterationOptions index.IterationOptions - writeTimestampOffset time.Duration - namespaceInitializer namespace.Initializer - thriftContextFn ThriftContextFn + runtimeOptsMgr m3dbruntime.OptionsManager + clockOpts clock.Options + instrumentOpts instrument.Options + logHostWriteErrorSampleRate sampler.Rate + logHostFetchErrorSampleRate sampler.Rate + logErrorSampleRate sampler.Rate + topologyInitializer topology.Initializer + readConsistencyLevel topology.ReadConsistencyLevel + writeConsistencyLevel topology.ConsistencyLevel + bootstrapConsistencyLevel topology.ReadConsistencyLevel + channelOptions *tchannel.ChannelOptions + maxConnectionCount int + minConnectionCount int + hostConnectTimeout time.Duration + clusterConnectTimeout time.Duration + clusterConnectConsistencyLevel topology.ConnectConsistencyLevel + writeRequestTimeout time.Duration + fetchRequestTimeout time.Duration + truncateRequestTimeout time.Duration + backgroundConnectInterval time.Duration + backgroundConnectStutter time.Duration + backgroundHealthCheckInterval time.Duration + backgroundHealthCheckStutter time.Duration + backgroundHealthCheckFailLimit int + backgroundHealthCheckFailThrottleFactor float64 + tagEncoderOpts serialize.TagEncoderOptions + tagEncoderPoolSize pool.Size + tagDecoderOpts serialize.TagDecoderOptions + tagDecoderPoolSize pool.Size + writeRetrier xretry.Retrier + fetchRetrier xretry.Retrier + streamBlocksRetrier xretry.Retrier + writeShardsInitializing bool + shardsLeavingCountTowardsConsistency bool + shardsLeavingAndInitializingCountTowardsConsistency bool + newConnectionFn NewConnectionFn + readerIteratorAllocate encoding.ReaderIteratorAllocate + writeOperationPoolSize pool.Size + writeTaggedOperationPoolSize pool.Size + fetchBatchOpPoolSize pool.Size + writeBatchSize int + fetchBatchSize int + checkedBytesPool pool.CheckedBytesPool + identifierPool ident.Pool + hostQueueOpsFlushSize int + hostQueueOpsFlushInterval time.Duration + 
hostQueueOpsArrayPoolSize pool.Size + hostQueueNewPooledWorkerFn xsync.NewPooledWorkerFn + hostQueueEmitsHealthStatus bool + seriesIteratorPoolSize pool.Size + checkedBytesWrapperPoolSize pool.Size + contextPool context.Pool + origin topology.Host + fetchSeriesBlocksMaxBlockRetries int + fetchSeriesBlocksBatchSize int + fetchSeriesBlocksMetadataBatchTimeout time.Duration + fetchSeriesBlocksBatchTimeout time.Duration + fetchSeriesBlocksBatchConcurrency int + schemaRegistry namespace.SchemaRegistry + isProtoEnabled bool + asyncTopologyInitializers []topology.Initializer + asyncWriteWorkerPool xsync.PooledWorkerPool + asyncWriteMaxConcurrency int + useV2BatchAPIs bool + iterationOptions index.IterationOptions + writeTimestampOffset time.Duration + namespaceInitializer namespace.Initializer + thriftContextFn ThriftContextFn } // NewOptions creates a new set of client options with defaults @@ -390,61 +395,63 @@ func newOptions() *options { } opts := &options{ - clockOpts: clock.NewOptions(), - instrumentOpts: instrument.NewOptions(), - channelOptions: defaultChannelOptions, - writeConsistencyLevel: defaultWriteConsistencyLevel, - readConsistencyLevel: defaultReadConsistencyLevel, - bootstrapConsistencyLevel: defaultBootstrapConsistencyLevel, - maxConnectionCount: defaultMaxConnectionCount, - minConnectionCount: defaultMinConnectionCount, - hostConnectTimeout: defaultHostConnectTimeout, - clusterConnectTimeout: defaultClusterConnectTimeout, - clusterConnectConsistencyLevel: defaultClusterConnectConsistencyLevel, - writeRequestTimeout: defaultWriteRequestTimeout, - fetchRequestTimeout: defaultFetchRequestTimeout, - truncateRequestTimeout: defaultTruncateRequestTimeout, - backgroundConnectInterval: defaultBackgroundConnectInterval, - backgroundConnectStutter: defaultBackgroundConnectStutter, - backgroundHealthCheckInterval: defaultBackgroundHealthCheckInterval, - backgroundHealthCheckStutter: defaultBackgroundHealthCheckStutter, - backgroundHealthCheckFailLimit: defaultBackgroundHealthCheckFailLimit, - backgroundHealthCheckFailThrottleFactor: defaultBackgroundHealthCheckFailThrottleFactor, - writeRetrier: defaultWriteRetrier, - fetchRetrier: defaultFetchRetrier, - writeShardsInitializing: defaultWriteShardsInitializing, - shardsLeavingCountTowardsConsistency: defaultShardsLeavingCountTowardsConsistency, - tagEncoderPoolSize: defaultTagEncoderPoolSize, - tagEncoderOpts: serialize.NewTagEncoderOptions(), - tagDecoderPoolSize: defaultTagDecoderPoolSize, - tagDecoderOpts: serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}), - streamBlocksRetrier: defaultStreamBlocksRetrier, - newConnectionFn: defaultNewConnectionFn, - writeOperationPoolSize: defaultWriteOpPoolSize, - writeTaggedOperationPoolSize: defaultWriteTaggedOpPoolSize, - fetchBatchOpPoolSize: defaultFetchBatchOpPoolSize, - writeBatchSize: DefaultWriteBatchSize, - fetchBatchSize: defaultFetchBatchSize, - checkedBytesPool: bytesPool, - identifierPool: idPool, - hostQueueOpsFlushSize: defaultHostQueueOpsFlushSize, - hostQueueOpsFlushInterval: defaultHostQueueOpsFlushInterval, - hostQueueOpsArrayPoolSize: defaultHostQueueOpsArrayPoolSize, - hostQueueNewPooledWorkerFn: hostQueueNewPooledWorkerFn, - hostQueueEmitsHealthStatus: defaultHostQueueEmitsHealthStatus, - seriesIteratorPoolSize: defaultSeriesIteratorPoolSize, - checkedBytesWrapperPoolSize: defaultCheckedBytesWrapperPoolSize, - contextPool: contextPool, - fetchSeriesBlocksMaxBlockRetries: defaultFetchSeriesBlocksMaxBlockRetries, - fetchSeriesBlocksBatchSize: 
defaultFetchSeriesBlocksBatchSize, - fetchSeriesBlocksMetadataBatchTimeout: defaultFetchSeriesBlocksMetadataBatchTimeout, - fetchSeriesBlocksBatchTimeout: defaultFetchSeriesBlocksBatchTimeout, - fetchSeriesBlocksBatchConcurrency: defaultFetchSeriesBlocksBatchConcurrency, - schemaRegistry: namespace.NewSchemaRegistry(false, nil), - asyncTopologyInitializers: []topology.Initializer{}, - asyncWriteMaxConcurrency: defaultAsyncWriteMaxConcurrency, - useV2BatchAPIs: defaultUseV2BatchAPIs, - thriftContextFn: defaultThriftContextFn, + clockOpts: clock.NewOptions(), + instrumentOpts: instrument.NewOptions(), + channelOptions: defaultChannelOptions, + writeConsistencyLevel: defaultWriteConsistencyLevel, + readConsistencyLevel: defaultReadConsistencyLevel, + bootstrapConsistencyLevel: defaultBootstrapConsistencyLevel, + maxConnectionCount: defaultMaxConnectionCount, + minConnectionCount: defaultMinConnectionCount, + hostConnectTimeout: defaultHostConnectTimeout, + clusterConnectTimeout: defaultClusterConnectTimeout, + clusterConnectConsistencyLevel: defaultClusterConnectConsistencyLevel, + writeRequestTimeout: defaultWriteRequestTimeout, + fetchRequestTimeout: defaultFetchRequestTimeout, + truncateRequestTimeout: defaultTruncateRequestTimeout, + backgroundConnectInterval: defaultBackgroundConnectInterval, + backgroundConnectStutter: defaultBackgroundConnectStutter, + backgroundHealthCheckInterval: defaultBackgroundHealthCheckInterval, + backgroundHealthCheckStutter: defaultBackgroundHealthCheckStutter, + backgroundHealthCheckFailLimit: defaultBackgroundHealthCheckFailLimit, + backgroundHealthCheckFailThrottleFactor: defaultBackgroundHealthCheckFailThrottleFactor, + writeRetrier: defaultWriteRetrier, + fetchRetrier: defaultFetchRetrier, + writeShardsInitializing: defaultWriteShardsInitializing, + shardsLeavingCountTowardsConsistency: defaultShardsLeavingCountTowardsConsistency, + shardsLeavingAndInitializingCountTowardsConsistency: defaultShardsLeavingAndInitializingCountTowardsConsistency, + tagEncoderPoolSize: defaultTagEncoderPoolSize, + tagEncoderOpts: serialize.NewTagEncoderOptions(), + tagDecoderPoolSize: defaultTagDecoderPoolSize, + tagDecoderOpts: serialize. 
+ NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}), + streamBlocksRetrier: defaultStreamBlocksRetrier, + newConnectionFn: defaultNewConnectionFn, + writeOperationPoolSize: defaultWriteOpPoolSize, + writeTaggedOperationPoolSize: defaultWriteTaggedOpPoolSize, + fetchBatchOpPoolSize: defaultFetchBatchOpPoolSize, + writeBatchSize: DefaultWriteBatchSize, + fetchBatchSize: defaultFetchBatchSize, + checkedBytesPool: bytesPool, + identifierPool: idPool, + hostQueueOpsFlushSize: defaultHostQueueOpsFlushSize, + hostQueueOpsFlushInterval: defaultHostQueueOpsFlushInterval, + hostQueueOpsArrayPoolSize: defaultHostQueueOpsArrayPoolSize, + hostQueueNewPooledWorkerFn: hostQueueNewPooledWorkerFn, + hostQueueEmitsHealthStatus: defaultHostQueueEmitsHealthStatus, + seriesIteratorPoolSize: defaultSeriesIteratorPoolSize, + checkedBytesWrapperPoolSize: defaultCheckedBytesWrapperPoolSize, + contextPool: contextPool, + fetchSeriesBlocksMaxBlockRetries: defaultFetchSeriesBlocksMaxBlockRetries, + fetchSeriesBlocksBatchSize: defaultFetchSeriesBlocksBatchSize, + fetchSeriesBlocksMetadataBatchTimeout: defaultFetchSeriesBlocksMetadataBatchTimeout, + fetchSeriesBlocksBatchTimeout: defaultFetchSeriesBlocksBatchTimeout, + fetchSeriesBlocksBatchConcurrency: defaultFetchSeriesBlocksBatchConcurrency, + schemaRegistry: namespace.NewSchemaRegistry(false, nil), + asyncTopologyInitializers: []topology.Initializer{}, + asyncWriteMaxConcurrency: defaultAsyncWriteMaxConcurrency, + useV2BatchAPIs: defaultUseV2BatchAPIs, + thriftContextFn: defaultThriftContextFn, } return opts.SetEncodingM3TSZ().(*options) } @@ -798,10 +805,20 @@ func (o *options) SetShardsLeavingCountTowardsConsistency(value bool) Options { return &opts } +func (o *options) SetShardsLeavingAndInitializingCountTowardsConsistency(value bool) Options { + opts := *o + opts.shardsLeavingAndInitializingCountTowardsConsistency = value + return &opts +} + func (o *options) ShardsLeavingCountTowardsConsistency() bool { return o.shardsLeavingCountTowardsConsistency } +func (o *options) ShardsLeavingAndInitializingCountTowardsConsistency() bool { + return o.shardsLeavingAndInitializingCountTowardsConsistency +} + func (o *options) SetTagEncoderOptions(value serialize.TagEncoderOptions) Options { opts := *o opts.tagEncoderOpts = value diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index 7251b2e706..a6a528cc98 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -144,35 +144,36 @@ func (s *sessionState) readConsistencyLevelWithRLock( } type session struct { - state sessionState - opts Options - runtimeOptsListenerCloser xresource.SimpleCloser - scope tally.Scope - nowFn clock.NowFn - log *zap.Logger - logWriteErrorSampler *sampler.Sampler - logFetchErrorSampler *sampler.Sampler - logHostWriteErrorSampler *sampler.Sampler - logHostFetchErrorSampler *sampler.Sampler - newHostQueueFn newHostQueueFn - writeRetrier xretry.Retrier - fetchRetrier xretry.Retrier - streamBlocksRetrier xretry.Retrier - pools sessionPools - fetchBatchSize int - newPeerBlocksQueueFn newPeerBlocksQueueFn - reattemptStreamBlocksFromPeersFn reattemptStreamBlocksFromPeersFn - pickBestPeerFn pickBestPeerFn - healthCheckNewConnFn healthCheckFn - origin topology.Host - streamBlocksMaxBlockRetries int - streamBlocksWorkers xsync.WorkerPool - streamBlocksBatchSize int - streamBlocksMetadataBatchTimeout time.Duration - streamBlocksBatchTimeout time.Duration - writeShardsInitializing bool - shardsLeavingCountTowardsConsistency bool - metrics sessionMetrics + 
state sessionState + opts Options + runtimeOptsListenerCloser xresource.SimpleCloser + scope tally.Scope + nowFn clock.NowFn + log *zap.Logger + logWriteErrorSampler *sampler.Sampler + logFetchErrorSampler *sampler.Sampler + logHostWriteErrorSampler *sampler.Sampler + logHostFetchErrorSampler *sampler.Sampler + newHostQueueFn newHostQueueFn + writeRetrier xretry.Retrier + fetchRetrier xretry.Retrier + streamBlocksRetrier xretry.Retrier + pools sessionPools + fetchBatchSize int + newPeerBlocksQueueFn newPeerBlocksQueueFn + reattemptStreamBlocksFromPeersFn reattemptStreamBlocksFromPeersFn + pickBestPeerFn pickBestPeerFn + healthCheckNewConnFn healthCheckFn + origin topology.Host + streamBlocksMaxBlockRetries int + streamBlocksWorkers xsync.WorkerPool + streamBlocksBatchSize int + streamBlocksMetadataBatchTimeout time.Duration + streamBlocksBatchTimeout time.Duration + writeShardsInitializing bool + shardsLeavingCountTowardsConsistency bool + shardsLeavingAndInitializingCountTowardsConsistency bool + metrics sessionMetrics } type shardMetricsKey struct { @@ -182,26 +183,30 @@ type shardMetricsKey struct { type sessionMetrics struct { sync.RWMutex - writeSuccess tally.Counter - writeErrorsBadRequest tally.Counter - writeErrorsInternalError tally.Counter - writeLatencyHistogram tally.Histogram - writeNodesRespondingErrors []tally.Counter - writeNodesRespondingBadRequestErrors []tally.Counter - fetchSuccess tally.Counter - fetchErrorsBadRequest tally.Counter - fetchErrorsInternalError tally.Counter - fetchLatencyHistogram tally.Histogram - fetchNodesRespondingErrors []tally.Counter - fetchNodesRespondingBadRequestErrors []tally.Counter - topologyUpdatedSuccess tally.Counter - topologyUpdatedError tally.Counter - streamFromPeersMetrics map[shardMetricsKey]streamFromPeersMetrics + writeSuccess tally.Counter + writeSuccessForCountLeavingAndInitializingAsPair tally.Counter + writeErrorsBadRequest tally.Counter + writeErrorsInternalError tally.Counter + writeLatencyHistogram tally.Histogram + writeNodesRespondingErrors []tally.Counter + writeNodesRespondingBadRequestErrors []tally.Counter + fetchSuccess tally.Counter + fetchErrorsBadRequest tally.Counter + fetchErrorsInternalError tally.Counter + fetchLatencyHistogram tally.Histogram + fetchNodesRespondingErrors []tally.Counter + fetchNodesRespondingBadRequestErrors []tally.Counter + topologyUpdatedSuccess tally.Counter + topologyUpdatedError tally.Counter + streamFromPeersMetrics map[shardMetricsKey]streamFromPeersMetrics } func newSessionMetrics(scope tally.Scope) sessionMetrics { return sessionMetrics{ writeSuccess: scope.Counter("write.success"), + writeSuccessForCountLeavingAndInitializingAsPair: scope.Tagged(map[string]string{ + "success_type": "leaving_initializing_as_pair", + }).Counter("write.success"), writeErrorsBadRequest: scope.Tagged(map[string]string{ "error_type": "bad_request", }).Counter("write.errors"), @@ -315,9 +320,10 @@ func newSession(opts Options) (clientSession, error) { checkedBytes: opts.CheckedBytesPool(), id: opts.IdentifierPool(), }, - writeShardsInitializing: opts.WriteShardsInitializing(), - shardsLeavingCountTowardsConsistency: opts.ShardsLeavingCountTowardsConsistency(), - metrics: newSessionMetrics(scope), + writeShardsInitializing: opts.WriteShardsInitializing(), + shardsLeavingCountTowardsConsistency: opts.ShardsLeavingCountTowardsConsistency(), + shardsLeavingAndInitializingCountTowardsConsistency: opts.ShardsLeavingAndInitializingCountTowardsConsistency(), + metrics: newSessionMetrics(scope), } 
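// A minimal illustrative sketch (not part of this diff): how an application
// would opt in to the new behaviour. Only client.NewOptions, the
// SetShardsLeavingAndInitializingCountTowardsConsistency setter and the YAML key
// shardsLeavingAndInitializingCountTowardsConsistency are taken from this change;
// the surrounding wiring is assumed.
//
//	import "github.com/m3db/m3/src/dbnode/client"
//
//	func newClientOptions() client.Options {
//		// Count a write as a single success once both the leaving shard and its
//		// initializing replacement have acknowledged it. Note that
//		// Configuration.Validate rejects enabling this together with
//		// shardsLeavingCountTowardsConsistency.
//		return client.NewOptions().
//			SetShardsLeavingAndInitializingCountTowardsConsistency(true)
//	}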
s.reattemptStreamBlocksFromPeersFn = s.streamBlocksReattemptFromPeers s.pickBestPeerFn = s.streamBlocksPickBestPeer @@ -478,7 +484,8 @@ func (s *session) newPeerMetadataStreamingProgressMetrics( return &m } -func (s *session) recordWriteMetrics(consistencyResultErr error, respErrs int32, start time.Time) { +func (s *session) recordWriteMetrics(consistencyResultErr error, state *writeState, start time.Time) { + respErrs := int32(len(state.errors)) if idx := s.nodesRespondingErrorsMetricIndex(respErrs); idx >= 0 { if IsBadRequestError(consistencyResultErr) { s.metrics.writeNodesRespondingBadRequestErrors[idx].Inc(1) @@ -487,7 +494,12 @@ func (s *session) recordWriteMetrics(consistencyResultErr error, respErrs int32, } } if consistencyResultErr == nil { - s.metrics.writeSuccess.Inc(1) + + if state.leavingAndInitializingPairCounted { + s.metrics.writeSuccessForCountLeavingAndInitializingAsPair.Inc(1) + } else { + s.metrics.writeSuccess.Inc(1) + } } else if IsBadRequestError(consistencyResultErr) { s.metrics.writeErrorsBadRequest.Inc(1) } else { @@ -1310,8 +1322,7 @@ func (s *session) writeAttempt( err = s.writeConsistencyResult(state.consistencyLevel, majority, enqueued, enqueued-state.pending, int32(len(state.errors)), state.errors) - s.recordWriteMetrics(err, int32(len(state.errors)), startWriteAttempt) - + s.recordWriteMetrics(err, state, startWriteAttempt) // must Unlock before decRef'ing, as the latter releases the writeState back into a // pool if ref count == 0. state.Unlock() @@ -1404,10 +1415,11 @@ func (s *session) writeAttemptWithRLock( state := s.pools.writeState.Get() state.consistencyLevel = s.state.writeLevel state.shardsLeavingCountTowardsConsistency = s.shardsLeavingCountTowardsConsistency + state.shardsLeavingAndInitializingCountTowardsConsistency = s.shardsLeavingAndInitializingCountTowardsConsistency + state.leavingAndInitializingPairCounted = false state.topoMap = s.state.topoMap state.lastResetTime = time.Now() state.incRef() - // todo@bl: Can we combine the writeOpPool and the writeStatePool? state.op, state.majority = op, majority state.nsID, state.tsID, state.tagEncoder, state.annotation = nsID, tsID, tagEncoder, clonedAnnotation diff --git a/src/dbnode/client/types.go b/src/dbnode/client/types.go index 78206bd158..a9e49c55b5 100644 --- a/src/dbnode/client/types.go +++ b/src/dbnode/client/types.go @@ -545,6 +545,14 @@ type Options interface { // that are leaving or not towards consistency level calculations. ShardsLeavingCountTowardsConsistency() bool + // SetShardsLeavingAndInitializingCountTowardsConsistency sets whether to count + // the writes to the shards that are leaving and initializing as pair towards consistency. + SetShardsLeavingAndInitializingCountTowardsConsistency(value bool) Options + + // ShardsLeavingAndInitializingCountTowardsConsistency returns whether to count the writes to the shards + // that are leaving and initializing towards consistency level calculations. + ShardsLeavingAndInitializingCountTowardsConsistency() bool + // SetTagEncoderOptions sets the TagEncoderOptions. 
SetTagEncoderOptions(value serialize.TagEncoderOptions) Options diff --git a/src/dbnode/client/write_state.go b/src/dbnode/client/write_state.go index 5e3f670515..7512b59d56 100644 --- a/src/dbnode/client/write_state.go +++ b/src/dbnode/client/write_state.go @@ -37,6 +37,18 @@ import ( "go.uber.org/zap" ) +type countTowardsConsistency int64 + +const ( + undefinedCountTowardsConsistency countTowardsConsistency = iota + availableCountTowardsConsistency + leavingCountTowardsConsistency + initializingCountTowardsConsistency + shardLeavingIndividuallyCountTowardsConsistency + shardLeavingAsPairCountTowardsConsistency + shardInitializingAsPairCountTowardsConsistency +) + // writeOp represents a generic write operation type writeOp interface { op @@ -53,22 +65,24 @@ type writeState struct { sync.Mutex refCounter - consistencyLevel topology.ConsistencyLevel - shardsLeavingCountTowardsConsistency bool - topoMap topology.Map - op writeOp - nsID ident.ID - tsID ident.ID - tagEncoder serialize.TagEncoder - annotation checked.Bytes - majority, pending int32 - success int32 - errors []error - lastResetTime time.Time - - queues []hostQueue - tagEncoderPool serialize.TagEncoderPool - pool *writeStatePool + consistencyLevel topology.ConsistencyLevel + shardsLeavingCountTowardsConsistency bool + shardsLeavingAndInitializingCountTowardsConsistency bool + leavingAndInitializingPairCounted bool + topoMap topology.Map + hostSuccessList []string + op writeOp + nsID ident.ID + tsID ident.ID + tagEncoder serialize.TagEncoder + annotation checked.Bytes + majority, pending int32 + success int32 + errors []error + lastResetTime time.Time + queues []hostQueue + tagEncoderPool serialize.TagEncoderPool + pool *writeStatePool } func newWriteState( @@ -89,7 +103,11 @@ func (w *writeState) close() { w.nsID.Finalize() w.tsID.Finalize() - + var emptyString string + for i := range w.hostSuccessList { + w.hostSuccessList[i] = emptyString + } + w.hostSuccessList = w.hostSuccessList[:0] if w.annotation != nil { w.annotation.DecRef() w.annotation.Finalize() @@ -152,27 +170,53 @@ func (w *writeState) completionFn(result interface{}, err error) { errStr := "missing shard %d in host %s" wErr = xerrors.NewRetryableError(fmt.Errorf(errStr, w.op.ShardID(), hostID)) } else { - available := shardState == shard.Available - leaving := shardState == shard.Leaving - leavingAndShardsLeavingCountTowardsConsistency := leaving && - w.shardsLeavingCountTowardsConsistency - // NB(bl): Only count writes to available shards towards success. - // NB(r): If shard is leaving and configured to allow writes to leaving + // in below conditions we consider the write success + // 1. writes to available shards towards success. + // 2. If shard is leaving and configured to allow writes to leaving // shards to count towards consistency then allow that to count - // to success. - if !available && !leavingAndShardsLeavingCountTowardsConsistency { - var errStr string - switch shardState { - case shard.Initializing: - errStr = "shard %d in host %s is not available (initializing)" - case shard.Leaving: - errStr = "shard %d in host %s not available (leaving)" - default: - errStr = "shard %d in host %s not available (unknown state)" + // to success + // 3. If shardsLeavingAndInitializingCountTowardsConsistency flag is true then count the success on writing to both + // leaving and initializing as pair. 
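	// For example (illustrative): with the pair flag enabled, the first ack from
	// either side of a replacement only records that host in hostSuccessList; the
	// write is counted as a single success when the paired host (the leaving source
	// or its initializing replacement) later acks the same shard, as implemented in
	// setHostSuccessListWithLock below.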
+ switch newCountTowardsConsistency(shardState, + w.shardsLeavingCountTowardsConsistency, + w.shardsLeavingAndInitializingCountTowardsConsistency) { + case availableCountTowardsConsistency: + w.success++ + case shardLeavingIndividuallyCountTowardsConsistency: + w.success++ + case shardLeavingAsPairCountTowardsConsistency: + // get the initializing host corresponding to the leaving host. + initializingHostID, ok := w.topoMap.LookupInitializingHostPair(hostID, w.op.ShardID()) + if !ok || initializingHostID == "" { + errStr := "no initializing host for shard id %d in host %s" + wErr = xerrors.NewRetryableError(fmt.Errorf(errStr, w.op.ShardID(), hostID)) + } else { + w.setHostSuccessListWithLock(hostID, initializingHostID) + } + case shardInitializingAsPairCountTowardsConsistency: + shard, err := hostShardSet.ShardSet().LookupShard(w.op.ShardID()) + if err != nil { + errStr := "no shard id %d in host %s" + wErr = xerrors.NewRetryableError(fmt.Errorf(errStr, w.op.ShardID(), hostID)) + } else { + // get the leaving host corresponding to the initializing host. + leavingHostID := shard.SourceID() + if leavingHostID == "" { + errStr := "no leaving host for shard id %d in host %s" + wErr = xerrors.NewRetryableError(fmt.Errorf(errStr, w.op.ShardID(), hostID)) + } else { + w.setHostSuccessListWithLock(hostID, leavingHostID) + } } + case leavingCountTowardsConsistency: + errStr := "shard %d in host %s not available (leaving)" + wErr = xerrors.NewRetryableError(fmt.Errorf(errStr, w.op.ShardID(), hostID)) + case initializingCountTowardsConsistency: + errStr := "shard %d in host %s is not available (initializing)" + wErr = xerrors.NewRetryableError(fmt.Errorf(errStr, w.op.ShardID(), hostID)) + default: + errStr := "shard %d in host %s not available (unknown state)" wErr = xerrors.NewRetryableError(fmt.Errorf(errStr, w.op.ShardID(), hostID)) - } else { - w.success++ } } @@ -199,6 +243,14 @@ func (w *writeState) completionFn(result interface{}, err error) { w.decRef() } +func (w *writeState) setHostSuccessListWithLock(hostID, pairedHostID string) { + if findHost(w.hostSuccessList, pairedHostID) { + w.success++ + w.leavingAndInitializingPairCounted = true + } + w.hostSuccessList = append(w.hostSuccessList, hostID) +} + type writeStatePool struct { pool pool.ObjectPool tagEncoderPool serialize.TagEncoderPool @@ -260,3 +312,44 @@ type maybeHostWriteError struct { // Error field is optionally set when there is actually an error. 
err error } + +func newCountTowardsConsistency( + shardState shard.State, + leavingCountsIndividually bool, + leavingAndInitializingCountsAsPair bool, +) countTowardsConsistency { + isAvailable := shardState == shard.Available + isLeaving := shardState == shard.Leaving + isInitializing := shardState == shard.Initializing + + if isAvailable { + return availableCountTowardsConsistency + } + if isLeaving && leavingCountsIndividually { + return shardLeavingIndividuallyCountTowardsConsistency + } + if isLeaving && leavingAndInitializingCountsAsPair { + return shardLeavingAsPairCountTowardsConsistency + } + if isInitializing && leavingAndInitializingCountsAsPair { + return shardInitializingAsPairCountTowardsConsistency + } + if isLeaving { + return leavingCountTowardsConsistency + } + if isInitializing { + return initializingCountTowardsConsistency + } + return undefinedCountTowardsConsistency +} + +func findHost(hostSuccessList []string, hostID string) bool { + // The reason for iterating over list(hostSuccessList) instead of taking map here is the slice performs better over + // the map for less than 10 datasets. + for _, val := range hostSuccessList { + if val == hostID { + return true + } + } + return false +} diff --git a/src/dbnode/client/write_test.go b/src/dbnode/client/write_test.go index e2df66c020..c55d6f411b 100644 --- a/src/dbnode/client/write_test.go +++ b/src/dbnode/client/write_test.go @@ -27,6 +27,7 @@ import ( "github.com/m3db/m3/src/cluster/shard" tterrors "github.com/m3db/m3/src/dbnode/network/server/tchannelthrift/errors" + "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/x/checked" xerrors "github.com/m3db/m3/src/x/errors" @@ -41,9 +42,9 @@ import ( func testWriteSuccess(t *testing.T, state shard.State, success bool) { var writeWg sync.WaitGroup - wState, s, host := writeTestSetup(t, &writeWg) - setShardStates(t, s, host, state) - wState.completionFn(host, nil) + wState, s, hosts := writeTestSetup(t, &writeWg) + setShardStates(t, s, hosts[0], state) + wState.completionFn(hosts[0], nil) if success { assert.Equal(t, int32(1), wState.success) @@ -78,11 +79,11 @@ func retryabilityCheck(t *testing.T, wState *writeState, testFn errTestFn) { func simpleRetryableTest(t *testing.T, passedErr error, customHost topology.Host, testFn errTestFn) { var writeWg sync.WaitGroup - wState, _, host := writeTestSetup(t, &writeWg) + wState, _, hosts := writeTestSetup(t, &writeWg) if customHost != nil { - host = customHost + hosts[0] = customHost } - wState.completionFn(host, passedErr) + wState.completionFn(hosts[0], passedErr) retryabilityCheck(t, wState, testFn) writeTestTeardown(wState, &writeWg) } @@ -106,10 +107,10 @@ func TestBadHostID(t *testing.T) { func TestBadShardID(t *testing.T) { var writeWg sync.WaitGroup - wState, _, host := writeTestSetup(t, &writeWg) + wState, _, hosts := writeTestSetup(t, &writeWg) o := wState.op.(*writeOperation) o.shardID = writeOperationZeroed.shardID - wState.completionFn(host, nil) + wState.completionFn(hosts[0], nil) retryabilityCheck(t, wState, xerrors.IsRetryableError) writeTestTeardown(wState, &writeWg) } @@ -117,9 +118,9 @@ func TestBadShardID(t *testing.T) { func TestShardNotAvailable(t *testing.T) { var writeWg sync.WaitGroup - wState, s, host := writeTestSetup(t, &writeWg) - setShardStates(t, s, host, shard.Initializing) - wState.completionFn(host, nil) + wState, s, hosts := writeTestSetup(t, &writeWg) + setShardStates(t, s, hosts[0], shard.Initializing) + wState.completionFn(hosts[0], nil) 
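// A short sketch (illustrative only; it would have to live in package client
// because the identifiers are unexported): the classification that
// newCountTowardsConsistency above produces for each shard state and flag
// combination handled by completionFn.
func exampleCountTowardsConsistencyClassification() {
	// Available shards always count towards consistency, regardless of flags.
	_ = newCountTowardsConsistency(shard.Available, false, false) == availableCountTowardsConsistency
	// Leaving shard with the legacy flag: counts on its own.
	_ = newCountTowardsConsistency(shard.Leaving, true, false) == shardLeavingIndividuallyCountTowardsConsistency
	// Pair flag: leaving and initializing shards are tracked until their partner acks.
	_ = newCountTowardsConsistency(shard.Leaving, false, true) == shardLeavingAsPairCountTowardsConsistency
	_ = newCountTowardsConsistency(shard.Initializing, false, true) == shardInitializingAsPairCountTowardsConsistency
	// No flags: leaving and initializing shards do not count and yield retryable errors.
	_ = newCountTowardsConsistency(shard.Leaving, false, false) == leavingCountTowardsConsistency
	_ = newCountTowardsConsistency(shard.Initializing, false, false) == initializingCountTowardsConsistency
}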
retryabilityCheck(t, wState, xerrors.IsRetryableError) writeTestTeardown(wState, &writeWg) } @@ -127,14 +128,77 @@ func TestShardNotAvailable(t *testing.T) { func TestShardLeavingWithShardsLeavingCountTowardsConsistency(t *testing.T) { var writeWg sync.WaitGroup - wState, s, host := writeTestSetup(t, &writeWg) + wState, s, hosts := writeTestSetup(t, &writeWg) wState.shardsLeavingCountTowardsConsistency = true - setShardStates(t, s, host, shard.Leaving) - wState.completionFn(host, nil) + setShardStates(t, s, hosts[0], shard.Leaving) + wState.completionFn(hosts[0], nil) assert.Equal(t, int32(1), wState.success) writeTestTeardown(wState, &writeWg) } +func TestShardLeavingAndInitializingCountTowardsConsistencyWithTrueFlag(t *testing.T) { + var writeWg sync.WaitGroup + + wState, s, hosts := writeTestSetup(t, &writeWg) + + setupShardLeavingAndInitializingCountTowardsConsistency(t, wState, s, true) + wState.completionFn(hosts[1], nil) + wState.incRef() + assert.Equal(t, int32(0), wState.success) + wState.completionFn(hosts[0], nil) + assert.Equal(t, int32(1), wState.success) + writeTestTeardown(wState, &writeWg) +} + +func TestShardLeavingAndInitializingCountTowardsConsistencyWithFalseFlag(t *testing.T) { + var writeWg sync.WaitGroup + + wState, s, hosts := writeTestSetup(t, &writeWg) + + setupShardLeavingAndInitializingCountTowardsConsistency(t, wState, s, false) + wState.completionFn(hosts[1], nil) + wState.incRef() + wState.completionFn(hosts[0], nil) + assert.Equal(t, int32(0), wState.success) + writeTestTeardown(wState, &writeWg) +} + +func setupShardLeavingAndInitializingCountTowardsConsistency( + t *testing.T, + wState *writeState, + s *session, + leavingAndInitializingFlag bool) { + hostShardSets := []topology.HostShardSet{} + for _, host := range s.state.topoMap.Hosts() { + hostShard, _ := sharding.NewShardSet( + sharding.NewShards([]uint32{0, 1, 2}, shard.Available), + sharding.DefaultHashFn(3), + ) + hostShardSet := topology.NewHostShardSet(host, hostShard) + hostShardSets = append(hostShardSets, hostShardSet) + } + opts := topology.NewStaticOptions(). + SetShardSet(s.state.topoMap.ShardSet()). + SetReplicas(3). + SetHostShardSets(hostShardSets) + m := topology.NewStaticMap(opts) + s.state.topoMap = m + wState.topoMap = m // update topology with hostshards options + + // mark leaving shards in host0 and init in host1 + markHostReplacement(t, s, s.state.topoMap.Hosts()[0], s.state.topoMap.Hosts()[1]) + + opts = topology.NewStaticOptions(). + SetShardSet(s.state.topoMap.ShardSet()). + SetReplicas(3). + SetHostShardSets(hostShardSets) + m = topology.NewStaticMap(opts) + wState.topoMap = m + s.state.topoMap = m // update the topology manually after replace node. 
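	// A small sketch (illustrative, using only APIs that appear in this diff): how a
	// node replacement is modelled for the pairing logic. The initializing host's
	// shards carry the leaving host's ID as their SourceID, which is what the
	// SourceID/LookupInitializingHostPair based pairing relies on; "leaving-host-id"
	// is a hypothetical identifier.
	initializingShards := sharding.NewShards([]uint32{0, 1, 2}, shard.Initializing)
	for _, hostShard := range initializingShards {
		// The source ID points the initializing shard back at the host it replaces.
		hostShard.SetSourceID("leaving-host-id")
	}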
+ + wState.shardsLeavingAndInitializingCountTowardsConsistency = leavingAndInitializingFlag +} + // utils func getWriteState(s *session, w writeStub) *writeState { @@ -169,13 +233,30 @@ func setShardStates(t *testing.T, s *session, host topology.Host, state shard.St } } +func markHostReplacement(t *testing.T, s *session, leavingHost topology.Host, initializingHost topology.Host) { + s.state.RLock() + leavingHostShardSet, ok := s.state.topoMap.LookupHostShardSet(leavingHost.ID()) + require.True(t, ok) + initializingHostShardSet, ok := s.state.topoMap.LookupHostShardSet(initializingHost.ID()) + s.state.RUnlock() + require.True(t, ok) + + for _, leavinghostShard := range leavingHostShardSet.ShardSet().All() { + leavinghostShard.SetState(shard.Leaving) + } + for _, initializinghostShard := range initializingHostShardSet.ShardSet().All() { + initializinghostShard.SetState(shard.Initializing) + initializinghostShard.SetSourceID(leavingHost.ID()) + } +} + type fakeHost struct{ id string } func (f fakeHost) ID() string { return f.id } func (f fakeHost) Address() string { return "" } func (f fakeHost) String() string { return "" } -func writeTestSetup(t *testing.T, writeWg *sync.WaitGroup) (*writeState, *session, topology.Host) { +func writeTestSetup(t *testing.T, writeWg *sync.WaitGroup) (*writeState, *session, []topology.Host) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -192,12 +273,11 @@ func writeTestSetup(t *testing.T, writeWg *sync.WaitGroup) (*writeState, *sessio require.NoError(t, s.Close()) }() - host := s.state.topoMap.Hosts()[0] // any host + hosts := s.state.topoMap.Hosts() wState := getWriteState(s, w) wState.incRef() // for the test wState.incRef() // allow introspection - // Begin write writeWg.Add(1) go func() { @@ -210,10 +290,10 @@ func writeTestSetup(t *testing.T, writeWg *sync.WaitGroup) (*writeState, *sessio enqueueWg.Wait() require.True(t, s.state.topoMap.Replicas() == sessionTestReplicas) for i := 0; i < s.state.topoMap.Replicas(); i++ { - completionFn(host, nil) // maintain session state + completionFn(hosts[i], nil) // maintain session state } - return wState, s, host + return wState, s, hosts } func writeTestTeardown(wState *writeState, writeWg *sync.WaitGroup) { diff --git a/src/dbnode/integration/integration.go b/src/dbnode/integration/integration.go index 4c3fdba091..e9a6559088 100644 --- a/src/dbnode/integration/integration.go +++ b/src/dbnode/integration/integration.go @@ -121,19 +121,20 @@ func newMultiAddrAdminClient( // BootstrappableTestSetupOptions defines options for test setups. 
type BootstrappableTestSetupOptions struct { - FinalBootstrapper string - BootstrapBlocksBatchSize int - BootstrapBlocksConcurrency int - BootstrapConsistencyLevel topology.ReadConsistencyLevel - TopologyInitializer topology.Initializer - TestStatsReporter xmetrics.TestStatsReporter - DisableCommitLogBootstrapper bool - DisablePeersBootstrapper bool - UseTChannelClientForWriting bool - EnableRepairs bool - ForceRepairs bool - RepairType repair.Type - AdminClientCustomOpts []client.CustomAdminOption + FinalBootstrapper string + BootstrapBlocksBatchSize int + BootstrapBlocksConcurrency int + BootstrapConsistencyLevel topology.ReadConsistencyLevel + TopologyInitializer topology.Initializer + TestStatsReporter xmetrics.TestStatsReporter + DisableCommitLogBootstrapper bool + DisablePeersBootstrapper bool + UseTChannelClientForWriting bool + EnableRepairs bool + ForceRepairs bool + RepairType repair.Type + AdminClientCustomOpts []client.CustomAdminOption + ShardsLeavingAndInitializingCountTowardsConsistency bool } type closeFn func() @@ -171,22 +172,24 @@ func NewDefaultBootstrappableTestSetups( // nolint:gocyclo require.NoError(t, err) for i := 0; i < replicas; i++ { var ( - instance = i - usingCommitLogBootstrapper = !setupOpts[i].DisableCommitLogBootstrapper - usingPeersBootstrapper = !setupOpts[i].DisablePeersBootstrapper - finalBootstrapperToUse = setupOpts[i].FinalBootstrapper - useTChannelClientForWriting = setupOpts[i].UseTChannelClientForWriting - bootstrapBlocksBatchSize = setupOpts[i].BootstrapBlocksBatchSize - bootstrapBlocksConcurrency = setupOpts[i].BootstrapBlocksConcurrency - bootstrapConsistencyLevel = setupOpts[i].BootstrapConsistencyLevel - topologyInitializer = setupOpts[i].TopologyInitializer - testStatsReporter = setupOpts[i].TestStatsReporter - enableRepairs = setupOpts[i].EnableRepairs - forceRepairs = setupOpts[i].ForceRepairs - repairType = setupOpts[i].RepairType - origin topology.Host - instanceOpts = newMultiAddrTestOptions(opts, instance) - adminClientCustomOpts = setupOpts[i].AdminClientCustomOpts + instance = i + usingCommitLogBootstrapper = !setupOpts[i].DisableCommitLogBootstrapper + usingPeersBootstrapper = !setupOpts[i].DisablePeersBootstrapper + finalBootstrapperToUse = setupOpts[i].FinalBootstrapper + useTChannelClientForWriting = setupOpts[i].UseTChannelClientForWriting + bootstrapBlocksBatchSize = setupOpts[i].BootstrapBlocksBatchSize + bootstrapBlocksConcurrency = setupOpts[i].BootstrapBlocksConcurrency + bootstrapConsistencyLevel = setupOpts[i].BootstrapConsistencyLevel + topologyInitializer = setupOpts[i].TopologyInitializer + testStatsReporter = setupOpts[i].TestStatsReporter + enableRepairs = setupOpts[i].EnableRepairs + forceRepairs = setupOpts[i].ForceRepairs + repairType = setupOpts[i].RepairType + origin topology.Host + instanceOpts = newMultiAddrTestOptions(opts, instance) + adminClientCustomOpts = setupOpts[i].AdminClientCustomOpts + shardsLeavingAndInitializingCountTowardsConsistency = setupOpts[i]. + ShardsLeavingAndInitializingCountTowardsConsistency ) if finalBootstrapperToUse == "" { @@ -220,7 +223,8 @@ func NewDefaultBootstrappableTestSetups( // nolint:gocyclo instanceOpts = instanceOpts. SetClusterDatabaseTopologyInitializer(topologyInitializer). - SetUseTChannelClientForWriting(useTChannelClientForWriting) + SetUseTChannelClientForWriting(useTChannelClientForWriting). 
+ SetShardsLeavingAndInitializingCountTowardsConsistency(shardsLeavingAndInitializingCountTowardsConsistency) if i > 0 { // NB(bodu): Need to reset the global counter of number of index @@ -240,7 +244,6 @@ func NewDefaultBootstrappableTestSetups( // nolint:gocyclo instrumentOpts = instrumentOpts.SetMetricsScope(scope) } setup.SetStorageOpts(setup.StorageOpts().SetInstrumentOptions(instrumentOpts)) - var ( bsOpts = newDefaulTestResultOptions(setup.StorageOpts()) finalBootstrapper bootstrap.BootstrapperProvider @@ -278,6 +281,7 @@ func NewDefaultBootstrappableTestSetups( // nolint:gocyclo adminOpts = adminOpts.SetFetchSeriesBlocksBatchConcurrency(bootstrapBlocksConcurrency) } adminOpts = adminOpts.SetStreamBlocksRetrier(retrier) + adminOpts.SetShardsLeavingAndInitializingCountTowardsConsistency(shardsLeavingAndInitializingCountTowardsConsistency) adminClient := newMultiAddrAdminClient( t, adminOpts, topologyInitializer, origin, instrumentOpts, adminClientCustomOpts...) diff --git a/src/dbnode/integration/options.go b/src/dbnode/integration/options.go index 8ae1407110..09f1f3a187 100644 --- a/src/dbnode/integration/options.go +++ b/src/dbnode/integration/options.go @@ -261,6 +261,14 @@ type TestOptions interface { // SetWriteConsistencyLevel sets the consistency level for writing with the m3db client. SetWriteConsistencyLevel(value topology.ConsistencyLevel) TestOptions + // SetShardsLeavingAndInitializingCountTowardsConsistency sets ShardsLeavingAndInitializingCountTowardsConsistency + // to true if we count the writes to the shards that are leaving and initializing towards consistency. + SetShardsLeavingAndInitializingCountTowardsConsistency(value bool) TestOptions + + // ShardsLeavingAndInitializingCountTowardsConsistency returns whether to count the writes to the shards + // that are leaving and initializing towards consistency level calculations. + ShardsLeavingAndInitializingCountTowardsConsistency() bool + // NumShards returns the number of shards to use. 
NumShards() int @@ -330,42 +338,43 @@ type TestOptions interface { } type options struct { - namespaces []namespace.Metadata - nsInitializer namespace.Initializer - id string - tickMinimumInterval time.Duration - tickCancellationCheckInterval time.Duration - httpClusterAddr string - tchannelClusterAddr string - httpNodeAddr string - tchannelNodeAddr string - httpDebugAddr string - filePathPrefix string - serverStateChangeTimeout time.Duration - clusterConnectionTimeout time.Duration - readRequestTimeout time.Duration - writeRequestTimeout time.Duration - truncateRequestTimeout time.Duration - fetchRequestTimeout time.Duration - workerPoolSize int - clusterDatabaseTopologyInitializer topology.Initializer - blockRetrieverManager block.DatabaseBlockRetrieverManager - verifySeriesDebugFilePathPrefix string - writeConsistencyLevel topology.ConsistencyLevel - numShards int - shardSetOptions *TestShardSetOptions - maxWiredBlocks uint - customClientAdminOptions []client.CustomAdminOption - useTChannelClientForReading bool - useTChannelClientForWriting bool - useTChannelClientForTruncation bool - writeNewSeriesAsync bool - protoEncoding bool - assertEqual assertTestDataEqual - nowFn func() time.Time - reportInterval time.Duration - storageOptsFn StorageOption - customAdminOpts []client.CustomAdminOption + namespaces []namespace.Metadata + nsInitializer namespace.Initializer + id string + tickMinimumInterval time.Duration + tickCancellationCheckInterval time.Duration + httpClusterAddr string + tchannelClusterAddr string + httpNodeAddr string + tchannelNodeAddr string + httpDebugAddr string + filePathPrefix string + serverStateChangeTimeout time.Duration + clusterConnectionTimeout time.Duration + readRequestTimeout time.Duration + writeRequestTimeout time.Duration + truncateRequestTimeout time.Duration + fetchRequestTimeout time.Duration + workerPoolSize int + clusterDatabaseTopologyInitializer topology.Initializer + blockRetrieverManager block.DatabaseBlockRetrieverManager + verifySeriesDebugFilePathPrefix string + writeConsistencyLevel topology.ConsistencyLevel + numShards int + shardSetOptions *TestShardSetOptions + maxWiredBlocks uint + customClientAdminOptions []client.CustomAdminOption + useTChannelClientForReading bool + useTChannelClientForWriting bool + useTChannelClientForTruncation bool + writeNewSeriesAsync bool + protoEncoding bool + shardLeavingAndInitializingCountsTowardConsistency bool + assertEqual assertTestDataEqual + nowFn func() time.Time + reportInterval time.Duration + storageOptsFn StorageOption + customAdminOpts []client.CustomAdminOption } // NewTestOptions returns a new set of integration test options. @@ -657,6 +666,16 @@ func (o *options) SetWriteConsistencyLevel(cLevel topology.ConsistencyLevel) Tes return &opts } +func (o *options) SetShardsLeavingAndInitializingCountTowardsConsistency(value bool) TestOptions { + opts := *o + opts.shardLeavingAndInitializingCountsTowardConsistency = value + return &opts +} + +func (o *options) ShardsLeavingAndInitializingCountTowardsConsistency() bool { + return o.shardLeavingAndInitializingCountsTowardConsistency +} + func (o *options) NumShards() int { return o.numShards } diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index b0f7da8684..e191199cf6 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -1138,7 +1138,8 @@ func newClients( SetWriteConsistencyLevel(opts.WriteConsistencyLevel()). SetTopologyInitializer(topoInit). SetUseV2BatchAPIs(true). 
- SetInstrumentOptions(instrumentOpts) + SetInstrumentOptions(instrumentOpts). + SetShardsLeavingAndInitializingCountTowardsConsistency(opts.ShardsLeavingAndInitializingCountTowardsConsistency()) origin = newOrigin(id, tchannelNodeAddr) verificationOrigin = newOrigin(id+"-verification", tchannelNodeAddr) diff --git a/src/dbnode/integration/write_quorum_test.go b/src/dbnode/integration/write_quorum_test.go index f5074b7872..98e589570a 100644 --- a/src/dbnode/integration/write_quorum_test.go +++ b/src/dbnode/integration/write_quorum_test.go @@ -1,3 +1,4 @@ +//go:build integration // +build integration // Copyright (c) 2016 Uber Technologies, Inc. @@ -53,7 +54,7 @@ func TestNormalQuorumOnlyOneUp(t *testing.T) { node(t, 0, newClusterShardsRange(minShard, maxShard, shard.Available)), node(t, 1, newClusterShardsRange(minShard, maxShard, shard.Available)), node(t, 2, newClusterShardsRange(minShard, maxShard, shard.Available)), - }) + }, false) defer closeFn() require.NoError(t, nodes[0].StartServer()) @@ -79,7 +80,7 @@ func TestNormalQuorumOnlyTwoUp(t *testing.T) { node(t, 0, newClusterShardsRange(minShard, maxShard, shard.Available)), node(t, 1, newClusterShardsRange(minShard, maxShard, shard.Available)), node(t, 2, newClusterShardsRange(minShard, maxShard, shard.Available)), - }) + }, false) defer closeFn() require.NoError(t, nodes[0].StartServer()) @@ -107,7 +108,7 @@ func TestNormalQuorumAllUp(t *testing.T) { node(t, 0, newClusterShardsRange(minShard, maxShard, shard.Available)), node(t, 1, newClusterShardsRange(minShard, maxShard, shard.Available)), node(t, 2, newClusterShardsRange(minShard, maxShard, shard.Available)), - }) + }, false) defer closeFn() require.NoError(t, nodes[0].StartServer()) @@ -138,7 +139,7 @@ func TestAddNodeQuorumOnlyLeavingInitializingUp(t *testing.T) { node(t, 1, newClusterShardsRange(minShard, maxShard, shard.Available)), node(t, 2, newClusterShardsRange(minShard, maxShard, shard.Available)), node(t, 3, newClusterShardsRange(minShard, maxShard, shard.Initializing)), - }) + }, false) defer closeFn() require.NoError(t, nodes[0].StartServer()) @@ -168,7 +169,7 @@ func TestAddNodeQuorumOnlyOneNormalAndLeavingInitializingUp(t *testing.T) { node(t, 1, newClusterShardsRange(minShard, maxShard, shard.Available)), node(t, 2, newClusterShardsRange(minShard, maxShard, shard.Available)), node(t, 3, newClusterShardsRange(minShard, maxShard, shard.Initializing)), - }) + }, false) defer closeFn() require.NoError(t, nodes[0].StartServer()) @@ -184,6 +185,91 @@ func TestAddNodeQuorumOnlyOneNormalAndLeavingInitializingUp(t *testing.T) { assert.Error(t, testWrite(topology.ConsistencyLevelAll)) } +func TestReplaceNodeWithShardsLeavingAndInitializingCountTowardsConsistencySet(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + + numShards := defaultNumShards + minShard := uint32(0) + maxShard := uint32(numShards - 1) + + initShards := newClusterShardsRange(minShard, maxShard, shard.Initializing) + for i := minShard; i < maxShard; i++ { + shard, _ := initShards.Shard(i) + shard.SetSourceID("testhost0") + } + + // nodes = m3db nodes + nodes, closeFunc, testWrite := makeTestWrite(t, numShards, []services.ServiceInstance{ + node(t, 0, newClusterShardsRange(minShard, maxShard, shard.Leaving)), + node(t, 1, newClusterShardsRange(minShard, maxShard, shard.Available)), + node(t, 2, newClusterShardsRange(minShard, maxShard, shard.Available)), + node(t, 3, initShards), + }, true) + defer closeFunc() + + require.NoError(t, nodes[0].StartServer()) + defer func() { require.NoError(t, 
nodes[0].StopServer()) }() + require.NoError(t, nodes[1].StartServer()) + defer func() { require.NoError(t, nodes[1].StopServer()) }() + require.NoError(t, nodes[3].StartServerDontWaitBootstrap()) + defer func() { require.NoError(t, nodes[3].StopServer()) }() + + // Writes succeed to one available node and on both leaving and initializing node. + assert.NoError(t, testWrite(topology.ConsistencyLevelOne)) + assert.NoError(t, testWrite(topology.ConsistencyLevelMajority)) + assert.Error(t, testWrite(topology.ConsistencyLevelAll)) +} + +func TestMultipleReplaceNodeWithShardsLeavingAndInitializingCountTowardsConsistencySet(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + + numShards := defaultNumShards + minShard := uint32(0) + maxShard := uint32(numShards - 1) + + // 1st replace with testhost0 as source node. + initShards1 := newClusterShardsRange(minShard, maxShard, shard.Initializing) + for i := minShard; i < maxShard; i++ { + shard, _ := initShards1.Shard(i) + shard.SetSourceID("testhost0") + } + + // 2nd replace with testhost1 as source node. + initShards2 := newClusterShardsRange(minShard, maxShard, shard.Initializing) + for i := minShard; i < maxShard; i++ { + shard, _ := initShards2.Shard(i) + shard.SetSourceID("testhost1") + } + + // nodes = m3db nodes + nodes, closeFunc, testWrite := makeTestWrite(t, numShards, []services.ServiceInstance{ + node(t, 0, newClusterShardsRange(minShard, maxShard, shard.Leaving)), + node(t, 1, newClusterShardsRange(minShard, maxShard, shard.Leaving)), + node(t, 2, newClusterShardsRange(minShard, maxShard, shard.Available)), + node(t, 3, initShards1), + node(t, 4, initShards2), + }, true) + defer closeFunc() + + require.NoError(t, nodes[0].StartServer()) + defer func() { require.NoError(t, nodes[0].StopServer()) }() + require.NoError(t, nodes[1].StartServer()) + defer func() { require.NoError(t, nodes[1].StopServer()) }() + require.NoError(t, nodes[3].StartServerDontWaitBootstrap()) + defer func() { require.NoError(t, nodes[3].StopServer()) }() + require.NoError(t, nodes[4].StartServerDontWaitBootstrap()) + defer func() { require.NoError(t, nodes[4].StopServer()) }() + + // Writes succeed to both leaving and initializing pairs. + assert.NoError(t, testWrite(topology.ConsistencyLevelOne)) + assert.NoError(t, testWrite(topology.ConsistencyLevelMajority)) + assert.Error(t, testWrite(topology.ConsistencyLevelAll)) +} + func TestAddNodeQuorumAllUp(t *testing.T) { if testing.Short() { t.SkipNow() @@ -199,7 +285,7 @@ func TestAddNodeQuorumAllUp(t *testing.T) { node(t, 1, newClusterShardsRange(minShard, maxShard, shard.Available)), node(t, 2, newClusterShardsRange(minShard, maxShard, shard.Available)), node(t, 3, newClusterShardsRange(minShard, maxShard, shard.Initializing)), - }) + }, false) defer closeFn() require.NoError(t, nodes[0].StartServer()) @@ -223,6 +309,7 @@ func makeTestWrite( t *testing.T, numShards int, instances []services.ServiceInstance, + isShardsLeavingAndInitializingCountTowardsConsistency bool, ) (testSetups, closeFn, testWriteFn) { nsOpts := namespace.NewOptions() md, err := namespace.NewMetadata(testNamespaces[0], @@ -245,7 +332,8 @@ func makeTestWrite( SetClusterConnectConsistencyLevel(topology.ConnectConsistencyLevelNone). SetClusterConnectTimeout(2 * time.Second). SetWriteRequestTimeout(2 * time.Second). - SetTopologyInitializer(topoInit) + SetTopologyInitializer(topoInit). 
+ SetShardsLeavingAndInitializingCountTowardsConsistency(isShardsLeavingAndInitializingCountTowardsConsistency) testWrite := func(cLevel topology.ConsistencyLevel) error { clientopts = clientopts.SetWriteConsistencyLevel(cLevel) diff --git a/src/dbnode/sharding/shardset.go b/src/dbnode/sharding/shardset.go index f12d8f4c4c..8d40430609 100644 --- a/src/dbnode/sharding/shardset.go +++ b/src/dbnode/sharding/shardset.go @@ -76,6 +76,14 @@ func (s *shardSet) Lookup(identifier ident.ID) uint32 { return s.fn(identifier) } +func (s *shardSet) LookupShard(shardID uint32) (shard.Shard, error) { + hostShard, ok := s.shardMap[shardID] + if !ok { + return nil, ErrInvalidShardID + } + return hostShard, nil +} + func (s *shardSet) LookupStateByID(shardID uint32) (shard.State, error) { hostShard, ok := s.shardMap[shardID] if !ok { diff --git a/src/dbnode/sharding/types.go b/src/dbnode/sharding/types.go index d7babd5ae5..1e5465252b 100644 --- a/src/dbnode/sharding/types.go +++ b/src/dbnode/sharding/types.go @@ -43,6 +43,9 @@ type ShardSet interface { // Lookup will return a shard for a given identifier. Lookup(id ident.ID) uint32 + // LookupShard will return a shard for a given shard id. + LookupShard(id uint32) (shard.Shard, error) + // LookupStateByID returns the state of the shard with a given ID. LookupStateByID(shardID uint32) (shard.State, error) diff --git a/src/dbnode/storage/bootstrap/process_test.go b/src/dbnode/storage/bootstrap/process_test.go index 330437079c..41ed901dec 100644 --- a/src/dbnode/storage/bootstrap/process_test.go +++ b/src/dbnode/storage/bootstrap/process_test.go @@ -116,7 +116,6 @@ func TestBootstrapProcessRunActiveBlockAdvanced(t *testing.T) { SetShardSet(shardSet). SetHostShardSets([]topology.HostShardSet{hostShardSet}) topoMap := topology.NewStaticMap(topoMapOpts) - topoState, err := newInitialTopologyState(origin, topoMap) require.NoError(t, err) diff --git a/src/dbnode/topology/map.go b/src/dbnode/topology/map.go index 31229c0f45..00d5fcdf04 100644 --- a/src/dbnode/topology/map.go +++ b/src/dbnode/topology/map.go @@ -34,6 +34,7 @@ type staticMap struct { orderedHosts []Host hostsByShard [][]Host orderedShardHostsByShard [][]orderedShardHost + initializingHostMap map[string]map[uint32]string // it stores {leavingHostID : {shardID : initializingHostID}} replicas int majority int } @@ -57,6 +58,7 @@ func NewStaticMap(opts StaticOptions) Map { host := hostShardSet.Host() topoMap.hostShardSetsByID[host.ID()] = hostShardSet topoMap.orderedHosts = append(topoMap.orderedHosts, host) + for _, shard := range hostShardSet.ShardSet().All() { id := shard.ID() topoMap.hostsByShard[id] = append(topoMap.hostsByShard[id], host) @@ -69,7 +71,22 @@ func NewStaticMap(opts StaticOptions) Map { append(topoMap.orderedShardHostsByShard[id], elem) } } - + for _, hostShardSet := range hostShardSets { + host := hostShardSet.Host() + var shardToInitializingHost map[uint32]string + for _, shard := range hostShardSet.ShardSet().All() { + if shard.SourceID() != "" { + if topoMap.initializingHostMap == nil { + topoMap.initializingHostMap = make(map[string]map[uint32]string) + } + if shardToInitializingHost == nil { + shardToInitializingHost = make(map[uint32]string) + } + shardToInitializingHost[shard.ID()] = host.ID() + topoMap.initializingHostMap[shard.SourceID()] = shardToInitializingHost + } + } + } return &topoMap } @@ -92,6 +109,15 @@ func (t *staticMap) LookupHostShardSet(id string) (HostShardSet, bool) { return value, ok } +func (t *staticMap) LookupInitializingHostPair(leavingHostID string, id 
uint32) (string, bool) {
+	value, ok := t.initializingHostMap[leavingHostID]
+	if !ok {
+		return "", false
+	}
+	initializingHost, ok := value[id]
+	return initializingHost, ok
+}
+
 func (t *staticMap) HostsLen() int {
 	return len(t.orderedHosts)
 }
diff --git a/src/dbnode/topology/map_test.go b/src/dbnode/topology/map_test.go
index fdc78241a5..47be5cba54 100644
--- a/src/dbnode/topology/map_test.go
+++ b/src/dbnode/topology/map_test.go
@@ -104,7 +104,6 @@ func TestStaticMap(t *testing.T) {
 		SetHostShardSets(hostShardSets)
 	m := NewStaticMap(opts)
-
 	require.Equal(t, 4, len(m.Hosts()))
 	require.Equal(t, 4, m.HostsLen())
 	for i, h := range hosts {
diff --git a/src/dbnode/topology/topology_mock.go b/src/dbnode/topology/topology_mock.go
index 29a8501abe..aee9b8a5dd 100644
--- a/src/dbnode/topology/topology_mock.go
+++ b/src/dbnode/topology/topology_mock.go
@@ -1,7 +1,7 @@
 // Code generated by MockGen. DO NOT EDIT.
 // Source: ../../topology/types.go
-// Copyright (c) 2022 Uber Technologies, Inc.
+// Copyright (c) 2023 Uber Technologies, Inc.
 //
 // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to deal
@@ -495,6 +495,21 @@ func (mr *MockMapMockRecorder) LookupHostShardSet(hostID interface{}) *gomock.Ca
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LookupHostShardSet", reflect.TypeOf((*MockMap)(nil).LookupHostShardSet), hostID)
 }
+// LookupInitializingHostPair mocks base method.
+func (m *MockMap) LookupInitializingHostPair(leavingHostID string, id uint32) (string, bool) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "LookupInitializingHostPair", leavingHostID, id)
+	ret0, _ := ret[0].(string)
+	ret1, _ := ret[1].(bool)
+	return ret0, ret1
+}
+
+// LookupInitializingHostPair indicates an expected call of LookupInitializingHostPair.
+func (mr *MockMapMockRecorder) LookupInitializingHostPair(leavingHostID, id interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LookupInitializingHostPair", reflect.TypeOf((*MockMap)(nil).LookupInitializingHostPair), leavingHostID, id)
+}
+
 // MajorityReplicas mocks base method.
 func (m *MockMap) MajorityReplicas() int {
 	m.ctrl.T.Helper()
diff --git a/src/dbnode/topology/types.go b/src/dbnode/topology/types.go
index 1756b93bbd..4f9406949a 100644
--- a/src/dbnode/topology/types.go
+++ b/src/dbnode/topology/types.go
@@ -106,6 +106,9 @@ type Map interface {
 	// LookupHostShardSet returns a HostShardSet for a host in the map
 	LookupHostShardSet(hostID string) (HostShardSet, bool)
+	// LookupInitializingHostPair returns the ID of the initializing host that is bootstrapping the given shard from the leaving host.
+	LookupInitializingHostPair(leavingHostID string, id uint32) (string, bool)
+
 	// HostsLen returns the length of all hosts in the map
 	HostsLen() int
diff --git a/src/x/generated/proto/test/test.pb.go b/src/x/generated/proto/test/test.pb.go
index 164c137a07..0801f51722 100644
--- a/src/x/generated/proto/test/test.pb.go
+++ b/src/x/generated/proto/test/test.pb.go
@@ -1,7 +1,7 @@
 // Code generated by protoc-gen-gogo. DO NOT EDIT.
 // source: github.com/m3db/m3/src/x/generated/proto/test/test.proto
-// Copyright (c) 2020 Uber Technologies, Inc.
+// Copyright (c) 2023 Uber Technologies, Inc.
 //
 // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to deal
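
Reviewer aid: below is a minimal, self-contained sketch of the pairing index that the topology/map.go changes build, written with simplified stand-in types. The names hostShard, buildPairingIndex, and lookupInitializingHostPair are illustrative only and are not M3 identifiers; this is a sketch of the idea, not the committed code. While the static map is constructed, every initializing shard that carries a SourceID is recorded as {leavingHostID: {shardID: initializingHostID}}, so later code can ask "which initializing host is taking over shard S from leaving host H?".

package main

import "fmt"

// hostShard is a stand-in for an initializing shard owned by a host.
type hostShard struct {
	hostID   string
	shardID  uint32
	sourceID string // leaving host this shard is bootstrapping from; "" if none
}

// buildPairingIndex sketches the extra loop added to NewStaticMap: it indexes
// initializing shards by their leaving (source) host and shard ID.
func buildPairingIndex(shards []hostShard) map[string]map[uint32]string {
	index := make(map[string]map[uint32]string)
	for _, s := range shards {
		if s.sourceID == "" {
			continue // not part of a leaving/initializing pair
		}
		if index[s.sourceID] == nil {
			index[s.sourceID] = make(map[uint32]string)
		}
		index[s.sourceID][s.shardID] = s.hostID
	}
	return index
}

// lookupInitializingHostPair sketches staticMap.LookupInitializingHostPair:
// resolve the initializing host for a given leaving host and shard ID.
func lookupInitializingHostPair(
	index map[string]map[uint32]string, leavingHostID string, shardID uint32,
) (string, bool) {
	byShard, ok := index[leavingHostID]
	if !ok {
		return "", false
	}
	initializingHost, ok := byShard[shardID]
	return initializingHost, ok
}

func main() {
	// testhost3 is initializing shards 0 and 1 from leaving testhost0.
	index := buildPairingIndex([]hostShard{
		{hostID: "testhost3", shardID: 0, sourceID: "testhost0"},
		{hostID: "testhost3", shardID: 1, sourceID: "testhost0"},
	})
	host, ok := lookupInitializingHostPair(index, "testhost0", 1)
	fmt.Println(host, ok) // prints: testhost3 true
}

As the new write_quorum tests suggest, when the client option SetShardsLeavingAndInitializingCountTowardsConsistency(true) is set, an ack from a leaving host and its paired initializing host is intended to count as a single successful replica write toward the consistency level. The client-side counting logic itself is not part of this diff, so the sketch above covers only the pairing index exposed via LookupInitializingHostPair.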