Module src.pipeline
Expand source code
from decoding_analysis import EEGDecoder
from tqdm import tqdm
from multiprocessing import Pool
from dataclasses import dataclass
from genericpath import isfile
from mne_bids import (BIDSPath, read_raw_bids)
from typing import Optional, Union, Tuple, Dict, List
from encoding_analysis import Encoder
from erpanalysis import ERPAnalysis
from preprocessing import *
import os
import mne
import logging
import warnings
import pandas as pd
class P3:
    """Helper class for P3 tasks."""

    @staticmethod
    def EVENTS_MAPINGS() -> Tuple[dict, list[int], list[int]]:
        """Build the event-id to description table for the P3 task.

        Stimulus codes come in five blocks of five consecutive codes
        (11-15, 21-25, ..., 51-55), one block per stimulus letter 'A'-'E'.
        Within block ``k`` (0-based) the code ``11 + 11 * k`` is labelled
        "rare"; the other four codes are labelled "freq".

        Returns:
            A tuple ``(evts_id, rare, freq)`` where

            * ``evts_id`` maps integer ids to descriptions. Ids 3..27 map to
              ``'stimulus/<letter>/<rare|freq>/<code>'`` in block order; ids
              1 and 2 map to ``'response/correct/201'`` and
              ``'response/error/202'``.
            * ``rare`` is the list of rare stimulus codes.
            * ``freq`` is the sorted list of the remaining stimulus codes.
        """
        stimuli = ['A', 'B', 'C', 'D', 'E']
        blocks = [list(range(10 * x + 1, 10 * x + 6)) for x in range(1, 6)]

        # 11, 22, 33, 44, 55: the rare code moves one slot further per block.
        rare = [start + shift for shift, start in enumerate(range(11, 56, 10))]
        rare_codes = set(rare)
        freq = sorted(code for block in blocks for code in block
                      if code not in rare_codes)

        evts_stim = [
            f"stimulus/{letter}/{'rare' if code in rare_codes else 'freq'}/{code}"
            for letter, block in zip(stimuli, blocks) for code in block
        ]

        # Ids 1 and 2 are reserved for the responses, so stimuli start at 3.
        evts_id = {idx + 3: desc for idx, desc in enumerate(evts_stim)}
        evts_id[1] = 'response/correct/201'
        evts_id[2] = 'response/error/202'
        return evts_id, rare, freq
