Skip to content

Commit

Permalink
reduce traffic
Browse files Browse the repository at this point in the history
  • Loading branch information
mkysel committed Jan 28, 2025
1 parent 73bdd70 commit 7bc4c60
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 6 deletions.
29 changes: 23 additions & 6 deletions pkg/api/metadata/cursorUpdater.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,35 @@ func (cu *CursorUpdater) start() {
case <-cu.ctx.Done():
return
case <-ticker.C:
err := cu.read()
updated, err := cu.read()
if err != nil {
//TODO proper error handling
return
}
cu.notifySubscribers()
if updated {
cu.notifySubscribers()
}
}
}
}

func (cu *CursorUpdater) read() error {
func equalCursors(a, b map[uint32]uint64) bool {
if len(a) != len(b) {
return false
}
for key, valA := range a {
if valB, ok := b[key]; !ok || valA != valB {
return false
}
}
return true
}

func (cu *CursorUpdater) read() (bool, error) {

rows, err := queries.New(cu.store).GetLatestCursor(cu.ctx)
if err != nil {
return err
return false, err
}

nodeIdToSequenceId := make(map[uint32]uint64)
Expand All @@ -67,9 +81,12 @@ func (cu *CursorUpdater) read() error {
cu.cursorMu.Lock()
defer cu.cursorMu.Unlock()

cu.cursor = nodeIdToSequenceId
if !equalCursors(cu.cursor, nodeIdToSequenceId) {
cu.cursor = nodeIdToSequenceId
return true, nil
}

return nil
return false, nil
}

func (cu *CursorUpdater) notifySubscribers() {
Expand Down
10 changes: 10 additions & 0 deletions pkg/api/metadata/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ func (s *Service) SubscribeSyncCursor(
return status.Errorf(codes.Internal, "could not send header: %v", err)
}

// send the initial cursor
// the subscriber will only send a new message if there was a change
cursor := s.cu.GetCursor()
err = stream.Send(&metadata_api.GetSyncCursorResponse{
LatestSync: cursor,
})
if err != nil {
return status.Errorf(codes.Internal, "error sending cursor: %v", err)
}

clientID := fmt.Sprintf("client-%d", time.Now().UnixNano())
updateChan := make(chan struct{}, 1)
s.cu.AddSubscriber(clientID, updateChan)
Expand Down

0 comments on commit 7bc4c60

Please sign in to comment.