Module src.pipeline

Expand source code
from decoding_analysis import EEGDecoder
from tqdm import tqdm
from multiprocessing import Pool
from dataclasses import dataclass
from genericpath import isfile
from mne_bids import (BIDSPath, read_raw_bids)
from typing import Optional, Union, Tuple, Dict, List
from encoding_analysis import Encoder
from erpanalysis import ERPAnalysis
from preprocessing import *

import os
import mne
import logging
import warnings
import pandas as pd


class P3:
    """Helper class providing the event-code mapping for the P3 oddball task."""

    @staticmethod
    def EVENTS_MAPINGS() -> Tuple[dict, list[int], list[int]]:
        """Build the event-id mapping for the P3 paradigm.

        Stimulus codes come in five blocks of five (11-15, 21-25, ..., 51-55);
        within block *i* the diagonal code (11, 22, 33, 44, 55) is the rare
        (target) stimulus and the remaining four are frequent.

        Returns:
            Tuple of (event_id dict mapping integer id -> description string,
            list of rare stimulus codes, list of frequent stimulus codes).
        """
        # 5 blocks x 5 stimuli: rows [11..15], [21..25], ..., [51..55].
        blocks = np.array(
            [list(range(10 * x + 1, 10 * x + 6)) for x in range(1, 6)])
        # Diagonal codes are the rare/target stimuli.
        rare = np.array([x + i for i, x in enumerate(range(11, 56, 10))
                        ]).tolist()
        freq = np.setdiff1d(blocks.flatten(), rare).tolist()

        stimulus = ['A', 'B', 'C', 'D', 'E']

        evts_stim = [
            'stimulus/' + stimulus[i] + '/rare/' + str(alph)
            if alph in rare
            else 'stimulus/' + stimulus[i] + '/freq/' + str(alph)
            for i, x in enumerate(blocks) for alph in x
        ]
        # Stimulus events occupy ids 3..27; ids 1 and 2 are the responses.
        evts_id = {i + 3: evts_stim[i] for i in range(len(evts_stim))}
        evts_id[1] = 'response/correct/201'
        evts_id[2] = 'response/error/202'
        return evts_id, rare, freq


