Skip to content

Commit

Permalink
fix call vs request
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing committed Nov 20, 2023
1 parent 9122e54 commit bf77438
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 33 deletions.
5 changes: 3 additions & 2 deletions _examples/actor-logging/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ func consoleLogging(system *actor.ActorSystem) *slog.Logger {
// coloredConsoleLogging builds a colored console slog.Logger for the given
// actor system using the tint handler. Records at Debug level and above are
// emitted to stdout with RFC 3339 timestamps and source locations, and every
// record is tagged with the library name and the actor-system ID.
//
// NOTE(review): the diff rendering contained both the old (time.Kitchen) and
// new (time.RFC3339) TimeFormat lines; only the post-commit version is kept
// here, since a struct literal cannot repeat a field.
func coloredConsoleLogging(system *actor.ActorSystem) *slog.Logger {
	return slog.New(tint.NewHandler(os.Stdout, &tint.Options{
		Level:      slog.LevelDebug,
		TimeFormat: time.RFC3339,
		AddSource:  true,
	})).With("lib", "Proto.Actor").
		With("system", system.ID)
}
Expand All @@ -54,7 +55,7 @@ func zapAdapterLogging(system *actor.ActorSystem) *slog.Logger {

func main() {

system := actor.NewActorSystem(actor.WithLoggerFactory(zapAdapterLogging))
system := actor.NewActorSystem(actor.WithLoggerFactory(coloredConsoleLogging))

props := actor.PropsFromProducer(func() actor.Actor { return &helloActor{} })

Expand Down
4 changes: 2 additions & 2 deletions _examples/cluster-restartgracefully/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func runClientsAll(clients int, loops int, interval time.Duration) {
func runClient(grainId string, loops int, interval time.Duration) {
now := time.Now()
calcGrain := shared.GetCalculatorGrainClient(_cluster, grainId)
resp, err := calcGrain.GetCurrent(&shared.Void{}, cluster.WithRetry(3), cluster.WithTimeout(6*time.Second))
resp, err := calcGrain.GetCurrent(&shared.Void{}, cluster.WithRetryCount(3), cluster.WithTimeout(6*time.Second))
if err != nil {
_cluster.Shutdown(true)
panic(err)
Expand All @@ -125,7 +125,7 @@ func runClient(grainId string, loops int, interval time.Duration) {

func calcAdd(grainId string, addNumber int64) int64 {
calcGrain := shared.GetCalculatorGrainClient(_cluster, grainId)
resp, err := calcGrain.Add(&shared.NumberRequest{Number: addNumber}, cluster.WithRetry(3), cluster.WithTimeout(6*time.Second))
resp, err := calcGrain.Add(&shared.NumberRequest{Number: addNumber}, cluster.WithRetryCount(3), cluster.WithTimeout(6*time.Second))
if err != nil {
plog.Error("call grain failed", log.Error(err))
}
Expand Down
2 changes: 0 additions & 2 deletions cluster/cluster_config_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ const (

// ClusterContextConfig is used to configure cluster context parameters
type ClusterContextConfig struct {
// ActorRequestTimeout bounds a single grain request; populated from the
// cluster Config's RequestTimeoutTime in ToClusterContextConfig.
ActorRequestTimeout time.Duration
// RequestsLogThrottlePeriod is the window over which request-failure log
// messages are throttled.
RequestsLogThrottlePeriod time.Duration
// MaxNumberOfEventsInRequestLogThrottledPeriod caps how many log events
// may be emitted within one throttle window.
MaxNumberOfEventsInRequestLogThrottledPeriod int
// RetryAction receives the current retry counter and returns the next
// value; the default implementation also sleeps to provide backoff.
RetryAction func(int) int
// requestLogThrottle is the throttle built from the settings above via
// actor.NewThrottleWithLogger.
requestLogThrottle actor.ShouldThrottle
}
2 changes: 1 addition & 1 deletion cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestCluster_Call(t *testing.T) {
// FIXME: testcase
// t.Run("timeout", func(t *testing.T) {
// msg := struct{}{}
// callopts := NewGrainCallOptions(c).WithRetry(2).WithRequestTimeout(1 * time.Second)
// callopts := NewGrainCallOptions(c).WithRetryCount(2).WithRequestTimeout(1 * time.Second)
// resp, err := c.Call("name", "kind", &msg, callopts)
// assert.Equalf(Remote.ErrUnknownError, err, "%v", err)
// assert.Nil(resp)
Expand Down
4 changes: 2 additions & 2 deletions cluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ func Configure(clusterName string, clusterProvider ClusterProvider, identityLook
// into a valid ClusterContextConfig value and returns a pointer to its memory
func (c *Config) ToClusterContextConfig(logger *slog.Logger) *ClusterContextConfig {
clusterContextConfig := ClusterContextConfig{
ActorRequestTimeout: c.RequestTimeoutTime,

RequestsLogThrottlePeriod: c.RequestsLogThrottlePeriod,
MaxNumberOfEventsInRequestLogThrottledPeriod: c.MaxNumberOfEventsInRequestLogThrottledPeriod,
RetryAction: defaultRetryAction,

requestLogThrottle: actor.NewThrottleWithLogger(logger,
int32(defaultMaxNumberOfEvetsInRequestLogThrottledPeriod),
defaultRequestsLogThrottlePeriod,
Expand Down
4 changes: 1 addition & 3 deletions cluster/context.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package cluster

import "time"

// Context is an interface any cluster context needs to implement
type Context interface {
Request(identity string, kind string, message interface{}, timeout ...time.Duration) (interface{}, error)
Request(identity string, kind string, message interface{}, opts ...GrainCallOption) (interface{}, error)
}
27 changes: 10 additions & 17 deletions cluster/default_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,18 @@ func newDefaultClusterContext(cluster *Cluster) Context {
return &clusterContext
}

func (dcc *DefaultContext) Request(identity, kind string, message interface{}, timeout ...time.Duration) (interface{}, error) {
func (dcc *DefaultContext) Request(identity, kind string, message interface{}, opts ...GrainCallOption) (interface{}, error) {
var err error

var resp interface{}

var counter int
callConfig := DefaultGrainCallConfig(dcc.cluster)
for _, o := range opts {
o(callConfig)
}

_context := callConfig.Context

// get the configuration from the composed Cluster value
cfg := dcc.cluster.Config.ToClusterContextConfig(dcc.cluster.Logger())
Expand All @@ -47,15 +53,11 @@ func (dcc *DefaultContext) Request(identity, kind string, message interface{}, t
dcc.cluster.Logger().Debug(fmt.Sprintf("Requesting %s:%s Message %#v", identity, kind, message))

// crate a new Timeout Context
ttl := cfg.ActorRequestTimeout
if len(timeout) > 0 {
ttl = timeout[0]
}
ttl := callConfig.Timeout

ctx, cancel := context.WithTimeout(context.Background(), ttl)
defer cancel()

_context := dcc.cluster.ActorSystem.Root
selectloop:
for {
select {
Expand All @@ -68,8 +70,7 @@ selectloop:
pid := dcc.getCachedPid(identity, kind)
if pid == nil {
dcc.cluster.Logger().Debug(fmt.Sprintf("Requesting %s:%s did not get PID from IdentityLookup", identity, kind))
counter = cfg.RetryAction(counter)

counter = callConfig.RetryAction(counter)
continue
}

Expand All @@ -78,7 +79,7 @@ selectloop:
dcc.cluster.Logger().Error("cluster.RequestFuture failed", slog.Any("error", err), slog.Any("pid", pid))
switch err {
case actor.ErrTimeout, remote.ErrTimeout, actor.ErrDeadLetter, remote.ErrDeadLetter:
counter = cfg.RetryAction(counter)
counter = callConfig.RetryAction(counter)
dcc.cluster.PidCache.Remove(identity, kind)
err = nil // reset our error variable as we can succeed still

Expand Down Expand Up @@ -110,11 +111,3 @@ func (dcc *DefaultContext) getCachedPid(identity, kind string) *actor.PID {

return pid
}

// defaultRetryAction is the default retry policy: it increments the retry
// counter, sleeps for a quadratically growing backoff (50ms, 200ms, 450ms,
// ...), and returns the new counter value.
//
// Fix: the original slept time.Duration(i * i * 50), which is interpreted as
// nanoseconds — effectively no delay at all. Scaling by time.Millisecond
// produces the incremental backoff the comment promised.
func defaultRetryAction(i int) int {
	i++
	time.Sleep(time.Duration(i*i*50) * time.Millisecond)

	return i
}
11 changes: 7 additions & 4 deletions cluster/grain.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
type GrainCallConfig struct {
RetryCount int
Timeout time.Duration
RetryAction func(n int)
RetryAction func(n int) int
Context actor.SenderContext
}

Expand All @@ -26,11 +26,14 @@ func DefaultGrainCallConfig(cluster *Cluster) *GrainCallConfig {

func NewGrainCallOptions(cluster *Cluster) *GrainCallConfig {
return &GrainCallConfig{
//TODO: set default in config
RetryCount: 10,
Context: cluster.ActorSystem.Root,
Timeout: cluster.Config.RequestTimeoutTime,
RetryAction: func(i int) {
RetryAction: func(i int) int {
i++
time.Sleep(time.Duration(i * i * 50))
return i
},
}
}
Expand All @@ -41,13 +44,13 @@ func WithTimeout(timeout time.Duration) GrainCallOption {
}
}

func WithRetry(count int) GrainCallOption {
// WithRetryCount returns a GrainCallOption that sets the maximum number of
// retries for a grain call.
func WithRetryCount(count int) GrainCallOption {
	return func(cfg *GrainCallConfig) {
		cfg.RetryCount = count
	}
}

func WithRetryAction(act func(i int)) GrainCallOption {
func WithRetryAction(act func(i int) int) GrainCallOption {
return func(config *GrainCallConfig) {
config.RetryAction = act
}
Expand Down

0 comments on commit bf77438

Please sign in to comment.