Skip to content

Commit

Permalink
fix: unsubscribe and close (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
bludot authored Apr 3, 2023
1 parent 0d609c4 commit 8d62260
Show file tree
Hide file tree
Showing 12 changed files with 560 additions and 429 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:
- name: docker-compose
run: docker compose up -d
- name: Test and generate code coverage
run: go test -v -race --tags=integration_test -coverprofile=coverage.txt -covermode=atomic ./...
run: go test -v --tags=integration_test -coverprofile=coverage.txt -covermode=atomic ./...
- run: docker compose down

- name: sonarcloud-scan
Expand Down
35 changes: 28 additions & 7 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,32 @@ import (
"github.com/go-redis/redis/v8"
)

type CacheOptions struct {
SubscriptionTimeout time.Duration
UnsubscribeAndClose bool
}

type Cache[Data any] interface {
RememberBlocking(ctx context.Context, missFn MissFunc[Data], hitFn HitFunc[Data], key string, ttl time.Duration) (*Data, error)
}

type cache[Data any] struct {
client *redis.Client
client *redis.Client
options *CacheOptions
}

type MissFunc[Data any] func(ctx context.Context) (*Data, error)
type HitFunc[Data any] func(ctx context.Context, data *Data)

func NewCache[Data any](client *redis.Client) Cache[Data] {
func NewCache[Data any](client *redis.Client, options *CacheOptions) Cache[Data] {
builtOptions := &CacheOptions{}
if options != nil {
builtOptions = options
}

return &cache[Data]{
client: client,
client: client,
options: builtOptions,
}
}

Expand Down Expand Up @@ -65,10 +77,7 @@ func (c *cache[Data]) RememberBlocking(ctx context.Context, missFn MissFunc[Data

return nil, err
}
bytedata, err := json.Marshal(*data)
if err != nil {
return nil, err
}
bytedata, _ := json.Marshal(*data)
_, err = c.client.Set(ctx, key, string(bytedata), ttl).Result()
if err != nil {
log.Println(err)
Expand All @@ -86,7 +95,13 @@ func (c *cache[Data]) RememberBlocking(ctx context.Context, missFn MissFunc[Data
func (c *cache[Data]) rememberWait(ctx context.Context, key string) (*Data, error) {
subscription := NewCacheSubscription(c.client, key)
subscription.Subscribe(ctx)

defer func() {
if c.options.UnsubscribeAndClose {
subscription.UnsubscribeAndClose(ctx)

return
}
err := subscription.Unsubscribe(ctx)
if err != nil {
log.Println(err)
Expand All @@ -97,6 +112,12 @@ func (c *cache[Data]) rememberWait(ctx context.Context, key string) (*Data, erro
if err != nil {
return nil, err
}
if c.options.SubscriptionTimeout != 0*time.Second {
go func() {
time.Sleep(c.options.SubscriptionTimeout)
subscription.UnsubscribeAndClose(ctx)
}()
}

for msg := range channel {
if msg.Payload != "" {
Expand Down
16 changes: 16 additions & 0 deletions cache_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

type CacheSubscription interface {
Unsubscribe(ctx context.Context) error
UnsubscribeAndClose(ctx context.Context) error
Subscribe(ctx context.Context) CacheSubscription
GetChannel(ctx context.Context) (<-chan *redis.Message, error)
}
Expand Down Expand Up @@ -41,6 +42,21 @@ func (cs *cacheSubscription) Unsubscribe(ctx context.Context) error {
return nil
}

func (cs *cacheSubscription) UnsubscribeAndClose(ctx context.Context) error {
if cs.Subscription == nil {
return nil
}
err := cs.Subscription.Unsubscribe(ctx, cs.channel)
if err != nil {
return err
}

cs.Subscription.Close()
cs.Subscription = nil

return nil
}

func (cs *cacheSubscription) Subscribe(ctx context.Context) CacheSubscription {
cs.Subscription = cs.client.Subscribe(ctx, cs.channel)
if cs.Subscription == nil {
Expand Down
149 changes: 146 additions & 3 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ func TestNewCache(t *testing.T) {
a := assert.New(t)

db, _ := redismock.NewClientMock()
cache := cache_lib.NewCache[Response](db)
cache := cache_lib.NewCache[Response](db, nil)

a.NotNil(cache)
}

func TestCache_RememberBlocking(t *testing.T) {
db, mock := redismock.NewClientMock()
cache := cache_lib.NewCache[Response](db)
cache := cache_lib.NewCache[Response](db, nil)
t.Run("single cache", func(t *testing.T) {
a := assert.New(t)

Expand Down Expand Up @@ -131,7 +131,7 @@ func TestNewCacheSubscription(t *testing.T) {
DB: 0,
})

cache := cache_lib.NewCache[Response](redisClient)
cache := cache_lib.NewCache[Response](redisClient, nil)

t.Run("Multiple calls", func(t *testing.T) {
a := assert.New(t)
Expand All @@ -156,3 +156,146 @@ func TestNewCacheSubscription(t *testing.T) {
a.Equal(response, *result)
})
}

func TestNewCacheSubscriptionWithOptions(t *testing.T) {
redisClient := redis.NewClient(&redis.Options{
Addr: ":6379", // We connect to host redis, thats what the hostname of the redis service is set to in the docker-compose
DB: 0,
})

cache := cache_lib.NewCache[Response](redisClient, &cache_lib.CacheOptions{
SubscriptionTimeout: 1 * time.Second,
UnsubscribeAndClose: true,
})

t.Run("Multiple calls", func(t *testing.T) {
a := assert.New(t)

response := Response{Result: true}

go func() {
_, _ = cache.RememberBlocking(context.Background(), func(ctx context.Context) (*Response, error) {
time.Sleep(2 * time.Second)

return &response, nil
}, func(ctx context.Context, data *Response) {}, "data", 1*time.Second)
}()

result, err := cache.RememberBlocking(context.Background(), func(ctx context.Context) (*Response, error) {
time.Sleep(2 * time.Second)

return &response, nil
}, func(ctx context.Context, data *Response) {}, "data", 1*time.Second)

a.NoError(err)
a.Equal(response, *result)
})
}

func TestCacheFailSet(t *testing.T) {
db, mock := redismock.NewClientMock()

cache := cache_lib.NewCache[Response](db, nil)

t.Run("fail set", func(t *testing.T) {
a := assert.New(t)

response := Response{Result: true}

mock.ExpectGet("data").SetVal("")
mock.ExpectSetNX("data", "", 1*time.Second).SetVal(true)
mock.ExpectSet("data", "", 1*time.Second).SetErr(errors.New("error"))

result, err := cache.RememberBlocking(context.Background(), func(ctx context.Context) (*Response, error) {
time.Sleep(2 * time.Second)

return &response, nil
}, func(ctx context.Context, data *Response) {}, "data", 1*time.Second)

a.Error(err)
a.Nil(result)
})

t.Run("fail publish", func(t *testing.T) {
a := assert.New(t)

response := Response{Result: true}

mock.ExpectGet("data").SetVal("")
mock.ExpectSetNX("data", "", 1*time.Second).SetVal(true)
mock.ExpectSet("data", "{\"result\":true}", 1*time.Second).SetVal("OK")
mock.ExpectPublish("data", response).SetErr(errors.New("error"))

result, err := cache.RememberBlocking(context.Background(), func(ctx context.Context) (*Response, error) {
time.Sleep(2 * time.Second)

return &response, nil
}, func(ctx context.Context, data *Response) {}, "data", 1*time.Second)

a.Error(err)
a.Nil(result)
})
}

func TestRememberWait(t *testing.T) {
redisClient := redis.NewClient(&redis.Options{
Addr: ":6379", // We connect to host redis, thats what the hostname of the redis service is set to in the docker-compose
DB: 0,
})

cache := cache_lib.NewCache[Response](redisClient, &cache_lib.CacheOptions{
SubscriptionTimeout: 1 * time.Second,
UnsubscribeAndClose: true,
})

t.Run("single cache", func(t *testing.T) {
a := assert.New(t)
response := Response{Result: true}
//responseString, _ := json.Marshal(response)
go func() {
_, _ = cache.RememberBlocking(context.Background(), func(ctx context.Context) (*Response, error) {
time.Sleep(8 * time.Second)

return &response, nil
}, func(ctx context.Context, data *Response) {}, "data2", 1*time.Second)
}()
_, err := cache.RememberBlocking(context.Background(), func(ctx context.Context) (*Response, error) {
time.Sleep(5 * time.Second)

return &response, nil
}, func(ctx context.Context, data *Response) {}, "data2", 1*time.Second)

a.Error(err)
a.Equal("error reading from pub/sub", err.Error())
})
}

func TestRememberWait2(t *testing.T) {
redisClient := redis.NewClient(&redis.Options{
Addr: ":6379", // We connect to host redis, thats what the hostname of the redis service is set to in the docker-compose
DB: 0,
})

cache := cache_lib.NewCache[Response](redisClient, &cache_lib.CacheOptions{
SubscriptionTimeout: 1 * time.Second,
UnsubscribeAndClose: true,
})

t.Run("single cache", func(t *testing.T) {
a := assert.New(t)
response := Response{Result: true}
//responseString, _ := json.Marshal(response)
go func() {
_, _ = cache.RememberBlocking(context.Background(), func(ctx context.Context) (*Response, error) {
time.Sleep(2 * time.Second)

return &response, nil
}, func(ctx context.Context, data *Response) {}, "data3", 1*time.Second)
}()
_, err := cache.RememberBlocking(context.Background(), func(ctx context.Context) (*Response, error) {
return &response, nil
}, func(ctx context.Context, data *Response) {}, "data3", 1*time.Second)

a.NoError(err)
})
}
4 changes: 2 additions & 2 deletions examples/pubsub/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ require (
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.43.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
golang.org/x/net v0.4.0 // indirect
golang.org/x/sys v0.3.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
)
8 changes: 8 additions & 0 deletions examples/pubsub/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-redis/redismock/v8 v8.11.5 h1:RJFIiua58hrBrSpXhnGX3on79AU3S271H4ZhRI1wyVo=
github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
github.com/gofiber/fiber/v2 v2.41.0 h1:YhNoUS/OTjEz+/WLYuQ01xI7RXgKEFnGBKMagAu5f0M=
Expand All @@ -23,8 +25,10 @@ github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.43.0 h1:Gy4sb32C98fbzVWZlTM1oTMdLWGyvxR03VhM6cBIU4g=
Expand All @@ -37,13 +41,16 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20220906165146-f3363e06e74c/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU=
golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand All @@ -52,3 +59,4 @@ golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
7 changes: 4 additions & 3 deletions examples/pubsub/internal/services/img_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ package img_client

import (
"context"
"github.com/go-resty/resty/v2"
"io"
"log"
"net/http"

"github.com/go-resty/resty/v2"
)

type imageClient struct {
Expand All @@ -26,7 +25,9 @@ func NewImageClient(client *http.Client) ImageClient {
func (restyClient *imageClient) GetImage(ctx context.Context) (*io.ReadCloser, error) {
log.Println("getting image!")
client := restyClient.RestyClient.GetClient()
resp, err := client.Get("https://api.catboys.com/img")
//resp, err := client.Get("https://api.catboys.com/img")
resp, err := client.Get("http://localhost:4003")

if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions examples/pubsub/internal/services/user_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func (restyClient *userClient) GetUser(ctx context.Context, body UserBody) (*io.
"gender": body.Gender,
})
client := restyClient.RestyClient.GetClient()

resp, err := client.Get("https://randomuser.me/api/")
//resp, err := client.Get("https://randomuser.me/api/")
resp, err := client.Get("http://localhost:4002")
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 8d62260

Please sign in to comment.