@dataclass
class Pipeline:
    """Pipeline for processing Encoding and Decoding Analysis on EEG data.

    Wraps a single subject's BIDS recording and exposes the preprocessing,
    ERP, decoding and multi-subject loading steps used by this project.
    """
    # BIDSPath (or list of paths) pointing at the subject's EEG recording.
    bids_path: Union[str, list[str]]
    # Subject label; populated from bids_path by load_data().
    subject: Optional[int] = None
    # P3 task mapping: (event_id dict, rare codes, frequent codes).
    # default_factory gives each instance its own copy instead of one tuple
    # shared by every Pipeline and built at class-definition time.
    events_mapping: Tuple = field(default_factory=P3.EVENTS_MAPINGS)
    # Logging level (e.g. logging.INFO).
    verbose: int = logging.INFO
    raw: mne.io.Raw = field(init=False, repr=False)
    events: np.ndarray = field(init=False, repr=False)
    event_ids: dict = field(init=False, repr=False)
    decoding_score: Optional[Tuple] = field(init=False, repr=False, default=None)

    def __post_init__(self):
        """Configure logging verbosity and silence MNE/warning chatter."""
        logging.basicConfig(level=self.verbose)
        mne.set_log_level(verbose='ERROR')
        warnings.filterwarnings("ignore")

    def __str__(self) -> str:
        """Return a human-readable summary of the loaded raw recording."""
        info = (
            'The raw data for subject {} has {} time samples and {} channels.\n'
            .format(self.subject, self.raw.n_times, len(self.raw.ch_names)) +
            'The last time sample is at {} seconds.'.format(self.raw.times[-1])
            + 'The first few channel names are {}....\n'.format(', '.join(
                self.raw.ch_names[:5])) +
            'sample rate: {} Hz.\n'.format(self.raw.info['sfreq']) +
            '%s channels x %s samples\n\n' %
            (len(self.raw), len(self.raw.times)))
        return info

    def set_montage(self) -> None:
        """Set the standard 10-20 montage and mark the EOG channels."""
        montage_dir = os.path.join(os.path.dirname(mne.__file__), 'channels',
                                   'data', 'montages')
        logging.debug(sorted(os.listdir(montage_dir)))
        ten_twenty_montage = mne.channels.make_standard_montage('standard_1020')
        # These three channels exist in the recording but are EOG, not EEG.
        self.raw.set_channel_types({
            'HEOG_left': 'eog',
            'HEOG_right': 'eog',
            'VEOG_lower': 'eog'
        })
        self.raw.set_montage(ten_twenty_montage, match_case=False)
        logging.info("Standard 1020 montage and EOG channels are set")

    def load_data(self, event_id: Union[Dict, str] = "auto") -> "Pipeline":
        """Load the mne raw data and find events and event ids from annotations.

        Args:
            event_id: Mapping passed to mne.events_from_annotations, or
                "auto" to derive it from the annotations.

        Returns:
            self, so calls can be chained.
        """
        logging.info("Loading Data")
        raw = read_raw_bids(bids_path=self.bids_path)
        self.subject = self.bids_path.subject
        self.events, self.event_ids = mne.events_from_annotations(
            raw, event_id=event_id)
        self.raw = raw.load_data()
        return self

    def set_custom_events_mapping(self,
                                  mapping: Dict[int, str] = None,
                                  task: str = None) -> None:
        """Re-annotate the raw data with a custom event-code mapping.

        Args:
            mapping: Event-code -> description mapping. Required unless
                task is 'P3', in which case the P3 mapping is used.
            task: Task name; only 'P3' is recognized.
        """
        if task == 'P3':
            mapping, _, _ = P3.EVENTS_MAPINGS()
        assert mapping is not None, "Mapping is not defined! Please pass mapping as argument"

        annot_from_events = mne.annotations_from_events(
            events=self.events,
            event_desc=mapping,
            sfreq=self.raw.info['sfreq'])
        self.raw.set_annotations(annot_from_events)
        # Re-derive events/event_ids so they reflect the new annotations.
        self.events, self.event_ids = mne.events_from_annotations(self.raw)

    def apply_resampling(self,
                         sampling_freq: int,
                         padding: str = 'auto') -> None:
        """Apply resampling on the mne raw data.

        Resampling the raw and the events together keeps the event sample
        indices consistent with the new sampling rate.
        """
        logging.info("Applying resampling")
        self.raw, self.events = self.raw.resample(sampling_freq,
                                                  npad=padding,
                                                  events=self.events)

    def apply_rereferencing(self, reference_channels: Union[List[str],
                                                            str]) -> None:
        """Apply re-referencing on the channels present in mne raw object."""
        logging.info("Applying re-referencing")
        mne.io.set_eeg_reference(self.raw, ref_channels=reference_channels)

    def apply_cleaning(self, cleaner: CleaningData):
        """Apply cleaning of bad segments and bad channels (in place)."""
        logging.info("Applying cleaning")
        cleaner.apply_cleaning(self.raw)

    def apply_filter(self, filter: BaseFilter) -> None:
        """Apply filtering on raw data (in place)."""
        logging.info("Applying filtering")
        filter.apply_filter(self.raw)

    def apply_ica(self, ica: BaseICA) -> None:
        """Compute and apply ICA on the raw data (in place)."""
        logging.info("Applying ICA")
        ica.compute_ica(self.raw)
        ica.apply_ica(self.raw)

    def get_events_df(self, events_ext: str = 'events.tsv') -> pd.DataFrame:
        """Return a DataFrame with the events file next to the BIDS recording.

        Raises:
            AssertionError: If the sidecar events file does not exist.
        """
        bids_path = self.bids_path
        # Swap the recording suffix ('eeg') for the events-file suffix.
        fname = os.path.join(
            bids_path.directory,
            bids_path.basename.removesuffix(bids_path.suffix) + events_ext)
        assert isfile(fname), "Events file not found!"
        return pd.read_csv(fname, delimiter='\t')

    def compute_epochs(self, erp: any) -> mne.Epochs:
        """Perform ERP epochs computation task."""
        return erp.compute_epochs(self.raw, self.events, self.event_ids)

    def compute_erp_peak(self,
                         erp: any,
                         condition: str,
                         thypo: float,
                         offset: float = 0.05,
                         channels: list[str] = []) -> pd.DataFrame:
        """Compute epochs, then the ERP peak for the given condition.

        Args:
            erp: ERP analysis object exposing compute_epochs/compute_peak.
            condition: Condition name to search the peak for.
            thypo: Hypothesized peak time (seconds).
            offset: Search window half-width around thypo.
            channels: Channels to restrict the peak search to.
        """
        self.compute_epochs(erp)
        return erp.compute_peak(condition, thypo, offset, channels)

    def apply_decoder(self, decoder: EEGDecoder) -> Tuple:
        """Perform Decoding Analysis and cache the score on the pipeline."""
        svm_scores = decoder.run_svm_()
        # NOTE: decoder.run_sliding_() did not work here; only SVM scores
        # are used for now.
        decoding_score = svm_scores
        self.decoding_score = decoding_score
        return decoding_score

    def _parallel_process_raws(self, pipeline) -> mne.io.Raw:
        """Preprocess one sibling pipeline (worker for load_multiple_subjects)."""
        pipeline.load_data()
        pipeline.set_montage()
        pipeline.make_pipeline([
            SimpleMNEFilter(0.1, 50, 'firwin'),
            CleaningData(pipeline.bids_path),
            PrecomputedICA(pipeline.bids_path)
        ])
        pipeline.set_custom_events_mapping(task='P3')
        return pipeline.raw

    def load_multiple_subjects(self,
                               n_subjects: int = 40,
                               preload: bool = False) -> None:
        """Load and concatenate raws of other subjects into this pipeline.

        Subjects 1..n_subjects (except the current one) are preprocessed in
        parallel and concatenated with the current raw.
        """
        curr_sub = [int(self.bids_path.subject)]
        subjects = set(range(1, n_subjects + 1)) - set(curr_sub)
        bids_paths = [
            self.bids_path.copy().update(subject=str(x).zfill(3))
            for x in subjects
        ]
        pipelines = [
            Pipeline(bids_path=path, verbose=logging.ERROR)
            for path in bids_paths
        ]
        with Pool(6) as p:
            raws = list(
                tqdm(p.imap(self._parallel_process_raws, pipelines),
                     total=n_subjects - 1))

        raws.append(self.raw)
        self.raw = mne.concatenate_raws(raws)
        self.events, self.event_ids = mne.events_from_annotations(self.raw)
        if preload:
            self.raw.load_data()
        self.set_montage()

    def apply(self, step) -> None:
        """Dispatch a single pipeline step.

        Args:
            step: Either a preprocessing object exposing step() (returning
                its step name), or a (name, argument) tuple for simple
                parameterized steps such as ('resample', 256).
        """
        if isinstance(step, tuple):
            step_name = step[0]
            step = step[1]
        else:
            step_name = step.step()
        if step_name == 'cleaning':
            self.apply_cleaning(step)
        elif step_name == 'filtering':
            self.apply_filter(step)
        elif step_name == 'ica':
            self.apply_ica(step)
        elif step_name == 'erp':
            self.compute_epochs(step)
        elif step_name == 'decoding':
            self.apply_decoder(step)
        elif step_name == 'classifier':
            # NOTE(review): apply_classifier is not defined on this class —
            # reaching this branch raises AttributeError; confirm whether a
            # classifier step was ever implemented.
            self.apply_classifier(step)
        elif step_name == 'reference':
            self.apply_rereferencing(step)
        elif step_name == 'resample':
            self.apply_resampling(step)
        else:
            logging.error("Invalid pipeline operation!")

    def make_pipeline(self, steps: list) -> None:
        """Perform the list of steps on the current mne raw object."""
        logging.info(
            "*" * 5 +
            "Processing for subject: {}".format(self.bids_path.subject) +
            "*" * 5)
        for step in steps:
            self.apply(step)
        logging.info("Processed subject {}\n".format(self.bids_path.subject))


