Module src.preprocessing

Expand source code
from abc import ABC, abstractmethod
from operator import sub

import numpy as np
from dataclasses import dataclass, field
from genericpath import isfile
from typing import Optional

import mne
import os
import logging

from mne_bids.path import BIDSPath


@dataclass
class BaseFilter(ABC):

    @abstractmethod
    def apply_filter():
        pass

    @staticmethod
    def step():
        return "filtering"


@dataclass
class SimpleMNEFilter(BaseFilter):
    """ Simple filter based on MNE. """
    l_freq: float
    h_freq: float
    name: str

    def apply_filter(self, raw: mne.io.Raw):
        return raw.filter(l_freq=self.l_freq,
                          h_freq=self.h_freq,
                          fir_design=self.name)


@dataclass
class BaseICA(ABC):

    @abstractmethod
    def compute_ica(self, raw: mne.io.Raw):
        pass

    @abstractmethod
    def apply_ica(self, raw: mne.io.Raw, make_copy: bool = False):
        pass

    @staticmethod
    def step():
        return "ica"


@dataclass
class SimpleMNEICA(BaseICA):
    """ Simple ICA based on MNE """
    method: str
    n_components: Optional[int] = None
    random_state: int = 23
    exclude: list[int] = field(default_factory=list)
    ica: mne.preprocessing.ica = field(init=False)

    def compute_ica(self, raw: mne.io.Raw):
        self.ica = mne.preprocessing.ICA(n_components=self.n_components,
                                         method=self.method,
                                         random_state=self.random_state)
        self.ica.fit(raw, verbose=True)

    def apply_ica(self, raw: mne.io.Raw, make_copy: bool = False):
        self.ica.exclude = self.exclude
        if make_copy:
            return self.ica.apply(raw.copy())
        else:
            return self.ica.apply(raw)


"""
Classes for loading preprocessed and precomputed EEG data provided by the CCS-Department.
"""


@dataclass
class CleaningData():
    """Load precomputed bad channels and segments provided by the CCS-Department"""
    bids_path: BIDSPath
    bad_channels: list[int] = field(default_factory=list)
    bad_annotations: list = field(default_factory=list, repr=False)
    channels_ext: str = 'badChannels.tsv'
    segments_ext: str = 'badSegments.csv'

    @staticmethod
    def step():
        return "cleaning"

    def _get_fpath(self, dtype: str):
        assert dtype in [
            'channels', 'segments'
        ], "dataType can only have values from ['channels', 'segments']"
        bids_path = self.bids_path
        etx = self.channels_ext if dtype == 'channels' else self.segments_ext
        bad_fname = os.path.join(
            bids_path.directory,
            bids_path.basename.removesuffix(bids_path.suffix) + etx)
        assert isfile(bad_fname), "Bad {} file not found!".format(dtype)
        return bad_fname

    def _get_bad_channels(self) -> np.ndarray:
        ch_fname = self._get_fpath('channels')
        bad_channels = np.loadtxt(ch_fname, delimiter='\t', dtype='int')
        bad_channels -= 1  # handle 0 indexing
        return bad_channels.reshape(-1)

    def _get_bad_segments(self) -> mne.Annotations:
        import pandas as pd
        seg_fname = self._get_fpath('segments')
        df = pd.read_csv(seg_fname)
        return mne.Annotations(df.onset, df.duration, df.description)

    def load_bad_data(self):
        self.bad_annotations = self._get_bad_segments()
        self.bad_channels = self._get_bad_channels()

    def apply_cleaning(self, raw: mne.io.Raw, interpolate: bool = True):
        self.load_bad_data()
        raw.info['bads'] = [raw.ch_names[idx] for idx in self.bad_channels]
        if interpolate:
            raw.interpolate_bads()
        raw.annotations.append(self.bad_annotations.onset,
                               self.bad_annotations.duration,
                               self.bad_annotations.description)


