-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathsession.go
61 lines (53 loc) · 1.13 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package main
import (
"context"
"sync"
log "github.com/sirupsen/logrus"
"github.com/tonkeeper/bridge/datatype"
)
// Session holds the per-connection state used to push SSE messages to a set
// of client IDs.
type Session struct {
	// ClientIds are the client identifiers handed to storage.GetMessages
	// when the worker loads the initial queue.
	ClientIds []string
	// MessageCh carries messages to the consumer of the session. It is
	// written by worker and AddMessageToQueue and closed by worker.
	MessageCh chan datatype.SseMessage
	// Closer signals shutdown: worker and AddMessageToQueue receive from it
	// to detect that the session is finished.
	Closer chan interface{}

	// mux is not used by the methods in this part of the file.
	mux sync.RWMutex
	// storage is the backend queried for the initial message queue.
	storage db
	// lastEventId is passed to storage.GetMessages together with ClientIds.
	lastEventId int64
}
// NewSession builds a Session for the given client IDs backed by storage s.
//
// MessageCh is created with a buffer of 10 messages and Closer is created
// unbuffered. lastEventId is stored and later passed to storage.GetMessages
// by the worker. The returned session is idle until Start is called.
func NewSession(s db, clientIds []string, lastEventId int64) *Session {
	// The mutex is left at its zero value, which is an unlocked RWMutex.
	return &Session{
		storage:     s,
		ClientIds:   clientIds,
		lastEventId: lastEventId,
		MessageCh:   make(chan datatype.SseMessage, 10),
		Closer:      make(chan interface{}),
	}
}
// worker loads the queue returned by storage.GetMessages for the session's
// ClientIds and lastEventId, forwards each message to MessageCh, then waits
// for Closer and closes MessageCh.
//
// If Closer fires while the queue is still being forwarded, the remaining
// messages are dropped and the worker exits immediately. A GetMessages error
// is logged and treated as an empty queue.
func (s *Session) worker() {
	logger := log.WithField("prefix", "Session.worker")

	queue, err := s.storage.GetMessages(context.TODO(), s.ClientIds, s.lastEventId)
	if err != nil {
		logger.Info("get queue error: ", err)
	}

	// MessageCh is closed on every exit path so the consumer always
	// observes the end of the stream.
	defer close(s.MessageCh)

	for _, m := range queue {
		// Race the send against Closer: a bare `break` inside select would
		// only leave the select, and an unconditional send would block
		// forever once the consumer has stopped reading.
		select {
		case <-s.Closer:
			return
		case s.MessageCh <- m:
		}
	}

	<-s.Closer
}
// AddMessageToQueue delivers mes on MessageCh unless the session is closed.
//
// If Closer is already closed the message is dropped without touching
// MessageCh. Otherwise the call blocks until either MessageCh accepts the
// message or Closer is closed, in which case the message is dropped.
//
// ctx is accepted for interface compatibility and is not consulted.
func (s *Session) AddMessageToQueue(ctx context.Context, mes datatype.SseMessage) {
	// Check Closer first so a closed session never receives a send, even
	// when MessageCh still has buffer space.
	select {
	case <-s.Closer:
		return
	default:
	}

	// Keep watching Closer while waiting for room in MessageCh. A plain
	// blocking send here would hang if the consumer went away, or panic once
	// the worker closes MessageCh after Closer fires.
	select {
	case <-s.Closer:
	case s.MessageCh <- mes:
	}
}
// Start launches the session worker in its own goroutine and returns
// immediately. The worker runs until Closer is signalled.
func (s *Session) Start() {
	run := s.worker
	go run()
}