Module src.decoding_analysis
Expand source code
from dataclasses import dataclass, field
import logging
from mne import epochs
from sklearn import svm
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.model_selection import cross_val_score
from sklearn.metrics import classification_report, accuracy_score, precision_recall_fscore_support
from sklearn.pipeline import Pipeline, make_pipeline
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Tuple, Union
import seaborn as sns
import numpy as np
import mne
from sklearn.utils import multiclass
@dataclass
class Classifier(ABC):
    """Base classifier.

    Abstract interface shared by the decoders in this module. Concrete
    subclasses must implement both :meth:`fit` and :meth:`predict`; the
    exact arguments differ per subclass, so the base signatures accept
    arbitrary positional and keyword arguments.
    """

    @abstractmethod
    def fit(self, *args: Any, **kwargs: Any) -> None:
        """Train the classifier. Must be overridden by subclasses."""

    @abstractmethod
    def predict(self, *args: Any, **kwargs: Any) -> np.ndarray:
        """Apply the trained classifier. Must be overridden by subclasses."""
@dataclass
class LDADecoder(Classifier):
    """Simple sklearn LDA decoder.

    Wraps a ``LinearDiscriminantAnalysis`` instance that is created per
    decoder (it is not an ``__init__`` argument).
    """

    lda: LinearDiscriminantAnalysis = field(
        init=False, default_factory=LinearDiscriminantAnalysis)

    @staticmethod
    def _flatten(data: np.ndarray) -> np.ndarray:
        """Collapse every axis after the first into one feature axis."""
        return data.reshape(data.shape[0], -1)

    def fit(self, data: np.ndarray, labels: np.ndarray) -> None:
        """Fit the LDA on ``data`` (first axis = samples) and ``labels``."""
        self.lda.fit(self._flatten(data), labels)

    def predict(self, data: np.ndarray, labels: np.ndarray) -> float:
        """Score the fitted LDA on ``data`` against ``labels``.

        Despite its name this returns the value of ``lda.score`` rather
        than per-sample predictions. The input is flattened exactly as in
        :meth:`fit`, so arrays of the same shape can be passed to both.

        Returns:
            The score reported by the underlying estimator.
        """
        score = self.lda.score(self._flatten(data), labels)
        logging.info("Score {}".format(score))
        return score
@dataclass
class CVPipleline(Classifier):
    """Cross-validation pipeline.

    Bundles an sklearn ``Pipeline`` with a fixed train/test split: the
    pipeline is fitted on the training arrays, used to predict the test
    array, and the predictions are compared with the test labels.
    """

    cv_pip: Pipeline
    train_data: np.ndarray
    test_data: np.ndarray
    train_labels: np.ndarray
    test_labels: np.ndarray

    def fit(self, **kwargs):
        """Fit the pipeline on the training data; ``kwargs`` are forwarded."""
        self.cv_pip.fit(self.train_data, self.train_labels, **kwargs)

    def predict(self, **kwargs) -> np.ndarray:
        """Predict the test data, store the result and return it.

        The predictions are kept on ``self.predictions`` for
        :meth:`evaluate` and are also returned, as the annotation promises.
        """
        self.predictions = self.cv_pip.predict(self.test_data, **kwargs)
        return self.predictions

    def evaluate(self, target_names: list[str] = ['Rare', 'Frequent']):
        """Print a classification summary and return ``(accuracy, f1)``.

        Args:
            target_names: Display names handed to ``classification_report``.
                The default list is only read, never modified.

        Returns:
            Tuple of the accuracy and the macro-averaged F-score.

        Raises:
            AttributeError: If :meth:`predict` has not been called yet.
        """
        predictions = getattr(self, 'predictions', None)
        if predictions is None:
            raise AttributeError(
                "No predictions available: call predict() before evaluate()")

        report = classification_report(self.test_labels,
                                       predictions,
                                       target_names=target_names)
        print('Clasification Report:\n {}'.format(report))

        acc = accuracy_score(self.test_labels, predictions)
        print("Accuracy of model: {}".format(acc))

        # The support value is not used when averaging, so it is discarded.
        precision, recall, fscore, _ = precision_recall_fscore_support(
            self.test_labels, predictions, average='macro')
        print('Precision: {0}, Recall: {1}, f1-score:{2}'.format(
            precision, recall, fscore))
        return acc, fscore
@dataclass
class CrossTimePipelineDecoder(Classifier):
    """Classifier across time.

    Scores a pipeline on successive time windows of the epochs.
    Taken from EEG: exercise (https://github.com/s-ccs/course_eeg_SS2021/tree/main/exercises)
    """

    pipeline: Pipeline
    X: np.ndarray = field(init=False, repr=False)
    y: np.ndarray = field(init=False, repr=False)
    epochs: mne.Epochs = field(init=False, repr=False)
    timeVec: np.ndarray = field(init=False, repr=False)
    t_scores: List[float] = field(init=False, default_factory=list, repr=False)

    def fit(self,
            epochs: mne.Epochs,
            labels: np.ndarray,
            resampling_freq: float = 40) -> None:
        """Store the resampled epochs, their data, labels and time axis.

        No estimator is trained here; the cross-validated fitting happens
        per window in :meth:`predict`.

        Args:
            epochs: Epochs to decode. They are loaded and resampled.
                NOTE(review): MNE documents ``load_data``/``resample`` as
                in-place, so the caller's object is presumably modified
                too - confirm this is intended.
            labels: One class label per epoch.
            resampling_freq: Target sampling frequency for the resampling.
        """
        self.epochs = epochs.load_data().resample(resampling_freq)
        self.X = self.epochs.get_data()
        self.y = labels
        self.timeVec = epochs.times

    def predict(self, w_size: float) -> np.ndarray:
        """Cross-validate the pipeline on sliding time windows.

        Every 10th time point is used as a window centre. Windows that
        would extend beyond the first or last centre are skipped.

        Args:
            w_size: Window width, in the same unit as the epoch times.

        Returns:
            Array with the mean ROC-AUC (2-fold CV) of each evaluated
            window. The same values are kept on ``self.t_scores``.
        """
        # Start from a clean slate so repeated calls do not concatenate
        # the scores of earlier runs.
        self.t_scores = []
        timeVec = self.timeVec[::10]
        for w_time in timeVec:
            w_tmin = w_time - w_size / 2.
            w_tmax = w_time + w_size / 2.

            # Skip windows that fall (partly) outside of the epoch
            if w_tmin < timeVec[0] or w_tmax > timeVec[-1]:
                continue

            # Crop a *copy*: Epochs.crop works in place, so cropping
            # self.epochs directly would truncate it for all later windows.
            X = self.epochs.copy().crop(w_tmin, w_tmax).get_data()

            # Save mean score over folds for this time window
            fold_scores = cross_val_score(estimator=self.pipeline,
                                          X=X,
                                          y=self.y,
                                          scoring='roc_auc',
                                          cv=2,
                                          n_jobs=2)
            self.t_scores.append(np.mean(fold_scores, axis=0))
        return np.array(self.t_scores)
@dataclass
class FeatureTransformer(ABC):
    """Base Transformer.

    Abstract interface for feature transformers; subclasses must
    implement :meth:`transform`. The arguments are subclass-specific, so
    the base signature accepts arbitrary positional and keyword arguments.
    """

    @abstractmethod
    def transform(self, *args: Any, **kwargs: Any) -> np.ndarray:
        """Return the transformed data. Must be overridden by subclasses."""
@dataclass
class MNECSPTransformer(FeatureTransformer):
    """Simple CSP built on top of MNE."""

    n_components: int

    def transform(self, data: np.ndarray, labels: np.ndarray) -> np.ndarray:
        """Fit a fresh CSP on ``data``/``labels`` and return the transformed data.

        A new ``mne.decoding.CSP`` is created on every call, so nothing is
        remembered between calls.

        NOTE(review): the CSP is fitted on the very data it transforms;
        if the result is cross-validated afterwards the filters have
        already seen the held-out samples - confirm this is acceptable.
        """
        csp = mne.decoding.CSP(self.n_components)
        # fit_transform already yields the transformed data; a second
        # transform() pass over the same array is redundant.
        return csp.fit_transform(data, labels)