@dataclass
class PrecomputedICA(BaseICA):
    """Load precomputed ICA provided by the CCS-Department"""
    bids_path: BIDSPath
    ica_ext: str = 'ica.set'
    badComponent_ext: str = 'ica.tsv'
    ica: mne.preprocessing.ica = field(init=False)
    exclude: list[int] = field(default_factory=list)

    def _load_bad_components(self,
                             badComponents_fname: str,
                             delimiter: str = '\t') -> np.ndarray:
        assert isfile(badComponents_fname), "ICA Bad Components file not found!"
        bad_components = np.loadtxt(badComponents_fname,
                                    delimiter=delimiter,
                                    dtype='float')
        # for zero indexing
        bad_components -= 1
        # for cases when we have only one component in the file
        return bad_components.reshape(-1)

    def _load_ica(self, ica_fname: str) -> mne.preprocessing.ica:
        assert isfile(ica_fname), "ICA file not found!"
        return self.sp_read_ica_eeglab(ica_fname)

    def compute_ica(self, raw: mne.io.Raw = None):
        bids_path = self.bids_path
        ica_file = os.path.join(
            bids_path.directory,
            bids_path.basename.removesuffix(bids_path.suffix) + self.ica_ext)
        self.ica = self._load_ica(ica_file)
        bad_comp = os.path.join(
            bids_path.directory,
            bids_path.basename.removesuffix(bids_path.suffix) +
            self.badComponent_ext)
        self.exclude = self._load_bad_components(bad_comp)

    def apply_ica(self, raw: mne.io.Raw, make_copy: bool = False):
        self.ica.exclude = self.exclude
        if make_copy:
            self.ica.apply(raw.copy())
        else:
            self.ica.apply(raw)

    # taken from exercise
    def sp_read_ica_eeglab(self, fname, *, verbose=None):
        """Load ICA information saved in an EEGLAB .set file.
            Parameters
            ----------
            fname : str
                Complete path to a .set EEGLAB file that contains an ICA object.
            %(verbose)s

            Returns
            -------
            ica : instance of ICA
                An ICA object based on the information contained in the input file.
            """
        from scipy import linalg
        eeg = mne.preprocessing.ica._check_load_mat(fname, None)
        info, eeg_montage, _ = mne.preprocessing.ica._get_info(eeg)
        mne.pick_info(info,
                      np.round(eeg['icachansind']).astype(int) - 1,
                      copy=False)
        info.set_montage(eeg_montage)

        rank = eeg.icasphere.shape[0]
        n_components = eeg.icaweights.shape[0]

        ica = mne.preprocessing.ica.ICA(method='imported_eeglab',
                                        n_components=n_components)

        ica.current_fit = "eeglab"
        ica.ch_names = info["ch_names"]
        ica.n_pca_components = None
        ica.n_components_ = n_components

        n_ch = len(ica.ch_names)
        assert len(eeg.icachansind) == n_ch

        ica.pre_whitener_ = np.ones((n_ch, 1))
        ica.pca_mean_ = np.zeros(n_ch)

        assert eeg.icasphere.shape[1] == n_ch
        assert eeg.icaweights.shape == (n_components, rank)

        # When PCA reduction is used in EEGLAB, runica returns
        # weights= weights*sphere*eigenvectors(:,1:ncomps)';
        # sphere = eye(urchans). When PCA reduction is not used, we have:
        #
        #     eeg.icawinv == pinv(eeg.icaweights @ eeg.icasphere)
        #
        # So in either case, we can use SVD to get our square whitened
        # weights matrix (u * s) and our PCA vectors (v) back:
        use = eeg.icaweights @ eeg.icasphere
        use_check = linalg.pinv(eeg.icawinv)
        if not np.allclose(use, use_check, rtol=1e-6):
            logging.warn(
                'Mismatch between icawinv and icaweights @ icasphere from EEGLAB '
                'possibly due to ICA component removal, assuming icawinv is '
                'correct')
            use = use_check
        u, s, v = mne.preprocessing.ica._safe_svd(use, full_matrices=False)
        ica.unmixing_matrix_ = u * s
        ica.pca_components_ = v
        ica.pca_explained_variance_ = s * s
        ica.info = info
        ica._update_mixing_matrix()
        ica._update_ica_names()
        return ica


