Skip to content

Commit

Permalink
Engine: Fix state change broadcast before process node is updated (#6580
Browse files Browse the repository at this point in the history
)

Processes start to broadcast their event before they update their
process status in the database. This can cause issues if the next
process directly tries to access the last process state retrieving it
from the database while it has not been updated in the database.

Cherry-pick: 867353c
  • Loading branch information
agoscinski committed Nov 4, 2024
1 parent 2fe62e4 commit f9d820a
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 5 deletions.
12 changes: 7 additions & 5 deletions src/aiida/engine/processes/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,18 +418,14 @@ def on_entered(self, from_state: Optional[plumpy.process_states.State]) -> None:

from aiida.engine.utils import set_process_state_change_timestamp

super().on_entered(from_state)

if self._state.LABEL is ProcessState.EXCEPTED:
# The process is already excepted so simply update the process state on the node and let the process
# complete the state transition to the terminal state. If another exception is raised during this exception
# handling, the process transitioning is cut short and never makes it to the terminal state.
self.node.set_process_state(self._state.LABEL)
return

# For reasons unknown, it is important to update the outputs first, before doing anything else, otherwise there
# is the risk that certain outputs do not get attached before the process reaches a terminal state. Nevertheless
# we need to guarantee that the process state gets updated even if the ``update_outputs`` call excepts, for
# We need to guarantee that the process state gets updated even if the ``update_outputs`` call excepts, for
# example if the process implementation attaches an invalid output through ``Process.out``, and so we call the
# ``ProcessNode.set_process_state`` in the finally-clause. This way the state gets properly set on the node even
# if the process is transitioning to the terminal excepted state.
Expand All @@ -443,6 +439,12 @@ def on_entered(self, from_state: Optional[plumpy.process_states.State]) -> None:
self._save_checkpoint()
set_process_state_change_timestamp(self.node)

# The updating of outputs and state has to be performed before the super is called because the super will
# broadcast state changes and parent processes may start running again before the state change is completed. It
# is possible that they will read the old process state and outputs that they check may not yet have been
# attached.
super().on_entered(from_state)

@override
def on_terminated(self) -> None:
"""Called when a Process enters a terminal state."""
Expand Down
30 changes: 30 additions & 0 deletions tests/engine/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,36 @@ def test_work_calc_finish(self):
run(process)
assert process.node.is_finished_ok

def test_on_finish_node_updated_before_broadcast(self, monkeypatch):
"""Tests if the process state and output has been updated in the database before a broadcast is invoked.
In plumpy.Process.on_entered the state update is broadcasted. When a process is finished this results in the
next process being run. If the next process will access the process that just finished, it can result in not
being able to retrieve the outputs or correct process state because this information has yet not been updated
them in the database.
"""
import copy

# By monkeypatching the parent class we can check the state when the
# parents class method is invoked and check if the state has be
# correctly updated.
original_on_entered = copy.deepcopy(plumpy.Process.on_entered)

def on_entered(self, from_state):
if self._state.LABEL.value == 'finished':
assert (
self.node.is_finished_ok
), 'Node state should have been updated before plumpy.Process.on_entered is invoked.'
assert (
self.node.outputs.result.value == 2
), 'Outputs should have been attached before plumpy.Process.on_entered is invoked.'
original_on_entered(self, from_state)

monkeypatch.setattr(plumpy.Process, 'on_entered', on_entered)
# Ensure that process has run correctly otherwise the asserts in the
# monkeypatched member function have been skipped
assert run_get_node(test_processes.AddProcess, a=1, b=1).node.is_finished_ok, 'Process should not fail.'

@staticmethod
def test_save_instance_state():
"""Test save instance's state."""
Expand Down

0 comments on commit f9d820a

Please sign in to comment.