From c25e23666bdc2be641e3da083e3e5f69b779ad45 Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Sun, 8 Dec 2024 17:20:14 +0800 Subject: [PATCH] Add type annotations --- storey/flow.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/storey/flow.py b/storey/flow.py index 41dde7b1..b8f315ca 100644 --- a/storey/flow.py +++ b/storey/flow.py @@ -1439,7 +1439,7 @@ def set_table(self, key, table): class _ParallelExecutionRunnableResult: - def __init__(self, runnable_name, data, runtime): + def __init__(self, runnable_name: str, data: Any, runtime: float): self.runnable_name = runnable_name self.data = data self.runtime = runtime @@ -1465,7 +1465,7 @@ class ParallelExecutionRunnable: :param name: Runnable name """ - execution_mechanism = None + execution_mechanism: Optional[str] = None supported_mechanisms = ("multiprocessing", "threading", "asyncio", "naive") # ignore unused keyword arguments such as context which may be passed in by mlrun @@ -1477,11 +1477,11 @@ def __init__(self, name: str, **kwargs): ) self.name = name - def init(self): + def init(self) -> None: """Override this method to add initialization logic.""" pass - def run(self, body, path: str): + def run(self, body: Any, path: str) -> Any: """ Override this method with the code this runnable should run. If execution_mechanism is "asyncio", override run_async() instead. @@ -1491,7 +1491,7 @@ def run(self, body, path: str): """ return body - async def run_async(self, body, path: str): + async def run_async(self, body: Any, path: str) -> Any: """ If execution_mechanism is "asyncio", override this method with the code this runnable should run. Otherwise, override run() instead. @@ -1501,13 +1501,13 @@ async def run_async(self, body, path: str): """ return body - def _run(self, body, path: str): + def _run(self, body: Any, path: str) -> Any: start = time.monotonic() body = self.run(body, path) end = time.monotonic() return _ParallelExecutionRunnableResult(self.name, body, end - start) - async def _async_run(self, body, path: str): + async def _async_run(self, body: Any, path: str) -> Any: start = time.monotonic() body = await self.run_async(body, path) end = time.monotonic() @@ -1541,7 +1541,7 @@ def __init__( self.max_processes = max_processes self.max_threads = max_threads - def select_runnables(self, event): + def select_runnables(self, event) -> Optional[Union[list[str], list[ParallelExecutionRunnable]]]: """ Given an event, returns a list of runnables (or a list of runnable names) to execute on it. It can also return None, in which case all runnables are executed on the event, which is also the default.