From bf7743877b27fad6b9883b745ebd8c230d857c81 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Mon, 20 Nov 2023 18:35:46 +0100 Subject: [PATCH] fix call vs request --- _examples/actor-logging/main.go | 5 ++-- .../cluster-restartgracefully/client/main.go | 4 +-- cluster/cluster_config_context.go | 2 -- cluster/cluster_test.go | 2 +- cluster/config.go | 4 +-- cluster/context.go | 4 +-- cluster/default_context.go | 27 +++++++------------ cluster/grain.go | 11 +++++--- 8 files changed, 26 insertions(+), 33 deletions(-) diff --git a/_examples/actor-logging/main.go b/_examples/actor-logging/main.go index 2028de876..52fdef922 100644 --- a/_examples/actor-logging/main.go +++ b/_examples/actor-logging/main.go @@ -38,7 +38,8 @@ func consoleLogging(system *actor.ActorSystem) *slog.Logger { func coloredConsoleLogging(system *actor.ActorSystem) *slog.Logger { return slog.New(tint.NewHandler(os.Stdout, &tint.Options{ Level: slog.LevelDebug, - TimeFormat: time.Kitchen, + TimeFormat: time.RFC3339, + AddSource: true, })).With("lib", "Proto.Actor"). With("system", system.ID) } @@ -54,7 +55,7 @@ func zapAdapterLogging(system *actor.ActorSystem) *slog.Logger { func main() { - system := actor.NewActorSystem(actor.WithLoggerFactory(zapAdapterLogging)) + system := actor.NewActorSystem(actor.WithLoggerFactory(coloredConsoleLogging)) props := actor.PropsFromProducer(func() actor.Actor { return &helloActor{} }) diff --git a/_examples/cluster-restartgracefully/client/main.go b/_examples/cluster-restartgracefully/client/main.go index 42b5a31ae..ad90523e0 100644 --- a/_examples/cluster-restartgracefully/client/main.go +++ b/_examples/cluster-restartgracefully/client/main.go @@ -103,7 +103,7 @@ func runClientsAll(clients int, loops int, interval time.Duration) { func runClient(grainId string, loops int, interval time.Duration) { now := time.Now() calcGrain := shared.GetCalculatorGrainClient(_cluster, grainId) - resp, err := calcGrain.GetCurrent(&shared.Void{}, cluster.WithRetry(3), cluster.WithTimeout(6*time.Second)) + resp, err := calcGrain.GetCurrent(&shared.Void{}, cluster.WithRetryCount(3), cluster.WithTimeout(6*time.Second)) if err != nil { _cluster.Shutdown(true) panic(err) @@ -125,7 +125,7 @@ func runClient(grainId string, loops int, interval time.Duration) { func calcAdd(grainId string, addNumber int64) int64 { calcGrain := shared.GetCalculatorGrainClient(_cluster, grainId) - resp, err := calcGrain.Add(&shared.NumberRequest{Number: addNumber}, cluster.WithRetry(3), cluster.WithTimeout(6*time.Second)) + resp, err := calcGrain.Add(&shared.NumberRequest{Number: addNumber}, cluster.WithRetryCount(3), cluster.WithTimeout(6*time.Second)) if err != nil { plog.Error("call grain failed", log.Error(err)) } diff --git a/cluster/cluster_config_context.go b/cluster/cluster_config_context.go index 7675bc75a..33e23b709 100644 --- a/cluster/cluster_config_context.go +++ b/cluster/cluster_config_context.go @@ -16,9 +16,7 @@ const ( // ClusterContextConfig is used to configure cluster context parameters type ClusterContextConfig struct { - ActorRequestTimeout time.Duration RequestsLogThrottlePeriod time.Duration MaxNumberOfEventsInRequestLogThrottledPeriod int - RetryAction func(int) int requestLogThrottle actor.ShouldThrottle } diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index fe895f03f..a0ccdaa7c 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -139,7 +139,7 @@ func TestCluster_Call(t *testing.T) { // FIXME: testcase // t.Run("timeout", func(t *testing.T) { // msg := struct{}{} - // callopts := NewGrainCallOptions(c).WithRetry(2).WithRequestTimeout(1 * time.Second) + // callopts := NewGrainCallOptions(c).WithRetryCount(2).WithRequestTimeout(1 * time.Second) // resp, err := c.Call("name", "kind", &msg, callopts) // assert.Equalf(Remote.ErrUnknownError, err, "%v", err) // assert.Nil(resp) diff --git a/cluster/config.go b/cluster/config.go index be05921be..f20edbfbd 100644 --- a/cluster/config.go +++ b/cluster/config.go @@ -62,10 +62,10 @@ func Configure(clusterName string, clusterProvider ClusterProvider, identityLook // into a valid ClusterContextConfig value and returns a pointer to its memory func (c *Config) ToClusterContextConfig(logger *slog.Logger) *ClusterContextConfig { clusterContextConfig := ClusterContextConfig{ - ActorRequestTimeout: c.RequestTimeoutTime, + RequestsLogThrottlePeriod: c.RequestsLogThrottlePeriod, MaxNumberOfEventsInRequestLogThrottledPeriod: c.MaxNumberOfEventsInRequestLogThrottledPeriod, - RetryAction: defaultRetryAction, + requestLogThrottle: actor.NewThrottleWithLogger(logger, int32(defaultMaxNumberOfEvetsInRequestLogThrottledPeriod), defaultRequestsLogThrottlePeriod, diff --git a/cluster/context.go b/cluster/context.go index 449f3594d..a81bea937 100644 --- a/cluster/context.go +++ b/cluster/context.go @@ -1,8 +1,6 @@ package cluster -import "time" - // Context is an interface any cluster context needs to implement type Context interface { - Request(identity string, kind string, message interface{}, timeout ...time.Duration) (interface{}, error) + Request(identity string, kind string, message interface{}, opts ...GrainCallOption) (interface{}, error) } diff --git a/cluster/default_context.go b/cluster/default_context.go index 663790117..94ced03ce 100644 --- a/cluster/default_context.go +++ b/cluster/default_context.go @@ -32,12 +32,18 @@ func newDefaultClusterContext(cluster *Cluster) Context { return &clusterContext } -func (dcc *DefaultContext) Request(identity, kind string, message interface{}, timeout ...time.Duration) (interface{}, error) { +func (dcc *DefaultContext) Request(identity, kind string, message interface{}, opts ...GrainCallOption) (interface{}, error) { var err error var resp interface{} var counter int + callConfig := DefaultGrainCallConfig(dcc.cluster) + for _, o := range opts { + o(callConfig) + } + + _context := callConfig.Context // get the configuration from the composed Cluster value cfg := dcc.cluster.Config.ToClusterContextConfig(dcc.cluster.Logger()) @@ -47,15 +53,11 @@ func (dcc *DefaultContext) Request(identity, kind string, message interface{}, t dcc.cluster.Logger().Debug(fmt.Sprintf("Requesting %s:%s Message %#v", identity, kind, message)) // crate a new Timeout Context - ttl := cfg.ActorRequestTimeout - if len(timeout) > 0 { - ttl = timeout[0] - } + ttl := callConfig.Timeout ctx, cancel := context.WithTimeout(context.Background(), ttl) defer cancel() - _context := dcc.cluster.ActorSystem.Root selectloop: for { select { @@ -68,8 +70,7 @@ selectloop: pid := dcc.getCachedPid(identity, kind) if pid == nil { dcc.cluster.Logger().Debug(fmt.Sprintf("Requesting %s:%s did not get PID from IdentityLookup", identity, kind)) - counter = cfg.RetryAction(counter) - + counter = callConfig.RetryAction(counter) continue } @@ -78,7 +79,7 @@ selectloop: dcc.cluster.Logger().Error("cluster.RequestFuture failed", slog.Any("error", err), slog.Any("pid", pid)) switch err { case actor.ErrTimeout, remote.ErrTimeout, actor.ErrDeadLetter, remote.ErrDeadLetter: - counter = cfg.RetryAction(counter) + counter = callConfig.RetryAction(counter) dcc.cluster.PidCache.Remove(identity, kind) err = nil // reset our error variable as we can succeed still @@ -110,11 +111,3 @@ func (dcc *DefaultContext) getCachedPid(identity, kind string) *actor.PID { return pid } - -// default retry action, it just sleeps incrementally. -func defaultRetryAction(i int) int { - i++ - time.Sleep(time.Duration(i * i * 50)) - - return i -} diff --git a/cluster/grain.go b/cluster/grain.go index a1aabecf2..912ca8e0c 100644 --- a/cluster/grain.go +++ b/cluster/grain.go @@ -9,7 +9,7 @@ import ( type GrainCallConfig struct { RetryCount int Timeout time.Duration - RetryAction func(n int) + RetryAction func(n int) int Context actor.SenderContext } @@ -26,11 +26,14 @@ func DefaultGrainCallConfig(cluster *Cluster) *GrainCallConfig { func NewGrainCallOptions(cluster *Cluster) *GrainCallConfig { return &GrainCallConfig{ + //TODO: set default in config RetryCount: 10, + Context: cluster.ActorSystem.Root, Timeout: cluster.Config.RequestTimeoutTime, - RetryAction: func(i int) { + RetryAction: func(i int) int { i++ time.Sleep(time.Duration(i * i * 50)) + return i }, } } @@ -41,13 +44,13 @@ func WithTimeout(timeout time.Duration) GrainCallOption { } } -func WithRetry(count int) GrainCallOption { +func WithRetryCount(count int) GrainCallOption { return func(config *GrainCallConfig) { config.RetryCount = count } } -func WithRetryAction(act func(i int)) GrainCallOption { +func WithRetryAction(act func(i int) int) GrainCallOption { return func(config *GrainCallConfig) { config.RetryAction = act }