Skip to content

Commit

Permalink
Merge pull request #19 from henrypinkard/main
Browse files Browse the repository at this point in the history
finishing notifcation docs and restrucutre imports
  • Loading branch information
henrypinkard authored Aug 19, 2024
2 parents 7f22493 + 139be7f commit 5b5beb0
Show file tree
Hide file tree
Showing 23 changed files with 253 additions and 48 deletions.
4 changes: 2 additions & 2 deletions docs/_static/exengine_bigpicture.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 2 additions & 2 deletions docs/_static/tokenization.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 4 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output

html_theme = "sphinx_rtd_theme"
html_theme_options = {
'collapse_navigation': True,
'navigation_depth': 6,
}

# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
Expand Down
2 changes: 1 addition & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Key Features:


.. toctree::
:maxdepth: 2
:maxdepth: 3
:caption: Contents:

design
Expand Down
178 changes: 174 additions & 4 deletions docs/usage/notifications.rst
Original file line number Diff line number Diff line change
@@ -1,9 +1,179 @@
.. _notifications:


##############
=============
Notifications
##############
=============


Overview
---------

Notifications in ExEngine provide a powerful mechanism for asynchronous communication between the Execution and user code. They allow devices, events, and other components to broadcast updates about their status or important occurrences. This enables reactive programming patterns, allowing your software to respond dynamically to changes in the system state or experimental conditions.

Notifications can serve several purposes:

- Inform about the completion of asynchronous operations (i.e. those occuring on a different thread)
- Alert about changes in device states
- Communicate errors or warnings
- Provide updates on the progress of long-running tasks



Anatomy of a Notification
^^^^^^^^^^^^^^^^^^^^^^^^^^

Notifications in ExEngine are instances of classes derived from the base ``Notification`` class. Each notification has several components:

1. **Category**: Defined by the ``NotificationCategory`` enum, this indicates the broad type of the notification (``Event``, ``Data``, ``Storage``, or ``Device``).

2. **Description**: A string providing a explanation of what the notification represents.

3. **Payload**: An optional piece of data associated with the notification, whose type depends on the particular notification.

4. **Timestamp**: Automatically set to the time the notification was created.


Built-in Notification Types
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

ExEngine provides built-in notification types, such as:

1. ``EventExecutedNotification``: Posted when an ExecutionEvent completes. Its payload is None, or an Exception if the event didn't complete successfully

2. ``DataStoredNotification``: Posted when data is stored by a Storage object. Its payload is the ``DataCoordinates`` of the stored data.



Subscribing to Notifications
----------------------------

To subscribe to notifications from ExEngine, you can use the ``subscribe_to_notifications`` method of the ``ExecutionEngine`` instance:

.. code-block:: python
from exengine import ExecutionEngine
def notification_handler(notification):
print(f'Got Notification: time {notification.timestamp} and payload {notification.payload}')
engine = ExecutionEngine.get_instance()
engine.subscribe_to_notifications(notification_handler)
# When finished, unsubscribe
engine.unsubscribe_from_notifications(notification_handler)
Your ``notification_handler`` function will be called each time a new notification is posted. Since there may be many notifications produced by the ``ExecutionEngine``, these handler functions should not contain code that takes a long time to run.


Filtering Subscriptions
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You can filter notifications by type (i.e. a specific notification subclass) or category when subscribing, so that the handler function only gets called for a subset of notifications

.. code-block:: python
# Subscribe to a specific notification type
# SpecificNotificationClass should be a subclass of exengine.base_classes.Notification
engine.subscribe_to_notifications(handler, SpecificNotificationClass)
# Subscribe to notifications of a specific category
from exengine.kernel.notification_base import NotificationCategory
engine.subscribe_to_notifications(handler, NotificationCategory.Data)
Multiple subscriptions with different filters can be set up:

.. code-block:: python
engine.subscribe_to_notifications(handler1, NotificationA)
engine.subscribe_to_notifications(handler2, NotificationCategory.Device)
engine.subscribe_to_notifications(handler3) # No filter, receives all notifications
Determining Available Notifications
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