@dataclass
class MultiPipeline():
    """Performs pipeline operations on a list of pipelines (one per subject)."""
    # Root directory of the BIDS dataset.
    bids_root: str
    verbose: int = logging.ERROR
    # 'sub-XXX' directory names found under bids_root; set in __post_init__.
    subjects: list[str] = field(init=False, default=None)
    bids_paths: list[BIDSPath] = field(init=False)
    pipelines: list[Pipeline] = field(init=False)

    def __post_init__(self) -> None:
        """Discover subject directories and build one Pipeline per subject."""
        logging.basicConfig(level=self.verbose)
        # Every sub-directory of the BIDS root is treated as one subject.
        self.subjects = [
            sub for sub in os.listdir(self.bids_root)
            if os.path.isdir(os.path.join(self.bids_root, sub))
        ]
        # Template path; only the subject entity differs between subjects.
        bids_path = BIDSPath(subject='001',
                             session='P3',
                             task='P3',
                             datatype='eeg',
                             suffix='eeg',
                             root=self.bids_root)
        self.bids_paths = [
            bids_path.copy().update(subject=sub.split('-')[-1])
            for sub in self.subjects
        ]

        self.pipelines = [
            Pipeline(bids_path=path, verbose=logging.ERROR)
            for path in self.bids_paths
        ]

        self.pipelines.sort(key=lambda x: int(x.bids_path.subject))

    def _start_preprocessing(self, pipeline) -> Pipeline:
        """Run the standard preprocessing chain on a single pipeline.

        Helper used by the multiprocessing pools below; returns the same
        pipeline with preprocessed data. In future, the steps should be
        passed as a parameter.
        """
        pipeline.load_data()
        pipeline.set_custom_events_mapping(task='P3')
        pipeline.set_montage()
        steps = [
            SimpleMNEFilter(0.5, 50, 'firwin'),
            CleaningData(pipeline.bids_path),
            PrecomputedICA(pipeline.bids_path), ('reference', ['P9', 'P10']),
            ('resample', 256)
        ]
        pipeline.make_pipeline(steps)
        return pipeline

    def start_erp_analysis(self, erpanalysis, jobs: int = 6):
        """Preprocess all subjects in parallel, then compute epochs for each.

        Args:
            erpanalysis: ERP analysis object receiving the epochs.
            jobs: Number of worker processes.

        Returns:
            The same erpanalysis object, populated with epochs.
        """
        pipelines = self.pipelines
        with Pool(jobs) as p:
            pipelines = list(
                tqdm(p.imap(self._start_preprocessing, pipelines),
                     total=len(self.subjects)))

        # Epoch computation mutates erpanalysis, so it runs sequentially.
        for pipeline in pipelines:
            pipeline.compute_epochs(erpanalysis)

        return erpanalysis

    def start_encoding_analysis(self, erp: ERPAnalysis, encoder: Encoder):
        """Fit the encoder on every epoch produced by the ERP analysis.

        Raises:
            AssertionError: If the ERP analysis has not been run yet.
        """
        assert erp.all_subjects, 'You have to run erp analysis first'
        for epoch in erp.epochs:
            encoder.fit(epoch)

    def start_preprocessing(self, jobs: int = 6) -> list[Pipeline]:
        """Run preprocessing for every subject in parallel.

        Pipelines are created from the BIDS paths found in the BIDS root
        directory (see __post_init__).

        Returns:
            List of preprocessed Pipeline objects.
        """
        pipelines = self.pipelines

        with Pool(jobs) as p:
            pipelines = list(
                tqdm(p.imap(self._start_preprocessing, pipelines),
                     total=len(self.subjects)))

        return pipelines

