-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathsql.go
284 lines (236 loc) · 6.49 KB
/
sql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
// Copyright (c) 2013 VividCortex. Please see the LICENSE file for license terms.
package dbcontrol
import (
"database/sql"
"runtime/debug"
"time"
)
// SetBlockDurationCh sets a channel used to report blocks on connections. Each
// time a connection has to be waited for due to the limit imposed by
// SetConcurrency(), this channel will receive the duration for that wait as
// soon as the connection becomes available. Setting the channel to nil (i.e.,
// calling SetBlockDurationCh(nil)) will close the previously assigned channel,
// if any. Note also that this operation is safe to be called in parallel with
// other database requests.
func (db *DB) SetBlockDurationCh(c chan<- time.Duration) {
db.blockChMux.Lock()
defer db.blockChMux.Unlock()
if db.blockCh != nil {
close(db.blockCh)
}
db.blockCh = c
}
// SetUsageTimeout sets a maximum time for connection usage since it was granted
// to the caller (i.e., usage starts when a spare connection could be withdrawn
// from the pool, in case connection limiting is in use; see SetConcurrency()).
// After the time has elapsed a notification will be sent to the provided
// channel including the stack trace for the offending consumer (at the time the
// connection was requested). Setting the timeout to zero (the default) disables
// this feature. Note that this function is safe to be called anytime. Changes
// in the timeout will take effect for new requests; pending timers will still
// use the previous value. Changing the channel takes effect immediately,
// though. The previously set channel is guaranteed not to be used again after
// SetUsageTimeout() returns, thus allowing to safely close it if appropriate.
// Setting the channel to nil disables all pending and future notifications,
// until set to another valid channel. Note though that, in order to avoid
// needless resource usage, setting the channel to nil implies that no further
// timers will be started. (That is, you won't get a notification for a long
// running consumer that requested a connection when the channel was nil, even
// if you set the channel to a non-nil value before the timeout would expire.
// Consider fixing the channel and filtering events when reading from it if
// you're looking for that effect.) Note also that the timer expiring, whether
// notified or not, has no effect whatsoever on the routine using the
// connection.
func (db *DB) SetUsageTimeout(c chan<- string, timeout time.Duration) {
db.usageTimeoutMux.Lock()
defer db.usageTimeoutMux.Unlock()
db.usageTimeoutCh = c
if c != nil {
db.usageTimeout = timeout
} else {
db.usageTimeout = 0
}
}
func (db *DB) conn() func() {
releaseLock := func() {}
if db.sem != nil {
select {
case <-db.sem:
default:
start := time.Now()
<-db.sem
db.blockChMux.RLock()
if db.blockCh != nil {
db.blockCh <- time.Now().Sub(start)
}
db.blockChMux.RUnlock()
}
releaseLock = func() {
db.sem <- true
}
}
db.usageTimeoutMux.RLock()
usageTimeout := db.usageTimeout
db.usageTimeoutMux.RUnlock()
cancelUsageTimeout := func() {}
if usageTimeout != 0 {
cancelTimeoutCh := make(chan struct{}, 1)
cancelUsageTimeout = func() {
cancelTimeoutCh <- struct{}{}
close(cancelTimeoutCh)
}
stack := debug.Stack()
go func() {
select {
case <-time.After(usageTimeout):
db.usageTimeoutMux.RLock()
if db.usageTimeoutCh != nil {
db.usageTimeoutCh <- string(stack)
}
db.usageTimeoutMux.RUnlock()
case <-cancelTimeoutCh:
}
}()
}
return func() {
releaseLock()
cancelUsageTimeout()
}
}
// SetMaxIdleConns sets the maximum number of idle connections to the database.
// However, note that this only makes sense if you're not limiting the number
// of concurrent connections. Databases opened under SetConcurrency(n) for n>0
// will silently ignore this call. (The maximum number of connections in that
// case will match the concurrency value n.)
func (db *DB) SetMaxIdleConns(n int) {
if db.sem == nil {
// Not using tokens
db.DB.SetMaxIdleConns(n)
}
}
func (db *DB) Ping() error {
release := db.conn()
defer release()
return db.DB.Ping()
}
func (db *DB) Exec(query string, args ...interface{}) (sql.Result, error) {
release := db.conn()
defer release()
return db.DB.Exec(query, args...)
}
type Rows struct {
*sql.Rows
closed bool
release func()
}
func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {
release := db.conn()
rows, err := db.DB.Query(query, args...)
if err != nil {
release()
return nil, err
}
return &Rows{Rows: rows, release: release}, nil
}
func (rows *Rows) Next() bool {
if rows.closed {
return false
}
next := rows.Rows.Next()
if !next {
// EOF or error: the result set was closed by Rows.Next()
rows.release()
rows.closed = true
}
return next
}
func (rows *Rows) Close() error {
err := rows.Rows.Close()
if !rows.closed {
rows.release()
rows.closed = true
}
return err
}
type Row struct {
*sql.Row
closed bool
release func()
}
func (db *DB) QueryRow(query string, args ...interface{}) *Row {
release := db.conn()
row := db.DB.QueryRow(query, args...)
return &Row{Row: row, release: release}
}
func (row *Row) Scan(dest ...interface{}) error {
err := row.Row.Scan(dest...)
if !row.closed {
row.release()
row.closed = true
}
return err
}
type Stmt struct {
*sql.Stmt
db *DB
}
func (db *DB) Prepare(query string) (*Stmt, error) {
release := db.conn()
defer release()
stmt, err := db.DB.Prepare(query)
if err != nil {
return nil, err
}
return &Stmt{Stmt: stmt, db: db}, nil
}
func (s *Stmt) Exec(args ...interface{}) (sql.Result, error) {
release := s.db.conn()
defer release()
return s.Stmt.Exec(args...)
}
func (s *Stmt) Query(args ...interface{}) (*Rows, error) {
release := s.db.conn()
rows, err := s.Stmt.Query(args...)
if err != nil {
release()
return nil, err
}
return &Rows{Rows: rows, release: release}, nil
}
func (s *Stmt) QueryRow(args ...interface{}) *Row {
release := s.db.conn()
row := s.Stmt.QueryRow(args...)
return &Row{Row: row, release: release}
}
type Tx struct {
*sql.Tx
closed bool
release func()
}
func (db *DB) Begin() (*Tx, error) {
release := db.conn()
tx, err := db.DB.Begin()
if err != nil {
release()
return nil, err
}
return &Tx{Tx: tx, release: release}, nil
}
func (tx *Tx) Commit() error {
if !tx.closed {
defer func() {
tx.release()
tx.closed = true
}()
}
return tx.Tx.Commit()
}
func (tx *Tx) Rollback() error {
if !tx.closed {
defer func() {
tx.release()
tx.closed = true
}()
}
return tx.Tx.Rollback()
}