-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpool.go
126 lines (107 loc) · 2.74 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package pool
import (
"sync"
"github.com/nats-io/nats.go"
)
// ConnPool implements pool of *nats.Conn of a bounded channel
type ConnPool struct {
mutex *sync.RWMutex
poolSize int
pool chan *nats.Conn
url string
options []nats.Option
}
// New create new ConnPool bounded to given poolSize,
// with specify URL to connect to natsd on url.
// option is used for nats#Connect when creating pool connections
func New(poolSize int, url string, options ...nats.Option) *ConnPool {
return &ConnPool{
mutex: new(sync.RWMutex),
poolSize: poolSize,
pool: make(chan *nats.Conn, poolSize),
url: url,
options: options,
}
}
func (p *ConnPool) connect() (*nats.Conn, error) {
return nats.Connect(p.url, p.options...)
}
// DisconnectAll close all connected *nats.Conn connections in pool
func (p *ConnPool) DisconnectAll() {
p.mutex.Lock()
defer p.mutex.Unlock()
close(p.pool)
for nc := range p.pool {
nc.Close()
}
p.pool = make(chan *nats.Conn, p.poolSize)
}
// Get returns *nats.Conn, if connection is available,
// or makes a new connection and returns a value if not.
// if *nats.Conn is not connected in pool, make new connection in the same way.
func (p *ConnPool) Get() (*nats.Conn, error) {
p.mutex.RLock()
defer p.mutex.RUnlock()
var nc *nats.Conn
var err error
select {
case nc = <-p.pool:
// reuse exists pool
if nc.IsConnected() != true {
// Close to be sure
nc.Close()
// disconnected conn, create new *nats.Conn
nc, err = p.connect()
}
default:
// create *nats.Conn
nc, err = p.connect()
}
return nc, err
}
// GetEncoded returns *nats.EncodedConn, the argument must be an Encoder registered with nats.RegisterEncoder.
func (p *ConnPool) GetEncoded(encType string) (*nats.EncodedConn, error) {
conn, err := p.Get()
if err != nil {
return nil, err
}
return nats.NewEncodedConn(conn, encType)
}
// Put puts *nats.Conn back into pool.
// there is no need to do Close() ahead of time,
// ConnPool will automatically do a Close() if it cannot be returned to the pool.
func (p *ConnPool) Put(nc *nats.Conn) (bool, error) {
if nc == nil {
return false, nil
}
p.mutex.RLock()
defer p.mutex.RUnlock()
var err error
if nc.IsConnected() {
err = nc.Flush()
}
select {
case p.pool <- nc:
// free capacity
return true, err
default:
// full capacity, discard & disconnect
nc.Close()
return false, err
}
}
// PutEncoded puts *nats.EncodedConn back into pool.
func (p *ConnPool) PutEncoded(ec *nats.EncodedConn) (bool, error) {
if ec == nil {
return false, nil
}
return p.Put(ec.Conn)
}
// Len returns the number of items currently pooled
func (p *ConnPool) Len() int {
return len(p.pool)
}
// Cap returns the number of pool capacity
func (p *ConnPool) Cap() int {
return cap(p.pool)
}