Skip to content

Commit

Permalink
Add type annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
gtopper committed Dec 8, 2024
1 parent 5d5f95c commit c25e236
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions storey/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1439,7 +1439,7 @@ def set_table(self, key, table):


class _ParallelExecutionRunnableResult:
def __init__(self, runnable_name, data, runtime):
def __init__(self, runnable_name: str, data: Any, runtime: float):
self.runnable_name = runnable_name
self.data = data
self.runtime = runtime
Expand All @@ -1465,7 +1465,7 @@ class ParallelExecutionRunnable:
:param name: Runnable name
"""

execution_mechanism = None
execution_mechanism: Optional[str] = None
supported_mechanisms = ("multiprocessing", "threading", "asyncio", "naive")

# ignore unused keyword arguments such as context which may be passed in by mlrun
Expand All @@ -1477,11 +1477,11 @@ def __init__(self, name: str, **kwargs):
)
self.name = name

def init(self):
def init(self) -> None:
"""Override this method to add initialization logic."""
pass

def run(self, body, path: str):
def run(self, body: Any, path: str) -> Any:
"""
Override this method with the code this runnable should run. If execution_mechanism is "asyncio", override
run_async() instead.
Expand All @@ -1491,7 +1491,7 @@ def run(self, body, path: str):
"""
return body

async def run_async(self, body, path: str):
async def run_async(self, body: Any, path: str) -> Any:
"""
If execution_mechanism is "asyncio", override this method with the code this runnable should run. Otherwise,
override run() instead.
Expand All @@ -1501,13 +1501,13 @@ async def run_async(self, body, path: str):
"""
return body

def _run(self, body, path: str):
def _run(self, body: Any, path: str) -> Any:
start = time.monotonic()
body = self.run(body, path)
end = time.monotonic()
return _ParallelExecutionRunnableResult(self.name, body, end - start)

async def _async_run(self, body, path: str):
async def _async_run(self, body: Any, path: str) -> Any:
start = time.monotonic()
body = await self.run_async(body, path)
end = time.monotonic()
Expand Down Expand Up @@ -1541,7 +1541,7 @@ def __init__(
self.max_processes = max_processes
self.max_threads = max_threads

def select_runnables(self, event):
def select_runnables(self, event) -> Optional[Union[list[str], list[ParallelExecutionRunnable]]]:
"""
Given an event, returns a list of runnables (or a list of runnable names) to execute on it. It can also return
None, in which case all runnables are executed on the event, which is also the default.
Expand Down

0 comments on commit c25e236

Please sign in to comment.