Skip to content

Commit

Permalink
ArrayBlockingQueue: Performance Enhancements
Browse files Browse the repository at this point in the history
  • Loading branch information
Theo committed Dec 24, 2017
1 parent a15448c commit f42bc06
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 50 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ test:

.PHONY: bench
bench:
GOPATH=$(GOPATH) go test -bench=. -check.b -benchmem
GOPATH=$(GOPATH) go test -bench=. -check.b

.PHONY: docs
docs:
Expand Down
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ Using:
Simple operations - no goroutines

```text
ArrayBlockingQueueSuite.BenchmarkPeek 50000000 56.5 ns/op
ArrayBlockingQueueSuite.BenchmarkPop 50000000 49.1 ns/op
ArrayBlockingQueueSuite.BenchmarkPopOverflow 50000000 51.3 ns/op
ArrayBlockingQueueSuite.BenchmarkPush 20000000 84.7 ns/op
ArrayBlockingQueueSuite.BenchmarkPushOverflow 20000000 75.3 ns/op
ArrayBlockingQueueSuite.BenchmarkPeek 100000000 21.0 ns/op
ArrayBlockingQueueSuite.BenchmarkPop 100000000 20.7 ns/op
ArrayBlockingQueueSuite.BenchmarkPopOverflow 100000000 20.8 ns/op
ArrayBlockingQueueSuite.BenchmarkPush 50000000 38.9 ns/op
ArrayBlockingQueueSuite.BenchmarkPushOverflow 50000000 39.0 ns/op
```

## LICENCE
Expand Down
6 changes: 3 additions & 3 deletions arrayBlockingQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ func NewArrayBlockingQueue(capacity uint64) (*BlockingQueue, error) {
return nil, ErrorCapacity
}

lock := new(sync.RWMutex)
lock := new(sync.Mutex)

return &BlockingQueue{
lock: lock,
notEmpty: sync.NewCond(lock.RLocker()),
notFull: sync.NewCond(lock.RLocker()),
notEmpty: sync.NewCond(lock),
notFull: sync.NewCond(lock),
count: uint64(0),
store: NewArrayStore(capacity),
}, nil
Expand Down
52 changes: 40 additions & 12 deletions blockingQueue_test.go → arrayBlockingQueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (s *ArrayBlockingQueueSuite) TestIncrement(c *C) {
}

