Skip to content

Commit

Permalink
Added IGNORE_EVENT and fixed visual of node with 'none' property
Browse files Browse the repository at this point in the history
  • Loading branch information
cpelley committed Nov 8, 2024
1 parent 8b0308e commit db89c57
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 9 deletions.
42 changes: 34 additions & 8 deletions dagrunner/execute_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,20 @@
TimeIt,
function_to_argparse_parse_args,
logger,
as_iterable,
Singleton
)
from dagrunner.utils.networkx import visualise_graph


class _SKIP_EVENT:
class _SKIP_EVENT(Singleton):
"""
This object is used to indicate to `plugin_executor` to skip execution of its node.
A plugin that returns a 'SKIP_EVENT' will cause `plugin_executor` to skip execution
of all descendant node execution.
"""

_instance = None

def __new__(cls):
if cls._instance is None:
cls._instance = super(_SKIP_EVENT, cls).__new__(cls)
return cls._instance

def __repr__(self):
return "SKIP_EVENT"

Expand All @@ -51,6 +49,27 @@ def __reduce__(self):
SKIP_EVENT = _SKIP_EVENT()


class _IGNORE_EVENT(Singleton):
"""
A plugin that returns an 'IGNORE_EVENT' will be filtered out as arguments by
`plugin_executor` in descendant node execution.
"""

_instance = None

def __repr__(self):
return "IGNORE_EVENT"

def __hash__(self):
return hash("IGNORE_EVENT")

def __reduce__(self):
return (self.__class__, ())


IGNORE_EVENT = _IGNORE_EVENT()


class SkipBranch(Exception):
"""
This exception is used to skip a branch of the execution graph.
Expand Down Expand Up @@ -141,6 +160,13 @@ def plugin_executor(
if verbose:
print(f"args: {args}")
print(f"call: {call}")

call = as_iterable(call)

# filter out IGNORE_EVENT from args.
args = filter(lambda x: x is not IGNORE_EVENT, args)

# ignore execution if SKIP_EVENT found in any arg.
if SKIP_EVENT in args:
if verbose:
print(f"Skipping node {call[0]}")
Expand Down Expand Up @@ -391,7 +417,7 @@ def _process_graph(self):


if CONFIG["dagrunner_visualisation"].pop("enabled", False) is True:
self.visualise_graph(**CONFIG["dagrunner_visualisation"])
self.visualise(**CONFIG["dagrunner_visualisation"])

exec_graph = {}
for node_id, properties in self._nxgraph.nodes(data=True):
Expand Down
2 changes: 1 addition & 1 deletion dagrunner/utils/networkx.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def add_node(node, mermaid, table, node_id, node_target_id_map, node_info_lookup
label = gen_label(node_id, node, label_by)
tooltip = pprint.pformat(node_info_lookup[node])

subgraphs = [getattr(node, key) for key in group_by if hasattr(node, key)]
subgraphs = [getattr(node, key) for key in group_by if getattr(node, key, None)]
for subgraph in subgraphs:
mermaid.add_raw(f"subgraph {subgraph}")
mermaid.add_node(
Expand Down

0 comments on commit db89c57

Please sign in to comment.