Skip to content

Commit

Permalink
implemented ClientPool
Browse files Browse the repository at this point in the history
* a new ClientPool for pooling connections to fastcgi application.
  • Loading branch information
yookoala committed Feb 3, 2018
1 parent c989e4b commit fae9084
Show file tree
Hide file tree
Showing 2 changed files with 226 additions and 0 deletions.
41 changes: 41 additions & 0 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,44 @@ func (pc *PoolClient) Close() error {
}()
return nil
}

// NewClientPool creates a *ClientPool
// from the given ClientFactory and pool
// it to scale with expiration.
func NewClientPool(
clientFactory ClientFactory,
scale uint,
expires time.Duration,
) *ClientPool {
pool := make(chan *PoolClient, scale)
go func() {
for {
c, err := clientFactory()
pc := &PoolClient{
Client: c,
Err: err,
returnClient: pool,
expires: time.Now().Add(expires),
}
pool <- pc
}
}()
return &ClientPool{
createClient: pool,
}
}

// ClientPool pools client created from
// a given ClientFactory.
type ClientPool struct {
createClient <-chan *PoolClient
}

// CreateClient implements ClientFactory
func (p *ClientPool) CreateClient() (c Client, err error) {
pc := <-p.createClient
if c, err = pc, pc.Err; err != nil {
return nil, err
}
return
}
185 changes: 185 additions & 0 deletions pool_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,51 @@
package gofast

import (
"fmt"
"net"
"sync/atomic"
"testing"
"time"
)

// mockConn is a net.Conn implementation only
// indicates if its Close method been called or not.
// If it is true, means the Close method has been called.
type mockConn bool

func (mc *mockConn) Read(b []byte) (n int, err error) {
return 0, nil
}

func (mc *mockConn) Write(b []byte) (n int, err error) {
return 0, nil
}

func (mc *mockConn) Close() error {
*mc = true
return nil
}

func (mc *mockConn) LocalAddr() net.Addr {
return nil
}

func (mc *mockConn) RemoteAddr() net.Addr {
return nil
}

func (mc *mockConn) SetDeadline(t time.Time) error {
return nil
}

func (mc *mockConn) SetReadDeadline(t time.Time) error {
return nil
}

func (mc *mockConn) SetWriteDeadline(t time.Time) error {
return nil
}

func TestPoolClient_Expired(t *testing.T) {
// client that expired
pc := &PoolClient{
Expand Down Expand Up @@ -51,3 +92,147 @@ func TestPoolClient_Close(t *testing.T) {
t.Errorf("expected to get returned client, got nothing but blocked")
}
}

func TestClientPool_CreateClient_withErr(t *testing.T) {

// buffered client with error
cpHasError := NewClientPool(
SimpleClientFactory(func() (net.Conn, error) {
return nil, fmt.Errorf("dummy error")
}, 10),
10, 1*time.Millisecond,
)
c, err := cpHasError.CreateClient()
if c != nil {
t.Errorf("expected nil, got %#v", c)
}

if err == nil {
t.Errorf("expected error, got nil")
} else if want, have := "dummy error", err.Error(); want != have {
t.Errorf("expected %#v, got %#v", want, have)
}

// unbuffered client with error
cpHasError = NewClientPool(
SimpleClientFactory(func() (net.Conn, error) {
return nil, fmt.Errorf("dummy error")
}, 0),
0, 1*time.Millisecond,
)
c, err = cpHasError.CreateClient()
if c != nil {
t.Errorf("expected nil, got %#v", c)
}

if err == nil {
t.Errorf("expected error, got nil")
} else if want, have := "dummy error", err.Error(); want != have {
t.Errorf("expected %#v, got %#v", want, have)
}

}

func TestClientPool_CreateClient_Return_0(t *testing.T) {

var counter uint64

// buffered client with error
cp := NewClientPool(
SimpleClientFactory(func() (net.Conn, error) {
conn := mockConn(false)
atomic.AddUint64(&counter, 1)
return &conn, nil
}, 0),
0, 1000*time.Millisecond,
)

// create first client
c1, err := cp.CreateClient()
if c1 == nil {
t.Error("expected client, got nil")
}
if err != nil {
t.Errorf("unexpected error: %s", err.Error())
}
c1.Close()

reused := make(chan Client)
go func() {
// loop until getting the supposedly returned client
for {
c, err := cp.CreateClient()
if c == nil {
t.Error("expected client, got nil")
}
if err != nil {
t.Errorf("unexpected error: %s", err.Error())
}
if c1 == c {
reused <- c
break
}
}
}()

select {
case <-reused:
total := atomic.LoadUint64(&counter)
t.Logf("returned client got reused with %d concurrent connections",
total)
case <-time.After(time.Millisecond):
t.Errorf("client is not reused")
}
}

func TestClientPool_CreateClient_Return_40(t *testing.T) {

var counter uint64

// buffered client with error
cp := NewClientPool(
SimpleClientFactory(func() (net.Conn, error) {
conn := mockConn(false)
atomic.AddUint64(&counter, 1)
return &conn, nil
}, 0),
40, 1000*time.Millisecond,
)

// create first client
c1, err := cp.CreateClient()
if c1 == nil {
t.Error("expected client, got nil")
}
if err != nil {
t.Errorf("unexpected error: %s", err.Error())
}
c1.Close()

reused := make(chan Client)
go func() {
// loop until getting the supposedly returned client
for {
c, err := cp.CreateClient()
if c == nil {
t.Error("expected client, got nil")
}
if err != nil {
t.Errorf("unexpected error: %s", err.Error())
}
if c1 == c {
reused <- c
break
}
}
}()

select {
case <-reused:
total := atomic.LoadUint64(&counter)
t.Logf("returned client got reused with %d concurrent connections",
total)
case <-time.After(time.Millisecond):
t.Errorf("client is not reused")
}
}

0 comments on commit fae9084

Please sign in to comment.