Classes

class MultiPipeline (bids_root: str, verbose: int = 40)

Performs pipeline operations on list of pipelines

Expand source code
@dataclass
class MultiPipeline():
    """Performs pipeline operations on list of pipelines"""
    bids_root: str
    verbose: int = logging.ERROR
    subjects: list[str] = field(init=False, default=None)
    bids_paths: list[BIDSPath] = field(init=False)
    pipelines: list[Pipeline] = field(init=False)

    def __post_init__(self) -> None:
        logging.basicConfig(level=self.verbose)
        self.subjects = [
            sub for sub in os.listdir(self.bids_root)
            if os.path.isdir(os.path.join(self.bids_root, sub))
        ]
        bids_path = BIDSPath(subject='001',
                             session='P3',
                             task='P3',
                             datatype='eeg',
                             suffix='eeg',
                             root=self.bids_root)
        self.bids_paths = [
            bids_path.copy().update(subject=sub.split('-')[-1])
            for sub in self.subjects
        ]

        self.pipelines = [
            Pipeline(bids_path=path, verbose=logging.ERROR)
            for path in self.bids_paths
        ]

        self.pipelines.sort(key=lambda x: int(x.bids_path.subject))

    def _start_preprocessing(self, pipeline) -> list[Pipeline]:
        """
        Helper function to parallelize the preprocessing steps.
        
        Returns list of Pipelines having preprocessed data, In future, steps should be passed as parameter.
        
        """
        pipeline.load_data()
        pipeline.set_custom_events_mapping(task='P3')
        pipeline.set_montage()
        steps = [
            SimpleMNEFilter(0.5, 50, 'firwin'),
            CleaningData(pipeline.bids_path),
            PrecomputedICA(pipeline.bids_path), ('reference', ['P9', 'P10']),
            ('resample', 256)
        ]
        pipeline.make_pipeline(steps)
        return pipeline

    def start_erp_analysis(self, erpanalysis, jobs: int = 6):
        """
        Performs ERP Analysis on Pipelines
        """
        pipelines = self.pipelines
        with Pool(jobs) as p:
            pipelines = list(
                tqdm(p.imap(self._start_preprocessing, pipelines),
                     total=len(self.subjects)))

        for pipeline in pipelines:
            pipeline.compute_epochs(erpanalysis)

        return erpanalysis

    def start_encoding_analysis(self, erp: ERPAnalysis, encoder: Encoder):
        """
        Performs Encoding Analysis on Pipelines
        """
        assert erp.all_subjects, 'You have to run erp analysis first'
        for epoch in erp.epochs:
            encoder.fit(epoch)

    def start_preprocessing(self, jobs: int = 6) -> list[Pipeline]:
        """
        Performs perprocessing on a list of pipelines.
        
        Pipelines are created by the list of BIDS paths in the BIDS root directory
        """

        pipelines = self.pipelines

        with Pool(jobs) as p:
            pipelines = list(
                tqdm(p.imap(self._start_preprocessing, pipelines),
                     total=len(self.subjects)))

        return pipelines

Class variables

var bids_paths : list
var bids_root : str
var pipelines : list
var subjects : list
var verbose : int

Methods

def start_encoding_analysis(self, erp: erpanalysis.ERPAnalysis, encoder: encoding_analysis.Encoder)

Performs Encoding Analysis on Pipelines

Expand source code
def start_encoding_analysis(self, erp: ERPAnalysis, encoder: Encoder):
    """
    Performs Encoding Analysis on Pipelines
    """
    assert erp.all_subjects, 'You have to run erp analysis first'
    for epoch in erp.epochs:
        encoder.fit(epoch)
def start_erp_analysis(self, erpanalysis, jobs: int = 6)

Performs ERP Analysis on Pipelines

Expand source code
def start_erp_analysis(self, erpanalysis, jobs: int = 6):
    """
    Performs ERP Analysis on Pipelines
    """
    pipelines = self.pipelines
    with Pool(jobs) as p:
        pipelines = list(
            tqdm(p.imap(self._start_preprocessing, pipelines),
                 total=len(self.subjects)))

    for pipeline in pipelines:
        pipeline.compute_epochs(erpanalysis)

    return erpanalysis
def start_preprocessing(self, jobs: int = 6) ‑> list

Performs preprocessing on a list of pipelines.

Pipelines are created by the list of BIDS paths in the BIDS root directory

Expand source code
def start_preprocessing(self, jobs: int = 6) -> list[Pipeline]:
    """
    Performs perprocessing on a list of pipelines.
    
    Pipelines are created by the list of BIDS paths in the BIDS root directory
    """

    pipelines = self.pipelines

    with Pool(jobs) as p:
        pipelines = list(
            tqdm(p.imap(self._start_preprocessing, pipelines),
                 total=len(self.subjects)))

    return pipelines
