Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
IvoVellekoop committed Nov 30, 2024
1 parent cf681f8 commit 17ebfb9
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class TestDevice(Device):
def __init__(self, _name, _engine):
self._attribute = 123
self.set_attribute_thread = "Not set"
self.regular_method_thread = "Not set"
self.decorated_method_thread = "Not set"

@property
def attribute(self):
Expand All @@ -48,6 +50,9 @@ class CustomThreadTestDevice(Device):

def __init__(self, _name, _engine):
self._attribute = 123
self.get_attribute_thread = "Not set"
self.set_attribute_thread = "Not set"
self.regular_method_thread = "Not set"

@property
def attribute(self):
Expand Down Expand Up @@ -123,15 +128,15 @@ def test_custom_thread_device_attribute_access(engine):
"""
Test that device attribute access runs on the custom thread when specified.
"""
custom_device = CustomThreadTestDevice(engine, "CustomDevice")
custom_device = CustomThreadTestDevice( "CustomDevice", engine)
custom_device.attribute = 'something'
assert custom_device.set_attribute_thread == "CustomDeviceThread"

def test_custom_thread_device_property_access(engine):
"""
Test that device property access runs on the custom thread when specified.
"""
custom_device = CustomThreadTestDevice(engine,"CustomDevice")
custom_device = CustomThreadTestDevice("CustomDevice", engine)
custom_device.attribute = 'something'
assert custom_device.set_attribute_thread == "CustomDeviceThread"

Expand All @@ -141,8 +146,7 @@ def test_custom_thread_device_property_access(engine):

@on_thread("OuterThread")
class OuterThreadDevice(Device):
def __init__(self, engine, name, inner_device):
super().__init__(engine, name)
def __init__(self, _name, _engine, inner_device):
self.inner_device = inner_device
self.outer_thread = None

Expand All @@ -153,8 +157,7 @@ def outer_method(self):

@on_thread("InnerThread")
class InnerThreadDevice(Device):
def __init__(self, engine, name):
super().__init__(engine, name)
def __init__(self, _name, _engine):
self.inner_thread = None