@dataclass
class Pipeline:
    """Pipeline for processing Encoding and Decoding Analysis on EEG data.

    The pipeline wraps one BIDS recording: ``load_data`` reads it into
    ``raw`` / ``events`` / ``event_ids``, and the ``apply_*`` / ``compute_*``
    methods run individual processing steps on that state.
    """
    # BIDS location of the recording; used with read_raw_bids and .copy().update().
    bids_path: BIDSPath
    # Filled from bids_path.subject by load_data().
    subject: Optional[Union[int, str]] = None
    # (event-id table, rare codes, frequent codes); built per instance so
    # instances never share the same mutable dict/lists.
    events_mapping: Tuple[dict, list, list] = field(
        default_factory=P3.EVENTS_MAPINGS)
    # Logging level handed to logging.basicConfig.
    verbose: int = logging.INFO
    # The following are populated by load_data() and are unset before it runs.
    raw: mne.io.Raw = field(init=False, repr=False)
    events: np.ndarray = field(init=False, repr=False)
    event_ids: dict = field(init=False, repr=False)
    # Result of the last apply_decoder() call, None until then.
    decoding_score: Optional[Tuple] = field(init=False,
                                            repr=False,
                                            default=None)

    def __post_init__(self):
        """Configure logging and silence MNE output and Python warnings."""
        logging.basicConfig(level=self.verbose)
        mne.set_log_level(verbose='ERROR')
        warnings.filterwarnings("ignore")

    def __str__(self) -> str:
        """Return a short human-readable summary of the loaded raw data."""
        raw = self.raw
        n_channels = len(raw.ch_names)
        first_channels = ', '.join(raw.ch_names[:5])
        # len(raw) is the number of time samples, so the channel count must
        # come from ch_names.
        return (
            f'The raw data for subject {self.subject} has {raw.n_times} '
            f'time samples and {n_channels} channels.\n'
            f'The last time sample is at {raw.times[-1]} seconds.\n'
            f'The first few channel names are {first_channels}....\n'
            f"sample rate: {raw.info['sfreq']} Hz.\n"
            f'{n_channels} channels x {raw.n_times} samples\n\n')

    def set_montage(self) -> None:
        """Set the montage for EEG data.

        Marks the three EOG channels as type 'eog' and applies the
        'standard_1020' montage (case-insensitive channel matching).
        """
        # The directory listing is only a debugging aid; never let it break
        # the montage setup if the MNE package layout differs.
        if logging.getLogger().isEnabledFor(logging.DEBUG):
            montage_dir = os.path.join(os.path.dirname(mne.__file__),
                                       'channels', 'data', 'montages')
            if os.path.isdir(montage_dir):
                logging.debug(sorted(os.listdir(montage_dir)))

        ten_twenty_montage = mne.channels.make_standard_montage(
            'standard_1020')
        self.raw.set_channel_types({
            'HEOG_left': 'eog',
            'HEOG_right': 'eog',
            'VEOG_lower': 'eog'
        })
        self.raw.set_montage(ten_twenty_montage, match_case=False)
        logging.info("Standard 1020 montage and EOG channels are set")

    def load_data(self, event_id: Union[Dict, str] = "auto") -> "Pipeline":
        """Load the mne raw data and find events and event ids from annotations.

        Args:
            event_id: Passed through to ``mne.events_from_annotations``.

        Returns:
            This pipeline, so calls can be chained.
        """
        logging.info("Loading Data")
        raw = read_raw_bids(bids_path=self.bids_path)
        self.subject = self.bids_path.subject
        self.events, self.event_ids = mne.events_from_annotations(
            raw, event_id=event_id)
        self.raw = raw.load_data()
        return self

    def set_custom_events_mapping(self,
                                  mapping: Optional[Dict[int, str]] = None,
                                  task: Optional[str] = None) -> None:
        """Replace the raw annotations using a custom event-id mapping.

        Args:
            mapping: Event id -> description table. Ignored when ``task`` is
                'P3', in which case the P3 table is used.
            task: Name of a predefined task; only 'P3' is recognised.

        Raises:
            AssertionError: If no mapping is available.
        """
        if task == 'P3':
            mapping, _, _ = P3.EVENTS_MAPINGS()
        # Raised explicitly so the check survives ``python -O``.
        if mapping is None:
            raise AssertionError(
                "Mapping is not defined! Please pass mapping as argument")

        annot_from_events = mne.annotations_from_events(
            events=self.events,
            event_desc=mapping,
            sfreq=self.raw.info['sfreq'])
        self.raw.set_annotations(annot_from_events)
        self.events, self.event_ids = mne.events_from_annotations(self.raw)

    def apply_resampling(self,
                         sampling_freq: int,
                         padding: str = 'auto') -> None:
        """Apply resampling on the mne raw data and keep events in sync."""
        logging.info("Applying resampling")
        self.raw, self.events = self.raw.resample(sampling_freq,
                                                  npad=padding,
                                                  events=self.events)

    def apply_rereferencing(self, reference_channels: Union[List[str],
                                                            str]) -> None:
        """Apply re-referencing on the channels present in mne raw object.

        Args:
            reference_channels: Channel name(s) forwarded as ``ref_channels``.
        """
        logging.info("Applying re-referencing")
        # The module-level set_eeg_reference works on a copy by default and
        # returns it; the instance method modifies self.raw in place.
        self.raw.set_eeg_reference(ref_channels=reference_channels)

    def apply_cleaning(self, cleaner: CleaningData):
        """Apply cleaning of bad segments and bad channels."""
        logging.info("Applying cleaning")
        cleaner.apply_cleaning(self.raw)

    def apply_filter(self, filter: BaseFilter) -> None:
        """Apply filtering on raw data."""
        logging.info("Applying filtering")
        filter.apply_filter(self.raw)

    def apply_ica(self, ica: BaseICA) -> None:
        """Compute the ICA on the raw data and then apply it."""
        logging.info("Applying ICA")
        ica.compute_ica(self.raw)
        ica.apply_ica(self.raw)

    def get_events_df(self, events_ext: str = 'events.tsv') -> pd.DataFrame:
        """Return a pandas dataframe with the events file next to the recording.

        The file name is the BIDS basename with its suffix replaced by
        ``events_ext``, looked up in the BIDS path's directory.

        Raises:
            AssertionError: If the events file does not exist.
        """
        bids_path = self.bids_path
        stem = bids_path.basename.removesuffix(bids_path.suffix)
        fname = os.path.join(bids_path.directory, stem + events_ext)
        # Raised explicitly so the check survives ``python -O``.
        if not os.path.isfile(fname):
            raise AssertionError("Events file not found!")
        return pd.read_csv(fname, delimiter='\t')

    def compute_epochs(self, erp: any) -> mne.Epochs:
        """Perform ERP epochs computation task.

        Delegates to ``erp.compute_epochs`` with the current raw data,
        events and event ids, and returns its result.
        """
        return erp.compute_epochs(self.raw, self.events, self.event_ids)

    def compute_erp_peak(self,
                         erp: any,
                         condition: str,
                         thypo: float,
                         offset: float = 0.05,
                         channels: Optional[List[str]] = None) -> pd.DataFrame:
        """Perform ERP Peak computation task.

        Computes the epochs first, then returns
        ``erp.compute_peak(condition, thypo, offset, channels)``.

        Args:
            channels: Channel names forwarded to ``erp.compute_peak``;
                ``None`` is forwarded as an empty list.
        """
        # A fresh list per call avoids sharing one default list object.
        channels = [] if channels is None else channels
        self.compute_epochs(erp)
        return erp.compute_peak(condition, thypo, offset, channels)

    def apply_decoder(self, decoder: EEGDecoder) -> Tuple:
        """Perform Decoding Analysis and remember the scores."""
        # TODO: decoder.run_sliding_() was disabled upstream because it did
        # not work; only the SVM scores are used.
        self.decoding_score = decoder.run_svm_()
        return self.decoding_score

    def _parallel_process_raws(self, pipeline) -> mne.io.Raw:
        """Preprocess one pipeline; used as the worker for multiprocessing.

        Loads the data, sets the montage, runs filtering, cleaning and the
        precomputed ICA, applies the P3 event mapping and returns the raw.
        """
        pipeline.load_data()
        pipeline.set_montage()
        pipeline.make_pipeline([
            SimpleMNEFilter(0.1, 50, 'firwin'),
            CleaningData(pipeline.bids_path),
            PrecomputedICA(pipeline.bids_path)
        ])
        # NOTE(review): set_custom_events_mapping replaces the raw's
        # annotations, so anything the cleaning step annotated would be
        # dropped here; MultiPipeline applies the mapping before cleaning.
        # Confirm which order is intended.
        pipeline.set_custom_events_mapping(task='P3')
        return pipeline.raw

    def load_multiple_subjects(self,
                               n_subjects: int = 40,
                               preload: bool = False,
                               jobs: int = 6) -> None:
        """Load subjects into current mne raw object.

        Subjects ``1..n_subjects`` except the current one are preprocessed
        in a process pool, concatenated together with the current raw, and
        events / event ids are recomputed from the combined annotations.

        Args:
            n_subjects: Highest subject number to include.
            preload: Whether to call ``load_data()`` on the combined raw.
            jobs: Number of worker processes.
        """
        current = int(self.bids_path.subject)
        # Sorted so the concatenation order is deterministic.
        others = sorted(set(range(1, n_subjects + 1)) - {current})
        sibling_pipelines = [
            Pipeline(
                bids_path=self.bids_path.copy().update(
                    subject=str(number).zfill(3)),
                verbose=logging.ERROR) for number in others
        ]

        with Pool(jobs) as pool:
            raws = list(
                tqdm(pool.imap(self._parallel_process_raws,
                               sibling_pipelines),
                     total=len(sibling_pipelines)))

        raws.append(self.raw)
        self.raw = mne.concatenate_raws(raws)
        self.events, self.event_ids = mne.events_from_annotations(self.raw)
        if preload:
            self.raw.load_data()
        self.set_montage()

    def apply(self, step: Union[tuple, object]) -> None:
        """Find the step to apply.

        Args:
            step: Either a ``(name, argument)`` tuple or an object whose
                ``step()`` method returns the step name. The argument (or
                the object itself) is handed to the matching method.
        """
        if isinstance(step, tuple):
            step_name, step = step[0], step[1]
        else:
            step_name = step.step()

        handlers = {
            'cleaning': 'apply_cleaning',
            'filtering': 'apply_filter',
            'ica': 'apply_ica',
            'erp': 'compute_epochs',
            'decoding': 'apply_decoder',
            # NOTE(review): no apply_classifier is defined in this class, so
            # a 'classifier' step raises AttributeError unless it is added
            # elsewhere.
            'classifier': 'apply_classifier',
            'reference': 'apply_rereferencing',
            'resample': 'apply_resampling',
        }
        handler_name = (handlers.get(step_name)
                        if isinstance(step_name, str) else None)
        if handler_name is None:
            logging.error("Invalid pipeline operation!")
            return
        getattr(self, handler_name)(step)

    def make_pipeline(self, steps: list) -> None:
        """Perform the list of steps on the current mne raw object."""
        subject = self.bids_path.subject
        logging.info("*" * 5 + "Proceesing for subject: {}".format(subject) +
                     "*" * 5)
        for step in steps:
            self.apply(step)
        logging.info("Processed subject {}\n".format(subject))
