Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expands error handling for messaging #54

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions internal/handler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ type S3 struct {
}

type InsightsMessage struct {
Payload []PayloadInput `json:"payload"`
BinaryPayload map[string]string `json:"binaryPayload"`
Annotations map[string]string `json:"annotations"`
Labels map[string]string `json:"labels"`
Type string `json:"type,omitempty"`
Payload []PayloadInput `json:"payload"`
BinaryPayload map[string]string `json:"binaryPayload"`
Annotations map[string]string `json:"annotations"`
Labels map[string]string `json:"labels"`
Type string `json:"type,omitempty"`
RequeueAttempts int `json:"requeueAttempts,omitempty"`
}

type PayloadInput struct {
Expand Down
30 changes: 25 additions & 5 deletions internal/handler/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ type Messaging struct {
EnableDebug bool
ProblemsFromSBOM bool
TrivyServerEndpoint string
MessageQWriter func(data []byte) error
}

// NewMessaging returns a messaging with config
func NewMessaging(config mq.Config, lagoonAPI LagoonAPI, s3 S3, startupAttempts int, startupInterval int, enableDebug bool, problemsFromSBOM bool, trivyServerEndpoint string) *Messaging {
func NewMessaging(config mq.Config, lagoonAPI LagoonAPI, s3 S3, startupAttempts int, startupInterval int, enableDebug bool, problemsFromSBOM bool, trivyServerEndpoint string, MessageQWriter func(data []byte) error) *Messaging {
return &Messaging{
Config: config,
LagoonAPI: lagoonAPI,
Expand All @@ -32,6 +33,7 @@ func NewMessaging(config mq.Config, lagoonAPI LagoonAPI, s3 S3, startupAttempts
EnableDebug: enableDebug,
ProblemsFromSBOM: problemsFromSBOM,
TrivyServerEndpoint: trivyServerEndpoint,
MessageQWriter: MessageQWriter,
}
}

Expand All @@ -58,6 +60,26 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
}
}(message)

// Requeues the message a set number of times prior to rejecting it
rejectRequeue := func(message mq.Message) func(func(bool), *InsightsMessage, int, string, error) {
return func(rejectMessage func(bool), incoming *InsightsMessage, retryAttemptLimit int, target string, err error) {
incoming.RequeueAttempts++
updatedMessage, jsonErr := json.Marshal(incoming)
if jsonErr != nil {
slog.Error(jsonErr.Error())
}
if incoming.RequeueAttempts <= retryAttemptLimit {
rejectMessage(false)
if qErr := h.MessageQWriter(updatedMessage); qErr != nil {
slog.Error("Error re-queueing message", "Error", qErr.Error())
}
} else {
slog.Error(fmt.Sprintf("Retries failed, unable to send to %s", target), "Error", err.Error())
rejectMessage(false)
}
}
}(message)

// here we unmarshal the initial incoming message body
// notice how there is a "type" associated with the detail,
// this is the primary driver used to determine which subsystem this message will be processed by.
Expand Down Expand Up @@ -122,7 +144,7 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
if insights.InsightsType != Direct {
err := h.sendToLagoonS3(incoming, insights, resource)
if err != nil {
slog.Error("Unable to send to S3", "Error", err.Error())
rejectRequeue(rejectMessage, incoming, 3, "S3", err)
}
}
}
Expand All @@ -138,9 +160,7 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
lagoonSourceFactMapCollection, err := h.gatherFactsFromInsightData(incoming, resource, insights)

if err != nil {
slog.Error("Unable to gather facts from incoming data", "Error", err.Error())
rejectMessage(false)
return
rejectRequeue(rejectMessage, incoming, 3, "Lagoon API", err)
}

// Here we actually go ahead and write all the facts with their source
Expand Down
38 changes: 36 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,30 @@ var (
enableDebug bool
problemsFromSBOM bool
trivyServerEndpoint string
config mq.Config
)

func mqWriteObject(data []byte) error {
messageQ, err := mq.New(config)
if err != nil {
return err
}
defer messageQ.Close()

producer, err := messageQ.SyncProducer("lagoon-handler")
if err != nil {
return err
}

err = producer.Produce(data)

if err != nil {
return err
}

return nil
}

func main() {
flag.StringVar(&lagoonAppID, "lagoon-app-id", "insights-handler", "The appID to use that will be sent with messages.")
flag.StringVar(&mqUser, "rabbitmq-username", "guest", "The username of the rabbitmq user.")
Expand Down Expand Up @@ -161,7 +183,7 @@ func main() {
os.Exit(1)
}

config := mq.Config{
config = mq.Config{
ReconnectDelay: time.Duration(rabbitReconnectRetryInterval) * time.Second,
Exchanges: mq.Exchanges{
{
Expand Down Expand Up @@ -200,9 +222,20 @@ func main() {
},
},
},
Producers: mq.Producers{
{
Name: "lagoon-handler",
Exchange: "lagoon-insights",
Sync: true,
Options: mq.Options{
"delivery_mode": "2",
"headers": "",
"content_type": "",
},
},
},
DSN: fmt.Sprintf("amqp://%s:%s@%s/", broker.Username, broker.Password, broker.Hostname),
}

messaging := handler.NewMessaging(config,
graphQLConfig,
s3Config,
Expand All @@ -211,6 +244,7 @@ func main() {
enableDebug,
problemsFromSBOM,
trivyServerEndpoint,
mqWriteObject,
)

// start the consumer
Expand Down
Loading