Consider starting the renewal of messages in DTFx.Core as soon as they are fetched #1150

Closed
davidmrdavid opened this issue Jul 23, 2024 · 5 comments · May be fixed by #1168

Comments

@davidmrdavid
Collaborator

davidmrdavid commented Jul 23, 2024

In DTFx.Core, the method RenewTaskOrchestrationWorkItemLockAsync is used to ensure that a given worker keeps exclusive ownership of a given partition message. For example, in the Azure Storage backend, this method renews the message's "visibility timeout" so that the message does not get dequeued again, at least not until the visibility timeout expires.

This renewal flow is only invoked once the message is being processed, which has a very specific meaning: we have not exceeded the "maxConcurrentOrchestrations" / "maxConcurrentActivities" limit and therefore have enough capacity to process more messages.

This means that a message may be received by a given worker, but not become processable for a long time if the active orchestrators/activities match their "max concurrent" settings and are long-running. During that time, since we're not actively extending the message's visibilityTimeout, the message can become visible again (and possibly be dequeued by the same worker that already holds it!), which changes its popReceipt. That in turn prevents us from successfully processing the copy of the message that carries the old popReceipt. This can lead to a cascade of errors.
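To make the failure mode concrete, here is a toy in-memory queue in Python. `FakeQueue`, `receive`, `renew`, and `delete` are hypothetical names; the model only loosely mirrors Azure Storage queue semantics, assuming (as with Azure Storage queues) that every dequeue or visibility update issues a fresh pop receipt. It is not the Azure SDK and not DTFx code, and time is measured in abstract ticks.

```python
import itertools
from dataclasses import dataclass


@dataclass
class _Entry:
    body: str
    visible_at: int   # logical time at which the message becomes visible again
    pop_receipt: str  # receipt issued by the most recent dequeue or renewal


class FakeQueue:
    """Toy queue with visibility timeouts and pop receipts (hypothetical, not the Azure SDK)."""

    def __init__(self):
        self._entries = {}
        self._ids = itertools.count(1)
        self._receipts = itertools.count(1)

    def put(self, body, now=0):
        msg_id = next(self._ids)
        self._entries[msg_id] = _Entry(body, visible_at=now, pop_receipt="")
        return msg_id

    def receive(self, now, visibility_timeout):
        """Dequeue the first visible message and hide it for `visibility_timeout` ticks."""
        for msg_id, entry in self._entries.items():
            if entry.visible_at <= now:
                entry.visible_at = now + visibility_timeout
                entry.pop_receipt = f"r{next(self._receipts)}"  # any older receipt is now stale
                return msg_id, entry.pop_receipt
        return None

    def renew(self, msg_id, pop_receipt, now, visibility_timeout):
        """Extend invisibility; only the holder of the current receipt may do this."""
        entry = self._entries[msg_id]
        if entry.pop_receipt != pop_receipt:
            raise PermissionError("stale pop receipt")
        entry.visible_at = now + visibility_timeout
        entry.pop_receipt = f"r{next(self._receipts)}"
        return entry.pop_receipt

    def delete(self, msg_id, pop_receipt):
        """Complete the message; fails if it was re-dequeued since `pop_receipt` was issued."""
        entry = self._entries.get(msg_id)
        if entry is None or entry.pop_receipt != pop_receipt:
            raise PermissionError("stale pop receipt")
        del self._entries[msg_id]


# Without renewal: the worker fetches at t=0 but only gets a concurrency slot at t=45.
q = FakeQueue()
mid = q.put("ExecutionStarted")
_, receipt = q.receive(now=0, visibility_timeout=30)
redelivered = q.receive(now=45, visibility_timeout=30)  # timeout lapsed: dequeued again
try:
    q.delete(mid, receipt)  # the first copy is finally processed with the old receipt
except PermissionError as err:
    print("processing the first copy failed:", err)
```

If the holder had instead called `renew` at t=20 and t=40 with its current receipt, the message would still be hidden at t=45, the second `receive` would return `None`, and the original holder could still delete it.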

I believe the framework-level fix for this is to start renewing messages as soon as they're fetched/received, not just when they're being processed. This may require some refactoring of DTFx.Core's WorkItemDispatcher class, so it needs to be done with care.
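A back-of-the-envelope way to compare the two renewal policies is a small timing function. `lapses_before_processing` and its parameters are hypothetical; time is in abstract ticks, and each renewal is assumed to push the visibility deadline to `now + visibility_timeout`. It also assumes `renew_every` is shorter than `visibility_timeout`; otherwise even renew-on-fetch can lapse.

```python
def lapses_before_processing(fetched_at: int, processing_starts_at: int,
                             visibility_timeout: int, renew_every: int,
                             renew_on_fetch: bool) -> bool:
    """Return True if the message becomes visible again before processing starts.

    With renew_on_fetch=False (renewal only once processing starts), the only
    protection during the wait is the initial visibility timeout.
    """
    deadline = fetched_at + visibility_timeout
    if renew_on_fetch:
        t = fetched_at + renew_every
        while t < processing_starts_at:
            if t >= deadline:
                return True  # renewal arrived after the message was already visible
            deadline = t + visibility_timeout
            t += renew_every
    return deadline <= processing_starts_at


# Fetched at t=0, but a concurrency slot only frees up at t=120.
print(lapses_before_processing(0, 120, visibility_timeout=30, renew_every=10, renew_on_fetch=False))  # True
print(lapses_before_processing(0, 120, visibility_timeout=30, renew_every=10, renew_on_fetch=True))   # False
```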

@sig5

sig5 commented Sep 22, 2024

I am picking this up. Will try to raise a PR in a couple of days.

@sig5

sig5 commented Sep 23, 2024

@davidmrdavid Here's how I was looking at it:

  1. Pull the renewal-job logic into WorkItemDispatcher, passing each dispatcher's renewal action method in as a constructor argument.
  2. Start the renewal job right after the work items are fetched.
  3. Do the appropriate cleanup and disposal after the items have been processed.
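The three steps above can be sketched with Python's asyncio. This `WorkItemDispatcher` is a hypothetical analogue written for illustration, not DTFx's C# class or its API; `fetch`, `process`, `renew`, and `renew_interval` are assumed names. It shows the renewal action being injected through the constructor, started right after the fetch, and cancelled in a `finally` block once processing ends or fails.

```python
import asyncio
from typing import Awaitable, Callable, Generic, TypeVar

T = TypeVar("T")


class WorkItemDispatcher(Generic[T]):
    """Hypothetical sketch: the dispatcher-specific renewal action is injected."""

    def __init__(self,
                 fetch: Callable[[], Awaitable[T]],
                 process: Callable[[T], Awaitable[None]],
                 renew: Callable[[T], Awaitable[None]],
                 renew_interval: float):
        self._fetch = fetch
        self._process = process
        self._renew = renew
        self._renew_interval = renew_interval

    async def _renew_loop(self, item: T) -> None:
        # Keep extending the lock until this task is cancelled.
        while True:
            await asyncio.sleep(self._renew_interval)
            await self._renew(item)

    async def dispatch_one(self) -> None:
        item = await self._fetch()
        # Step 2: start renewing immediately after the fetch.
        renewal = asyncio.create_task(self._renew_loop(item))
        try:
            await self._process(item)
        finally:
            # Step 3: stop the renewal job whether processing succeeded or failed.
            renewal.cancel()
            try:
                await renewal
            except asyncio.CancelledError:
                pass


async def demo() -> list:
    renewals = []

    async def fetch() -> str:
        return "work-item-1"

    async def process(item: str) -> None:
        await asyncio.sleep(0.35)  # stands in for a long wait plus execution

    async def renew(item: str) -> None:
        renewals.append(item)  # stands in for e.g. extending a visibility timeout

    dispatcher = WorkItemDispatcher(fetch, process, renew, renew_interval=0.1)
    await dispatcher.dispatch_one()
    return renewals


renewals = asyncio.run(demo())
print(f"renewed {len(renewals)} time(s) before completion")
```

Cancelling in `finally` is what makes step 3 hold even when `process` raises, so a failed work item does not keep its lock alive indefinitely.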

Questions:

  1. I see we haven't inherited TaskOrchestrationDispatcher, TaskActivityDispatcher and TaskEntityDispatcher from the same base class, although they have a lot of overlapping behavior. Is this by design, for readability, or is it a result of evolution over time? Are we okay with moving towards having a DispatcherBase?

@davidmrdavid
Collaborator Author

Thanks for your interest here, @sig5!

> I see we haven't inherited TaskOrchestrationDispatcher, TaskActivityDispatcher and TaskEntityDispatcher from the same base class, although they have a lot of overlapping behavior. Is this by design, for readability, or is it a result of evolution over time? Are we okay with moving towards having a DispatcherBase?

Some of these design decisions predate me, to be honest, so I don't know why the design ended up like this. I do know the TaskEntityDispatcher is a newer addition ("from my time"), so perhaps when we only had the orchestrator and activity dispatchers, we maintainers did not feel there was enough repetition to warrant a base class. However, I can see the argument that now that we have three dispatchers, a base class could be useful.

That being said, I strongly recommend that we don't introduce any such refactoring as part of this issue's feature proposal; that will only make the PR harder to merge. In general, we rarely modify the dispatcher classes, so it's unclear to me that we'd gain much from simplifying their design. That can be a separate discussion :-)

Other than that, your sketch of the PR seems reasonable at a high-level, but we'll have to look at the details to be certain. Excited to see your PR!

@sig5

sig5 commented Sep 24, 2024

@davidmrdavid, thanks for the clarification. Yeah, I get it. I was just curious whether a different school of thought was being followed here for optimization/readability 😄. I have published a PR with self-contained changes and an open question here: #1168.

@cgillum
Member

cgillum commented Jan 15, 2025

Closing this issue because I think fundamentally the problem is with DT.AzureStorage and not DT.Core.

> This means that a message may be received by a given worker, but not become processable for a long time if the active orchestrators/activities match their "max concurrent" settings and are long-running.

This is true in the case of DurableTask.AzureStorage. However, it's not generally true with DurableTask.Core. DT.Core will only attempt to fetch a message when there is sufficient concurrency available. See WorkItemDispatcher.cs for reference. This means that there's no scenario where DT.Core fetches a message but can't process it because of concurrency limitations.
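The ordering described here (reserve capacity first, then fetch) can be modeled with an asyncio semaphore. `GatedDispatcher` is a hypothetical toy, not a port of WorkItemDispatcher.cs, and it does not reproduce how DT.Core actually tracks concurrency. It only shows why a fetched item never has to wait for a slot: the slot is already held when the fetch happens.

```python
import asyncio


class GatedDispatcher:
    """Toy model: acquire a concurrency slot, then fetch, then process."""

    def __init__(self, max_concurrent: int):
        self._slots = asyncio.Semaphore(max_concurrent)
        self._in_flight = 0
        self.max_in_flight = 0

    async def run(self, queue: asyncio.Queue, work_seconds: float) -> None:
        tasks = []
        while True:
            await self._slots.acquire()      # wait for capacity first...
            try:
                item = queue.get_nowait()    # ...and only then fetch
            except asyncio.QueueEmpty:
                self._slots.release()
                break
            tasks.append(asyncio.create_task(self._process(item, work_seconds)))
        await asyncio.gather(*tasks)

    async def _process(self, item: str, work_seconds: float) -> None:
        self._in_flight += 1
        self.max_in_flight = max(self.max_in_flight, self._in_flight)
        try:
            await asyncio.sleep(work_seconds)  # stands in for executing the work item
        finally:
            self._in_flight -= 1
            self._slots.release()              # free the slot for the next fetch


async def main() -> int:
    queue = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(f"msg-{i}")
    dispatcher = GatedDispatcher(max_concurrent=2)
    await dispatcher.run(queue, work_seconds=0.01)
    return dispatcher.max_in_flight


result = asyncio.run(main())
print(result)  # 2
```

The semaphore and queue are created inside `main` so they bind to the event loop that `asyncio.run` starts.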

The problem with DT.AzureStorage is that it internally prefetches messages. DT.Core is actually not aware of this at all. It's therefore entirely the responsibility of DT.AzureStorage to renew its prefetched messages. We need to fix this, but the scope of the problem is different from what's described in this issue. We can use another issue to track the DT.AzureStorage problem.
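A minimal sketch of what "the backend renews its own prefetched messages" could look like, assuming a buffer that DT.Core never sees. The internals of DT.AzureStorage's prefetching are not described in this thread, so `PrefetchBuffer`, `renew_due`, and `renew_margin` are hypothetical, time is in abstract ticks, and the deadline update stands in for a real visibility-update call against the queue.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class Prefetched:
    msg_id: str
    visible_at: int  # logical time at which the storage queue would re-expose it


class PrefetchBuffer:
    """Hypothetical backend-side buffer that keeps its own messages invisible."""

    def __init__(self, visibility_timeout: int, renew_margin: int):
        self._timeout = visibility_timeout
        self._margin = renew_margin
        self._items = deque()
        self.renewals = 0

    def add(self, msg_id: str, now: int) -> None:
        self._items.append(Prefetched(msg_id, now + self._timeout))

    def renew_due(self, now: int) -> None:
        """Extend every buffered message whose deadline falls within the margin."""
        for item in self._items:
            if item.visible_at - now <= self._margin:
                item.visible_at = now + self._timeout  # stand-in for a visibility update
                self.renewals += 1

    def take(self, now: int) -> str:
        """Hand the oldest message to the core; fail if its timeout already lapsed."""
        item = self._items.popleft()
        if item.visible_at <= now:
            raise RuntimeError(f"{item.msg_id} became visible while buffered")
        return item.msg_id


buf = PrefetchBuffer(visibility_timeout=30, renew_margin=10)
buf.add("a", now=0)
buf.add("b", now=0)
for now in range(5, 100, 5):  # periodic background renewal while messages sit in the buffer
    buf.renew_due(now)
first = buf.take(now=100)
print(first)  # a
```

Without the `renew_due` loop, `take(now=100)` would raise, which corresponds to the prefetched message having become visible (and re-dequeueable) while the backend was still holding it.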


3 participants