diff --git a/new-docs/source/tutorial/2-advanced-execution.ipynb b/new-docs/source/tutorial/2-advanced-execution.ipynb
index 068546252..581ac71f4 100644
--- a/new-docs/source/tutorial/2-advanced-execution.ipynb
+++ b/new-docs/source/tutorial/2-advanced-execution.ipynb
@@ -32,22 +32,28 @@
     "\n",
     "Pydra supports several workers with which to execute tasks\n",
     "\n",
+    "- `debug` (default)\n",
     "- `cf`\n",
     "- `slurm`\n",
     "- `sge`\n",
+    "- `psij`\n",
     "- `dask` (experimental)\n",
-    "- `debug`\n",
     "\n",
-    "By default, the *cf* (*ConcurrentFutures*) worker is used, which\n",
-    "executes tasks across multiple processes. If you are using a high-performance cluster (HPC)\n",
-    "then the [SLURM](https://slurm.schedmd.com/documentation.html) and\n",
-    "[SGE](https://www.metagenomics.wiki/tools/hpc-sge) workers can be used to submit each\n",
-    "workflow node as separate jobs to the HPC scheduler. There is also an\n",
-    "experimental [Dask](https://www.dask.org/) worker.\n",
+    "By default, the *debug* worker is used, which runs tasks serially in a single process\n",
+    "without use of the `asyncio` module. This makes it easier to debug errors in workflows\n",
+    "and Python tasks. However, when using Pydra in production you will typically want to\n",
+    "parallelise the execution for efficiency.\n",
     "\n",
-    "When using a debugger in the development of a workflow or Python tasks, the\n",
-    "*debug* worker is recommended as it executes nodes \"synchronously\" (as opposed to\n",
-    "asynchronously), and can therefore break on uncaught exceptions.\n",
+    "If running on a local workstation, then the `cf` (*ConcurrentFutures*) worker is a good\n",
+    "option because it is able to spread the tasks over multiple processes and\n",
+    "maximise CPU usage.\n",
+    "\n",
+    "If you have access to a high-performance cluster (HPC), then the\n",
+    "[SLURM](https://slurm.schedmd.com/documentation.html),\n",
+    "[SGE](https://www.metagenomics.wiki/tools/hpc-sge) and [PSI/J](https://exaworks.org/psij)\n",
+    "workers can be used to submit each workflow node as a separate job to the HPC scheduler.\n",
+    "There is also an experimental [Dask](https://www.dask.org/) worker, which provides a\n",
+    "range of execution backends to choose from.\n",
     "\n",
     "To specify a worker, the abbreviation can be passed either as a string or using the\n",
    "class itself. Additional parameters can be passed to the worker initialisation as keyword\n",