class IO:
    """Class for performing file IO operations"""

    @staticmethod
    def save_badSegments(sub_id: int, dir: str, annotations: dict):
        """Save the bad annotated segments in the raw data"""
        import csv
        if not os.path.isdir(dir):
            os.makedirs(dir)
        path = os.path.join(dir, 'sub_{}_badChannels.csv'.format(sub_id))

        with open(path, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile, delimiter=',')
            for ann in annotations:
                if ann['description'].startswith('BAD_'):
                    logging.info("'{}' goes from {} to {}".format(
                        ann['onset'], ann['duration'], ann['description']))
                    writer.writerow(
                        [ann['onset'], ann['duration'], ann['description']])

    @staticmethod
    def save_badChannels(sub_id: int, dir: str, channels: list[str]):
        """Save the bad channles in the raw data"""
        import csv

        if not os.path.isdir(dir):
            os.makedirs(dir)

        path = os.path.join(dir, 'sub_{}_badChannels.csv'.format(sub_id))

        with open(path, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile, delimiter=',')
            for channel in channels:
                writer.writerow([channel])

Classes

class BaseFilter

BaseFilter()

Expand source code
@dataclass
class BaseFilter(ABC):

    @abstractmethod
    def apply_filter():
        pass

    @staticmethod
    def step():
        return "filtering"

Ancestors

  • abc.ABC

Subclasses

Static methods

def step()
Expand source code
@staticmethod
def step():
    return "filtering"

Methods

def apply_filter()
Expand source code
@abstractmethod
def apply_filter():
    pass
class BaseICA

BaseICA()

Expand source code
@dataclass
class BaseICA(ABC):

    @abstractmethod
    def compute_ica(self, raw: mne.io.Raw):
        pass

    @abstractmethod
    def apply_ica(self, raw: mne.io.Raw, make_copy: bool = False):
        pass

    @staticmethod
    def step():
        return "ica"

Ancestors

  • abc.ABC

Subclasses

Static methods

def step()
Expand source code
@staticmethod
def step():
    return "ica"

Methods

def apply_ica(self, raw: mne.io.fiff.raw.Raw, make_copy: bool = False)
Expand source code
@abstractmethod
def apply_ica(self, raw: mne.io.Raw, make_copy: bool = False):
    pass
def compute_ica(self, raw: mne.io.fiff.raw.Raw)
Expand source code
@abstractmethod
def compute_ica(self, raw: mne.io.Raw):
    pass
class CleaningData (bids_path: mne_bids.path.BIDSPath, bad_channels: list = <factory>, bad_annotations: list = <factory>, channels_ext: str = 'badChannels.tsv', segments_ext: str = 'badSegments.csv')

Load precomputed bad channels and segments provided by the CCS-Department