``ExecutorEvents`` declare the types of notifications they might emit through the ``notification_types`` class attribute. This attribute is a list of Notification types that the event may produce during its execution.

To discover which notification types are supported by a particular event:

.. code-block:: python
print(MyEvent.notification_types)
All ExecutorEvents include the ``EventExecutedNotification`` by default. Subclasses can add their additional custom types of notifications.



Awaiting Notifications from a Future
------------------------------------

Notifications can be awaited on an :ref:`ExecutionFuture <futures>` in addition to subscribing to ExEngine notifications. This is useful for waiting on specific conditions related to a particular ``ExecutorEvent``:

.. code-block:: python
future = engine.submit(some_event)
notification = future.await_notification(SomeSpecificNotification)
The Future tracks all notifications for its event. If called after a notification occurs, it returns immediately.



Publishing Notifications
-------------------------

Events can emit notifications using the ``publish_notification`` method:

.. code-block:: python
class MyEvent(ExecutorEvent):
notification_types = [MyCustomNotification]
def execute(self):
# ... do something ...
self.publish_notification(MyCustomNotification(payload="Something happened"))
Creating Custom Notifications
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To create a custom notification:

1. Subclass ``exengine.base_classes.Notification``
2. Use Python's ``@dataclass`` decorator
3. Define ``category`` (from ``exengine.notifications.NotificationCategory`` enum) and ``description`` (string) as class variables
4. Optionally, specify a payload type using a type hint in the class inheritance. For example, ``class MyCustomNotification(Notification[str])`` indicates this notification's payload will be a string.

Keep payloads lightweight for efficient processing. Example:

.. code-block:: python
from dataclasses import dataclass
from exengine.base_classes import Notification
from exengine.notifications import NotificationCategory
@dataclass
class MyCustomNotification(Notification[str]):
category = NotificationCategory.Device
description = "A custom device status update"
# Usage
notification = MyCustomNotification(payload="Device XYZ is ready")
Notifications provide a mechanism for asynchronous communication within the system. They allow devices, events, and other components to broadcast updates about their status or important occurrences. This feature enables reactive programming patterns, allowing your software to respond dynamically to changes in the system state or experimental conditions.
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ classifiers = [
"Operating System :: OS Independent"
]
dependencies = [
"pydantic"
"pydantic",
"numpy"
]

dynamic = ["version"]
Expand All @@ -21,7 +22,7 @@ readme = "README.md"
[project.optional-dependencies]
test = ["pytest"]

