This repository has been archived by the owner on Feb 13, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathspooler.go
64 lines (57 loc) · 1.47 KB
/
spooler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package butteredscones
import (
"time"
)
// Spooler accepts items on the In channel and chunks them into items on the
// Out channel.
type Spooler struct {
In chan *FileData
Out chan []*FileData
size int
timeout time.Duration
}
const (
// The number of items that can be buffered in the Out channel.
spoolOutBuffer = 4
)
func NewSpooler(size int, timeout time.Duration) *Spooler {
return &Spooler{
In: make(chan *FileData, size*spoolOutBuffer),
Out: make(chan []*FileData, spoolOutBuffer),
size: size,
timeout: timeout,
}
}
// Spool accepts items from the In channel and spools them into the Out
// channel. To stop the spooling, close the In channel.
func (s *Spooler) Spool() {
timer := time.NewTimer(s.timeout)
currentChunk := make([]*FileData, 0, s.size)
for {
select {
case fileData, ok := <-s.In:
if ok {
currentChunk = append(currentChunk, fileData)
if len(currentChunk) >= s.size {
s.Out <- currentChunk
currentChunk = make([]*FileData, 0, s.size)
}
} else {
return
}
case <-timer.C:
if len(currentChunk) > 0 {
select {
case s.Out <- currentChunk:
currentChunk = make([]*FileData, 0, s.size)
default:
// Never block trying to send to the channel because of a timer
// firing. Otherwise, small chunks may be added to the channel. If
// we can't send immediately, we might as well keep spooling to build
// up a bigger chunk.
}
}
}
timer.Reset(s.timeout)
}
}