@dataclass
class MultiPipeline():
    """Performs pipeline operations on list of pipelines.

    One ``Pipeline`` is created for every ``sub-*`` directory found in
    ``bids_root``; ``subjects``, ``bids_paths`` and ``pipelines`` are kept
    in the same order, sorted by numeric subject label.
    """
    # Root directory of the BIDS dataset.
    bids_root: str
    # Logging level handed to logging.basicConfig.
    verbose: int = logging.ERROR
    # Names of the subject directories found under bids_root.
    subjects: list[str] = field(init=False, default=None)
    # One BIDS path per subject (session/task 'P3', datatype/suffix 'eeg').
    bids_paths: list[BIDSPath] = field(init=False)
    # One Pipeline per BIDS path.
    pipelines: list[Pipeline] = field(init=False)

    def __post_init__(self) -> None:
        """Discover the subject directories and build their pipelines."""
        logging.basicConfig(level=self.verbose)

        # Only 'sub-*' directories are subjects; other folders in a BIDS
        # root (e.g. 'derivatives') would break the numeric sort below.
        self.subjects = sorted(
            (entry for entry in os.listdir(self.bids_root)
             if entry.startswith('sub-')
             and os.path.isdir(os.path.join(self.bids_root, entry))),
            key=lambda name: int(name.split('-')[-1]))

        template = BIDSPath(subject='001',
                            session='P3',
                            task='P3',
                            datatype='eeg',
                            suffix='eeg',
                            root=self.bids_root)
        self.bids_paths = [
            template.copy().update(subject=sub.split('-')[-1])
            for sub in self.subjects
        ]
        self.pipelines = [
            Pipeline(bids_path=path, verbose=logging.ERROR)
            for path in self.bids_paths
        ]

    def _start_preprocessing(self, pipeline) -> Pipeline:
        """
        Helper function to parallelize the preprocessing steps.
        Loads the data, applies the P3 event mapping and the montage, then
        runs filtering, cleaning, precomputed ICA, re-referencing to
        P9/P10 and resampling to 256, and returns the processed pipeline.
        In future, steps should be passed as parameter.
        """
        pipeline.load_data()
        pipeline.set_custom_events_mapping(task='P3')
        pipeline.set_montage()
        steps = [
            SimpleMNEFilter(0.5, 50, 'firwin'),
            CleaningData(pipeline.bids_path),
            PrecomputedICA(pipeline.bids_path), ('reference', ['P9', 'P10']),
            ('resample', 256)
        ]
        pipeline.make_pipeline(steps)
        return pipeline

    def start_erp_analysis(self, erpanalysis, jobs: int = 6):
        """
        Performs ERP Analysis on Pipelines.
        Preprocesses every pipeline with ``jobs`` worker processes, feeds
        each one to ``erpanalysis`` via ``compute_epochs`` and returns
        ``erpanalysis``.
        """
        for pipeline in self.start_preprocessing(jobs):
            pipeline.compute_epochs(erpanalysis)
        return erpanalysis

    def start_encoding_analysis(self, erp: ERPAnalysis, encoder: Encoder):
        """
        Performs Encoding Analysis on Pipelines by fitting ``encoder`` on
        every entry of ``erp.epochs``.

        Raises AssertionError if ``erp.all_subjects`` is falsy, i.e. the
        ERP analysis has not been run yet.
        """
        # Raised explicitly so the check survives ``python -O``.
        if not erp.all_subjects:
            raise AssertionError('You have to run erp analysis first')
        for epoch in erp.epochs:
            encoder.fit(epoch)

    def start_preprocessing(self, jobs: int = 6) -> list[Pipeline]:
        """
        Performs preprocessing on a list of pipelines.
        Pipelines are created by the list of BIDS paths in the BIDS root directory.
        Returns the processed pipelines in the same order as ``self.pipelines``.
        """
        pending = self.pipelines
        with Pool(jobs) as pool:
            processed = list(
                tqdm(pool.imap(self._start_preprocessing, pending),
                     total=len(pending)))
        return processed