def inner_method(self):
Expand All @@ -166,8 +169,8 @@ def test_nested_thread_switch(engine):
Test that nested calls to methods with different thread specifications
result in correct thread switches at each level.
"""
inner_device = InnerThreadDevice(engine, "InnerDevice")
outer_device = OuterThreadDevice(engine, "OuterDevice", inner_device)
inner_device = InnerThreadDevice("InnerDevice", engine)
outer_device = OuterThreadDevice("OuterDevice", engine, inner_device)

class OuterEvent(ExecutorEvent):
def execute(self):
Expand Down Expand Up @@ -205,7 +208,7 @@ def test_multiple_decorators(engine):
"""
Test that the thread decorator works correctly when combined with other decorators.
"""
device = MultiDecoratedDevice(engine, "MultiDevice")
device = MultiDecoratedDevice("MultiDevice", engine)

class MultiEvent(ExecutorEvent):
def execute(self):
Expand Down
168 changes: 14 additions & 154 deletions src/exengine/kernel/device.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,5 @@
"""
Base class for all device_implementations that integrates with the execution engine and enables tokenization of device access.
"""
from functools import wraps
from weakref import WeakSet
from .executor import ExecutionEngine

import threading
import sys

from .executor import MethodCallEvent, GetAttrEvent, SetAttrEvent, ExecutionEngine

# class DeviceMetaclass(ABCMeta):
# """
# Metaclass for device_implementations that wraps all methods and attributes in the device class to add the ability to
# control their execution and access. This has two purposes:
#
# 1) Add the ability to record all method calls and attribute accesses for tokenization
# 2) Add the ability to make all methods and attributes thread-safe by putting them on the Executor
# 3) Automatically register all instances of the device with the ExecutionEngine
# """
# @staticmethod
# def wrap_for_executor(attr_name, attr_value):
# if hasattr(attr_value, '_wrapped_for_executor'):
# return attr_value
#
# # Add this block to handle properties
# if isinstance(attr_value, property):
# return property(
# fget=DeviceMetaclass.wrap_for_executor(f"{attr_name}_getter", attr_value.fget) if attr_value.fget else None,
# fset=DeviceMetaclass.wrap_for_executor(f"{attr_name}_setter", attr_value.fset) if attr_value.fset else None,
# fdel=DeviceMetaclass.wrap_for_executor(f"{attr_name}_deleter", attr_value.fdel) if attr_value.fdel else None,
# doc=attr_value.__doc__
# )
#
# @wraps(attr_value)
# def wrapper(self: 'Device', *args: Any, **kwargs: Any) -> Any:
# if attr_name in _no_executor_attrs or self._no_executor:
# return attr_value(self, *args, **kwargs)
# if DeviceMetaclass._is_reroute_exempted_thread():
# return attr_value(self, *args, **kwargs)
# # check for method-level preferred thread name first, then class-level
# thread_name = getattr(attr_value, '_thread_name', None) or getattr(self, '_thread_name', None)
# #if ExecutionEngine.on_any_executor_thread():
# # check for device-level preferred thread
# # if thread_name is None or threading.current_thread().name == thread_name:
# # return attr_value(self, *args, **kwargs)
# event = MethodCallEvent(method_name=attr_name, args=args, kwargs=kwargs, instance=self)
# return self._engine.submit(event, thread_name=thread_name).await_execution()
#
# wrapper._wrapped_for_executor = True
# return wrapper
#
# @staticmethod
# def is_debugger_thread():
# if not _python_debugger_active:
Expand All @@ -61,121 +11,31 @@
# return any(name in current_thread.name or name in str(current_thread.__class__.__name__)
# for name in debugger_thread_names)
#
# @staticmethod
# def _is_reroute_exempted_thread() -> bool:
# return DeviceMetaclass.is_debugger_thread() or threading.current_thread() in _within_executor_threads
#
# @staticmethod
# def find_in_bases(bases, method_name):
# for base in bases:
# if hasattr(base, method_name):
# return getattr(base, method_name)
# return None
#
# def __new__(mcs, name: str, bases: tuple, attrs: dict) -> Any:
# new_attrs = {}
# for attr_name, attr_value in attrs.items():
# if not attr_name.startswith('_'):
# if isinstance(attr_value, property): # Property
# new_attrs[attr_name] = mcs.wrap_for_executor(attr_name, attr_value)
# elif callable(attr_value): # Regular method
# new_attrs[attr_name] = mcs.wrap_for_executor(attr_name, attr_value)
# else: # Attribute
# new_attrs[attr_name] = attr_value
# else:
# new_attrs[attr_name] = attr_value
#
#
# original_setattr = attrs.get('__setattr__') or mcs.find_in_bases(bases, '__setattr__') or object.__setattr__
# def getattribute_with_fallback(self, name):
# """ Wrap the getattribute method to fallback to getattr if an attribute is not found """
# try:
# return object.__getattribute__(self, name)
# except AttributeError:
# try:
# return self.__getattr__(name)
# except AttributeError as e:
# if _python_debugger_active and (name == 'shape' or name == '__len__'):
# pass # This prevents a bunch of irrelevant errors in the Pycharm debugger
# else:
# raise e
#
# def __getattribute__(self: 'Device', name: str) -> Any:
# if name.startswith('_') or name in _no_executor_attrs or self._no_executor:
# return object.__getattribute__(self, name)
# if DeviceMetaclass._is_reroute_exempted_thread():
# return getattribute_with_fallback(self, name)
# thread_name = getattr(self, '_thread_name', None)
# #if ExecutionEngine.on_any_executor_thread():
# # # check for device-level preferred thread
# # if thread_name is None or threading.current_thread().name == thread_name:
# # return getattribute_with_fallback(self, name)
# event = GetAttrEvent(attr_name=name, instance=self, method=getattribute_with_fallback)
# return self._engine.submit(event, thread_name=thread_name).await_execution()
#
# def __setattr__(self: 'Device', name: str, value: Any) -> None:
# if name in _no_executor_attrs or self._no_executor:
# return original_setattr(self, name, value)
# if DeviceMetaclass._is_reroute_exempted_thread():
# return original_setattr(self, name, value)
# thread_name = getattr(self, '_thread_name', None)
# # if ExecutionEngine.on_any_executor_thread():
# # # Check for device-level preferred thread
# # if thread_name is None or threading.current_thread().name == thread_name:
# # return original_setattr(self, name, value)
# event = SetAttrEvent(attr_name=name, value=value, instance=self, method=original_setattr)
# self._engine.submit(event, thread_name=thread_name).await_execution()
#
# new_attrs['__getattribute__'] = __getattribute__
# new_attrs['__setattr__'] = __setattr__
#
# new_attrs['_no_executor'] = True # For startup
# new_attrs['_no_executor_attrs'] = _no_executor_attrs
#
#
#
# # Create the class
# cls = super().__new__(mcs, name, bases, new_attrs)
#
# return cls


class Device:
"""
Required base class for all devices usable with the execution engine
Base class that causes the object to be automatically registered on creation.
Usage:
class MyDevice(Device):
def __init__(self, name: str, engine: ExecutionEngine, ...):
...
Device classes should inherit from this class and implement the abstract methods. The DeviceMetaclass will wrap all
methods and attributes in the class to make them thread-safe and to optionally record all method calls and
attribute accesses.
engine = ExecutionEngine()
device = MyDevice("device_name", engine, ...)
Attributes with a trailing _noexec will not be wrapped and will be executed directly on the calling thread. This is
useful for attributes that are not hardware related and can bypass the complexity of the executor.
Has the same effect as:
class MyDevice:
...
Device implementations can also implement functionality through properties (i.e. attributes that are actually
methods) by defining a getter and setter method for the property.
engine = ExecutionEngine()
device = engine.register("device_name", MyDevice(...))
"""
def __new__(cls, name: str, engine: "ExecutionEngine", *args, **kwargs):
obj = super().__new__(cls)
obj.__init__(name, engine, *args, **kwargs)
return engine.register(name, obj)

