diff --git a/lazyops/libs/abcs/state/global_ctx.py b/lazyops/libs/abcs/state/global_ctx.py index 4024a4f..b76cf90 100644 --- a/lazyops/libs/abcs/state/global_ctx.py +++ b/lazyops/libs/abcs/state/global_ctx.py @@ -47,6 +47,8 @@ class GlobalContext(abc.ABC): server_processes: List['multiprocessing.Process'] = [] # worker_processes: Dict[str, Dict[str, List['multiprocessing.Process']]] = {} + asyncio_tasks: List[asyncio.Task] = [] + on_close_funcs: List[Callable] = [] settings_func: Optional[Union[Callable, str]] = None # type: ignore @@ -333,6 +335,42 @@ def stop_server_processes(self, verbose: bool = True, timeout: float = 5.0): proc.close() curr_proc += 1 # if verbose: self.logger.info(f"Stopped all {curr_proc} server processes") + + def start_asyncio_task( + self, + func: Callable, + *args, + **kwargs, + ): + """ + Starts an asyncio task + """ + self.asyncio_tasks.append( + asyncio.create_task(func(*args, **kwargs)) + ) + + def add_asyncio_task( + self, + task: asyncio.Task, + ) -> None: + """ + Adds an asyncio task + """ + self.asyncio_tasks.append(task) + + def stop_all_asyncio_tasks( + self, + verbose: Optional[bool] = None, + ): + """ + Stops all asyncio tasks + """ + if not self.asyncio_tasks: return + for task in self.asyncio_tasks: + if task.done(): continue + if verbose: self.logger.info(f"Stopping Asyncio Task: {task}", colored = True) + task.cancel() + self.asyncio_tasks = [] def end_all_processes(self, verbose: bool = True, timeout: float = 5.0): """ @@ -343,6 +381,7 @@ def end_all_processes(self, verbose: bool = True, timeout: float = 5.0): # self.stop_worker_processes(name = name, verbose = verbose, timeout = timeout, kind = kind) self.stop_task_workers(timeout = timeout, verbose = verbose) self.stop_server_processes(verbose = verbose, timeout = timeout) + self.stop_all_asyncio_tasks(verbose = verbose) # self.logger.info("Terminated all processes") def register_on_close(self, func: Callable, *args, **kwargs): diff --git a/lazyops/libs/logging/main.py b/lazyops/libs/logging/main.py index c0f2422..cb86296 100644 --- a/lazyops/libs/logging/main.py +++ b/lazyops/libs/logging/main.py @@ -465,6 +465,7 @@ def log( prefix: Optional[str] = None, max_length: Optional[int] = None, colored: Optional[bool] = False, + hook: Optional[Callable] = None, **kwargs ): # noqa: N805 """ @@ -479,6 +480,7 @@ def log( # level_id, static_level_no, from_decorator, options, message, args, kwargs static_log_no = REVERSE_LOGLEVEL_MAPPING.get(level, 20) self._log(level, static_log_no, False, self._get_opts(colored = colored), message, args, kwargs) + if hook: hook(message) def info( self, @@ -508,6 +510,7 @@ def success( colored: Optional[bool] = False, prefix: Optional[str] = None, max_length: Optional[int] = None, + hook: Optional[Callable] = None, **kwargs ): # noqa: N805 r"""Log ``message.format(*args, **kwargs)`` with severity ``'SUCCESS'``.""" @@ -517,7 +520,7 @@ def success( except TypeError: # Compatibility with < 0.6.0 self._log("SUCCESS", 20, False, self._get_opts(colored = colored), message, args, kwargs) - + if hook: hook(message) def warning( self, @@ -538,6 +541,25 @@ def warning( self._log("WARNING", 30, False, self._get_opts(colored = colored), message, args, kwargs) if hook: hook(message) + def error( + self, + message: Any, + *args, + prefix: Optional[str] = None, + max_length: Optional[int] = None, + colored: Optional[bool] = False, + hook: Optional[Callable] = None, + **kwargs + ) -> None: + """ + Log ``message.format(*args, **kwargs)`` with severity ``'ERROR'``. + """ + message = self._format_message(message, prefix = prefix, max_length = max_length, colored = colored, level = 'ERROR') + try: + self._log("ERROR", False, self._get_opts(colored = colored), message, args, kwargs) + except TypeError: + self._log("ERROR", 40, False, self._get_opts(colored = colored), message, args, kwargs) + if hook: hook(message) def trace( self, @@ -549,6 +571,7 @@ def trace( colored: Optional[bool] = False, prefix: Optional[str] = None, max_length: Optional[int] = None, + hook: Optional[Callable] = None, **kwargs, ) -> None: """ @@ -567,6 +590,7 @@ def trace( except TypeError: static_log_no = REVERSE_LOGLEVEL_MAPPING.get(level, 40) self._log(level, static_log_no, False, self._get_opts(colored = colored), _msg, (), {}) + if hook: hook(_msg) def exception( self, @@ -575,6 +599,7 @@ def exception( colored: Optional[bool] = False, prefix: Optional[str] = None, max_length: Optional[int] = None, + hook: Optional[Callable] = None, **kwargs ): """ @@ -582,6 +607,7 @@ def exception( """ message = self._format_message(message, prefix = prefix, max_length = max_length, colored = colored, level = 'ERROR') super().exception(message, *args, **kwargs) + if hook: hook(message) def __call__(self, message: Any, *args, level: str = 'info', **kwargs): diff --git a/lazyops/libs/pooler/main.py b/lazyops/libs/pooler/main.py index c3d4aa9..556817f 100644 --- a/lazyops/libs/pooler/main.py +++ b/lazyops/libs/pooler/main.py @@ -15,6 +15,7 @@ import functools import subprocess import contextvars +import contextlib import anyio.from_thread from concurrent import futures from anyio._core._eventloop import threadlocals @@ -244,9 +245,12 @@ def on_exit(self): Cleans up the ThreadPoolExecutor and Tasks """ for task in self.tasks: - task.cancel() - if self._pool is not None: self._pool.shutdown(wait = self.allow_task_completion, cancel_futures = not self.allow_task_completion) - if self._ppool is not None: self._ppool.shutdown(wait = self.allow_task_completion, cancel_futures = not self.allow_task_completion) + with contextlib.suppress(Exception): + task.cancel() + with contextlib.suppress(Exception): + if self._pool is not None: self._pool.shutdown(wait = self.allow_task_completion, cancel_futures = not self.allow_task_completion) + with contextlib.suppress(Exception): + if self._ppool is not None: self._ppool.shutdown(wait = self.allow_task_completion, cancel_futures = not self.allow_task_completion) """ Core diff --git a/lazyops/version.py b/lazyops/version.py index 4630c2c..3f6a2d1 100644 --- a/lazyops/version.py +++ b/lazyops/version.py @@ -1 +1 @@ -VERSION = '0.2.82' +VERSION = '0.2.83'