Expand source code
@dataclass
class CleaningData():
    """Load precomputed bad channels and segments provided by the CCS-Department"""
    bids_path: BIDSPath
    bad_channels: list[int] = field(default_factory=list)
    bad_annotations: list = field(default_factory=list, repr=False)
    channels_ext: str = 'badChannels.tsv'
    segments_ext: str = 'badSegments.csv'

    @staticmethod
    def step():
        return "cleaning"

    def _get_fpath(self, dtype: str):
        assert dtype in [
            'channels', 'segments'
        ], "dataType can only have values from ['channels', 'segments']"
        bids_path = self.bids_path
        etx = self.channels_ext if dtype == 'channels' else self.segments_ext
        bad_fname = os.path.join(
            bids_path.directory,
            bids_path.basename.removesuffix(bids_path.suffix) + etx)
        assert isfile(bad_fname), "Bad {} file not found!".format(dtype)
        return bad_fname

    def _get_bad_channels(self) -> np.ndarray:
        ch_fname = self._get_fpath('channels')
        bad_channels = np.loadtxt(ch_fname, delimiter='\t', dtype='int')
        bad_channels -= 1  # handle 0 indexing
        return bad_channels.reshape(-1)

    def _get_bad_segments(self) -> mne.Annotations:
        import pandas as pd
        seg_fname = self._get_fpath('segments')
        df = pd.read_csv(seg_fname)
        return mne.Annotations(df.onset, df.duration, df.description)

    def load_bad_data(self):
        self.bad_annotations = self._get_bad_segments()
        self.bad_channels = self._get_bad_channels()

    def apply_cleaning(self, raw: mne.io.Raw, interpolate: bool = True):
        self.load_bad_data()
        raw.info['bads'] = [raw.ch_names[idx] for idx in self.bad_channels]
        if interpolate:
            raw.interpolate_bads()
        raw.annotations.append(self.bad_annotations.onset,
                               self.bad_annotations.duration,
                               self.bad_annotations.description)

Class variables

var bad_annotations : list
var bad_channels : list
var bids_path : mne_bids.path.BIDSPath
var channels_ext : str
var segments_ext : str

Static methods

def step()
Expand source code
@staticmethod
def step():
    return "cleaning"

Methods

def apply_cleaning(self, raw: mne.io.fiff.raw.Raw, interpolate: bool = True)
Expand source code
def apply_cleaning(self, raw: mne.io.Raw, interpolate: bool = True):
    self.load_bad_data()
    raw.info['bads'] = [raw.ch_names[idx] for idx in self.bad_channels]
    if interpolate:
        raw.interpolate_bads()
    raw.annotations.append(self.bad_annotations.onset,
                           self.bad_annotations.duration,
                           self.bad_annotations.description)
def load_bad_data(self)
Expand source code
def load_bad_data(self):
    self.bad_annotations = self._get_bad_segments()
    self.bad_channels = self._get_bad_channels()
class IO

Class for performing file IO operations

Expand source code
class IO:
    """Class for performing file IO operations"""

    @staticmethod
    def save_badSegments(sub_id: int, dir: str, annotations: dict):
        """Save the bad annotated segments in the raw data"""
        import csv
        if not os.path.isdir(dir):
            os.makedirs(dir)
        path = os.path.join(dir, 'sub_{}_badChannels.csv'.format(sub_id))

        with open(path, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile, delimiter=',')
            for ann in annotations:
                if ann['description'].startswith('BAD_'):
                    logging.info("'{}' goes from {} to {}".format(
                        ann['onset'], ann['duration'], ann['description']))
                    writer.writerow(
                        [ann['onset'], ann['duration'], ann['description']])

    @staticmethod
    def save_badChannels(sub_id: int, dir: str, channels: list[str]):
        """Save the bad channles in the raw data"""
        import csv

        if not os.path.isdir(dir):
            os.makedirs(dir)

        path = os.path.join(dir, 'sub_{}_badChannels.csv'.format(sub_id))

        with open(path, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile, delimiter=',')
            for channel in channels:
                writer.writerow([channel])

Static methods

def save_badChannels(sub_id: int, dir: str, channels: list)

Save the bad channles in the raw data

Expand source code
@staticmethod
def save_badChannels(sub_id: int, dir: str, channels: list[str]):
    """Save the bad channles in the raw data"""
    import csv

    if not os.path.isdir(dir):
        os.makedirs(dir)

    path = os.path.join(dir, 'sub_{}_badChannels.csv'.format(sub_id))

    with open(path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter=',')
        for channel in channels:
            writer.writerow([channel])
def save_badSegments(sub_id: int, dir: str, annotations: dict)

