-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathsemaphore.go
71 lines (63 loc) · 1.34 KB
/
semaphore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package kaburaya
import (
"math"
"time"
)
// Semaphore represents an elastic semaphore: Wait and Signal are forwarded
// to an underlying elasticSemaphore, while a background goroutine (adjust)
// periodically changes that semaphore's limit based on the reporter's
// readings and the controller's output.
type Semaphore struct {
// controller converts each reported usage value into a limit increment
// (via Compute).
controller Controller
// reporter supplies the usage value sampled on every tick (via Report).
reporter Reporter
// ch is the underlying semaphore that Wait and Signal delegate to and
// whose limit adjust changes.
ch *elasticSemaphore
// done carries the stop signal from Stop to the adjust goroutine.
done chan struct{}
// Recorder is optional and is not set by NewSem. When non-nil, adjust calls
// Record on each successful tick with the usage value and the value
// returned by incrementLimit.
// NOTE(review): NewSem starts the adjust goroutine before a caller can
// assign this field and no synchronization is visible here - confirm.
Recorder Recorder
}
// NewSem returns a Semaphore and starts the goroutine that periodically
// adjusts its limit.
//
// duration is the interval between adjustments. It is handed to
// time.NewTicker by adjust, so it must be positive (NewTicker panics
// otherwise). gain is passed as the last three arguments of
// newPIDController (presumably the P, I and D gains - confirm against that
// constructor).
//
// The underlying semaphore is created with newElasticSemaphore(1) and usage
// is sampled through newCPUReporter. Call Stop to signal the background
// goroutine when the semaphore is no longer needed.
func NewSem(duration time.Duration, gain float64) *Semaphore {
	pid := newPIDController(0.0, gain, gain, gain)

	s := new(Semaphore)
	s.controller = newDynamicTargetController(pid)
	s.reporter = newCPUReporter()
	s.ch = newElasticSemaphore(1)
	s.done = make(chan struct{})

	go s.adjust(duration)
	return s
}
// Wait decrements the semaphore. If the decrement would make the semaphore
// negative, the caller blocks. The operation is delegated to the underlying
// elasticSemaphore.
func (s *Semaphore) Wait() {
	inner := s.ch
	inner.wait()
}
// Signal increments the semaphore by delegating to the underlying
// elasticSemaphore.
func (s *Semaphore) Signal() {
	inner := s.ch
	inner.signal()
}
// Stop signals the background adjust goroutine by sending on the done
// channel.
//
// NewSem creates done unbuffered, so Stop blocks until the adjust goroutine
// receives the value. A call is only guaranteed to return while a goroutine
// is still receiving from done, so call Stop once per Semaphore.
func (s *Semaphore) Stop() {
	var signal struct{}
	s.done <- signal
}
// adjust is the background loop started by NewSem. Once per tick of duration
// it reads a usage value from the reporter, asks the controller for an
// increment, applies the rounded increment to the underlying semaphore's
// limit and, when a Recorder is set, records the usage together with the
// value returned by incrementLimit.
//
// The loop ends, and the ticker is released, when a value arrives on done
// (sent by Stop). After that nothing receives from done any more, so Stop
// must not be called a second time.
func (s *Semaphore) adjust(duration time.Duration) {
	t := time.NewTicker(duration)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			usage, err := s.reporter.Report()
			if err != nil {
				// Best effort: skip this tick and try again on the next one.
				continue
			}
			inc := s.controller.Compute(usage)
			l := s.ch.incrementLimit(round(inc))
			if s.Recorder != nil {
				s.Recorder.Record([]float64{usage, float64(l)})
			}
		case <-s.done:
			// A bare break here would only leave the select, not the for
			// loop, and the goroutine would never terminate; return instead.
			return
		}
	}
}
// round converts n to an int by rounding away from zero: values greater than
// zero are rounded up with math.Ceil, everything else is rounded down with
// math.Floor. Any non-zero fractional value therefore yields a magnitude of
// at least 1, and zero stays zero.
func round(n float64) int {
	toInteger := math.Floor
	if n > 0.0 {
		toInteger = math.Ceil
	}
	return int(toInteger(n))
}