Fix kafka error handling (#13)
* fix: added kafka error handling for republish events

* chore: deleted package-lock.json

* fix: removed republishing errorThreshold, added debugging logs

* fix: added special republishing error handling if kafka is not available

* fix: added debugging for kafka error handling

* fix: added debug log message for sarama errorType

* fix: changed kafka error checking; deleted unused unit tests

* fix: refactored kafka error handling

* fix: added list of errors for special handling

* fix: removed debug log entries and set default deliveringStatesOffset to 70m

* fix: refactored log entries and levels

---------

Co-authored-by: [email protected] <[email protected]>
Schnix84 and WRichter72 authored Aug 28, 2024
1 parent f55fd52 commit 01712d7
Showing 6 changed files with 65 additions and 25 deletions.
2 changes: 0 additions & 2 deletions internal/circuitbreaker/circuitbreaker.go
@@ -199,8 +199,6 @@ func calculateExponentialBackoff(cbMessage message.CircuitBreakerMessage) time.D
// If the circuit breaker counter is 2 it is the first retry, because the counter had already been incremented immediately before
exponentialBackoff := exponentialBackoffBase * time.Duration(math.Pow(2, float64(cbMessage.LoopCounter-1)))

log.Debug().Msgf("Math.Pow: %v", math.Pow(2, float64(cbMessage.LoopCounter-1)))

// Limit the exponential backoff to the max backoff
if (exponentialBackoff > exponentialBackoffMax) || (exponentialBackoff < 0) {
exponentialBackoff = exponentialBackoffMax
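The retained backoff logic doubles the wait with every circuit breaker loop iteration and caps it at a configured maximum. A minimal standalone sketch of the same calculation, assuming illustrative values for exponentialBackoffBase and exponentialBackoffMax (in golaris both come from configuration):

    package main

    import (
        "fmt"
        "math"
        "time"
    )

    func main() {
        // Illustrative values only; the real ones are read from configuration.
        exponentialBackoffBase := 10 * time.Second
        exponentialBackoffMax := 60 * time.Minute

        // A loop counter of 2 is the first retry, so the exponent starts at 1.
        for loopCounter := 2; loopCounter <= 10; loopCounter++ {
            backoff := exponentialBackoffBase * time.Duration(math.Pow(2, float64(loopCounter-1)))
            if backoff > exponentialBackoffMax || backoff < 0 {
                backoff = exponentialBackoffMax
            }
            fmt.Printf("loopCounter=%d backoff=%v\n", loopCounter, backoff)
        }
    }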
4 changes: 2 additions & 2 deletions internal/config/config.go
@@ -51,8 +51,8 @@ func setDefaults() {
viper.SetDefault("healthCheck.coolDownTime", "30s")
viper.SetDefault("republishing.checkInterval", "30s")
viper.SetDefault("republishing.batchSize", 10)
viper.SetDefault("republishing.throttlingIntervalTime", "10s")
viper.SetDefault("republishing.deliveringStatesOffset", "15m")
viper.SetDefault("republishing.throttlingIntervalTime", "1s")
viper.SetDefault("republishing.deliveringStatesOffset", "70m")

// Caches
viper.SetDefault("hazelcast.caches.subscriptionCache", "subscriptions.subscriber.horizon.telekom.de.v1")
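Both changed defaults are duration strings that viper parses when the value is read. A small sketch, using only the spf13/viper API, of how the new defaults resolve to time.Duration values when no override is configured:

    package main

    import (
        "fmt"

        "github.com/spf13/viper"
    )

    func main() {
        // Mirrors the new defaults from config.go; deployments may override them.
        viper.SetDefault("republishing.throttlingIntervalTime", "1s")
        viper.SetDefault("republishing.deliveringStatesOffset", "70m")

        throttle := viper.GetDuration("republishing.throttlingIntervalTime")
        offset := viper.GetDuration("republishing.deliveringStatesOffset")

        fmt.Println(throttle, offset) // 1s 1h10m0s
    }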
12 changes: 7 additions & 5 deletions internal/handler/delivering.go
@@ -16,6 +16,8 @@ import (
)

func CheckDeliveringEvents() {
log.Info().Msgf("Republish messages in state DELIVERING")

var ctx = cache.DeliveringHandler.NewLockContext(context.Background())

if acquired, _ := cache.DeliveringHandler.TryLockWithTimeout(ctx, cache.DeliveringLockKey, 10*time.Millisecond); !acquired {
@@ -45,7 +47,7 @@ func CheckDeliveringEvents() {

dbMessages, lastCursor, err := mongo.CurrentConnection.FindDeliveringMessagesByDeliveryType(upperThresholdTimestamp, lastCursor)
if err != nil {
log.Error().Msgf("Error while fetching DELIVERING messages from MongoDb: %v", err)
log.Error().Err(err).Msgf("Error while fetching DELIVERING messages from MongoDb")
return
}

@@ -57,13 +59,13 @@
for _, dbMessage := range dbMessages {

if dbMessage.Coordinates == nil {
log.Printf("Coordinates in message for subscriptionId %s are nil: %v", dbMessage.SubscriptionId, dbMessage)
log.Warn().Msgf("Coordinates in message for subscriptionId %s are nil: %v", dbMessage.SubscriptionId, dbMessage)
return
}

message, err := picker.Pick(&dbMessage)
if err != nil {
log.Printf("Error while fetching message from kafka for subscriptionId %s: %v", dbMessage.SubscriptionId, err)
log.Error().Err(err).Msgf("Error while fetching message from kafka for subscriptionId %s", dbMessage.SubscriptionId)
return
}

@@ -79,10 +81,10 @@

err = kafka.CurrentHandler.RepublishMessage(traceCtx, message, "", "", false)
if err != nil {
log.Printf("Error while republishing message for subscriptionId %s: %v", dbMessage.SubscriptionId, err)
log.Error().Err(err).Msgf("Error while republishing message for subscriptionId %s", dbMessage.SubscriptionId)
return
}
log.Printf("Successfully republished message for subscriptionId %s", dbMessage.SubscriptionId)
log.Debug().Msgf("Successfully republished message in state DELIVERING for subscriptionId %s", dbMessage.SubscriptionId)
}

if len(dbMessages) < int(batchSize) {
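The DELIVERING handler now logs through zerolog instead of the standard library logger, so errors travel as a structured field instead of being interpolated into the message, and routine success messages were lowered from info to debug. A brief sketch of the pattern, with a placeholder error and subscription id:

    package main

    import (
        "errors"

        "github.com/rs/zerolog/log"
    )

    func main() {
        err := errors.New("kafka fetch failed")  // placeholder error
        subscriptionId := "example-subscription" // placeholder id

        // Err(err) attaches the error as a dedicated "error" field in the JSON output,
        // which is easier to filter on than an error formatted with %v.
        log.Error().Err(err).Msgf("Error while fetching message from kafka for subscriptionId %s", subscriptionId)

        // Routine success is now logged at debug level to keep the info log quiet.
        log.Debug().Msgf("Successfully republished message in state DELIVERING for subscriptionId %s", subscriptionId)
    }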
16 changes: 9 additions & 7 deletions internal/handler/failed.go
@@ -17,6 +17,8 @@ import (
)

func CheckFailedEvents() {
log.Info().Msgf("Republish messages in state FAILED")

var ctx = cache.FailedHandler.NewLockContext(context.Background())

if acquired, _ := cache.FailedHandler.TryLockWithTimeout(ctx, cache.FailedLockKey, 10*time.Millisecond); !acquired {
@@ -37,7 +39,7 @@

picker, err := kafka.NewPicker()
if err != nil {
log.Error().Err(err).Msg("Could not initialize picker for handling events in state DELIVERING")
log.Error().Err(err).Msg("Could not initialize picker for handling events in state FAILED")
return
}
defer picker.Close()
@@ -46,7 +48,7 @@
var lastCursor any
dbMessages, _, err = mongo.CurrentConnection.FindFailedMessagesWithCallbackUrlNotFoundException(time.Now(), lastCursor)
if err != nil {
log.Error().Err(err).Msgf("Error while fetching messages for subscription from db")
log.Error().Err(err).Msgf("Error while fetching FAILED messages from MongoDb")
return
}

@@ -59,7 +61,7 @@

subscription, err := cache.SubscriptionCache.Get(config.Current.Hazelcast.Caches.SubscriptionCache, subscriptionId)
if err != nil {
log.Printf("Error while fetching republishing entry for subscriptionId %s: %v", subscriptionId, err)
log.Error().Err(err).Msgf("Error while fetching republishing entry for subscriptionId %s", subscriptionId)
return
}

@@ -68,13 +70,13 @@
var newDeliveryType = "SERVER_SENT_EVENT"

if dbMessage.Coordinates == nil {
log.Printf("Coordinates in message for subscriptionId %s are nil: %v", subscriptionId, dbMessage)
log.Warn().Msgf("Coordinates in message for subscriptionId %s are nil: %v", dbMessage.SubscriptionId, dbMessage)
return
}

kafkaMessage, err := picker.Pick(&dbMessage)
if err != nil {
log.Printf("Error while fetching message from kafka for subscriptionId %s: %v", subscriptionId, err)
log.Error().Err(err).Msgf("Error while fetching message from kafka for subscriptionId %s", subscriptionId)
return
}

@@ -90,10 +92,10 @@

err = kafka.CurrentHandler.RepublishMessage(traceCtx, kafkaMessage, newDeliveryType, "", true)
if err != nil {
log.Printf("Error while republishing message for subscriptionId %s: %v", subscriptionId, err)
log.Error().Err(err).Msgf("Error while republishing message for subscriptionId %s", subscriptionId)
return
}
log.Printf("Successfully republished message for subscriptionId %s", subscriptionId)
log.Debug().Msgf("Successfully republished message in state FAILED for subscriptionId %s", subscriptionId)

}
}
48 changes: 41 additions & 7 deletions internal/republish/republish.go
@@ -7,11 +7,14 @@ package republish
import (
"context"
"encoding/gob"
"errors"
"github.com/1pkg/gohalt"
"github.com/IBM/sarama"
"github.com/rs/zerolog/log"
"github.com/telekom/pubsub-horizon-go/message"
"github.com/telekom/pubsub-horizon-go/resource"
"github.com/telekom/pubsub-horizon-go/tracing"
"net"
"pubsub-horizon-golaris/internal/cache"
"pubsub-horizon-golaris/internal/config"
"pubsub-horizon-golaris/internal/kafka"
@@ -86,7 +89,11 @@ func HandleRepublishingEntry(subscription *resource.SubscriptionResource) {
}
}()

republishPendingEventsFunc(subscription, castedRepublishCacheEntry)
err = republishPendingEventsFunc(subscription, castedRepublishCacheEntry)
if err != nil {
log.Error().Err(err).Msgf("Error while republishing pending events for subscriptionId %s. Discarding rebublishing cache entry", subscriptionId)
return
}

err = cache.RepublishingCache.Delete(ctx, subscriptionId)
if err != nil {
@@ -100,16 +107,19 @@
// RepublishPendingEvents handles the republishing of pending events for a given subscription.
// The function fetches waiting events from the database and republishes them to Kafka.
// The function takes a subscriptionId as a parameter.
func RepublishPendingEvents(subscription *resource.SubscriptionResource, republishEntry RepublishingCacheEntry) {
func RepublishPendingEvents(subscription *resource.SubscriptionResource, republishEntry RepublishingCacheEntry) error {
var subscriptionId = subscription.Spec.Subscription.SubscriptionId
log.Info().Msgf("Republishing pending events for subscription %s", subscriptionId)

picker, err := kafka.NewPicker()

// Returning an error results in NOT deleting the republishingEntry from the cache
// so that the republishing job will get retried shortly
if err != nil {
log.Error().Err(err).Fields(map[string]any{
"subscriptionId": subscriptionId,
}).Msg("Could not create picker for subscription")
return
return err
}
defer picker.Close()

@@ -122,7 +132,7 @@ func RepublishPendingEvents(subscription *resource.SubscriptionResource, republi
for {
if cache.GetCancelStatus(subscriptionId) {
log.Info().Msgf("Republishing for subscription %s has been cancelled", subscriptionId)
return
return nil
}

var dbMessages []message.StatusMessage
@@ -149,9 +159,10 @@ func RepublishPendingEvents(subscription *resource.SubscriptionResource, republi
}

for _, dbMessage := range dbMessages {

if cache.GetCancelStatus(subscriptionId) {
log.Info().Msgf("Republishing for subscription %s has been cancelled", subscriptionId)
return
return nil
}

for {
@@ -161,7 +172,7 @@ func RepublishPendingEvents(subscription *resource.SubscriptionResource, republi
for slept := time.Duration(0); slept < totalSleepTime; slept += sleepInterval {
if cache.GetCancelStatus(subscriptionId) {
log.Info().Msgf("Republishing for subscription %s has been cancelled", subscriptionId)
return
return nil
}

time.Sleep(sleepInterval)
Expand All @@ -188,7 +199,29 @@ func RepublishPendingEvents(subscription *resource.SubscriptionResource, republi

kafkaMessage, err := picker.Pick(&dbMessage)
if err != nil {
log.Printf("Error while fetching message from kafka for subscriptionId %s: %v", subscriptionId, err)
// Returning an error results in NOT deleting the republishingEntry from the cache
// so that the republishing job will get retried shortly
var nErr *net.OpError
if errors.As(err, &nErr) {
return err
}

var errorList = []error{
sarama.ErrEligibleLeadersNotAvailable,
sarama.ErrPreferredLeaderNotAvailable,
sarama.ErrUnknownLeaderEpoch,
sarama.ErrFencedLeaderEpoch,
sarama.ErrNotLeaderForPartition,
sarama.ErrLeaderNotAvailable,
}

for _, e := range errorList {
if errors.Is(err, e) {
return err
}
}

log.Error().Err(err).Msgf("Error while fetching message from kafka for subscriptionId %s", subscriptionId)
continue
}

@@ -216,6 +249,7 @@ func RepublishPendingEvents(subscription *resource.SubscriptionResource, republi
break
}
}
return nil
}

// ForceDelete attempts to forcefully delete a RepublishingCacheEntry for a given subscriptionId.
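The new pick-error handling in RepublishPendingEvents returns early for connection-level failures (net.OpError, i.e. Kafka not reachable) and for a fixed list of sarama leadership/availability errors, which keeps the republishing cache entry alive so the job is retried shortly; all other pick errors are logged and the message is skipped. The same classification can be read as a small predicate; this is a hedged sketch, and the helper name isRetriableKafkaError is not part of the commit:

    package republish

    import (
        "errors"
        "net"

        "github.com/IBM/sarama"
    )

    // isRetriableKafkaError reports whether a pick error should abort the current
    // republishing run so that it is retried later (hypothetical refactoring sketch).
    func isRetriableKafkaError(err error) bool {
        // Kafka broker unreachable: the client surfaces this as a *net.OpError.
        var nErr *net.OpError
        if errors.As(err, &nErr) {
            return true
        }

        // Transient leadership / availability errors listed in the commit.
        errorList := []error{
            sarama.ErrEligibleLeadersNotAvailable,
            sarama.ErrPreferredLeaderNotAvailable,
            sarama.ErrUnknownLeaderEpoch,
            sarama.ErrFencedLeaderEpoch,
            sarama.ErrNotLeaderForPartition,
            sarama.ErrLeaderNotAvailable,
        }
        for _, e := range errorList {
            if errors.Is(err, e) {
                return true
            }
        }
        return false
    }

With such a helper the loop body would simply return err when isRetriableKafkaError(err) is true, and otherwise log the error and continue with the next message.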
8 changes: 6 additions & 2 deletions internal/republish/republish_test.go
@@ -40,7 +40,9 @@ func TestHandleRepublishingEntry_Acquired(t *testing.T) {
var assertions = assert.New(t)

// Mock republishPendingEventsFunc
republishPendingEventsFunc = func(subscription *resource.SubscriptionResource, republishEntry RepublishingCacheEntry) {}
republishPendingEventsFunc = func(subscription *resource.SubscriptionResource, republishEntry RepublishingCacheEntry) error {
return nil
}

// Prepare test data
testSubscriptionId := "testSubscriptionId"
@@ -69,7 +71,9 @@ func TestHandleRepublishingEntry_NotAcquired(t *testing.T) {
defer test.ClearCaches()
var assertions = assert.New(t)

republishPendingEventsFunc = func(subscription *resource.SubscriptionResource, republishEntry RepublishingCacheEntry) {}
republishPendingEventsFunc = func(subscription *resource.SubscriptionResource, republishEntry RepublishingCacheEntry) error {
return nil
}

// Prepare test data
testSubscriptionId := "testSubscriptionId"