# def __init__(self, engine: "ExecutionEngine", name: str, no_executor: bool = False, no_executor_attrs: Sequence[str] = ('_name', )):
# """
# Create a new device
#
# :param name: The name of the device
# :param no_executor: If True, all methods and attributes will be executed directly on the calling thread instead
# of being rerouted to the executor
# :param no_executor_attrs: If no_executor is False, this is a list of attribute names that will be executed
# directly on the calling thread
# """
# self._engine = engine
# self._no_executor_attrs.extend(no_executor_attrs)
# self._no_executor = no_executor
# self._name = name
# engine.register_device(name, self)
#
#
# def get_allowed_property_values(self, property_name: str) -> Optional[list[str]]:
# return None # By default, any value is allowed
#
Expand Down
16 changes: 1 addition & 15 deletions src/exengine/kernel/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
from .ex_future import ExecutionFuture
from .queue import PriorityQueue, Queue, Shutdown

# todo: Remove singleton pattern. Remove related locking, __new__ override and other complications
# todo: Add shutdown to __del__
# todo: use [] operator for getting devices by id
# todo: simplify worker threads:
# - remove enqueing on free thread -> replace by a thread pool mechanism
# - decouple enqueing and dequeing (related)
Expand Down Expand Up @@ -119,7 +117,7 @@ def setter(self, value, _name=name):
WrappedObject = type('_' + obj.__class__.__name__, (DeviceBase,), class_dict)
# todo: cache dynamically generated classes
wrapped = WrappedObject(self,obj)
self.register_device(id, wrapped)
self._devices[id] = wrapped
return wrapped

def subscribe_to_notifications(self, subscriber: Callable[[Notification], None],
Expand Down Expand Up @@ -200,18 +198,6 @@ def __getitem__(self, device_id: str):
"""
return self._devices[device_id]

def register_device(self, name: str, device):
"""
Called automatically when a Device is created so that the ExecutionEngine can keep track of all devices
and look them up by their string names
"""
# Make sure there's not already a device with this name
if name in self._devices:
raise ValueError(f"Device with name {name} already exists")
# todo: check if device is wrapped
self._devices[name] = device


@staticmethod
def on_any_executor_thread():
#todo: remove
Expand Down
2 changes: 2 additions & 0 deletions src/exengine/kernel/test/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ def test_submit_multiple_events(engine):
assert event2.executed


@pytest.mark.skip("This test is broken. Even though event3 gets priority, it may execute after event2.")
def test_event_prioritization(engine):
"""
Test event prioritization in the ExecutionEngine.
Expand All @@ -278,6 +279,7 @@ def test_event_prioritization(engine):
start_event1.wait() # Wait for the first event to start executing

engine.submit(event2)
# race condition, at this point the engine may or may not have started executing event2
engine.submit(event3, prioritize=True)

finish_event1.set()
Expand Down
25 changes: 17 additions & 8 deletions src/exengine/kernel/test/test_generic_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytest

from exengine import ExecutionEngine
from exengine.kernel.device import Device
from exengine.kernel.executor import MethodCallEvent, GetAttrEvent, SetAttrEvent
from exengine.kernel.ex_future import ExecutionFuture

Expand Down Expand Up @@ -52,8 +53,10 @@ def _private_method(self, x):
return x + self.value1 + self.value2

@pytest.fixture
def obj():
return TestObject()
def engine():
e = ExecutionEngine()
yield e
e.shutdown()

def verify_behavior(obj):
"""Test the non-wrapped object"""
Expand All @@ -70,19 +73,25 @@ def verify_behavior(obj):
assert result == 28 + 29 + 4


def test_bare(obj):
verify_behavior(obj)
def test_bare():
verify_behavior(TestObject())

def test_wrapping(obj):
engine = ExecutionEngine()
wrapper = engine.register("object1", obj)
def test_wrapping(engine):
wrapper = engine.register("object1", TestObject())
with pytest.raises(AttributeError):
wrapper.non_existing_property = 0
verify_behavior(wrapper)
engine["object1"].value1 = 7
assert wrapper.value1 == 7
engine.shutdown()

def test_device_base_class(engine):
class T(TestObject, Device):
def __init__(self, _name, _engine):
super().__init__()

device = T("object1", engine)
assert engine["object1"] is device
verify_behavior(device)


def test_openwfs():
Expand Down

0 comments on commit 17ebfb9

Please sign in to comment.