Add prometheus-based instrumentation
Allows collection of various metrics as executors/futures are used.
rohanpm committed Jul 11, 2021
1 parent 7e27887 commit 2301cd4
Showing 28 changed files with 563 additions and 35 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -7,7 +7,10 @@ from version 1.20.0 onwards.

## [Unreleased]

- n/a
### Added
- Introduced support for Prometheus instrumentation.
- All executors now accept a `name` argument. If given, the name is used
in Prometheus metrics and in the names of any threads created by an executor.

## [2.6.0] - 2021-06-19

1 change: 1 addition & 0 deletions README.md
@@ -26,6 +26,7 @@ extend the behavior of `Future` objects.
- Synchronous executor
- Bridge `concurrent.futures` with `asyncio`
- Convenience API for creating executors
- Instrumented with [Prometheus](https://prometheus.io/)

See the [API documentation](https://rohanpm.github.io/more-executors/) for detailed information on usage.

143 changes: 141 additions & 2 deletions docs/user-guide.rst
@@ -27,7 +27,7 @@ Example:
.. code-block:: python
from more_executors import Executors
with Executors.thread_pool() as executor:
with Executors.thread_pool(name='web-client') as executor:
future = executor.submit(requests.get, 'https://github.com/rohanpm/more-executors')
@@ -64,7 +64,7 @@ Example:
.. code-block:: python
# Run in up to 4 threads, retry on failure, transform output values
executor = Executors.thread_pool(max_workers=4). \
executor = Executors.thread_pool(max_workers=4, name='web-client'). \
with_map(lambda response: response.json()). \
with_retry()
responses = [executor.submit(requests.get, url)
@@ -91,6 +91,31 @@ retries. The throttling in this example has no effect, since a
:class:`~more_executors.sync.SyncExecutor` is intrinsically throttled to
a single pending future.

.. _naming executors:

Naming executors
----------------

All executors accept an optional ``name`` argument, an arbitrary string.
Setting the ``name`` when creating an executor has the following effects:

- If the executor creates any threads, the thread name will include the
specified value.

- The name will be used as the ``executor`` label on any :ref:`metrics`
associated with the executor.

When creating chained executors via the ``with_*`` methods
(see :ref:`composing executors`), names automatically propagate through
the chain:

.. code-block:: python
Executors.thread_pool(name='svc-client').with_retry().with_throttle(4)
In the above example, three executors are created and all of them are
given the name ``svc-client``.
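
As a quick way to observe the effect on thread names, here is a minimal
sketch (assuming only that the given name appears somewhere in each
worker thread's name; the exact format is an implementation detail):

.. code-block:: python

    import threading

    from more_executors import Executors

    with Executors.thread_pool(name='svc-client', max_workers=2) as executor:
        # Force at least one worker thread to be created.
        executor.submit(lambda: None).result()

        # Worker threads created by this executor include 'svc-client'
        # in their names.
        print([t.name for t in threading.enumerate() if 'svc-client' in t.name])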


Composing futures
-----------------
@@ -207,3 +232,117 @@ executor. When this happens, it will no longer be possible for the garbage
collector to clean up the executor's resources automatically and a thread
leak may occur. If in doubt, call
:meth:`~concurrent.futures.Executor.shutdown`.

.. _metrics:

Prometheus metrics
------------------

This library automatically collects `Prometheus <https://prometheus.io/>`_
metrics if the ``prometheus_client`` Python module is available.
The feature is disabled when this module is not installed.

If you want to ensure that ``more-executors`` is installed along with
all Prometheus dependencies, you may request the 'prometheus' extra,
as in this example:

.. code-block::
pip install more-executors[prometheus]
The library only collects metrics; it does not expose them.
You must use ``prometheus_client`` to expose metrics in the most
appropriate manner when integrating this library with your tool or service.
Here is a simple example of dumping metrics to a file:

.. code-block:: python
import prometheus_client
prometheus_client.write_to_textfile('metrics.txt', prometheus_client.REGISTRY)
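
Alternatively, ``prometheus_client`` can serve metrics over HTTP. A
minimal sketch using its built-in server (the port below is an
arbitrary choice):

.. code-block:: python

    import prometheus_client

    # Serve all metrics from the default registry, including those
    # collected by more-executors, at http://localhost:8000/metrics
    prometheus_client.start_http_server(8000)
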
The following metrics are available:

``more_executors_exec_inprogress``
A *gauge* for the number of executors currently in use.

"In use" means an executor has been created and ``shutdown()`` not
yet called. Incorrect usage of ``shutdown()`` (e.g. calling more than
once) will lead to inaccurate data.

``more_executors_exec_total``
A *counter* for the total number of executors created.

``more_executors_future_inprogress``
A *gauge* for the number of futures currently in progress.

"In progress" means a future has been created and not yet reached
a terminal state.

``more_executors_future_total``
A *counter* for the total number of futures created.

``more_executors_future_cancel_total``
A *counter* for the total number of futures cancelled.

``more_executors_future_error_total``
A *counter* for the total number of futures resolved with an exception.

``more_executors_future_time_total``
A *counter* for the total execution time (in seconds) of futures.

The execution time of a future is the period between its creation
and its resolution.

``more_executors_poll_total``
A *counter* for the total number of times a :ref:`poll function` was
invoked.

``more_executors_poll_error_total``
A *counter* for the total number of times a :ref:`poll function`
raised an exception.

``more_executors_poll_time_total``
A *counter* for the total execution time (in seconds) of
:ref:`poll function` calls.

``more_executors_retry_total``
A *counter* for the total number of times a future was retried
by :class:`~more_executors.retry.RetryExecutor`.

``more_executors_retry_queue``
A *gauge* for the current queue size of a
:class:`~more_executors.retry.RetryExecutor` (i.e. the
number of futures currently waiting to retry).

``more_executors_retry_delay_total``
A *counter* for the total time (in seconds) spent waiting to
retry futures via :class:`~more_executors.retry.RetryExecutor`.

``more_executors_throttle_queue``
A *gauge* for the current queue size of a
:class:`~more_executors.throttle.ThrottleExecutor` (i.e. the number of futures
not yet able to start due to throttling).

``more_executors_timeout_total``
A *counter* for the total number of futures cancelled due to timeout
via :class:`~more_executors.timeout.TimeoutExecutor` or
:func:`~more_executors.futures.f_timeout`.

Only successfully cancelled futures are included.

``more_executors_shutdown_cancel_total``
A *counter* for the total number of futures cancelled due to executor
shutdown via :class:`~more_executors.cancel_on_shutdown.CancelOnShutdownExecutor`.

Only successfully cancelled futures are included.

Metrics include the following labels:

``type``
The type of executor or future in use; e.g. ``map``, ``retry``,
``poll``.

``executor``
Name of executor (see :ref:`Naming executors`).
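
As an illustration of how these labels can be used, here is a minimal
sketch reading a single sample from the default registry via
``prometheus_client`` (the label values below are only examples and
depend on which executors your program has created):

.. code-block:: python

    from prometheus_client import REGISTRY

    # Number of 'retry' executors named 'svc-client' created so far;
    # returns None if no such sample has been recorded yet.
    value = REGISTRY.get_sample_value(
        'more_executors_exec_total',
        labels={'type': 'retry', 'executor': 'svc-client'},
    )
    print(value)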

14 changes: 13 additions & 1 deletion more_executors/_impl/asyncio.py
@@ -1,6 +1,8 @@
from concurrent.futures import Executor
import asyncio

from .metrics import metrics


class AsyncioExecutor(Executor):
"""An executor which delegates to another executor while converting
@@ -13,7 +15,7 @@ class AsyncioExecutor(Executor):
.. versionadded:: 1.7.0
"""

def __init__(self, delegate, loop=None, logger=None):
def __init__(self, delegate, loop=None, logger=None, name="default"):
"""
Parameters:
@@ -26,9 +28,18 @@ def __init__(self, delegate, loop=None, logger=None):
logger (~logging.Logger):
a logger used for messages from this executor
name (str):
a name for this executor
.. versionchanged:: 2.7.0
Introduced ``name``.
"""
self._delegate = delegate
self._loop = loop
self._name = name
metrics.EXEC_TOTAL.labels(type="asyncio", executor=self._name).inc()
metrics.EXEC_INPROGRESS.labels(type="asyncio", executor=self._name).inc()

def submit(self, *args, **kwargs): # pylint: disable=arguments-differ
return self.submit_with_loop(self._loop, *args, **kwargs)
@@ -53,4 +64,5 @@ def submit_with_loop(self, loop, fn, *args, **kwargs):
return asyncio.wrap_future(future, loop=loop)

def shutdown(self, wait=True, **_kwargs):
metrics.EXEC_INPROGRESS.labels(type="asyncio", executor=self._name).dec()
self._delegate.shutdown(wait, **_kwargs)
19 changes: 18 additions & 1 deletion more_executors/_impl/cancel_on_shutdown.py
@@ -4,6 +4,7 @@

from .wrap import CanCustomizeBind
from .logwrap import LogWrapper
from .metrics import metrics


class CancelOnShutdownExecutor(CanCustomizeBind, Executor):
@@ -34,7 +35,7 @@ class CancelOnShutdownExecutor(CanCustomizeBind, Executor):
in the standard library).
"""

def __init__(self, delegate, logger=None):
def __init__(self, delegate, logger=None, name="default"):
"""
Parameters:
@@ -43,14 +44,25 @@ def __init__(self, delegate, logger=None):
logger (~logging.Logger):
a logger used for messages from this executor
name (str):
a name for this executor
.. versionchanged:: 2.7.0
Introduced ``name``.
"""
self._log = LogWrapper(
logger if logger else logging.getLogger("CancelOnShutdownExecutor")
)
self._name = name
self._delegate = delegate
self._futures = set()
self._lock = RLock()
self._shutdown = False
metrics.EXEC_TOTAL.labels(type="cancel_on_shutdown", executor=self._name).inc()
metrics.EXEC_INPROGRESS.labels(
type="cancel_on_shutdown", executor=self._name
).inc()

def shutdown(self, wait=True, **_kwargs):
"""Shut down the executor.
@@ -65,11 +77,16 @@ def shutdown(self, wait=True, **_kwargs):
if self._shutdown:
return
self._shutdown = True
metrics.EXEC_INPROGRESS.labels(
type="cancel_on_shutdown", executor=self._name
).dec()
futures = self._futures.copy()

for f in futures:
cancel = f.cancel()
self._log.debug("Cancel %s: %s", f, cancel)
if cancel:
metrics.SHUTDOWN_CANCEL.labels(executor=self._name).inc()

self._delegate.shutdown(wait, **_kwargs)

1 change: 1 addition & 0 deletions more_executors/_impl/flat_map.py
@@ -54,3 +54,4 @@ class FlatMapExecutor(MapExecutor):
"""

_FUTURE_CLASS = FlatMapFuture
_TYPE = "flat_map"
3 changes: 2 additions & 1 deletion more_executors/_impl/futures/apply.py
@@ -2,6 +2,7 @@

from .base import wrap
from .check import ensure_futures
from ..metrics import track_future


# for wrapping arguments.
@@ -31,7 +32,7 @@ def f_apply(future_fn, *future_args, **future_kwargs):
.. versionadded:: 1.19.0
"""
wrapped_args = _wrap_args(*future_args, **future_kwargs)
return _wrapped_f_apply(future_fn, wrapped_args)
return track_future(_wrapped_f_apply(future_fn, wrapped_args), type="apply")


def _wrap_args(*future_args, **future_kwargs):
4 changes: 3 additions & 1 deletion more_executors/_impl/futures/bool.py
@@ -8,7 +8,7 @@
from ..common import copy_future_exception
from .check import ensure_futures
from ..logwrap import LogWrapper

from ..metrics import track_future

LOG = LogWrapper(logging.getLogger("more_executors.futures"))

@@ -106,6 +106,7 @@ def f_or(f, *fs):
return f

oper = OrOperation([f] + list(fs))
track_future(oper.out, type="or")
return oper.out


@@ -165,4 +166,5 @@ def f_and(f, *fs):
return f

oper = AndOperation([f] + list(fs))
track_future(oper.out, type="and")
return oper.out
3 changes: 2 additions & 1 deletion more_executors/_impl/futures/nocancel.py
@@ -2,6 +2,7 @@

from ..map import MapFuture
from .check import ensure_future
from ..metrics import track_future


class NoCancelFuture(MapFuture):
@@ -25,4 +26,4 @@ def f_nocancel(future):
.. versionadded:: 1.19.0
"""
return NoCancelFuture(future, lambda x: x)
return track_future(NoCancelFuture(future, lambda x: x), type="nocancel")
5 changes: 4 additions & 1 deletion more_executors/_impl/futures/proxy.py
@@ -5,6 +5,7 @@
from more_executors._impl.map import MapFuture
from more_executors._impl.common import MAX_TIMEOUT
from .check import ensure_future
from ..metrics import track_future


class ProxyFuture(MapFuture):
@@ -213,4 +214,6 @@ def f_proxy(f, **kwargs):
.. versionadded:: 2.3.0
"""
return ProxyFuture(f, timeout=kwargs.pop("timeout", MAX_TIMEOUT))
return track_future(
ProxyFuture(f, timeout=kwargs.pop("timeout", MAX_TIMEOUT)), type="proxy"
)
5 changes: 3 additions & 2 deletions more_executors/_impl/futures/sequence.py
@@ -6,6 +6,7 @@
from .map import f_map

from ..common import copy_exception
from ..metrics import track_future


def f_sequence(futures):
@@ -30,7 +31,7 @@ def f_sequence(futures):
.. versionadded:: 1.19.0
"""
return f_traverse(lambda x: x, futures)
return track_future(f_traverse(lambda x: x, futures), type="sequence")


def f_traverse(fn, xs):
@@ -62,4 +63,4 @@ def f_traverse(fn, xs):
return future

zipped = f_zip(*futures)
return f_map(zipped, list)
return track_future(f_map(zipped, list), type="traverse")