diff --git a/pool.go b/pool.go index 62514f4..a04e030 100644 --- a/pool.go +++ b/pool.go @@ -32,3 +32,44 @@ func (pc *PoolClient) Close() error { }() return nil } + +// NewClientPool creates a *ClientPool +// from the given ClientFactory and pool +// it to scale with expiration. +func NewClientPool( + clientFactory ClientFactory, + scale uint, + expires time.Duration, +) *ClientPool { + pool := make(chan *PoolClient, scale) + go func() { + for { + c, err := clientFactory() + pc := &PoolClient{ + Client: c, + Err: err, + returnClient: pool, + expires: time.Now().Add(expires), + } + pool <- pc + } + }() + return &ClientPool{ + createClient: pool, + } +} + +// ClientPool pools client created from +// a given ClientFactory. +type ClientPool struct { + createClient <-chan *PoolClient +} + +// CreateClient implements ClientFactory +func (p *ClientPool) CreateClient() (c Client, err error) { + pc := <-p.createClient + if c, err = pc, pc.Err; err != nil { + return nil, err + } + return +} diff --git a/pool_test.go b/pool_test.go index b703d7b..91b3221 100644 --- a/pool_test.go +++ b/pool_test.go @@ -1,10 +1,51 @@ package gofast import ( + "fmt" + "net" + "sync/atomic" "testing" "time" ) +// mockConn is a net.Conn implementation only +// indicates if its Close method been called or not. +// If it is true, means the Close method has been called. +type mockConn bool + +func (mc *mockConn) Read(b []byte) (n int, err error) { + return 0, nil +} + +func (mc *mockConn) Write(b []byte) (n int, err error) { + return 0, nil +} + +func (mc *mockConn) Close() error { + *mc = true + return nil +} + +func (mc *mockConn) LocalAddr() net.Addr { + return nil +} + +func (mc *mockConn) RemoteAddr() net.Addr { + return nil +} + +func (mc *mockConn) SetDeadline(t time.Time) error { + return nil +} + +func (mc *mockConn) SetReadDeadline(t time.Time) error { + return nil +} + +func (mc *mockConn) SetWriteDeadline(t time.Time) error { + return nil +} + func TestPoolClient_Expired(t *testing.T) { // client that expired pc := &PoolClient{ @@ -51,3 +92,147 @@ func TestPoolClient_Close(t *testing.T) { t.Errorf("expected to get returned client, got nothing but blocked") } } + +func TestClientPool_CreateClient_withErr(t *testing.T) { + + // buffered client with error + cpHasError := NewClientPool( + SimpleClientFactory(func() (net.Conn, error) { + return nil, fmt.Errorf("dummy error") + }, 10), + 10, 1*time.Millisecond, + ) + c, err := cpHasError.CreateClient() + if c != nil { + t.Errorf("expected nil, got %#v", c) + } + + if err == nil { + t.Errorf("expected error, got nil") + } else if want, have := "dummy error", err.Error(); want != have { + t.Errorf("expected %#v, got %#v", want, have) + } + + // unbuffered client with error + cpHasError = NewClientPool( + SimpleClientFactory(func() (net.Conn, error) { + return nil, fmt.Errorf("dummy error") + }, 0), + 0, 1*time.Millisecond, + ) + c, err = cpHasError.CreateClient() + if c != nil { + t.Errorf("expected nil, got %#v", c) + } + + if err == nil { + t.Errorf("expected error, got nil") + } else if want, have := "dummy error", err.Error(); want != have { + t.Errorf("expected %#v, got %#v", want, have) + } + +} + +func TestClientPool_CreateClient_Return_0(t *testing.T) { + + var counter uint64 + + // buffered client with error + cp := NewClientPool( + SimpleClientFactory(func() (net.Conn, error) { + conn := mockConn(false) + atomic.AddUint64(&counter, 1) + return &conn, nil + }, 0), + 0, 1000*time.Millisecond, + ) + + // create first client + c1, err := cp.CreateClient() + if c1 == nil { + t.Error("expected client, got nil") + } + if err != nil { + t.Errorf("unexpected error: %s", err.Error()) + } + c1.Close() + + reused := make(chan Client) + go func() { + // loop until getting the supposedly returned client + for { + c, err := cp.CreateClient() + if c == nil { + t.Error("expected client, got nil") + } + if err != nil { + t.Errorf("unexpected error: %s", err.Error()) + } + if c1 == c { + reused <- c + break + } + } + }() + + select { + case <-reused: + total := atomic.LoadUint64(&counter) + t.Logf("returned client got reused with %d concurrent connections", + total) + case <-time.After(time.Millisecond): + t.Errorf("client is not reused") + } +} + +func TestClientPool_CreateClient_Return_40(t *testing.T) { + + var counter uint64 + + // buffered client with error + cp := NewClientPool( + SimpleClientFactory(func() (net.Conn, error) { + conn := mockConn(false) + atomic.AddUint64(&counter, 1) + return &conn, nil + }, 0), + 40, 1000*time.Millisecond, + ) + + // create first client + c1, err := cp.CreateClient() + if c1 == nil { + t.Error("expected client, got nil") + } + if err != nil { + t.Errorf("unexpected error: %s", err.Error()) + } + c1.Close() + + reused := make(chan Client) + go func() { + // loop until getting the supposedly returned client + for { + c, err := cp.CreateClient() + if c == nil { + t.Error("expected client, got nil") + } + if err != nil { + t.Errorf("unexpected error: %s", err.Error()) + } + if c1 == c { + reused <- c + break + } + } + }() + + select { + case <-reused: + total := atomic.LoadUint64(&counter) + t.Logf("returned client got reused with %d concurrent connections", + total) + case <-time.After(time.Millisecond): + t.Errorf("client is not reused") + } +}