class P3

Helper class for P3 tasks

Expand source code
class P3:
    """Helper class for P3 tasks"""

    @abstractmethod
    def EVENTS_MAPINGS() -> Tuple[dict, list[int], list[int]]:
        blocks = np.array(
            [list(range(10 * x + 1, 10 * x + 6)) for x in range(1, 6)])
        rare = np.array([x + i for i, x in enumerate(range(11, 56, 10))
                        ]).tolist()
        freq = np.setdiff1d(blocks.flatten(), rare).tolist()

        stimlus = ['A', 'B', 'C', 'D', 'E']

        evts_stim = [
            'stimulus/' + stimlus[i] + '/rare/' +
            str(alph) if alph in rare else 'stimulus/' + stimlus[i] + '/freq/' +
            str(alph) for i, x in enumerate(blocks) for alph in x
        ]
        evts_id = dict((i + 3, evts_stim[i]) for i in range(0, len(evts_stim)))
        evts_id[1] = 'response/correct/201'
        evts_id[2] = 'response/error/202'
        return evts_id, rare, freq

Methods

def EVENTS_MAPINGS() ‑> Tuple[dict, list[int], list[int]]
Expand source code
@abstractmethod
def EVENTS_MAPINGS() -> Tuple[dict, list[int], list[int]]:
    blocks = np.array(
        [list(range(10 * x + 1, 10 * x + 6)) for x in range(1, 6)])
    rare = np.array([x + i for i, x in enumerate(range(11, 56, 10))
                    ]).tolist()
    freq = np.setdiff1d(blocks.flatten(), rare).tolist()

    stimlus = ['A', 'B', 'C', 'D', 'E']

    evts_stim = [
        'stimulus/' + stimlus[i] + '/rare/' +
        str(alph) if alph in rare else 'stimulus/' + stimlus[i] + '/freq/' +
        str(alph) for i, x in enumerate(blocks) for alph in x
    ]
    evts_id = dict((i + 3, evts_stim[i]) for i in range(0, len(evts_stim)))
    evts_id[1] = 'response/correct/201'
    evts_id[2] = 'response/error/202'
    return evts_id, rare, freq
class Pipeline (bids_path: Union[str, list[str]], subject: Optional[int] = None, events_mapping: P3 = ({3: 'stimulus/A/rare/11', 4: 'stimulus/A/freq/12', 5: 'stimulus/A/freq/13', 6: 'stimulus/A/freq/14', 7: 'stimulus/A/freq/15', 8: 'stimulus/B/freq/21', 9: 'stimulus/B/rare/22', 10: 'stimulus/B/freq/23', 11: 'stimulus/B/freq/24', 12: 'stimulus/B/freq/25', 13: 'stimulus/C/freq/31', 14: 'stimulus/C/freq/32', 15: 'stimulus/C/rare/33', 16: 'stimulus/C/freq/34', 17: 'stimulus/C/freq/35', 18: 'stimulus/D/freq/41', 19: 'stimulus/D/freq/42', 20: 'stimulus/D/freq/43', 21: 'stimulus/D/rare/44', 22: 'stimulus/D/freq/45', 23: 'stimulus/E/freq/51', 24: 'stimulus/E/freq/52', 25: 'stimulus/E/freq/53', 26: 'stimulus/E/freq/54', 27: 'stimulus/E/rare/55', 1: 'response/correct/201', 2: 'response/error/202'}, [11, 22, 33, 44, 55], [12, 13, 14, 15, 21, 23, 24, 25, 31, 32, 34, 35, 41, 42, 43, 45, 51, 52, 53, 54]), verbose:  = 20)

Pipeline for processing Encoding and Decoding Analysis on EEG data