@dataclass
class EEGDecoder():
    """Decode EEG data using condition, epochs.

    On construction the raw recording is epoched around every annotated
    event, optionally equalised, restricted to ``condition`` and cropped
    to ``decoding_times``. The result is stored on ``self.epochs`` and is
    the input of all other methods.
    """

    condition: Union[str, List[str]]
    epoch_times: Tuple[float, float]  # (tmin, tmax) used for epoching
    decoding_times: Tuple[float, float]  # (tmin, tmax) kept for decoding
    raw: mne.io.Raw = field(repr=False)
    equalize_events: bool = False
    baseline: Union[Tuple, None] = None
    reject_by_annotation: bool = False
    reject: Union[dict, None] = None
    epochs: mne.Epochs = field(init=False, repr=False)
    score: np.ndarray = field(init=False, repr=False)
    equalize_ids: list[str] = field(default_factory=list)

    # Stimulus identifiers handled by get_all_stim (epochs are selected
    # with the key 'stimulus/<id>').
    _STIMULI = ('A', 'B', 'C', 'D', 'E')

    def __post_init__(self):
        """Build ``self.epochs`` from the annotations of ``self.raw``."""
        events, ids = mne.events_from_annotations(self.raw)
        epochs = mne.Epochs(self.raw,
                            events,
                            event_id=ids,
                            tmin=self.epoch_times[0],
                            tmax=self.epoch_times[1],
                            baseline=self.baseline,
                            reject_by_annotation=self.reject_by_annotation,
                            reject=self.reject,
                            picks=['eeg'])
        if self.equalize_events:
            # Fall back to every annotated event name when no explicit
            # subset was requested.
            epochs.equalize_event_counts(self.equalize_ids or list(ids))

        self.epochs = epochs[self.condition].load_data().crop(
            self.decoding_times[0], self.decoding_times[1])

    def _equalize_samples(self, data: np.ndarray,
                          labels: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Helper function to equalize the data and labels.

        Randomly draws as many label-2 samples as there are label-1
        samples and returns the selected rows (label-2 draws first, then
        all label-1 rows).

        The draw is done without replacement whenever there are enough
        label-2 samples, so no sample is duplicated; only if label 2 is
        the smaller class does it fall back to drawing with replacement.
        """
        rare_idx = np.flatnonzero(labels == 1)
        freq_idx = np.flatnonzero(labels == 2)
        sampled_freq = np.random.choice(
            freq_idx,
            len(rare_idx),
            replace=len(freq_idx) < len(rare_idx))
        keep = np.hstack((sampled_freq, rare_idx))
        return data[keep], labels[keep]

    def get_train(
        self,
        channels: list[str] = None,
        transform_feature: bool = False,
        equalize_labels_count: bool = False
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Returns training data and labels from the epochs.

        Args:
            channels: Channel names to pick; all channels when empty/None.
            transform_feature: Run the (picked) data through
                :meth:`feature_transform` with its default transformer.
            equalize_labels_count: Balance both classes with
                :meth:`_equalize_samples`.

        Returns:
            Tuple ``(data, labels)``.
        """
        if channels:
            data = np.array(self.epochs.get_data(picks=channels))
        else:
            data = np.array(self.epochs.get_data())

        if transform_feature:
            # feature_transform returns (data, labels); hand it the picked
            # data so the channel selection is respected.
            data, labels = self.feature_transform(data=data)
        else:
            labels = self.labels_transform()

        if equalize_labels_count:
            data, labels = self._equalize_samples(data, labels)
        return data, labels

    def get_all_stim(self,
                     equalize_labels_count: bool = False) -> Dict[str, Any]:
        """Returns dictionary of epochs, data, labels, and balanced training data.

        The result has one entry per stimulus 'A'..'E', each a dict with:
        'epoch' (copy of the matching epochs), 'data' (its array),
        'labels' (class labels) and 'norm_samples' (class-balanced
        ``(data, labels)`` computed on the data averaged over its last
        axis).

        Args:
            equalize_labels_count: Currently unused; 'norm_samples' is
                always computed.
        """
        result = {}
        for stim in self._STIMULI:
            epoch = self.epochs['stimulus/' + stim].copy()
            epoch_data = epoch.get_data()
            labels = self.labels_transform(epoch)
            result[stim] = {
                'epoch': epoch,
                'data': epoch_data,
                'labels': labels,
                'norm_samples': self._equalize_samples(
                    epoch_data.mean(axis=2), labels),
            }
        return result

    def feature_transform(
        self,
        transformer: Union[FeatureTransformer, None] = None,
        data: Union[np.ndarray, None] = None
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Transform data using custom Transformers.

        Args:
            transformer: Transformer to apply; a 2-component
                ``MNECSPTransformer`` when omitted.
            data: Array to transform; the data of ``self.epochs`` when
                omitted. Must have one row per epoch of ``self.epochs``
                because the labels are always derived from them.

        Returns:
            Tuple ``(transformed_data, labels)``.
        """
        if transformer is None:
            transformer = MNECSPTransformer(2)
        labels = self.labels_transform()
        if data is None:
            data = self.epochs.get_data()
        return transformer.transform(data, labels), labels

    @staticmethod
    def _trigger_number(event_name: str) -> Union[int, None]:
        """Return the number after the last '/' of ``event_name``, or None."""
        try:
            return int(event_name.split('/')[-1])
        except ValueError:
            return None

    def labels_transform(self,
                         epochs: mne.Epochs = None,
                         n_classes: int = 2) -> np.ndarray:
        """Returns class labels from the epochs events.

        An epoch is labelled 1 when the trailing number of its event name
        is one of the rare triggers from ``P3.EVENTS_MAPINGS`` and 2
        otherwise.

        Args:
            epochs: Epochs to label; ``self.epochs`` when omitted.
            n_classes: Currently unused; labels are always 1 or 2.
        """
        # Compare against None: the truth value of an Epochs object goes
        # through len(), which is unrelated to "was an argument given".
        _epochs = self.epochs if epochs is None else epochs
        event_codes = _epochs.events[:, -1]
        _, rare, _ = P3.EVENTS_MAPINGS()

        # event_id maps event name -> integer code found in the last
        # column of `events`; collect the codes of the rare events. These
        # codes are values to match, not row indices into `events`.
        rare_codes = [
            code for name, code in _epochs.event_id.items()
            if self._trigger_number(name) in rare
        ]
        return np.where(np.isin(event_codes, rare_codes), 1, 2)

    def run_svm_(self) -> Tuple[float, dict]:
        """Runs inside a separate process using multiprocessing.

        Grid-searches an SVM (kernel and C) with 5-fold stratified CV on
        the 'Cz' and 'CPz' channels.

        Returns:
            Tuple ``(best_score, best_params)`` of the grid search.
        """
        from mne.decoding.transformer import Vectorizer
        from sklearn.preprocessing import StandardScaler
        from sklearn import svm
        from sklearn.model_selection import GridSearchCV, StratifiedKFold

        data, labels = self.get_train(channels=['Cz', 'CPz'])
        clf_svm_pip = make_pipeline(Vectorizer(), StandardScaler(),
                                    svm.SVC(random_state=42))
        parameters = {
            'svc__kernel': ['linear', 'rbf', 'sigmoid'],
            'svc__C': [0.1, 1, 10]
        }
        gs_cv_svm = GridSearchCV(clf_svm_pip,
                                 parameters,
                                 scoring='accuracy',
                                 cv=StratifiedKFold(n_splits=5),
                                 return_train_score=True)
        gs_cv_svm.fit(data, labels)
        logging.info('Best Parameters: {}'.format(gs_cv_svm.best_params_))
        logging.info('Best Score: {}'.format(gs_cv_svm.best_score_))
        return gs_cv_svm.best_score_, gs_cv_svm.best_params_

    def run_sliding_(self) -> Tuple[np.ndarray, np.ndarray]:
        """Runs inside a separate process using multiprocessing.

        Scores a linear SVM with a sliding estimator (ROC-AUC, shuffled
        5-fold stratified CV).

        Returns:
            Tuple ``(times, scores)``: the epoch time axis and the output
            of ``cross_val_multiscore``.
        """
        from mne.decoding.search_light import SlidingEstimator
        from mne.decoding.transformer import Vectorizer
        from sklearn.preprocessing import StandardScaler
        from sklearn.model_selection import StratifiedKFold
        from mne.decoding.base import cross_val_multiscore

        clf_svm = make_pipeline(Vectorizer(), StandardScaler(),
                                svm.SVC(kernel='linear', C=1))
        timeDecode = SlidingEstimator(clf_svm, scoring='roc_auc', n_jobs=3)
        epochs = self.epochs.load_data()
        labels = self.labels_transform()
        # shuffle/random_state must be passed by keyword: they are
        # keyword-only in current scikit-learn releases.
        cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=114)
        scores = cross_val_multiscore(timeDecode,
                                      epochs.get_data(),
                                      labels,
                                      cv=cv,
                                      n_jobs=6)
        return (epochs.times, scores)
class P3:
    """Constants of the P3 task."""

    @staticmethod
    def EVENTS_MAPINGS() -> Tuple[dict, list[int], list[int]]:
        """Returns events mapping for the P3 task.

        Trigger numbers are organised in five blocks (11-15, 21-25, ...,
        51-55), one per stimulus 'A'..'E'. Within block ``k`` (0-based) the
        ``k``-th trigger is the rare one (11, 22, 33, 44, 55); the rest are
        frequent.

        Returns:
            Tuple ``(evts_id, rare, freq)``:
            ``evts_id`` maps the ids 3..27 to 'stimulus/<letter>/<trigger>'
            names and the ids 1 and 2 to 'response/201' and 'response/202';
            ``rare`` and ``freq`` are the rare and the (sorted) frequent
            trigger numbers.
        """
        stimuli = ['A', 'B', 'C', 'D', 'E']
        blocks = [list(range(10 * b + 1, 10 * b + 6)) for b in range(1, 6)]

        # The rare trigger sits on the "diagonal" of the block table.
        rare = [block[i] for i, block in enumerate(blocks)]
        freq = sorted(
            trigger for block in blocks for trigger in block
            if trigger not in rare)

        evts_stim = [
            'stimulus/' + letter + '/' + str(trigger)
            for letter, block in zip(stimuli, blocks)
            for trigger in block
        ]
        # Ids 1 and 2 are reserved for the responses, stimuli start at 3.
        evts_id = {i: name for i, name in enumerate(evts_stim, start=3)}
        evts_id[1] = 'response/201'
        evts_id[2] = 'response/202'
        return evts_id, rare, freq
Classes
class CVPipleline (cv_pip: sklearn.pipeline.Pipeline, train_data: numpy.ndarray, test_data: numpy.ndarray, train_labels: numpy.ndarray, test_labels: numpy.ndarray)
-
Cross-validation pipeline
Expand source code
@dataclass class CVPipleline(Classifier): """Cross-validation pipleline""" cv_pip: Pipeline train_data: np.ndarray test_data: np.ndarray train_labels: np.ndarray test_labels: np.ndarray def fit(self, **kwargs): self.cv_pip.fit(self.train_data, self.train_labels, **kwargs) def predict(self, **kwargs) -> np.ndarray: self.predictions = self.cv_pip.predict(self.test_data, **kwargs) def evaluate(self, target_names: list[str] = ['Rare', 'Frequent']): report = classification_report(self.test_labels, self.predictions, target_names=target_names) print('Clasification Report:\n {}'.format(report)) acc = accuracy_score(self.test_labels, self.predictions) print("Accuracy of model: {}".format(acc)) precision, recall, fscore, support = precision_recall_fscore_support( self.test_labels, self.predictions, average='macro') print('Precision: {0}, Recall: {1}, f1-score:{2}'.format( precision, recall, fscore)) return acc, fscore
Ancestors
- Classifier
- abc.ABC
Class variables
var cv_pip : sklearn.pipeline.Pipeline
var test_data : numpy.ndarray
var test_labels : numpy.ndarray
var train_data : numpy.ndarray
var train_labels : numpy.ndarray
Methods
def evaluate(self, target_names: list = ['Rare', 'Frequent'])
-
Expand source code
def evaluate(self, target_names: list[str] = ['Rare', 'Frequent']): report = classification_report(self.test_labels, self.predictions, target_names=target_names) print('Clasification Report:\n {}'.format(report)) acc = accuracy_score(self.test_labels, self.predictions) print("Accuracy of model: {}".format(acc)) precision, recall, fscore, support = precision_recall_fscore_support( self.test_labels, self.predictions, average='macro') print('Precision: {0}, Recall: {1}, f1-score:{2}'.format( precision, recall, fscore)) return acc, fscore
def fit(self, **kwargs)
-
Expand source code
def fit(self, **kwargs): self.cv_pip.fit(self.train_data, self.train_labels, **kwargs)
def predict(self, **kwargs) ‑> numpy.ndarray
-
Expand source code
def predict(self, **kwargs) -> np.ndarray: self.predictions = self.cv_pip.predict(self.test_data, **kwargs)
class Classifier
-
Base classifier
Expand source code
@dataclass class Classifier(ABC): """Base classifier""" @abstractmethod def fit(): pass @abstractmethod def predict() -> np.ndarray: pass
Ancestors
- abc.ABC
Subclasses
Methods
def fit()
-
Expand source code
@abstractmethod def fit(): pass
def predict() ‑> numpy.ndarray
-
Expand source code
@abstractmethod def predict() -> np.ndarray: pass
class CrossTimePipelineDecoder (pipeline: sklearn.pipeline.Pipeline)
-
Classifier across time. Taken from the EEG exercise (https://github.com/s-ccs/course_eeg_SS2021/tree/main/exercises)
Expand source code
@dataclass class CrossTimePipelineDecoder(Classifier): """Classifier across time Taken from EEG: excercise (https://github.com/s-ccs/course_eeg_SS2021/tree/main/exercises) """ pipeline: Pipeline X: np.ndarray = field(init=False, repr=False) y: np.ndarray = field(init=False, repr=False) epochs: mne.Epochs = field(init=False, repr=False) timeVec: np.ndarray = field(init=False, repr=False) t_scores: List[float] = field(init=False, default_factory=list, repr=False) def fit(self, epochs: mne.Epochs, labels: np.ndarray, resampling_freq: float = 40) -> None: self.epochs = epochs.load_data().resample(resampling_freq) self.X = self.epochs.get_data() self.y = labels self.timeVec = epochs.times def predict(self, w_size: float) -> np.array: timeVec = self.timeVec[::10] for t, w_time in enumerate(timeVec): w_tmin = w_time - w_size / 2. w_tmax = w_time + w_size / 2. # stop the program if the timewindow is outside of our epoch if w_tmin < timeVec[0]: continue if w_tmax > timeVec[len(timeVec) - 1]: continue # Crop data into time-window of interest X = self.epochs.crop(w_tmin, w_tmax).get_data() # Save mean scores over folds for each frequency and time window self.t_scores.append( np.mean(cross_val_score(estimator=self.pipeline, X=X, y=self.y, scoring='roc_auc', cv=2, n_jobs=2), axis=0)) return np.array(self.t_scores)
Ancestors
- Classifier
- abc.ABC
Class variables
var X : numpy.ndarray
var epochs : mne.epochs.Epochs
var pipeline : sklearn.pipeline.Pipeline
var t_scores : List[float]
var timeVec : numpy.ndarray
var y : numpy.ndarray
Methods
def fit(self, epochs: mne.epochs.Epochs, labels: numpy.ndarray, resampling_freq: float = 40) ‑> None
-
Expand source code
def fit(self, epochs: mne.Epochs, labels: np.ndarray, resampling_freq: float = 40) -> None: self.epochs = epochs.load_data().resample(resampling_freq) self.X = self.epochs.get_data() self.y = labels self.timeVec = epochs.times
def predict(self, w_size: float) ‑>
-
Expand source code
def predict(self, w_size: float) -> np.array: timeVec = self.timeVec[::10] for t, w_time in enumerate(timeVec): w_tmin = w_time - w_size / 2. w_tmax = w_time + w_size / 2. # stop the program if the timewindow is outside of our epoch if w_tmin < timeVec[0]: continue if w_tmax > timeVec[len(timeVec) - 1]: continue # Crop data into time-window of interest X = self.epochs.crop(w_tmin, w_tmax).get_data() # Save mean scores over folds for each frequency and time window self.t_scores.append( np.mean(cross_val_score(estimator=self.pipeline, X=X, y=self.y, scoring='roc_auc', cv=2, n_jobs=2), axis=0)) return np.array(self.t_scores)
class EEGDecoder (condition: Union[str, List[str]], epoch_times: Tuple[float, float], decoding_times: Tuple[float, float], raw: mne.io.fiff.raw.Raw, equalize_events: bool = False, baseline: Optional[Tuple] = None, reject_by_annotation: bool = False, reject: Optional[dict] = None, equalize_ids: list = <factory>)
-
Decode EEG data using condition, epochs
Expand source code
@dataclass class EEGDecoder(): """Decode EEG data using condition, epochs""" condition: Union[str, List[str]] epoch_times: Tuple[float, float] decoding_times: Tuple[float, float] raw: mne.io.Raw = field(repr=False) equalize_events: bool = False baseline: Union[Tuple, None] = None reject_by_annotation: bool = False reject: Union[dict, None] = None epochs: mne.Epochs = field(init=False, repr=False) score: np.ndarray = field(init=False, repr=False) equalize_ids: list[str] = field(default_factory=list) def __post_init__(self): events, ids = mne.events_from_annotations(self.raw) epochs = mne.Epochs(self.raw, events, ids, self.epoch_times[0], self.epoch_times[1], self.baseline, reject_by_annotation=self.reject_by_annotation, reject=self.reject, picks=['eeg']) if self.equalize_events: if len(self.equalize_ids) > 0: epochs.equalize_event_counts(self.equalize_ids) else: epochs.equalize_event_counts(ids) self.epochs = epochs[self.condition].load_data().crop( self.decoding_times[0], self.decoding_times[1]) def _equalize_samples(self, data: np.ndarray, labels: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """Helper funciton to equalize the data and labels""" labels_freq = np.random.choice( np.where(labels == 2)[0], len(np.where(labels == 1)[0])) labels_rare = np.where(labels == 1)[0] labels_idx = np.hstack((labels_freq, labels_rare)) data = data[labels_idx, :] labels = labels[labels_idx] return data, labels def get_train( self, channels: list[str] = None, transform_feature: bool = False, equalize_labels_count: bool = False ) -> Tuple[np.ndarray, np.ndarray]: """Returns training data from the epochs averaged across times""" if channels: data = np.array(self.epochs.get_data(picks=channels)) else: data = np.array(self.epochs.get_data()) if transform_feature: data, labels = self.feature_transform(data), self.labels_transform() else: data, labels = data, self.labels_transform() if equalize_labels_count: data, labels = self._equalize_samples(data, labels) return data, labels def 
get_all_stim(self, equalize_labels_count: bool = False) -> Dict[str, Any]: """Returns dictionary of epochs, data, labels, and balanced training data""" epoch_A = self.epochs['stimulus/A'].copy() epoch_B = self.epochs['stimulus/B'].copy() epoch_C = self.epochs['stimulus/C'].copy() epoch_D = self.epochs['stimulus/D'].copy() epoch_E = self.epochs['stimulus/E'].copy() data = { 'A': { 'epoch': epoch_A, 'data': epoch_A.get_data(), 'labels': self.labels_transform(epoch_A), 'norm_samples': self._equalize_samples(epoch_A.get_data().mean(axis=2), self.labels_transform(epoch_A)) }, 'B': { 'epoch': epoch_B, 'data': epoch_B.get_data(), 'labels': self.labels_transform(epoch_B), 'norm_samples': self._equalize_samples(epoch_B.get_data().mean(axis=2), self.labels_transform(epoch_B)) }, 'C': { 'epoch': epoch_C, 'data': epoch_C.get_data(), 'labels': self.labels_transform(epoch_C), 'norm_samples': self._equalize_samples(epoch_C.get_data().mean(axis=2), self.labels_transform(epoch_C)) }, 'D': { 'epoch': epoch_D, 'data': epoch_D.get_data(), 'labels': self.labels_transform(epoch_D), 'norm_samples': self._equalize_samples(epoch_D.get_data().mean(axis=2), self.labels_transform(epoch_D)) }, 'E': { 'epoch': epoch_E, 'data': epoch_E.get_data(), 'labels': self.labels_transform(epoch_E), 'norm_samples': self._equalize_samples(epoch_E.get_data().mean(axis=2), self.labels_transform(epoch_E)) }, } return data def feature_transform( self, transformer: FeatureTransformer = MNECSPTransformer(2) ) -> Tuple[np.ndarray, np.ndarray]: """Transform data using custom Transformers""" labels = self.labels_transform() data = transformer.transform(self.epochs.get_data(), labels) return data, labels def labels_transform(self, epochs: mne.Epochs = None, n_classes: int = 2) -> np.ndarray: """Returns class labels from the epochs events based on number of classes""" _epochs = epochs if epochs else self.epochs _labels = epochs.events[:, -1] if epochs else self.epochs.events[:, -1] _, rare, _ = P3.EVENTS_MAPINGS() 
wanted_keys = [ _epochs.event_id[key] for key in _epochs.event_id if int(key.split('/')[-1]) in rare ] rare_stims = np.array([_epochs.events[key] for key in wanted_keys]) rare_stims = rare_stims[:, -1] labels = np.where(np.isin(_labels, rare_stims), 1, 2) return labels def run_svm_(self) -> Tuple[object, object]: """Runs inside a seperate process using multiprocessing""" from mne.decoding.transformer import Vectorizer from sklearn.preprocessing import StandardScaler from sklearn import svm from sklearn.model_selection import GridSearchCV, StratifiedKFold data, labels = self.get_train(channels=['Cz', 'CPz']) clf_svm_pip = make_pipeline(Vectorizer(), StandardScaler(), svm.SVC(random_state=42)) parameters = { 'svc__kernel': ['linear', 'rbf', 'sigmoid'], 'svc__C': [0.1, 1, 10] } gs_cv_svm = GridSearchCV(clf_svm_pip, parameters, scoring='accuracy', cv=StratifiedKFold(n_splits=5), return_train_score=True) gs_cv_svm.fit(data, labels) logging.info('Best Parameters: {}'.format(gs_cv_svm.best_params_)) logging.info('Best Score: {}'.format(gs_cv_svm.best_score_)) return gs_cv_svm.best_score_, gs_cv_svm.best_params_ def run_sliding_(self) -> Tuple[np.ndarray, np.ndarray]: """Runs inside a seperate process using multiprocessing""" from mne.decoding.search_light import SlidingEstimator from mne.decoding.transformer import Vectorizer from sklearn.preprocessing import StandardScaler from sklearn.model_selection import StratifiedKFold from mne.decoding.base import cross_val_multiscore clf_svm = make_pipeline(Vectorizer(), StandardScaler(), svm.SVC(kernel='linear', C=1)) timeDecode = SlidingEstimator(clf_svm, scoring='roc_auc', n_jobs=3) epochs = self.epochs.load_data() labels = self.labels_transform() scores = cross_val_multiscore(timeDecode, epochs.get_data(), labels, cv=StratifiedKFold(5, True, 114), n_jobs=6) return (epochs.times, scores)
Class variables
var baseline : Optional[Tuple]
var condition : Union[str, List[str]]
var decoding_times : Tuple[float, float]
var epoch_times : Tuple[float, float]
var epochs : mne.epochs.Epochs
var equalize_events : bool
var equalize_ids : list
var raw : mne.io.fiff.raw.Raw
var reject : Optional[dict]
var reject_by_annotation : bool
var score : numpy.ndarray
Methods
def feature_transform(self, transformer: FeatureTransformer = MNECSPTransformer(n_components=2)) ‑> Tuple[numpy.ndarray, numpy.ndarray]
-
Transform data using custom Transformers
Expand source code
def feature_transform( self, transformer: FeatureTransformer = MNECSPTransformer(2) ) -> Tuple[np.ndarray, np.ndarray]: """Transform data using custom Transformers""" labels = self.labels_transform() data = transformer.transform(self.epochs.get_data(), labels) return data, labels
def get_all_stim(self, equalize_labels_count: bool = False) ‑> Dict[str, Any]
-
Returns dictionary of epochs, data, labels, and balanced training data
Expand source code
def get_all_stim(self, equalize_labels_count: bool = False) -> Dict[str, Any]: """Returns dictionary of epochs, data, labels, and balanced training data""" epoch_A = self.epochs['stimulus/A'].copy() epoch_B = self.epochs['stimulus/B'].copy() epoch_C = self.epochs['stimulus/C'].copy() epoch_D = self.epochs['stimulus/D'].copy() epoch_E = self.epochs['stimulus/E'].copy() data = { 'A': { 'epoch': epoch_A, 'data': epoch_A.get_data(), 'labels': self.labels_transform(epoch_A), 'norm_samples': self._equalize_samples(epoch_A.get_data().mean(axis=2), self.labels_transform(epoch_A)) }, 'B': { 'epoch': epoch_B, 'data': epoch_B.get_data(), 'labels': self.labels_transform(epoch_B), 'norm_samples': self._equalize_samples(epoch_B.get_data().mean(axis=2), self.labels_transform(epoch_B)) }, 'C': { 'epoch': epoch_C, 'data': epoch_C.get_data(), 'labels': self.labels_transform(epoch_C), 'norm_samples': self._equalize_samples(epoch_C.get_data().mean(axis=2), self.labels_transform(epoch_C)) }, 'D': { 'epoch': epoch_D, 'data': epoch_D.get_data(), 'labels': self.labels_transform(epoch_D), 'norm_samples': self._equalize_samples(epoch_D.get_data().mean(axis=2), self.labels_transform(epoch_D)) }, 'E': { 'epoch': epoch_E, 'data': epoch_E.get_data(), 'labels': self.labels_transform(epoch_E), 'norm_samples': self._equalize_samples(epoch_E.get_data().mean(axis=2), self.labels_transform(epoch_E)) }, } return data
def get_train(self, channels: list = None, transform_feature: bool = False, equalize_labels_count: bool = False) ‑> Tuple[numpy.ndarray, numpy.ndarray]
-
Returns training data and labels from the epochs (optionally feature-transformed and class-balanced)
Expand source code
def get_train( self, channels: list[str] = None, transform_feature: bool = False, equalize_labels_count: bool = False ) -> Tuple[np.ndarray, np.ndarray]: """Returns training data from the epochs averaged across times""" if channels: data = np.array(self.epochs.get_data(picks=channels)) else: data = np.array(self.epochs.get_data()) if transform_feature: data, labels = self.feature_transform(data), self.labels_transform() else: data, labels = data, self.labels_transform() if equalize_labels_count: data, labels = self._equalize_samples(data, labels) return data, labels
def labels_transform(self, epochs: mne.epochs.Epochs = None, n_classes: int = 2) ‑> numpy.ndarray
-
Returns class labels from the epochs events based on number of classes
Expand source code
def labels_transform(self, epochs: mne.Epochs = None, n_classes: int = 2) -> np.ndarray: """Returns class labels from the epochs events based on number of classes""" _epochs = epochs if epochs else self.epochs _labels = epochs.events[:, -1] if epochs else self.epochs.events[:, -1] _, rare, _ = P3.EVENTS_MAPINGS() wanted_keys = [ _epochs.event_id[key] for key in _epochs.event_id if int(key.split('/')[-1]) in rare ] rare_stims = np.array([_epochs.events[key] for key in wanted_keys]) rare_stims = rare_stims[:, -1] labels = np.where(np.isin(_labels, rare_stims), 1, 2) return labels
def run_sliding_(self) ‑> Tuple[numpy.ndarray, numpy.ndarray]
-
Runs inside a separate process using multiprocessing
Expand source code
def run_sliding_(self) -> Tuple[np.ndarray, np.ndarray]: """Runs inside a seperate process using multiprocessing""" from mne.decoding.search_light import SlidingEstimator from mne.decoding.transformer import Vectorizer from sklearn.preprocessing import StandardScaler from sklearn.model_selection import StratifiedKFold from mne.decoding.base import cross_val_multiscore clf_svm = make_pipeline(Vectorizer(), StandardScaler(), svm.SVC(kernel='linear', C=1)) timeDecode = SlidingEstimator(clf_svm, scoring='roc_auc', n_jobs=3) epochs = self.epochs.load_data() labels = self.labels_transform() scores = cross_val_multiscore(timeDecode, epochs.get_data(), labels, cv=StratifiedKFold(5, True, 114), n_jobs=6) return (epochs.times, scores)
def run_svm_(self) ‑> Tuple[object, object]
-
Runs inside a separate process using multiprocessing
Expand source code
def run_svm_(self) -> Tuple[object, object]: """Runs inside a seperate process using multiprocessing""" from mne.decoding.transformer import Vectorizer from sklearn.preprocessing import StandardScaler from sklearn import svm from sklearn.model_selection import GridSearchCV, StratifiedKFold data, labels = self.get_train(channels=['Cz', 'CPz']) clf_svm_pip = make_pipeline(Vectorizer(), StandardScaler(), svm.SVC(random_state=42)) parameters = { 'svc__kernel': ['linear', 'rbf', 'sigmoid'], 'svc__C': [0.1, 1, 10] } gs_cv_svm = GridSearchCV(clf_svm_pip, parameters, scoring='accuracy', cv=StratifiedKFold(n_splits=5), return_train_score=True) gs_cv_svm.fit(data, labels) logging.info('Best Parameters: {}'.format(gs_cv_svm.best_params_)) logging.info('Best Score: {}'.format(gs_cv_svm.best_score_)) return gs_cv_svm.best_score_, gs_cv_svm.best_params_
class FeatureTransformer
-
Base Transformer
Expand source code
@dataclass class FeatureTransformer(ABC): """Base Transformer""" @abstractmethod def transform() -> np.ndarray: pass
Ancestors
- abc.ABC
Subclasses
Methods
def transform() ‑> numpy.ndarray
-
Expand source code
@abstractmethod def transform() -> np.ndarray: pass
class LDADecoder
-
Simple sklearn LDA decoder
Expand source code
@dataclass class LDADecoder(Classifier): """Simple sklearn LDA decoder""" lda: LinearDiscriminantAnalysis = field( init=False, default_factory=LinearDiscriminantAnalysis) def fit(self, data: np.ndarray, labels: np.ndarray) -> None: data = data.reshape(data.shape[0], -1) self.lda.fit(data, labels) def predict(self, data: np.ndarray, labels: np.ndarray) -> float: score = self.lda.score(data, labels) logging.info("Score {}".format(score)) return score
Ancestors
- Classifier
- abc.ABC
Class variables
var lda : sklearn.discriminant_analysis.LinearDiscriminantAnalysis
Methods
def fit(self, data: numpy.ndarray, labels: numpy.ndarray) ‑> None
-
Expand source code
def fit(self, data: np.ndarray, labels: np.ndarray) -> None: data = data.reshape(data.shape[0], -1) self.lda.fit(data, labels)
def predict(self, data: numpy.ndarray, labels: numpy.ndarray) ‑> float
-
Expand source code
def predict(self, data: np.ndarray, labels: np.ndarray) -> float: score = self.lda.score(data, labels) logging.info("Score {}".format(score)) return score
class MNECSPTransformer (n_components: int)
-
Simple CSP built on top of MNE
Expand source code
@dataclass class MNECSPTransformer(FeatureTransformer): """Simple CSP built on top of MNE""" n_components: int def transform(self, data: np.ndarray, labels: np.ndarray) -> np.ndarray: csp = mne.decoding.CSP(self.n_components) csp.fit_transform(data, labels) return csp.transform(data)
Ancestors
- FeatureTransformer
- abc.ABC
Class variables
var n_components : int
Methods
def transform(self, data: numpy.ndarray, labels: numpy.ndarray) ‑> numpy.ndarray
-
Expand source code
def transform(self, data: np.ndarray, labels: np.ndarray) -> np.ndarray: csp = mne.decoding.CSP(self.n_components) csp.fit_transform(data, labels) return csp.transform(data)
class P3
-
Expand source code
class P3: @abstractmethod def EVENTS_MAPINGS() -> Tuple[dict, list[int], list[int]]: """Returns events mapping for the P3 task""" blocks = np.array( [list(range(10 * x + 1, 10 * x + 6)) for x in range(1, 6)]) rare = np.array([x + i for i, x in enumerate(range(11, 56, 10)) ]).tolist() freq = np.setdiff1d(blocks.flatten(), rare).tolist() stimlus = ['A', 'B', 'C', 'D', 'E'] evts_stim = [ 'stimulus/' + stimlus[i] + '/' + str(alph) for i, x in enumerate(blocks) for alph in x ] evts_id = dict((i + 3, evts_stim[i]) for i in range(0, len(evts_stim))) evts_id[1] = 'response/201' evts_id[2] = 'response/202' return evts_id, rare, freq
Methods
def EVENTS_MAPINGS() ‑> Tuple[dict, list[int], list[int]]
-
Returns events mapping for the P3 task
Expand source code
@abstractmethod def EVENTS_MAPINGS() -> Tuple[dict, list[int], list[int]]: """Returns events mapping for the P3 task""" blocks = np.array( [list(range(10 * x + 1, 10 * x + 6)) for x in range(1, 6)]) rare = np.array([x + i for i, x in enumerate(range(11, 56, 10)) ]).tolist() freq = np.setdiff1d(blocks.flatten(), rare).tolist() stimlus = ['A', 'B', 'C', 'D', 'E'] evts_stim = [ 'stimulus/' + stimlus[i] + '/' + str(alph) for i, x in enumerate(blocks) for alph in x ] evts_id = dict((i + 3, evts_stim[i]) for i in range(0, len(evts_stim))) evts_id[1] = 'response/201' evts_id[2] = 'response/202' return evts_id, rare, freq