Save the bad annotated segments in the raw data

Expand source code
@staticmethod
def save_badSegments(sub_id: int, dir: str, annotations: dict):
    """Save the bad annotated segments in the raw data"""
    import csv
    if not os.path.isdir(dir):
        os.makedirs(dir)
    path = os.path.join(dir, 'sub_{}_badChannels.csv'.format(sub_id))

    with open(path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter=',')
        for ann in annotations:
            if ann['description'].startswith('BAD_'):
                logging.info("'{}' goes from {} to {}".format(
                    ann['onset'], ann['duration'], ann['description']))
                writer.writerow(
                    [ann['onset'], ann['duration'], ann['description']])
class PrecomputedICA (bids_path: mne_bids.path.BIDSPath, ica_ext: str = 'ica.set', badComponent_ext: str = 'ica.tsv', exclude: list = <factory>)

Load precomputed ICA provided by the CCS-Department

Expand source code
@dataclass
class PrecomputedICA(BaseICA):
    """Load precomputed ICA provided by the CCS-Department"""
    bids_path: BIDSPath
    ica_ext: str = 'ica.set'
    badComponent_ext: str = 'ica.tsv'
    ica: mne.preprocessing.ica = field(init=False)
    exclude: list[int] = field(default_factory=list)

    def _load_bad_components(self,
                             badComponents_fname: str,
                             delimiter: str = '\t') -> np.ndarray:
        assert isfile(badComponents_fname), "ICA Bad Components file not found!"
        bad_components = np.loadtxt(badComponents_fname,
                                    delimiter=delimiter,
                                    dtype='float')
        # for zero indexing
        bad_components -= 1
        # for cases when we have only one component in the file
        return bad_components.reshape(-1)

    def _load_ica(self, ica_fname: str) -> mne.preprocessing.ica:
        assert isfile(ica_fname), "ICA file not found!"
        return self.sp_read_ica_eeglab(ica_fname)

    def compute_ica(self, raw: mne.io.Raw = None):
        bids_path = self.bids_path
        ica_file = os.path.join(
            bids_path.directory,
            bids_path.basename.removesuffix(bids_path.suffix) + self.ica_ext)
        self.ica = self._load_ica(ica_file)
        bad_comp = os.path.join(
            bids_path.directory,
            bids_path.basename.removesuffix(bids_path.suffix) +
            self.badComponent_ext)
        self.exclude = self._load_bad_components(bad_comp)

    def apply_ica(self, raw: mne.io.Raw, make_copy: bool = False):
        self.ica.exclude = self.exclude
        if make_copy:
            self.ica.apply(raw.copy())
        else:
            self.ica.apply(raw)

    # taken from exercise
    def sp_read_ica_eeglab(self, fname, *, verbose=None):
        """Load ICA information saved in an EEGLAB .set file.
            Parameters
            ----------
            fname : str
                Complete path to a .set EEGLAB file that contains an ICA object.
            %(verbose)s

            Returns
            -------
            ica : instance of ICA
                An ICA object based on the information contained in the input file.
            """
        from scipy import linalg
        eeg = mne.preprocessing.ica._check_load_mat(fname, None)
        info, eeg_montage, _ = mne.preprocessing.ica._get_info(eeg)
        mne.pick_info(info,
                      np.round(eeg['icachansind']).astype(int) - 1,
                      copy=False)
        info.set_montage(eeg_montage)

        rank = eeg.icasphere.shape[0]
        n_components = eeg.icaweights.shape[0]

        ica = mne.preprocessing.ica.ICA(method='imported_eeglab',
                                        n_components=n_components)

        ica.current_fit = "eeglab"
        ica.ch_names = info["ch_names"]
        ica.n_pca_components = None
        ica.n_components_ = n_components

        n_ch = len(ica.ch_names)
        assert len(eeg.icachansind) == n_ch

        ica.pre_whitener_ = np.ones((n_ch, 1))
        ica.pca_mean_ = np.zeros(n_ch)

        assert eeg.icasphere.shape[1] == n_ch
        assert eeg.icaweights.shape == (n_components, rank)

        # When PCA reduction is used in EEGLAB, runica returns
        # weights= weights*sphere*eigenvectors(:,1:ncomps)';
        # sphere = eye(urchans). When PCA reduction is not used, we have:
        #
        #     eeg.icawinv == pinv(eeg.icaweights @ eeg.icasphere)
        #
        # So in either case, we can use SVD to get our square whitened
        # weights matrix (u * s) and our PCA vectors (v) back:
        use = eeg.icaweights @ eeg.icasphere
        use_check = linalg.pinv(eeg.icawinv)
        if not np.allclose(use, use_check, rtol=1e-6):
            logging.warn(
                'Mismatch between icawinv and icaweights @ icasphere from EEGLAB '
                'possibly due to ICA component removal, assuming icawinv is '
                'correct')
            use = use_check
        u, s, v = mne.preprocessing.ica._safe_svd(use, full_matrices=False)
        ica.unmixing_matrix_ = u * s
        ica.pca_components_ = v
        ica.pca_explained_variance_ = s * s
        ica.info = info
        ica._update_mixing_matrix()
        ica._update_ica_names()
        return ica