Expand source code
@dataclass
class Pipeline:
    """ Pipeline for processing Encoding and Decoding Analysis on EEG data"""
    bids_path: Union[str, list[str]]
    subject: Optional[int] = None
    events_mapping: P3 = P3.EVENTS_MAPINGS()
    verbose: logging = logging.INFO
    raw: mne.io.Raw = field(init=False, repr=False)
    events: np.ndarray = field(init=False, repr=False)
    event_ids: dict = field(init=False, repr=False)
    decoding_score: Tuple = field(init=False, repr=False, default=None)

    def __post_init__(self):
        logging.basicConfig(level=self.verbose)
        mne.set_log_level(verbose='ERROR')
        warnings.filterwarnings("ignore")

    def __str__(self) -> str:
        info = (
            'The raw data for subject {} has {} time samples and {} channels.\n'
            .format(self.subject, self.raw.n_times, len(self.raw.ch_names)) +
            'The last time sample is at {} seconds.'.format(self.raw.times[-1])
            + 'The first few channel names are {}....\n'.format(', '.join(
                self.raw.ch_names[:5])) +
            'sample rate: {} Hz.\n'.format(self.raw.info['sfreq']) +
            '%s channels x %s samples\n\n' %
            (len(self.raw), len(self.raw.times)))
        return info

    def set_montage(self) -> None:
        """Set the montage for EEG data"""
        montage_dir = os.path.join(os.path.dirname(mne.__file__), 'channels',
                                   'data', 'montages')
        logging.debug(sorted(os.listdir(montage_dir)))
        ten_twenty_montage = mne.channels.make_standard_montage('standard_1020')
        self.raw.set_channel_types({
            'HEOG_left': 'eog',
            'HEOG_right': 'eog',
            'VEOG_lower': 'eog'
        })
        self.raw.set_montage(ten_twenty_montage, match_case=False)
        logging.info("Standard 1020 montage and EOG channels are set")

    def load_data(self, event_id: Union[Dict, str] = "auto") -> any:
        """Load the mne raw data and find events and event ids from annotations"""
        logging.info("Loading Data")
        raw = read_raw_bids(bids_path=self.bids_path)
        self.subject = self.bids_path.subject
        self.events, self.event_ids = mne.events_from_annotations(
            raw, event_id=event_id)
        self.raw = raw.load_data()
        return self

    def set_custom_events_mapping(self,
                                  mapping: Dict[int, str] = None,
                                  task: str = None) -> None:
        """Custom mappings for the P3 task"""
        if task == 'P3':
            mapping, _, _ = P3.EVENTS_MAPINGS()
        assert mapping is not None, "Mapping is not defined! Please pass mapping as argument"

        annot_from_events = mne.annotations_from_events(
            events=self.events,
            event_desc=mapping,
            sfreq=self.raw.info['sfreq'])
        self.raw.set_annotations(annot_from_events)
        self.events, self.event_ids = mne.events_from_annotations(self.raw)

    def apply_resampling(self,
                         sampling_freq: int,
                         padding: str = 'auto') -> None:
        """Apply resampling on the mne raw data"""
        logging.info("Applying resampling")
        self.raw, self.events = self.raw.resample(sampling_freq,
                                                  npad=padding,
                                                  events=self.events)

    def apply_rereferencing(self, reference_channels: Union[List[str],
                                                            str]) -> None:
        """Apply re-referencing on the channels present in mne raw object"""
        logging.info("Applying re-referencing")
        mne.io.set_eeg_reference(self.raw, ref_channels=reference_channels)

    def apply_cleaning(self, cleaner: CleaningData):
        """Apply cleaning of bad segments and bad channels"""
        logging.info("Applying cleaning")
        cleaner.apply_cleaning(self.raw)

    def apply_filter(self, filter: BaseFilter) -> None:
        """Apply filtering on raw data"""
        logging.info("Applying filtering")
        filter.apply_filter(self.raw)

    def apply_ica(self, ica: BaseICA) -> None:
        """Apply ICA on the raw data"""
        logging.info("Applying ICA")
        ica.compute_ica(self.raw)
        ica.apply_ica(self.raw)

    def get_events_df(self, events_ext: str = 'events.tsv') -> pd.DataFrame:
        """Returns Pandas dataframe containing the events from the BIDS path"""
        bids_path = self.bids_path
        fname = os.path.join(
            bids_path.directory,
            bids_path.basename.removesuffix(bids_path.suffix) + events_ext)
        assert isfile(fname), "Events file not found!"
        return pd.read_csv(fname, delimiter='\t')

    def compute_epochs(self, erp: any) -> mne.Epochs:
        """Perform ERP epochs computation task"""
        return erp.compute_epochs(self.raw, self.events, self.event_ids)

    def compute_erp_peak(self,
                         erp: any,
                         condition: str,
                         thypo: float,
                         offset: float = 0.05,
                         channels: list[str] = []) -> pd.DataFrame:
        """Perform ERP Peak computation task"""
        self.compute_epochs(erp)
        return erp.compute_peak(condition, thypo, offset, channels)

    def apply_decoder(self, decoder: EEGDecoder) -> Tuple:
        """Perfrom Decoding Analysis"""
        svm_scores = decoder.run_svm_()
        # sliding_scores = decoder.run_sliding_() somehow doesnot work!
        decoding_score = svm_scores
        self.decoding_score = decoding_score
        return decoding_score

    def _parallel_process_raws(self, pipeline) -> mne.io.Raw:
        """Perform preprocessig steps on list of raws using multiprocessing"""
        pipeline.load_data()
        pipeline.set_montage()
        pipeline.make_pipeline([
            SimpleMNEFilter(0.1, 50, 'firwin'),
            CleaningData(pipeline.bids_path),
            PrecomputedICA(pipeline.bids_path)
        ])
        pipeline.set_custom_events_mapping(task='P3')
        return pipeline.raw

    def load_multiple_subjects(self,
                               n_subjects: int = 40,
                               preload: bool = False) -> None:
        """Load subjects into current mne raw object"""

        curr_sub = [int(self.bids_path.subject)]
        subjects = set(range(1, n_subjects + 1)) - set(curr_sub)
        bids_paths = [
            self.bids_path.copy().update(subject=str(x).zfill(3))
            for x in subjects
        ]
        pipelines = [
            Pipeline(bids_path=path, verbose=logging.ERROR)
            for path in bids_paths
        ]
        with Pool(6) as p:
            raws = list(
                tqdm(p.imap(self._parallel_process_raws, pipelines),
                     total=n_subjects - 1))

        raws.append(self.raw)
        self.raw = mne.concatenate_raws(raws)
        self.events, self.event_ids = mne.events_from_annotations(self.raw)
        if preload:
            self.raw.load_data()
        self.set_montage()

    def apply(self, step: list) -> None:
        """Find the step to apply"""
        if isinstance(step, tuple):
            step_name = step[0]
            step = step[1]
        else:
            step_name = step.step()
        if step_name == 'cleaning':
            self.apply_cleaning(step)
        elif step_name == 'filtering':
            self.apply_filter(step)
        elif step_name == 'ica':
            self.apply_ica(step)
        elif step_name == 'erp':
            self.compute_epochs(step)
        elif step_name == 'decoding':
            self.apply_decoder(step)
        elif step_name == 'classifier':
            self.apply_classifier(step)
        elif step_name == 'reference':
            self.apply_rereferencing(step)
        elif step_name == 'resample':
            self.apply_resampling(step)
        else:
            logging.error("Invalid pipeline operation!")

    def make_pipeline(self, steps: list) -> None:
        """Perform the list of steps on the current mne raw object"""
        logging.info(
            "*" * 5 +
            "Proceesing for subject: {}".format(self.bids_path.subject) +
            "*" * 5)
        for step in steps:
            self.apply(step)
        logging.info("Processed subject {}\n".format(self.bids_path.subject))

