From 7bc4c605f1daf297addd21fdee93b3e84b555581 Mon Sep 17 00:00:00 2001 From: Martin Kysel Date: Tue, 28 Jan 2025 13:21:10 -0500 Subject: [PATCH] reduce traffic --- pkg/api/metadata/cursorUpdater.go | 29 +++++++++++++++++++++++------ pkg/api/metadata/service.go | 10 ++++++++++ 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/pkg/api/metadata/cursorUpdater.go b/pkg/api/metadata/cursorUpdater.go index d0d15b84..10cc0335 100644 --- a/pkg/api/metadata/cursorUpdater.go +++ b/pkg/api/metadata/cursorUpdater.go @@ -42,21 +42,35 @@ func (cu *CursorUpdater) start() { case <-cu.ctx.Done(): return case <-ticker.C: - err := cu.read() + updated, err := cu.read() if err != nil { //TODO proper error handling return } - cu.notifySubscribers() + if updated { + cu.notifySubscribers() + } } } } -func (cu *CursorUpdater) read() error { +func equalCursors(a, b map[uint32]uint64) bool { + if len(a) != len(b) { + return false + } + for key, valA := range a { + if valB, ok := b[key]; !ok || valA != valB { + return false + } + } + return true +} + +func (cu *CursorUpdater) read() (bool, error) { rows, err := queries.New(cu.store).GetLatestCursor(cu.ctx) if err != nil { - return err + return false, err } nodeIdToSequenceId := make(map[uint32]uint64) @@ -67,9 +81,12 @@ func (cu *CursorUpdater) read() error { cu.cursorMu.Lock() defer cu.cursorMu.Unlock() - cu.cursor = nodeIdToSequenceId + if !equalCursors(cu.cursor, nodeIdToSequenceId) { + cu.cursor = nodeIdToSequenceId + return true, nil + } - return nil + return false, nil } func (cu *CursorUpdater) notifySubscribers() { diff --git a/pkg/api/metadata/service.go b/pkg/api/metadata/service.go index c5395ea4..4ac3aa37 100644 --- a/pkg/api/metadata/service.go +++ b/pkg/api/metadata/service.go @@ -51,6 +51,16 @@ func (s *Service) SubscribeSyncCursor( return status.Errorf(codes.Internal, "could not send header: %v", err) } + // send the initial cursor + // the subscriber will only send a new message if there was a change + cursor := s.cu.GetCursor() + err = stream.Send(&metadata_api.GetSyncCursorResponse{ + LatestSync: cursor, + }) + if err != nil { + return status.Errorf(codes.Internal, "error sending cursor: %v", err) + } + clientID := fmt.Sprintf("client-%d", time.Now().UnixNano()) updateChan := make(chan struct{}, 1) s.cu.AddSubscriber(clientID, updateChan)