@@ -65,53 +71,73 @@
     "name": "stderr",
     "output_type": "stream",
     "text": [
-     "A newer version (0.25) of nipype/pydra is available. You are using 0.25.dev141+g03c7438b.d20250123\n"
+     "A newer version (0.25) of nipype/pydra is available. 
You are using 0.25.dev144+g6a590e9d.d20250124\n" ] }, { - "ename": "IndentationError", - "evalue": "unexpected indent (, line 1)", + "ename": "RuntimeError", + "evalue": "Graph of 'Workflow(name='Split', inputs=Split(_constructed=None, defn=TenToThePower(p=StateArray(1, 2, 3, 4, 5), function=), constructor=.Split at 0x114510d60>), outputs=SplitOutputs(out=LazyOutField(field='out', type=list[int], cast_from=None, type_checked=True, node=Node(name='TenToThePower', _definition=TenToThePower(p=StateArray(1, 2, 3, 4, 5), function=), _workflow=..., _lzout=TenToThePowerOutputs(out=...), _state=, _cont_dim=None, _inner_cont_dim={}))), _nodes={'TenToThePower': Node(name='TenToThePower', _definition=TenToThePower(p=StateArray(1, 2, 3, 4, 5), function=), _workflow=..., _lzout=TenToThePowerOutputs(out=LazyOutField(field='out', type=list[int], cast_from=None, type_checked=True, node=...)), _state=, _cont_dim=None, _inner_cont_dim={})})' workflow is not empty, but not able to get more tasks - something has gone wrong when retrieving the results predecessors:\n\n", "output_type": "error", "traceback": [ - "Traceback \u001b[0;36m(most recent call last)\u001b[0m:\n", - "\u001b[0m File \u001b[1;32m~/.pyenv/versions/3.12.5/envs/wf12/lib/python3.12/site-packages/IPython/core/interactiveshell.py:3577\u001b[0m in \u001b[1;35mrun_code\u001b[0m\n exec(code_obj, self.user_global_ns, self.user_ns)\u001b[0m\n", - "\u001b[0m Cell \u001b[1;32mIn[2], line 10\u001b[0m\n outputs = ten_to_the_power(worker=\"cf\", n_procs=3)\u001b[0m\n", - "\u001b[0m File \u001b[1;32m~/git/workflows/pydra/pydra/engine/specs.py:193\u001b[0m in \u001b[1;35m__call__\u001b[0m\n result = sub(self)\u001b[0m\n", - "\u001b[0m File \u001b[1;32m~/git/workflows/pydra/pydra/engine/submitter.py:157\u001b[0m in \u001b[1;35m__call__\u001b[0m\n self.loop.run_until_complete(task.run_async(rerun=self.rerun))\u001b[0m\n", - "\u001b[0m File \u001b[1;32m~/.pyenv/versions/3.12.5/envs/wf12/lib/python3.12/site-packages/nest_asyncio.py:98\u001b[0m in \u001b[1;35mrun_until_complete\u001b[0m\n return f.result()\u001b[0m\n", - "\u001b[0m File \u001b[1;32m~/.pyenv/versions/3.12.5/lib/python3.12/asyncio/futures.py:203\u001b[0m in \u001b[1;35mresult\u001b[0m\n raise self._exception.with_traceback(self._exception_tb)\u001b[0m\n", - "\u001b[0m File \u001b[1;32m~/.pyenv/versions/3.12.5/lib/python3.12/asyncio/tasks.py:314\u001b[0m in \u001b[1;35m__step_run_and_handle_result\u001b[0m\n result = coro.send(None)\u001b[0m\n", - "\u001b[0m File \u001b[1;32m~/git/workflows/pydra/pydra/engine/core.py:398\u001b[0m in \u001b[1;35mrun_async\u001b[0m\n \"'%s' is attempting to acquire lock on %s\", self.name, self.lockfile\u001b[0m\n", - "\u001b[0m File \u001b[1;32m~/git/workflows/pydra/pydra/engine/core.py:212\u001b[0m in \u001b[1;35mlockfile\u001b[0m\n return self.output_dir.with_suffix(\".lock\")\u001b[0m\n", - "\u001b[0m File \u001b[1;32m~/git/workflows/pydra/pydra/engine/core.py:261\u001b[0m in \u001b[1;35moutput_dir\u001b[0m\n return self.cache_dir / self.checksum\u001b[0m\n", - "\u001b[0m File \u001b[1;32m~/git/workflows/pydra/pydra/engine/core.py:206\u001b[0m in \u001b[1;35mchecksum\u001b[0m\n input_hash = self.definition._hash\u001b[0m\n", - "\u001b[0m File \u001b[1;32m~/git/workflows/pydra/pydra/engine/specs.py:361\u001b[0m in \u001b[1;35m_hash\u001b[0m\n hsh, self._hashes = self._compute_hashes()\u001b[0m\n", - "\u001b[0m File \u001b[1;32m~/git/workflows/pydra/pydra/engine/specs.py:384\u001b[0m in \u001b[1;35m_compute_hashes\u001b[0m\n k: hash_function(v, cache=hash_cache) 
for k, v in inp_dict.items()\u001b[0m\n", - "\u001b[0m File \u001b[1;32m~/git/workflows/pydra/pydra/utils/hash.py:206\u001b[0m in \u001b[1;35mhash_function\u001b[0m\n return hash_object(obj, **kwargs).hex()\u001b[0m\n", - "\u001b[0m File \u001b[1;32m~/git/workflows/pydra/pydra/utils/hash.py:237\u001b[0m in \u001b[1;35mhash_object\u001b[0m\n raise e\u001b[0m\n", - "\u001b[0m File \u001b[1;32m~/git/workflows/pydra/pydra/utils/hash.py:225\u001b[0m in \u001b[1;35mhash_object\u001b[0m\n return hash_single(obj, cache)\u001b[0m\n", - "\u001b[0m File \u001b[1;32m~/git/workflows/pydra/pydra/utils/hash.py:281\u001b[0m in \u001b[1;35mhash_single\u001b[0m\n first = next(bytes_it)\u001b[0m\n", - "\u001b[0m File \u001b[1;32m~/git/workflows/pydra/pydra/utils/hash.py:537\u001b[0m in \u001b[1;35mbytes_repr_function\u001b[0m\n src_ast = ast.parse(src)\u001b[0m\n", - "\u001b[0;36m File \u001b[0;32m~/.pyenv/versions/3.12.5/lib/python3.12/ast.py:52\u001b[0;36m in \u001b[0;35mparse\u001b[0;36m\n\u001b[0;31m return compile(source, filename, mode, flags,\u001b[0;36m\n", - "\u001b[0;36m File \u001b[0;32m:1\u001b[0;36m\u001b[0m\n\u001b[0;31m @workflow.define(outputs=output_types)\u001b[0m\n\u001b[0m ^\u001b[0m\n\u001b[0;31mIndentationError\u001b[0m\u001b[0;31m:\u001b[0m unexpected indent\nand therefore cannot hash `.Split at 0x11443cd60>` of type `builtins.function`. Consider implementing a specific `bytes_repr()`(see pydra.utils.hash.register_serializer) or a `__bytes_repr__()` dunder methods for this type\n" + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mRuntimeError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[2], line 12\u001b[0m\n\u001b[1;32m 9\u001b[0m ten_to_the_power \u001b[38;5;241m=\u001b[39m TenToThePower()\u001b[38;5;241m.\u001b[39msplit(p\u001b[38;5;241m=\u001b[39m[\u001b[38;5;241m1\u001b[39m, \u001b[38;5;241m2\u001b[39m, \u001b[38;5;241m3\u001b[39m, \u001b[38;5;241m4\u001b[39m, \u001b[38;5;241m5\u001b[39m])\n\u001b[1;32m 11\u001b[0m \u001b[38;5;66;03m# Run the 5 tasks in parallel split across 3 processes\u001b[39;00m\n\u001b[0;32m---> 12\u001b[0m outputs \u001b[38;5;241m=\u001b[39m \u001b[43mten_to_the_power\u001b[49m\u001b[43m(\u001b[49m\u001b[43mworker\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mcf\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mn_procs\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;241;43m3\u001b[39;49m\u001b[43m)\u001b[49m\n\u001b[1;32m 14\u001b[0m p1, p2, p3, p4, p5 \u001b[38;5;241m=\u001b[39m outputs\u001b[38;5;241m.\u001b[39mout\n\u001b[1;32m 16\u001b[0m \u001b[38;5;28mprint\u001b[39m(\u001b[38;5;124mf\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m10^5 = \u001b[39m\u001b[38;5;132;01m{\u001b[39;00mp5\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m\"\u001b[39m)\n", + "File \u001b[0;32m~/git/workflows/pydra/pydra/engine/specs.py:194\u001b[0m, in \u001b[0;36mTaskDef.__call__\u001b[0;34m(self, cache_dir, worker, environment, rerun, cache_locations, audit_flags, messengers, messenger_args, **kwargs)\u001b[0m\n\u001b[1;32m 182\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[1;32m 183\u001b[0m \u001b[38;5;28;01mwith\u001b[39;00m Submitter(\n\u001b[1;32m 184\u001b[0m audit_flags\u001b[38;5;241m=\u001b[39maudit_flags,\n\u001b[1;32m 185\u001b[0m cache_dir\u001b[38;5;241m=\u001b[39mcache_dir,\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 192\u001b[0m 
\u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwargs,\n\u001b[1;32m 193\u001b[0m ) \u001b[38;5;28;01mas\u001b[39;00m sub:\n\u001b[0;32m--> 194\u001b[0m result \u001b[38;5;241m=\u001b[39m \u001b[43msub\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m)\u001b[49m\n\u001b[1;32m 195\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mTypeError\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m e:\n\u001b[1;32m 196\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mhasattr\u001b[39m(e, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m__notes__\u001b[39m\u001b[38;5;124m\"\u001b[39m) \u001b[38;5;129;01mand\u001b[39;00m WORKER_KWARG_FAIL_NOTE \u001b[38;5;129;01min\u001b[39;00m e\u001b[38;5;241m.\u001b[39m__notes__:\n", + "File \u001b[0;32m~/git/workflows/pydra/pydra/engine/submitter.py:156\u001b[0m, in \u001b[0;36mSubmitter.__call__\u001b[0;34m(self, task_def)\u001b[0m\n\u001b[1;32m 154\u001b[0m task \u001b[38;5;241m=\u001b[39m Task(task_def, submitter\u001b[38;5;241m=\u001b[39m\u001b[38;5;28mself\u001b[39m, name\u001b[38;5;241m=\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mtask\u001b[39m\u001b[38;5;124m\"\u001b[39m, environment\u001b[38;5;241m=\u001b[39m\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39menvironment)\n\u001b[1;32m 155\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m task\u001b[38;5;241m.\u001b[39mis_async: \u001b[38;5;66;03m# Only workflow tasks can be async\u001b[39;00m\n\u001b[0;32m--> 156\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mloop\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mrun_until_complete\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mworker\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mrun_async\u001b[49m\u001b[43m(\u001b[49m\u001b[43mtask\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mrerun\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mrerun\u001b[49m\u001b[43m)\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 157\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 158\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mworker\u001b[38;5;241m.\u001b[39mrun(task, rerun\u001b[38;5;241m=\u001b[39m\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mrerun)\n", + "File \u001b[0;32m~/.pyenv/versions/3.12.5/envs/wf12/lib/python3.12/site-packages/nest_asyncio.py:98\u001b[0m, in \u001b[0;36m_patch_loop..run_until_complete\u001b[0;34m(self, future)\u001b[0m\n\u001b[1;32m 95\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m f\u001b[38;5;241m.\u001b[39mdone():\n\u001b[1;32m 96\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mRuntimeError\u001b[39;00m(\n\u001b[1;32m 97\u001b[0m \u001b[38;5;124m'\u001b[39m\u001b[38;5;124mEvent loop stopped before Future completed.\u001b[39m\u001b[38;5;124m'\u001b[39m)\n\u001b[0;32m---> 98\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43mf\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mresult\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n", + "File \u001b[0;32m~/.pyenv/versions/3.12.5/lib/python3.12/asyncio/futures.py:203\u001b[0m, in \u001b[0;36mFuture.result\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 201\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m__log_traceback \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mFalse\u001b[39;00m\n\u001b[1;32m 202\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m 
\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_exception \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[0;32m--> 203\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_exception\u001b[38;5;241m.\u001b[39mwith_traceback(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_exception_tb)\n\u001b[1;32m 204\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_result\n", + "File \u001b[0;32m~/.pyenv/versions/3.12.5/lib/python3.12/asyncio/tasks.py:314\u001b[0m, in \u001b[0;36mTask.__step_run_and_handle_result\u001b[0;34m(***failed resolving arguments***)\u001b[0m\n\u001b[1;32m 310\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[1;32m 311\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m exc \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[1;32m 312\u001b[0m \u001b[38;5;66;03m# We use the `send` method directly, because coroutines\u001b[39;00m\n\u001b[1;32m 313\u001b[0m \u001b[38;5;66;03m# don't have `__iter__` and `__next__` methods.\u001b[39;00m\n\u001b[0;32m--> 314\u001b[0m result \u001b[38;5;241m=\u001b[39m \u001b[43mcoro\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msend\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43;01mNone\u001b[39;49;00m\u001b[43m)\u001b[49m\n\u001b[1;32m 315\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 316\u001b[0m result \u001b[38;5;241m=\u001b[39m coro\u001b[38;5;241m.\u001b[39mthrow(exc)\n", + "File \u001b[0;32m~/git/workflows/pydra/pydra/engine/workers.py:51\u001b[0m, in \u001b[0;36mWorker.run_async\u001b[0;34m(self, task, rerun)\u001b[0m\n\u001b[1;32m 50\u001b[0m \u001b[38;5;28;01masync\u001b[39;00m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mrun_async\u001b[39m(\u001b[38;5;28mself\u001b[39m, task: \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mTask[DefType]\u001b[39m\u001b[38;5;124m\"\u001b[39m, rerun: \u001b[38;5;28mbool\u001b[39m \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mFalse\u001b[39;00m) \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m>\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mResult\u001b[39m\u001b[38;5;124m\"\u001b[39m:\n\u001b[0;32m---> 51\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;01mawait\u001b[39;00m task\u001b[38;5;241m.\u001b[39mrun_async(rerun\u001b[38;5;241m=\u001b[39mrerun)\n", + "File \u001b[0;32m~/git/workflows/pydra/pydra/engine/core.py:416\u001b[0m, in \u001b[0;36mTask.run_async\u001b[0;34m(self, rerun)\u001b[0m\n\u001b[1;32m 414\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[1;32m 415\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39maudit\u001b[38;5;241m.\u001b[39mmonitor()\n\u001b[0;32m--> 416\u001b[0m \u001b[38;5;28;01mawait\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mdefinition\u001b[38;5;241m.\u001b[39m_run_async(\u001b[38;5;28mself\u001b[39m)\n\u001b[1;32m 417\u001b[0m result\u001b[38;5;241m.\u001b[39moutputs \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mdefinition\u001b[38;5;241m.\u001b[39mOutputs\u001b[38;5;241m.\u001b[39m_from_task(\u001b[38;5;28mself\u001b[39m)\n\u001b[1;32m 418\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m:\n", + "File \u001b[0;32m~/git/workflows/pydra/pydra/engine/specs.py:709\u001b[0m, in \u001b[0;36mWorkflowDef._run_async\u001b[0;34m(self, task)\u001b[0m\n\u001b[1;32m 707\u001b[0m \u001b[38;5;28;01masync\u001b[39;00m 
\u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21m_run_async\u001b[39m(\u001b[38;5;28mself\u001b[39m, task: \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mTask[WorkflowDef]\u001b[39m\u001b[38;5;124m\"\u001b[39m) \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m>\u001b[39m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[1;32m 708\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m\"\"\"Run the workflow asynchronously.\"\"\"\u001b[39;00m\n\u001b[0;32m--> 709\u001b[0m \u001b[38;5;28;01mawait\u001b[39;00m task\u001b[38;5;241m.\u001b[39msubmitter\u001b[38;5;241m.\u001b[39mexpand_workflow_async(task)\n", + "File \u001b[0;32m~/git/workflows/pydra/pydra/engine/submitter.py:285\u001b[0m, in \u001b[0;36mSubmitter.expand_workflow_async\u001b[0;34m(self, workflow_task)\u001b[0m\n\u001b[1;32m 272\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m hashes_have_changed:\n\u001b[1;32m 273\u001b[0m msg \u001b[38;5;241m+\u001b[39m\u001b[38;5;241m=\u001b[39m (\n\u001b[1;32m 274\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mSet loglevel to \u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mdebug\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124m in order to track hash changes \u001b[39m\u001b[38;5;124m\"\u001b[39m\n\u001b[1;32m 275\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mthroughout the execution of the workflow.\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[38;5;124m \u001b[39m\u001b[38;5;124m\"\u001b[39m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 283\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mor more types in your interface inputs.\u001b[39m\u001b[38;5;124m\"\u001b[39m\n\u001b[1;32m 284\u001b[0m )\n\u001b[0;32m--> 285\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mRuntimeError\u001b[39;00m(msg)\n\u001b[1;32m 286\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m task \u001b[38;5;129;01min\u001b[39;00m tasks:\n\u001b[1;32m 287\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m task\u001b[38;5;241m.\u001b[39mis_async:\n", + "\u001b[0;31mRuntimeError\u001b[0m: Graph of 'Workflow(name='Split', inputs=Split(_constructed=None, defn=TenToThePower(p=StateArray(1, 2, 3, 4, 5), function=), constructor=.Split at 0x114510d60>), outputs=SplitOutputs(out=LazyOutField(field='out', type=list[int], cast_from=None, type_checked=True, node=Node(name='TenToThePower', _definition=TenToThePower(p=StateArray(1, 2, 3, 4, 5), function=), _workflow=..., _lzout=TenToThePowerOutputs(out=...), _state=, _cont_dim=None, _inner_cont_dim={}))), _nodes={'TenToThePower': Node(name='TenToThePower', _definition=TenToThePower(p=StateArray(1, 2, 3, 4, 5), function=), _workflow=..., _lzout=TenToThePowerOutputs(out=LazyOutField(field='out', type=list[int], cast_from=None, type_checked=True, node=...)), _state=, _cont_dim=None, _inner_cont_dim={})})' workflow is not empty, but not able to get more tasks - something has gone wrong when retrieving the results predecessors:\n\n" ] } ], "source": [ + "\n", "from pydra.design import python\n", "\n", - "@python.define\n", - "def TenToThePower(p: int) -> int:\n", - " return 10 ** p\n", + "if __name__ == \"__main__\":\n", + "\n", + " @python.define\n", + " def TenToThePower(p: int) -> int:\n", + " return 10 ** p\n", + "\n", + " ten_to_the_power = TenToThePower().split(p=[1, 2, 3, 4, 5])\n", "\n", - "ten_to_the_power = TenToThePower().split(p=[1, 2, 3, 4, 5])\n", + " # Run the 5 tasks in parallel split across 3 processes\n", + " outputs = ten_to_the_power(worker=\"cf\", n_procs=3)\n", + "\n", + " p1, p2, p3, p4, p5 = outputs.out\n", + "\n", + " 
print(f\"10^5 = {p5}\")"
    ]
   },
   {
    "cell_type": "markdown",
    "metadata": {},
    "source": [
     "Alternatively, the worker object can be initialised in the calling code and passed directly to the execution call"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
    "metadata": {},
    "outputs": [],
    "source": [
     "from pydra.engine.workers import ConcurrentFuturesWorker\n",
     "\n",
     "ten_to_the_power = TenToThePower().split(p=[6, 7, 8, 9, 10])\n",
     "\n",
     "# Run the 5 tasks in parallel split across 3 processes\n",
-    "outputs = ten_to_the_power(worker=\"cf\", n_procs=3)\n",
+    "outputs = ten_to_the_power(worker=ConcurrentFuturesWorker(n_procs=3))\n",
     "\n",
-    "p1, p2, p3, p4, p5 = outputs.out\n",
+    "p6, p7, p8, p9, p10 = outputs.out\n",
     "\n",
-    "print(f\"10^5 = {p5}\")"
+    "print(f\"10^10 = {p10}\")"
    ]
   },
   {
@@ -141,10 +167,6 @@
     "from fileformats.medimage import Nifti1\n",
     "from pydra.engine.submitter import Submitter\n",
     "from pydra.tasks.mrtrix3.v3_0 import MrGrid\n",
-    "import nest_asyncio\n",
-    "\n",
-    "# Allow running async code in Jupyter notebooks\n",
-    "nest_asyncio.apply()\n",
     "\n",
     "# Make directory filled with nifti files\n",
     "test_dir = Path(tempfile.mkdtemp())\n",
@@ -153,19 +175,26 @@
     "for i in range(10):\n",
     "    Nifti1.sample(nifti_dir, seed=i)\n",
     "\n",
-    "VOXEL_SIZES = [0.5, 0.5, 0.5, 0.75, 0.75, 0.75, 1.0, 1.0, 1.0, 1.25]\n",
-    "\n",
-    "mrgrid_varying_vox_sizes = MrGrid().split(\n",
-    "    (\"input\", \"voxel\"),\n",
-    "    input=nifti_dir.iterdir(),\n",
-    "    voxel=VOXEL_SIZES\n",
+    "# Instantiate the task definition, \"splitting\" it over all NIfTI files in the test\n",
+    "# directory via the \"in_file\" input field\n",
+    "mrgrid = MrGrid(operation=\"regrid\", voxel=(0.5, 0.5, 0.5)).split(\n",
+    "    in_file=nifti_dir.iterdir()\n",
     ")\n",
     "\n",
-    "submitter = Submitter(cache_dir=test_dir / \"cache\")\n",
+    "# Run the task to resample all NIfTI files\n",
+    "outputs = mrgrid()\n",
+    "\n",
+    "# Create a custom cache directory\n",
+    "cache_dir = test_dir / \"cache\"\n",
+    "cache_dir.mkdir()\n",
+    "\n",
+    "submitter = Submitter(cache_dir=cache_dir)\n",
     "\n",
-    "# Run the task to resample all NIfTI files with different voxel sizes\n",
+    "# Run the task again via the submitter, using the custom cache directory\n",
     "with submitter:\n",
-    "    result1 = submitter(mrgrid_varying_vox_sizes)"
+    "    result1 = submitter(mrgrid)\n",
+    "\n",
+    "print(result1)\n"
    ]
   },
   {
diff --git a/pydra/design/shell.py b/pydra/design/shell.py
index 6c8889b99..3a51b9f5d 100644
--- a/pydra/design/shell.py
+++ b/pydra/design/shell.py
@@ -662,6 +662,8 @@ def remaining_positions(
     # Check for multiple positions
     positions = defaultdict(list)
     for arg in args:
+        if arg.name == "arguments":
+            continue
         if arg.position is not None:
             if arg.position >= 0:
                 positions[arg.position].append(arg)
diff --git a/pydra/engine/core.py b/pydra/engine/core.py
index ab34be202..292235acd 100644
--- a/pydra/engine/core.py
+++ b/pydra/engine/core.py
@@ -496,7 +496,9 @@ def result(self, return_inputs=False):
             the result of the task
         """
         if self.errored:
-            return Result(outputs=None, runtime=None, errored=True, task=self)
+            return Result(
+                outputs=None, runtime=None, errored=True, output_dir=self.output_dir
+            )
         checksum = self.checksum
         result = load_result(checksum, self.cache_locations)
 
diff --git a/pydra/engine/specs.py b/pydra/engine/specs.py
index b72d1eed8..07e7c1a6f 100644
--- a/pydra/engine/specs.py
+++ b/pydra/engine/specs.py
@@ -134,7 +134,7 @@ class TaskDef(ty.Generic[OutputsType]):
     def __call__(
         self,
         cache_dir: os.PathLike | None = None,
-        worker: "str | ty.Type[Worker] | Worker" = "cf",
= "cf", + worker: "str | ty.Type[Worker] | Worker" = "debug", environment: "Environment | None" = None, rerun: bool = False, cache_locations: ty.Iterable[os.PathLike] | None = None, @@ -205,10 +205,14 @@ def __call__( ) raise if result.errored: - raise RuntimeError( - f"Task {self} failed @ {result.errors['time of crash']} with following errors:\n" - + "\n".join(result.errors["error message"]) - ) + if isinstance(self, WorkflowDef) or self._splitter: + raise RuntimeError(f"Workflow {self} failed with errors:") + else: + errors = result.errors + raise RuntimeError( + f"Task {self} failed @ {errors['time of crash']} with following errors:\n" + + "\n".join(errors["error message"]) + ) return result.outputs def split( @@ -550,8 +554,10 @@ def get_output_field(self, field_name): @property def errors(self): if self.errored: - with open(self.output_dir / "_error.pklz", "rb") as f: - return cp.load(f) + error_file = self.output_dir / "_error.pklz" + if error_file.exists(): + with open(error_file, "rb") as f: + return cp.load(f) return None @@ -878,7 +884,8 @@ class ShellDef(TaskDef[ShellOutputsType]): arguments: ty.List[str] = shell.arg( default=attrs.Factory(list), - help="Additional arguments to pass to the command.", + sep=" ", + help="Additional free-form arguments to append to the end of the command.", ) RESERVED_FIELD_NAMES = TaskDef.RESERVED_FIELD_NAMES + ("cmdline",) @@ -930,6 +937,8 @@ def _command_args( continue if name == "executable": pos_args.append(self._command_shelltask_executable(field, value)) + elif name == "arguments": + continue elif name == "args": pos_val = self._command_shelltask_args(field, value) if pos_val: diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py index 50fd2a81b..b9af10488 100644 --- a/pydra/engine/submitter.py +++ b/pydra/engine/submitter.py @@ -69,7 +69,7 @@ class Submitter: def __init__( self, cache_dir: os.PathLike | None = None, - worker: ty.Union[str, ty.Type[Worker]] = "cf", + worker: ty.Union[str, ty.Type[Worker]] = "debug", environment: "Environment | None" = None, rerun: bool = False, cache_locations: list[os.PathLike] | None = None, @@ -140,7 +140,7 @@ def __call__( output_types = {o.name: list[o.type] for o in list_fields(task_def.Outputs)} @workflow.define(outputs=output_types) - def Split(defn: TaskDef) -> tuple: + def Split(defn: TaskDef): node = workflow.add(defn) return tuple(getattr(node, o) for o in output_types) @@ -155,7 +155,7 @@ def Split(defn: TaskDef) -> tuple: if task.is_async: # Only workflow tasks can be async self.loop.run_until_complete(self.worker.run_async(task, rerun=self.rerun)) else: - self.worker.run(rerun=self.rerun) + self.worker.run(task, rerun=self.rerun) PersistentCache().clean_up() result = task.result() if result is None: