-
Notifications
You must be signed in to change notification settings - Fork 185
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add 'neurofeedback' example code demonstrating epoching, FFT, band po…
…wers (#53) * Added relaxation example that prints out band powers in console * Added smoothing of bandpowers and option to view beta concentration score * Added theta protocol and cleaned up examples
- Loading branch information
Showing
4 changed files
with
365 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
# -*- coding: utf-8 -*- | ||
""" | ||
Estimate Relaxation from Band Powers | ||
This example shows how to buffer, epoch, and transform EEG data from a single | ||
electrode into values for each of the classic frequencies (e.g. alpha, beta, theta) | ||
Furthermore, it shows how ratios of the band powers can be used to estimate | ||
mental state for neurofeedback. | ||
The neurofeedback protocols described here are inspired by | ||
*Neurofeedback: A Comprehensive Review on System Design, Methodology and Clinical Applications* by Marzbani et. al | ||
Adapted from https://github.com/NeuroTechX/bci-workshop | ||
""" | ||
|
||
import numpy as np # Module that simplifies computations on matrices | ||
import matplotlib.pyplot as plt # Module used for plotting | ||
from pylsl import StreamInlet, resolve_byprop # Module to receive EEG data | ||
import utils # Our own utility functions | ||
|
||
# Handy little enum to make code more readable | ||
|
||
|
||
class Band: | ||
Delta = 0 | ||
Theta = 1 | ||
Alpha = 2 | ||
Beta = 3 | ||
|
||
|
||
""" EXPERIMENTAL PARAMETERS """ | ||
# Modify these to change aspects of the signal processing | ||
|
||
# Length of the EEG data buffer (in seconds) | ||
# This buffer will hold last n seconds of data and be used for calculations | ||
BUFFER_LENGTH = 5 | ||
|
||
# Length of the epochs used to compute the FFT (in seconds) | ||
EPOCH_LENGTH = 1 | ||
|
||
# Amount of overlap between two consecutive epochs (in seconds) | ||
OVERLAP_LENGTH = 0.8 | ||
|
||
# Amount to 'shift' the start of each next consecutive epoch | ||
SHIFT_LENGTH = EPOCH_LENGTH - OVERLAP_LENGTH | ||
|
||
# Index of the channel(s) (electrodes) to be used | ||
# 0 = left ear, 1 = left forehead, 2 = right forehead, 3 = right ear | ||
INDEX_CHANNEL = [0] | ||
|
||
if __name__ == "__main__": | ||
|
||
""" 1. CONNECT TO EEG STREAM """ | ||
|
||
# Search for active LSL streams | ||
print('Looking for an EEG stream...') | ||
streams = resolve_byprop('type', 'EEG', timeout=2) | ||
if len(streams) == 0: | ||
raise RuntimeError('Can\'t find EEG stream.') | ||
|
||
# Set active EEG stream to inlet and apply time correction | ||
print("Start acquiring data") | ||
inlet = StreamInlet(streams[0], max_chunklen=12) | ||
eeg_time_correction = inlet.time_correction() | ||
|
||
# Get the stream info and description | ||
info = inlet.info() | ||
description = info.desc() | ||
|
||
# Get the sampling frequency | ||
# This is an important value that represents how many EEG data points are | ||
# collected in a second. This influences our frequency band calculation. | ||
# for the Muse 2016, this should always be 256 | ||
fs = int(info.nominal_srate()) | ||
|
||
""" 2. INITIALIZE BUFFERS """ | ||
|
||
# Initialize raw EEG data buffer | ||
eeg_buffer = np.zeros((int(fs * BUFFER_LENGTH), 1)) | ||
filter_state = None # for use with the notch filter | ||
|
||
# Compute the number of epochs in "buffer_length" | ||
n_win_test = int(np.floor((BUFFER_LENGTH - EPOCH_LENGTH) / | ||
SHIFT_LENGTH + 1)) | ||
|
||
# Initialize the band power buffer (for plotting) | ||
# bands will be ordered: [delta, theta, alpha, beta] | ||
band_buffer = np.zeros((n_win_test, 4)) | ||
|
||
""" 3. GET DATA """ | ||
|
||
# The try/except structure allows to quit the while loop by aborting the | ||
# script with <Ctrl-C> | ||
print('Press Ctrl-C in the console to break the while loop.') | ||
|
||
try: | ||
# The following loop acquires data, computes band powers, and calculates neurofeedback metrics based on those band powers | ||
while True: | ||
|
||
""" 3.1 ACQUIRE DATA """ | ||
# Obtain EEG data from the LSL stream | ||
eeg_data, timestamp = inlet.pull_chunk( | ||
timeout=1, max_samples=int(SHIFT_LENGTH * fs)) | ||
|
||
# Only keep the channel we're interested in | ||
ch_data = np.array(eeg_data)[:, INDEX_CHANNEL] | ||
|
||
# Update EEG buffer with the new data | ||
eeg_buffer, filter_state = utils.update_buffer( | ||
eeg_buffer, ch_data, notch=True, | ||
filter_state=filter_state) | ||
|
||
""" 3.2 COMPUTE BAND POWERS """ | ||
# Get newest samples from the buffer | ||
data_epoch = utils.get_last_data(eeg_buffer, | ||
EPOCH_LENGTH * fs) | ||
|
||
# Compute band powers | ||
band_powers = utils.compute_band_powers(data_epoch, fs) | ||
band_buffer, _ = utils.update_buffer(band_buffer, | ||
np.asarray([band_powers])) | ||
# Compute the average band powers for all epochs in buffer | ||
# This helps to smooth out noise | ||
smooth_band_powers = np.mean(band_buffer, axis=0) | ||
|
||
# print('Delta: ', band_powers[Band.Delta], ' Theta: ', band_powers[Band.Theta], | ||
# ' Alpha: ', band_powers[Band.Alpha], ' Beta: ', band_powers[Band.Beta]) | ||
|
||
""" 3.3 COMPUTE NEUROFEEDBACK METRICS """ | ||
# These metrics could also be used to drive brain-computer interfaces | ||
|
||
# Alpha Protocol: | ||
# Simple redout of alpha power, divided by delta waves in order to rule out noise | ||
alpha_metric = smooth_band_powers[Band.Alpha] / \ | ||
smooth_band_powers[Band.Delta] | ||
print('Alpha Relaxation: ', alpha_metric) | ||
|
||
# Beta Protocol: | ||
# Beta waves have been used as a measure of mental activity and concentration | ||
# This beta over theta ratio is commonly used as neurofeedback for ADHD | ||
# beta_metric = smooth_band_powers[Band.Beta] / \ | ||
# smooth_band_powers[Band.Theta] | ||
# print('Beta Concentration: ', beta_metric) | ||
|
||
# Alpha/Theta Protocol: | ||
# This is another popular neurofeedback metric for stress reduction | ||
# Higher theta over alpha is supposedly associated with reduced anxiety | ||
# theta_metric = smooth_band_powers[Band.Theta] / \ | ||
# smooth_band_powers[Band.Alpha] | ||
# print('Theta Relaxation: ', theta_metric) | ||
|
||
except KeyboardInterrupt: | ||
print('Closing!') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,14 @@ | ||
""" | ||
Record from a Stream | ||
This example shows how to record data from an existing Muse LSL stream | ||
""" | ||
from muselsl import record | ||
|
||
# Note: an existing Muse LSL stream is required | ||
record(60) | ||
if __name__ == "__main__": | ||
|
||
# Note: an existing Muse LSL stream is required | ||
record(60) | ||
|
||
# Note: Recording is synchronous, so code here will not execute until the stream has been closed | ||
print('Recording has ended') | ||
# Note: Recording is synchronous, so code here will not execute until the stream has been closed | ||
print('Recording has ended') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,19 @@ | ||
""" | ||
Starting a Stream | ||
This example shows how to search for available Muses and | ||
create a new stream | ||
""" | ||
from muselsl import stream, list_muses | ||
|
||
muses = list_muses() | ||
if __name__ == "__main__": | ||
|
||
muses = list_muses() | ||
|
||
if not muses: | ||
print('No Muses found') | ||
else: | ||
stream(muses[0]['address']) | ||
if not muses: | ||
print('No Muses found') | ||
else: | ||
stream(muses[0]['address']) | ||
|
||
# Note: Streaming is synchronous, so code here will not execute until the stream has been closed | ||
print('Stream has ended') | ||
# Note: Streaming is synchronous, so code here will not execute until the stream has been closed | ||
print('Stream has ended') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
# -*- coding: utf-8 -*- | ||
""" | ||
Muse LSL Example Auxiliary Tools | ||
These functions perform the lower-level operations involved in buffering, | ||
epoching, and transforming EEG data into frequency bands | ||
@author: Cassani | ||
""" | ||
|
||
import os | ||
import sys | ||
from tempfile import gettempdir | ||
from subprocess import call | ||
|
||
import matplotlib.pyplot as plt | ||
import numpy as np | ||
from sklearn import svm | ||
from scipy.signal import butter, lfilter, lfilter_zi | ||
|
||
|
||
NOTCH_B, NOTCH_A = butter(4, np.array([55, 65]) / (256 / 2), btype='bandstop') | ||
|
||
|
||
def epoch(data, samples_epoch, samples_overlap=0): | ||
"""Extract epochs from a time series. | ||
Given a 2D array of the shape [n_samples, n_channels] | ||
Creates a 3D array of the shape [wlength_samples, n_channels, n_epochs] | ||
Args: | ||
data (numpy.ndarray or list of lists): data [n_samples, n_channels] | ||
samples_epoch (int): window length in samples | ||
samples_overlap (int): Overlap between windows in samples | ||
Returns: | ||
(numpy.ndarray): epoched data of shape | ||
""" | ||
|
||
if isinstance(data, list): | ||
data = np.array(data) | ||
|
||
n_samples, n_channels = data.shape | ||
|
||
samples_shift = samples_epoch - samples_overlap | ||
|
||
n_epochs = int( | ||
np.floor((n_samples - samples_epoch) / float(samples_shift)) + 1) | ||
|
||
# Markers indicate where the epoch starts, and the epoch contains samples_epoch rows | ||
markers = np.asarray(range(0, n_epochs + 1)) * samples_shift | ||
markers = markers.astype(int) | ||
|
||
# Divide data in epochs | ||
epochs = np.zeros((samples_epoch, n_channels, n_epochs)) | ||
|
||
for i in range(0, n_epochs): | ||
epochs[:, :, i] = data[markers[i]:markers[i] + samples_epoch, :] | ||
|
||
return epochs | ||
|
||
|
||
def compute_band_powers(eegdata, fs): | ||
"""Extract the features (band powers) from the EEG. | ||
Args: | ||
eegdata (numpy.ndarray): array of dimension [number of samples, | ||
number of channels] | ||
fs (float): sampling frequency of eegdata | ||
Returns: | ||
(numpy.ndarray): feature matrix of shape [number of feature points, | ||
number of different features] | ||
""" | ||
# 1. Compute the PSD | ||
winSampleLength, nbCh = eegdata.shape | ||
|
||
# Apply Hamming window | ||
w = np.hamming(winSampleLength) | ||
dataWinCentered = eegdata - np.mean(eegdata, axis=0) # Remove offset | ||
dataWinCenteredHam = (dataWinCentered.T * w).T | ||
|
||
NFFT = nextpow2(winSampleLength) | ||
Y = np.fft.fft(dataWinCenteredHam, n=NFFT, axis=0) / winSampleLength | ||
PSD = 2 * np.abs(Y[0:int(NFFT / 2), :]) | ||
f = fs / 2 * np.linspace(0, 1, int(NFFT / 2)) | ||
|
||
# SPECTRAL FEATURES | ||
# Average of band powers | ||
# Delta <4 | ||
ind_delta, = np.where(f < 4) | ||
meanDelta = np.mean(PSD[ind_delta, :], axis=0) | ||
# Theta 4-8 | ||
ind_theta, = np.where((f >= 4) & (f <= 8)) | ||
meanTheta = np.mean(PSD[ind_theta, :], axis=0) | ||
# Alpha 8-12 | ||
ind_alpha, = np.where((f >= 8) & (f <= 12)) | ||
meanAlpha = np.mean(PSD[ind_alpha, :], axis=0) | ||
# Beta 12-30 | ||
ind_beta, = np.where((f >= 12) & (f < 30)) | ||
meanBeta = np.mean(PSD[ind_beta, :], axis=0) | ||
|
||
feature_vector = np.concatenate((meanDelta, meanTheta, meanAlpha, | ||
meanBeta), axis=0) | ||
|
||
feature_vector = np.log10(feature_vector) | ||
|
||
return feature_vector | ||
|
||
|
||
def nextpow2(i): | ||
""" | ||
Find the next power of 2 for number i | ||
""" | ||
n = 1 | ||
while n < i: | ||
n *= 2 | ||
return n | ||
|
||
|
||
def compute_feature_matrix(epochs, fs): | ||
""" | ||
Call compute_feature_vector for each EEG epoch | ||
""" | ||
n_epochs = epochs.shape[2] | ||
|
||
for i_epoch in range(n_epochs): | ||
if i_epoch == 0: | ||
feat = compute_band_powers(epochs[:, :, i_epoch], fs).T | ||
# Initialize feature_matrix | ||
feature_matrix = np.zeros((n_epochs, feat.shape[0])) | ||
|
||
feature_matrix[i_epoch, :] = compute_band_powers( | ||
epochs[:, :, i_epoch], fs).T | ||
|
||
return feature_matrix | ||
|
||
|
||
def get_feature_names(ch_names): | ||
"""Generate the name of the features. | ||
Args: | ||
ch_names (list): electrode names | ||
Returns: | ||
(list): feature names | ||
""" | ||
bands = ['delta', 'theta', 'alpha', 'beta'] | ||
|
||
feat_names = [] | ||
for band in bands: | ||
for ch in range(len(ch_names)): | ||
feat_names.append(band + '-' + ch_names[ch]) | ||
|
||
return feat_names | ||
|
||
|
||
def update_buffer(data_buffer, new_data, notch=False, filter_state=None): | ||
""" | ||
Concatenates "new_data" into "data_buffer", and returns an array with | ||
the same size as "data_buffer" | ||
""" | ||
if new_data.ndim == 1: | ||
new_data = new_data.reshape(-1, data_buffer.shape[1]) | ||
|
||
if notch: | ||
if filter_state is None: | ||
filter_state = np.tile(lfilter_zi(NOTCH_B, NOTCH_A), | ||
(data_buffer.shape[1], 1)).T | ||
new_data, filter_state = lfilter(NOTCH_B, NOTCH_A, new_data, axis=0, | ||
zi=filter_state) | ||
|
||
new_buffer = np.concatenate((data_buffer, new_data), axis=0) | ||
new_buffer = new_buffer[new_data.shape[0]:, :] | ||
|
||
return new_buffer, filter_state | ||
|
||
|
||
def get_last_data(data_buffer, newest_samples): | ||
""" | ||
Obtains from "buffer_array" the "newest samples" (N rows from the | ||
bottom of the buffer) | ||
""" | ||
new_buffer = data_buffer[(data_buffer.shape[0] - newest_samples):, :] | ||
|
||
return new_buffer |