diff --git a/internal/handler/main.go b/internal/handler/main.go index 1bf13a4..1bb6ad1 100644 --- a/internal/handler/main.go +++ b/internal/handler/main.go @@ -62,11 +62,12 @@ type S3 struct { } type InsightsMessage struct { - Payload []PayloadInput `json:"payload"` - BinaryPayload map[string]string `json:"binaryPayload"` - Annotations map[string]string `json:"annotations"` - Labels map[string]string `json:"labels"` - Type string `json:"type,omitempty"` + Payload []PayloadInput `json:"payload"` + BinaryPayload map[string]string `json:"binaryPayload"` + Annotations map[string]string `json:"annotations"` + Labels map[string]string `json:"labels"` + Type string `json:"type,omitempty"` + RequeueAttempts int `json:"requeueAttempts,omitempty"` } type PayloadInput struct { diff --git a/internal/handler/messaging.go b/internal/handler/messaging.go index 836877a..5e1856f 100644 --- a/internal/handler/messaging.go +++ b/internal/handler/messaging.go @@ -19,10 +19,11 @@ type Messaging struct { EnableDebug bool ProblemsFromSBOM bool TrivyServerEndpoint string + MessageQWriter func(data []byte) error } // NewMessaging returns a messaging with config -func NewMessaging(config mq.Config, lagoonAPI LagoonAPI, s3 S3, startupAttempts int, startupInterval int, enableDebug bool, problemsFromSBOM bool, trivyServerEndpoint string) *Messaging { +func NewMessaging(config mq.Config, lagoonAPI LagoonAPI, s3 S3, startupAttempts int, startupInterval int, enableDebug bool, problemsFromSBOM bool, trivyServerEndpoint string, MessageQWriter func(data []byte) error) *Messaging { return &Messaging{ Config: config, LagoonAPI: lagoonAPI, @@ -32,6 +33,7 @@ func NewMessaging(config mq.Config, lagoonAPI LagoonAPI, s3 S3, startupAttempts EnableDebug: enableDebug, ProblemsFromSBOM: problemsFromSBOM, TrivyServerEndpoint: trivyServerEndpoint, + MessageQWriter: MessageQWriter, } } @@ -58,6 +60,26 @@ func (h *Messaging) processMessageQueue(message mq.Message) { } }(message) + // Requeues the message a set number of times prior to rejecting it + rejectRequeue := func(message mq.Message) func(func(bool), *InsightsMessage, int, string, error) { + return func(rejectMessage func(bool), incoming *InsightsMessage, retryAttemptLimit int, target string, err error) { + incoming.RequeueAttempts++ + updatedMessage, jsonErr := json.Marshal(incoming) + if jsonErr != nil { + slog.Error(jsonErr.Error()) + } + if incoming.RequeueAttempts <= retryAttemptLimit { + rejectMessage(false) + if qErr := h.MessageQWriter(updatedMessage); qErr != nil { + slog.Error("Error re-queueing message", "Error", qErr.Error()) + } + } else { + slog.Error(fmt.Sprintf("Retries failed, unable to send to %s", target), "Error", err.Error()) + rejectMessage(false) + } + } + }(message) + // here we unmarshal the initial incoming message body // notice how there is a "type" associated with the detail, // this is the primary driver used to determine which subsystem this message will be processed by. @@ -122,7 +144,7 @@ func (h *Messaging) processMessageQueue(message mq.Message) { if insights.InsightsType != Direct { err := h.sendToLagoonS3(incoming, insights, resource) if err != nil { - slog.Error("Unable to send to S3", "Error", err.Error()) + rejectRequeue(rejectMessage, incoming, 3, "S3", err) } } } @@ -138,9 +160,7 @@ func (h *Messaging) processMessageQueue(message mq.Message) { lagoonSourceFactMapCollection, err := h.gatherFactsFromInsightData(incoming, resource, insights) if err != nil { - slog.Error("Unable to gather facts from incoming data", "Error", err.Error()) - rejectMessage(false) - return + rejectRequeue(rejectMessage, incoming, 3, "Lagoon API", err) } // Here we actually go ahead and write all the facts with their source diff --git a/main.go b/main.go index 63f4771..4fda211 100644 --- a/main.go +++ b/main.go @@ -41,8 +41,30 @@ var ( enableDebug bool problemsFromSBOM bool trivyServerEndpoint string + config mq.Config ) +func mqWriteObject(data []byte) error { + messageQ, err := mq.New(config) + if err != nil { + return err + } + defer messageQ.Close() + + producer, err := messageQ.SyncProducer("lagoon-handler") + if err != nil { + return err + } + + err = producer.Produce(data) + + if err != nil { + return err + } + + return nil +} + func main() { flag.StringVar(&lagoonAppID, "lagoon-app-id", "insights-handler", "The appID to use that will be sent with messages.") flag.StringVar(&mqUser, "rabbitmq-username", "guest", "The username of the rabbitmq user.") @@ -161,7 +183,7 @@ func main() { os.Exit(1) } - config := mq.Config{ + config = mq.Config{ ReconnectDelay: time.Duration(rabbitReconnectRetryInterval) * time.Second, Exchanges: mq.Exchanges{ { @@ -200,9 +222,20 @@ func main() { }, }, }, + Producers: mq.Producers{ + { + Name: "lagoon-handler", + Exchange: "lagoon-insights", + Sync: true, + Options: mq.Options{ + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }, + }, DSN: fmt.Sprintf("amqp://%s:%s@%s/", broker.Username, broker.Password, broker.Hostname), } - messaging := handler.NewMessaging(config, graphQLConfig, s3Config, @@ -211,6 +244,7 @@ func main() { enableDebug, problemsFromSBOM, trivyServerEndpoint, + mqWriteObject, ) // start the consumer