Skip to content

Commit

Permalink
minor updates
Browse files Browse the repository at this point in the history
  • Loading branch information
trisongz committed Jun 13, 2024
1 parent 1eef742 commit d2433b1
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 5 deletions.
39 changes: 39 additions & 0 deletions lazyops/libs/abcs/state/global_ctx.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class GlobalContext(abc.ABC):
server_processes: List['multiprocessing.Process'] = []
# worker_processes: Dict[str, Dict[str, List['multiprocessing.Process']]] = {}

asyncio_tasks: List[asyncio.Task] = []

on_close_funcs: List[Callable] = []

settings_func: Optional[Union[Callable, str]] = None # type: ignore
Expand Down Expand Up @@ -333,6 +335,42 @@ def stop_server_processes(self, verbose: bool = True, timeout: float = 5.0):
proc.close()
curr_proc += 1
# if verbose: self.logger.info(f"Stopped all {curr_proc} server processes")

def start_asyncio_task(
self,
func: Callable,
*args,
**kwargs,
):
"""
Starts an asyncio task
"""
self.asyncio_tasks.append(
asyncio.create_task(func(*args, **kwargs))
)

def add_asyncio_task(
self,
task: asyncio.Task,
) -> None:
"""
Adds an asyncio task
"""
self.asyncio_tasks.append(task)

def stop_all_asyncio_tasks(
self,
verbose: Optional[bool] = None,
):
"""
Stops all asyncio tasks
"""
if not self.asyncio_tasks: return
for task in self.asyncio_tasks:
if task.done(): continue
if verbose: self.logger.info(f"Stopping Asyncio Task: {task}", colored = True)
task.cancel()
self.asyncio_tasks = []

def end_all_processes(self, verbose: bool = True, timeout: float = 5.0):
"""
Expand All @@ -343,6 +381,7 @@ def end_all_processes(self, verbose: bool = True, timeout: float = 5.0):
# self.stop_worker_processes(name = name, verbose = verbose, timeout = timeout, kind = kind)
self.stop_task_workers(timeout = timeout, verbose = verbose)
self.stop_server_processes(verbose = verbose, timeout = timeout)
self.stop_all_asyncio_tasks(verbose = verbose)
# self.logger.info("Terminated all processes")

def register_on_close(self, func: Callable, *args, **kwargs):
Expand Down
28 changes: 27 additions & 1 deletion lazyops/libs/logging/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ def log(
prefix: Optional[str] = None,
max_length: Optional[int] = None,
colored: Optional[bool] = False,
hook: Optional[Callable] = None,
**kwargs
): # noqa: N805
"""
Expand All @@ -479,6 +480,7 @@ def log(
# level_id, static_level_no, from_decorator, options, message, args, kwargs
static_log_no = REVERSE_LOGLEVEL_MAPPING.get(level, 20)
self._log(level, static_log_no, False, self._get_opts(colored = colored), message, args, kwargs)
if hook: hook(message)

def info(
self,
Expand Down Expand Up @@ -508,6 +510,7 @@ def success(
colored: Optional[bool] = False,
prefix: Optional[str] = None,
max_length: Optional[int] = None,
hook: Optional[Callable] = None,
**kwargs
): # noqa: N805
r"""Log ``message.format(*args, **kwargs)`` with severity ``'SUCCESS'``."""
Expand All @@ -517,7 +520,7 @@ def success(
except TypeError:
# Compatibility with < 0.6.0
self._log("SUCCESS", 20, False, self._get_opts(colored = colored), message, args, kwargs)

if hook: hook(message)

def warning(
self,
Expand All @@ -538,6 +541,25 @@ def warning(
self._log("WARNING", 30, False, self._get_opts(colored = colored), message, args, kwargs)
if hook: hook(message)

def error(
self,
message: Any,
*args,
prefix: Optional[str] = None,
max_length: Optional[int] = None,
colored: Optional[bool] = False,
hook: Optional[Callable] = None,
**kwargs
) -> None:
"""
Log ``message.format(*args, **kwargs)`` with severity ``'ERROR'``.
"""
message = self._format_message(message, prefix = prefix, max_length = max_length, colored = colored, level = 'ERROR')
try:
self._log("ERROR", False, self._get_opts(colored = colored), message, args, kwargs)
except TypeError:
self._log("ERROR", 40, False, self._get_opts(colored = colored), message, args, kwargs)
if hook: hook(message)

def trace(
self,
Expand All @@ -549,6 +571,7 @@ def trace(
colored: Optional[bool] = False,
prefix: Optional[str] = None,
max_length: Optional[int] = None,
hook: Optional[Callable] = None,
**kwargs,
) -> None:
"""
Expand All @@ -567,6 +590,7 @@ def trace(
except TypeError:
static_log_no = REVERSE_LOGLEVEL_MAPPING.get(level, 40)
self._log(level, static_log_no, False, self._get_opts(colored = colored), _msg, (), {})
if hook: hook(_msg)

def exception(
self,
Expand All @@ -575,13 +599,15 @@ def exception(
colored: Optional[bool] = False,
prefix: Optional[str] = None,
max_length: Optional[int] = None,
hook: Optional[Callable] = None,
**kwargs
):
"""
Log ``message.format(*args, **kwargs)`` with severity ``'ERROR'``.
"""
message = self._format_message(message, prefix = prefix, max_length = max_length, colored = colored, level = 'ERROR')
super().exception(message, *args, **kwargs)
if hook: hook(message)


def __call__(self, message: Any, *args, level: str = 'info', **kwargs):
Expand Down
10 changes: 7 additions & 3 deletions lazyops/libs/pooler/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import functools
import subprocess
import contextvars
import contextlib
import anyio.from_thread
from concurrent import futures
from anyio._core._eventloop import threadlocals
Expand Down Expand Up @@ -244,9 +245,12 @@ def on_exit(self):
Cleans up the ThreadPoolExecutor and Tasks
"""
for task in self.tasks:
task.cancel()
if self._pool is not None: self._pool.shutdown(wait = self.allow_task_completion, cancel_futures = not self.allow_task_completion)
if self._ppool is not None: self._ppool.shutdown(wait = self.allow_task_completion, cancel_futures = not self.allow_task_completion)
with contextlib.suppress(Exception):
task.cancel()
with contextlib.suppress(Exception):
if self._pool is not None: self._pool.shutdown(wait = self.allow_task_completion, cancel_futures = not self.allow_task_completion)
with contextlib.suppress(Exception):
if self._ppool is not None: self._ppool.shutdown(wait = self.allow_task_completion, cancel_futures = not self.allow_task_completion)

"""
Core
Expand Down
2 changes: 1 addition & 1 deletion lazyops/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = '0.2.82'
VERSION = '0.2.83'

0 comments on commit d2433b1

Please sign in to comment.