Class variables

var bids_path : Union[str, list[str]]
var decoding_score : Tuple
var event_ids : dict
var events : numpy.ndarray
var events_mapping : P3
var raw : mne.io.fiff.raw.Raw
var subject : Optional[int]
var verbose

Methods

def apply(self, step: list) ‑> None

Find the step to apply

Expand source code
def apply(self, step: list) -> None:
    """Find the step to apply"""
    if isinstance(step, tuple):
        step_name = step[0]
        step = step[1]
    else:
        step_name = step.step()
    if step_name == 'cleaning':
        self.apply_cleaning(step)
    elif step_name == 'filtering':
        self.apply_filter(step)
    elif step_name == 'ica':
        self.apply_ica(step)
    elif step_name == 'erp':
        self.compute_epochs(step)
    elif step_name == 'decoding':
        self.apply_decoder(step)
    elif step_name == 'classifier':
        self.apply_classifier(step)
    elif step_name == 'reference':
        self.apply_rereferencing(step)
    elif step_name == 'resample':
        self.apply_resampling(step)
    else:
        logging.error("Invalid pipeline operation!")
def apply_cleaning(self, cleaner: preprocessing.CleaningData)

Apply cleaning of bad segments and bad channels

Expand source code
def apply_cleaning(self, cleaner: CleaningData):
    """Apply cleaning of bad segments and bad channels"""
    logging.info("Applying cleaning")
    cleaner.apply_cleaning(self.raw)
def apply_decoder(self, decoder: decoding_analysis.EEGDecoder) ‑> Tuple

Perform Decoding Analysis

Expand source code
def apply_decoder(self, decoder: EEGDecoder) -> Tuple:
    """Perfrom Decoding Analysis"""
    svm_scores = decoder.run_svm_()
    # sliding_scores = decoder.run_sliding_() somehow doesnot work!
    decoding_score = svm_scores
    self.decoding_score = decoding_score
    return decoding_score
def apply_filter(self, filter: preprocessing.BaseFilter) ‑> None

Apply filtering on raw data

Expand source code
def apply_filter(self, filter: BaseFilter) -> None:
    """Apply filtering on raw data"""
    logging.info("Applying filtering")
    filter.apply_filter(self.raw)
def apply_ica(self, ica: preprocessing.BaseICA) ‑> None

Apply ICA on the raw data

Expand source code
def apply_ica(self, ica: BaseICA) -> None:
    """Apply ICA on the raw data"""
    logging.info("Applying ICA")
    ica.compute_ica(self.raw)
    ica.apply_ica(self.raw)
def apply_rereferencing(self, reference_channels: Union[str, List[str]]) ‑> None

Apply re-referencing on the channels present in mne raw object

Expand source code
def apply_rereferencing(self, reference_channels: Union[List[str],
                                                        str]) -> None:
    """Apply re-referencing on the channels present in mne raw object"""
    logging.info("Applying re-referencing")
    mne.io.set_eeg_reference(self.raw, ref_channels=reference_channels)
def apply_resampling(self, sampling_freq: int, padding: str = 'auto') ‑> None

Apply resampling on the mne raw data

Expand source code
def apply_resampling(self,
                     sampling_freq: int,
                     padding: str = 'auto') -> None:
    """Apply resampling on the mne raw data"""
    logging.info("Applying resampling")
    self.raw, self.events = self.raw.resample(sampling_freq,
                                              npad=padding,
                                              events=self.events)
def compute_epochs(self, erp) ‑> mne.epochs.Epochs

Perform ERP epochs computation task

Expand source code
def compute_epochs(self, erp: any) -> mne.Epochs:
    """Perform ERP epochs computation task"""
    return erp.compute_epochs(self.raw, self.events, self.event_ids)
def compute_erp_peak(self, erp, condition: str, thypo: float, offset: float = 0.05, channels: list = []) ‑> pandas.core.frame.DataFrame

Perform ERP Peak computation task

