Source code for aopy.preproc.wrappers

# wrappers.py
#
# Wrappers for preprocessing functions. These functions are used to preprocess data in a single step,
# calling the appropriate functions in the correct order.

import os
from importlib.metadata import version
import datetime
import h5py
from .base import *
from .bmi3d import parse_bmi3d
from .oculomatic import parse_oculomatic
from .optitrack import parse_optitrack
from .neuropixel import proc_neuropixel_spikes, proc_neuropixel_ts
from .. import postproc
from .. import data as aodata
from aopy.utils.base import detect_edges
import glob

'''
proc_* wrappers
'''
[docs]def proc_single(data_dir, files, preproc_dir, subject, te_id, date, preproc_jobs, kilosort_dir=None, overwrite=False, **kwargs): ''' Preprocess a single recording, given a list of raw data files, into a series of hdf records with the same prefix. Args: data_dir (str): File directory of collected session data files (dict): dict of file names to process in data_dir preproc_dir (str): Target directory for processed data subject (str): Subject name te_id (int): Block number of Task entry object date (str): Date of recording preproc_jobs (list): list of proc_types to generate overwrite (bool, optional): Overwrite files in result_dir. Defaults to False. ''' if os.path.basename(os.path.normpath(preproc_dir)) != subject: preproc_dir = os.path.join(preproc_dir, subject) if not os.path.exists(preproc_dir): os.mkdir(preproc_dir) preproc_dir_base = os.path.dirname(preproc_dir) # Remove existing files if overwrite is True for source in preproc_jobs: filename = aodata.get_preprocessed_filename(subject, te_id, date, source) filepath = os.path.join(preproc_dir, subject, filename) if overwrite and os.path.exists(filepath): os.remove(filepath) print(f'Removed existing file {filepath}') # Process each job individually if 'exp' in preproc_jobs: print('processing experiment data...') exp_filename = aodata.get_preprocessed_filename(subject, te_id, date, 'exp') proc_exp( data_dir, files, preproc_dir, exp_filename, overwrite=overwrite, ) if 'eye' in preproc_jobs: print('processing eyetracking data...') exp_filename = aodata.get_preprocessed_filename(subject, te_id, date, 'exp') eye_filename = aodata.get_preprocessed_filename(subject, te_id, date, 'eye') proc_eyetracking( data_dir, files, preproc_dir, exp_filename, eye_filename, overwrite=overwrite, ) eye_data, eye_metadata = aodata.load_preproc_eye_data(preproc_dir_base, subject, te_id, date) assert 'raw_data' in eye_data.keys(), "No eye data found" assert eye_data['raw_data'].shape == (eye_metadata['n_samples'], eye_metadata['n_channels']) if 'broadband' in preproc_jobs: print('processing broadband data...') broadband_filename = aodata.get_preprocessed_filename(subject, te_id, date, 'broadband') proc_broadband( data_dir, files, preproc_dir, broadband_filename, overwrite=overwrite ) broadband_data, broadband_metadata = aodata.load_preproc_broadband_data(preproc_dir_base, subject, te_id, date) assert broadband_data.shape == (broadband_metadata['n_samples'], broadband_metadata['n_channels']) files['broadband'] = broadband_filename # for proc_lfp() if 'ap' in preproc_jobs: print('processing ap band data...') ap_filename = aodata.get_preprocessed_filename(subject, te_id, date, 'ap') proc_ap( data_dir, files, preproc_dir, ap_filename, kilosort_dir=kilosort_dir, overwrite=overwrite, filter_kwargs=kwargs # pass any remaining kwargs to the filtering function ) ap_data, ap_metadata = aodata.load_preproc_ap_data(preproc_dir_base, subject, te_id, date, drive_number=1) assert ap_data.shape == (ap_metadata['n_samples'], ap_metadata['n_channels']) if 'lfp' in preproc_jobs: print('processing local field potential data...') lfp_filename = aodata.get_preprocessed_filename(subject, te_id, date, 'lfp') proc_lfp( data_dir, files, preproc_dir, lfp_filename, kilosort_dir=kilosort_dir, overwrite=overwrite, filter_kwargs=kwargs # pass any remaining kwargs to the filtering function ) lfp_data, lfp_metadata = aodata.load_preproc_lfp_data(preproc_dir_base, subject, te_id, date, drive_number=1) assert lfp_data.shape == (lfp_metadata['n_samples'], lfp_metadata['n_channels']) if 'spikes' in preproc_jobs: print('processing spike data...') ap_filename = aodata.get_preprocessed_filename(subject, te_id, date, 'spike') proc_spikes( data_dir, files, preproc_dir, ap_filename, kilosort_dir=kilosort_dir, overwrite=overwrite, filter_kwargs=kwargs # pass any remaining kwargs to the filtering function ) if 'emg' in preproc_jobs: print('processing emg data...') emg_filename = aodata.get_preprocessed_filename(subject, te_id, date, 'emg') proc_emg( data_dir, files, preproc_dir, emg_filename, overwrite=overwrite )
[docs]def proc_exp(data_dir, files, result_dir, result_filename, overwrite=False, save_res=True): ''' Process experiment data files. Loads 'hdf' and 'ecube' (if present) data, parses, and prepares experiment data and metadata. Note: Currently supports BMI3D only. The above data is prepared into structured arrays: exp_data: task ([('cursor', '<f8', (3,)), ('trial', 'u8', (1,)), ('time', 'u8', (1,)), ...]) state ([('msg', 'S', (1,)), ('time', 'u8', (1,))]) clock ([('timestamp', 'f8', (1,)), ('time', 'u8', (1,))]) events ([('timestamp', 'f8', (1,)), ('time', 'u8', (1,)), ('event', 'S32', (1,)), ('data', 'u2', (1,)), ('code', 'u2', (1,))]) trials ([('trial', 'u8'), ('index', 'u8'), (condition_name, 'f8', (3,)))]) exp_metadata: source_dir (str) source_files (dict) bmi3d_start_time (float) n_cycles (int) n_trials (int) <other metadata from bmi3d> Args: data_dir (str): where the data files are located files (dict): dictionary of filenames indexed by system result_dir (str): where to store the processed result result_filename (str): where to store the processed result overwrite (bool): whether to remove existing processed files if they exist Returns: None ''' # Check if a processed file already exists filepath = os.path.join(result_dir, result_filename) if not overwrite and os.path.exists(filepath): contents = aodata.get_hdf_dictionary(result_dir, result_filename) if "exp_data" in contents or "exp_metadata" in contents: print("File {} already preprocessed, doing nothing.".format(result_filename)) return # Prepare the BMI3D data bmi3d_data, bmi3d_metadata = parse_bmi3d(data_dir, files) if save_res: aodata.save_hdf(result_dir, result_filename, bmi3d_data, "/exp_data", append=True) aodata.save_hdf(result_dir, result_filename, bmi3d_metadata, "/exp_metadata", append=True) print('done!') return bmi3d_data, bmi3d_metadata
[docs]def proc_mocap(data_dir, files, result_dir, result_filename, overwrite=False): ''' Process motion capture files: Loads metadata, position data, and rotation data from 'optitrack' files If present, reads 'hdf' metadata to find appropriate strobe channel If present, loads 'ecube' analog data representing optitrack camera strobe The data is prepared along with timestamps into HDF datasets: mocap_data: optitrack [('position', 'f8', (3,)), ('rotation', 'f8', (4,)), ('timestamp', 'f8', (1,)] mocap_metadata: source_dir (str) source_files (dict) samplerate (float) <other metadata from motive> Args: data_dir (str): where the data files are located files (dict): dictionary of filenames indexed by system result_dir (str): where to store the processed result result_filename (str): where to store the processed result overwrite (bool): whether to remove existing processed files if they exist Returns: None ''' # Check if a processed file already exists filepath = os.path.join(result_dir, result_filename) if not overwrite and os.path.exists(filepath): contents = aodata.get_hdf_dictionary(result_dir, result_filename) if "mocap_data" in contents or "mocap_metadata" in contents: print("File {} already preprocessed, doing nothing.".format(result_filename)) return # Parse Optitrack data if 'optitrack' in files: optitrack_data, optitrack_metadata = parse_optitrack(data_dir, files) aodata.save_hdf(result_dir, result_filename, optitrack_data, "/mocap_data", append=True) aodata.save_hdf(result_dir, result_filename, optitrack_metadata, "/mocap_metadata", append=True)
[docs]def proc_eyetracking(data_dir, files, result_dir, exp_filename, result_filename, debug=True, overwrite=False, save_res=True, **kwargs): ''' Loads eyedata from ecube analog signal and calculates calibration profile using least square fitting. Requires that experimental data has already been preprocessed in the same result hdf file. The data is prepared into HDF datasets: eye_data: eye_closed_mask (nt, nch): boolean mask of when the eyes are closed raw_data (nt, nch): raw eye data calibrated_data (nt, nch): calibrated eye data coefficients (nch, 2): linear regression coefficients correlation_coeff (nch): best fit correlation coefficients from linear regression cursor_calibration_data (ntr, 2): cursor coordinates used for calibration eye_calibration_data (ntr, nch): eye coordinates used for calibration eye_metadata: samplerate (float): sampling rate of the calibrated eye data see :func:`aopy.preproc.parse_oculomatic` for oculomatic metadata Args: data_dir (str): where the data files are located files (dict): dictionary of filenames indexed by system result_dir (str): where to store the processed result result_filename (str): what to call the preprocessed filename debug (bool, optional): if true, prints additional debug messages overwrite (bool, optional): whether to recalculate and overwrite existing preprocessed eyetracking data save_res (bool, optional): whether to save the calculated eyetracking data **kwargs (dict, optional): keyword arguments to pass to :func:`aopy.preproc.calc_eye_calibration()` Returns: eye_dict (dict): all the data pertaining to eye tracking, calibration eye_metadata (dict): metadata for eye tracking Example: Uncalibrated raw data: .. image:: _images/eye_trajectories_raw.png After calibration: .. image:: _images/eye_trajectories_calibrated.png ''' # Check if data already exists filepath = os.path.join(result_dir, result_filename) if not overwrite and os.path.exists(filepath): contents = aodata.get_hdf_dictionary(result_dir, result_filename) if "eye_data" in contents and "eye_metadata" in contents: print("Eye data already preprocessed in {}, returning existing data.".format(result_filename)) eye_data = aodata.load_hdf_group(result_dir, result_filename, 'eye_data') eye_metadata = aodata.load_hdf_group(result_dir, result_filename, 'eye_metadata') return eye_data, eye_metadata # Load the preprocessed experimental data try: exp_data = aodata.load_hdf_group(result_dir, exp_filename, 'exp_data') exp_metadata = aodata.load_hdf_group(result_dir, exp_filename, 'exp_metadata') except (FileNotFoundError, ValueError): print(f"File {exp_filename} does not include preprocessed experimental data. Please call proc_exp() first.") return None, {} # Parse the raw eye data; this could be extended in the future to support other eyetracking hardware try: eye_data, eye_metadata = parse_oculomatic(data_dir, files, debug=debug) eye_mask = eye_data['mask'] eye_data = eye_data['data'] except: print("Could not parse eyetracking data. Returning empty data.") eye_mask = None eye_data = None eye_metadata = {} try: # Calibrate the eye data cursor_samplerate = exp_metadata['cursor_interp_samplerate'] cursor_data = exp_data['cursor_interp'][:,:2] events = exp_data['events'] event_codes = events['code'] event_times = events['timestamp'] # time points in the ecube time frame coeff, correlation_coeff, cursor_calibration_data, eye_calibration_data = calc_eye_calibration( cursor_data, cursor_samplerate, eye_data, eye_metadata['samplerate'], event_times, event_codes, return_datapoints=True, **kwargs) calibrated_eye_data = postproc.get_calibrated_eye_data(eye_data, coeff) eye_dict = { 'eye_closed_mask': eye_mask, 'raw_data': eye_data, 'calibrated_data': calibrated_eye_data, 'coefficients': coeff, 'correlation_coeff': correlation_coeff, 'cursor_calibration_data': cursor_calibration_data, 'eye_calibration_data': eye_calibration_data } try: eye_metadata['calibration_version'] = version('aolab-aopy') except: eye_metadata['calibration_version'] = 'unknown' eye_metadata['calibration_date'] = datetime.datetime.now().isoformat() except (KeyError, ValueError): # If there is no cursor data or there aren't enough trials, this will fail. # We should still save the eye data, just don't include the calibrated data eye_dict = { 'eye_closed_mask': eye_mask, 'raw_data': eye_data } # Save everything into the HDF file if save_res: aodata.save_hdf(result_dir, result_filename, eye_dict, "/eye_data", append=True) aodata.save_hdf(result_dir, result_filename, eye_metadata, "/eye_metadata", append=True) return eye_dict, eye_metadata
[docs]def proc_broadband(data_dir, files, result_dir, result_filename, overwrite=False, max_memory_gb=1.): ''' Process broadband data: Loads 'ecube' headstage data and metadata Saves broadband data into the HDF datasets: broadband_data (nt, nch) broadband_metadata (dict) Args: data_dir (str): where the data files are located files (dict): dictionary of filenames indexed by system result_dir (str): where to store the processed result result_filename (str): where to store the processed result overwrite (bool, optional): whether to remove existing processed files if they exist max_memory_gb (float, optional): max memory used to load binary data at one time Returns: None ''' # Check if a processed file already exists filepath = os.path.join(result_dir, result_filename) if not overwrite and os.path.exists(filepath): contents = aodata.get_hdf_dictionary(result_dir, result_filename) if "broadband_data" in contents: print("File {} already preprocessed, doing nothing.".format(result_filename)) return elif os.path.exists(filepath): os.remove(filepath) # maybe bad, since it deletes everything, not just broadband data # Check if record_headstage is True or False record_headstage_key = 'record_headstage' metadata_group = 'exp_metadata' tmp = result_filename.split('_')[:-1] exp_filename = "_".join(tmp) + '_exp.hdf' try: record_headstage = aodata.load_hdf_data(result_dir, exp_filename, record_headstage_key, metadata_group, cached=True) except: record_headstage = True # For previous recording where there is no 'record_headstage' key # Copy the broadband data into an HDF dataset if 'ecube' in files and record_headstage: # Process the binary data data_filepath = os.path.join(data_dir, files['ecube']) result_filepath = os.path.join(result_dir, result_filename) _, metadata = aodata.proc_ecube_data(data_filepath, 'Headstages', result_filepath, result_name='broadband_data', max_memory_gb=max_memory_gb) # Append the broadband metadata to the file aodata.save_hdf(result_dir, result_filename, metadata, "/broadband_metadata", append=True)
[docs]def proc_spikes(data_dir, files, result_dir, result_filename, kilosort_dir=None, overwrite=False): ''' Process spike data: neuropixels: synchronize spike times with ecube. Extract waveforms from drift corrected data. Saves spike times and waveforms into the HDF datasets: spikes (HDF5 group (dict)) : spike times for each drive waveforms (HDF5 group (dict)) : spike times for each drive metadata (dict) Args: data_dir (str): where the data files are located files (dict): dictionary of filenames indexed by system result_dir (str): where to store the processed result result_filename (str): where to store the processed result kilosort_dir (str): data folder where preprocessed data of kilosort is saved overwrite (bool, optional): whether to remove existing processed files if they exist filter_kwargs (dict, optional): keyword arguments to pass to :func:`aopy.precondition.filter_lfp` ''' # Check if a processed file already exists filepath = os.path.join(result_dir, result_filename) if not overwrite and os.path.exists(filepath): contents = aodata.get_hdf_dictionary(result_dir, result_filename) if "spikes" in contents: print("File {} already preprocessed, doing nothing.".format(result_filename)) return elif os.path.exists(filepath): os.remove(filepath) # Check if record_headstage is True or False record_headstage_key = 'record_headstage' metadata_group = 'exp_metadata' tmp = result_filename.split('_')[:-1] exp_filename = "_".join(tmp) + '_exp.hdf' record_headstage = aodata.load_hdf_data(result_dir, exp_filename, record_headstage_key, metadata_group, cached=True) idrive = 0 # Preprocess neural data into spikes if 'neuropixels' in files: print(1) np_recorddir = files['neuropixels'] ecube_files = files['ecube'] nport = len(glob.glob(os.path.join(data_dir, f'{np_recorddir}*/**/continuous/*AP'),recursive=True)) for iport in range(nport): idrive += 1 iport += 1 # Note that idrive is not always the same as iport when you do neuropixels with other recordings _,_, metadata = proc_neuropixel_spikes(data_dir,np_recorddir,ecube_files,kilosort_dir,iport,filepath,version='kilosort4') aodata.save_hdf(result_dir, result_filename, metadata, f'drive{idrive}/metadata', append=True)
[docs]def proc_ap(data_dir, files, result_dir, result_filename, kilosort_dir=None, overwrite=False, max_memory_gb=1., filter_kwargs={}): ''' Process ap data neuropixels: filter data between 300-5k, downsample to 10k, synchronize timestamps with ecube. Drift correction is applied Saves ap data into the HDF datasets: ap_data (nt, nch) ap_metadata (dict) Args: data_dir (str): where the data files are located files (dict): dictionary of filenames indexed by system result_dir (str): where to store the processed result result_filename (str): where to store the processed result kilosort_dir (str): data folder where preprocessed data of kilosort is saved overwrite (bool, optional): whether to remove existing processed files if they exist max_memory_gb (float, optional): max memory used to load binary data at one time filter_kwargs (dict, optional): keyword arguments to pass to :func:`aopy.precondition.filter_lfp` ''' # Check if a processed file already exists filepath = os.path.join(result_dir, result_filename) if not overwrite and os.path.exists(filepath): contents = aodata.get_hdf_dictionary(result_dir, result_filename) if "ap_data" in contents: print("File {} already preprocessed, doing nothing.".format(result_filename)) return elif os.path.exists(filepath): os.remove(filepath) # Check if record_headstage is True or False record_headstage_key = 'record_headstage' metadata_group = 'exp_metadata' tmp = result_filename.split('_')[:-1] exp_filename = "_".join(tmp) + '_exp.hdf' record_headstage = aodata.load_hdf_data(result_dir, exp_filename, record_headstage_key, metadata_group, cached=True) idrive = 0 # Preprocess neural data into lfp if 'neuropixels' in files: np_recorddir = files['neuropixels'] ecube_files = files['ecube'] datatype = 'ap' nport = len(glob.glob(os.path.join(data_dir, f'{np_recorddir}*/**/continuous/*AP'),recursive=True)) for iport in range(nport): idrive += 1 iport += 1 # Note that idrive is not always the same as iport when you do neuropixels with other recordings _, ap_metadata = proc_neuropixel_ts(data_dir, np_recorddir, ecube_files, kilosort_dir, datatype, iport, filepath, max_memory_gb = max_memory_gb) aodata.save_hdf(result_dir, result_filename, ap_metadata, f'drive{idrive}/ap_metadata', append=True)
[docs]def proc_lfp(data_dir, files, result_dir, result_filename, kilosort_dir=None, overwrite=False, max_memory_gb=1., filter_kwargs={}): ''' Process lfp data: Loads 'broadband' hdf data or 'ecube' headstage data and metadata neuropixels: low-pass filter data with 500Hz, downsample to 1k, synchronize timestamps with ecube. Destriping is applied Saves lfp data into the HDF datasets: lfp_data (nt, nch) lfp_metadata (dict) Args: data_dir (str): where the data files are located files (dict): dictionary of filenames indexed by system result_dir (str): where to store the processed result result_filename (str): where to store the processed result kilosort_dir (str): data folder where preprocessed data of kilosort is saved overwrite (bool, optional): whether to remove existing processed files if they exist max_memory_gb (float, optional): max memory used to load binary data at one time filter_kwargs (dict, optional): keyword arguments to pass to :func:`aopy.precondition.filter_lfp` ''' # Check if a processed file already exists filepath = os.path.join(result_dir, result_filename) if not overwrite and os.path.exists(filepath): contents = aodata.get_hdf_dictionary(result_dir, result_filename) if "lfp_data" in contents: print("File {} already preprocessed, doing nothing.".format(result_filename)) return elif os.path.exists(filepath): os.remove(filepath) # Check if record_headstage is True or False record_headstage_key = 'record_headstage' metadata_group = 'exp_metadata' tmp = result_filename.split('_')[:-1] exp_filename = "_".join(tmp) + '_exp.hdf' try: record_headstage = aodata.load_hdf_data(result_dir, exp_filename, record_headstage_key, metadata_group, cached=True) except: record_headstage = True # For previous recording where there is no 'record_headstage' key idrive = 0 # Preprocess neural data into lfp if 'broadband' in files: idrive += 1 broadband_filepath = os.path.join(result_dir, files['broadband']) result_filepath = os.path.join(result_dir, result_filename) _, lfp_metadata = aodata.filter_lfp_from_broadband(broadband_filepath, result_filepath, drive_number=idrive, dtype='int16', max_memory_gb=max_memory_gb, **filter_kwargs) aodata.save_hdf(result_dir, result_filename, lfp_metadata, f'drive{idrive}/lfp_metadata', append=True) elif 'ecube' in files and record_headstage: idrive += 1 ecube_filepath = os.path.join(data_dir, files['ecube']) result_filepath = os.path.join(result_dir, result_filename) _, lfp_metadata = aodata.filter_lfp_from_ecube(ecube_filepath, result_filepath, drive_number=idrive, dtype='int16', max_memory_gb=max_memory_gb, **filter_kwargs) aodata.save_hdf(result_dir, result_filename, lfp_metadata, f'drive{idrive}/lfp_metadata', append=True) if 'neuropixels' in files: np_recorddir = files['neuropixels'] ecube_files = files['ecube'] datatype = 'lfp' nport = len(glob.glob(os.path.join(data_dir, f'{np_recorddir}*/**/continuous/*AP'),recursive=True)) for iport in range(nport): idrive += 1 iport += 1 # Note that idrive is not always the same as iport when you do neuropixels with other recordings _, lfp_metadata = proc_neuropixel_ts(data_dir, np_recorddir, ecube_files, kilosort_dir, datatype, iport, filepath, max_memory_gb = max_memory_gb) aodata.save_hdf(result_dir, result_filename, lfp_metadata, f'drive{idrive}/lfp_metadata', append=True)
[docs]def proc_emg(data_dir, files, result_dir, result_filename, overwrite=False): ''' Process emg data: Loads emg hdf data from bmi3d Saves emg data into the HDF datasets: emg_raw (nt, nch) emg_metadata (dict) Args: data_dir (str): where the data files are located files (dict): dictionary of filenames indexed by system result_dir (str): where to store the processed result result_filename (str): where to store the processed result overwrite (bool, optional): whether to remove existing processed files if they exist ''' # Check if a processed file already exists filepath = os.path.join(result_dir, result_filename) if not overwrite and os.path.exists(filepath): contents = aodata.get_hdf_dictionary(result_dir, result_filename) if "emg_data" in contents: print("File {} already preprocessed, doing nothing.".format(result_filename)) return elif os.path.exists(filepath): os.remove(filepath) if 'emg' in files: result_filepath = os.path.join(result_dir, result_filename) emg_data, emg_metadata = aodata.load_emg_data(data_dir, files['emg']) emg_data -= np.mean(emg_data, axis=0, dtype=emg_metadata['dtype']) # remove DC offset result_filepath = os.path.join(result_dir, result_filename) emg_hdf = h5py.File(result_filepath, 'w') dset = emg_hdf.create_dataset('emg_data', data=emg_data) emg_hdf.close() aodata.save_hdf(result_dir, result_filename, emg_metadata, "/emg_metadata", append=True)