From eb92e1a55e0cfa6e40140bf6c77ad33f8d4002e9 Mon Sep 17 00:00:00 2001 From: narodnik Date: Sun, 2 Jun 2024 18:57:08 +0200 Subject: [PATCH] record.py: send a keep alive message every minute. (#212) * record.py: send a keep alive message every minute. * record.py: if disconnected, attempt to reconnect forever --------- Co-authored-by: zero --- muselsl/backends.py | 33 ++++++++++++++++++++++++++++----- muselsl/muse.py | 8 ++++++-- muselsl/record.py | 34 +++++++++++++++++++++++++++++++--- 3 files changed, 65 insertions(+), 10 deletions(-) diff --git a/muselsl/backends.py b/muselsl/backends.py index 7045cf5..9d12d76 100644 --- a/muselsl/backends.py +++ b/muselsl/backends.py @@ -1,11 +1,14 @@ import asyncio import atexit +import sys import time try: import bleak except ModuleNotFoundError as error: bleak = error +RETRY_SLEEP = 1 + def _wait(coroutine): loop = asyncio.get_event_loop() return loop.run_until_complete(coroutine) @@ -32,18 +35,38 @@ def scan(self, timeout=10): raise bleak devices = _wait(bleak.BleakScanner.discover(timeout)) return [{'name':device.name, 'address':device.address} for device in devices] - def connect(self, address): + def connect(self, address, retries): result = BleakDevice(self, address) - result.connect() + if not result.connect(retries): + return None return result class BleakDevice: def __init__(self, adapter, address): self._adapter = adapter - self._client = bleak.BleakClient(address) - def connect(self): - _wait(self._client.connect()) + self._address = address + self._client = None + # Use retries=-1 to continue attempting to reconnect forever + def connect(self, retries): + attempts = 1 + while True: + self._client = bleak.BleakClient(self._address) + if attempts > 1: + print(f'Connection attempt {attempts}') + try: + _wait(self._client.connect()) + except ( + bleak.exc.BleakDeviceNotFoundError, bleak.exc.BleakError + ) as err: + print(f'Failed to connect: {err}', file=sys.stderr) + if attempts == 1 + retries: + return False + sleep(RETRY_SLEEP) + attempts += 1 + else: + break self._adapter.connected.add(self) + return True def disconnect(self): _wait(self._client.disconnect()) self._adapter.connected.remove(self) diff --git a/muselsl/muse.py b/muselsl/muse.py index b23c3eb..bdc8b9f 100644 --- a/muselsl/muse.py +++ b/muselsl/muse.py @@ -66,7 +66,7 @@ def __init__(self, self.preset = preset self.disable_light = disable_light - def connect(self, interface=None): + def connect(self, interface=None, retries=0): """Connect to the device""" try: if self.backend == 'bluemuse': @@ -87,7 +87,11 @@ def connect(self, interface=None): serial_port=self.interface) self.adapter.start() - self.device = self.adapter.connect(self.address) + if ((device := self.adapter.connect(self.address, retries)) + is None): + return False + self.device = device + if(self.preset != None): self.select_preset(self.preset) diff --git a/muselsl/record.py b/muselsl/record.py index 30a41cb..f312aab 100644 --- a/muselsl/record.py +++ b/muselsl/record.py @@ -1,6 +1,8 @@ +import bleak import numpy as np import pandas as pd import os +import sys from typing import Union, List, Optional from pathlib import Path from pylsl import StreamInlet, resolve_byprop @@ -13,7 +15,6 @@ # Records a fixed duration of EEG data from an LSL stream into a CSV file - def record( duration: int, filename=None, @@ -212,16 +213,43 @@ def save_eeg(new_samples, new_timestamps): timestamps.append(new_timestamps) muse = Muse(address, save_eeg, backend=backend) - muse.connect() + if not muse.connect(): + print(f'Failed to connect to Muse: {address}', file=sys.stderr) + return muse.start() t_init = time() print('Start recording at time t=%.3f' % t_init) + last_update = t_init while (time() - t_init) < duration: try: - backends.sleep(1) + try: + backends.sleep(1) + + # Send a keep alive every 10 secs + if time() - last_update > 10: + last_update = time() + muse.keep_alive() + except bleak.exc.BleakError: + print('Disconnected. Attempting to reconnect...') + # Do not giveup since we make a best effort to continue + # data collection. Assume device is out of range or another + # temp error. + while True: + muse.connect(retries=-1) + try: + muse.resume() + except bleak.exc.BleakDBusError: + # Something is wrong with this connection + print('DBus error occurred. Reconnecting.') + muse.disconnect() + continue + else: + break + print('Connected. Continuing with data collection...') except KeyboardInterrupt: + print('Interrupt received. Exiting data collection.') break muse.stop()