From da2a3f63b076449a37d27b3181bd7c6da7b6d3f3 Mon Sep 17 00:00:00 2001 From: alxtkr77 <3098237+alxtkr77@users.noreply.github.com> Date: Wed, 28 Aug 2024 12:06:27 +0300 Subject: [PATCH] [NUC-233] Ensure the call to setShardSequenceNumberInPersistency fails for non-existing or non-shard objects. (#151) --- pkg/common/helper.go | 18 ++++++++++++++++++ pkg/dataplane/streamconsumergroup/claim.go | 4 +++- .../streamconsumergroup/streamconsumergroup.go | 4 ++++ 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/pkg/common/helper.go b/pkg/common/helper.go index 6177456..86bb110 100644 --- a/pkg/common/helper.go +++ b/pkg/common/helper.go @@ -24,6 +24,7 @@ import ( "context" "reflect" "runtime" + "strings" "time" "github.com/nuclio/errors" @@ -170,3 +171,20 @@ func StringSlicesEqual(slice1 []string, slice2 []string) bool { return true } + +func EngineErrorIsNonFatal(err error) bool { + var nonFatalEngineErrorsPartialMatch = []string{ + "dialing to the given TCP address timed out", + "timeout", + "refused", + } + if err != nil && len(err.Error()) > 0 { + for _, nonFatalError := range nonFatalEngineErrorsPartialMatch { + if strings.Contains(err.Error(), nonFatalError) || strings.Contains(errors.Cause(err).Error(), nonFatalError) { + return true + } + } + return false + } + return true +} diff --git a/pkg/dataplane/streamconsumergroup/claim.go b/pkg/dataplane/streamconsumergroup/claim.go index 8e6e45c..6555a15 100644 --- a/pkg/dataplane/streamconsumergroup/claim.go +++ b/pkg/dataplane/streamconsumergroup/claim.go @@ -129,7 +129,9 @@ func (c *claim) fetchRecordBatches(stopChannel chan struct{}, fetchInterval time &c.getShardLocationBackoff, func(attempt int) (bool, error) { c.currentShardLocation, err = c.getCurrentShardLocation(c.shardID) if err != nil { - + if common.EngineErrorIsNonFatal(err) { + return true, errors.Wrap(err, "Failed to get shard location due to a network error") + } // requested for an immediate stop if err == v3ioerrors.ErrStopped { return false, nil diff --git a/pkg/dataplane/streamconsumergroup/streamconsumergroup.go b/pkg/dataplane/streamconsumergroup/streamconsumergroup.go index 8a21087..fd5b5a6 100644 --- a/pkg/dataplane/streamconsumergroup/streamconsumergroup.go +++ b/pkg/dataplane/streamconsumergroup/streamconsumergroup.go @@ -118,6 +118,9 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier, if err != nil && !errors.Is(err, v3ioerrors.ErrNotFound) { return true, errors.Wrap(err, "Failed getting current state from persistency") } + if common.EngineErrorIsNonFatal(err) { + return true, errors.Wrap(err, "Failed getting current state from persistency due to a network error") + } if state == nil { state, err = newState() @@ -367,6 +370,7 @@ func (scg *streamConsumerGroup) setShardSequenceNumberInPersistency(shardID int, Attributes: map[string]interface{}{ scg.getShardCommittedSequenceNumberAttributeName(): sequenceNumber, }, + Condition: "__obj_type == 3", }) return err }