
AMQP queues die when the rabbit default timeout is reached #83

Open
m17kea opened this issue Jun 7, 2024 · 11 comments · May be fixed by #95

@m17kea (Contributor) commented Jun 7, 2024

We've seen situations where the queue gets clogged up and occasionally hits the default 30-minute delivery acknowledgement timeout on the RabbitMQ message. When this happens, all queue handlers die and stop processing messages, and it never seems to recover:

07:19:27 warn: Orleans.Streams.Rabbit[103312] Exception while retrying the 3th time reading from queue rabbit-2
RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - delivery acknowledgement on channel 2 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more', classId=0, methodId=0
   at RabbitMQ.Client.Impl.SessionBase.Transmit(OutgoingCommand& cmd)
   at RabbitMQ.Client.Framing.Impl.Model._Private_BasicGet(String queue, Boolean autoAck)
   at RabbitMQ.Client.Impl.ModelBase.BasicGet(String queue, Boolean autoAck)
   at Escendit.Orleans.Streaming.RabbitMQ.AmqpProtocol.Provider.AmqpProtocolAdapterReceiver.GetQueueMessagesAsync(Int32 maxCount)
   at Orleans.Streams.PersistentStreamPullingAgent.ReadFromQueue(QueueId myQueueId, IQueueAdapterReceiver rcvr, Int32 maxCacheAddCount) in /_/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs:line 523
   at Orleans.Internal.AsyncExecutorWithRetries.ExecuteWithRetriesHelper[T](Func`2 function, Int32 maxNumSuccessTries, Int32 maxNumErrorTries, TimeSpan maxExecutionTime, DateTime startExecutionTime, Func`3 retryValueFilter, Func`3 retryExceptionFilter, IBackoffProvider onSuccessBackOff, IBackoffProvider onErrorBackOff) in /_/src/Orleans.Core/Async/AsyncExecutorWithRetries.cs:line 280

I'm happy to look into recovering from such a scenario and submit a PR, if you point me in the right direction.

I'm sure there is something more fundamental in our application causing this, but it would be nice for the stream to recover and to have the ability to alter the timeout.
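
For context, the broker-side limit in the log above is RabbitMQ's delivery acknowledgement timeout (consumer_timeout). A minimal sketch of raising it in rabbitmq.conf, with an illustrative value; whether the library should also expose this is a separate question:

```ini
# rabbitmq.conf (broker side): raise the delivery acknowledgement timeout
# from the default 1800000 ms (30 min) to 60 min. The value is illustrative.
consumer_timeout = 3600000
```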

@snovak7 (Contributor) commented Jun 7, 2024

At first glance this seems more like a networking issue. Since it's an older version of the client, the first step would be to update the dependency; you can do that by just bumping the explicit version if there was no change in the client API.

I think the client uses an automatic reconnect mechanism by default, so it should be transparent, but it's true that connection handling could be improved in the library as well.
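
For reference, the client's automatic recovery is opt-in on the ConnectionFactory. A minimal sketch (not this library's actual wiring, and the host name is illustrative); note that it recovers dropped connections, not channels the broker closes with errors like the 406 above:

```csharp
using System;
using RabbitMQ.Client;

// Sketch only: enable the RabbitMQ.Client built-in recovery when creating the connection.
var factory = new ConnectionFactory
{
    HostName = "localhost",                             // illustrative endpoint
    AutomaticRecoveryEnabled = true,                     // re-establish dropped connections
    TopologyRecoveryEnabled = true,                      // re-declare queues/exchanges/bindings
    NetworkRecoveryInterval = TimeSpan.FromSeconds(10),  // retry cadence
};

using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// Note: a channel closed by the broker (e.g. PRECONDITION_FAILED, code 406) is not
// recovered automatically; the application has to open a new channel itself.
```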

Which version are you using?

@m17kea (Contributor, Author) commented Jun 7, 2024

I believe I'm on the latest version of everything:
[screenshot: installed package versions]

I do not think this is a networking issue, as it happens from the moment of the first timeout message in the logs. That would be too much of a coincidence.

@snovak7 (Contributor) commented Jun 7, 2024

I've seen some related issues for the RabbitMQ.Client package at rabbitmq/rabbitmq-dotnet-client#1061.

I don't know if they solved everything there.

If I read it correctly, the idea is that the consumer is not recreated, so it might need manual intervention.

@m17kea (Contributor, Author) commented Jun 7, 2024

I've had a look, but it's not entirely clear what the best course of action is. I would suggest that we use Orleans first-class citizens and replicate something like this:

https://github.com/dotnet/orleans/blob/main/src/Azure/Orleans.Streaming.AzureStorage/Providers/Streams/PersistentStreams/AzureTableStorageStreamFailureHandler.cs#L15

replacing the NoOpStreamDeliveryFailureHandler here:

https://github.com/escendit/rabbitmq-orleans-extensions/blob/main/src/Orleans/RabbitMQ/Configuration/OptionsBase.cs

What do you think?

@snovak7 (Contributor) commented Jun 7, 2024

So if I understand correctly, I would just enable the setter here, and you could provide your own handler. As seen in the example above, such a handler just stores the failed entry in designated storage, where you can analyze the failed event.
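
For illustration, a rough sketch of such a handler, assuming the Orleans 7+ IStreamFailureHandler shape (member names and signatures may differ between Orleans versions); this one only logs, but the same hook could persist the failure or trigger recovery:

```csharp
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Orleans.Runtime;
using Orleans.Streams;

// Sketch only: a minimal failure handler, assuming the Orleans 7+ interface shape.
internal sealed class LoggingStreamFailureHandler : IStreamFailureHandler
{
    private readonly ILogger _logger;

    public LoggingStreamFailureHandler(ILogger logger) => _logger = logger;

    // Keep the subscription alive instead of faulting it on errors.
    // (The spelling of this property comes from Orleans itself.)
    public bool ShouldFaultSubsriptionOnError => false;

    public Task OnDeliveryFailure(GuidId subscriptionId, string streamProviderName,
        StreamId streamIdentity, StreamSequenceToken sequenceToken)
    {
        _logger.LogWarning("Delivery failure on {Provider}/{Stream} (subscription {Subscription})",
            streamProviderName, streamIdentity, subscriptionId);
        return Task.CompletedTask;
    }

    public Task OnSubscriptionFailure(GuidId subscriptionId, string streamProviderName,
        StreamId streamIdentity, StreamSequenceToken sequenceToken)
    {
        _logger.LogWarning("Subscription failure on {Provider}/{Stream} (subscription {Subscription})",
            streamProviderName, streamIdentity, subscriptionId);
        return Task.CompletedTask;
    }
}
```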

@m17kea (Contributor, Author) commented Jun 7, 2024

Now that I've looked closer, I see what you mean. Rather than persisting the error, I was thinking we could use this hook to recreate the connection from scratch.

@snovak7 (Contributor) commented Jun 7, 2024

I'll take a look at how to do this re-connection.

@m17kea (Contributor, Author) commented Jun 7, 2024

Awesome thanks

@snovak7 snovak7 added this to the v0.3.0 milestone Jun 20, 2024
@snovak7 snovak7 added enhancement New feature or request minor Minor changes labels Jun 20, 2024
@snovak7 snovak7 self-assigned this Jun 29, 2024
@snovak7 snovak7 linked a pull request Jun 29, 2024 that will close this issue
@snovak7 (Contributor) commented Jun 29, 2024

Still figuring this out. Looking more closely, the exception happened on the 3rd retry of the BasicGet command, and auto-ack is set to false...

What I see is that maybe you had some long-running task or an error in the app, so the message was never acked. Alternatively, I could add a queue option so that events are auto-acked, but then messages could be lost if an error occurred.

Since the default timeout is 30 minutes, it can trigger this error and halt the whole stream system in Orleans. This is also because BasicGet and BasicAck have to happen in sequence; missing one can break it.
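
For illustration, here is that BasicGet/BasicAck pair sketched with the raw RabbitMQ.Client API (not the library's internals; the queue name is illustrative). With autoAck false the acknowledgement timeout starts ticking at BasicGet and only stops at BasicAck, so a message that is never acked eventually gets the channel closed:

```csharp
using RabbitMQ.Client;

// Sketch only: the BasicGet/BasicAck pair the acknowledgement timeout applies to.
static void DrainOne(IModel channel)
{
    BasicGetResult? result = channel.BasicGet("rabbit-2", autoAck: false); // queue name is illustrative
    if (result is null)
    {
        return; // queue was empty
    }

    // ... hand the message body to the Orleans stream cache here ...

    // Until this ack is sent, the broker's consumer_timeout (default 30 min) is
    // ticking for this delivery; if it fires first, the channel is closed with 406.
    channel.BasicAck(result.DeliveryTag, multiple: false);
}
```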

I'm adding the StreamFailureHandler, but for now it only emits warnings to the logger.

@snovak7 snovak7 moved this from Todo to Done in RabbitMQ Orleans Stream Provider Jun 29, 2024
@snovak7 snovak7 closed this as completed by moving to Done in RabbitMQ Orleans Stream Provider Jun 29, 2024
@m17kea (Contributor, Author) commented Oct 18, 2024

I've noticed through experimentation that this issue only happens when I run more than one server in the Orleans cluster. Have you successfully run this over a long period with more than one?

@snovak7 (Contributor) commented Jan 17, 2025

It also happens with one silo, after the silo has been running for a long time.

@snovak7 snovak7 reopened this Jan 17, 2025