Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DOC-964 New bloblang error functions #151

Merged
merged 9 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 57 additions & 36 deletions modules/configuration/pages/error_handling.adoc
Original file line number Diff line number Diff line change
@@ -1,75 +1,96 @@
= Error Handling
// tag::single-source[]

It's always possible for things to go wrong, be a good captain and plan ahead.
Redpanda Connect supports a range of xref:components:processors/about.adoc[processors], such as `http` and `aws_lambda`, that may fail when retry attempts are exhausted. When a processor fails, the message data continues through the pipeline mostly unchanged, except for the addition of a metadata flag, which you can use for handling errors.

Redpanda Connect supports a range of xref:components:processors/about.adoc[processors] such as `http` and `aws_lambda` that have the potential to fail if their retry attempts are exhausted. When this happens the data is not dropped but instead continues through the pipeline mostly unchanged, but a metadata flag is added allowing you to handle the errors in a way that suits your needs.

This document outlines common patterns for dealing with errors, such as dropping them, recovering them with more processing, routing them to a dead-letter queue, or any combination thereof.
This topic explains some common error-handling patterns, including dropping messages, recovering them with more processing, and routing them to a dead-letter queue. It also shows how to combine these approaches, where appropriate.

== Abandon on failure

It's possible to define a list of processors which should be skipped for messages that failed a previous stage using the xref:components:processors/try.adoc[`try` processor]:
You can use the xref:components:processors/try.adoc[`try` processor] to define a list of processors that are executed in sequence. If a processor fails for a particular message, that message skips the remaining processors.

For example:

- If `processor_1` fails to process a message, that message skips `processor_2` and `processor_3`.
- If a message is processed by `processor_1`, but `processor_2` fails, that message skips `processor_3`, and so on.

[source,yaml]
----
pipeline:
processors:
- try:
- resource: foo
- resource: bar # Skipped if foo failed
- resource: baz # Skipped if foo or bar failed
- resource: processor_1
- resource: processor_2 # Skip if processor_1 fails
- resource: processor_3 # Skip if processor_1 or processor_2 fails
----

== Recover failed messages

Failed messages can be fed into their own processor steps with a xref:components:processors/catch.adoc[`catch` processor]:
You can also route failed messages through defined processing steps using a xref:components:processors/catch.adoc[`catch` processor].

For example, if `processor_1` fails to process a message, it is rerouted to `processor_2`.

[source,yaml]
----
pipeline:
processors:
- resource: foo # Processor that might fail
- resource: processor_1 # Processor that might fail
- catch:
- resource: bar # Recover here
- resource: processor_2 # Processes rerouted messages
----

Once messages finish the catch block they will have their failure flags removed and are treated like regular messages. If this behavior is not desired then it is possible to simulate a catch block with a xref:components:processors/switch.adoc[`switch` processor]:
After messages complete all processing steps defined in the `catch` block, failure flags are removed and they are treated like regular messages.

To keep failure flags in messages, you can simulate a `catch` block using a xref:components:processors/switch.adoc[`switch` processor]:

[source,yaml]
----
pipeline:
processors:
- resource: foo # Processor that might fail
- resource: processor_1 # Processor that might fail
- switch:
- check: errored()
processors:
- resource: bar # Recover here
- resource: processor_2 # Processes rerouted messages
----

== Logging errors

When an error occurs there will occasionally be useful information stored within the error flag that can be exposed with the interpolation function xref:configuration:interpolation.adoc#bloblang-queries[`error`]. This allows you to expose the information with processors.
When an error occurs, there may be useful information stored in the error flag. You can use the xref:guides:bloblang/functions.adoc#error[`error`] interpolation functions to write this information to logs. You can also add the following Bloblang functions to expose additional details about the processor.
asimms41 marked this conversation as resolved.
Show resolved Hide resolved

For example, when catching failed processors you can xref:components:processors/log.adoc[`log`] the messages:
- xref:guides:bloblang/functions.adoc#error_source_label[`error_source_label`]
- xref:guides:bloblang/functions.adoc#error_source_name[`error_source_name`]
- xref:guides:bloblang/functions.adoc#error_source_path[`error_source_path`]

For example, this configuration catches processor failures and writes the following information to logs:

- A heading, which includes the cause of the error (`${!error()}`)
- Error text, which includes the label of the processor (`${!error_source_label()}`) that failed along with the cause of the error

[source,yaml]
----
pipeline:
processors:
- resource: foo # Processor that might fail
- resource: processor_1 # Processor that might fail
asimms41 marked this conversation as resolved.
Show resolved Hide resolved
- try:
- resource: processor_1 # Processor that might fail
- resource: processor_2 # Processor that might fail
- resource: processor_3 # Processor that might fail
asimms41 marked this conversation as resolved.
Show resolved Hide resolved
- catch:
- log:
message: "Processing failed due to: ${!error()}"
message: "Processor ${!error_source_label()} failed due to: ${!error()}"
asimms41 marked this conversation as resolved.
Show resolved Hide resolved
----

Or perhaps augment the message payload with the error message:
You could also add an error message to the message payload:

[source,yaml]
----
pipeline:
processors:
- resource: foo # Processor that might fail
- resource: processor_1 # Processor that might fail
- resource: processor_2 # Processor that might fail
- resource: processor_3 # Processor that might fail
asimms41 marked this conversation as resolved.
Show resolved Hide resolved
- catch:
- mapping: |
root = this
Expand All @@ -78,7 +99,7 @@ pipeline:

== Attempt until success

It's possible to reattempt a processor for a particular message until it is successful with a xref:components:processors/retry.adoc[`retry`] processor:
To process a particular message until it is successful, try using a xref:components:processors/retry.adoc[`retry`] processor:

[source,yaml]
----
Expand All @@ -90,13 +111,13 @@ pipeline:
max_interval: 5s
max_elapsed_time: 30s
processors:
# Attempt this processor until success, or the maximum elapsed time is reached.
- resource: foo
# Retries this processor until the message is processed, or the maximum elapsed time is reached.
- resource: processor_1
----

== Drop failed messages

In order to filter out any failed messages from your pipeline you can use a xref:components:processors/mapping.adoc[`mapping` processor]:
To filter out any failed messages from your pipeline, you can use a xref:components:processors/mapping.adoc[`mapping` processor]:

[source,yaml]
----
Expand All @@ -105,53 +126,53 @@ pipeline:
- mapping: root = if errored() { deleted() }
----

This will remove any failed messages from a batch. Furthermore, dropping a message will propagate an acknowledgement (also known as "ack") upstream to the pipeline's input.
The mapping uses the error flag to identify any failed messages in a batch and drops the messages, which propagates acknowledgements (also known as "acks") upstream to the pipeline's input.

== Reject messages

Some inputs such as NATS, GCP Pub/Sub and AMQP support nacking (rejecting) messages. We can perform a nack (or rejection) on data that has failed to process rather than delivering it to our output with a xref:components:outputs/reject_errored.adoc[`reject_errored` output]:
Some inputs, such as `nats`, `gcp_pubsub`, and `amqp_1`, support nacking (rejecting) messages. Rather than delivering unprocessed messages to your output, you can use the xref:components:outputs/reject_errored.adoc[`reject_errored` output] to perform a nack (or rejection) on them:

[source,yaml]
----
output:
reject_errored:
resource: foo # Only non-errored messages go here
resource: processor_1 # Only non-errored messages go here
----

== Route to a dead-letter queue

And by placing the above within a xref:components:outputs/fallback.adoc[`fallback` output] we can instead route the failed messages to a different output:
You can also route failed messages to a different output by nesting the xref:components:outputs/reject_errored.adoc[`reject_errored` output] within a xref:components:outputs/fallback.adoc[`fallback` output]


[source,yaml]
----
output:
fallback:
- reject_errored:
resource: foo # Only non-errored messages go here

- resource: bar # Only errored messages, or those that failed to be delivered to foo, go here
resource: processor_1 # Only non-errored messages go here
- resource: processor_2 # Only errored messages, or delivery failures to processor_1, go here
----

And, finally, in cases where we wish to route data differently depending on the error message itself we can use a xref:components:outputs/switch.adoc[`switch` output]:
Finally, in cases where you want to route data differently depending on the type of error message, you can use a xref:components:outputs/switch.adoc[`switch` output]:

[source,yaml]
----
output:
switch:
cases:
# Capture specifically cat related errors
# Capture specifically cat-related errors
- check: errored() && error().contains("meow")
output:
resource: foo
resource: processor_1

# Capture all other errors
- check: errored()
output:
resource: bar
resource: processor_2

# Finally, route messages that haven't errored
# Finally, route all successfully processed messages here
- output:
resource: baz
resource: processor_3
----

// end::single-source[]
34 changes: 32 additions & 2 deletions modules/guides/pages/bloblang/functions.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ root.doc = content().string()

=== `error`

If an error has occurred during the processing of a message this function returns the reported cause of the error as a string, otherwise `null`. For more information about error handling patterns read xref:configuration:error_handling.adoc[].
If an error occurs during the processing of a message, this function returns the reported cause as a string, otherwise `null`. For more information about error-handling patterns, see xref:configuration:error_handling.adoc[].

==== Examples

Expand All @@ -383,9 +383,39 @@ If an error has occurred during the processing of a message this function return
root.doc.error = error()
```

=== `error_source_label`

Returns the label of the component that raised the error during the processing of a message, or an empty string if not set. A `null` is returned when the error is null or no component is associated with the source of the error. For more information about error-handling patterns, see xref:configuration:error_handling.adoc[].

==== Examples

```coffeescript
root.doc.error_source_label = error_source_label()
```

=== `error_source_name`

Returns the name of the component that raised the error during the processing of a message. A `null` is returned when the error is null or no component is associated with the source of the error. For more information about error-handling patterns, see xref:configuration:error_handling.adoc[].

==== Examples

```coffeescript
root.doc.error_source_name = error_source_name()
```

=== `error_source_path`

Returns the path of the component that raised the error during the processing of a message. A `null` is returned when the error is null or no component is associated with source of the error. For more information about error-handling patterns, see xref:configuration:error_handling.adoc[].

==== Examples

```coffeescript
root.doc.error_source_path = error_source_path()
```

=== `errored`

Returns a boolean value indicating whether an error has occurred during the processing of a message. For more information about error handling patterns read xref:configuration:error_handling.adoc[].
Returns a boolean value indicating whether an error has occurred during the processing of a message. For more information about error-handling patterns, see xref:configuration:error_handling.adoc[].

==== Examples

Expand Down