func (s *ArrayBlockingQueueSuite) TestPush(c *C) {
for i:= 0;i < 16; i+=1 {
for i := 0; i < 16; i += 1 {
s.queue.Push(i)
}

Expand All @@ -62,7 +62,6 @@ func (s *ArrayBlockingQueueSuite) TestPush(c *C) {
c.Assert(err, ErrorMatches, "ERROR_FULL: attempt to Put while Queue is Full")
}


func (s *ArrayBlockingQueueSuite) BenchmarkPushOverflow(c *C) {
for i := 0; i < c.N; i++ {
s.queue.Push(i)
Expand All @@ -80,22 +79,21 @@ func (s *ArrayBlockingQueueSuite) BenchmarkPush(c *C) {
}

func (s *ArrayBlockingQueueSuite) TestPop(c *C) {
for i:= 0;i < 10; i+=1 {
for i := 0; i < 10; i += 1 {
s.queue.Push(i)
}

for i:= 0;i < 10; i+=1 {
for i := 0; i < 10; i += 1 {
s.queue.Pop()
}

c.Assert(s.queue.Size(), Equals, uint64(0))

res, err := s.queue.Pop()
c.Assert(res, Equals, false)
c.Assert(res, IsNil)
c.Assert(err, ErrorMatches, "ERROR_EMPTY: attempt to Get while Queue is Empty")
}


func (s *ArrayBlockingQueueSuite) BenchmarkPopOverflow(c *C) {
for i := 0; i < c.N; i++ {
s.queue.Pop()
Expand All @@ -116,9 +114,8 @@ func (s *ArrayBlockingQueueSuite) BenchmarkPop(c *C) {
}
}


func (s *ArrayBlockingQueueSuite) TestClear(c *C) {
for i:= 0;i < 10; i+=1 {
for i := 0; i < 10; i += 1 {
s.queue.Push(i)
}

Expand All @@ -127,9 +124,8 @@ func (s *ArrayBlockingQueueSuite) TestClear(c *C) {
c.Assert(s.queue.Size(), Equals, uint64(0))
}


func (s *ArrayBlockingQueueSuite) TestPeek(c *C) {
for i:= 0;i < 10; i+=1 {
for i := 0; i < 10; i += 1 {
s.queue.Push(i)
}

Expand All @@ -140,15 +136,16 @@ func (s *ArrayBlockingQueueSuite) TestPeek(c *C) {
c.Assert(s.queue.Peek(), Equals, 1)
}


func (s *ArrayBlockingQueueSuite) BenchmarkPeek(c *C) {
for i := 0; i < c.N; i++ {
s.queue.Peek()
}

q, _ := NewArrayBlockingQueue(math.MaxUint16)

q.Push(1);q.Push(1);q.Push(1)
q.Push(1)
q.Push(1)
q.Push(1)

c.ResetTimer()

Expand All @@ -157,3 +154,34 @@ func (s *ArrayBlockingQueueSuite) BenchmarkPeek(c *C) {
}
}

//func (s *ArrayBlockingQueueSuite) TestPut(c *C) {
// defer func() {
// if r := recover(); r == nil {
// c.Errorf("TestPutNotFull should have panicked!")
// }
// }()
//
// s.queue.Put(nil)
//
// for i := 0; i < 16; i += 1 {
// s.queue.Push(i)
// }
//
// c.Assert(s.queue.Size(), Equals, uint64(16))
//
// n := 2
// running := make(chan bool, n)
// awake := make(chan bool, n)
// for i := 0; i < n; i++ {
// go func() {
// m.Lock()
// running <- true
// c.Wait()
// awake <- true
// m.Unlock()
// }()
// }
// for i := 0; i < n; i++ {
// <-running // Wait for everyone to run.
// }
//}
66 changes: 41 additions & 25 deletions blockingQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type BlockingQueue struct {
count uint64

// Main lock guarding all access
lock *sync.RWMutex
lock *sync.Mutex

// Condition for waiting reads
notEmpty *sync.Cond
Expand Down Expand Up @@ -47,18 +47,20 @@ func (q BlockingQueue) inc(idx uint64) uint64 {

// Size returns this current elements size, is concurrent safe
func (q BlockingQueue) Size() uint64 {
q.lock.RLock()
defer q.lock.RUnlock()
q.lock.Lock()
res := q.count
q.lock.Unlock()

return q.count
return res
}

// Capacity returns this current elements remaining capacity, is concurrent safe
func (q BlockingQueue) Capacity() uint64 {
q.lock.RLock()
defer q.lock.RUnlock()
q.lock.Lock()
res := uint64(q.store.Size() - q.count)
q.lock.Unlock()

return uint64(q.store.Size() - q.count)
return res
}

// Push element at current write position, advances, and signals.
Expand Down Expand Up @@ -100,44 +102,56 @@ func (q *BlockingQueue) Offer(item interface{}) bool {
panic("Null item")
}

q.lock.RLock()
defer q.lock.RUnlock()
var res bool

q.lock.Lock()
if q.count == q.store.Size() {
return false
res = false
} else {
q.push(item)
return true
res = true
}
q.lock.Unlock()

return res
}

// Pops an element from the head of the queue.
// Does not block the current goroutine
func (q *BlockingQueue) Pop() (interface{}, error) {
q.lock.RLock()
defer q.lock.RUnlock()
q.lock.Lock()

var res interface{}
var err error

if q.count == 0 {
// Case empty
return false, ErrorEmpty
res, err = nil, ErrorEmpty
} else {
var item = q.pop()
return item, nil
res, err = item, nil
}
q.lock.Unlock()

return res, err
}

// Just attempts to return the tail element of the queue
func (q BlockingQueue) Peek() interface{} {
q.lock.RLock()
defer q.lock.RUnlock()
q.lock.Lock()

var res interface{}

if q.count == 0 {
// Case empty
return nil
res = nil
} else {
var item = q.store.Get(q.readIndex)
return item
res = item
}
q.lock.Unlock()

return res
}

func (q BlockingQueue) IsEmpty() bool {
Expand All @@ -146,8 +160,7 @@ func (q BlockingQueue) IsEmpty() bool {

// Clears all the queues elements, cleans up, signals waiters for queue is empty
func (q *BlockingQueue) Clear() {
q.lock.RLock()
defer q.lock.RUnlock()
q.lock.Lock()

// Start from head up to the tail
next := q.readIndex
Expand All @@ -161,13 +174,13 @@ func (q *BlockingQueue) Clear() {
q.readIndex = uint64(0)
q.writeIndex = uint64(0)
q.notFull.Broadcast()
q.lock.Unlock()
}

// Takes an element from the head of the queue.
// It blocks the current goroutine if the queue is Empty until notified
func (q *BlockingQueue) Get() (interface{}, error) {
q.lock.RLock()
defer q.lock.RUnlock()
q.lock.Lock()

for q.count == 0 {
// We wait here until the queue has an item
Expand All @@ -176,6 +189,8 @@ func (q *BlockingQueue) Get() (interface{}, error) {

// Critical section after wait released and predicate is false
var item, err = q.Pop()
q.lock.Unlock()

return item, err
}

Expand All @@ -186,8 +201,7 @@ func (q *BlockingQueue) Put(item interface{}) (bool, error) {
panic("Null item")
}

q.lock.RLock()
defer q.lock.RUnlock()
q.lock.Lock()

for q.count == q.store.Size() {
// We wait here until the queue has an empty slot
Expand All @@ -196,5 +210,7 @@ func (q *BlockingQueue) Put(item interface{}) (bool, error) {

// Critical section after wait released and predicate is false
var res, err = q.Push(item)
q.lock.Unlock()

return res, err
}
2 changes: 1 addition & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ import "errors"

var ErrorCapacity = errors.New("ERROR_CAPACITY: attempt to Create Queue with invalid Capacity")
var ErrorFull = errors.New("ERROR_FULL: attempt to Put while Queue is Full")
var ErrorEmpty = errors.New("ERROR_EMPTY: attempt to Get while Queue is Empty")
var ErrorEmpty = errors.New("ERROR_EMPTY: attempt to Get while Queue is Empty")
6 changes: 3 additions & 3 deletions linkedBlockingQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ func NewLinkedBlockingQueue(capacity uint64) (*BlockingQueue, error) {
return nil, ErrorCapacity
}

lock := new(sync.RWMutex)
lock := new(sync.Mutex)

return &BlockingQueue{
lock: lock,
notEmpty: sync.NewCond(lock.RLocker()),
notFull: sync.NewCond(lock.RLocker()),
notEmpty: sync.NewCond(lock),
notFull: sync.NewCond(lock),
count: uint64(0),
store: NewLinkedListStore(),
}, nil
Expand Down

0 comments on commit f42bc06

Please sign in to comment.