Classes
class MultiPipeline (bids_root: str, verbose: int = 40)
-
Performs pipeline operations on list of pipelines
Expand source code
@dataclass class MultiPipeline(): """Performs pipeline operations on list of pipelines""" bids_root: str verbose: int = logging.ERROR subjects: list[str] = field(init=False, default=None) bids_paths: list[BIDSPath] = field(init=False) pipelines: list[Pipeline] = field(init=False) def __post_init__(self) -> None: logging.basicConfig(level=self.verbose) self.subjects = [ sub for sub in os.listdir(self.bids_root) if os.path.isdir(os.path.join(self.bids_root, sub)) ] bids_path = BIDSPath(subject='001', session='P3', task='P3', datatype='eeg', suffix='eeg', root=self.bids_root) self.bids_paths = [ bids_path.copy().update(subject=sub.split('-')[-1]) for sub in self.subjects ] self.pipelines = [ Pipeline(bids_path=path, verbose=logging.ERROR) for path in self.bids_paths ] self.pipelines.sort(key=lambda x: int(x.bids_path.subject)) def _start_preprocessing(self, pipeline) -> list[Pipeline]: """ Helper function to parallelize the preprocessing steps. Returns list of Pipelines having preprocessed data, In future, steps should be passed as parameter. 
""" pipeline.load_data() pipeline.set_custom_events_mapping(task='P3') pipeline.set_montage() steps = [ SimpleMNEFilter(0.5, 50, 'firwin'), CleaningData(pipeline.bids_path), PrecomputedICA(pipeline.bids_path), ('reference', ['P9', 'P10']), ('resample', 256) ] pipeline.make_pipeline(steps) return pipeline def start_erp_analysis(self, erpanalysis, jobs: int = 6): """ Performs ERP Analysis on Pipelines """ pipelines = self.pipelines with Pool(jobs) as p: pipelines = list( tqdm(p.imap(self._start_preprocessing, pipelines), total=len(self.subjects))) for pipeline in pipelines: pipeline.compute_epochs(erpanalysis) return erpanalysis def start_encoding_analysis(self, erp: ERPAnalysis, encoder: Encoder): """ Performs Encoding Analysis on Pipelines """ assert erp.all_subjects, 'You have to run erp analysis first' for epoch in erp.epochs: encoder.fit(epoch) def start_preprocessing(self, jobs: int = 6) -> list[Pipeline]: """ Performs perprocessing on a list of pipelines. Pipelines are created by the list of BIDS paths in the BIDS root directory """ pipelines = self.pipelines with Pool(jobs) as p: pipelines = list( tqdm(p.imap(self._start_preprocessing, pipelines), total=len(self.subjects))) return pipelines
Class variables
var bids_paths : list
var bids_root : str
var pipelines : list
var subjects : list
var verbose : int
Methods
def start_encoding_analysis(self, erp: erpanalysis.ERPAnalysis, encoder: encoding_analysis.Encoder)
-
Performs Encoding Analysis on Pipelines
Expand source code
def start_encoding_analysis(self, erp: ERPAnalysis, encoder: Encoder): """ Performs Encoding Analysis on Pipelines """ assert erp.all_subjects, 'You have to run erp analysis first' for epoch in erp.epochs: encoder.fit(epoch)
def start_erp_analysis(self, erpanalysis, jobs: int = 6)
-
Performs ERP Analysis on Pipelines
Expand source code
def start_erp_analysis(self, erpanalysis, jobs: int = 6): """ Performs ERP Analysis on Pipelines """ pipelines = self.pipelines with Pool(jobs) as p: pipelines = list( tqdm(p.imap(self._start_preprocessing, pipelines), total=len(self.subjects))) for pipeline in pipelines: pipeline.compute_epochs(erpanalysis) return erpanalysis
def start_preprocessing(self, jobs: int = 6) ‑> list
-
Performs preprocessing on a list of pipelines.
Pipelines are created by the list of BIDS paths in the BIDS root directory
Expand source code
def start_preprocessing(self, jobs: int = 6) -> list[Pipeline]: """ Performs perprocessing on a list of pipelines. Pipelines are created by the list of BIDS paths in the BIDS root directory """ pipelines = self.pipelines with Pool(jobs) as p: pipelines = list( tqdm(p.imap(self._start_preprocessing, pipelines), total=len(self.subjects))) return pipelines
class P3
-
Helper class for P3 tasks
Expand source code
class P3: """Helper class for P3 tasks""" @abstractmethod def EVENTS_MAPINGS() -> Tuple[dict, list[int], list[int]]: blocks = np.array( [list(range(10 * x + 1, 10 * x + 6)) for x in range(1, 6)]) rare = np.array([x + i for i, x in enumerate(range(11, 56, 10)) ]).tolist() freq = np.setdiff1d(blocks.flatten(), rare).tolist() stimlus = ['A', 'B', 'C', 'D', 'E'] evts_stim = [ 'stimulus/' + stimlus[i] + '/rare/' + str(alph) if alph in rare else 'stimulus/' + stimlus[i] + '/freq/' + str(alph) for i, x in enumerate(blocks) for alph in x ] evts_id = dict((i + 3, evts_stim[i]) for i in range(0, len(evts_stim))) evts_id[1] = 'response/correct/201' evts_id[2] = 'response/error/202' return evts_id, rare, freq
Methods
def EVENTS_MAPINGS() ‑> Tuple[dict, list[int], list[int]]
-
Expand source code
@abstractmethod def EVENTS_MAPINGS() -> Tuple[dict, list[int], list[int]]: blocks = np.array( [list(range(10 * x + 1, 10 * x + 6)) for x in range(1, 6)]) rare = np.array([x + i for i, x in enumerate(range(11, 56, 10)) ]).tolist() freq = np.setdiff1d(blocks.flatten(), rare).tolist() stimlus = ['A', 'B', 'C', 'D', 'E'] evts_stim = [ 'stimulus/' + stimlus[i] + '/rare/' + str(alph) if alph in rare else 'stimulus/' + stimlus[i] + '/freq/' + str(alph) for i, x in enumerate(blocks) for alph in x ] evts_id = dict((i + 3, evts_stim[i]) for i in range(0, len(evts_stim))) evts_id[1] = 'response/correct/201' evts_id[2] = 'response/error/202' return evts_id, rare, freq
class Pipeline (bids_path: Union[str, list[str]], subject: Optional[int] = None, events_mapping: P3 = ({3: 'stimulus/A/rare/11', 4: 'stimulus/A/freq/12', 5: 'stimulus/A/freq/13', 6: 'stimulus/A/freq/14', 7: 'stimulus/A/freq/15', 8: 'stimulus/B/freq/21', 9: 'stimulus/B/rare/22', 10: 'stimulus/B/freq/23', 11: 'stimulus/B/freq/24', 12: 'stimulus/B/freq/25', 13: 'stimulus/C/freq/31', 14: 'stimulus/C/freq/32', 15: 'stimulus/C/rare/33', 16: 'stimulus/C/freq/34', 17: 'stimulus/C/freq/35', 18: 'stimulus/D/freq/41', 19: 'stimulus/D/freq/42', 20: 'stimulus/D/freq/43', 21: 'stimulus/D/rare/44', 22: 'stimulus/D/freq/45', 23: 'stimulus/E/freq/51', 24: 'stimulus/E/freq/52', 25: 'stimulus/E/freq/53', 26: 'stimulus/E/freq/54', 27: 'stimulus/E/rare/55', 1: 'response/correct/201', 2: 'response/error/202'}, [11, 22, 33, 44, 55], [12, 13, 14, 15, 21, 23, 24, 25, 31, 32, 34, 35, 41, 42, 43, 45, 51, 52, 53, 54]), verbose:
= 20) -
Pipeline for processing Encoding and Decoding Analysis on EEG data
Expand source code
@dataclass class Pipeline: """ Pipeline for processing Encoding and Decoding Analysis on EEG data""" bids_path: Union[str, list[str]] subject: Optional[int] = None events_mapping: P3 = P3.EVENTS_MAPINGS() verbose: logging = logging.INFO raw: mne.io.Raw = field(init=False, repr=False) events: np.ndarray = field(init=False, repr=False) event_ids: dict = field(init=False, repr=False) decoding_score: Tuple = field(init=False, repr=False, default=None) def __post_init__(self): logging.basicConfig(level=self.verbose) mne.set_log_level(verbose='ERROR') warnings.filterwarnings("ignore") def __str__(self) -> str: info = ( 'The raw data for subject {} has {} time samples and {} channels.\n' .format(self.subject, self.raw.n_times, len(self.raw.ch_names)) + 'The last time sample is at {} seconds.'.format(self.raw.times[-1]) + 'The first few channel names are {}....\n'.format(', '.join( self.raw.ch_names[:5])) + 'sample rate: {} Hz.\n'.format(self.raw.info['sfreq']) + '%s channels x %s samples\n\n' % (len(self.raw), len(self.raw.times))) return info def set_montage(self) -> None: """Set the montage for EEG data""" montage_dir = os.path.join(os.path.dirname(mne.__file__), 'channels', 'data', 'montages') logging.debug(sorted(os.listdir(montage_dir))) ten_twenty_montage = mne.channels.make_standard_montage('standard_1020') self.raw.set_channel_types({ 'HEOG_left': 'eog', 'HEOG_right': 'eog', 'VEOG_lower': 'eog' }) self.raw.set_montage(ten_twenty_montage, match_case=False) logging.info("Standard 1020 montage and EOG channels are set") def load_data(self, event_id: Union[Dict, str] = "auto") -> any: """Load the mne raw data and find events and event ids from annotations""" logging.info("Loading Data") raw = read_raw_bids(bids_path=self.bids_path) self.subject = self.bids_path.subject self.events, self.event_ids = mne.events_from_annotations( raw, event_id=event_id) self.raw = raw.load_data() return self def set_custom_events_mapping(self, mapping: Dict[int, str] = None, task: str = 
None) -> None: """Custom mappings for the P3 task""" if task == 'P3': mapping, _, _ = P3.EVENTS_MAPINGS() assert mapping is not None, "Mapping is not defined! Please pass mapping as argument" annot_from_events = mne.annotations_from_events( events=self.events, event_desc=mapping, sfreq=self.raw.info['sfreq']) self.raw.set_annotations(annot_from_events) self.events, self.event_ids = mne.events_from_annotations(self.raw) def apply_resampling(self, sampling_freq: int, padding: str = 'auto') -> None: """Apply resampling on the mne raw data""" logging.info("Applying resampling") self.raw, self.events = self.raw.resample(sampling_freq, npad=padding, events=self.events) def apply_rereferencing(self, reference_channels: Union[List[str], str]) -> None: """Apply re-referencing on the channels present in mne raw object""" logging.info("Applying re-referencing") mne.io.set_eeg_reference(self.raw, ref_channels=reference_channels) def apply_cleaning(self, cleaner: CleaningData): """Apply cleaning of bad segments and bad channels""" logging.info("Applying cleaning") cleaner.apply_cleaning(self.raw) def apply_filter(self, filter: BaseFilter) -> None: """Apply filtering on raw data""" logging.info("Applying filtering") filter.apply_filter(self.raw) def apply_ica(self, ica: BaseICA) -> None: """Apply ICA on the raw data""" logging.info("Applying ICA") ica.compute_ica(self.raw) ica.apply_ica(self.raw) def get_events_df(self, events_ext: str = 'events.tsv') -> pd.DataFrame: """Returns Pandas dataframe containing the events from the BIDS path""" bids_path = self.bids_path fname = os.path.join( bids_path.directory, bids_path.basename.removesuffix(bids_path.suffix) + events_ext) assert isfile(fname), "Events file not found!" 
return pd.read_csv(fname, delimiter='\t') def compute_epochs(self, erp: any) -> mne.Epochs: """Perform ERP epochs computation task""" return erp.compute_epochs(self.raw, self.events, self.event_ids) def compute_erp_peak(self, erp: any, condition: str, thypo: float, offset: float = 0.05, channels: list[str] = []) -> pd.DataFrame: """Perform ERP Peak computation task""" self.compute_epochs(erp) return erp.compute_peak(condition, thypo, offset, channels) def apply_decoder(self, decoder: EEGDecoder) -> Tuple: """Perfrom Decoding Analysis""" svm_scores = decoder.run_svm_() # sliding_scores = decoder.run_sliding_() somehow doesnot work! decoding_score = svm_scores self.decoding_score = decoding_score return decoding_score def _parallel_process_raws(self, pipeline) -> mne.io.Raw: """Perform preprocessig steps on list of raws using multiprocessing""" pipeline.load_data() pipeline.set_montage() pipeline.make_pipeline([ SimpleMNEFilter(0.1, 50, 'firwin'), CleaningData(pipeline.bids_path), PrecomputedICA(pipeline.bids_path) ]) pipeline.set_custom_events_mapping(task='P3') return pipeline.raw def load_multiple_subjects(self, n_subjects: int = 40, preload: bool = False) -> None: """Load subjects into current mne raw object""" curr_sub = [int(self.bids_path.subject)] subjects = set(range(1, n_subjects + 1)) - set(curr_sub) bids_paths = [ self.bids_path.copy().update(subject=str(x).zfill(3)) for x in subjects ] pipelines = [ Pipeline(bids_path=path, verbose=logging.ERROR) for path in bids_paths ] with Pool(6) as p: raws = list( tqdm(p.imap(self._parallel_process_raws, pipelines), total=n_subjects - 1)) raws.append(self.raw) self.raw = mne.concatenate_raws(raws) self.events, self.event_ids = mne.events_from_annotations(self.raw) if preload: self.raw.load_data() self.set_montage() def apply(self, step: list) -> None: """Find the step to apply""" if isinstance(step, tuple): step_name = step[0] step = step[1] else: step_name = step.step() if step_name == 'cleaning': 
self.apply_cleaning(step) elif step_name == 'filtering': self.apply_filter(step) elif step_name == 'ica': self.apply_ica(step) elif step_name == 'erp': self.compute_epochs(step) elif step_name == 'decoding': self.apply_decoder(step) elif step_name == 'classifier': self.apply_classifier(step) elif step_name == 'reference': self.apply_rereferencing(step) elif step_name == 'resample': self.apply_resampling(step) else: logging.error("Invalid pipeline operation!") def make_pipeline(self, steps: list) -> None: """Perform the list of steps on the current mne raw object""" logging.info( "*" * 5 + "Proceesing for subject: {}".format(self.bids_path.subject) + "*" * 5) for step in steps: self.apply(step) logging.info("Processed subject {}\n".format(self.bids_path.subject))
Class variables
var bids_path : Union[str, list[str]]
var decoding_score : Tuple
var event_ids : dict
var events : numpy.ndarray
var events_mapping : P3
var raw : mne.io.fiff.raw.Raw
var subject : Optional[int]
var verbose :
Methods
def apply(self, step: list) ‑> None
-
Find the step to apply
Expand source code
def apply(self, step: list) -> None: """Find the step to apply""" if isinstance(step, tuple): step_name = step[0] step = step[1] else: step_name = step.step() if step_name == 'cleaning': self.apply_cleaning(step) elif step_name == 'filtering': self.apply_filter(step) elif step_name == 'ica': self.apply_ica(step) elif step_name == 'erp': self.compute_epochs(step) elif step_name == 'decoding': self.apply_decoder(step) elif step_name == 'classifier': self.apply_classifier(step) elif step_name == 'reference': self.apply_rereferencing(step) elif step_name == 'resample': self.apply_resampling(step) else: logging.error("Invalid pipeline operation!")
def apply_cleaning(self, cleaner: preprocessing.CleaningData)
-
Apply cleaning of bad segments and bad channels
Expand source code
def apply_cleaning(self, cleaner: CleaningData): """Apply cleaning of bad segments and bad channels""" logging.info("Applying cleaning") cleaner.apply_cleaning(self.raw)
def apply_decoder(self, decoder: decoding_analysis.EEGDecoder) ‑> Tuple
-
Perform Decoding Analysis
Expand source code
def apply_decoder(self, decoder: EEGDecoder) -> Tuple: """Perfrom Decoding Analysis""" svm_scores = decoder.run_svm_() # sliding_scores = decoder.run_sliding_() somehow doesnot work! decoding_score = svm_scores self.decoding_score = decoding_score return decoding_score
def apply_filter(self, filter: preprocessing.BaseFilter) ‑> None
-
Apply filtering on raw data
Expand source code
def apply_filter(self, filter: BaseFilter) -> None: """Apply filtering on raw data""" logging.info("Applying filtering") filter.apply_filter(self.raw)
def apply_ica(self, ica: preprocessing.BaseICA) ‑> None
-
Apply ICA on the raw data
Expand source code
def apply_ica(self, ica: BaseICA) -> None: """Apply ICA on the raw data""" logging.info("Applying ICA") ica.compute_ica(self.raw) ica.apply_ica(self.raw)
def apply_rereferencing(self, reference_channels: Union[str, List[str]]) ‑> None
-
Apply re-referencing on the channels present in mne raw object
Expand source code
def apply_rereferencing(self, reference_channels: Union[List[str], str]) -> None: """Apply re-referencing on the channels present in mne raw object""" logging.info("Applying re-referencing") mne.io.set_eeg_reference(self.raw, ref_channels=reference_channels)
def apply_resampling(self, sampling_freq: int, padding: str = 'auto') ‑> None
-
Apply resampling on the mne raw data
Expand source code
def apply_resampling(self, sampling_freq: int, padding: str = 'auto') -> None: """Apply resampling on the mne raw data""" logging.info("Applying resampling") self.raw, self.events = self.raw.resample(sampling_freq, npad=padding, events=self.events)
def compute_epochs(self, erp:
) ‑> mne.epochs.Epochs -
Perform ERP epochs computation task
Expand source code
def compute_epochs(self, erp: any) -> mne.Epochs: """Perform ERP epochs computation task""" return erp.compute_epochs(self.raw, self.events, self.event_ids)
def compute_erp_peak(self, erp:
, condition: str, thypo: float, offset: float = 0.05, channels: list = []) ‑> pandas.core.frame.DataFrame -
Perform ERP Peak computation task
Expand source code
def compute_erp_peak(self, erp: any, condition: str, thypo: float, offset: float = 0.05, channels: list[str] = []) -> pd.DataFrame: """Perform ERP Peak computation task""" self.compute_epochs(erp) return erp.compute_peak(condition, thypo, offset, channels)
def get_events_df(self, events_ext: str = 'events.tsv') ‑> pandas.core.frame.DataFrame
-
Returns Pandas dataframe containing the events from the BIDS path
Expand source code
def get_events_df(self, events_ext: str = 'events.tsv') -> pd.DataFrame: """Returns Pandas dataframe containing the events from the BIDS path""" bids_path = self.bids_path fname = os.path.join( bids_path.directory, bids_path.basename.removesuffix(bids_path.suffix) + events_ext) assert isfile(fname), "Events file not found!" return pd.read_csv(fname, delimiter='\t')
def load_data(self, event_id: Union[Dict, str] = 'auto') ‑>
-
Load the mne raw data and find events and event ids from annotations
Expand source code
def load_data(self, event_id: Union[Dict, str] = "auto") -> any: """Load the mne raw data and find events and event ids from annotations""" logging.info("Loading Data") raw = read_raw_bids(bids_path=self.bids_path) self.subject = self.bids_path.subject self.events, self.event_ids = mne.events_from_annotations( raw, event_id=event_id) self.raw = raw.load_data() return self
def load_multiple_subjects(self, n_subjects: int = 40, preload: bool = False) ‑> None
-
Load subjects into current mne raw object
Expand source code
def load_multiple_subjects(self, n_subjects: int = 40, preload: bool = False) -> None: """Load subjects into current mne raw object""" curr_sub = [int(self.bids_path.subject)] subjects = set(range(1, n_subjects + 1)) - set(curr_sub) bids_paths = [ self.bids_path.copy().update(subject=str(x).zfill(3)) for x in subjects ] pipelines = [ Pipeline(bids_path=path, verbose=logging.ERROR) for path in bids_paths ] with Pool(6) as p: raws = list( tqdm(p.imap(self._parallel_process_raws, pipelines), total=n_subjects - 1)) raws.append(self.raw) self.raw = mne.concatenate_raws(raws) self.events, self.event_ids = mne.events_from_annotations(self.raw) if preload: self.raw.load_data() self.set_montage()
def make_pipeline(self, steps: list) ‑> None
-
Perform the list of steps on the current mne raw object
Expand source code
def make_pipeline(self, steps: list) -> None: """Perform the list of steps on the current mne raw object""" logging.info( "*" * 5 + "Proceesing for subject: {}".format(self.bids_path.subject) + "*" * 5) for step in steps: self.apply(step) logging.info("Processed subject {}\n".format(self.bids_path.subject))
def set_custom_events_mapping(self, mapping: Dict[int, str] = None, task: str = None) ‑> None
-
Custom mappings for the P3 task
Expand source code
def set_custom_events_mapping(self, mapping: Dict[int, str] = None, task: str = None) -> None: """Custom mappings for the P3 task""" if task == 'P3': mapping, _, _ = P3.EVENTS_MAPINGS() assert mapping is not None, "Mapping is not defined! Please pass mapping as argument" annot_from_events = mne.annotations_from_events( events=self.events, event_desc=mapping, sfreq=self.raw.info['sfreq']) self.raw.set_annotations(annot_from_events) self.events, self.event_ids = mne.events_from_annotations(self.raw)
def set_montage(self) ‑> None
-
Set the montage for EEG data
Expand source code
def set_montage(self) -> None: """Set the montage for EEG data""" montage_dir = os.path.join(os.path.dirname(mne.__file__), 'channels', 'data', 'montages') logging.debug(sorted(os.listdir(montage_dir))) ten_twenty_montage = mne.channels.make_standard_montage('standard_1020') self.raw.set_channel_types({ 'HEOG_left': 'eog', 'HEOG_right': 'eog', 'VEOG_lower': 'eog' }) self.raw.set_montage(ten_twenty_montage, match_case=False) logging.info("Standard 1020 montage and EOG channels are set")