Replace tornado dependency with built-in module asyncio #4317

Conversation
Thanks a lot @unkcpz. It is looking very good and I only have a few requests for some minor changes, but mostly some questions first.
Finally, the docs are failing because of the following 5 warnings that we should address before we can merge:
/home/circleci/.local/lib/python3.7/site-packages/aiida/engine/utils.py:docstring of aiida.engine.InterruptableFuture:1: WARNING: py:class reference target not found: _asyncio.Future
/home/circleci/.local/lib/python3.7/site-packages/aiida/engine/runners.py:docstring of aiida.engine.runners.Runner.__init__:: WARNING: py:class reference target not found: the asyncio event loop
/home/circleci/.local/lib/python3.7/site-packages/aiida/engine/transports.py:docstring of aiida.engine.transports.TransportQueue.__init__:: WARNING: py:class reference target not found: The asyncio event loop
/home/circleci/.local/lib/python3.7/site-packages/aiida/engine/utils.py:docstring of aiida.engine.utils.InterruptableFuture:1: WARNING: py:class reference target not found: _asyncio.Future
/home/circleci/.local/lib/python3.7/site-packages/aiida/engine/processes/futures.py:docstring of aiida.engine.processes.futures.ProcessFuture:1: WARNING: py:class reference target not found: _asyncio.Future
Also, the build on Jenkins is failing. It seems the daemon is failing to start. Do you think this is a one-time fluke or is there really a problem? The builds for both backends failed, which seems to suggest there really is a problem.
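For reference, when `py:class` targets like `_asyncio.Future` genuinely cannot be resolved, one common way to silence such warnings is Sphinx's `nitpick_ignore` option in `conf.py`; whether that is the right fix here is up to the authors. A minimal sketch with illustrative entries:

    # conf.py (sketch only; the path and entries are illustrative)
    nitpick_ignore = [
        ('py:class', '_asyncio.Future'),
        ('py:class', 'the asyncio event loop'),
        ('py:class', 'The asyncio event loop'),
    ]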
tasks = [t for t in asyncio.Task.all_tasks() if t is not asyncio.Task.current_task()]

for task in tasks:
    task.cancel()
Was there a particular bug that you ran into before you added this code? How does it manifest itself if you shut down the daemon while it is running some processes?
Was there a particular bug that you ran into before you added this code?

After migrating to asyncio, with the original code, running verdi devel run_daemon and then pressing Ctrl+c to interrupt it would not stop the daemon process, and the command line would hang.

How does it manifest itself if you shut down the daemon while it is running some processes?

I didn't quite understand your question. The code follows this: https://www.roguelynn.com/words/asyncio-graceful-shutdowns/
After that change, the original issue was gone, so I didn't think much more about it. My understanding is that shutting down a daemon is just shutting down its corresponding runner's event loop.
I didn't quite understand your question.
What I meant to ask was: if you remove the code that you added (that is to say, in the old version of the code), what would happen if you stop a daemon runner? But I think you already answered this just above: apparently, without your changes, the daemon would hang when asked to shut down. In any case, I guess it makes sense to attempt to clean up before shutting down. I was just asking out of interest.
This looks like a neat solution, I like the look of it.
@muhrin Thanks! That was the one thing I was not sure about; now I don't have to worry.
    :param args: input arguments to construct the FunctionProcess
    :param kwargs: input keyword arguments to construct the FunctionProcess
    :return: tuple of the outputs of the process and the process node pk
    :rtype: (dict, int)
    """
    manager = get_manager()
-   runner = manager.create_runner(with_persistence=False)
+   runner = manager.get_runner(with_persistence=False)
Is this because we now have just a single event loop that is made reentrant with nest-asyncio?
Sure, but I haven't tested this in a real production environment. I am not sure that the original blocking issues actually resulted from reentrancy.
@muhrin Could you have a look at this?
I'm not sure which 'blocking issues' you're referring to @unkcpz, but indeed we now only have the one event loop, so we never need to create more than one. Does get_runner work if there is no event loop active at all, or would this code be unreachable if that were the case?
The following docstring was there before I made the changes. This is what I meant by 'blocking issues'; I guess this was why a new event loop was created in the ProcessFunction.
aiida-core/aiida/engine/processes/functions.py
Lines 110 to 112 in 5977dac

    The function will have to create a new runner for the FunctionProcess instead of using the global runner,
    because otherwise if this process function were to call another one from within its scope, that would use
    the same runner and it would be blocking the event loop from continuing.
I'm not sure which 'blocking issues' you're referring to @unkcpz, but indeed we now only have the one event loop, so we never need to create more than one. Does get_runner work if there is no event loop active at all, or would this code be unreachable if that were the case?

Manager.get_runner will create a new runner if it doesn't have one yet. It won't pass a loop to the Runner constructor, so that one will get whatever loop the constructor logic defines.
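A minimal sketch of that lazy behaviour, with hypothetical attribute names (the real Manager is more involved):

    class Manager:
        """Sketch only: illustrates the lazy get_runner behaviour described above."""

        def __init__(self):
            self._runner = None

        def create_runner(self, **kwargs):
            # Stand-in for the real construction of a Runner; no explicit loop is
            # passed, so the Runner constructor decides which loop to use.
            return object()

        def get_runner(self, **kwargs):
            # Create the runner on first access and reuse it afterwards.
            if self._runner is None:
                self._runner = self.create_runner(**kwargs)
            return self._runner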
@@ -347,6 +348,8 @@ def process_actions(futures_map, infinitive, present, past, wait=False, timeout=
    process = futures_map[future]

    try:
        # unwrap is needed here since LoopCommunicator will also wrap a future
        future = unwrap_kiwi_future(future)
I have the feeling that this is in the wrong place. The future that is returned comes from the RemoteProcessThreadController returned by the Manager. What it returns is the return value from the communicator. You are here adding another action on it because apparently the underlying communicator is a LoopCommunicator, which adds a layer of wrapping. However, from the process controller's interface, this is not clear at all. Especially if you look at the Manager code, the communicator we pass to the controller is an instance of RmqThreadCommunicator and not a LoopCommunicator. Long story short, I think the unwrapping should be done elsewhere such that the user of the controller never has to know what kind of communicator is being used. Or am I missing something?
You are right that this part is not very clear. And I think the same issue exists in the tornado version of aiida-core (the future should also have been unwrapped there, after the LoopCommunicator was introduced to the Runner).
Let me try to describe my understanding of this part of the code:
The future returned from controller.kill_process does not depend on the communicator we use here, but on the communicator that actually performs the kill action. That is the LoopCommunicator, to which we add an RPC subscriber in the communicator of the daemon Runner.
The reason we do not need to use the LoopCommunicator in verdi process is that every time we run a command, we run it in an independent thread and wait for the returned value.
And because we don't know exactly which communicator is used, we need a generic unwrap_kiwi_future here to get the final result of the procedure scheduled in the communicator.
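For illustration, 'unwrapping' here just means flattening a future whose result may itself be another future; a minimal conceptual sketch of the idea (not the actual kiwipy implementation of unwrap_kiwi_future):

    from concurrent.futures import Future

    def unwrap(outer: Future) -> Future:
        """Return a future that resolves to the innermost non-future result."""
        flat = Future()

        def on_done(fut):
            result = fut.result()
            if isinstance(result, Future):
                # The result is itself a future: keep chaining until a concrete value appears.
                result.add_done_callback(on_done)
            else:
                flat.set_result(result)

        outer.add_done_callback(on_done)
        return flat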
Haha, yes, I'm also getting confused looking through this part of the code. I guess the principle that Sebastiaan is pointing out (and I agree with) is that if there is a Communicator interface that we're using, then things that come out of the interface should always be at the same level of 'unwrapping', ideally such that no manual unwrapping needs to be done outside, and certainly in a way that doesn't vary with the underlying concrete Communicator instance type.
I'll have a look through the code and see if I can make sure I understand what the flow is.
I think it is result from
Thanks @sphuber! Some of the comments concern code that I am also not very sure about. Especially the unwrapping of the future in `cmd_process`, I need to think it over and get back to you.
Looks very nice @unkcpz. I just want to check again this future unwrapping business.
Needless to say, it would be great to check this new code against some real running workflows if possible. I think it's also OK for some of the more extensive testing to take place once this is in develop, given that it's not possible to test all possibilities thoroughly beforehand.
OK, I think we are almost there @unkcpz and @muhrin . I am just writing down an overview of open questions and things that remain to be checked. Once these are addressed, I will try and run a large production run with the new stack and see how things run.
@muhrin Have you had a chance to take a look at these three remaining questions?
I just created a clean env with this branch and the asyncio compatible plumpy and kiwipy versions and ran a bunch of workchains to test the new code. One of the workchains (the first one that was launched, not sure if by accident) excepted:
This clearly comes from the process stack that checks for consistency. If you have ideas what could have caused this, let me know and I can try to dig up more debug information or try to create a MWE.
Edit: I am not sure what caused this, but I reran the whole suite after having deleted all previous workchains, and now everything ran fine. One thing that may have had to do with it: I ran the
Edit: now that we got the tests running (looks like a problem with the new pip resolver), the problem of the process stack was actually reproduced on GHA as well: https://github.com/aiidateam/aiida-core/pull/4317/checks?check_run_id=1074946912
Tests are failing because the dependencies fail to install; specifically, it claims there is a dependency conflict between
Do any of the packaging experts out there know what the hell is going on, @chrisjsewell @greschd @csadorf? Edit: to summarize, with this PR we have the following requirements:
Where
The only thing I can think of is the difference in
What happens if you remove the use of the new resolver
Well, it installs now 😬 (I guess the pytest failure is a whole other issue). Hey @pradyunsg, sorry for the spam, but I wonder if I can quickly pick your brain on this install issue: #4317 (comment).
That looks like a bug -- one that has been fixed in pip's master branch. Could you try running off of pip's master branch and check if this bug is reproducible?
When I look at the logs, I see that conda reports a whole shit ton of dependency conflicts though, and not just with
Thanks for the comment. We currently install
Thanks a lot @pradyunsg, and not to worry, I fully understand that such a big project takes time to iron out some bugs. Happy to provide bug reports if possible. I tried installing off of master but that does not seem to have fixed the problem: https://github.com/aiidateam/aiida-core/pull/4317/checks?check_run_id=1075016847 The install on Python 3.8 fails with the same dependency conflict error. The builds on Python 3.5 are still running and I suspect they will actually time out.
Yeh that does not look pretty 😬
The
So, I don't think that we need to have the Python version specified in
@unkcpz Which Python version is chosen when you remove the explicit dependency? Also, could you please share your environment with us after you successfully created it as described here?
Thanks a lot @unkcpz for figuring this out! For reference, here is the commit where I introduced the python version in the
I.e. specifying the python version inside the
python 3.7 was probably chosen because it was the latest python release that we tested on, i.e. the version number can be changed (it could even be changed to python 3.6 if that helps). The commit message also mentions that at that time there had been an issue with plumpy, which had not yet been marked as compatible with python 3.7 - but that issue had been fixed before this commit (perhaps coincidence).
Here as @ltalirz mentioned, I run
The problem may be caused by the package
This dependency can be removed once we no longer support python 3.6.
@unkcpz Thx for identifying the issue! It would still be great if you could share your
@csadorf Sure, but I don't think it helps; on my local machine, I only use conda to create the env and install the packages via environment.yml
@unkcpz That is 100% the problem. This conda package explicitly requires
@csadorf Turns out it's plumpy https://github.com/conda-forge/plumpy-feedstock/blob/fe935aa06b56595ef3d5b2eaccafbe85e52da892/recipe/meta.yaml#L27 So, plumpy should depend on this package only for python < 3.7 (which will force us to remove the
Thanks @csadorf.
@ltalirz does this only need to happen for the feedstock, or will I have to release a
I think we will need to release as well. While we are at it: gentle prod to review aiidateam/plumpy#180, then I'll be happy to handle these releases 😉
I would think just for the feedstock?
I will hold off on that until this is merged and stable. The mypy PR contains a lot of changes and may have an effect on this PR that may not be tested.
Thanks @chrisjsewell, I'll have time tomorrow to take care of aiidateam/plumpy#180. But I agree with Sebastiaan that we should wait until after this PR.
Fair 👍
The `plumpy` and `kiwipy` dependencies have already been migrated from using `tornado` to the Python built-in module `asyncio`, in versions `0.16.0` and `0.6.0`, respectively. This allows us to also rid AiiDA of the `tornado` dependency, which has been giving requirement clashes with other tools, specifically from the Jupyter and iPython world. The final limitation was the `circus` library that is used to daemonize the daemon workers, which as of `v0.17.1` also supports `tornado~=5`.

A summary of the changes:

* Replace `tornado.ioloop` with the `asyncio` event loop.
* Coroutines are marked with `async` instead of being decorated with the `tornado.gen.coroutine` decorator.
* Replace `yield` with `await` when calling a coroutine.
* Replace `raise tornado.gen.Return` with `return` when returning from a coroutine.
* Replace the `add_callback` call on the event loop with `call_soon` when scheduling a callback.
* Replace the `add_callback` call on the event loop with `create_task` when scheduling `process.step_until_terminated()`.
* Replace the `run_sync` call on the event loop with `run_until_complete`.
* Replace `pika` uses with `aio-pika`, which is now used by the `plumpy` and `kiwipy` libraries.
* Replace `concurrent.Future` with `asyncio.Future`.
* Replace `yield tornado.gen.sleep` with `await asyncio.sleep`.

Additional changes:

* Remove the `tornado` logger from the logging configuration.
* Remove the `logging.tornado_loglevel` configuration option.
* Turn the `TransportQueue.loop` attribute from a method into a property.
* Call `Communicator.close()` instead of `Communicator.stop()` in the `Manager.close()` method. The `stop` method has been deprecated in `kiwipy==0.6.0`.
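To illustrate the mechanical part of this translation, here is a small standalone example (a hypothetical coroutine, not actual aiida-core code) showing the tornado idioms next to their asyncio replacements:

    import asyncio

    # Old tornado style, shown in comments for comparison:
    #
    #     from tornado import gen
    #
    #     @gen.coroutine
    #     def sleep_then_return(value):
    #         yield gen.sleep(0.1)        # becomes 'await asyncio.sleep(...)'
    #         raise gen.Return(value)     # becomes a plain 'return'
    #
    #     result = loop.run_sync(lambda: sleep_then_return(42))  # becomes 'run_until_complete'

    async def sleep_then_return(value):
        await asyncio.sleep(0.1)
        return value

    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(sleep_then_return(42))
        print(result)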
The result returned by `ProcessController.kill_process` that is called in `Process.kill` for each of its children, if it has any, can itself be a future, since the killing cannot always be performed directly, but instead will be scheduled in the event loop. To resolve the future of the main process, it will have to wait for the futures of all its children to be resolved as well. Therefore an intermediate future needs to be added that will be done once all child futures are resolved.
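A minimal sketch of that idea, using hypothetical names (the real `Process.kill` logic is more involved):

    import asyncio

    def kill_with_children(kill_self, child_kill_futures, loop):
        """Return a single future that is done only once all child kill futures are resolved.

        Sketch only: 'kill_self' is a callable that performs the kill of this process, and
        'child_kill_futures' are assumed here to be asyncio futures corresponding to the
        results of ProcessController.kill_process for each child.
        """
        async def _kill_all():
            # Wait for every child kill to resolve before killing this process.
            if child_kill_futures:
                await asyncio.gather(*child_kill_futures)
            return kill_self()

        # The intermediate future is the task wrapping this coroutine.
        return loop.create_task(_kill_all())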
The commands of `verdi process` that perform an RPC on a live process will do so through the `ProcessController`, which returns a future. Currently, the process controller uses the `LoopCommunicator` as its communicator which adds an additional layer of wrapping. Ideally, the return type of the communicator should not change depending on the specific implementation that is used, however, for now that is the case and so the future needs to be unwrapped explicitly one additional time. Once the `LoopCommunicator` is fixed to return the same future type as the base `Communicator` class, this workaround can and should be removed.
With the migration to `asyncio`, there is now only a single event loop that is made reentrant through the `nest-asyncio` library, which monkey patches `asyncio`'s built-in mechanism that prevents this. This means that in the `Runner` constructor, we should simply get the global event loop instead of creating a new one, if no explicit loop is passed into the constructor. This also implies that the runner should never take charge of closing the loop, because it no longer owns the global loop. In addition, process functions now simply use the global runner instead of creating a new runner. This used to be necessary because running in the same runner would mean running in the same loop, and so the child process would block the parent. However, with the new design based on `asyncio`, everything runs in a single reentrant loop and so child processes no longer need to spawn their own independent nested runner.
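The reentrancy that makes this possible can be illustrated with `nest-asyncio` directly (a standalone sketch, independent of aiida-core):

    import asyncio
    import nest_asyncio

    nest_asyncio.apply()  # patch asyncio so that run_until_complete can be nested

    async def child():
        return 'child result'

    async def parent():
        # Without nest_asyncio, this nested run_until_complete would raise
        # "This event loop is already running".
        return asyncio.get_event_loop().run_until_complete(child())

    print(asyncio.get_event_loop().run_until_complete(parent()))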
When a daemon runner is started, the `SIGINT` and `SIGTERM` signals are captured to shutdown the runner before exiting the interpreter. However, the async tasks associated with the interpreter should be properly canceled first.
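A minimal sketch of that shutdown pattern, following the graceful-shutdown recipe referenced earlier in the discussion (the names here are illustrative, not the actual daemon runner code):

    import asyncio
    import signal

    async def shutdown(loop):
        # Cancel all tasks except the one running this coroutine, wait for them to
        # finish, then stop the loop. (Older Python used asyncio.Task.all_tasks and
        # asyncio.Task.current_task, as in the snippet quoted above.)
        tasks = [t for t in asyncio.all_tasks(loop) if t is not asyncio.current_task()]
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        loop.stop()

    def install_signal_handlers(loop):
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, lambda: asyncio.ensure_future(shutdown(loop)))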
The event loop implementation of `asyncio` does not allow the event loop to be made reentrant, which essentially means that event loops cannot be nested. One event loop cannot be run within another event loop. However, this concept is crucial for `plumpy`'s design to work and was perfectly allowed by the previous event loop provider `tornado`. To work around this, `plumpy` uses the library `nest_asyncio` to patch the `asyncio` event loop and make it reentrant. The trick is that this should be applied at the correct time. Here we update the `Runner` to enable `plumpy`'s event loop policy, which will patch the default event loop policy. This location is chosen since any process in `aiida-core` *has* to be run by a `Runner` and only one runner instance will ever be created in a Python interpreter. When the runner shuts down, the event loop policy is reset to undo the patch.
RabbitMQ 3.6 changed the way integer values are interpreted for connection parameters. This would cause certain integer values that used to be perfectly acceptable to all of a sudden cause the declaration of resources, such as channels and queues, to fail. The library `pamqp`, which is used by `aiormq`, which in turn is ultimately used by `kiwipy` to communicate with the RabbitMQ server, adapted to these changes, but this would break code with RabbitMQ 3.5 that used to work just fine. For example, the message TTL when declaring a queue would now fail when `32767 < TTL < 655636` due to incorrect interpretation of the integer type. The library `pamqp` provides a way to enable compatibility with these older versions. One should merely call the method `pamqp.encode.support_deprecated_rabbitmq()`. This will enable the legacy integer conversion table and will restore functionality for RabbitMQ 3.5.
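For example, the compatibility mode could be enabled like this (the exact call site is a choice of the integrating code; it simply needs to run before any queues are declared):

    from pamqp import encode

    # Enable the legacy integer encoding table so that e.g. queue TTL values
    # keep working against RabbitMQ 3.5 servers.
    encode.support_deprecated_rabbitmq()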
Fixes #3184
PR #4218 is rebased here for easier review.