Skip to content

Commit

Permalink
Updated syncWait method to include syncDone channel
Browse files Browse the repository at this point in the history
  • Loading branch information
sacOO7 committed Dec 4, 2023
1 parent 31b739a commit 14252e8
Showing 1 changed file with 16 additions and 9 deletions.
25 changes: 16 additions & 9 deletions ably/realtime_presence.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ type RealtimePresence struct {
internalMembers map[string]*PresenceMessage // RTP17
beforeSyncMembers map[string]*PresenceMessage
state PresenceAction
syncMtx sync.Mutex
syncState syncState
queue *msgQueue
syncDone chan struct{}
}

func newRealtimePresence(channel *RealtimeChannel) *RealtimePresence {
Expand All @@ -41,12 +41,12 @@ func newRealtimePresence(channel *RealtimeChannel) *RealtimePresence {
members: make(map[string]*PresenceMessage),
internalMembers: make(map[string]*PresenceMessage),
syncState: syncInitial,
syncDone: make(chan struct{}),
}
pres.queue = newMsgQueue(pres.channel.client.Connection)
// Lock syncMtx to make all callers to Get(true) wait until the presence
// is in initial sync state. This is to not make them early return
// with an empty presence list before channel attaches.
pres.syncMtx.Lock()
return pres
}

Expand Down Expand Up @@ -122,12 +122,16 @@ func (pres *RealtimePresence) send(msg *PresenceMessage) (result, error) {
}), nil
}

func (pres *RealtimePresence) syncWait() {
func (pres *RealtimePresence) syncWait(ctx context.Context) error {
// If there's an undergoing sync operation or we wait till channel gets
// attached, the following lock is going to block until the operations
// complete.
pres.syncMtx.Lock()
pres.syncMtx.Unlock()
select {
case <-pres.syncDone:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

// RTP18
Expand Down Expand Up @@ -169,7 +173,7 @@ func (pres *RealtimePresence) onAttach(msg *protocolMessage, isAttachWithoutMess
pres.leaveMembers(pres.members) // RTP19a
if pres.syncState == syncInitial {
pres.syncState = syncComplete
pres.syncMtx.Unlock()
close(pres.syncDone)
}
}
pres.queue.Flush() // RTP5b
Expand Down Expand Up @@ -200,7 +204,7 @@ func (pres *RealtimePresence) syncStart() {
} else if pres.syncState != syncInitial {
// Sync has started, make all callers to Get(true) wait. If it's channel's
// initial sync, the callers are already waiting.
pres.syncMtx.Lock()
pres.syncDone = make(chan struct{})
}
pres.syncState = syncInProgress
pres.beforeSyncMembers = make(map[string]*PresenceMessage, len(pres.members)) // RTP19
Expand Down Expand Up @@ -237,7 +241,7 @@ func (pres *RealtimePresence) syncEnd() {
pres.syncState = syncComplete
// Sync has completed, unblock all callers to Get(true) waiting
// for the sync.
pres.syncMtx.Unlock()
close(pres.syncDone)
}

// RTP2a, RTP2b, RTP2c
Expand Down Expand Up @@ -395,7 +399,10 @@ func (pres *RealtimePresence) GetWithOptions(ctx context.Context, options ...Pre
}

if opts.waitForSync {
pres.syncWait()
err = pres.syncWait(ctx)
if err != nil {
return nil, err
}
}

pres.mtx.Lock()
Expand Down

0 comments on commit 14252e8

Please sign in to comment.