Ancestors

Class variables

var badComponent_ext : str
var bids_path : mne_bids.path.BIDSPath
var exclude : list
var ica
var ica_ext : str

Methods

def apply_ica(self, raw: mne.io.fiff.raw.Raw, make_copy: bool = False)
Expand source code
def apply_ica(self, raw: mne.io.Raw, make_copy: bool = False):
    self.ica.exclude = self.exclude
    if make_copy:
        self.ica.apply(raw.copy())
    else:
        self.ica.apply(raw)
def compute_ica(self, raw: mne.io.fiff.raw.Raw = None)
Expand source code
def compute_ica(self, raw: mne.io.Raw = None):
    bids_path = self.bids_path
    ica_file = os.path.join(
        bids_path.directory,
        bids_path.basename.removesuffix(bids_path.suffix) + self.ica_ext)
    self.ica = self._load_ica(ica_file)
    bad_comp = os.path.join(
        bids_path.directory,
        bids_path.basename.removesuffix(bids_path.suffix) +
        self.badComponent_ext)
    self.exclude = self._load_bad_components(bad_comp)
def sp_read_ica_eeglab(self, fname, *, verbose=None)

Load ICA information saved in an EEGLAB .set file. Parameters


fname : str
Complete path to a .set EEGLAB file that contains an ICA object.

%(verbose)s

Returns

