diff --git a/example_pb_test.go b/example_pb_test.go index 621b82f..714bd70 100644 --- a/example_pb_test.go +++ b/example_pb_test.go @@ -1,88 +1,170 @@ // Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.33.0 +// protoc v5.26.1 // source: example.proto -/* -Package groupcache_test is a generated protocol buffer package. +package groupcache_test -It is generated from these files: - example.proto +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) -It has these top-level messages: - User -*/ -package groupcache_test +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" +type User struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + Age int64 `protobuf:"varint,3,opt,name=age,proto3" json:"age,omitempty"` + IsSuper bool `protobuf:"varint,4,opt,name=is_super,json=isSuper,proto3" json:"is_super,omitempty"` +} -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package +func (x *User) Reset() { + *x = User{} + if protoimpl.UnsafeEnabled { + mi := &file_example_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} -type User struct { - Id string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"` - Name string `protobuf:"bytes,2,opt,name=name" json:"name,omitempty"` - Age int64 `protobuf:"varint,3,opt,name=age" json:"age,omitempty"` - IsSuper bool `protobuf:"varint,4,opt,name=is_super,json=isSuper" json:"is_super,omitempty"` +func (x *User) String() string { + return protoimpl.X.MessageStringOf(x) } -func (m *User) Reset() { *m = User{} } -func (m *User) String() string { return proto.CompactTextString(m) } -func (*User) ProtoMessage() {} -func (*User) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } +func (*User) ProtoMessage() {} -func (m *User) GetId() string { - if m != nil { - return m.Id +func (x *User) ProtoReflect() protoreflect.Message { + mi := &file_example_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use User.ProtoReflect.Descriptor instead. +func (*User) Descriptor() ([]byte, []int) { + return file_example_proto_rawDescGZIP(), []int{0} +} + +func (x *User) GetId() string { + if x != nil { + return x.Id } return "" } -func (m *User) GetName() string { - if m != nil { - return m.Name +func (x *User) GetName() string { + if x != nil { + return x.Name } return "" } -func (m *User) GetAge() int64 { - if m != nil { - return m.Age +func (x *User) GetAge() int64 { + if x != nil { + return x.Age } return 0 } -func (m *User) GetIsSuper() bool { - if m != nil { - return m.IsSuper +func (x *User) GetIsSuper() bool { + if x != nil { + return x.IsSuper } return false } -func init() { - proto.RegisterType((*User)(nil), "groupcachepb.User") +var File_example_proto protoreflect.FileDescriptor + +var file_example_proto_rawDesc = []byte{ + 0x0a, 0x0d, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x0c, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x63, 0x61, 0x63, 0x68, 0x65, 0x70, 0x62, 0x22, 0x57, 0x0a, + 0x04, 0x55, 0x73, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x61, 0x67, 0x65, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x61, 0x67, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x69, + 0x73, 0x5f, 0x73, 0x75, 0x70, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x69, + 0x73, 0x53, 0x75, 0x70, 0x65, 0x72, 0x42, 0x14, 0x5a, 0x12, 0x2e, 0x2f, 0x3b, 0x67, 0x72, 0x6f, + 0x75, 0x70, 0x63, 0x61, 0x63, 0x68, 0x65, 0x5f, 0x74, 0x65, 0x73, 0x74, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_example_proto_rawDescOnce sync.Once + file_example_proto_rawDescData = file_example_proto_rawDesc +) + +func file_example_proto_rawDescGZIP() []byte { + file_example_proto_rawDescOnce.Do(func() { + file_example_proto_rawDescData = protoimpl.X.CompressGZIP(file_example_proto_rawDescData) + }) + return file_example_proto_rawDescData +} + +var file_example_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_example_proto_goTypes = []interface{}{ + (*User)(nil), // 0: groupcachepb.User +} +var file_example_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name } -func init() { proto.RegisterFile("example.proto", fileDescriptor0) } - -var fileDescriptor0 = []byte{ - // 148 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4d, 0xad, 0x48, 0xcc, - 0x2d, 0xc8, 0x49, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x49, 0x2f, 0xca, 0x2f, 0x2d, - 0x48, 0x4e, 0x4c, 0xce, 0x48, 0x2d, 0x48, 0x52, 0x0a, 0xe7, 0x62, 0x09, 0x2d, 0x4e, 0x2d, 0x12, - 0xe2, 0xe3, 0x62, 0xca, 0x4c, 0x91, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x62, 0xca, 0x4c, 0x11, - 0x12, 0xe2, 0x62, 0xc9, 0x4b, 0xcc, 0x4d, 0x95, 0x60, 0x02, 0x8b, 0x80, 0xd9, 0x42, 0x02, 0x5c, - 0xcc, 0x89, 0xe9, 0xa9, 0x12, 0xcc, 0x0a, 0x8c, 0x1a, 0xcc, 0x41, 0x20, 0xa6, 0x90, 0x24, 0x17, - 0x47, 0x66, 0x71, 0x7c, 0x71, 0x69, 0x41, 0x6a, 0x91, 0x04, 0x8b, 0x02, 0xa3, 0x06, 0x47, 0x10, - 0x7b, 0x66, 0x71, 0x30, 0x88, 0xeb, 0x24, 0x18, 0xc5, 0x8f, 0xb0, 0x28, 0xbe, 0x24, 0xb5, 0xb8, - 0x24, 0x89, 0x0d, 0xec, 0x00, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0x26, 0x2e, 0x5f, 0x1a, - 0x91, 0x00, 0x00, 0x00, +func init() { file_example_proto_init() } +func file_example_proto_init() { + if File_example_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_example_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*User); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_example_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_example_proto_goTypes, + DependencyIndexes: file_example_proto_depIdxs, + MessageInfos: file_example_proto_msgTypes, + }.Build() + File_example_proto = out.File + file_example_proto_rawDesc = nil + file_example_proto_goTypes = nil + file_example_proto_depIdxs = nil } diff --git a/groupcache.go b/groupcache.go index 541e89a..480efdc 100644 --- a/groupcache.go +++ b/groupcache.go @@ -272,21 +272,78 @@ func (g *Group) Set(ctx context.Context, key string, value []byte, expire time.T } _, err := g.setGroup.Do(key, func() (interface{}, error) { + wg := sync.WaitGroup{} + errs := make(chan error) + // If remote peer owns this key owner, ok := g.peers.PickPeer(key) if ok { - if err := g.setFromPeer(ctx, owner, key, value, expire); err != nil { + if err := g.setFromPeer(ctx, owner, key, value, expire, false); err != nil { return nil, err } // TODO(thrawn01): Not sure if this is useful outside of tests... // maybe we should ALWAYS update the local cache? - if hotCache { - g.localSet(key, value, expire, &g.hotCache) + if !hotCache { + return nil, nil } - return nil, nil + + g.localSet(key, value, expire, &g.hotCache) + + for _, peer := range g.peers.GetAll() { + if peer == owner { + // Avoid setting to owner a second time + continue + } + wg.Add(1) + go func(peer ProtoGetter) { + errs <- g.setFromPeer(ctx, peer, key, value, expire, true) + wg.Done() + }(peer) + } + + go func() { + wg.Wait() + close(errs) + }() + + var err error + for e := range errs { + if e != nil { + err = errors.Join(err, e) + } + } + + return nil, err } // We own this key g.localSet(key, value, expire, &g.mainCache) + + if hotCache { + // Also set to the hot cache of all peers + + for _, peer := range g.peers.GetAll() { + wg.Add(1) + go func(peer ProtoGetter) { + errs <- g.setFromPeer(ctx, peer, key, value, expire, true) + wg.Done() + }(peer) + } + + go func() { + wg.Wait() + close(errs) + }() + + var err error + for e := range errs { + if e != nil { + err = errors.Join(err, e) + } + } + + return nil, err + } + return nil, nil }) return err @@ -329,11 +386,11 @@ func (g *Group) Remove(ctx context.Context, key string) error { close(errs) }() - // TODO(thrawn01): Should we report all errors? Reporting context - // cancelled error for each peer doesn't make much sense. var err error for e := range errs { - err = e + if e != nil { + err = errors.Join(err, e) + } } return nil, err @@ -449,8 +506,8 @@ func (g *Group) getLocally(ctx context.Context, key string, dest Sink) (ByteView func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (ByteView, error) { req := &pb.GetRequest{ - Group: &g.name, - Key: &key, + Group: g.name, + Key: key, } res := &pb.GetResponse{} err := peer.Get(ctx, req, res) @@ -459,8 +516,8 @@ func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) ( } var expire time.Time - if res.Expire != nil && *res.Expire != 0 { - expire = time.Unix(*res.Expire/int64(time.Second), *res.Expire%int64(time.Second)) + if res.Expire != 0 { + expire = time.Unix(res.Expire/int64(time.Second), res.Expire%int64(time.Second)) if time.Now().After(expire) { return ByteView{}, errors.New("peer returned expired value") } @@ -473,24 +530,26 @@ func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) ( return value, nil } -func (g *Group) setFromPeer(ctx context.Context, peer ProtoGetter, k string, v []byte, e time.Time) error { +func (g *Group) setFromPeer(ctx context.Context, peer ProtoGetter, k string, v []byte, e time.Time, hotCache bool) error { var expire int64 if !e.IsZero() { expire = e.UnixNano() } req := &pb.SetRequest{ - Expire: &expire, - Group: &g.name, - Key: &k, - Value: v, + Expire: expire, + Group: g.name, + Key: k, + Value: v, + HotCache: hotCache, } + return peer.Set(ctx, req) } func (g *Group) removeFromPeer(ctx context.Context, peer ProtoGetter, key string) error { req := &pb.GetRequest{ - Group: &g.name, - Key: &key, + Group: g.name, + Key: key, } return peer.Remove(ctx, req) } diff --git a/groupcachepb/example.proto b/groupcachepb/example.proto index 92dd7d0..265d483 100644 --- a/groupcachepb/example.proto +++ b/groupcachepb/example.proto @@ -1,6 +1,6 @@ syntax = "proto3"; -option go_package = "groupcache_test"; +option go_package = "./;groupcache_test"; package groupcachepb; diff --git a/groupcachepb/groupcache.pb.go b/groupcachepb/groupcache.pb.go index d6abd47..eeddc10 100644 --- a/groupcachepb/groupcache.pb.go +++ b/groupcachepb/groupcache.pb.go @@ -1,155 +1,354 @@ +// +//Copyright 2012 Google Inc. +// +//Licensed under the Apache License, Version 2.0 (the "License"); +//you may not use this file except in compliance with the License. +//You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, software +//distributed under the License is distributed on an "AS IS" BASIS, +//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//See the License for the specific language governing permissions and +//limitations under the License. + // Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.33.0 +// protoc v5.26.1 // source: groupcache.proto -/* -Package groupcachepb is a generated protocol buffer package. +package __ -It is generated from these files: - groupcache.proto +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) -It has these top-level messages: - GetRequest - GetResponse - SetRequest -*/ -package groupcachepb +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" +type GetRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf + Group string `protobuf:"bytes,1,opt,name=group,proto3" json:"group,omitempty"` + Key string `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` // not actually required/guaranteed to be UTF-8 +} -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package +func (x *GetRequest) Reset() { + *x = GetRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_groupcache_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} -type GetRequest struct { - Group *string `protobuf:"bytes,1,req,name=group" json:"group,omitempty"` - Key *string `protobuf:"bytes,2,req,name=key" json:"key,omitempty"` - XXX_unrecognized []byte `json:"-"` +func (x *GetRequest) String() string { + return protoimpl.X.MessageStringOf(x) } -func (m *GetRequest) Reset() { *m = GetRequest{} } -func (m *GetRequest) String() string { return proto.CompactTextString(m) } -func (*GetRequest) ProtoMessage() {} -func (*GetRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } +func (*GetRequest) ProtoMessage() {} -func (m *GetRequest) GetGroup() string { - if m != nil && m.Group != nil { - return *m.Group +func (x *GetRequest) ProtoReflect() protoreflect.Message { + mi := &file_groupcache_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetRequest.ProtoReflect.Descriptor instead. +func (*GetRequest) Descriptor() ([]byte, []int) { + return file_groupcache_proto_rawDescGZIP(), []int{0} +} + +func (x *GetRequest) GetGroup() string { + if x != nil { + return x.Group } return "" } -func (m *GetRequest) GetKey() string { - if m != nil && m.Key != nil { - return *m.Key +func (x *GetRequest) GetKey() string { + if x != nil { + return x.Key } return "" } type GetResponse struct { - Value []byte `protobuf:"bytes,1,opt,name=value" json:"value,omitempty"` - MinuteQps *float64 `protobuf:"fixed64,2,opt,name=minute_qps,json=minuteQps" json:"minute_qps,omitempty"` - Expire *int64 `protobuf:"varint,3,opt,name=expire" json:"expire,omitempty"` - XXX_unrecognized []byte `json:"-"` + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Value []byte `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` + MinuteQps float64 `protobuf:"fixed64,2,opt,name=minute_qps,json=minuteQps,proto3" json:"minute_qps,omitempty"` + Expire int64 `protobuf:"varint,3,opt,name=expire,proto3" json:"expire,omitempty"` } -func (m *GetResponse) Reset() { *m = GetResponse{} } -func (m *GetResponse) String() string { return proto.CompactTextString(m) } -func (*GetResponse) ProtoMessage() {} -func (*GetResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } +func (x *GetResponse) Reset() { + *x = GetResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_groupcache_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetResponse) ProtoMessage() {} + +func (x *GetResponse) ProtoReflect() protoreflect.Message { + mi := &file_groupcache_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetResponse.ProtoReflect.Descriptor instead. +func (*GetResponse) Descriptor() ([]byte, []int) { + return file_groupcache_proto_rawDescGZIP(), []int{1} +} -func (m *GetResponse) GetValue() []byte { - if m != nil { - return m.Value +func (x *GetResponse) GetValue() []byte { + if x != nil { + return x.Value } return nil } -func (m *GetResponse) GetMinuteQps() float64 { - if m != nil && m.MinuteQps != nil { - return *m.MinuteQps +func (x *GetResponse) GetMinuteQps() float64 { + if x != nil { + return x.MinuteQps } return 0 } -func (m *GetResponse) GetExpire() int64 { - if m != nil && m.Expire != nil { - return *m.Expire +func (x *GetResponse) GetExpire() int64 { + if x != nil { + return x.Expire } return 0 } type SetRequest struct { - Group *string `protobuf:"bytes,1,req,name=group" json:"group,omitempty"` - Key *string `protobuf:"bytes,2,req,name=key" json:"key,omitempty"` - Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"` - Expire *int64 `protobuf:"varint,4,opt,name=expire" json:"expire,omitempty"` - XXX_unrecognized []byte `json:"-"` + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Group string `protobuf:"bytes,1,opt,name=group,proto3" json:"group,omitempty"` + Key string `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` + Value []byte `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"` + Expire int64 `protobuf:"varint,4,opt,name=expire,proto3" json:"expire,omitempty"` + HotCache bool `protobuf:"varint,5,opt,name=hotCache,proto3" json:"hotCache,omitempty"` } -func (m *SetRequest) Reset() { *m = SetRequest{} } -func (m *SetRequest) String() string { return proto.CompactTextString(m) } -func (*SetRequest) ProtoMessage() {} -func (*SetRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } +func (x *SetRequest) Reset() { + *x = SetRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_groupcache_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SetRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SetRequest) ProtoMessage() {} + +func (x *SetRequest) ProtoReflect() protoreflect.Message { + mi := &file_groupcache_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SetRequest.ProtoReflect.Descriptor instead. +func (*SetRequest) Descriptor() ([]byte, []int) { + return file_groupcache_proto_rawDescGZIP(), []int{2} +} -func (m *SetRequest) GetGroup() string { - if m != nil && m.Group != nil { - return *m.Group +func (x *SetRequest) GetGroup() string { + if x != nil { + return x.Group } return "" } -func (m *SetRequest) GetKey() string { - if m != nil && m.Key != nil { - return *m.Key +func (x *SetRequest) GetKey() string { + if x != nil { + return x.Key } return "" } -func (m *SetRequest) GetValue() []byte { - if m != nil { - return m.Value +func (x *SetRequest) GetValue() []byte { + if x != nil { + return x.Value } return nil } -func (m *SetRequest) GetExpire() int64 { - if m != nil && m.Expire != nil { - return *m.Expire +func (x *SetRequest) GetExpire() int64 { + if x != nil { + return x.Expire } return 0 } -func init() { - proto.RegisterType((*GetRequest)(nil), "groupcachepb.GetRequest") - proto.RegisterType((*GetResponse)(nil), "groupcachepb.GetResponse") - proto.RegisterType((*SetRequest)(nil), "groupcachepb.SetRequest") -} - -func init() { proto.RegisterFile("groupcache.proto", fileDescriptor0) } - -var fileDescriptor0 = []byte{ - // 215 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x50, 0x31, 0x4b, 0xc5, 0x30, - 0x18, 0x34, 0x8d, 0x0a, 0xfd, 0xec, 0x50, 0x82, 0x48, 0x14, 0x84, 0x90, 0x29, 0x53, 0x07, 0x71, - 0x74, 0x73, 0x28, 0xb8, 0x19, 0x37, 0x17, 0x69, 0xcb, 0x87, 0x16, 0xb5, 0x49, 0x9b, 0x44, 0x7c, - 0xff, 0xfe, 0x91, 0xe6, 0x41, 0x3a, 0xbc, 0xe5, 0x6d, 0xb9, 0x3b, 0x2e, 0x77, 0xdf, 0x41, 0xfd, - 0xb9, 0x98, 0x60, 0x87, 0x6e, 0xf8, 0xc2, 0xc6, 0x2e, 0xc6, 0x1b, 0x56, 0x65, 0xc6, 0xf6, 0xf2, - 0x11, 0xa0, 0x45, 0xaf, 0x71, 0x0e, 0xe8, 0x3c, 0xbb, 0x86, 0x8b, 0x55, 0xe5, 0x44, 0x14, 0xaa, - 0xd4, 0x09, 0xb0, 0x1a, 0xe8, 0x37, 0xee, 0x78, 0xb1, 0x72, 0xf1, 0x29, 0xdf, 0xe1, 0x6a, 0x75, - 0x39, 0x6b, 0x26, 0x87, 0xd1, 0xf6, 0xd7, 0xfd, 0x04, 0xe4, 0x44, 0x10, 0x55, 0xe9, 0x04, 0xd8, - 0x3d, 0xc0, 0xef, 0x38, 0x05, 0x8f, 0x1f, 0xb3, 0x75, 0xbc, 0x10, 0x44, 0x11, 0x5d, 0x26, 0xe6, - 0xd5, 0x3a, 0x76, 0x03, 0x97, 0xf8, 0x6f, 0xc7, 0x05, 0x39, 0x15, 0x44, 0x51, 0x7d, 0x40, 0xb2, - 0x07, 0x78, 0x3b, 0xb9, 0x51, 0xae, 0x40, 0xb7, 0x15, 0x72, 0xc6, 0xf9, 0x36, 0xe3, 0xe1, 0x05, - 0xa0, 0x8d, 0x1f, 0x3d, 0xc7, 0x15, 0xd8, 0x13, 0xd0, 0x16, 0x3d, 0xe3, 0xcd, 0x76, 0x99, 0x26, - 0xcf, 0x72, 0x77, 0x7b, 0x44, 0x49, 0xa7, 0xcb, 0xb3, 0x7d, 0x00, 0x00, 0x00, 0xff, 0xff, 0x02, - 0x10, 0x64, 0xec, 0x62, 0x01, 0x00, 0x00, +func (x *SetRequest) GetHotCache() bool { + if x != nil { + return x.HotCache + } + return false +} + +var File_groupcache_proto protoreflect.FileDescriptor + +var file_groupcache_proto_rawDesc = []byte{ + 0x0a, 0x10, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x12, 0x0c, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x63, 0x61, 0x63, 0x68, 0x65, 0x70, 0x62, + 0x22, 0x34, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, + 0x0a, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x67, + 0x72, 0x6f, 0x75, 0x70, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x22, 0x5a, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x6d, + 0x69, 0x6e, 0x75, 0x74, 0x65, 0x5f, 0x71, 0x70, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x01, 0x52, + 0x09, 0x6d, 0x69, 0x6e, 0x75, 0x74, 0x65, 0x51, 0x70, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x65, 0x78, + 0x70, 0x69, 0x72, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x65, 0x78, 0x70, 0x69, + 0x72, 0x65, 0x22, 0x7e, 0x0a, 0x0a, 0x53, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x14, 0x0a, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x16, + 0x0a, 0x06, 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, + 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x68, 0x6f, 0x74, 0x43, 0x61, 0x63, + 0x68, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x68, 0x6f, 0x74, 0x43, 0x61, 0x63, + 0x68, 0x65, 0x32, 0x4a, 0x0a, 0x0a, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x61, 0x63, 0x68, 0x65, + 0x12, 0x3c, 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x18, 0x2e, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x63, + 0x61, 0x63, 0x68, 0x65, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x19, 0x2e, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x63, 0x61, 0x63, 0x68, 0x65, 0x70, 0x62, + 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x04, + 0x5a, 0x02, 0x2e, 0x2f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_groupcache_proto_rawDescOnce sync.Once + file_groupcache_proto_rawDescData = file_groupcache_proto_rawDesc +) + +func file_groupcache_proto_rawDescGZIP() []byte { + file_groupcache_proto_rawDescOnce.Do(func() { + file_groupcache_proto_rawDescData = protoimpl.X.CompressGZIP(file_groupcache_proto_rawDescData) + }) + return file_groupcache_proto_rawDescData +} + +var file_groupcache_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_groupcache_proto_goTypes = []interface{}{ + (*GetRequest)(nil), // 0: groupcachepb.GetRequest + (*GetResponse)(nil), // 1: groupcachepb.GetResponse + (*SetRequest)(nil), // 2: groupcachepb.SetRequest +} +var file_groupcache_proto_depIdxs = []int32{ + 0, // 0: groupcachepb.GroupCache.Get:input_type -> groupcachepb.GetRequest + 1, // 1: groupcachepb.GroupCache.Get:output_type -> groupcachepb.GetResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_groupcache_proto_init() } +func file_groupcache_proto_init() { + if File_groupcache_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_groupcache_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_groupcache_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_groupcache_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SetRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_groupcache_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_groupcache_proto_goTypes, + DependencyIndexes: file_groupcache_proto_depIdxs, + MessageInfos: file_groupcache_proto_msgTypes, + }.Build() + File_groupcache_proto = out.File + file_groupcache_proto_rawDesc = nil + file_groupcache_proto_goTypes = nil + file_groupcache_proto_depIdxs = nil } diff --git a/groupcachepb/groupcache.proto b/groupcachepb/groupcache.proto index a24b410..066e16c 100644 --- a/groupcachepb/groupcache.proto +++ b/groupcachepb/groupcache.proto @@ -14,29 +14,31 @@ See the License for the specific language governing permissions and limitations under the License. */ -syntax = "proto2"; +syntax = "proto3"; package groupcachepb; +option go_package = "./"; message GetRequest { - required string group = 1; - required string key = 2; // not actually required/guaranteed to be UTF-8 + string group = 1; + string key = 2; // not actually required/guaranteed to be UTF-8 } message GetResponse { - optional bytes value = 1; - optional double minute_qps = 2; - optional int64 expire = 3; + bytes value = 1; + double minute_qps = 2; + int64 expire = 3; } message SetRequest { - required string group = 1; - required string key = 2; - optional bytes value = 3; - optional int64 expire = 4; + string group = 1; + string key = 2; + bytes value = 3; + int64 expire = 4; + bool hotCache = 5; } service GroupCache { rpc Get(GetRequest) returns (GetResponse) { - }; + } } diff --git a/http.go b/http.go index 32e1f90..c9a683b 100644 --- a/http.go +++ b/http.go @@ -213,11 +213,16 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { } var expire time.Time - if out.Expire != nil && *out.Expire != 0 { - expire = time.Unix(*out.Expire/int64(time.Second), *out.Expire%int64(time.Second)) + if out.Expire != 0 { + expire = time.Unix(out.Expire/int64(time.Second), out.Expire%int64(time.Second)) } - group.localSet(*out.Key, out.Value, expire, &group.mainCache) + c := &group.mainCache + if out.HotCache { + c = &group.hotCache + } + + group.localSet(out.Key, out.Value, expire, c) return } @@ -245,7 +250,7 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // Write the value to the response body as a proto message. - body, err := proto.Marshal(&pb.GetResponse{Value: b, Expire: &expireNano}) + body, err := proto.Marshal(&pb.GetResponse{Value: b, Expire: expireNano}) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/http_test.go b/http_test.go index 23f9a7b..4026ef0 100644 --- a/http_test.go +++ b/http_test.go @@ -95,7 +95,9 @@ func TestHTTPPool(t *testing.T) { wg.Wait() // Use a dummy self address so that we don't handle gets in-process. - p := NewHTTPPool("should-be-ignored") + p, mux := newTestHTTPPool("should-be-ignored") + defer mux.Close() + p.Set(addrToURL(childAddr)...) // Dummy getter function. Gets should go to children only. @@ -219,7 +221,8 @@ func testKeys(n int) (keys []string) { func beChildForTestHTTPPool(t *testing.T) { addrs := strings.Split(*peerAddrs, ",") - p := NewHTTPPool("http://" + addrs[*peerIndex]) + p, mux := newTestHTTPPool("http://" + addrs[*peerIndex]) + defer mux.Close() p.Set(addrToURL(addrs)...) getter := GetterFunc(func(ctx context.Context, key string, dest Sink) error { @@ -286,3 +289,45 @@ func awaitAddrReady(t *testing.T, addr string, wg *sync.WaitGroup) { time.Sleep(delay) } } + +type serveMux struct { + mux *http.ServeMux + handlers map[string]http.Handler +} + +func newTestHTTPPool(self string) (*HTTPPool, *serveMux) { + httpPoolMade, portPicker = false, nil // Testing only + + p := NewHTTPPoolOpts(self, nil) + sm := &serveMux{ + mux: http.NewServeMux(), + handlers: make(map[string]http.Handler), + } + + sm.handlers[p.opts.BasePath] = p + + return p, sm +} + +func (s *serveMux) Handle(pattern string, handler http.Handler) { + s.handlers[pattern] = handler + s.mux.Handle(pattern, handler) +} + +func (s *serveMux) Close() { + for pattern := range s.handlers { + delete(s.handlers, pattern) + } +} + +func (s *serveMux) RemoveHandle(pattern string) { + delete(s.handlers, pattern) +} + +func (s *serveMux) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if _, ok := s.handlers[r.URL.Path]; ok { + s.mux.ServeHTTP(w, r) + } else { + http.NotFound(w, r) + } +} diff --git a/integration_test.go b/integration_test.go new file mode 100644 index 0000000..f7f55a4 --- /dev/null +++ b/integration_test.go @@ -0,0 +1,166 @@ +package groupcache + +import ( + "context" + "errors" + "fmt" + "io" + "log" + "net/http" + "net/http/httptest" + "os" + "os/exec" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestManualSet(t *testing.T) { + if *peerChild { + beChildForIntegrationTest(t) + os.Exit(0) + } + + const ( + nChild = 4 + nGets = 100 + ) + + var serverHits int + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, "Hello") + serverHits++ + })) + defer ts.Close() + + var childAddr []string + for i := 0; i < nChild; i++ { + childAddr = append(childAddr, pickFreeAddr(t)) + } + + var cmds []*exec.Cmd + var wg sync.WaitGroup + for i := 0; i < nChild; i++ { + cmd := exec.Command(os.Args[0], + "--test.run=TestManualSet", + "--test_peer_child", + "--test_peer_addrs="+strings.Join(childAddr, ","), + "--test_peer_index="+strconv.Itoa(i), + "--test_server_addr="+ts.URL, + ) + cmds = append(cmds, cmd) + cmd.Stdout = os.Stdout + wg.Add(1) + if err := cmd.Start(); err != nil { + t.Fatal("failed to start child process: ", err) + } + go awaitAddrReady(t, childAddr[i], &wg) + } + defer func() { + for i := 0; i < nChild; i++ { + if cmds[i].Process != nil { + cmds[i].Process.Kill() + } + } + }() + wg.Wait() + + // Use a dummy self address so that we don't handle gets in-process. + p, mux := newTestHTTPPool("should-be-ignored") + defer mux.Close() + + p.Set(addrToURL(childAddr)...) + + // Dummy getter function. Gets should go to children only. + // The only time this process will handle a get is when the + // children can't be contacted for some reason. + getter := GetterFunc(func(ctx context.Context, key string, dest Sink) error { + return errors.New("parent getter called; something's wrong") + }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + assertLocalGets := func(key, expectedValue string) { + for _, addr := range childAddr { + resp, err := http.Get("http://" + addr + "/local/" + key) + assert.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + bytes, err := io.ReadAll(resp.Body) + assert.NoError(t, err) + assert.Equal(t, expectedValue, string(bytes)) + } + } + g := NewGroup("integrationTest", 1<<20, getter) + + var got string + err := g.Get(ctx, "key-0", StringSink(&got)) + assert.NoError(t, err) + assert.Equal(t, "got:key-0", got) + // Since nodes have hot caches, we assert that the localGets are returning the right data + assertLocalGets("key-0", "got:key-0") + + // Manually set the value in the cache + overwrite := "manual-set" + err = g.Set(ctx, "key-0", []byte(overwrite), time.Time{}, true) + assert.NoError(t, err) + + err = g.Get(ctx, "key-0", StringSink(&got)) + assert.NoError(t, err) + assert.Equal(t, overwrite, got) + assertLocalGets("key-0", overwrite) +} + +type overwriteHttpPool struct { + g *Group + p *HTTPPool +} + +// ServeHTTP implements http.Handler. +func (o overwriteHttpPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if strings.HasPrefix(r.URL.Path, "/local/") { + fmt.Printf("peer %d group.Get(%s)\n", *peerIndex, r.URL.Path) + key := strings.TrimPrefix(r.URL.Path, "/local/") + // Custom logic here + // For example, you can write the key to the response + var got string + err := o.g.Get(r.Context(), key, StringSink(&got)) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + _, _ = w.Write([]byte(got)) + return + } + + // Call the original handler + o.p.ServeHTTP(w, r) +} + +var _ http.Handler = (*overwriteHttpPool)(nil) + +func beChildForIntegrationTest(t *testing.T) { + addrs := strings.Split(*peerAddrs, ",") + + p, mux := newTestHTTPPool("http://" + addrs[*peerIndex]) + defer mux.Close() + + hp := overwriteHttpPool{ + p: p, + } + hp.p.Set(addrToURL(addrs)...) + + getter := GetterFunc(func(ctx context.Context, key string, dest Sink) error { + return dest.SetString("got:"+key, time.Time{}) + }) + hp.g = NewGroup("integrationTest", 1<<20, getter) + + log.Printf("Listening on %s\n", addrs[*peerIndex]) + log.Fatal(http.ListenAndServe(addrs[*peerIndex], hp)) +} diff --git a/proto.sh b/proto.sh index e710926..b8fb4d2 100755 --- a/proto.sh +++ b/proto.sh @@ -13,4 +13,4 @@ protoc -I=$PROTO_DIR \ protoc -I=$PROTO_DIR \ --go_out=. \ - $PROTO_DIR/example.proto + $PROTO_DIR/example.proto && mv ./example.pb.go ./example_pb_test.go