# all backends
# all backends -- this should be the union of all the specific backends below
all = [
"mmpycorex",
"ndstorage"
Expand Down
3 changes: 3 additions & 0 deletions src/exengine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@
A flexible multi-backend execution engine for microscopy
"""
from ._version import __version__, version_info

from . import kernel
from .kernel.executor import ExecutionEngine
2 changes: 2 additions & 0 deletions src/exengine/base_classes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .kernel.notification_base import Notification
from .kernel.ex_event_base import ExecutorEvent
6 changes: 5 additions & 1 deletion src/exengine/examples/implicit_vs_explicit_excutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@

# This occurs on the executor thread. The event is submitted to the executor and its result is awaited,
# meaning the call will block until the method is executed.
z_stage.set_position(100)
z_stage.set_position(100, thread='device_setting_thread')
# it is equivalent to:
executor.submit(SetPosition1DEvent(position=100, device=z_stage)).await_execution()



executor.submit(SetPosition1DEvent(position=100, device=z_stage), thread='device_setting_thread')
executor.submit(ReadoutImages(), thread='readout_thread')



executor.shutdown()
2 changes: 1 addition & 1 deletion src/exengine/examples/micromanager_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

# download_and_install_mm() # If needed
# Start Micro-Manager core instance with Demo config
create_core_instance(mm_app_path=, mm_config_path=)
create_core_instance()

executor = ExecutionEngine()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Integration tests for events, notifications, futures, and the execution engine
"""
import pytest
from exengine.kernel.executor import ExecutionEngine
from exengine import ExecutionEngine
from exengine.kernel.ex_event_base import ExecutorEvent
from exengine.kernel.notification_base import Notification, NotificationCategory
from dataclasses import dataclass
Expand Down
22 changes: 22 additions & 0 deletions src/exengine/integration_tests/test_imports.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# src/exengine/integration_tests/test_imports.py

import pytest


def test_import_engine():
try:
from exengine import ExecutionEngine
except ImportError as e:
pytest.fail(f"Import failed for ExecutionEngine: {e}")

def test_import_base_classes():
try:
from exengine.base_classes import Notification, ExecutorEvent
except ImportError as e:
pytest.fail(f"Import failed for base_classes: {e}")

def test_import_notifications():
try:
from exengine.notifications import NotificationCategory, DataStoredNotification, EventExecutedNotification
except ImportError as e:
pytest.fail(f"Import failed for notifications: {e}")
2 changes: 2 additions & 0 deletions src/exengine/kernel/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .notification_base import NotificationCategory, Notification
from .ex_event_base import ExecutorEvent, EventExecutedNotification
10 changes: 5 additions & 5 deletions src/exengine/kernel/data_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
from pydantic.types import JsonValue
from dataclasses import dataclass

from exengine.kernel.notification_base import DataStoredNotification
from exengine.kernel.data_coords import DataCoordinates
from exengine.kernel.data_storage_api import DataStorageAPI
from .notification_base import DataStoredNotification
from .data_coords import DataCoordinates
from .data_storage_api import DataStorageAPI
from typing import TYPE_CHECKING


if TYPE_CHECKING:
from exengine.kernel.ex_future import ExecutionFuture
from .ex_future import ExecutionFuture


class _PeekableQueue(queue.Queue):
Expand Down Expand Up @@ -56,7 +56,7 @@ def __init__(self, storage: DataStorageAPI,
_executor=None):
# delayed import to avoid circular imports
if _executor is None:
from exengine.kernel.executor import ExecutionEngine
from .executor import ExecutionEngine
self._engine = ExecutionEngine.get_instance()
else:
self._engine = _executor
Expand Down
2 changes: 1 addition & 1 deletion src/exengine/kernel/data_storage_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

from typing import Protocol, runtime_checkable, Union, Dict
from exengine.kernel.data_coords import DataCoordinates
from .data_coords import DataCoordinates
import numpy as np
from pydantic.types import JsonValue

Expand Down
4 changes: 2 additions & 2 deletions src/exengine/kernel/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from weakref import WeakSet
from dataclasses import dataclass

from exengine.kernel.ex_event_base import ExecutorEvent
from exengine.kernel.executor import ExecutionEngine
from .ex_event_base import ExecutorEvent
from .executor import ExecutionEngine
import threading
import sys

Expand Down
3 changes: 1 addition & 2 deletions src/exengine/kernel/device_types_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from abc import abstractmethod, ABC
from typing import Tuple, List, Iterable, Union, Optional, Sequence
import numpy as np
from exengine.kernel.device import DeviceMetaclass

from .device import DeviceMetaclass


class Device(ABC, metaclass=DeviceMetaclass):
Expand Down
13 changes: 3 additions & 10 deletions src/exengine/kernel/ex_event_base.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
import warnings
import numpy as np
from typing import Optional, Any,ClassVar, Type, List, Dict, Union, Iterable
from abc import ABC, abstractmethod, ABCMeta
import weakref
from dataclasses import dataclass, field
from exengine.kernel.notification_base import Notification
import itertools
from .notification_base import Notification

from typing import TYPE_CHECKING

from exengine.kernel.notification_base import EventExecutedNotification
from exengine.kernel.data_coords import DataCoordinates, DataCoordinatesIterator
from exengine.kernel.data_handler import DataHandler
from .notification_base import EventExecutedNotification

# if TYPE_CHECKING: # avoid circular imports
from exengine.kernel.ex_future import ExecutionFuture
from .ex_future import ExecutionFuture


class _ExecutorEventMeta(ABCMeta):
Expand Down
Loading

0 comments on commit 5b5beb0

Please sign in to comment.