fix: group.Set() not updating hot cache for all nodes #69

Open · wants to merge 4 commits into base: master
196 changes: 139 additions & 57 deletions example_pb_test.go (generated file; diff not rendered by default)

95 changes: 77 additions & 18 deletions groupcache.go
@@ -272,21 +272,78 @@ func (g *Group) Set(ctx context.Context, key string, value []byte, expire time.Time
     }

     _, err := g.setGroup.Do(key, func() (interface{}, error) {
+        wg := sync.WaitGroup{}
+        errs := make(chan error)
+
         // If remote peer owns this key
         owner, ok := g.peers.PickPeer(key)
         if ok {
-            if err := g.setFromPeer(ctx, owner, key, value, expire); err != nil {
+            if err := g.setFromPeer(ctx, owner, key, value, expire, false); err != nil {
                 return nil, err
             }
-            // TODO(thrawn01): Not sure if this is useful outside of tests...
-            // maybe we should ALWAYS update the local cache?
-            if hotCache {
-                g.localSet(key, value, expire, &g.hotCache)
+            if !hotCache {
+                return nil, nil
             }
-            return nil, nil
+
+            g.localSet(key, value, expire, &g.hotCache)
+
+            for _, peer := range g.peers.GetAll() {
[Review comment — Contributor]
I'd like to see this code deduplicated with its counterpart in the owner code path (line 324). Move to function?
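For illustration, a sketch of the extraction the reviewer is suggesting. The helper name setToPeers and its exact signature are assumptions, not part of this PR; it simply gathers the fan-out logic that the diff below inlines in both branches:

```go
// setToPeers pushes the value into the hot cache of every peer except
// skip (pass nil to include all peers), joining any collected errors.
// Hypothetical helper for groupcache.go; not part of this PR.
func (g *Group) setToPeers(ctx context.Context, skip ProtoGetter, key string, value []byte, expire time.Time) error {
	wg := sync.WaitGroup{}
	errs := make(chan error)

	for _, peer := range g.peers.GetAll() {
		if peer == skip {
			// Avoid setting to the owner a second time
			continue
		}
		wg.Add(1)
		go func(peer ProtoGetter) {
			errs <- g.setFromPeer(ctx, peer, key, value, expire, true)
			wg.Done()
		}(peer)
	}

	// Close errs once every send has completed, so the range below ends.
	go func() {
		wg.Wait()
		close(errs)
	}()

	var err error
	for e := range errs {
		if e != nil {
			err = errors.Join(err, e)
		}
	}
	return err
}
```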

+                if peer == owner {
+                    // Avoid setting to owner a second time
+                    continue
+                }
+                wg.Add(1)
+                go func(peer ProtoGetter) {
+                    errs <- g.setFromPeer(ctx, peer, key, value, expire, true)
+                    wg.Done()
+                }(peer)
+            }
+
+            go func() {
+                wg.Wait()
+                close(errs)
+            }()
+
+            var err error
+            for e := range errs {
+                if e != nil {
+                    err = errors.Join(err, e)
+                }
+            }
+
+            return nil, err
         }
         // We own this key
         g.localSet(key, value, expire, &g.mainCache)
+
+        if hotCache {
+            // Also set to the hot cache of all peers
+
+            for _, peer := range g.peers.GetAll() {
+                wg.Add(1)
+                go func(peer ProtoGetter) {
+                    errs <- g.setFromPeer(ctx, peer, key, value, expire, true)
+                    wg.Done()
+                }(peer)
+            }
+
+            go func() {
+                wg.Wait()
+                close(errs)
+            }()
+
+            var err error
+            for e := range errs {
+                if e != nil {
+                    err = errors.Join(err, e)
+                }
+            }
+
+            return nil, err
+        }
+
+        return nil, nil
     })
     return err
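As a usage sketch, this is what a caller gains from the patched Set. The group setup, key, and value are illustrative, and the full signature Set(ctx, key, value, expire, hotCache) is inferred from how the closure above uses these names:

```go
// Assumes a *groupcache.Group wired to an HTTP peer pool elsewhere.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

// With hotCache=true, the value now lands in the hot cache of every
// node, not just the caller's, so a subsequent Get anywhere is a hit.
err := group.Set(ctx, "user-1234", []byte("cached-value"), time.Now().Add(time.Minute), true)
if err != nil {
	// err may be several joined errors, one per failing peer.
	log.Printf("set: %v", err)
}
```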
@@ -329,11 +386,11 @@ func (g *Group) Remove(ctx context.Context, key string) error {
            close(errs)
        }()

-       // TODO(thrawn01): Should we report all errors? Reporting context
-       // cancelled error for each peer doesn't make much sense.
        var err error
        for e := range errs {
-           err = e
+           if e != nil {
+               err = errors.Join(err, e)
+           }
[Author comment on lines +391 to +393]
If there were some errors but the last one yielded nil, the previous code would not return any error.

        }

        return nil, err
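A standalone illustration of the failure mode the author describes, using errors.Join (Go 1.20+); the peer errors here are made up:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	results := []error{
		errors.New("peer A: context deadline exceeded"),
		errors.New("peer B: connection refused"),
		nil, // last peer succeeded
	}

	// Old behavior: only the last result survives, so the trailing
	// nil silently hides both failures.
	var last error
	for _, e := range results {
		last = e
	}
	fmt.Println(last) // <nil>

	// New behavior: non-nil errors accumulate.
	var joined error
	for _, e := range results {
		if e != nil {
			joined = errors.Join(joined, e)
		}
	}
	fmt.Println(joined)
	// peer A: context deadline exceeded
	// peer B: connection refused
}
```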
@@ -449,8 +506,8 @@ func (g *Group) getLocally(ctx context.Context, key string, dest Sink) (ByteView, error) {

 func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (ByteView, error) {
     req := &pb.GetRequest{
-        Group: &g.name,
-        Key:   &key,
+        Group: g.name,
+        Key:   key,
     }
     res := &pb.GetResponse{}
     err := peer.Get(ctx, req, res)
@@ -459,8 +516,8 @@ func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (ByteView, error) {
     }

     var expire time.Time
-    if res.Expire != nil && *res.Expire != 0 {
-        expire = time.Unix(*res.Expire/int64(time.Second), *res.Expire%int64(time.Second))
+    if res.Expire != 0 {
+        expire = time.Unix(res.Expire/int64(time.Second), res.Expire%int64(time.Second))
         if time.Now().After(expire) {
             return ByteView{}, errors.New("peer returned expired value")
         }
@@ -473,24 +530,26 @@ func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (ByteView, error) {
     return value, nil
 }

-func (g *Group) setFromPeer(ctx context.Context, peer ProtoGetter, k string, v []byte, e time.Time) error {
+func (g *Group) setFromPeer(ctx context.Context, peer ProtoGetter, k string, v []byte, e time.Time, hotCache bool) error {
     var expire int64
     if !e.IsZero() {
         expire = e.UnixNano()
     }
     req := &pb.SetRequest{
-        Expire: &expire,
-        Group:  &g.name,
-        Key:    &k,
-        Value:  v,
+        Expire:   expire,
+        Group:    g.name,
+        Key:      k,
+        Value:    v,
+        HotCache: hotCache,
     }

     return peer.Set(ctx, req)
 }

 func (g *Group) removeFromPeer(ctx context.Context, peer ProtoGetter, key string) error {
     req := &pb.GetRequest{
-        Group: &g.name,
-        Key:   &key,
+        Group: g.name,
+        Key:   key,
     }
     return peer.Remove(ctx, req)
 }
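The expiry now crosses the wire as a plain int64 of Unix nanoseconds; splitting it into whole seconds and the nanosecond remainder on the receiving side reconstructs the same instant. A quick self-contained check of that round-trip (not part of the diff):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	e := time.Now().Add(5 * time.Minute)
	expire := e.UnixNano() // what setFromPeer stores in req.Expire

	// What getFromPeer does with res.Expire on the other side:
	decoded := time.Unix(expire/int64(time.Second), expire%int64(time.Second))

	fmt.Println(e.Equal(decoded)) // true, exact to the nanosecond
}
```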
2 changes: 1 addition & 1 deletion groupcachepb/example.proto
@@ -1,6 +1,6 @@
 syntax = "proto3";

-option go_package = "groupcache_test";
+option go_package = "./;groupcache_test";

 package groupcachepb;
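(In the `path;name` form accepted by protoc-gen-go, the part before the semicolon is the Go import path used to place the generated file — here `./`, the output directory itself — and the part after it overrides the generated Go package name, keeping it `groupcache_test`.)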
