From 138acf2aafdb3a885f891ea93ce7db9a2a1f8616 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 1 Dec 2024 13:41:01 +0100 Subject: [PATCH] Use maxRequestSizeBytes for batch limiting Signed-off-by: Marek Siarkowicz --- server/config/config.go | 4 +-- server/etcdserver/api/v3rpc/grpc.go | 2 +- server/etcdserver/api/v3rpc/watch.go | 10 +++--- server/etcdserver/api/v3rpc/watch_test.go | 2 +- server/etcdserver/corrupt_test.go | 2 +- server/etcdserver/server.go | 9 +++-- server/etcdserver/server_test.go | 8 ++--- server/storage/mvcc/kv_test.go | 2 +- server/storage/mvcc/watchable_store.go | 25 ++++++++----- .../mvcc/watchable_store_bench_test.go | 10 +++--- server/storage/mvcc/watchable_store_test.go | 36 +++++++++---------- server/storage/mvcc/watcher_bench_test.go | 2 +- server/storage/mvcc/watcher_group.go | 21 ++++++----- server/storage/mvcc/watcher_test.go | 18 +++++----- 14 files changed, 80 insertions(+), 71 deletions(-) diff --git a/server/config/config.go b/server/config/config.go index b4a8f61a575b..2229ca5cb588 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -363,6 +363,6 @@ func (c *ServerConfig) BootstrapTimeoutEffective() time.Duration { func (c *ServerConfig) BackendPath() string { return datadir.ToBackendFileName(c.DataDir) } -func (c *ServerConfig) MaxRequestBytesWithOverhead() uint { - return c.MaxRequestBytes + grpcOverheadBytes +func (c *ServerConfig) MaxRequestBytesWithOverhead() int { + return int(c.MaxRequestBytes) + grpcOverheadBytes } diff --git a/server/etcdserver/api/v3rpc/grpc.go b/server/etcdserver/api/v3rpc/grpc.go index 329492078055..044666d6eb38 100644 --- a/server/etcdserver/api/v3rpc/grpc.go +++ b/server/etcdserver/api/v3rpc/grpc.go @@ -61,7 +61,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer opts = append(opts, grpc.ChainUnaryInterceptor(chainUnaryInterceptors...)) opts = append(opts, grpc.ChainStreamInterceptor(chainStreamInterceptors...)) - opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytesWithOverhead()))) + opts = append(opts, grpc.MaxRecvMsgSize(s.Cfg.MaxRequestBytesWithOverhead())) opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes)) opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams)) diff --git a/server/etcdserver/api/v3rpc/watch.go b/server/etcdserver/api/v3rpc/watch.go index b0a7e4a1926a..a33250a40f36 100644 --- a/server/etcdserver/api/v3rpc/watch.go +++ b/server/etcdserver/api/v3rpc/watch.go @@ -43,7 +43,7 @@ type watchServer struct { clusterID int64 memberID int64 - maxRequestBytes uint + maxRequestBytes int sg apply.RaftStatusGetter watchable mvcc.WatchableKV @@ -126,7 +126,7 @@ type serverWatchStream struct { clusterID int64 memberID int64 - maxRequestBytes uint + maxRequestBytes int sg apply.RaftStatusGetter watchable mvcc.WatchableKV @@ -544,12 +544,12 @@ func IsCreateEvent(e mvccpb.Event) bool { func sendFragments( wr *pb.WatchResponse, - maxRequestBytes uint, + maxRequestBytes int, sendFunc func(*pb.WatchResponse) error, ) error { // no need to fragment if total request size is smaller // than max request limit or response contains only one event - if uint(wr.Size()) < maxRequestBytes || len(wr.Events) < 2 { + if wr.Size() < maxRequestBytes || len(wr.Events) < 2 { return sendFunc(wr) } @@ -562,7 +562,7 @@ func sendFragments( cur := ow for _, ev := range wr.Events[idx:] { cur.Events = append(cur.Events, ev) - if len(cur.Events) > 1 && uint(cur.Size()) >= maxRequestBytes { + if len(cur.Events) > 1 && cur.Size() >= maxRequestBytes { cur.Events = cur.Events[:len(cur.Events)-1] break } diff --git a/server/etcdserver/api/v3rpc/watch_test.go b/server/etcdserver/api/v3rpc/watch_test.go index caa86f91ad71..e7868ddf8d25 100644 --- a/server/etcdserver/api/v3rpc/watch_test.go +++ b/server/etcdserver/api/v3rpc/watch_test.go @@ -27,7 +27,7 @@ import ( func TestSendFragment(t *testing.T) { tt := []struct { wr *pb.WatchResponse - maxRequestBytes uint + maxRequestBytes int fragments int werr error }{ diff --git a/server/etcdserver/corrupt_test.go b/server/etcdserver/corrupt_test.go index 6001542d3481..fe1f192146ad 100644 --- a/server/etcdserver/corrupt_test.go +++ b/server/etcdserver/corrupt_test.go @@ -516,7 +516,7 @@ func TestHashKVHandler(t *testing.T) { etcdSrv.cluster.SetID(types.ID(localClusterID), types.ID(localClusterID)) be, _ := betesting.NewDefaultTmpBackend(t) defer betesting.Close(t, be) - etcdSrv.kv = mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}) + etcdSrv.kv = mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{}) defer func() { assert.NoError(t, etcdSrv.kv.Close()) }() diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 2677e929f982..824b6328be86 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -368,9 +368,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { return nil, err } - mvccStoreConfig := mvcc.StoreConfig{ - CompactionBatchLimit: cfg.CompactionBatchLimit, - CompactionSleepInterval: cfg.CompactionSleepInterval, + mvccStoreConfig := mvcc.WatchableStoreConfig{ + StoreConfig: mvcc.StoreConfig{ + CompactionBatchLimit: cfg.CompactionBatchLimit, + CompactionSleepInterval: cfg.CompactionSleepInterval, + }, + WatchBatchMaxSize: cfg.MaxRequestBytesWithOverhead(), } srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig) srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage()) diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 265bce38f565..a804bb061465 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -652,7 +652,7 @@ func TestSnapshotDisk(t *testing.T) { v2store: st, consistIndex: cindex.NewConsistentIndex(be), } - srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}) + srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{}) defer func() { assert.NoError(t, srv.kv.Close()) }() @@ -703,7 +703,7 @@ func TestSnapshotMemory(t *testing.T) { v2store: st, consistIndex: cindex.NewConsistentIndex(be), } - srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}) + srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{}) defer func() { assert.NoError(t, srv.kv.Close()) }() @@ -775,7 +775,7 @@ func TestSnapshotOrdering(t *testing.T) { beHooks: serverstorage.NewBackendHooks(lg, ci), } - s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{}) + s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{}) s.be = be s.start() @@ -869,7 +869,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { authStore: auth.NewAuthStore(lg, schema.NewAuthBackend(lg, be), nil, 1), } - s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{}) + s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{}) s.be = be s.start() diff --git a/server/storage/mvcc/kv_test.go b/server/storage/mvcc/kv_test.go index c727b444af77..6296b9c375b4 100644 --- a/server/storage/mvcc/kv_test.go +++ b/server/storage/mvcc/kv_test.go @@ -757,7 +757,7 @@ func TestKVSnapshot(t *testing.T) { func TestWatchableKVWatch(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) w := s.NewWatchStream() diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go index 87991d31d7e1..c4dffde3e9fd 100644 --- a/server/storage/mvcc/watchable_store.go +++ b/server/storage/mvcc/watchable_store.go @@ -50,6 +50,7 @@ type watchable interface { type watchableStore struct { *store + watchBatchMaxSize int // mu protects watcher groups and batches. It should never be locked // before locking store.mu to avoid deadlock. @@ -76,7 +77,7 @@ var _ WatchableKV = (*watchableStore)(nil) // cancel operations. type cancelFunc func() -func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore { +func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg WatchableStoreConfig) *watchableStore { s := newWatchableStore(lg, b, le, cfg) s.wg.Add(2) go s.syncWatchersLoop() @@ -84,16 +85,22 @@ func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *w return s } -func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore { +type WatchableStoreConfig struct { + StoreConfig + WatchBatchMaxSize int +} + +func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg WatchableStoreConfig) *watchableStore { if lg == nil { lg = zap.NewNop() } s := &watchableStore{ - store: NewStore(lg, b, le, cfg), - victimc: make(chan struct{}, 1), - unsynced: newWatcherGroup(), - synced: newWatcherGroup(), - stopc: make(chan struct{}), + store: NewStore(lg, b, le, cfg.StoreConfig), + victimc: make(chan struct{}, 1), + unsynced: newWatcherGroup(), + synced: newWatcherGroup(), + stopc: make(chan struct{}), + watchBatchMaxSize: cfg.WatchBatchMaxSize, } s.store.ReadView = &readView{s} s.store.WriteView = &writeView{s} @@ -373,7 +380,7 @@ func (s *watchableStore) syncWatchers() int { tx.RUnlock() victims := make(watcherBatch) - wb := newWatcherBatch(wg, evs) + wb := newWatcherBatch(wg, evs, s.watchBatchMaxSize) for w := range wg.watchers { if w.minRev < compactionRev { // Skip the watcher that failed to send compacted watch response due to w.ch is full. @@ -449,7 +456,7 @@ func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []m // watchers that watch on the key of the event. func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) { victim := make(watcherBatch) - for w, eb := range newWatcherBatch(&s.synced, evs) { + for w, eb := range newWatcherBatch(&s.synced, evs, s.watchBatchMaxSize) { if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) { pendingEventsGauge.Add(float64(len(eb.evs))) } else { diff --git a/server/storage/mvcc/watchable_store_bench_test.go b/server/storage/mvcc/watchable_store_bench_test.go index c8990576b30a..07bde5859441 100644 --- a/server/storage/mvcc/watchable_store_bench_test.go +++ b/server/storage/mvcc/watchable_store_bench_test.go @@ -27,7 +27,7 @@ import ( func BenchmarkWatchableStorePut(b *testing.B) { be, _ := betesting.NewDefaultTmpBackend(b) - s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, be) // arbitrary number of bytes @@ -47,7 +47,7 @@ func BenchmarkWatchableStorePut(b *testing.B) { // some synchronization operations, such as mutex locking. func BenchmarkWatchableStoreTxnPut(b *testing.B) { be, _ := betesting.NewDefaultTmpBackend(b) - s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, be) // arbitrary number of bytes @@ -78,7 +78,7 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) { func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) { be, _ := betesting.NewDefaultTmpBackend(b) - s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, be) k := []byte("testkey") @@ -122,7 +122,7 @@ func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) { // we should put to simulate the real-world use cases. func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { be, _ := betesting.NewDefaultTmpBackend(b) - ws := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) + ws := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(ws, be) @@ -164,7 +164,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { func BenchmarkWatchableStoreSyncedCancel(b *testing.B) { be, _ := betesting.NewDefaultTmpBackend(b) - s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, be) diff --git a/server/storage/mvcc/watchable_store_test.go b/server/storage/mvcc/watchable_store_test.go index 0ae35fea1f72..df21779065e4 100644 --- a/server/storage/mvcc/watchable_store_test.go +++ b/server/storage/mvcc/watchable_store_test.go @@ -33,7 +33,7 @@ import ( func TestWatch(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) testKey := []byte("foo") @@ -52,7 +52,7 @@ func TestWatch(t *testing.T) { func TestNewWatcherCancel(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) testKey := []byte("foo") @@ -81,7 +81,7 @@ func TestCancelUnsynced(t *testing.T) { // because newWatchableStore automatically calls syncWatchers // method to sync watchers in unsynced map. We want to keep watchers // in unsynced to test if syncWatchers works as expected. - s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) // Put a key so that we can spawn watchers on that key. @@ -125,7 +125,7 @@ func TestCancelUnsynced(t *testing.T) { // and moves these watchers to synced. func TestSyncWatchers(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) testKey := []byte("foo") @@ -167,7 +167,7 @@ func TestSyncWatchers(t *testing.T) { // TestWatchCompacted tests a watcher that watches on a compacted revision. func TestWatchCompacted(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) testKey := []byte("foo") @@ -205,7 +205,7 @@ func TestWatchNoEventLossOnCompact(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) lg := zaptest.NewLogger(t) - s := New(lg, b, &lease.FakeLessor{}, StoreConfig{}) + s := New(lg, b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer func() { cleanup(s, b) @@ -259,7 +259,7 @@ func TestWatchNoEventLossOnCompact(t *testing.T) { func TestWatchFutureRev(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) testKey := []byte("foo") @@ -298,7 +298,7 @@ func TestWatchRestore(t *testing.T) { test := func(delay time.Duration) func(t *testing.T) { return func(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) testKey := []byte("foo") @@ -344,11 +344,11 @@ func readEventsForSecond(ws <-chan WatchResponse) (events []mvccpb.Event) { // 5. choose the watcher from step 1, without panic func TestWatchRestoreSyncedWatcher(t *testing.T) { b1, _ := betesting.NewDefaultTmpBackend(t) - s1 := New(zaptest.NewLogger(t), b1, &lease.FakeLessor{}, StoreConfig{}) + s1 := New(zaptest.NewLogger(t), b1, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s1, b1) b2, _ := betesting.NewDefaultTmpBackend(t) - s2 := New(zaptest.NewLogger(t), b2, &lease.FakeLessor{}, StoreConfig{}) + s2 := New(zaptest.NewLogger(t), b2, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s2, b2) testKey, testValue := []byte("foo"), []byte("bar") @@ -451,13 +451,9 @@ func TestWatchBatchUnsynced(t *testing.T) { for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - oldMaxRevs := watchBatchMaxSize - defer func() { - watchBatchMaxSize = oldMaxRevs - cleanup(s, b) - }() - watchBatchMaxSize = tc.watchBatchMaxSize + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{ + WatchBatchMaxSize: tc.watchBatchMaxSize, + }) k := []byte("k") eventProtoOverhead := 13 @@ -576,7 +572,7 @@ func TestNewMapwatcherToEventMap(t *testing.T) { wg.add(w) } - gwe := newWatcherBatch(&wg, tt.evs) + gwe := newWatcherBatch(&wg, tt.evs, 0) if len(gwe) != len(tt.wwe) { t.Errorf("#%d: len(gwe) got = %d, want = %d", i, len(gwe), len(tt.wwe)) } @@ -598,7 +594,7 @@ func TestWatchVictims(t *testing.T) { oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer func() { cleanup(s, b) @@ -675,7 +671,7 @@ func TestWatchVictims(t *testing.T) { // canceling its watches. func TestStressWatchCancelClose(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) testKey, testValue := []byte("foo"), []byte("bar") diff --git a/server/storage/mvcc/watcher_bench_test.go b/server/storage/mvcc/watcher_bench_test.go index 3d0dccea3421..170075ab2a92 100644 --- a/server/storage/mvcc/watcher_bench_test.go +++ b/server/storage/mvcc/watcher_bench_test.go @@ -26,7 +26,7 @@ import ( func BenchmarkKVWatcherMemoryUsage(b *testing.B) { be, _ := betesting.NewDefaultTmpBackend(b) - watchable := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) + watchable := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(watchable, be) diff --git a/server/storage/mvcc/watcher_group.go b/server/storage/mvcc/watcher_group.go index e3e7110d8ada..6db755a497e2 100644 --- a/server/storage/mvcc/watcher_group.go +++ b/server/storage/mvcc/watcher_group.go @@ -22,10 +22,11 @@ import ( "go.etcd.io/etcd/pkg/v3/adt" ) -// watchBatchMaxSize is the maximum distinct revisions that -// may be sent to an unsynced watcher at a time. Declared as -// var instead of const for testing purposes. -var watchBatchMaxSize = 2 * 1024 * 1024 +func newEventBatch(watchBatchMaxSize int) *eventBatch { + return &eventBatch{ + watchBatchMaxSize: watchBatchMaxSize, + } +} type eventBatch struct { // evs is a batch of revision-ordered events @@ -34,6 +35,8 @@ type eventBatch struct { evsSize int // moreRev is first revision with more events following this batch moreRev int64 + // watchBatchMaxSize is maximum size of batch + watchBatchMaxSize int } func (eb *eventBatch) add(ev mvccpb.Event) { @@ -54,7 +57,7 @@ func (eb *eventBatch) add(ev mvccpb.Event) { } size := ev.Size() - if eb.evsSize+size > watchBatchMaxSize { + if eb.watchBatchMaxSize != 0 && eb.evsSize+size > eb.watchBatchMaxSize { eb.moreRev = ev.Kv.ModRevision return } @@ -64,10 +67,10 @@ func (eb *eventBatch) add(ev mvccpb.Event) { type watcherBatch map[*watcher]*eventBatch -func (wb watcherBatch) add(w *watcher, ev mvccpb.Event) { +func (wb watcherBatch) add(w *watcher, ev mvccpb.Event, watchBatchMaxSize int) { eb := wb[w] if eb == nil { - eb = &eventBatch{} + eb = newEventBatch(watchBatchMaxSize) wb[w] = eb } eb.add(ev) @@ -75,7 +78,7 @@ func (wb watcherBatch) add(w *watcher, ev mvccpb.Event) { // newWatcherBatch maps watchers to their matched events. It enables quick // events look up by watcher. -func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch { +func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event, watchBatchMaxSize int) watcherBatch { if len(wg.watchers) == 0 { return nil } @@ -85,7 +88,7 @@ func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch { for w := range wg.watcherSetByKey(string(ev.Kv.Key)) { if ev.Kv.ModRevision >= w.minRev { // don't double notify - wb.add(w, ev) + wb.add(w, ev, watchBatchMaxSize) } } } diff --git a/server/storage/mvcc/watcher_test.go b/server/storage/mvcc/watcher_test.go index 0c1fa5212670..37a7f85bdfe7 100644 --- a/server/storage/mvcc/watcher_test.go +++ b/server/storage/mvcc/watcher_test.go @@ -35,7 +35,7 @@ import ( // and the watched event attaches the correct watchID. func TestWatcherWatchID(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) w := s.NewWatchStream() @@ -85,7 +85,7 @@ func TestWatcherWatchID(t *testing.T) { func TestWatcherRequestsCustomID(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) w := s.NewWatchStream() @@ -122,7 +122,7 @@ func TestWatcherRequestsCustomID(t *testing.T) { // and returns events with matching prefixes. func TestWatcherWatchPrefix(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) w := s.NewWatchStream() @@ -196,7 +196,7 @@ func TestWatcherWatchPrefix(t *testing.T) { // does not create watcher, which panics when canceling in range tree. func TestWatcherWatchWrongRange(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) w := s.NewWatchStream() @@ -216,7 +216,7 @@ func TestWatcherWatchWrongRange(t *testing.T) { func TestWatchDeleteRange(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer func() { b.Close() @@ -256,7 +256,7 @@ func TestWatchDeleteRange(t *testing.T) { // with given id inside watchStream. func TestWatchStreamCancelWatcherByID(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) w := s.NewWatchStream() @@ -293,7 +293,7 @@ func TestWatchStreamCancelWatcherByID(t *testing.T) { // report its correct progress. func TestWatcherRequestProgress(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) @@ -336,7 +336,7 @@ func TestWatcherRequestProgress(t *testing.T) { func TestWatcherRequestProgressAll(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) @@ -375,7 +375,7 @@ func TestWatcherRequestProgressAll(t *testing.T) { func TestWatcherWatchWithFilter(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{}) defer cleanup(s, b) w := s.NewWatchStream()