From f42bc06806d54f908899069fd5609a3d6e7b2643 Mon Sep 17 00:00:00 2001 From: Theo Date: Sun, 24 Dec 2017 18:58:25 +0000 Subject: [PATCH] ArrayBlockingQueue: Performance Enhancements --- Makefile | 2 +- README.md | 11 ++-- arrayBlockingQueue.go | 6 +- ...ueue_test.go => arrayBlockingQueue_test.go | 52 +++++++++++---- blockingQueue.go | 66 ++++++++++++------- errors.go | 2 +- linkedBlockingQueue.go | 6 +- 7 files changed, 95 insertions(+), 50 deletions(-) rename blockingQueue_test.go => arrayBlockingQueue_test.go (78%) diff --git a/Makefile b/Makefile index 86394a7..8d729a9 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ test: .PHONY: bench bench: - GOPATH=$(GOPATH) go test -bench=. -check.b -benchmem + GOPATH=$(GOPATH) go test -bench=. -check.b .PHONY: docs docs: diff --git a/README.md b/README.md index 8e107c4..ef24a1c 100644 --- a/README.md +++ b/README.md @@ -34,11 +34,12 @@ Using: Simple operations - no goroutines ```text -ArrayBlockingQueueSuite.BenchmarkPeek 50000000 56.5 ns/op -ArrayBlockingQueueSuite.BenchmarkPop 50000000 49.1 ns/op -ArrayBlockingQueueSuite.BenchmarkPopOverflow 50000000 51.3 ns/op -ArrayBlockingQueueSuite.BenchmarkPush 20000000 84.7 ns/op -ArrayBlockingQueueSuite.BenchmarkPushOverflow 20000000 75.3 ns/op +ArrayBlockingQueueSuite.BenchmarkPeek 100000000 21.0 ns/op +ArrayBlockingQueueSuite.BenchmarkPop 100000000 20.7 ns/op +ArrayBlockingQueueSuite.BenchmarkPopOverflow 100000000 20.8 ns/op +ArrayBlockingQueueSuite.BenchmarkPush 50000000 38.9 ns/op +ArrayBlockingQueueSuite.BenchmarkPushOverflow 50000000 39.0 ns/op + ``` ## LICENCE diff --git a/arrayBlockingQueue.go b/arrayBlockingQueue.go index 36d5a52..04361fb 100644 --- a/arrayBlockingQueue.go +++ b/arrayBlockingQueue.go @@ -37,12 +37,12 @@ func NewArrayBlockingQueue(capacity uint64) (*BlockingQueue, error) { return nil, ErrorCapacity } - lock := new(sync.RWMutex) + lock := new(sync.Mutex) return &BlockingQueue{ lock: lock, - notEmpty: sync.NewCond(lock.RLocker()), - notFull: sync.NewCond(lock.RLocker()), + notEmpty: sync.NewCond(lock), + notFull: sync.NewCond(lock), count: uint64(0), store: NewArrayStore(capacity), }, nil diff --git a/blockingQueue_test.go b/arrayBlockingQueue_test.go similarity index 78% rename from blockingQueue_test.go rename to arrayBlockingQueue_test.go index f533758..1627605 100644 --- a/blockingQueue_test.go +++ b/arrayBlockingQueue_test.go @@ -51,7 +51,7 @@ func (s *ArrayBlockingQueueSuite) TestIncrement(c *C) { } func (s *ArrayBlockingQueueSuite) TestPush(c *C) { - for i:= 0;i < 16; i+=1 { + for i := 0; i < 16; i += 1 { s.queue.Push(i) } @@ -62,7 +62,6 @@ func (s *ArrayBlockingQueueSuite) TestPush(c *C) { c.Assert(err, ErrorMatches, "ERROR_FULL: attempt to Put while Queue is Full") } - func (s *ArrayBlockingQueueSuite) BenchmarkPushOverflow(c *C) { for i := 0; i < c.N; i++ { s.queue.Push(i) @@ -80,22 +79,21 @@ func (s *ArrayBlockingQueueSuite) BenchmarkPush(c *C) { } func (s *ArrayBlockingQueueSuite) TestPop(c *C) { - for i:= 0;i < 10; i+=1 { + for i := 0; i < 10; i += 1 { s.queue.Push(i) } - for i:= 0;i < 10; i+=1 { + for i := 0; i < 10; i += 1 { s.queue.Pop() } c.Assert(s.queue.Size(), Equals, uint64(0)) res, err := s.queue.Pop() - c.Assert(res, Equals, false) + c.Assert(res, IsNil) c.Assert(err, ErrorMatches, "ERROR_EMPTY: attempt to Get while Queue is Empty") } - func (s *ArrayBlockingQueueSuite) BenchmarkPopOverflow(c *C) { for i := 0; i < c.N; i++ { s.queue.Pop() @@ -116,9 +114,8 @@ func (s *ArrayBlockingQueueSuite) BenchmarkPop(c *C) { } } - func (s *ArrayBlockingQueueSuite) TestClear(c *C) { - for i:= 0;i < 10; i+=1 { + for i := 0; i < 10; i += 1 { s.queue.Push(i) } @@ -127,9 +124,8 @@ func (s *ArrayBlockingQueueSuite) TestClear(c *C) { c.Assert(s.queue.Size(), Equals, uint64(0)) } - func (s *ArrayBlockingQueueSuite) TestPeek(c *C) { - for i:= 0;i < 10; i+=1 { + for i := 0; i < 10; i += 1 { s.queue.Push(i) } @@ -140,7 +136,6 @@ func (s *ArrayBlockingQueueSuite) TestPeek(c *C) { c.Assert(s.queue.Peek(), Equals, 1) } - func (s *ArrayBlockingQueueSuite) BenchmarkPeek(c *C) { for i := 0; i < c.N; i++ { s.queue.Peek() @@ -148,7 +143,9 @@ func (s *ArrayBlockingQueueSuite) BenchmarkPeek(c *C) { q, _ := NewArrayBlockingQueue(math.MaxUint16) - q.Push(1);q.Push(1);q.Push(1) + q.Push(1) + q.Push(1) + q.Push(1) c.ResetTimer() @@ -157,3 +154,34 @@ func (s *ArrayBlockingQueueSuite) BenchmarkPeek(c *C) { } } +//func (s *ArrayBlockingQueueSuite) TestPut(c *C) { +// defer func() { +// if r := recover(); r == nil { +// c.Errorf("TestPutNotFull should have panicked!") +// } +// }() +// +// s.queue.Put(nil) +// +// for i := 0; i < 16; i += 1 { +// s.queue.Push(i) +// } +// +// c.Assert(s.queue.Size(), Equals, uint64(16)) +// +// n := 2 +// running := make(chan bool, n) +// awake := make(chan bool, n) +// for i := 0; i < n; i++ { +// go func() { +// m.Lock() +// running <- true +// c.Wait() +// awake <- true +// m.Unlock() +// }() +// } +// for i := 0; i < n; i++ { +// <-running // Wait for everyone to run. +// } +//} diff --git a/blockingQueue.go b/blockingQueue.go index cf64013..7c73737 100644 --- a/blockingQueue.go +++ b/blockingQueue.go @@ -14,7 +14,7 @@ type BlockingQueue struct { count uint64 // Main lock guarding all access - lock *sync.RWMutex + lock *sync.Mutex // Condition for waiting reads notEmpty *sync.Cond @@ -47,18 +47,20 @@ func (q BlockingQueue) inc(idx uint64) uint64 { // Size returns this current elements size, is concurrent safe func (q BlockingQueue) Size() uint64 { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + res := q.count + q.lock.Unlock() - return q.count + return res } // Capacity returns this current elements remaining capacity, is concurrent safe func (q BlockingQueue) Capacity() uint64 { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + res := uint64(q.store.Size() - q.count) + q.lock.Unlock() - return uint64(q.store.Size() - q.count) + return res } // Push element at current write position, advances, and signals. @@ -100,44 +102,56 @@ func (q *BlockingQueue) Offer(item interface{}) bool { panic("Null item") } - q.lock.RLock() - defer q.lock.RUnlock() + var res bool + q.lock.Lock() if q.count == q.store.Size() { - return false + res = false } else { q.push(item) - return true + res = true } + q.lock.Unlock() + + return res } // Pops an element from the head of the queue. // Does not block the current goroutine func (q *BlockingQueue) Pop() (interface{}, error) { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + + var res interface{} + var err error if q.count == 0 { // Case empty - return false, ErrorEmpty + res, err = nil, ErrorEmpty } else { var item = q.pop() - return item, nil + res, err = item, nil } + q.lock.Unlock() + + return res, err } // Just attempts to return the tail element of the queue func (q BlockingQueue) Peek() interface{} { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + + var res interface{} if q.count == 0 { // Case empty - return nil + res = nil } else { var item = q.store.Get(q.readIndex) - return item + res = item } + q.lock.Unlock() + + return res } func (q BlockingQueue) IsEmpty() bool { @@ -146,8 +160,7 @@ func (q BlockingQueue) IsEmpty() bool { // Clears all the queues elements, cleans up, signals waiters for queue is empty func (q *BlockingQueue) Clear() { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() // Start from head up to the tail next := q.readIndex @@ -161,13 +174,13 @@ func (q *BlockingQueue) Clear() { q.readIndex = uint64(0) q.writeIndex = uint64(0) q.notFull.Broadcast() + q.lock.Unlock() } // Takes an element from the head of the queue. // It blocks the current goroutine if the queue is Empty until notified func (q *BlockingQueue) Get() (interface{}, error) { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() for q.count == 0 { // We wait here until the queue has an item @@ -176,6 +189,8 @@ func (q *BlockingQueue) Get() (interface{}, error) { // Critical section after wait released and predicate is false var item, err = q.Pop() + q.lock.Unlock() + return item, err } @@ -186,8 +201,7 @@ func (q *BlockingQueue) Put(item interface{}) (bool, error) { panic("Null item") } - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() for q.count == q.store.Size() { // We wait here until the queue has an empty slot @@ -196,5 +210,7 @@ func (q *BlockingQueue) Put(item interface{}) (bool, error) { // Critical section after wait released and predicate is false var res, err = q.Push(item) + q.lock.Unlock() + return res, err } diff --git a/errors.go b/errors.go index d1dbcfe..7786d40 100644 --- a/errors.go +++ b/errors.go @@ -8,4 +8,4 @@ import "errors" var ErrorCapacity = errors.New("ERROR_CAPACITY: attempt to Create Queue with invalid Capacity") var ErrorFull = errors.New("ERROR_FULL: attempt to Put while Queue is Full") -var ErrorEmpty = errors.New("ERROR_EMPTY: attempt to Get while Queue is Empty") \ No newline at end of file +var ErrorEmpty = errors.New("ERROR_EMPTY: attempt to Get while Queue is Empty") diff --git a/linkedBlockingQueue.go b/linkedBlockingQueue.go index cd8b6ab..bc93dfc 100644 --- a/linkedBlockingQueue.go +++ b/linkedBlockingQueue.go @@ -39,12 +39,12 @@ func NewLinkedBlockingQueue(capacity uint64) (*BlockingQueue, error) { return nil, ErrorCapacity } - lock := new(sync.RWMutex) + lock := new(sync.Mutex) return &BlockingQueue{ lock: lock, - notEmpty: sync.NewCond(lock.RLocker()), - notFull: sync.NewCond(lock.RLocker()), + notEmpty: sync.NewCond(lock), + notFull: sync.NewCond(lock), count: uint64(0), store: NewLinkedListStore(), }, nil