Expand source code
def compute_erp_peak(self,
                     erp: any,
                     condition: str,
                     thypo: float,
                     offset: float = 0.05,
                     channels: list[str] = []) -> pd.DataFrame:
    """Perform ERP Peak computation task"""
    self.compute_epochs(erp)
    return erp.compute_peak(condition, thypo, offset, channels)
def get_events_df(self, events_ext: str = 'events.tsv') ‑> pandas.core.frame.DataFrame

Returns Pandas dataframe containing the events from the BIDS path

Expand source code
def get_events_df(self, events_ext: str = 'events.tsv') -> pd.DataFrame:
    """Returns Pandas dataframe containing the events from the BIDS path"""
    bids_path = self.bids_path
    fname = os.path.join(
        bids_path.directory,
        bids_path.basename.removesuffix(bids_path.suffix) + events_ext)
    assert isfile(fname), "Events file not found!"
    return pd.read_csv(fname, delimiter='\t')
def load_data(self, event_id: Union[Dict, str] = 'auto') ‑> Pipeline

Load the mne raw data and find events and event ids from annotations

Expand source code
def load_data(self, event_id: Union[Dict, str] = "auto") -> any:
    """Read the BIDS recording, extract its events, and preload the data.

    Populates ``self.subject``, ``self.events``, ``self.event_ids`` and
    ``self.raw``; returns ``self`` so calls can be chained.
    """
    logging.info("Loading Data")
    raw = read_raw_bids(bids_path=self.bids_path)
    self.subject = self.bids_path.subject
    events, event_ids = mne.events_from_annotations(raw, event_id=event_id)
    self.events = events
    self.event_ids = event_ids
    self.raw = raw.load_data()
    return self
def load_multiple_subjects(self, n_subjects: int = 40, preload: bool = False) ‑> None

Load subjects into current mne raw object

Expand source code
def load_multiple_subjects(self,
                           n_subjects: int = 40,
                           preload: bool = False) -> None:
    """Concatenate the remaining subjects' recordings into this pipeline.

    Builds one child ``Pipeline`` per other subject, loads their raw
    objects in parallel, and concatenates them with the current raw.
    """
    current = int(self.bids_path.subject)
    other_subjects = set(range(1, n_subjects + 1)) - {current}
    pipelines = []
    for sub in other_subjects:
        path = self.bids_path.copy().update(subject=str(sub).zfill(3))
        pipelines.append(Pipeline(bids_path=path, verbose=logging.ERROR))
    # Load the other subjects' raw recordings in worker processes.
    with Pool(6) as pool:
        raws = list(
            tqdm(pool.imap(self._parallel_process_raws, pipelines),
                 total=n_subjects - 1))

    raws.append(self.raw)
    self.raw = mne.concatenate_raws(raws)
    self.events, self.event_ids = mne.events_from_annotations(self.raw)
    if preload:
        self.raw.load_data()
    self.set_montage()
def make_pipeline(self, steps: list) ‑> None

Perform the list of steps on the current mne raw object

Expand source code
def make_pipeline(self, steps: list) -> None:
    """Perform the list of steps on the current mne raw object"""
    logging.info(
        "*" * 5 +
        "Proceesing for subject: {}".format(self.bids_path.subject) +
        "*" * 5)
    for step in steps:
        self.apply(step)
    logging.info("Processed subject {}\n".format(self.bids_path.subject))
def set_custom_events_mapping(self, mapping: Dict[int, str] = None, task: str = None) ‑> None

Custom mappings for the P3 task

Expand source code
def set_custom_events_mapping(self,
                              mapping: Dict[int, str] = None,
                              task: str = None) -> None:
    """Rewrite the raw object's annotations from an event-code mapping.

    Parameters
    ----------
    mapping:
        Mapping from event codes to descriptions; required unless a
        recognised ``task`` is given.
    task:
        When ``'P3'``, the built-in P3 mapping is used.

    Raises
    ------
    ValueError
        If no mapping is available.
    """
    if task == 'P3':
        mapping, _, _ = P3.EVENTS_MAPINGS()
    # Fix: ``assert`` disappears under ``python -O``; validate explicitly.
    if mapping is None:
        raise ValueError(
            "Mapping is not defined! Please pass mapping as argument")

    annot_from_events = mne.annotations_from_events(
        events=self.events,
        event_desc=mapping,
        sfreq=self.raw.info['sfreq'])
    self.raw.set_annotations(annot_from_events)
    self.events, self.event_ids = mne.events_from_annotations(self.raw)
def set_montage(self) ‑> None

Set the montage for EEG data

Expand source code
def set_montage(self) -> None:
    """Attach the standard 10-20 montage and mark the EOG channels."""
    montage_dir = os.path.join(os.path.dirname(mne.__file__),
                               'channels', 'data', 'montages')
    logging.debug(sorted(os.listdir(montage_dir)))
    montage = mne.channels.make_standard_montage('standard_1020')
    eog_types = {
        'HEOG_left': 'eog',
        'HEOG_right': 'eog',
        'VEOG_lower': 'eog'
    }
    self.raw.set_channel_types(eog_types)
    self.raw.set_montage(montage, match_case=False)
    logging.info("Standard 1020 montage and EOG channels are set")