forked from buraksezer/olric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlocker.go
158 lines (132 loc) · 3.43 KB
/
locker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package olric
import (
"errors"
"sync"
"sync/atomic"
)
// Slightly modified version of https://github.com/moby/moby/tree/master/pkg/locker
/*
locker provides a mechanism for creating finer-grained locking to help
free up more global locks to handle other tasks.
The implementation looks close to a sync.Mutex, however the user must provide a
reference to use to refer to the underlying lock when locking and unlocking,
and unlock may generate an error.
If a lock with a given name does not exist when `lock` is called, one is
created.
Lock references are automatically cleaned up on `unlock` if nothing else is
waiting for the lock.
*/
// ErrNoSuchLock is returned by locker.unlock when no lock is registered under
// the requested name.
var ErrNoSuchLock = errors.New("no such lock")
// locker provides a locking mechanism based on the passed in reference name.
// Every name maps to its own lockCtr, so callers locking different names do
// not contend on the same per-name mutex.
type locker struct {
// mu guards the locks map and the done field of its entries; every method
// of locker takes it before touching either.
mu sync.Mutex
// locks maps a reference name to its lock entry. Entries are inserted by
// lock and removed by unlock once no waiters are registered.
locks map[string]*lockCtr
}
// lockCtr is the per-name entry stored by locker: the mutex itself plus the
// bookkeeping locker needs to decide when the entry can be dropped.
type lockCtr struct {
	mu sync.Mutex
	// waiters counts the callers that have announced they want the lock but
	// have not acquired it yet. It is signed (int32 rather than uint32) so
	// that dec can simply add -1.
	waiters int32
	// done is the channel returned by locker.unlockNotifier and closed by
	// locker.unlock. It is read and written while holding the locker's mutex.
	done chan struct{}
}

// inc atomically registers one more waiter.
func (c *lockCtr) inc() {
	atomic.AddInt32(&c.waiters, 1)
}

// dec atomically removes one waiter.
func (c *lockCtr) dec() {
	atomic.AddInt32(&c.waiters, -1)
}

// count atomically reads the current number of waiters.
func (c *lockCtr) count() int32 {
	return atomic.LoadInt32(&c.waiters)
}

// lock acquires the underlying mutex, blocking until it is available.
func (c *lockCtr) lock() {
	c.mu.Lock()
}

// unlock releases the underlying mutex.
func (c *lockCtr) unlock() {
	c.mu.Unlock()
}
// newLocker returns a locker whose name-to-lock map is already allocated and
// empty.
func newLocker() *locker {
	l := new(locker)
	l.locks = make(map[string]*lockCtr)
	return l
}
// length reports how many named locks are currently registered. It returns 0
// when the map has not been allocated.
func (l *locker) length() int {
	l.mu.Lock()
	// len of a nil map is 0, so no separate nil check is needed.
	n := len(l.locks)
	l.mu.Unlock()
	return n
}
// check reports whether a lock is currently registered under name. It returns
// false when the map has not been allocated.
func (l *locker) check(name string) bool {
	l.mu.Lock()
	// Indexing a nil map is safe and reports the key as absent.
	_, found := l.locks[name]
	l.mu.Unlock()
	return found
}
// unlockNotifier returns the done channel currently stored for name, which
// unlock closes. It returns nil when no lock is registered under name
// (including when the map has not been allocated).
func (l *locker) unlockNotifier(name string) chan struct{} {
	l.mu.Lock()
	defer l.mu.Unlock()

	// A lookup in a nil map simply misses, so it shares the not-found path.
	if ctr, found := l.locks[name]; found {
		return ctr.done
	}
	return nil
}
// lock acquires the mutex registered under name, blocking until it is
// available. The entry (and the map itself, if needed) is created on demand.
func (l *locker) lock(name string) {
	l.mu.Lock()
	if l.locks == nil {
		l.locks = map[string]*lockCtr{}
	}

	ctr, found := l.locks[name]
	if !found {
		ctr = new(lockCtr)
		l.locks[name] = ctr
	}

	// Register as a waiter while still holding the main mutex: unlock only
	// deletes an entry whose waiter count is zero, so this keeps the entry
	// alive when lock and unlock run concurrently.
	ctr.inc()
	// NOTE(review): done is replaced on every call, even when the entry
	// already existed and is held by someone else, so a channel previously
	// obtained from unlockNotifier is no longer the one unlock closes.
	// Confirm this is intended.
	ctr.done = make(chan struct{})
	l.mu.Unlock()

	// Block on the per-name mutex outside the main mutex so operations on
	// other names are not held up; once acquired we are no longer a waiter.
	ctr.lock()
	ctr.dec()
}
// unlock releases the mutex registered under name and signals its done
// channel. It returns ErrNoSuchLock when no lock is registered under name,
// and nil otherwise. If no other caller is waiting for the lock, its entry is
// removed from the map.
func (l *locker) unlock(name string) error {
	l.mu.Lock()
	defer l.mu.Unlock()

	ctr, found := l.locks[name]
	if !found {
		return ErrNoSuchLock
	}

	// Drop the entry before releasing the mutex when nobody has registered
	// as a waiter; a later lock call will create a fresh entry.
	if ctr.count() == 0 {
		delete(l.locks, name)
	}
	ctr.unlock()

	// Close done at most once: a receive that succeeds means it has already
	// been closed, otherwise close it now.
	select {
	case <-ctr.done:
	default:
		close(ctr.done)
	}
	return nil
}