Module src.decoding_analysis

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
import logging
from typing import Any, Dict, List, Tuple, Union

import numpy as np
import mne
from sklearn import svm
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.model_selection import cross_val_score
from sklearn.metrics import classification_report, accuracy_score, precision_recall_fscore_support
from sklearn.pipeline import Pipeline, make_pipeline


@dataclass
class Classifier(ABC):
    """Base classifier"""

    @abstractmethod
    def fit(self, *args, **kwargs) -> None:
        pass

    @abstractmethod
    def predict(self, *args, **kwargs) -> np.ndarray:
        pass


@dataclass
class LDADecoder(Classifier):
    """Simple sklearn LDA decoder"""
    lda: LinearDiscriminantAnalysis = field(
        init=False, default_factory=LinearDiscriminantAnalysis)

    def fit(self, data: np.ndarray, labels: np.ndarray) -> None:
        data = data.reshape(data.shape[0], -1)
        self.lda.fit(data, labels)

    def predict(self, data: np.ndarray, labels: np.ndarray) -> float:
        # Flatten trials to (n_samples, n_features), matching the reshape in fit
        data = data.reshape(data.shape[0], -1)
        # Returns the mean accuracy on the given data rather than raw predictions
        score = self.lda.score(data, labels)
        logging.info("Score {}".format(score))
        return score


@dataclass
class CVPipeline(Classifier):
    """Cross-validation pipeline"""
    cv_pip: Pipeline
    train_data: np.ndarray
    test_data: np.ndarray
    train_labels: np.ndarray
    test_labels: np.ndarray

    def fit(self, **kwargs):
        self.cv_pip.fit(self.train_data, self.train_labels, **kwargs)

    def predict(self, **kwargs) -> np.ndarray:
        self.predictions = self.cv_pip.predict(self.test_data, **kwargs)
        return self.predictions

    def evaluate(self, target_names: list[str] = ['Rare', 'Frequent']):
        report = classification_report(self.test_labels,
                                       self.predictions,
                                       target_names=target_names)
        print('Classification Report:\n {}'.format(report))

        acc = accuracy_score(self.test_labels, self.predictions)
        print("Accuracy of model: {}".format(acc))

        precision, recall, fscore, support = precision_recall_fscore_support(
            self.test_labels, self.predictions, average='macro')
        print('Precision: {0}, Recall: {1}, f1-score: {2}'.format(
            precision, recall, fscore))
        return acc, fscore


@dataclass
class CrossTimePipelineDecoder(Classifier):
    """Classifier across time
    Taken from an EEG course exercise (https://github.com/s-ccs/course_eeg_SS2021/tree/main/exercises)
    """
    pipeline: Pipeline
    X: np.ndarray = field(init=False, repr=False)
    y: np.ndarray = field(init=False, repr=False)
    epochs: mne.Epochs = field(init=False, repr=False)
    timeVec: np.ndarray = field(init=False, repr=False)
    t_scores: List[float] = field(init=False, default_factory=list, repr=False)

    def fit(self,
            epochs: mne.Epochs,
            labels: np.ndarray,
            resampling_freq: float = 40) -> None:
        self.epochs = epochs.load_data().resample(resampling_freq)
        self.X = self.epochs.get_data()
        self.y = labels
        self.timeVec = epochs.times

    def predict(self, w_size: float) -> np.ndarray:
        # Evaluate every 10th time point to keep the run time manageable
        timeVec = self.timeVec[::10]
        self.t_scores = []  # reset in case predict is called more than once
        for w_time in timeVec:
            w_tmin = w_time - w_size / 2.
            w_tmax = w_time + w_size / 2.

            # Skip windows that fall outside the epoch
            if w_tmin < timeVec[0]:
                continue
            if w_tmax > timeVec[-1]:
                continue
            # Crop a copy into the time window of interest; Epochs.crop works
            # in place, so cropping self.epochs directly would shrink the data
            # on every iteration
            X = self.epochs.copy().crop(w_tmin, w_tmax).get_data()

            # Save the mean score over folds for each time window
            self.t_scores.append(
                np.mean(cross_val_score(estimator=self.pipeline,
                                        X=X,
                                        y=self.y,
                                        scoring='roc_auc',
                                        cv=2,
                                        n_jobs=2),
                        axis=0))
        return np.array(self.t_scores)


@dataclass
class FeatureTransformer(ABC):
    """Base Transformer"""

    @abstractmethod
    def transform(self, *args, **kwargs) -> np.ndarray:
        pass


@dataclass
class MNECSPTransformer(FeatureTransformer):
    """Simple CSP built on top of MNE"""
    n_components: int

    def transform(self, data: np.ndarray, labels: np.ndarray) -> np.ndarray:
        csp = mne.decoding.CSP(self.n_components)
        # fit_transform fits the CSP filters and returns the transformed data,
        # so a second transform(data) call is unnecessary
        return csp.fit_transform(data, labels)


@dataclass
class EEGDecoder():
    """Decode EEG data using condition, epochs"""
    condition: Union[str, List[str]]
    epoch_times: Tuple[float, float]
    decoding_times: Tuple[float, float]
    raw: mne.io.Raw = field(repr=False)
    equalize_events: bool = False
    baseline: Union[Tuple, None] = None
    reject_by_annotation: bool = False
    reject: Union[dict, None] = None
    epochs: mne.Epochs = field(init=False, repr=False)
    score: np.ndarray = field(init=False, repr=False)
    equalize_ids: list[str] = field(default_factory=list)

    def __post_init__(self):
        events, ids = mne.events_from_annotations(self.raw)
        epochs = mne.Epochs(self.raw,
                            events,
                            ids,
                            self.epoch_times[0],
                            self.epoch_times[1],
                            self.baseline,
                            reject_by_annotation=self.reject_by_annotation,
                            reject=self.reject,
                            picks=['eeg'])
        if self.equalize_events:
            if len(self.equalize_ids) > 0:
                epochs.equalize_event_counts(self.equalize_ids)
            else:
                epochs.equalize_event_counts(ids)
        self.epochs = epochs[self.condition].load_data().crop(
            self.decoding_times[0], self.decoding_times[1])

    def _equalize_samples(self, data: np.ndarray,
                          labels: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Helper function to equalize the data and labels by undersampling
        the frequent class (label 2) to the number of rare trials (label 1)"""
        labels_freq = np.random.choice(np.where(labels == 2)[0],
                                       len(np.where(labels == 1)[0]),
                                       replace=False)
        labels_rare = np.where(labels == 1)[0]
        labels_idx = np.hstack((labels_freq, labels_rare))

        data = data[labels_idx]
        labels = labels[labels_idx]

        return data, labels

    def get_train(
            self,
            channels: list[str] = None,
            transform_feature: bool = False,
            equalize_labels_count: bool = False
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Returns training data and class labels from the epochs"""
        if channels:
            data = np.array(self.epochs.get_data(picks=channels))
        else:
            data = np.array(self.epochs.get_data())

        if transform_feature:
            # feature_transform works on the full epochs object, so the
            # channel selection above does not apply in this branch
            data, labels = self.feature_transform()
        else:
            labels = self.labels_transform()

        if equalize_labels_count:
            data, labels = self._equalize_samples(data, labels)

        return data, labels

    def get_all_stim(self,
                     equalize_labels_count: bool = False) -> Dict[str, Any]:
        """Returns dictionary of epochs, data, labels, and balanced training data"""
        data = {}
        for stim in ['A', 'B', 'C', 'D', 'E']:
            epoch = self.epochs['stimulus/' + stim].copy()
            labels = self.labels_transform(epoch)
            data[stim] = {
                'epoch': epoch,
                'data': epoch.get_data(),
                'labels': labels,
                # Trials averaged over time, undersampled to balance the classes
                'norm_samples': self._equalize_samples(
                    epoch.get_data().mean(axis=2), labels),
            }
        return data

    def feature_transform(
        self, transformer: FeatureTransformer = None
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Transform data using custom Transformers"""
        # Avoid a shared instance as a mutable default argument
        if transformer is None:
            transformer = MNECSPTransformer(2)
        labels = self.labels_transform()
        data = transformer.transform(self.epochs.get_data(), labels)
        return data, labels

    def labels_transform(self,
                         epochs: mne.Epochs = None,
                         n_classes: int = 2) -> np.ndarray:
        """Returns class labels (1 = rare, 2 = frequent) from the epochs events"""
        _epochs = epochs if epochs is not None else self.epochs
        _labels = _epochs.events[:, -1]
        _, rare, _ = P3.EVENTS_MAPPINGS()
        # Event codes whose stimulus number belongs to the rare set
        rare_codes = [
            _epochs.event_id[key]
            for key in _epochs.event_id
            if int(key.split('/')[-1]) in rare
        ]
        labels = np.where(np.isin(_labels, rare_codes), 1, 2)
        return labels

    def run_svm_(self) -> Tuple[object, object]:
        """Runs inside a seperate process using multiprocessing"""
        from mne.decoding.transformer import Vectorizer
        from sklearn.preprocessing import StandardScaler
        from sklearn import svm
        from sklearn.model_selection import GridSearchCV, StratifiedKFold
        data, labels = self.get_train(channels=['Cz', 'CPz'])
        clf_svm_pip = make_pipeline(Vectorizer(), StandardScaler(),
                                    svm.SVC(random_state=42))
        parameters = {
            'svc__kernel': ['linear', 'rbf', 'sigmoid'],
            'svc__C': [0.1, 1, 10]
        }
        gs_cv_svm = GridSearchCV(clf_svm_pip,
                                 parameters,
                                 scoring='accuracy',
                                 cv=StratifiedKFold(n_splits=5),
                                 return_train_score=True)
        gs_cv_svm.fit(data, labels)
        logging.info('Best Parameters: {}'.format(gs_cv_svm.best_params_))
        logging.info('Best Score: {}'.format(gs_cv_svm.best_score_))
        return gs_cv_svm.best_score_, gs_cv_svm.best_params_

    def run_sliding_(self) -> Tuple[np.ndarray, np.ndarray]:
        """Runs inside a seperate process using multiprocessing"""
        from mne.decoding.search_light import SlidingEstimator
        from mne.decoding.transformer import Vectorizer
        from sklearn.preprocessing import StandardScaler
        from sklearn.model_selection import StratifiedKFold
        from mne.decoding.base import cross_val_multiscore

        clf_svm = make_pipeline(Vectorizer(), StandardScaler(),
                                svm.SVC(kernel='linear', C=1))
        timeDecode = SlidingEstimator(clf_svm, scoring='roc_auc', n_jobs=3)
        epochs = self.epochs.load_data()
        labels = self.labels_transform()
        scores = cross_val_multiscore(timeDecode,
                                      epochs.get_data(),
                                      labels,
                                      cv=StratifiedKFold(n_splits=5,
                                                         shuffle=True,
                                                         random_state=114),
                                      n_jobs=6)
        return (epochs.times, scores)


class P3:

    @staticmethod
    def EVENTS_MAPPINGS() -> Tuple[dict, list[int], list[int]]:
        """Returns events mapping for the P3 task"""
        blocks = np.array(
            [list(range(10 * x + 1, 10 * x + 6)) for x in range(1, 6)])
        # Rare stimuli are the diagonal codes 11, 22, 33, 44, 55
        rare = np.array([x + i for i, x in enumerate(range(11, 56, 10))
                        ]).tolist()
        freq = np.setdiff1d(blocks.flatten(), rare).tolist()

        stimuli = ['A', 'B', 'C', 'D', 'E']

        evts_stim = [
            'stimulus/' + stimuli[i] + '/' + str(alph)
            for i, x in enumerate(blocks)
            for alph in x
        ]
        evts_id = {i + 3: evts_stim[i] for i in range(len(evts_stim))}
        evts_id[1] = 'response/201'
        evts_id[2] = 'response/202'
        return evts_id, rare, freq

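For orientation, here is a minimal end-to-end usage sketch. It is illustrative only: the raw file name, epoching windows, and channel picks are assumptions, not values prescribed by the module.

# Hypothetical usage sketch; file name and time windows are assumptions
import mne
from src.decoding_analysis import EEGDecoder

raw = mne.io.read_raw_fif('sub-001_task-P3_raw.fif', preload=True)
decoder = EEGDecoder(condition='stimulus',
                     epoch_times=(-0.2, 0.8),    # epoching window in seconds
                     decoding_times=(0.0, 0.6),  # window used for decoding
                     raw=raw)
data, labels = decoder.get_train(channels=['Cz', 'CPz'])
best_score, best_params = decoder.run_svm_()     # grid-searched SVM accuracy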
Classes

class CVPipeline (cv_pip: sklearn.pipeline.Pipeline, train_data: numpy.ndarray, test_data: numpy.ndarray, train_labels: numpy.ndarray, test_labels: numpy.ndarray)

Cross-validation pipeline

@dataclass
class CVPipeline(Classifier):
    """Cross-validation pipeline"""
    cv_pip: Pipeline
    train_data: np.ndarray
    test_data: np.ndarray
    train_labels: np.ndarray
    test_labels: np.ndarray

    def fit(self, **kwargs):
        self.cv_pip.fit(self.train_data, self.train_labels, **kwargs)

    def predict(self, **kwargs) -> np.ndarray:
        self.predictions = self.cv_pip.predict(self.test_data, **kwargs)
        return self.predictions

    def evaluate(self, target_names: list[str] = ['Rare', 'Frequent']):
        report = classification_report(self.test_labels,
                                       self.predictions,
                                       target_names=target_names)
        print('Classification Report:\n {}'.format(report))

        acc = accuracy_score(self.test_labels, self.predictions)
        print("Accuracy of model: {}".format(acc))

        precision, recall, fscore, support = precision_recall_fscore_support(
            self.test_labels, self.predictions, average='macro')
        print('Precision: {0}, Recall: {1}, f1-score: {2}'.format(
            precision, recall, fscore))
        return acc, fscore

Ancestors

  • Classifier
  • abc.ABC

Class variables

var cv_pip : sklearn.pipeline.Pipeline
var test_data : numpy.ndarray
var test_labels : numpy.ndarray
var train_data : numpy.ndarray
var train_labels : numpy.ndarray

Methods

def evaluate(self, target_names: list = ['Rare', 'Frequent'])
def evaluate(self, target_names: list[str] = ['Rare', 'Frequent']):
    report = classification_report(self.test_labels,
                                   self.predictions,
                                   target_names=target_names)
    print('Classification Report:\n {}'.format(report))

    acc = accuracy_score(self.test_labels, self.predictions)
    print("Accuracy of model: {}".format(acc))

    precision, recall, fscore, support = precision_recall_fscore_support(
        self.test_labels, self.predictions, average='macro')
    print('Precision: {0}, Recall: {1}, f1-score: {2}'.format(
        precision, recall, fscore))
    return acc, fscore
def fit(self, **kwargs)
def fit(self, **kwargs):
    self.cv_pip.fit(self.train_data, self.train_labels, **kwargs)
def predict(self, **kwargs) ‑> numpy.ndarray
def predict(self, **kwargs) -> np.ndarray:
    self.predictions = self.cv_pip.predict(self.test_data, **kwargs)
    return self.predictions
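A short usage sketch, reusing data and labels from the module-level example above; the logistic-regression pipeline and the 80/20 split are illustrative assumptions:

# Illustrative sketch; pipeline steps and split parameters are assumptions
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

X_train, X_test, y_train, y_test = train_test_split(
    data.reshape(len(data), -1), labels, test_size=0.2, stratify=labels)
cv = CVPipeline(cv_pip=make_pipeline(StandardScaler(), LogisticRegression()),
                train_data=X_train, test_data=X_test,
                train_labels=y_train, test_labels=y_test)
cv.fit()
cv.predict()
acc, fscore = cv.evaluate()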
class Classifier

Base classifier

@dataclass
class Classifier(ABC):
    """Base classifier"""

    @abstractmethod
    def fit(self, *args, **kwargs) -> None:
        pass

    @abstractmethod
    def predict(self, *args, **kwargs) -> np.ndarray:
        pass

Ancestors

  • abc.ABC

Subclasses

  • CVPipeline
  • LDADecoder
  • CrossTimePipelineDecoder

Methods

def fit(self, *args, **kwargs) ‑> None
@abstractmethod
def fit(self, *args, **kwargs) -> None:
    pass
def predict(self, *args, **kwargs) ‑> numpy.ndarray
@abstractmethod
def predict(self, *args, **kwargs) -> np.ndarray:
    pass
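A new decoder only has to implement these two methods. A minimal, hypothetical subclass (not part of the module) could look like this:

# Hypothetical baseline decoder; MajorityClassDecoder is an assumption
from dataclasses import dataclass, field
import numpy as np

@dataclass
class MajorityClassDecoder(Classifier):
    """Predicts the most frequent training label for every trial."""
    majority_label_: int = field(init=False, default=0)

    def fit(self, data: np.ndarray, labels: np.ndarray) -> None:
        values, counts = np.unique(labels, return_counts=True)
        self.majority_label_ = int(values[np.argmax(counts)])

    def predict(self, data: np.ndarray) -> np.ndarray:
        return np.full(len(data), self.majority_label_)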
class CrossTimePipelineDecoder (pipeline: sklearn.pipeline.Pipeline)

Classifier across time. Taken from an EEG course exercise (https://github.com/s-ccs/course_eeg_SS2021/tree/main/exercises)

@dataclass
class CrossTimePipelineDecoder(Classifier):
    """Classifier across time
    Taken from an EEG course exercise (https://github.com/s-ccs/course_eeg_SS2021/tree/main/exercises)
    """
    pipeline: Pipeline
    X: np.ndarray = field(init=False, repr=False)
    y: np.ndarray = field(init=False, repr=False)
    epochs: mne.Epochs = field(init=False, repr=False)
    timeVec: np.ndarray = field(init=False, repr=False)
    t_scores: List[float] = field(init=False, default_factory=list, repr=False)

    def fit(self,
            epochs: mne.Epochs,
            labels: np.ndarray,
            resampling_freq: float = 40) -> None:
        self.epochs = epochs.load_data().resample(resampling_freq)
        self.X = self.epochs.get_data()
        self.y = labels
        self.timeVec = epochs.times

    def predict(self, w_size: float) -> np.ndarray:
        # Evaluate every 10th time point to keep the run time manageable
        timeVec = self.timeVec[::10]
        self.t_scores = []  # reset in case predict is called more than once
        for w_time in timeVec:
            w_tmin = w_time - w_size / 2.
            w_tmax = w_time + w_size / 2.

            # Skip windows that fall outside the epoch
            if w_tmin < timeVec[0]:
                continue
            if w_tmax > timeVec[-1]:
                continue
            # Crop a copy into the time window of interest; Epochs.crop works
            # in place, so cropping self.epochs directly would shrink the data
            # on every iteration
            X = self.epochs.copy().crop(w_tmin, w_tmax).get_data()

            # Save the mean score over folds for each time window
            self.t_scores.append(
                np.mean(cross_val_score(estimator=self.pipeline,
                                        X=X,
                                        y=self.y,
                                        scoring='roc_auc',
                                        cv=2,
                                        n_jobs=2),
                        axis=0))
        return np.array(self.t_scores)

Ancestors

  • Classifier
  • abc.ABC

Class variables

var X : numpy.ndarray
var epochs : mne.epochs.Epochs
var pipeline : sklearn.pipeline.Pipeline
var t_scores : List[float]
var timeVec : numpy.ndarray
var y : numpy.ndarray

Methods

def fit(self, epochs: mne.epochs.Epochs, labels: numpy.ndarray, resampling_freq: float = 40) ‑> None
def fit(self,
        epochs: mne.Epochs,
        labels: np.ndarray,
        resampling_freq: float = 40) -> None:
    self.epochs = epochs.load_data().resample(resampling_freq)
    self.X = self.epochs.get_data()
    self.y = labels
    self.timeVec = epochs.times
def predict(self, w_size: float) ‑> numpy.ndarray
def predict(self, w_size: float) -> np.ndarray:
    # Evaluate every 10th time point to keep the run time manageable
    timeVec = self.timeVec[::10]
    self.t_scores = []  # reset in case predict is called more than once
    for w_time in timeVec:
        w_tmin = w_time - w_size / 2.
        w_tmax = w_time + w_size / 2.

        # Skip windows that fall outside the epoch
        if w_tmin < timeVec[0]:
            continue
        if w_tmax > timeVec[-1]:
            continue
        # Crop a copy into the time window of interest; Epochs.crop works
        # in place, so cropping self.epochs directly would shrink the data
        # on every iteration
        X = self.epochs.copy().crop(w_tmin, w_tmax).get_data()

        # Save the mean score over folds for each time window
        self.t_scores.append(
            np.mean(cross_val_score(estimator=self.pipeline,
                                    X=X,
                                    y=self.y,
                                    scoring='roc_auc',
                                    cv=2,
                                    n_jobs=2),
                    axis=0))
    return np.array(self.t_scores)
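A usage sketch follows; the LDA pipeline and the 200 ms window are illustrative choices, and decoder is assumed to be the EEGDecoder instance from the module-level example:

# Illustrative sketch; pipeline steps and window size are assumptions
from mne.decoding import Vectorizer
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

time_decoder = CrossTimePipelineDecoder(
    pipeline=make_pipeline(Vectorizer(), StandardScaler(),
                           LinearDiscriminantAnalysis()))
time_decoder.fit(decoder.epochs, decoder.labels_transform())
scores = time_decoder.predict(w_size=0.2)   # one ROC-AUC value per window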
class EEGDecoder (condition: Union[str, List[str]], epoch_times: Tuple[float, float], decoding_times: Tuple[float, float], raw: mne.io.fiff.raw.Raw, equalize_events: bool = False, baseline: Optional[Tuple] = None, reject_by_annotation: bool = False, reject: Optional[dict] = None, equalize_ids: list = <factory>)

Decode EEG data using condition, epochs

@dataclass
class EEGDecoder():
    """Decode EEG data using condition, epochs"""
    condition: Union[str, List[str]]
    epoch_times: Tuple[float, float]
    decoding_times: Tuple[float, float]
    raw: mne.io.Raw = field(repr=False)
    equalize_events: bool = False
    baseline: Union[Tuple, None] = None
    reject_by_annotation: bool = False
    reject: Union[dict, None] = None
    epochs: mne.Epochs = field(init=False, repr=False)
    score: np.ndarray = field(init=False, repr=False)
    equalize_ids: list[str] = field(default_factory=list)

    def __post_init__(self):
        events, ids = mne.events_from_annotations(self.raw)
        epochs = mne.Epochs(self.raw,
                            events,
                            ids,
                            self.epoch_times[0],
                            self.epoch_times[1],
                            self.baseline,
                            reject_by_annotation=self.reject_by_annotation,
                            reject=self.reject,
                            picks=['eeg'])
        if self.equalize_events:
            if len(self.equalize_ids) > 0:
                epochs.equalize_event_counts(self.equalize_ids)
            else:
                epochs.equalize_event_counts(ids)
        self.epochs = epochs[self.condition].load_data().crop(
            self.decoding_times[0], self.decoding_times[1])

    def _equalize_samples(self, data: np.ndarray,
                          labels: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Helper function to equalize the data and labels by undersampling
        the frequent class (label 2) to the number of rare trials (label 1)"""
        labels_freq = np.random.choice(np.where(labels == 2)[0],
                                       len(np.where(labels == 1)[0]),
                                       replace=False)
        labels_rare = np.where(labels == 1)[0]
        labels_idx = np.hstack((labels_freq, labels_rare))

        data = data[labels_idx]
        labels = labels[labels_idx]

        return data, labels

    def get_train(
            self,
            channels: list[str] = None,
            transform_feature: bool = False,
            equalize_labels_count: bool = False
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Returns training data and class labels from the epochs"""
        if channels:
            data = np.array(self.epochs.get_data(picks=channels))
        else:
            data = np.array(self.epochs.get_data())

        if transform_feature:
            # feature_transform works on the full epochs object, so the
            # channel selection above does not apply in this branch
            data, labels = self.feature_transform()
        else:
            labels = self.labels_transform()

        if equalize_labels_count:
            data, labels = self._equalize_samples(data, labels)

        return data, labels

    def get_all_stim(self,
                     equalize_labels_count: bool = False) -> Dict[str, Any]:
        """Returns dictionary of epochs, data, labels, and balanced training data"""
        data = {}
        for stim in ['A', 'B', 'C', 'D', 'E']:
            epoch = self.epochs['stimulus/' + stim].copy()
            labels = self.labels_transform(epoch)
            data[stim] = {
                'epoch': epoch,
                'data': epoch.get_data(),
                'labels': labels,
                # Trials averaged over time, undersampled to balance the classes
                'norm_samples': self._equalize_samples(
                    epoch.get_data().mean(axis=2), labels),
            }
        return data

    def feature_transform(
        self, transformer: FeatureTransformer = None
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Transform data using custom Transformers"""
        # Avoid a shared instance as a mutable default argument
        if transformer is None:
            transformer = MNECSPTransformer(2)
        labels = self.labels_transform()
        data = transformer.transform(self.epochs.get_data(), labels)
        return data, labels

    def labels_transform(self,
                         epochs: mne.Epochs = None,
                         n_classes: int = 2) -> np.ndarray:
        """Returns class labels (1 = rare, 2 = frequent) from the epochs events"""
        _epochs = epochs if epochs is not None else self.epochs
        _labels = _epochs.events[:, -1]
        _, rare, _ = P3.EVENTS_MAPPINGS()
        # Event codes whose stimulus number belongs to the rare set
        rare_codes = [
            _epochs.event_id[key]
            for key in _epochs.event_id
            if int(key.split('/')[-1]) in rare
        ]
        labels = np.where(np.isin(_labels, rare_codes), 1, 2)
        return labels

    def run_svm_(self) -> Tuple[object, object]:
        """Runs inside a seperate process using multiprocessing"""
        from mne.decoding.transformer import Vectorizer
        from sklearn.preprocessing import StandardScaler
        from sklearn import svm
        from sklearn.model_selection import GridSearchCV, StratifiedKFold
        data, labels = self.get_train(channels=['Cz', 'CPz'])
        clf_svm_pip = make_pipeline(Vectorizer(), StandardScaler(),
                                    svm.SVC(random_state=42))
        parameters = {
            'svc__kernel': ['linear', 'rbf', 'sigmoid'],
            'svc__C': [0.1, 1, 10]
        }
        gs_cv_svm = GridSearchCV(clf_svm_pip,
                                 parameters,
                                 scoring='accuracy',
                                 cv=StratifiedKFold(n_splits=5),
                                 return_train_score=True)
        gs_cv_svm.fit(data, labels)
        logging.info('Best Parameters: {}'.format(gs_cv_svm.best_params_))
        logging.info('Best Score: {}'.format(gs_cv_svm.best_score_))
        return gs_cv_svm.best_score_, gs_cv_svm.best_params_

    def run_sliding_(self) -> Tuple[np.ndarray, np.ndarray]:
        """Runs inside a seperate process using multiprocessing"""
        from mne.decoding.search_light import SlidingEstimator
        from mne.decoding.transformer import Vectorizer
        from sklearn.preprocessing import StandardScaler
        from sklearn.model_selection import StratifiedKFold
        from mne.decoding.base import cross_val_multiscore

        clf_svm = make_pipeline(Vectorizer(), StandardScaler(),
                                svm.SVC(kernel='linear', C=1))
        timeDecode = SlidingEstimator(clf_svm, scoring='roc_auc', n_jobs=3)
        epochs = self.epochs.load_data()
        labels = self.labels_transform()
        scores = cross_val_multiscore(timeDecode,
                                      epochs.get_data(),
                                      labels,
                                      cv=StratifiedKFold(n_splits=5,
                                                         shuffle=True,
                                                         random_state=114),
                                      n_jobs=6)
        return (epochs.times, scores)

Class variables

var baseline : Optional[Tuple]
var condition : Union[str, List[str]]
var decoding_times : Tuple[float, float]
var epoch_times : Tuple[float, float]
var epochs : mne.epochs.Epochs
var equalize_events : bool
var equalize_ids : list
var raw : mne.io.fiff.raw.Raw
var reject : Optional[dict]
var reject_by_annotation : bool
var score : numpy.ndarray

Methods

def feature_transform(self, transformer: FeatureTransformer = None) ‑> Tuple[numpy.ndarray, numpy.ndarray]

Transform data using custom Transformers

def feature_transform(
    self, transformer: FeatureTransformer = None
) -> Tuple[np.ndarray, np.ndarray]:
    """Transform data using custom Transformers"""
    # Avoid a shared instance as a mutable default argument
    if transformer is None:
        transformer = MNECSPTransformer(2)
    labels = self.labels_transform()
    data = transformer.transform(self.epochs.get_data(), labels)
    return data, labels
def get_all_stim(self, equalize_labels_count: bool = False) ‑> Dict[str, Any]

Returns dictionary of epochs, data, labels, and balanced training data

def get_all_stim(self,
                 equalize_labels_count: bool = False) -> Dict[str, Any]:
    """Returns dictionary of epochs, data, labels, and balanced training data"""
    data = {}
    for stim in ['A', 'B', 'C', 'D', 'E']:
        epoch = self.epochs['stimulus/' + stim].copy()
        labels = self.labels_transform(epoch)
        data[stim] = {
            'epoch': epoch,
            'data': epoch.get_data(),
            'labels': labels,
            # Trials averaged over time, undersampled to balance the classes
            'norm_samples': self._equalize_samples(
                epoch.get_data().mean(axis=2), labels),
        }
    return data
def get_train(self, channels: list = None, transform_feature: bool = False, equalize_labels_count: bool = False) ‑> Tuple[numpy.ndarray, numpy.ndarray]

Returns training data and class labels from the epochs

def get_train(
        self,
        channels: list[str] = None,
        transform_feature: bool = False,
        equalize_labels_count: bool = False
) -> Tuple[np.ndarray, np.ndarray]:
    """Returns training data and class labels from the epochs"""
    if channels:
        data = np.array(self.epochs.get_data(picks=channels))
    else:
        data = np.array(self.epochs.get_data())

    if transform_feature:
        # feature_transform works on the full epochs object, so the
        # channel selection above does not apply in this branch
        data, labels = self.feature_transform()
    else:
        labels = self.labels_transform()

    if equalize_labels_count:
        data, labels = self._equalize_samples(data, labels)

    return data, labels
def labels_transform(self, epochs: mne.epochs.Epochs = None, n_classes: int = 2) ‑> numpy.ndarray

Returns class labels (1 = rare, 2 = frequent) from the epochs events

def labels_transform(self,
                     epochs: mne.Epochs = None,
                     n_classes: int = 2) -> np.ndarray:
    """Returns class labels (1 = rare, 2 = frequent) from the epochs events"""
    _epochs = epochs if epochs is not None else self.epochs
    _labels = _epochs.events[:, -1]
    _, rare, _ = P3.EVENTS_MAPPINGS()
    # Event codes whose stimulus number belongs to the rare set
    rare_codes = [
        _epochs.event_id[key]
        for key in _epochs.event_id
        if int(key.split('/')[-1]) in rare
    ]
    labels = np.where(np.isin(_labels, rare_codes), 1, 2)
    return labels
def run_sliding_(self) ‑> Tuple[numpy.ndarray, numpy.ndarray]

Runs inside a separate process using multiprocessing

def run_sliding_(self) -> Tuple[np.ndarray, np.ndarray]:
    """Runs inside a separate process using multiprocessing"""
    from mne.decoding.search_light import SlidingEstimator
    from mne.decoding.transformer import Vectorizer
    from sklearn.preprocessing import StandardScaler
    from sklearn.model_selection import StratifiedKFold
    from mne.decoding.base import cross_val_multiscore

    clf_svm = make_pipeline(Vectorizer(), StandardScaler(),
                            svm.SVC(kernel='linear', C=1))
    timeDecode = SlidingEstimator(clf_svm, scoring='roc_auc', n_jobs=3)
    epochs = self.epochs.load_data()
    labels = self.labels_transform()
    scores = cross_val_multiscore(timeDecode,
                                  epochs.get_data(),
                                  labels,
                                  cv=StratifiedKFold(n_splits=5,
                                                     shuffle=True,
                                                     random_state=114),
                                  n_jobs=6)
    return (epochs.times, scores)
def run_svm_(self) ‑> Tuple[object, object]

Runs inside a separate process using multiprocessing

def run_svm_(self) -> Tuple[object, object]:
    """Runs inside a separate process using multiprocessing"""
    from mne.decoding.transformer import Vectorizer
    from sklearn.preprocessing import StandardScaler
    from sklearn import svm
    from sklearn.model_selection import GridSearchCV, StratifiedKFold
    data, labels = self.get_train(channels=['Cz', 'CPz'])
    clf_svm_pip = make_pipeline(Vectorizer(), StandardScaler(),
                                svm.SVC(random_state=42))
    parameters = {
        'svc__kernel': ['linear', 'rbf', 'sigmoid'],
        'svc__C': [0.1, 1, 10]
    }
    gs_cv_svm = GridSearchCV(clf_svm_pip,
                             parameters,
                             scoring='accuracy',
                             cv=StratifiedKFold(n_splits=5),
                             return_train_score=True)
    gs_cv_svm.fit(data, labels)
    logging.info('Best Parameters: {}'.format(gs_cv_svm.best_params_))
    logging.info('Best Score: {}'.format(gs_cv_svm.best_score_))
    return gs_cv_svm.best_score_, gs_cv_svm.best_params_
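Both run_*_ helpers return plain tuples so their results can be sent back from a worker process. A sketch of consuming run_sliding_ follows; the fold averaging and matplotlib plotting are illustrative assumptions:

# Illustrative sketch; plotting choices are assumptions
import matplotlib.pyplot as plt
import numpy as np

times, scores = decoder.run_sliding_()
mean_scores = np.mean(scores, axis=0)  # average ROC-AUC across CV folds
plt.plot(times, mean_scores)
plt.axhline(0.5, linestyle='--')       # chance level for a binary problem
plt.xlabel('Time (s)')
plt.ylabel('ROC-AUC')
plt.show()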
class FeatureTransformer

Base Transformer

@dataclass
class FeatureTransformer(ABC):
    """Base Transformer"""

    @abstractmethod
    def transform(self, *args, **kwargs) -> np.ndarray:
        pass

Ancestors

  • abc.ABC

Subclasses

  • MNECSPTransformer

Methods

def transform(self, *args, **kwargs) ‑> numpy.ndarray
@abstractmethod
def transform(self, *args, **kwargs) -> np.ndarray:
    pass
class LDADecoder

Simple sklearn LDA decoder

@dataclass
class LDADecoder(Classifier):
    """Simple sklearn LDA decoder"""
    lda: LinearDiscriminantAnalysis = field(
        init=False, default_factory=LinearDiscriminantAnalysis)

    def fit(self, data: np.ndarray, labels: np.ndarray) -> None:
        data = data.reshape(data.shape[0], -1)
        self.lda.fit(data, labels)

    def predict(self, data: np.ndarray, labels: np.ndarray) -> float:
        # Flatten trials to (n_samples, n_features), matching the reshape in fit
        data = data.reshape(data.shape[0], -1)
        # Returns the mean accuracy on the given data rather than raw predictions
        score = self.lda.score(data, labels)
        logging.info("Score {}".format(score))
        return score

Ancestors

  • Classifier
  • abc.ABC

Class variables

var lda : sklearn.discriminant_analysis.LinearDiscriminantAnalysis

Methods

def fit(self, data: numpy.ndarray, labels: numpy.ndarray) ‑> None
def fit(self, data: np.ndarray, labels: np.ndarray) -> None:
    data = data.reshape(data.shape[0], -1)
    self.lda.fit(data, labels)
def predict(self, data: numpy.ndarray, labels: numpy.ndarray) ‑> float
def predict(self, data: np.ndarray, labels: np.ndarray) -> float:
    # Flatten trials to (n_samples, n_features), matching the reshape in fit
    data = data.reshape(data.shape[0], -1)
    # Returns the mean accuracy on the given data rather than raw predictions
    score = self.lda.score(data, labels)
    logging.info("Score {}".format(score))
    return score
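A minimal sketch, assuming data has shape (n_trials, n_channels, n_times) and labels come from EEGDecoder.labels_transform; the train/test split parameters are illustrative assumptions:

# Illustrative sketch; the split parameters are assumptions
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(data, labels,
                                                    test_size=0.2,
                                                    stratify=labels)
lda = LDADecoder()
lda.fit(X_train, y_train)               # trials are flattened internally
accuracy = lda.predict(X_test, y_test)  # returns mean accuracy, not labels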
class MNECSPTransformer (n_components: int)

Simple CSP built on top of MNE

@dataclass
class MNECSPTransformer(FeatureTransformer):
    """Simple CSP built on top of MNE"""
    n_components: int

    def transform(self, data: np.ndarray, labels: np.ndarray) -> np.ndarray:
        csp = mne.decoding.CSP(self.n_components)
        # fit_transform fits the CSP filters and returns the transformed data,
        # so a second transform(data) call is unnecessary
        return csp.fit_transform(data, labels)

Ancestors

  • FeatureTransformer
  • abc.ABC

Class variables

var n_components : int

Methods

def transform(self, data: numpy.ndarray, labels: numpy.ndarray) ‑> numpy.ndarray
def transform(self, data: np.ndarray, labels: np.ndarray) -> np.ndarray:
    csp = mne.decoding.CSP(self.n_components)
    # fit_transform fits the CSP filters and returns the transformed data,
    # so a second transform(data) call is unnecessary
    return csp.fit_transform(data, labels)
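A short sketch; CSP expects band-pass filtered trials of shape (n_trials, n_channels, n_times), and two components is an illustrative choice:

# Illustrative sketch; data and labels are assumed to come from get_train
csp = MNECSPTransformer(n_components=2)
features = csp.transform(data, labels)  # shape (n_trials, 2)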
class P3
class P3:

    @staticmethod
    def EVENTS_MAPPINGS() -> Tuple[dict, list[int], list[int]]:
        """Returns events mapping for the P3 task"""
        blocks = np.array(
            [list(range(10 * x + 1, 10 * x + 6)) for x in range(1, 6)])
        # Rare stimuli are the diagonal codes 11, 22, 33, 44, 55
        rare = np.array([x + i for i, x in enumerate(range(11, 56, 10))
                        ]).tolist()
        freq = np.setdiff1d(blocks.flatten(), rare).tolist()

        stimuli = ['A', 'B', 'C', 'D', 'E']

        evts_stim = [
            'stimulus/' + stimuli[i] + '/' + str(alph)
            for i, x in enumerate(blocks)
            for alph in x
        ]
        evts_id = {i + 3: evts_stim[i] for i in range(len(evts_stim))}
        evts_id[1] = 'response/201'
        evts_id[2] = 'response/202'
        return evts_id, rare, freq

Methods

def EVENTS_MAPPINGS() ‑> Tuple[dict, list[int], list[int]]

Returns events mapping for the P3 task

@staticmethod
def EVENTS_MAPPINGS() -> Tuple[dict, list[int], list[int]]:
    """Returns events mapping for the P3 task"""
    blocks = np.array(
        [list(range(10 * x + 1, 10 * x + 6)) for x in range(1, 6)])
    # Rare stimuli are the diagonal codes 11, 22, 33, 44, 55
    rare = np.array([x + i for i, x in enumerate(range(11, 56, 10))
                    ]).tolist()
    freq = np.setdiff1d(blocks.flatten(), rare).tolist()

    stimuli = ['A', 'B', 'C', 'D', 'E']

    evts_stim = [
        'stimulus/' + stimuli[i] + '/' + str(alph)
        for i, x in enumerate(blocks)
        for alph in x
    ]
    evts_id = {i + 3: evts_stim[i] for i in range(len(evts_stim))}
    evts_id[1] = 'response/201'
    evts_id[2] = 'response/202'
    return evts_id, rare, freq
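A quick illustration of the returned mapping; the printed values follow directly from the construction above:

evts_id, rare, freq = P3.EVENTS_MAPPINGS()
print(rare)        # [11, 22, 33, 44, 55]: one rare stimulus per block
print(freq[:4])    # [12, 13, 14, 15]: the remaining codes are frequent
print(evts_id[3])  # 'stimulus/A/11': first stimulus event name
print(evts_id[1])  # 'response/201'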