Skip to content

Commit

Permalink
Updated syncWait method to include syncDone channel
Browse files Browse the repository at this point in the history
  • Loading branch information
sacOO7 committed Dec 4, 2023
1 parent 31b739a commit 2740853
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions ably/realtime_presence.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type RealtimePresence struct {
syncMtx sync.Mutex
syncState syncState
queue *msgQueue
syncDone chan struct{}
}

func newRealtimePresence(channel *RealtimeChannel) *RealtimePresence {
Expand All @@ -41,6 +42,7 @@ func newRealtimePresence(channel *RealtimeChannel) *RealtimePresence {
members: make(map[string]*PresenceMessage),
internalMembers: make(map[string]*PresenceMessage),
syncState: syncInitial,
syncDone: make(chan struct{}),
}
pres.queue = newMsgQueue(pres.channel.client.Connection)
// Lock syncMtx to make all callers to Get(true) wait until the presence
Expand Down Expand Up @@ -122,12 +124,16 @@ func (pres *RealtimePresence) send(msg *PresenceMessage) (result, error) {
}), nil
}

func (pres *RealtimePresence) syncWait() {
func (pres *RealtimePresence) syncWait(ctx context.Context) error {
// If there's an undergoing sync operation or we wait till channel gets
// attached, the following lock is going to block until the operations
// complete.
pres.syncMtx.Lock()
pres.syncMtx.Unlock()
select {
case <-pres.syncDone:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

// RTP18
Expand Down Expand Up @@ -169,6 +175,7 @@ func (pres *RealtimePresence) onAttach(msg *protocolMessage, isAttachWithoutMess
pres.leaveMembers(pres.members) // RTP19a
if pres.syncState == syncInitial {
pres.syncState = syncComplete
close(pres.syncDone)
pres.syncMtx.Unlock()
}
}
Expand Down Expand Up @@ -200,6 +207,7 @@ func (pres *RealtimePresence) syncStart() {
} else if pres.syncState != syncInitial {
// Sync has started, make all callers to Get(true) wait. If it's channel's
// initial sync, the callers are already waiting.
pres.syncDone = make(chan struct{})
pres.syncMtx.Lock()
}
pres.syncState = syncInProgress
Expand Down Expand Up @@ -237,6 +245,7 @@ func (pres *RealtimePresence) syncEnd() {
pres.syncState = syncComplete
// Sync has completed, unblock all callers to Get(true) waiting
// for the sync.
close(pres.syncDone)
pres.syncMtx.Unlock()
}

Expand Down Expand Up @@ -395,7 +404,10 @@ func (pres *RealtimePresence) GetWithOptions(ctx context.Context, options ...Pre
}

if opts.waitForSync {
pres.syncWait()
err = pres.syncWait(ctx)
if err != nil {
return nil, err
}
}

pres.mtx.Lock()
Expand Down

0 comments on commit 2740853

Please sign in to comment.