ica : instance of ICA
An ICA object based on the information contained in the input file.
Expand source code
def sp_read_ica_eeglab(self, fname, *, verbose=None):
    """Load ICA information saved in an EEGLAB .set file.
        Parameters
        ----------
        fname : str
            Complete path to a .set EEGLAB file that contains an ICA object.
        %(verbose)s

        Returns
        -------
        ica : instance of ICA
            An ICA object based on the information contained in the input file.
        """
    from scipy import linalg
    eeg = mne.preprocessing.ica._check_load_mat(fname, None)
    info, eeg_montage, _ = mne.preprocessing.ica._get_info(eeg)
    mne.pick_info(info,
                  np.round(eeg['icachansind']).astype(int) - 1,
                  copy=False)
    info.set_montage(eeg_montage)

    rank = eeg.icasphere.shape[0]
    n_components = eeg.icaweights.shape[0]

    ica = mne.preprocessing.ica.ICA(method='imported_eeglab',
                                    n_components=n_components)

    ica.current_fit = "eeglab"
    ica.ch_names = info["ch_names"]
    ica.n_pca_components = None
    ica.n_components_ = n_components

    n_ch = len(ica.ch_names)
    assert len(eeg.icachansind) == n_ch

    ica.pre_whitener_ = np.ones((n_ch, 1))
    ica.pca_mean_ = np.zeros(n_ch)

    assert eeg.icasphere.shape[1] == n_ch
    assert eeg.icaweights.shape == (n_components, rank)

    # When PCA reduction is used in EEGLAB, runica returns
    # weights= weights*sphere*eigenvectors(:,1:ncomps)';
    # sphere = eye(urchans). When PCA reduction is not used, we have:
    #
    #     eeg.icawinv == pinv(eeg.icaweights @ eeg.icasphere)
    #
    # So in either case, we can use SVD to get our square whitened
    # weights matrix (u * s) and our PCA vectors (v) back:
    use = eeg.icaweights @ eeg.icasphere
    use_check = linalg.pinv(eeg.icawinv)
    if not np.allclose(use, use_check, rtol=1e-6):
        logging.warn(
            'Mismatch between icawinv and icaweights @ icasphere from EEGLAB '
            'possibly due to ICA component removal, assuming icawinv is '
            'correct')
        use = use_check
    u, s, v = mne.preprocessing.ica._safe_svd(use, full_matrices=False)
    ica.unmixing_matrix_ = u * s
    ica.pca_components_ = v
    ica.pca_explained_variance_ = s * s
    ica.info = info
    ica._update_mixing_matrix()
    ica._update_ica_names()
    return ica
class SimpleMNEFilter (l_freq: float, h_freq: float, name: str)

Simple filter based on MNE.

Expand source code
@dataclass
class SimpleMNEFilter(BaseFilter):
    """ Simple filter based on MNE. """
    l_freq: float
    h_freq: float
    name: str

    def apply_filter(self, raw: mne.io.Raw):
        return raw.filter(l_freq=self.l_freq,
                          h_freq=self.h_freq,
                          fir_design=self.name)

Ancestors

Class variables

var h_freq : float
var l_freq : float
var name : str

Methods

def apply_filter(self, raw: mne.io.fiff.raw.Raw)
Expand source code
def apply_filter(self, raw: mne.io.Raw):
    return raw.filter(l_freq=self.l_freq,
                      h_freq=self.h_freq,
                      fir_design=self.name)
class SimpleMNEICA (method: str, n_components: Optional[int] = None, random_state: int = 23, exclude: list = <factory>)

Simple ICA based on MNE

Expand source code
@dataclass
class SimpleMNEICA(BaseICA):
    """ Simple ICA based on MNE """
    method: str
    n_components: Optional[int] = None
    random_state: int = 23
    exclude: list[int] = field(default_factory=list)
    ica: mne.preprocessing.ica = field(init=False)

    def compute_ica(self, raw: mne.io.Raw):
        self.ica = mne.preprocessing.ICA(n_components=self.n_components,
                                         method=self.method,
                                         random_state=self.random_state)
        self.ica.fit(raw, verbose=True)

    def apply_ica(self, raw: mne.io.Raw, make_copy: bool = False):
        self.ica.exclude = self.exclude
        if make_copy:
            return self.ica.apply(raw.copy())
        else:
            return self.ica.apply(raw)

Ancestors

Class variables

var exclude : list
var ica
var method : str
var n_components : Optional[int]
var random_state : int

Methods

def apply_ica(self, raw: mne.io.fiff.raw.Raw, make_copy: bool = False)
Expand source code
def apply_ica(self, raw: mne.io.Raw, make_copy: bool = False):
    self.ica.exclude = self.exclude
    if make_copy:
        return self.ica.apply(raw.copy())
    else:
        return self.ica.apply(raw)
def compute_ica(self, raw: mne.io.fiff.raw.Raw)
Expand source code
def compute_ica(self, raw: mne.io.Raw):
    self.ica = mne.preprocessing.ICA(n_components=self.n_components,
                                     method=self.method,
                                     random_state=self.random_state)
    self.ica.fit(raw, verbose=True)