Issues with new asyncio daemon (stress-test) #4595

Closed
giovannipizzi opened this issue Nov 28, 2020 · 51 comments · Fixed by #5732

Comments

@giovannipizzi
Member

I have tried a stress-test of the new AiiDA daemon after the replacement of tornado with asyncio.

I updated to the most recent develop (716a1d8), also updated aiida-qe to develop (commit 1a9713aefbcd235c20ecfb65d0df226b5544bf7d of that repo), pip-installed both, ran reentry scan, and stopped and restarted the daemon.

Here is roughly what I did.
I prepared a script to submit on the order of ~2000 relax workflows.
While the submission was running, I quickly reached the maximum number of slots (the warning message at the end of verdi process list indicated a % > 100%), so I ran verdi daemon incr 7 to work with 8 workers.
After having submitted more than half of the workflows, I stopped, because 8 workers weren't enough anyway and I didn't want to overload the supercomputer with too many connections from too many workers.
I left it running overnight. The next morning it was stalled, with all slots taken, so I increased the number of workers a bit more and, after a while, submitted the rest of the workflows and let them finish.
Since I realised that most were excepting (see below), I also stopped the daemon (that took a bit), made sure it was stopped, and started it again with just one worker to finish the work.

I have seen a number of issues unfortunately ( :-( ) where most calculations had some kind of problem. Pinging @sphuber @unkcpz @muhrin as they have been working on this so they should be able to help debugging/fixing the bugs.

I am going to report below, as separate comments, some of the issues I'm seeing, but I'm not sure how to debug further, so if you need specific logs please let me know what to run (or @sphuber, I can give you temporary access to the machine if that's easier).
As I write, the last few (~30) jobs are still finishing, but I can already start reporting the issues I see.

@giovannipizzi
Member Author

  • a few of the relax work chains are left in the Created state, as well as some of the internal PwBaseWorkChains, and even some PwCalculations:
27034  14h ago    PwRelaxWorkChain  ⏹ Created
33506  7h ago     PwCalculation     ⏹ Created
33986  7h ago     PwCalculation     ⏹ Created
33983  7h ago     PwBaseWorkChain   ⏹ Created
33989  7h ago     PwCalculation     ⏹ Created
33992  6h ago     PwBaseWorkChain   ⏹ Created
34025  6h ago     PwBaseWorkChain   ⏹ Created
34028  6h ago     PwBaseWorkChain   ⏹ Created
34090  6h ago     PwCalculation     ⏹ Created
34147  6h ago     PwBaseWorkChain   ⏹ Created
34153  6h ago     PwBaseWorkChain   ⏹ Created
34156  6h ago     PwBaseWorkChain   ⏹ Created
34162  6h ago     PwCalculation     ⏹ Created
34174  6h ago     PwBaseWorkChain   ⏹ Created
34189  6h ago     PwBaseWorkChain   ⏹ Created
34207  6h ago     PwBaseWorkChain   ⏹ Created
34391  6h ago     PwCalculation     ⏹ Created
34394  6h ago     PwBaseWorkChain   ⏹ Created
35252  6h ago     PwCalculation     ⏹ Created
35467  5h ago     PwCalculation     ⏹ Created
35482  5h ago     PwCalculation     ⏹ Created
35570  5h ago     PwCalculation     ⏹ Created
35869  5h ago     PwCalculation     ⏹ Created
39375  3h ago     PwCalculation     ⏹ Created

(Since I hit CTRL+C only once last night, and restarted the daemon once less than 1h ago, these shouldn't be connected to any action on my side.)

@giovannipizzi
Member Author

I have many (most?) of the calculations and workflows excepted.
Running

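from collections import Counter

# These imports are only needed when not running inside `verdi shell`
from aiida.orm import Group, Node, QueryBuilder

# Map each node PK in the group to its exit status (None if no exit status is set)
qb = QueryBuilder()
qb.append(Group, filters={'label': 'MY_GROUP_NAME'}, tag='g')
qb.append(Node, with_group='g', project=['id', 'attributes.exit_status'])
res = dict(qb.all())
# Count how many nodes ended with each exit status
print(Counter(res.values()))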

I get

Counter({None: 1103, 401: 984, 0: 27, 402: 18})

As you can see, most of them failed, many without even an exit status (None). Here are some of the reports:

Example of a 401

$ verdi process report 16518

2020-11-28 01:14:49 [3203 | REPORT]: [16518|PwRelaxWorkChain|run_relax]: launching PwBaseWorkChain<29477>
2020-11-28 06:39:43 [5230 | REPORT]:   [29477|PwBaseWorkChain|run_process]: launching PwCalculation<32972> iteration #1
2020-11-28 07:07:45 [7357 | REPORT]:   [29477|PwBaseWorkChain|on_terminated]: remote folders will not be cleaned
2020-11-28 07:18:17 [9388 | REPORT]: [16518|PwRelaxWorkChain|inspect_relax]: relax PwBaseWorkChain failed with exit status 301
2020-11-28 07:18:17 [9389 | REPORT]: [16518|PwRelaxWorkChain|on_terminated]: remote folders will not be cleaned

and the corresponding failed (excepted) calculation has:

!verdi process report 32972
*** 32972: None
*** Scheduler output: N/A
*** Scheduler errors: N/A
*** 1 LOG MESSAGES:
+-> ERROR at 2020-11-28 06:58:59.147179+00:00
 | Traceback (most recent call last):
 |   File "/home/pizzi/git/aiida-core/aiida/manage/external/rmq.py", line 206, in _continue
 |     result = await super()._continue(communicator, pid, nowait, tag)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/process_comms.py", line 538, in _continue
 |     proc = saved_state.unbundle(self._load_context)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/persistence.py", line 52, in unbundle
 |     return Savable.load(self, load_context)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/persistence.py", line 443, in load
 |     return load_cls.recreate_from(saved_state, load_context)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 223, in recreate_from
 |     base.call_with_super_check(process.init)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/base/utils.py", line 27, in call_with_super_check
 |     wrapped(*args, **kwargs)
 |   File "/home/pizzi/git/aiida-core/aiida/engine/processes/process.py", line 126, in init
 |     super().init()
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/base/utils.py", line 14, in wrapper
 |     wrapped(self, *args, **kwargs)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 275, in init
 |     identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/communications.py", line 108, in add_rpc_subscriber
 |     return self._communicator.add_rpc_subscriber(converted, identifier)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/threadcomms.py", line 184, in add_rpc_subscriber
 |     self._communicator.add_rpc_subscriber(self._wrap_subscriber(subscriber), identifier)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 155, in await_
 |     return self.await_submit(awaitable).result(timeout=self.task_timeout)
 |   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 435, in result
 |     return self.__get_result()
 |   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
 |     raise self._exception
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 35, in done
 |     result = done_future.result()
 |   File "/usr/lib/python3.7/asyncio/futures.py", line 181, in result
 |     raise self._exception
 |   File "/usr/lib/python3.7/asyncio/tasks.py", line 251, in __step
 |     result = coro.throw(exc)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 169, in proxy
 |     return await awaitable
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/communicator.py", line 435, in add_rpc_subscriber
 |     identifier = await msg_subscriber.add_rpc_subscriber(subscriber, identifier)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/communicator.py", line 110, in add_rpc_subscriber
 |     rpc_queue = await self._channel.declare_queue(exclusive=True, arguments=self._rmq_queue_arguments)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/robust_channel.py", line 180, in declare_queue
 |     timeout=timeout,
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/channel.py", line 310, in declare_queue
 |     await queue.declare(timeout=timeout)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/queue.py", line 101, in declare
 |     timeout=timeout,
 |   File "/usr/lib/python3.7/asyncio/tasks.py", line 414, in wait_for
 |     return await fut
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/channel.py", line 713, in queue_declare
 |     timeout=timeout,
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/base.py", line 168, in wrap
 |     return await self.create_task(func(self, *args, **kwargs))
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/base.py", line 25, in __inner
 |     return await self.task
 |   File "/usr/lib/python3.7/asyncio/futures.py", line 263, in __await__
 |     yield self  # This tells Task to wait for completion.
 |   File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
 |     future.result()
 |   File "/usr/lib/python3.7/asyncio/futures.py", line 181, in result
 |     raise self._exception
 |   File "/usr/lib/python3.7/asyncio/tasks.py", line 249, in __step
 |     result = coro.send(None)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/channel.py", line 121, in rpc
 |     raise ChannelInvalidStateError("writer is None")
 | aiormq.exceptions.ChannelInvalidStateError: writer is None

and

$ verdi process status 16518
PwRelaxWorkChain<16518> Finished [401] [1:while_(should_run_relax)(1:inspect_relax)]
    └── PwBaseWorkChain<29477> Finished [301] [6:while_(should_run_process)(2:inspect_process)]
        └── PwCalculation<32972> Excepted

So it failed because an internal step excepted.

Example of a None

$ verdi process report 16663

2020-11-28 01:18:21 [3319 | REPORT]: [16663|PwRelaxWorkChain|run_relax]: launching PwBaseWorkChain<29652>
2020-11-28 06:40:24 [5336 | REPORT]:   [29652|PwBaseWorkChain|run_process]: launching PwCalculation<33293> iteration #1
2020-11-28 07:08:13 [7465 |  ERROR]: Traceback (most recent call last):
  File "/home/pizzi/git/aiida-core/aiida/manage/external/rmq.py", line 206, in _continue
    result = await super()._continue(communicator, pid, nowait, tag)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/process_comms.py", line 538, in _continue
    proc = saved_state.unbundle(self._load_context)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/persistence.py", line 52, in unbundle
    return Savable.load(self, load_context)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/persistence.py", line 443, in load
    return load_cls.recreate_from(saved_state, load_context)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 223, in recreate_from
    base.call_with_super_check(process.init)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/base/utils.py", line 27, in call_with_super_check
    wrapped(*args, **kwargs)
  File "/home/pizzi/git/aiida-core/aiida/engine/processes/process.py", line 126, in init
    super().init()
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/base/utils.py", line 14, in wrapper
    wrapped(self, *args, **kwargs)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 275, in init
    identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/communications.py", line 108, in add_rpc_subscriber
    return self._communicator.add_rpc_subscriber(converted, identifier)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/threadcomms.py", line 184, in add_rpc_subscriber
    self._communicator.add_rpc_subscriber(self._wrap_subscriber(subscriber), identifier)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 155, in await_
    return self.await_submit(awaitable).result(timeout=self.task_timeout)
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 435, in result
    return self.__get_result()
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 35, in done
    result = done_future.result()
  File "/usr/lib/python3.7/asyncio/futures.py", line 181, in result
    raise self._exception
  File "/usr/lib/python3.7/asyncio/tasks.py", line 251, in __step
    result = coro.throw(exc)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 169, in proxy
    return await awaitable
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/communicator.py", line 435, in add_rpc_subscriber
    identifier = await msg_subscriber.add_rpc_subscriber(subscriber, identifier)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/communicator.py", line 110, in add_rpc_subscriber
    rpc_queue = await self._channel.declare_queue(exclusive=True, arguments=self._rmq_queue_arguments)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/robust_channel.py", line 180, in declare_queue
    timeout=timeout,
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/channel.py", line 310, in declare_queue
    await queue.declare(timeout=timeout)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/queue.py", line 101, in declare
    timeout=timeout,
  File "/usr/lib/python3.7/asyncio/tasks.py", line 414, in wait_for
    return await fut
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/channel.py", line 713, in queue_declare
    timeout=timeout,
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
  File "/usr/lib/python3.7/asyncio/futures.py", line 263, in __await__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
    future.result()
  File "/usr/lib/python3.7/asyncio/futures.py", line 181, in result
    raise self._exception
  File "/usr/lib/python3.7/asyncio/tasks.py", line 249, in __step
    result = coro.send(None)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/channel.py", line 121, in rpc
    raise ChannelInvalidStateError("writer is None")
aiormq.exceptions.ChannelInvalidStateError: writer is None

2020-11-28 07:08:24 [7597 | REPORT]:   [29652|PwBaseWorkChain|on_terminated]: remote folders will not be cleaned

and

$ verdi process status 16663

PwRelaxWorkChain<16663> Excepted [1:while_(should_run_relax)]
    └── PwBaseWorkChain<29652> Finished [301] [6:while_(should_run_process)(2:inspect_process)]
        └── PwCalculation<33293> Excepted

So in this case too it's an internal step that excepted - I'm not sure what the difference between the two is.

The 402s (I checked a couple) happened mostly at the very beginning of the submission, plus a couple later on. Here are the PKs:

[1680, 1762, 1886, 1951, 2044, 2061, 2247, 2295, 2357, 2436, 2580, 2594, 2611, 2656, 2876, 2890, 26166, 26572]

In this case too, the problem is probably similar:

$ verdi process report 26166

2020-11-28 07:03:10 [6908  | REPORT]: [26166|PwRelaxWorkChain|run_relax]: launching PwBaseWorkChain<34816>
2020-11-28 07:09:09 [8037  | REPORT]:   [34816|PwBaseWorkChain|run_process]: launching PwCalculation<35225> iteration #1
2020-11-28 11:40:36 [11802 | REPORT]:   [34816|PwBaseWorkChain|sanity_check_insufficient_bands]: PwCalculation<35225> run with smearing and highest band is occupied
2020-11-28 11:40:36 [11803 | REPORT]:   [34816|PwBaseWorkChain|sanity_check_insufficient_bands]: BandsData<40529> has invalid occupations: Occupation of 2.0 at last band lkn<0,0,200>
2020-11-28 11:40:36 [11804 | REPORT]:   [34816|PwBaseWorkChain|sanity_check_insufficient_bands]: PwCalculation<35225> had insufficient bands
2020-11-28 11:40:36 [11805 | REPORT]:   [34816|PwBaseWorkChain|sanity_check_insufficient_bands]: Action taken: increased number of bands to 210 and restarting from scratch
2020-11-28 11:40:36 [11806 | REPORT]:   [34816|PwBaseWorkChain|inspect_process]: PwCalculation<35225> finished successfully but a handler was triggered, restarting
2020-11-28 11:40:36 [11807 | REPORT]:   [34816|PwBaseWorkChain|run_process]: launching PwCalculation<40535> iteration #2
2020-11-28 13:02:41 [11886 | REPORT]:   [34816|PwBaseWorkChain|results]: work chain completed after 2 iterations
2020-11-28 13:02:42 [11887 | REPORT]:   [34816|PwBaseWorkChain|on_terminated]: remote folders will not be cleaned
2020-11-28 13:02:42 [11888 | REPORT]: [26166|PwRelaxWorkChain|inspect_relax]: after iteration 1 cell volume of relaxed structure is 519.7184571850364
2020-11-28 13:02:42 [11889 | REPORT]: [26166|PwRelaxWorkChain|run_relax]: launching PwBaseWorkChain<40724>
2020-11-28 13:02:43 [11890 | REPORT]:   [40724|PwBaseWorkChain|run_process]: launching PwCalculation<40727> iteration #1
2020-11-28 13:10:43 [11891 | REPORT]:   [40724|PwBaseWorkChain|results]: work chain completed after 1 iterations
2020-11-28 13:10:43 [11892 | REPORT]:   [40724|PwBaseWorkChain|on_terminated]: remote folders will not be cleaned
2020-11-28 13:10:43 [11893 | REPORT]: [26166|PwRelaxWorkChain|inspect_relax]: after iteration 2 cell volume of relaxed structure is 519.7184503782762
2020-11-28 13:10:43 [11894 | REPORT]: [26166|PwRelaxWorkChain|inspect_relax]: relative cell volume difference 1.3097014580438056e-08 smaller than convergence threshold 0.01
2020-11-28 13:10:43 [11895 | REPORT]: [26166|PwRelaxWorkChain|run_final_scf]: launching PwBaseWorkChain<40736> for final scf
2020-11-28 13:10:45 [11896 | REPORT]:   [40736|PwBaseWorkChain|run_process]: launching PwCalculation<40739> iteration #1
2020-11-28 13:11:45 [11898 | REPORT]:   [40736|PwBaseWorkChain|on_terminated]: remote folders will not be cleaned
2020-11-28 13:11:45 [11899 | REPORT]: [26166|PwRelaxWorkChain|inspect_final_scf]: final scf PwBaseWorkChain failed with exit status 301
2020-11-28 13:11:45 [11900 | REPORT]: [26166|PwRelaxWorkChain|on_terminated]: remote folders will not be cleaned

and

$ verdi process status 26166

PwRelaxWorkChain<26166> Finished [402] [2:if_(should_run_final_scf)(1:inspect_final_scf)]
    ├── PwBaseWorkChain<34816> Finished [0] [7:results]
    │   ├── PwCalculation<35225> Finished [0]
    │   └── PwCalculation<40535> Finished [0]
    ├── PwBaseWorkChain<40724> Finished [0] [7:results]
    │   └── PwCalculation<40727> Finished [0]
    └── PwBaseWorkChain<40736> Finished [301] [6:while_(should_run_process)(2:inspect_process)]
        └── PwCalculation<40739> Excepted

and finally:

$ verdi process report 40739

*** 40739: None
*** Scheduler output: N/A
*** Scheduler errors: N/A
*** 1 LOG MESSAGES:
+-> ERROR at 2020-11-28 13:10:45.225990+00:00
 | Traceback (most recent call last):
 |   File "/home/pizzi/git/aiida-core/aiida/manage/external/rmq.py", line 206, in _continue
 |     result = await super()._continue(communicator, pid, nowait, tag)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/process_comms.py", line 538, in _continue
 |     proc = saved_state.unbundle(self._load_context)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/persistence.py", line 52, in unbundle
 |     return Savable.load(self, load_context)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/persistence.py", line 443, in load
 |     return load_cls.recreate_from(saved_state, load_context)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 223, in recreate_from
 |     base.call_with_super_check(process.init)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/base/utils.py", line 27, in call_with_super_check
 |     wrapped(*args, **kwargs)
 |   File "/home/pizzi/git/aiida-core/aiida/engine/processes/process.py", line 126, in init
 |     super().init()
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/base/utils.py", line 14, in wrapper
 |     wrapped(self, *args, **kwargs)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 275, in init
 |     identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/communications.py", line 108, in add_rpc_subscriber
 |     return self._communicator.add_rpc_subscriber(converted, identifier)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/threadcomms.py", line 184, in add_rpc_subscriber
 |     self._communicator.add_rpc_subscriber(self._wrap_subscriber(subscriber), identifier)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 155, in await_
 |     return self.await_submit(awaitable).result(timeout=self.task_timeout)
 |   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 435, in result
 |     return self.__get_result()
 |   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
 |     raise self._exception
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 35, in done
 |     result = done_future.result()
 |   File "/usr/lib/python3.7/asyncio/futures.py", line 181, in result
 |     raise self._exception
 |   File "/usr/lib/python3.7/asyncio/tasks.py", line 251, in __step
 |     result = coro.throw(exc)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 169, in proxy
 |     return await awaitable
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/communicator.py", line 435, in add_rpc_subscriber
 |     identifier = await msg_subscriber.add_rpc_subscriber(subscriber, identifier)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/communicator.py", line 110, in add_rpc_subscriber
 |     rpc_queue = await self._channel.declare_queue(exclusive=True, arguments=self._rmq_queue_arguments)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/robust_channel.py", line 180, in declare_queue
 |     timeout=timeout,
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/channel.py", line 310, in declare_queue
 |     await queue.declare(timeout=timeout)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/queue.py", line 101, in declare
 |     timeout=timeout,
 |   File "/usr/lib/python3.7/asyncio/tasks.py", line 414, in wait_for
 |     return await fut
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/channel.py", line 713, in queue_declare
 |     timeout=timeout,
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/base.py", line 168, in wrap
 |     return await self.create_task(func(self, *args, **kwargs))
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/base.py", line 25, in __inner
 |     return await self.task
 |   File "/usr/lib/python3.7/asyncio/futures.py", line 263, in __await__
 |     yield self  # This tells Task to wait for completion.
 |   File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
 |     future.result()
 |   File "/usr/lib/python3.7/asyncio/futures.py", line 181, in result
 |     raise self._exception
 |   File "/usr/lib/python3.7/asyncio/tasks.py", line 249, in __step
 |     result = coro.send(None)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/channel.py", line 121, in rpc
 |     raise ChannelInvalidStateError("writer is None")
 | aiormq.exceptions.ChannelInvalidStateError: writer is None

@giovannipizzi
Member Author

While this seems to be quite an important bug that happens often and has serious consequences for users, the "good news" is that, from this simple analysis, it seems to be mostly caused by the same aiormq.exceptions.ChannelInvalidStateError: writer is None error, so fixing this might fix the majority of the problems (except maybe for the few calculations and workflows that were left in the Created state).

@giovannipizzi
Member Author

A final comment: there are a few more cases, e.g. this one:

$  verdi process report 45613
2020-11-28 13:16:06 [12490 | REPORT]: [45613|PwRelaxWorkChain|run_relax]: launching PwBaseWorkChain<46239>
2020-11-28 13:16:17 [12531 | REPORT]:   [46239|PwBaseWorkChain|run_process]: launching PwCalculation<46565> iteration #1
2020-11-28 13:25:50 [13890 | REPORT]:   [46239|PwBaseWorkChain|on_except]: Traceback (most recent call last):
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 1072, in step
    next_state = await self._run_task(self._state.execute)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 498, in _run_task
    result = await coro(*args, **kwargs)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/process_states.py", line 306, in execute
    result = await self._waiting_future
  File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
    future.result()
concurrent.futures._base.CancelledError

2020-11-28 13:25:50 [13893 | REPORT]:   [46239|PwBaseWorkChain|on_terminated]: remote folders will not be cleaned
2020-11-28 13:25:59 [14414 | REPORT]: [45613|PwRelaxWorkChain|on_except]: Traceback (most recent call last):
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 1072, in step
    next_state = await self._run_task(self._state.execute)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 498, in _run_task
    result = await coro(*args, **kwargs)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/process_states.py", line 306, in execute
    result = await self._waiting_future
  File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
    future.result()
concurrent.futures._base.CancelledError

2020-11-28 13:25:59 [14419 | REPORT]: [45613|PwRelaxWorkChain|on_terminated]: remote folders will not be cleaned

The error is different, and while the work chain has failed, there are some leftover orphan calculations still queued or running on the supercomputer:

$ verdi process status 45613
PwRelaxWorkChain<45613> Excepted [1:while_(should_run_relax)]
    └── PwBaseWorkChain<46239> Excepted [6:while_(should_run_process)(1:run_process)]
        └── PwCalculation<46565> Waiting

and

$ verdi process list | grep 46565
46565  53m ago    PwCalculation     ⏵ Waiting        Monitoring scheduler: job state RUNNING

@unkcpz
Member

unkcpz commented Nov 29, 2020

@giovannipizzi thanks for such an elaborate bug report!

As for the 401 and None cases: after reading the traceback I'm not sure what is going on, but it reminds me of a piece of code that has been haunting me. In the traceback, the program complains when adding the RPC subscriber to the communicator (a LoopCommunicator), which involves calling plum_to_kiwi_future of plumpy. That function contains lines that are never exercised by the unit tests, although the same code once caused problems elsewhere: the if isinstance(result, futures.Future): check, which gets the result from the future and unwraps it if it is another future. But a SavableFuture is an _asyncio.Future (PY>3.6), which is not an instance of plumpy.Future (an asyncio.Future). So in aiida_core I use asyncio.isfuture(result) to identify it.

I'm not sure this relates to this issue, but it might be an inspiration for how to debug it. @muhrin is more of an expert on this, so I'm looking forward to his input.
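To illustrate the difference between the two checks mentioned above, here is a minimal sketch using only the standard library. DuckFuture and unwrap are hypothetical names invented for this example; they are not plumpy or kiwipy code, and the sketch only shows the general behaviour of asyncio.isfuture versus isinstance, not the actual cause of this issue.

```python
import asyncio


class DuckFuture:
    """Hypothetical future-like class that does NOT inherit from asyncio.Future."""

    # asyncio.isfuture() looks for this class attribute and requires it to be not None.
    _asyncio_future_blocking = False

    def __init__(self, value):
        self._value = value

    def done(self):
        return True

    def result(self):
        return self._value


def unwrap(result):
    """Keep unwrapping as long as the result is itself a finished future."""
    while asyncio.isfuture(result) and result.done():
        result = result.result()
    return result


loop = asyncio.new_event_loop()
inner = loop.create_future()  # a real asyncio.Future
inner.set_result(42)
outer = DuckFuture(inner)  # future-like wrapper around another future

seen_by_isinstance = isinstance(outer, asyncio.Future)  # class-based check
seen_by_isfuture = asyncio.isfuture(outer)  # duck-typed check
unwrapped = unwrap(outer)
loop.close()

print(seen_by_isinstance, seen_by_isfuture, unwrapped)
```

An isinstance check against one specific Future class would skip the unwrapping for the outer object, while the duck-typed check handles both layers.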

@giovannipizzi
Member Author

Thanks @unkcpz !
Unfortunately I'm also not an expert of that code.
One important thing to mention (see #4598) is that I had been running a lot of jobs, so maybe the system was simply overloaded? I'm not sure; the errors are different from those in #4598, but it's worth keeping in mind.
I could try to run again in a few days, but without "exaggerating" (e.g. only ~200 at a time), to see if the problem also appears with low computer load. Otherwise, any help on how to debug this further would be appreciated (it's relatively simple to reproduce, I'm just running a lot of PwRelaxWorkChains).

@sphuber
Contributor

sphuber commented Nov 30, 2020

Thanks for the report @giovannipizzi. The exit codes here are not really useful. All those exit codes of the PwBaseWorkChain and PwRelaxWorkChain simply reflect that a sub process failed, so they don't provide any useful information. Note that exit_status == None also applies to processes that are either not yet terminated or that excepted or were killed. Finding those is therefore not problematic in and of itself, especially if you know that you have a lot of excepted processes. The most interesting query to run here is simply to find all the excepted processes and retrieve the corresponding exceptions. As you have noticed, there are probably a few types of exceptions that then cause all sorts of knock-on effects.

The most critical one seems to be:

aiormq.exceptions.ChannelInvalidStateError: writer is None

Here there seems to be something wrong with the connection to RabbitMQ and so any operation that involves it will raise an exception that will cause the process to fall over. The other one:

concurrent.futures._base.CancelledError

is likely just a result of the former one. A process was waiting on a sub process, which excepted due to the first, causing the associated future to be cancelled and here I think we might not yet be catching the correct exceptions. Either because it has been changed, or because multiple ones can be thrown and we didn't expect this one.
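The grouping step of the query suggested above (find the excepted processes, then see which exceptions they share) could be sketched as follows. This is plain Python on (pk, traceback text) pairs; the helper names are hypothetical, the sample rows are abbreviated from the reports in this thread, and how the pairs are fetched from the database (for instance by filtering on the process state and projecting a stored exception attribute) is an assumption left outside the sketch.

```python
from collections import Counter


def last_line(traceback_text):
    """Return the final non-empty line of a traceback, i.e. the exception itself."""
    lines = [line.strip() for line in traceback_text.splitlines() if line.strip()]
    return lines[-1] if lines else "<no exception recorded>"


def summarize_exceptions(rows):
    """Count excepted processes per exception line.

    rows: iterable of (pk, traceback text or None) pairs.
    """
    return Counter(last_line(text or "") for _pk, text in rows)


# Illustrative rows only, with the stack frames omitted.
CHANNEL_ERROR = (
    "Traceback (most recent call last):\n"
    "  (frames omitted)\n"
    "aiormq.exceptions.ChannelInvalidStateError: writer is None\n"
)
CANCELLED = (
    "Traceback (most recent call last):\n"
    "  (frames omitted)\n"
    "concurrent.futures._base.CancelledError\n"
)
rows = [(32972, CHANNEL_ERROR), (33293, CHANNEL_ERROR), (46239, CANCELLED)]

summary = summarize_exceptions(rows)
for exception, count in summary.most_common():
    print(count, exception)
```

Sorting by count makes it easy to see whether one exception dominates, which is the question the exit codes alone cannot answer.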

@giovannipizzi
Member Author

OK - I'll try to get more information about the actual errors/exceptions.

Anyway, would it be possible (at least for now, or maybe even in the mid term) to 'catch' these specific exceptions and just treat them as connection failures, so that AiiDA's retry mechanism is triggered and the process is eventually paused if the problem persists?

I guess these are errors that can occur, e.g. if the RMQ goes down (or if it's on a different machine and the network goes down) so we should be able to allow operations to restart once the connection is re-established instead of just excepting the whole process?

@sphuber
Contributor

sphuber commented Nov 30, 2020

I guess these are errors that can occur, e.g. if the RMQ goes down (or if it's on a different machine and the network goes down) so we should be able to allow operations to restart once the connection is re-established instead of just excepting the whole process?

That would be ideal, but I think this is not going to be straightforward to implement, because we cannot put this into plumpy where we catch all exceptions and retry. Some exceptions are real and need to fail the process. It won't be possible to have plumpy automatically determine which failures can be transient and need to be "retried", where the last is in quotes because even that is not straightforward. The exponential backoff mechanism is something we implemented completely on top of the Process state machine, specifically and only for the CalcJob subclass. It does not exist as a basic piece of the machinery.

@muhrin
Contributor

muhrin commented Nov 30, 2020

Hi Gio,

If there's any way you can still reproduce this then it might be worth looking at whether there could be a connection between the CancelledError and the ChannelInvalidStateError.

In aiormq there are only a few ways that the writer can be set to None and one of them is if a CancelledError [1] is caught in an RPC call.

So, if you can reproduce it, would you be able to enable logging for aiormq, and possibly asyncio? We might see the signature "Closing channel .. because RPC call ... cancelled" warning log, but I would have thought that this would appear by default (as it is at WARNING level).

The only other ways I can see, at the moment, that the writer could be None are if the connector had a None writer when the Channel was created, or if the Connection itself was closed, which I think is the most likely, probably due to missed heartbeats.

[1] This CancelledError is indeed a concurrent.futures._base.CancelledError if you follow the alias from asyncio

@giovannipizzi
Member Author

Hi Martin,
I could manage to run again in a few days maybe. Just to be sure, how should I increase the logging level exactly? Is it something I can do from verdi config? (Just to be sure that we have all information). Also, it seems AiiDA is rotating the logs very quickly, maybe I should increase some config to avoid losing too much information?

@unkcpz
Member

unkcpz commented Dec 3, 2020

how should I increase the logging level exactly? Is it something I can do from verdi config?

Hi @giovannipizzi, the logger setting for asyncio (it was originally there to configure the tornado logger, but I removed it since it was rarely used) was moved in this new commit. The easiest way (I think; @sphuber correct me if I'm wrong) to enable logging for aiormq and asyncio would be to add them to the logger settings. Then you can configure it through verdi config logging.asyncio_loglevel DEBUG.
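For a quick check in an interactive session (this does not affect daemon workers that are already running), the same effect can be sketched with the standard logging module. This assumes that the libraries log under loggers named after their packages; enable_debug_logging is a hypothetical helper, not part of AiiDA:

```python
import logging

# Assumption: each library logs under a logger named after its package.
LIBRARY_LOGGERS = ('aiormq', 'aio_pika', 'asyncio')


def enable_debug_logging(names=LIBRARY_LOGGERS) -> None:
    """Send DEBUG records of the given loggers to stderr."""
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter('%(asctime)s [%(name)s] %(levelname)s: %(message)s')
    )
    for name in names:
        logger = logging.getLogger(name)
        logger.setLevel(logging.DEBUG)
        logger.addHandler(handler)


enable_debug_logging()
```

For the daemon itself, a config option as described above remains the more practical route, since the workers are separate processes.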

@giovannipizzi
Member Author

Just a short message to confirm that, with a "low throughput" (100 WorkChains/~300 processes running at the same time), everything works fine (I submitted a total of ~1000 work chains).
So I guess these errors start occurring only when the machine gets slow because of too many submissions - still to investigate, but maybe to do together with #4603 and #4598

@chrisjsewell chrisjsewell changed the title from "Issues with new asyncio daemon" to "Issues with new asyncio daemon (stress-test)" on Jan 13, 2021
@chrisjsewell
Member

chrisjsewell commented Feb 25, 2021

first to note, there is now a logging.aiopika_loglevel config value, and also kiwipy has debugging for what messages are sent (since the aiopika logging does not give the content of the messages)

then for the aiormq.exceptions.ChannelInvalidStateError, as already mentioned it is some issue with connecting to rabbitmq (the communicator has been closed?) that is possibly beyond our control.

In Process.init we already catch a connection error, in the form of futures.TimeoutError and ignore it: https://github.com/aiidateam/plumpy/blob/db0bf6033aa3a9b69e0e5b30206df05135538fd7/plumpy

            try:
                identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
                self.add_cleanup(functools.partial(self._communicator.remove_rpc_subscriber, identifier))
            except kiwipy.TimeoutError:
                self.logger.exception('Process<%s>: failed to register as an RPC subscriber', self.pid)

this means that the process does not except, but the trade-off is that it will be unreachable when trying to use verdi kill.
So we could also catch: aiormq.exceptions.ChannelInvalidStateError and accept this trade-off.

Alternatively, at a "higher level", perhaps it would be better not to ignore these exceptions but rather, in plumpy.ProcessLauncher._continue catch them and convert them to a TaskRejected exception (and also make sure this does not trigger aiida.ProcessLauncher.handle_continue_exception).

https://github.com/aiidateam/plumpy/blob/db0bf6033aa3a9b69e0e5b30206df05135538fd7/plumpy/process_comms.py#L586

result = await super()._continue(communicator, pid, nowait, tag)

If I understand correctly, this would then mean that the process will not start running and the continue task would be punted back to rabbitmq, to be re-broadcast to the daemon workers.
Then you should not end up with unreachable processes.

How's this sound @muhrin @sphuber?

@chrisjsewell
Member

from #4598 kiwipy.communications.DuplicateSubscriberIdentifier should also be added to the exceptions that are handled and result in a TaskRejected exception

with respect to:

            try:
                identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
                self.add_cleanup(functools.partial(self._communicator.remove_rpc_subscriber, identifier))
            except kiwipy.TimeoutError:
                self.logger.exception('Process<%s>: failed to register as an RPC subscriber', self.pid)

            try:
                identifier = self._communicator.add_broadcast_subscriber(
                    self.broadcast_receive, identifier=str(self.pid)
                )
                self.add_cleanup(functools.partial(self._communicator.remove_broadcast_subscriber, identifier))
            except kiwipy.TimeoutError:
                self.logger.exception('Process<%s>: failed to register as a broadcast subscriber', self.pid)

I guess there should be some thought on what happens if add_rpc_subscriber is successful and add_broadcast_subscriber fails, i.e. we should try to remove_rpc_subscriber (catching exceptions)?
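One possible shape for that clean-up is sketched below. FakeCommunicator is a hypothetical stand-in for the real communicator, the built-in TimeoutError stands in for kiwipy.TimeoutError, and register_subscribers is an illustrative helper, not the plumpy implementation. The sketch registers either both subscribers or neither:

```python
import functools


class FakeCommunicator:
    """Hypothetical stand-in for the kiwipy communicator, for illustration only."""

    def __init__(self, fail_broadcast: bool = False):
        self.fail_broadcast = fail_broadcast
        self.rpc_subscribers = set()
        self.broadcast_subscribers = set()

    def add_rpc_subscriber(self, callback, identifier):
        self.rpc_subscribers.add(identifier)
        return identifier

    def remove_rpc_subscriber(self, identifier):
        self.rpc_subscribers.discard(identifier)

    def add_broadcast_subscriber(self, callback, identifier):
        if self.fail_broadcast:
            raise TimeoutError('no confirmation from the broker')
        self.broadcast_subscribers.add(identifier)
        return identifier

    def remove_broadcast_subscriber(self, identifier):
        self.broadcast_subscribers.discard(identifier)


def register_subscribers(communicator, pid, cleanups):
    """Register both subscribers, or neither if the second registration fails."""
    rpc_id = communicator.add_rpc_subscriber(print, identifier=str(pid))
    try:
        broadcast_id = communicator.add_broadcast_subscriber(print, identifier=str(pid))
    except TimeoutError:
        try:
            # Undo the first registration so no half-registered process remains.
            communicator.remove_rpc_subscriber(rpc_id)
        except Exception:
            pass  # the connection may be gone entirely; nothing more to undo
        raise
    cleanups.append(functools.partial(communicator.remove_rpc_subscriber, rpc_id))
    cleanups.append(functools.partial(communicator.remove_broadcast_subscriber, broadcast_id))


broken = FakeCommunicator(fail_broadcast=True)
broken_cleanups = []
try:
    register_subscribers(broken, 42, broken_cleanups)
except TimeoutError:
    print('registration rolled back:', broken.rpc_subscribers, broken_cleanups)

healthy = FakeCommunicator()
healthy_cleanups = []
register_subscribers(healthy, 42, healthy_cleanups)
```

Re-raising after the rollback is a design choice made for this sketch; the existing code logs the failure and carries on instead.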

@sphuber
Contributor

sphuber commented Feb 26, 2021

If I understand correctly, this would then mean that the process will not start running and the continue task would be punted back to rabbitmq, to be re-broadcast to the daemon workers.
Then you should not end up with unreachable processes.

Yes, if in ProcessLauncher._continue we simply reject the task if we cannot successfully deserialize and restart it, the task will be requeued with RabbitMQ. However, this might not actually be a solution. If the failure really is due to the communicator being broken, and it doesn't fix itself, then this daemon worker will be broken forever. Once all daemon workers have the same problem, they will all just fail to restart processes and send the task back, causing the tasks to start ping-ponging. So the proposed fix would be one if and only if we also fix the underlying communicator problem, or determine that it is transient and not permanent for a daemon worker.

@sphuber
Contributor

sphuber commented Feb 26, 2021

from #4598 kiwipy.communications.DuplicateSubscriberIdentifier should also be added to the exceptions that are handled and result in a TaskRejected exception

Same story here, it would definitely be good to handle this more gracefully and reject the task instead of excepting the process. However, I feel also here it would be very important to find the underlying cause of this problem because it indicates a bigger bug elsewhere.

@chrisjsewell
Member

thanks for the reply @sphuber

Yes, if in ProcessLauncher._continue we simply reject the task if we cannot successfully deserialize and restart it, the task will be requeued with RabbitMQ.

note this is not currently the case for aiida-core, because the node is set as excepted and sealed for every exception:

except Exception as exception:
    message = 'failed to recreate the process instance in order to continue it.'
    self.handle_continue_exception(node, exception, message)
    raise

i.e. here we should at least catch TaskRejected exceptions separately and not call handle_continue_exception before re-raising
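A rough sketch of that separation, with stand-ins for everything AiiDA-specific: the TaskRejected class below only mimics the exception discussed above, and continue_process, record, reject and corrupt are hypothetical names used for illustration:

```python
class TaskRejected(Exception):
    """Stand-in for the TaskRejected exception discussed above."""


def continue_process(recreate, handle_continue_exception):
    """Recreate a process, marking the node as excepted only for real failures."""
    try:
        return recreate()
    except TaskRejected:
        # Leave the node untouched so that the task can be requeued.
        raise
    except Exception as exception:
        message = 'failed to recreate the process instance in order to continue it.'
        handle_continue_exception(exception, message)
        raise


handled = []


def record(exception, message):
    """Pretend to set the node to excepted and seal it."""
    handled.append((type(exception).__name__, message))


def reject():
    raise TaskRejected('communicator unavailable')


def corrupt():
    raise ValueError('corrupt checkpoint')


outcomes = []
for recreate in (reject, corrupt):
    try:
        continue_process(recreate, record)
    except Exception as exc:
        outcomes.append(type(exc).__name__)

print(outcomes, handled)
```

Only the ValueError reaches the handler here; the rejected task propagates without the node being touched.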

If the failure really is due to the communicator being broken, and it doesn't fix itself, then this daemon worker will be broken forever.

Indeed, if it is a "permanently broken" communicator then I guess we should just kill the daemon worker.
Within aiida.ProcessLauncher._continue you could certainly implement an (exponential backoff) retry mechanism, whereby if the Process initiation fails, you simply sleep for some time then retry. Then perhaps if these retries fail you kill the daemon worker entirely.
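Such a retry loop could look roughly like the following. This is a sketch under stated assumptions, not the aiida-core implementation: TransientCommunicatorError is a hypothetical stand-in for errors such as ChannelInvalidStateError, and the intervals are kept tiny so the example runs quickly:

```python
import asyncio


class TransientCommunicatorError(Exception):
    """Hypothetical stand-in for errors such as ChannelInvalidStateError."""


async def retry_with_backoff(make_attempt, initial_interval=0.01, max_attempts=4):
    """Await ``make_attempt()`` until it succeeds, doubling the pause after each failure."""
    interval = initial_interval
    for attempt in range(1, max_attempts + 1):
        try:
            return await make_attempt()
        except TransientCommunicatorError:
            if attempt == max_attempts:
                raise  # the caller decides what happens next, e.g. stopping the worker
            await asyncio.sleep(interval)
            interval *= 2


calls = {'count': 0}


async def flaky_continue():
    """Fail twice, then succeed, to mimic a communicator that recovers."""
    calls['count'] += 1
    if calls['count'] < 3:
        raise TransientCommunicatorError('writer is None')
    return 'continued'


result = asyncio.run(retry_with_backoff(flaky_continue))
print(result, calls['count'])
```

If the final attempt also fails, the exception propagates, which is the point where the worker could be shut down.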

However, I feel also here it would be very important to find the underlying cause of this problem because it indicates a bigger bug elsewhere.

indeed, but it's just so hard to debug, because it is so hard to replicate consistently.
perhaps there is some way to mock rabbitmq heartbeats being missed 🤷

@sphuber
Contributor

sphuber commented Feb 26, 2021

note this is not currently the case for aiida-core, because the node is set as excepted and sealed for every exception:

I know, I said if we were to start doing that, which to me seems like a good idea indeed, provided that we fix the underlying problem of a potentially "permanently" broken communicator. Here we should consult with @muhrin because recent versions of kiwipy should have a robust communicator that reconnects automatically, in which case either rejecting the task or having an automatic retry would maybe work. The reject would definitely be the simplest.

@chrisjsewell
Member

chrisjsewell commented Feb 26, 2021

So just to consolidate known failure modes when adding the process rpc/broadcast receiver:

  • kiwipy.DuplicateSubscriberIdentifier: it seems this should always be rejected, because we don't want multiple instances of the same process running. (see also in A lot of failed jobs in AiiDA 1.5 (DuplicateSubcriber) when machine is overloaded #4598 and Crash of all workflows (DuplicateSubscriberIdentifier) #3973)
  • aiormq.exceptions.ChannelInvalidStateError: writer is None
  • pika.exceptions.ChannelWrongStateError: Channel is closed: noted in slack
  • RuntimeError: The channel is closed: from topika.common, noted in slack, but topika is no longer used I guess?
  • futures.TimeoutError: this one is perhaps a bit tricky, because a timeout does not necessarily mean that the subscriber has not been added, just that we did not receive a confirmation quickly enough. In fact, thinking about it now, perhaps this is related to kiwipy.DuplicateSubscriberIdentifier, since if this exception is encountered then a broadcast remove task will not be added to the process cleanup. Perhaps @giovannipizzi could check whether, when he sees this exception, he also sees "failed to register as a broadcast subscriber" in the daemon log?
    (EDIT: it's probably more likely related to a missed heartbeat, but good to check)

@chrisjsewell
Member

chrisjsewell commented Mar 2, 2021

Could we add an option to the communicator in kiwipy, to add a callback to RobustConnection.add_reconnect_callback that crashes the daemon (i.e. raises an exception)?

@chrisjsewell
Member

Could we add an option to the communicator in kiwipy, to add a callback to RobustConnection.add_reconnect_callback that crashes the daemon (i.e. raises an exception)?

See aiidateam/kiwipy#104

@chrisjsewell
Member

the next part of my master plan lol: aiidateam/plumpy#213

@unkcpz
Member

unkcpz commented Dec 1, 2021

I encountered the error aiormq.exceptions.ChannelInvalidStateError: writer is None again after submitting only two workchains, which generate > 200 processes. I don't think my case is very stressful, but I can reproduce the error with 8 daemon workers. What is strange to me is that, in the failed workchain report, the ERROR entry comes from a RemoteData node:

2021-11-25 02:24:18 [11057 |  ERROR]: Traceback (most recent call last):                                               
  File "/home/jyu/miniconda3/envs/aiida-sssp-dev/lib/python3.9/site-packages/aiida/manage/external/rmq.py", line 208, in _continue
    result = await super()._continue(communicator, pid, nowait, tag)                                                   
  File "/home/jyu/miniconda3/envs/aiida-sssp-dev/lib/python3.9/site-packages/plumpy/process_comms.py", line 607, in _continue
    proc = cast('Process', saved_state.unbundle(self._load_context))                                                   
  File "/home/jyu/miniconda3/envs/aiida-sssp-dev/lib/python3.9/site-packages/plumpy/persistence.py", line 60, in unbundle
    return Savable.load(self, load_context)                             
.....
  File "/home/jyu/miniconda3/envs/aiida-sssp-dev/lib/python3.9/site-packages/aiormq/channel.py", line 121, in rpc
    raise ChannelInvalidStateError("writer is None")
aiormq.exceptions.ChannelInvalidStateError: writer is None

The 11057 is a RemoteData node. It makes no sense to me that this node would report the error.

Since this issue thread has diverged into a general discussion of related problems, should I create a separate issue for the writer is None problem?

@sphuber
Contributor

sphuber commented Dec 1, 2021

The 11057 is the pk of the Log entry, not the node it is attached to.

@unkcpz
Member

unkcpz commented Dec 1, 2021

Ah~ true, my bad.

@ltalirz
Member

ltalirz commented Jan 18, 2022

Hey guys, so what is the current wisdom on ChannelInvalidStateError: writer is None?
Just received another report on this. This seems serious to me?

@ryotatomioka

Thanks @ltalirz . This is my traceback:

In [53]: submit(builder)
---------------------------------------------------------------------------
ChannelInvalidStateError                  Traceback (most recent call last)
<ipython-input-53-4cf87a5f8414> in <module>
----> 1 submit(builder)

~/miniconda3/envs/aiida/lib/python3.9/site-packages/aiida/engine/launch.py in submit(process, **inputs)
     99     assert runner.controller is not None, 'runner does not have a persister'
    100
--> 101     process_inited = instantiate_process(runner, process, **inputs)
    102
    103     # If a dry run is requested, simply forward to `run`, because it is not compatible with `submit`. We choose for this

~/miniconda3/envs/aiida/lib/python3.9/site-packages/aiida/engine/utils.py in instantiate_process(runner, process, *args, **inputs)
     63         raise ValueError(f'invalid process {type(process)}, needs to be Process or ProcessBuilder')
     64
---> 65     process = process_class(runner=runner, inputs=inputs)
     66
     67     return process

~/miniconda3/envs/aiida/lib/python3.9/site-packages/plumpy/base/state_machine.py in __call__(cls, *args, **kwargs)
    191         """
    192         inst = super().__call__(*args, **kwargs)
--> 193         inst.transition_to(inst.create_initial_state())
    194         call_with_super_check(inst.init)
    195         return inst

~/miniconda3/envs/aiida/lib/python3.9/site-packages/plumpy/base/state_machine.py in transition_to(self, new_state, *args, **kwargs)
    333                 raise
    334             self._transition_failing = True
--> 335             self.transition_failed(initial_state_label, label, *sys.exc_info()[1:])
    336         finally:
    337             self._transition_failing = False

~/miniconda3/envs/aiida/lib/python3.9/site-packages/plumpy/base/state_machine.py in transition_failed(initial_state, final_state, exception, trace)
    349
    350         """
--> 351         raise exception.with_traceback(trace)
    352
    353     def get_debug(self) -> bool:

~/miniconda3/envs/aiida/lib/python3.9/site-packages/plumpy/base/state_machine.py in transition_to(self, new_state, *args, **kwargs)
    318
    319             try:
--> 320                 self._enter_next_state(new_state)
    321             except StateEntryFailed as exception:
    322                 # Make sure we have a state instance

~/miniconda3/envs/aiida/lib/python3.9/site-packages/plumpy/base/state_machine.py in _enter_next_state(self, next_state)
    384         next_state.do_enter()
    385         self._state = next_state
--> 386         self._fire_state_event(StateEventHook.ENTERED_STATE, last_state)
    387
    388     def _create_state_instance(self, state: Union[Hashable, State, Type[State]], *args: Any, **kwargs: Any) -> State:

~/miniconda3/envs/aiida/lib/python3.9/site-packages/plumpy/base/state_machine.py in _fire_state_event(self, hook, state)
    297     def _fire_state_event(self, hook: Hashable, state: Optional[State]) -> None:
    298         for callback in self._event_callbacks.get(hook, []):
--> 299             callback(self, hook, state)
    300
    301     @super_check

~/miniconda3/envs/aiida/lib/python3.9/site-packages/plumpy/processes.py in <lambda>(_s, _h, from_state)
    324             lambda _s, _h, state: self.on_entering(cast(process_states.State, state)),
    325             state_machine.StateEventHook.ENTERED_STATE:
--> 326             lambda _s, _h, from_state: self.on_entered(cast(Optional[process_states.State], from_state)),
    327             state_machine.StateEventHook.EXITING_STATE:
    328             lambda _s, _h, _state: self.on_exiting()

~/miniconda3/envs/aiida/lib/python3.9/site-packages/aiida/engine/processes/process.py in on_entered(self, from_state)
    388         self._save_checkpoint()
    389         set_process_state_change_timestamp(self)
--> 390         super().on_entered(from_state)
    391
    392     @override

~/miniconda3/envs/aiida/lib/python3.9/site-packages/plumpy/processes.py in on_entered(self, from_state)
    698             self.logger.info('Process<%s>: Broadcasting state change: %s', self.pid, subject)
    699             try:
--> 700                 self._communicator.broadcast_send(body=None, sender=self.pid, subject=subject)
    701             except ConnectionClosed:
    702                 message = 'Process<%s>: no connection available to broadcast state change from %s to %s'

~/miniconda3/envs/aiida/lib/python3.9/site-packages/plumpy/communications.py in broadcast_send(self, body, sender, subject, correlation_id)
    173         correlation_id: Optional['ID_TYPE'] = None
    174     ) -> futures.Future:
--> 175         return self._communicator.broadcast_send(body, sender, subject, correlation_id)
    176
    177     def is_closed(self) -> bool:

~/miniconda3/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/threadcomms.py in broadcast_send(self, body, sender, subject, correlation_id)
    225     def broadcast_send(self, body, sender=None, subject=None, correlation_id=None):
    226         self._ensure_open()
--> 227         result = self._loop_scheduler.await_(
    228             self._communicator.broadcast_send(body=body, sender=sender, subject=subject, correlation_id=correlation_id)
    229         )

~/miniconda3/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py in await_(self, awaitable, name)
    157         """
    158         try:
--> 159             return self.await_submit(awaitable).result(timeout=self.task_timeout)
    160         except concurrent.futures.TimeoutError as exc:
    161             # Try to get a reasonable name for the awaitable

~/miniconda3/envs/aiida/lib/python3.9/concurrent/futures/_base.py in result(self, timeout)
    443                     raise CancelledError()
    444                 elif self._state == FINISHED:
--> 445                     return self.__get_result()
    446                 else:
    447                     raise TimeoutError()

~/miniconda3/envs/aiida/lib/python3.9/concurrent/futures/_base.py in __get_result(self)
    388         if self._exception:
    389             try:
--> 390                 raise self._exception
    391             finally:
    392                 # Break a reference cycle with the exception in self._exception

~/miniconda3/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py in done(done_future)
     34         # Copy over the future
     35         try:
---> 36             result = done_future.result()
     37             if asyncio.isfuture(result):
     38                 # Change the aio future to a thread future

~/miniconda3/envs/aiida/lib/python3.9/asyncio/futures.py in result(self)
    199         self.__log_traceback = False
    200         if self._exception is not None:
--> 201             raise self._exception
    202         return self._result
    203

~/miniconda3/envs/aiida/lib/python3.9/asyncio/tasks.py in __step(***failed resolving arguments***)
    256                 result = coro.send(None)
    257             else:
--> 258                 result = coro.throw(exc)
    259         except StopIteration as exc:
    260             if self._must_cancel:

~/miniconda3/envs/aiida/lib/python3.9/site-packages/pytray/aiothreads.py in proxy()
    176             async def proxy():
    177                 if not future.cancelled():
--> 178                     return await awaitable
    179
    180             coro_future = asyncio.ensure_future(proxy(), loop=self._loop)

~/miniconda3/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/communicator.py in broadcast_send(self, body, sender, subject, correlation_id)
    485
    486     async def broadcast_send(self, body, sender=None, subject=None, correlation_id=None):
--> 487         publisher = await self.get_message_publisher()
    488         result = await publisher.broadcast_send(body, sender, subject, correlation_id)
    489         return result

~/miniconda3/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/communicator.py in get_message_publisher(self)
    412             )
    413
--> 414             await publisher.connect()
    415             self._message_publisher = publisher
    416

~/miniconda3/envs/aiida/lib/python3.9/site-packages/kiwipy/rmq/messages.py in connect(self)
    159             return
    160
--> 161         self._channel = await self._connection.channel(
    162             publisher_confirms=self._confirm_deliveries, on_return_raises=True
    163         )

~/miniconda3/envs/aiida/lib/python3.9/site-packages/aio_pika/channel.py in __await__(self)
    125
    126     def __await__(self):
--> 127         yield from self.initialize().__await__()
    128         return self
    129

~/miniconda3/envs/aiida/lib/python3.9/site-packages/aio_pika/robust_channel.py in initialize(self, timeout)
     85
     86     async def initialize(self, timeout: TimeoutType = None) -> None:
---> 87         await super().initialize(timeout)
     88         self.add_close_callback(self._on_channel_close)
     89

~/miniconda3/envs/aiida/lib/python3.9/site-packages/aio_pika/channel.py in initialize(self, timeout)
    170             raise RuntimeError("Can't initialize channel")
    171
--> 172         self._channel = await asyncio.wait_for(
    173             self._create_channel(), timeout=timeout,
    174         )

~/miniconda3/envs/aiida/lib/python3.9/asyncio/tasks.py in wait_for(fut, timeout, loop)
    440
    441     if timeout is None:
--> 442         return await fut
    443
    444     if timeout <= 0:

~/miniconda3/envs/aiida/lib/python3.9/site-packages/aio_pika/channel.py in _create_channel(self)
    160         await self._connection.ready()
    161
--> 162         return await self._connection.connection.channel(
    163             publisher_confirms=self._publisher_confirms,
    164             on_return_raises=self._on_return_raises,

~/miniconda3/envs/aiida/lib/python3.9/site-packages/aiormq/connection.py in channel(self, channel_number, publisher_confirms, frame_buffer, **kwargs)
    525
    526         try:
--> 527             await channel.open()
    528         except Exception:
    529             self.channels[channel_number] = None

~/miniconda3/envs/aiida/lib/python3.9/site-packages/aiormq/channel.py in open(self)
    172
    173     async def open(self):
--> 174         frame = await self.rpc(spec.Channel.Open())
    175
    176         if self.publisher_confirms:

~/miniconda3/envs/aiida/lib/python3.9/site-packages/aiormq/base.py in wrap(self, *args, **kwargs)
    166     async def wrap(self: "Base", *args, **kwargs):
    167         # noinspection PyCallingNonCallable
--> 168         return await self.create_task(func(self, *args, **kwargs))
    169
    170     return wrap

~/miniconda3/envs/aiida/lib/python3.9/site-packages/aiormq/base.py in __inner(self)
     23     async def __inner(self):
     24         try:
---> 25             return await self.task
     26         except asyncio.CancelledError as e:
     27             raise self.exception from e

~/miniconda3/envs/aiida/lib/python3.9/asyncio/futures.py in __await__(self)
    282         if not self.done():
    283             self._asyncio_future_blocking = True
--> 284             yield self  # This tells Task to wait for completion.
    285         if not self.done():
    286             raise RuntimeError("await wasn't used with future")

~/miniconda3/envs/aiida/lib/python3.9/asyncio/tasks.py in __wakeup(self, future)
    326     def __wakeup(self, future):
    327         try:
--> 328             future.result()
    329         except BaseException as exc:
    330             # This may also be a cancellation.

~/miniconda3/envs/aiida/lib/python3.9/asyncio/futures.py in result(self)
    199         self.__log_traceback = False
    200         if self._exception is not None:
--> 201             raise self._exception
    202         return self._result
    203

~/miniconda3/envs/aiida/lib/python3.9/asyncio/tasks.py in __step(***failed resolving arguments***)
    254                 # We use the `send` method directly, because coroutines
    255                 # don't have `__iter__` and `__next__` methods.
--> 256                 result = coro.send(None)
    257             else:
    258                 result = coro.throw(exc)

~/miniconda3/envs/aiida/lib/python3.9/site-packages/aiormq/channel.py in rpc(self, frame, timeout)
    119
    120         if self.writer is None:
--> 121             raise ChannelInvalidStateError("writer is None")
    122
    123         lock = self.lock

ChannelInvalidStateError: writer is None

I was trying to submit the calculation

Property     Value
-----------  ------------------------------------
type         PwCalculation
state        Created
pk           117
uuid         3483ff47-f390-4380-b627-aee7228a18bb
label
description
ctime        2022-01-18 09:51:23.390114+00:00
mtime        2022-01-18 09:51:23.564311+00:00
computer     [3] amlt3

Inputs      PK    Type
----------  ----  -------------
pseudos
    Ba      29    UpfData
    Ti      109   UpfData
    O       31    UpfData
code        112   Code
kpoints     114   KpointsData
parameters  115   Dict
structure   113   StructureData

which was taken from the tutorial.

There is no obvious error message in my case.

$ verdi process report 116
*** 116: None
*** Scheduler output: N/A
*** Scheduler errors: N/A
*** 0 LOG MESSAGES

verdi status shows everything is ok.

My environment:

  • aiida-core==1.6.5
  • aiida-quantumespresso==3.5.1
  • aiida-pseudo==0.6.3

@ltalirz
Member

ltalirz commented Jan 18, 2022

From #5031 (comment) do I understand correctly that this issue stems from #5105 ?

I.e. that the solution is to downgrade to rabbitmq 3.7.x (or edit the config file as described in #5031 (comment)) ?

Edit: Just to mention that for me on Ubuntu 20.04 with rabbitmq 3.8.2 the following runs fine

from aiida import load_profile, orm, plugins, engine
load_profile()

#builder = orm.Code.get_from_string('pw-6.3@TheHive').get_builder()
builder = orm.Code.get_from_string('qe@localhost').get_builder()

# BaTiO3 cubic structure
alat = 4.  # angstrom
cell = [[alat, 0., 0.], [0., alat, 0.], [0., 0., alat]]
s = plugins.DataFactory('structure')(cell=cell)
s.append_atom(position=(0., 0., 0.), symbols='Ba')
s.append_atom(position=(alat / 2., alat / 2., alat / 2.), symbols='Ti')
s.append_atom(position=(alat / 2., alat / 2., 0.), symbols='O')
s.append_atom(position=(alat / 2., 0., alat / 2.), symbols='O')
s.append_atom(position=(0., alat / 2., alat / 2.), symbols='O')
builder.structure = s
builder.pseudos = orm.load_group('SSSP/1.1/PBE/efficiency').get_pseudos(structure=s)

builder.parameters = plugins.DataFactory('dict')(
    dict={
        'CONTROL': {
            'calculation': 'scf',
            'restart_mode': 'from_scratch',
            'wf_collect': True,
        },
        'SYSTEM': {
            'ecutwfc': 30.,
            'ecutrho': 240.,
        },
        'ELECTRONS': {
            'conv_thr': 1.e-6,
        }
    }
)

kpoints = plugins.DataFactory('array.kpoints')()
kpoints.set_kpoints_mesh([4, 4, 4])
builder.kpoints = kpoints

builder.metadata.label = 'BaTiO3 test run'
builder.metadata.options.resources = {'num_machines': 1}
builder.metadata.options.max_wallclock_seconds = 1800
builder.metadata.options.prepend_text = "export OMP_NUM_THREADS=1;"

calc = engine.submit(builder)
print(f'created calculation with PK={calc.pk}')

@ryotatomioka

ryotatomioka commented Jan 18, 2022

I tried downgrading to rabbitmq-server=3.7.28 (installed via conda) but the error message was the same. The error occurs very early in the submission.

Edit: kiwipy test suite runs fine

git clone https://github.com/aiidateam/kiwipy.git
cd kiwipy
pip install tox pip
tox -e py39

@unkcpz
Member

unkcpz commented Jan 18, 2022

There is an open aio_pika issue about the robust connection that shows a minimal case of this and may be related: mosquito/aio-pika#288.

@unkcpz
Member

unkcpz commented Jan 18, 2022

I tried downgrading to rabbitmq-server=3.7.28 (installed via conda) but the error message was the same. The error occurs very early in the submission.

It is great news that you can reproduce this issue with such a small process submission. I'd propose you keep the environment untouched for a while. Pinging @chrisjsewell @sphuber to have a look at this?

@ryotatomioka

Thanks for looking into this! I restarted my verdi shell and the problem seems to have gone away.

@ltalirz
Member

ltalirz commented Jan 18, 2022

P.S. Ryota mentions this was with rabbitmq installed via conda

@ltalirz ltalirz closed this as completed Jan 18, 2022
@ltalirz ltalirz reopened this Jan 18, 2022
@unkcpz
Member

unkcpz commented Jan 19, 2022

I have now switched to using aiida-hyperqueue to submit my calculations, which mitigates the stress issue of some jobs staying in the slurm queue for too long (FYI, I use rabbitmq v3.9.3 installed via conda, which has the problem described in #5031, and I changed the timeout to 10h). Since then I have never seen the writer is None issue again.

So for this writer is None issue, I'd suggest that anyone who encounters it again first checks the rabbitmq version and downgrades it if necessary.

@ltalirz
Member

ltalirz commented Aug 2, 2022

Thanks for the hint @unkcpz - hyperqueue might be a good idea for some use cases but I still think the issue needs to be addressed at the aiida-core level as well.

@chrisjsewell @sphuber Could one of you please comment on the current status of this issue, including whether the current main (or aiida-core 2.0.2) already has any potential fixes over AiiDA 1.x?
We're definitely still encountering it on AiiDA 1.x.

If you are still looking for user input on this issue, could you provide instructions for users?
Something like

verdi config set logging.kiwipy_loglevel DEBUG
verdi daemon restart

plus which log messages you are looking for.
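As a starting point, here is a minimal sketch of how a user could count the messages reported in this issue in a log. The pattern names and the `count_matches` helper are made up for illustration, and the set of strings is only an assumption based on what has been posted here, not an official list from the maintainers.

```python
import re
from collections import Counter
from typing import Iterable

# Strings taken from error output posted in this issue (assumed to be
# the relevant ones; the maintainers may be looking for others).
PATTERNS = {
    "writer_is_none": re.compile(r"Writer is None"),
    "unretrieved_task_exception": re.compile(r"Task exception was never retrieved"),
    "robust_channel_closed": re.compile(r"Robust channel .* has been closed"),
}


def count_matches(lines: Iterable[str]) -> Counter:
    """Count how many lines match each of the known patterns."""
    counts = Counter()
    for line in lines:
        for name, pattern in PATTERNS.items():
            if pattern.search(line):
                counts[name] += 1
    return counts


# Example input; in practice `lines` would be an open log file.
sample = [
    "ERROR    asyncio:base_events.py:1753 Task exception was never retrieved",
    "RuntimeError: Writer is None",
    'Robust channel <RobustChannel "None#Not initialized channel"> has been closed.',
    "INFO     some unrelated message",
]
counts = count_matches(sample)
for name in PATTERNS:
    print(name, counts[name])
```

Passing an open file object instead of `sample` works the same way, since the function only iterates over lines.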

Thanks!

@sphuber
Contributor

sphuber commented Aug 2, 2022

As far as I know, there were no changes in 2.0 that explicitly addressed this issue so I suspect it is very likely to still be present there.

@unkcpz
Member

unkcpz commented Aug 3, 2022

New versions of aio-pika have been released that address a writer is None error. It might not be exactly the same case as in this issue, but it is worth trying. Could we loosen the aio-pika version constraint and have someone run a stress test again?

There is a fix for the writer is None issue in ITISFoundation/osparc-simcore#2780. We also use an 'exclusive' queue in kiwipy https://github.com/aiidateam/kiwipy/blob/adf373e794ed69d5ec21d4875514971f32d7734f/kiwipy/rmq/communicator.py#L123, so the change may work for us.

@ltalirz
Member

ltalirz commented Aug 3, 2022

Thanks @unkcpz for looking into this!

Indeed, after reading through mosquito/aio-pika#288 it seems possible that aio-pika 7.0 from February 2022 addressed the issue (although not yet confirmed).

As far as I can see, osparc-simcore is still using aio-pika v6.8.0, i.e. perhaps their fix/workaround could be avoided by upgrading.

kiwipy actually does not lock down the aio-pika version https://github.com/aiidateam/kiwipy/blob/adf373e794ed69d5ec21d4875514971f32d7734f/setup.py#L42 , but plumpy does
https://github.com/aiidateam/plumpy/blob/8c7640b6443410424bf3cfdd5b201ed24bf0dae1/pyproject.toml#L30

and aiida-core as well

"aio-pika~=6.6",
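
For reference, `~=6.6` is a compatible-release clause (PEP 440), equivalent to `>=6.6, ==6.*`, so it excludes aio-pika 7.0. Below is a minimal sketch of that rule for plain numeric versions only. The helper names are made up for illustration, and real installers handle many more cases (pre-releases, epochs and so on).

```python
from typing import Tuple


def parse(version: str) -> Tuple[int, ...]:
    """Turn a plain numeric version like '6.8.0' into (6, 8, 0)."""
    return tuple(int(part) for part in version.split("."))


def pad(release: Tuple[int, ...], width: int) -> Tuple[int, ...]:
    """Right-pad with zeros so that 6.6 and 6.6.0 compare as equal."""
    return release + (0,) * (width - len(release))


def satisfies_compatible_release(candidate: str, clause: str) -> bool:
    """Check `candidate` against `~=clause` for plain numeric versions."""
    base = parse(clause)
    if len(base) < 2:
        raise ValueError("'~=' needs at least two release segments")
    width = max(len(base), len(parse(candidate)))
    cand = pad(parse(candidate), width)
    prefix = base[:-1]  # everything but the last segment must match
    return cand[: len(prefix)] == prefix and cand >= pad(base, width)


for version in ("6.5.9", "6.6", "6.8.0", "7.0.0"):
    print(version, satisfies_compatible_release(version, "6.6"))
```

With the `~=6.6` constraint, any 6.x release from 6.6 onwards is accepted, while 7.0.0 is rejected.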

I'll start by opening a PR against plumpy to see whether the upgrade works.

Locking down the aio-pika version at the aiida-core level actually doesn't make a lot of sense to me: the only direct use of aio_pika in aiida-core is importing a ConnectionClosed exception.
I would suggest importing that exception e.g. in kiwipy and getting it from there, and removing the version lock on aio-pika from the aiida dependencies (maybe even removing the dependency altogether).
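
A rough sketch of the re-export idea, using stand-in modules so it runs without any of the real packages installed. `fake_backend` plays the role of aio_pika and `fake_middle` the role of kiwipy; both names, and `run_guarded`, are hypothetical and do not reflect the actual layout of those packages.

```python
import types

# Stand-in for the low-level library (role of aio_pika); hypothetical.
fake_backend = types.ModuleType("fake_backend")


class ConnectionClosed(Exception):
    """Raised by the stand-in backend when the connection is gone."""


fake_backend.ConnectionClosed = ConnectionClosed

# Stand-in for the intermediate library (role of kiwipy); it simply
# re-exports the exception so that callers need not import the backend.
fake_middle = types.ModuleType("fake_middle")
fake_middle.ConnectionClosed = fake_backend.ConnectionClosed


def run_guarded(action):
    """Top-level code (role of aiida-core) only refers to fake_middle."""
    try:
        return action()
    except fake_middle.ConnectionClosed:
        return "connection closed"


def failing_action():
    # The backend raises its own exception type ...
    raise fake_backend.ConnectionClosed("broker went away")


# ... and the caller catches it via the re-exported name.
print(run_guarded(failing_action))
print(run_guarded(lambda: "ok"))
```

Because the re-exported name refers to the same class object, the `except` clause still matches, and the top-level package no longer needs its own dependency on the backend.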

@sphuber does that make sense?

Edit: There seem to be some API changes that affect plumpy (aiidateam/plumpy#238). I haven't looked into it yet.

@sphuber
Contributor

sphuber commented Aug 4, 2022

Yeah that makes sense to me. Let's update in plumpy and provide the exception we need in aiida-core so we can get rid of the dependency there.

@unkcpz
Member

unkcpz commented Aug 4, 2022

I got some very interesting results from my attempt to bump the aio-pika version for kiwipy and plumpy. I think this issue comes from the rabbitmq server rather than from aio-pika/aiormq. However, the writer is None error is raised from a different exception class, so I need to investigate it further.
I got the writer is None error when I ran the kiwipy unit tests against rabbitmq 3.7.28 (which is recommended in the wiki): tests/rmq/test_jupyter_notebook failed with the exception below. When running against rabbitmq 3.8.14 the error is gone. (I use rabbitmq from a docker container.)

    def test_jupyter_notebook():
        """Test that the `RmqThreadCommunicator` can be used in a Jupyter notebook."""
        from pytest_notebook.nb_regression import NBRegressionFixture

        fixture = NBRegressionFixture(exec_timeout=50)
        fixture.diff_color_words = False
        fixture.diff_ignore = ('/metadata/language_info/version',)

        my_dir = pathlib.Path(__file__).parent
        with open(my_dir / pathlib.Path('notebooks/communicator.ipynb')) as handle:
>           fixture.check(handle)
E           pytest_notebook.nb_regression.NBRegressionError:
E           --- expected
E           +++ obtained
E           ## inserted before /cells/2/outputs/0:
E           +  output:
E           +    output_type: stream
E           +    name: stderr
E           +    text:
E           +      Robust channel <RobustChannel "None#Not initialized channel"> has been closed.
E           +      NoneType: None
E           +      Robust channel <RobustChannel "None#Not initialized channel"> has been closed.
E           +      NoneType: None
E
E

test/rmq/test_rmq_thread_communicator.py:266: NBRegressionError
------------------------------ Captured log call ------------------------------
DEBUG    pytest_notebook.nb_regression:nb_regression.py:274 Checking file: /home/jyu/Projects/WP-aiida/kiwipy/test/rmq/notebooks/communicator.ipynb
DEBUG    pytest_notebook.nb_regression:nb_regression.py:283 Executing notebook.
ERROR    asyncio:base_events.py:1753 Task exception was never retrieved
future: <Task finished name='Task-1881' coro=<_wrap_awaitable() done, defined at /home/jyu/miniconda3/envs/plumpy-aio-mig/lib/python3.9/asyncio/tasks.py:681> exception=RuntimeError('Writer is None')>
Traceback (most recent call last):
  File "/home/jyu/miniconda3/envs/plumpy-aio-mig/lib/python3.9/asyncio/tasks.py", line 688, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/jyu/miniconda3/envs/plumpy-aio-mig/lib/python3.9/site-packages/aiormq/tools.py", line 86, in __await__
    return (yield from self().__await__())
  File "/home/jyu/miniconda3/envs/plumpy-aio-mig/lib/python3.9/site-packages/aiormq/connection.py", line 138, in drain
    raise RuntimeError("Writer is %r" % self.writer)
RuntimeError: Writer is None
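
To illustrate the failure mode in the traceback, here is a toy model, not the aiormq implementation. `ToyConnection` is a hypothetical class whose `drain` raises once the writer has been dropped, which is the check visible in the last traceback frame. It also shows that retrieving the task's exception (here via `asyncio.gather(..., return_exceptions=True)`) is what avoids the "Task exception was never retrieved" log line.

```python
import asyncio


class ToyConnection:
    """Hypothetical stand-in for a connection object; for illustration only."""

    def __init__(self):
        self.writer = object()  # placeholder for a real stream writer

    def close(self):
        # After closing, no writer is available any more.
        self.writer = None

    async def drain(self):
        # Same kind of check as in the last frame of the traceback above.
        if self.writer is None:
            raise RuntimeError("Writer is %r" % self.writer)
        await asyncio.sleep(0)


async def drain_after_close():
    conn = ToyConnection()
    conn.close()
    task = asyncio.ensure_future(conn.drain())
    # Collecting the result retrieves the exception from the task, so
    # asyncio does not log "Task exception was never retrieved".
    results = await asyncio.gather(task, return_exceptions=True)
    return results[0]


error = asyncio.run(drain_after_close())
print(type(error).__name__, error)  # prints "RuntimeError Writer is None"
```

This only reproduces the shape of the error. Why the writer ends up being `None` against a given rabbitmq version is not explained by the sketch.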

@sphuber
Contributor

sphuber commented Dec 16, 2022

Update: support for the newer aio-pika versions has been released with plumpy==0.22.0 and kiwipy==0.8.0, but unfortunately we cannot use them yet because there is a problem in aiida-core when running with them. See #5732
