-
Notifications
You must be signed in to change notification settings - Fork 16
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added a unit test to MuseInputStream's pop method
- Loading branch information
William Harvey
committed
Nov 19, 2018
1 parent
497f897
commit 5e22719
Showing
6 changed files
with
83 additions
and
47 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,8 @@ __pycache__/ | |
.idea/* | ||
.vscode/* | ||
|
||
settings.json | ||
|
||
# Distribution / packaging | ||
.Python | ||
env/ | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,3 @@ | ||
DEFAULT_PORT = 5001 | ||
LOCALHOST = '127.0.0.1' | ||
DEFAULT_SIGNAL_QUEUE_LENGTH = 1024 | ||
SIGNAL_QUEUE_LENGTH = 1024 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,39 +1,38 @@ | ||
from pythonosc import dispatcher, osc_server | ||
from datetime import datetime | ||
from signal import Signal, SignalData | ||
from muse_constants import ( | ||
DEFAULT_MUSE_EEG_ACQUISITION_FREQUENCY, | ||
DEFAULT_MUSE_BATT_ACQUISITION_FREQUENCY, | ||
MUSE_OSC_PATH, | ||
from threading import Thread | ||
from pythonosc import dispatcher, osc_server | ||
from constants import DEFAULT_PORT, LOCALHOST, SIGNAL_QUEUE_LENGTH | ||
from muse_constants import ( | ||
MUSE_ACQUISITION_FREQUENCIES, | ||
MUSE_BATT_ACQUISITION_FREQUENCY, | ||
MUSE_EEG_ACQUISITION_FREQUENCY, | ||
MUSE_OSC_PATH, | ||
) | ||
from constants import ( | ||
DEFAULT_PORT, | ||
LOCALHOST, | ||
DEFAULT_SIGNAL_QUEUE_LENGTH, | ||
) | ||
|
||
class MuseInputStreamWrapper(): | ||
signals: dict | ||
class MuseInputStream(): | ||
_signals: dict | ||
_server: osc_server.ThreadingOSCUDPServer | ||
|
||
def __init__(self, sought_data_list: list = ['eeg'], ip: str = LOCALHOST, port: int = DEFAULT_PORT): | ||
self._signals = dict() | ||
self._server = osc_server.ThreadingOSCUDPServer((ip, port), self._create_dispatchers(sought_data_list)) | ||
Thread(target=self._server.serve_forever).start() | ||
|
||
def _callback(self, osc_path, opt_params, signal_data): | ||
signal_name = opt_params[0] | ||
self._signals[signal_name].push(signal_data) | ||
|
||
def __init__(self, sought_data_list: list<str> = ['eeg'], port: int = DEFAULT_PORT, ip: str = LOCALHOST): | ||
self.signals = dict() | ||
def _create_dispatchers(self, sought_data_list) -> dispatcher.Dispatcher: | ||
disp = dispatcher.Dispatcher() | ||
for sought_data in sought_data_list: | ||
disp.map(MUSE_OSC_PATH[sought_data], self.callback, MUSE_ACQUISITION_FREQUENCIES[sought_data], sought_data) | ||
server = osc_server.ThreadingOSCUDPServer((ip, port), disp) | ||
server.serve_forever() | ||
if(sought_data not in self._signals): | ||
self._signals[sought_data] = Signal(SIGNAL_QUEUE_LENGTH, MUSE_ACQUISITION_FREQUENCIES[sought_data]) | ||
disp.map(MUSE_OSC_PATH[sought_data], self._callback, sought_data) | ||
return disp | ||
|
||
def callback(self, osc_path, *registeredParams, *messages): | ||
data: SignalData = SignalData([], 0) | ||
acquisition_frequency = registeredParams[0] | ||
signal_name = registeredParams[1] | ||
for message in messages[2:]: | ||
if(signal_name not in self.signals): | ||
self.signals[signal_name] = Signal(DEFAULT_SIGNAL_QUEUE_LENGTH, acquisition_frequency) | ||
data.values.append(message) | ||
data.time = datetime.now() - self.signals[signal_name].init_time | ||
self.signals[signal_name].push(data) | ||
def pop(self, signal_name: str) -> SignalData: | ||
return self._signals[signal_name].pop() | ||
|
||
def get(self, signal_name: str): | ||
return self.signals[signal_name].pop() | ||
def close(self): | ||
self._server.shutdown() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,30 +1,32 @@ | ||
from threading import Lock | ||
from dataclasses import dataclass | ||
from datetime import datetime | ||
from PriorityQueue import PriorityQueue | ||
from queue import PriorityQueue | ||
from threading import Lock | ||
|
||
@dataclass | ||
class SignalData(): | ||
time: float | ||
values: list | ||
|
||
def __lt__(self, other): | ||
return self.time < other.time | ||
|
||
class Signal(): | ||
init_time: float | ||
lock: Lock | ||
signals_queue: PriorityQueue | ||
acquisition_period: float | ||
signal_queue: PriorityQueue | ||
signal_period: float | ||
data_counter: int | ||
|
||
def __init__(self, length: int, acquisition_frequency: float): | ||
self.init_time = datetime.now().timestamp() | ||
self.lock = Lock() | ||
self.signals_queue = PriorityQueue(length) | ||
self.acquisition_period = 1 / acquisition_frequency | ||
self.signal_queue = PriorityQueue(length) | ||
self.signal_period = (1 / acquisition_frequency) | ||
self.data_counter = 0 | ||
|
||
def push(self, data: SignalData): | ||
self.lock.acquire() | ||
self.signals_queue.put(data, True, self.acquisition_period) | ||
self.lock.release() | ||
def push(self, data_list: list): | ||
time = self.data_counter * self.signal_period | ||
signal_data: SignalData = SignalData(time, data_list) | ||
self.signal_queue.put(signal_data, True, self.signal_period) | ||
self.data_counter += 1 | ||
|
||
def pop(self) -> SignalData: | ||
self.lock.acquire() | ||
data = self.signals_queue.get(True) | ||
self.lock.release() | ||
return data | ||
return self.signal_queue.get(True) |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
import random | ||
import time | ||
import unittest | ||
import unittest.mock | ||
from pythonosc import udp_client | ||
from signal import SignalData | ||
from input_stream import MuseInputStream | ||
from muse_constants import MUSE_EEG_ACQUISITION_FREQUENCY | ||
|
||
MESSAGES_NUMBER = 16 | ||
MUSE_ACQUISITION_PERIOD = 1 / MUSE_EEG_ACQUISITION_FREQUENCY | ||
|
||
class InputStreamTest(unittest.TestCase): | ||
def create_random_list(self): | ||
return random.sample(range(255), MESSAGES_NUMBER) | ||
|
||
def create_client(self, sent_data: list): | ||
client = udp_client.SimpleUDPClient('127.0.0.1', 5001) | ||
for data in sent_data: | ||
client.send_message("/eeg", data) | ||
time.sleep(MUSE_ACQUISITION_PERIOD) | ||
|
||
def test_pop(self): | ||
random_bytes = self.create_random_list() | ||
sought_data_list = ['eeg'] | ||
muse_input_stream = MuseInputStream(sought_data_list, '127.0.0.1', 5001) | ||
self.create_client(random_bytes) | ||
for i in range(MESSAGES_NUMBER): | ||
self.assertEqual(muse_input_stream.pop('eeg'), SignalData(i * MUSE_ACQUISITION_PERIOD, random_bytes[i])) | ||
muse_input_stream.close() | ||
|
||
if __name__ == '__main__': | ||
unittest.main() |