Source code for aopy.data.base

# base.py
#
# Code for directly loading and saving data (and results)

import sys
import warnings
import os
import re

from functools import lru_cache
import h5py
import tables
import glob
import pickle as pkl
import numpy as np
import yaml
import pandas as pd

# importlib_resources is a backport of importlib.resources from Python 3.9
if sys.version_info >= (3,9):
    from importlib.resources import files, as_file
else:
    from importlib_resources import files, as_file

from .. import preproc

###############################################################################
# Loading preprocessed data
###############################################################################
[docs]def get_filenames_in_dir(base_dir, te): ''' Gets the filenames for available systems in a given task entry. Requires that files are organized by system in the base directory, and named with their task entry somewhere in their filename or directory name. Args: base_dir (str): directory where the files will be te (int): block number for the task entry Returns: dict: dictionary of files indexed by system ''' warnings.warn("This function is deprecated. Please use the database instead!", DeprecationWarning) contents = glob.glob(os.path.join(base_dir,'*/*')) relevant_contents = filter(lambda f: str(te) in f, contents) files = {} for file in relevant_contents: system = os.path.basename(os.path.dirname(file)) filename = os.path.relpath(file, base_dir) files[system] = filename return files
[docs]def get_preprocessed_filename(subject, te_id, date, data_source): ''' Generates preprocessed filenames as per our naming conventions. Format: preproc_<Date>_<MonkeyName>_<TaskEntry>_<DataSource>.hdf Args: subject (str): Subject name te_id (int): Block number of Task entry object date (str): Date of recording data_source (str): Processed data type (exp, eye, broadband, lfp, etc.) Returns: str: filename ''' return f"preproc_{date}_{subject}_{te_id}_{data_source}.hdf"
[docs]def get_kilosort_foldername(subject, te_id, date, data_source): """ Generates a folder name string to access the Kilosort output. Args: subject (str): The subject name. te_id (int or list of int): The experiment task entry(s) to use. date (str): The experiment date. data_source (str): The data source (e.g., 'Neuropixel') Returns: str: A formatted folder name string for the kilosort output in the format: "{date}_{data_source}_{subject}_te{te_id1}_te{te_id2}...". """ if not isinstance(te_id, list): te_id = [te_id] folder_name = f"{date}_{data_source}_{subject}" for entry in te_id: print(entry) folder_name += f"_te{entry}" return folder_name
def _get_drive_group(preproc_dir, filename, drive_number=None): ''' Helper function to get the drive group for a given drive number. If there is data for multiple drives (drive1, drive2) in the hdf file, drive_number must be specified. If there is only one drive it can be either specified or inferred automatically. Args: preproc_dir (str): base directory where the files live filename (str): name of the hdf file drive_number (int): drive number for multiple recordings. ''' group_names = list_root_groups(preproc_dir, filename) drive_names = [g for g in group_names if g.startswith('drive')] if drive_number is None and len(drive_names) > 1: raise ValueError('Multiple drives detected. Please set drive_number') elif drive_number is None and len(drive_names) == 1: drive_number = int(drive_names[0].replace('drive','')) elif drive_number is None: return '/' return f'/drive{drive_number}'
[docs]def find_preproc_ids_from_day(preproc_dir, subject, date, data_source): ''' Returns the task entry ids that have preprocessed files in the given directory matching the subject, date, and data source given. Args: preproc_dir (str): base directory where the files live subject (str): Subject name date (str): Date of recording data_source (str): Processed data type (exp, eye, broadband, lfp, etc.) Returns list of ids: task entry id for each matching file found in the given folder ''' contents = glob.glob(os.path.join(preproc_dir,subject,f"preproc_{date}_{subject}_*_{data_source}.hdf")) ids = [] for file in contents: try: filename = os.path.basename(file) te_id = int(re.match(f"preproc_{date}_{subject}_(\\d*)_{data_source}.hdf$", filename).group(1)) except AttributeError: return [] ids.append(te_id) return ids
[docs]def load_preproc_exp_data(preproc_dir, subject, te_id, date, verbose=True, cached=True): ''' Loads experiment data from a preprocessed file. Args: preproc_dir (str): base directory where the files live subject (str): Subject name te_id (int): Block number of Task entry object date (str): Date of recording verbose (bool, optional): check for preprocessing errors and print them (default True) cached (bool, optional): whether to allow loading cached version of data (default True) Returns: dict: Dictionary of exp data dict: Dictionary of exp metadata ''' filename = get_preprocessed_filename(subject, te_id, date, 'exp') preproc_dir = os.path.join(preproc_dir, subject) data = load_hdf_group(preproc_dir, filename, 'exp_data', cached=cached) metadata = load_hdf_group(preproc_dir, filename, 'exp_metadata', cached=cached) # Check for errors if verbose and 'preproc_errors' in metadata and len(metadata['preproc_errors']) > 0: warnings.warn(f"Preprocessing errors found in {filename}:\n{metadata['preproc_errors']}") return data, metadata
[docs]def load_preproc_emg_data(preproc_dir, subject, te_id, date, cached=True): ''' Loads emg data from a preprocessed file. Args: preproc_dir (str): base directory where the files live subject (str): Subject name te_id (int): Block number of Task entry object date (str): Date of recording cached (bool, optional): whether to allow loading cached version of data (default True) Returns: dict: Dictionary of exp data dict: Dictionary of exp metadata ''' filename = get_preprocessed_filename(subject, te_id, date, 'emg') path = str(os.path.join(preproc_dir, subject)) data = load_hdf_data(path, filename, '/emg_data', cached=cached) metadata = load_hdf_group(path, filename, 'emg_metadata', cached=cached) return data, metadata
[docs]def load_preproc_eye_data(preproc_dir, subject, te_id, date, cached=True): ''' Loads eye data from a preprocessed file. Args: preproc_dir (str): base directory where the files live subject (str): Subject name te_id (int): Block number of Task entry object date (str): Date of recording cached (bool, optional): whether to allow loading cached version of data (default True) Returns: dict: Dictionary of eye data dict: Dictionary of eye metadata ''' filename = get_preprocessed_filename(subject, te_id, date, 'eye') preproc_dir = os.path.join(preproc_dir, subject) data = load_hdf_group(preproc_dir, filename, 'eye_data', cached=cached) metadata = load_hdf_group(preproc_dir, filename, 'eye_metadata', cached=cached) return data, metadata
[docs]def load_preproc_analog_data(preproc_dir, subject, te_id, date, cached=True): ''' Loads analog data from a preprocessed file. Args: preproc_dir (str): base directory where the files live subject (str): Subject name te_id (int): Block number of Task entry object date (str): Date of recording cached (bool, optional): whether to allow loading cached version of data (default True) Returns: dict: analog data dict: dictionary of analog metadata ''' filename = get_preprocessed_filename(subject, te_id, date, 'analog') preproc_dir = os.path.join(preproc_dir, subject) data = load_hdf_data(preproc_dir, filename, 'analog_data', cached=cached) metadata = load_hdf_group(preproc_dir, filename, 'analog_metadata', cached=cached) return data, metadata
[docs]def load_preproc_digital_data(preproc_dir, subject, te_id, date, cached=True): ''' Loads digital data from a preprocessed file. Args: preproc_dir (str): base directory where the files live subject (str): Subject name te_id (int): Block number of Task entry object date (str): Date of recording cached (bool, optional): whether to allow loading cached version of data (default True) Returns: dict: digital data dict: dictionary of digital metadata ''' filename = get_preprocessed_filename(subject, te_id, date, 'digital') preproc_dir = os.path.join(preproc_dir, subject) data = load_hdf_data(preproc_dir, filename, 'digital_data', cached=cached) metadata = load_hdf_group(preproc_dir, filename, 'digital_metadata', cached=cached) return data, metadata
[docs]def load_preproc_broadband_data(preproc_dir, subject, te_id, date, cached=True): ''' Loads broadband data from a preprocessed file. Args: preproc_dir (str): base directory where the files live subject (str): Subject name te_id (int): Block number of Task entry object date (str): Date of recording cached (bool, optional): whether to allow loading cached version of data (default True) Returns: dict: broadband data dict: Dictionary of broadband metadata ''' filename = get_preprocessed_filename(subject, te_id, date, 'broadband') preproc_dir = os.path.join(preproc_dir, subject) data = load_hdf_data(preproc_dir, filename, 'broadband_data', cached=cached) metadata = load_hdf_group(preproc_dir, filename, 'broadband_metadata', cached=cached) return data, metadata
[docs]def load_preproc_lfp_data(preproc_dir, subject, te_id, date, drive_number=None, cached=True): ''' Loads LFP data from a preprocessed file. When drive_number is None, load lfp_data and lfp_metadata directly. Please specify drive_number when there are drives in hdf files. Args: preproc_dir (str): base directory where the files live subject (str): Subject name te_id (int): Block number of Task entry object date (str): Date of recording drive_number (int): drive number for multiple recordings. 1-based indexing. cached (bool, optional): whether to allow loading cached version of data (default True) Raises: ValueError: if drives are detected when drive number is None. Returns: ndarray: numpy array of lfp data from hdf dict: Dictionary of lfp metadata ''' filename = get_preprocessed_filename(subject, te_id, date, 'lfp') preproc_dir = os.path.join(preproc_dir, subject) data_group = _get_drive_group(preproc_dir, filename, drive_number) data = load_hdf_data(preproc_dir, filename, 'lfp_data', data_group=data_group, cached=cached) metadata = load_hdf_group(preproc_dir, filename, os.path.join(data_group, 'lfp_metadata'), cached=cached) return data, metadata
[docs]def load_preproc_ap_data(preproc_dir, subject, te_id, date, drive_number=None, cached=True): ''' Loads spike band time series from a preprocessed file. When drive_number is None, load lfp_data and lfp_metadata directly. Please specify drive_number when there are drives in hdf files. Args: preproc_dir (str): base directory where the files live subject (str): Subject name te_id (int): Block number of Task entry object date (str): Date of recording drive_number (int): drive number for multiple recordings. 1-based indexing. cached (bool, optional): whether to allow loading cached version of data (default True) Raises: ValueError: if drives are detected when drive number is None. Returns: ndarray: numpy array of ap data from hdf dict: Dictionary of ap metadata ''' filename = get_preprocessed_filename(subject, te_id, date, 'ap') preproc_dir = os.path.join(preproc_dir, subject) data_group = _get_drive_group(preproc_dir, filename, drive_number) data = load_hdf_data(preproc_dir, filename, 'ap_data', data_group=data_group, cached=cached) metadata = load_hdf_group(preproc_dir, filename, os.path.join(data_group, 'ap_metadata'), cached=cached) return data, metadata
[docs]def load_preproc_spike_data(preproc_dir, subject, te_id, date, drive_number=1, cached=True): ''' Loads spike data from a preprocessed file. Args: preproc_dir (str): base directory where the files live subject (str): Subject name te_id (int): Block number of Task entry object date (str): Date of recording drive_number (int): drive number for multiple recordings. 1-based indexing. cached (bool, optional): whether to allow loading cached version of data (default True) Returns: dict: spike data dict: Dictionary of spike metadata ''' filename = get_preprocessed_filename(subject, te_id, date,'spike') preproc_dir = os.path.join(preproc_dir, subject) spikes =load_hdf_group(preproc_dir, filename, f'drive{drive_number}/spikes', cached=cached) metadata = load_hdf_group(preproc_dir, filename, f'drive{drive_number}/metadata', cached=cached) return spikes, metadata
[docs]def load_spike_waveforms(preproc_dir, subject, te_id, date, drive_number=1, cached=True): ''' Loads spike waveforms from a preprocessed file. Args: preproc_dir (str): base directory where the files live subject (str): Subject name te_id (int): Block number of Task entry object date (str): Date of recording drive_number (int): drive number for multiple recordings. 1-based indexing. cached (bool, optional): whether to allow loading cached version of data (default True) Returns: dict: spike waveforms ''' filename = get_preprocessed_filename(subject, te_id, date, 'spike') preproc_dir = os.path.join(preproc_dir, subject) waveforms = load_hdf_group(preproc_dir, filename, f'drive{drive_number}/waveforms', cached=cached) return waveforms
############################################################################### # Loading / saving data ###############################################################################
[docs]def save_hdf(data_dir, hdf_filename, data_dict, data_group="/", compression=0, append=False, debug=False): ''' Writes data_dict and params into a hdf file in the data_dir folder Args: data_dir (str): destination file directory hdf_filename (str): name of the hdf file to be saved data_dict (dict): the data to be saved as a hdf file data_group (str, optional): where to store the data in the hdf compression(int, optional): gzip compression level. 0 indicate no compression. Compression not added to existing datasets. (default: 0) append (bool, optional): append an existing hdf file or create a new hdf file Returns: None ''' full_file_name = os.path.join(data_dir, hdf_filename) if append: hdf = h5py.File(full_file_name, 'a') elif not os.path.exists(full_file_name): hdf = h5py.File(full_file_name, 'w') else: raise FileExistsError("Will not overwrite existing file!") # Find or make the appropriate group if not data_group in hdf: group = hdf.create_group(data_group) if debug: print("Writing new group: {}".format(data_group)) else: group = hdf[data_group] if debug: print("Adding data to group: {}".format(data_group)) # Write each key, unless it exists and append is False for key in data_dict.keys(): if key in group: if debug: print("Warning: dataset " + key + " already exists in " + data_group + "!") del group[key] data = data_dict[key] if hasattr(data, 'dtype') and data.dtype.char == 'U': data = str(data) elif type(data) is dict: import json key = key + '_json' data = json.dumps(data) try: if compression > 0: group.create_dataset(key, data=data, compression='gzip', compression_opts=compression) else: group.create_dataset(key, data=data) if debug: print("Added " + key) except: if debug: print("Warning: could not add key {} with data {}".format(key, data)) hdf.close() if debug: print("Done!") return
[docs]def get_hdf_dictionary(data_dir, hdf_filename, show_tree=False): ''' Lists the hdf contents in a dictionary. Does not read any data! For example, calling get_hdf_dictionary() with show_tree will result in something like this:: >>> dict = get_hdf_dictionary('/exampledir', 'example.hdf', show_tree=True) example.hdf └──group1 | └──group_data: [shape: (1000,), type: int64] └──test_data: [shape: (1000,), type: int64] >>> print(dict) { 'group1': { 'group_data': ((1000,), dtype('int64')) }, 'test_data': ((1000,), dtype('int64')) } Args: data_dir (str): folder where data is located hdf_filename (str): name of hdf file Returns: dict: contents of the file keyed by name as tuples containing: | **shape (tuple):** size of the data | **dtype (np.dtype):** type of the data ''' full_file_name = os.path.join(data_dir, hdf_filename) hdf = h5py.File(full_file_name, 'r') def _is_dataset(hdf): return isinstance(hdf, h5py.Dataset) def _get_hdf_contents(hdf, str_prefix=""): # If we're at a dataset print it out if _is_dataset(hdf): name = os.path.split(hdf.name)[1] if show_tree: print(f'{str_prefix}{name}: [shape: {hdf.shape}, type: {hdf.dtype}]') return (hdf.shape, hdf.dtype) # Otherwise recurse if we're in a group else: contents = dict() for name, group in hdf.items(): if show_tree and not _is_dataset(group): print(str_prefix+"└──" + name) contents[name] = _get_hdf_contents(group, str_prefix.replace("└──", "| ")+"└──") return contents if show_tree: print(hdf_filename) return _get_hdf_contents(hdf)
[docs]def list_root_groups(data_dir, hdf_filename): ''' List the name of groups directly under the root in HDF5 files. Args: data_dir (str): folder where data is located hdf_filename (str): name of hdf file Returns: list: Name of groups ''' filepath = os.path.join(data_dir, hdf_filename) groups = [] with h5py.File(filepath, 'r') as f: for name in f.keys(): if isinstance(f[name], h5py.Group): groups.append(name) return groups
def _load_hdf_dataset(dataset, name): ''' Internal function for loading hdf datasets. Decodes json and unicode data automatically. Args: dataset (hdf object): dataset to load name (str): name of the dataset Returns: tuple: Tuple containing: | **name (str):** name of the dataset (might be modified) | **data (object):** loaded data ''' data = dataset[()] if '_json' in name: import json name = name.replace('_json', '') # Handle bytes vs string for JSON decoding if isinstance(data, bytes): data = data.decode('utf-8') data = json.loads(data) # Handle bytes objects for Python 3 compatibility elif isinstance(data, bytes): try: data = data.decode('utf-8') except UnicodeDecodeError: # If UTF-8 decoding fails, try with other common encodings or keep as bytes pass return name, data
[docs]def load_hdf_data(data_dir, hdf_filename, data_name, data_group="/", cached=False): ''' Simple wrapper to get the data from an hdf file as a numpy array Args: data_dir (str): folder where data is located hdf_filename (str): name of hdf file data_name (str): table to load data_group (str, optional): from which group to load data cached (bool, optional): whether to allow loading cached data or not Returns: ndarray: numpy array of data from hdf ''' if not cached: _load_hdf_data_cached.cache_clear() return _load_hdf_data_cached(data_dir, hdf_filename, data_name, data_group=data_group)
@lru_cache(maxsize=2) def _load_hdf_data_cached(data_dir, hdf_filename, data_name, data_group="/"): ''' Cached version of load_hdf_data Args: data_dir (str): folder where data is located hdf_filename (str): name of hdf file data_name (str): table to load data_group (str): from which group to load data Returns: ndarray: numpy array of data from hdf ''' full_file_name = os.path.join(data_dir, hdf_filename) hdf = h5py.File(full_file_name, 'r') full_data_name = os.path.join(data_group, data_name).replace("\\", "/") if full_data_name not in hdf: raise ValueError('{} not found in file {}'.format(full_data_name, hdf_filename)) _, data = _load_hdf_dataset(hdf[full_data_name], data_name) hdf.close() return np.array(data)
[docs]def load_hdf_group(data_dir, hdf_filename, group="/", cached=False): ''' Loads any datasets from the given hdf group into a dictionary. Also will recursively load other groups if any exist under the given group Args: data_dir (str): folder where data is located hdf_filename (str): name of hdf file group (str, optional): name of the group to load cached (bool, optional): whether to allow loading cached data or not Returns: dict: all the datasets contained in the given group ''' if not cached: _load_hdf_group_cached.cache_clear() return _load_hdf_group_cached(data_dir, hdf_filename, group=group)
@lru_cache(maxsize=2) def _load_hdf_group_cached(data_dir, hdf_filename, group="/"): ''' Loads any datasets from the given hdf group into a dictionary. Also will recursively load other groups if any exist under the given group Args: data_dir (str): folder where data is located hdf_filename (str): name of hdf file group (str): name of the group to load Returns: dict: all the datasets contained in the given group ''' full_file_name = os.path.join(data_dir, hdf_filename) hdf = h5py.File(full_file_name, 'r') if group not in hdf: raise ValueError('No such group in file {}'.format(hdf_filename)) # Recursively load groups until datasets are reached def _load_hdf_group(hdf): keys = hdf.keys() data = dict() for k in keys: if isinstance(hdf[k], h5py.Group): data[k] = _load_hdf_group(hdf[k]) else: k_, v = _load_hdf_dataset(hdf[k], k) data[k_] = v return data data = _load_hdf_group(hdf[group]) hdf.close() return data
[docs]def load_hdf_ts_trial(preproc_dir, filename, data_group, data_name, samplerate, trigger_time, time_before, time_after, channels=None): ''' Load a segment of HDF timeseries data given start and end times and a sampling rate. Args: preproc_dir (str): base directory where the files live filename (str): filename of the hdf file where the data resides data_group (str): hdf group of the desired dataset data_name (str): hdf name of the desired dataset samplerate (float): the sampling rate of the data in Hz trigger_time (float): time (in seconds) in the recording at which the desired segment starts time_before (float): time (in seconds) to include before the trigger times time_after (float): time (in seconds) to include after the trigger times channels (list, optional): list of channels to include in the segment (default all channels Raises: ValueError: if the dataset cannot be found in the file Returns: tuple: tuple containing: | **segment (nt, nch):** data segment from the given preprocessed file | **samplerate (float):** sampling rate of the returned data ''' filepath = os.path.join(preproc_dir, filename) hdf = h5py.File(filepath, 'r') # Check that the data group exists dataname = os.path.join(data_group, data_name).replace("\\", "/") if dataname not in hdf: raise ValueError('{} not found in file {}'.format(dataname, filepath)) # Get the ts segment ts_data = hdf[dataname] segment = preproc.get_trial_data(ts_data, trigger_time, time_before, time_after, samplerate, channels=channels) hdf.close() return segment
[docs]def load_hdf_ts_segment(preproc_dir, filename, data_group, data_name, samplerate, start_time, end_time, channels=None): ''' Load a segment of HDF timeseries data given a start and end time and a sampling rate. Args: preproc_dir (str): base directory where the files live filename (str): filename of the hdf file where the data resides data_group (str): hdf group of the desired dataset data_name (str): hdf name of the desired dataset samplerate (float): the sampling rate of the data in Hz start_time (float): time (in seconds) in the recording at which the desired segment starts end_time (float): time (in seconds) in the recording at which the desired segment ends channels (list, optional): list of channels to include in the segment (default all channels) Raises: ValueError: if the dataset cannot be found in the file Returns: tuple: tuple containing: | **segment (nt, nch):** data segment from the given preprocessed file | **samplerate (float):** sampling rate of the returned data ''' filepath = os.path.join(preproc_dir, filename) hdf = h5py.File(filepath, 'r') # Check that the data group exists dataname = os.path.join(data_group, data_name).replace("\\", "/") if dataname not in hdf: raise ValueError('{} not found in file {}'.format(dataname, filepath)) # Get the ts segment ts_data = hdf[dataname] segment = preproc.get_data_segment(ts_data, start_time, end_time, samplerate, channels=channels) hdf.close() return segment
# Set up a cache mapping filenames to pandas dataframes so we don't have to load the # dataframe every time someone calls the lookup functions _cached_dataframes = {}
[docs]def is_table_in_hdf(table_name:str, hdf_filename:str): """ Checks if a table exists in an hdf file' first level directory(i.e. non-recursively) Args: table_name(str): table name to be checked hdf_filename(str): full path to the hdf file Returns: Boolean """ with tables.open_file(hdf_filename, mode = 'r') as f: return table_name in f.root
[docs]def lookup_excel_value(data_dir, excel_file, from_column, to_column, lookup_value): ''' Finds a matching value for the given key in an excel file. Used for looking up electrode and acquisition channels for signal path files, but can also be useful as a lookup table for other numeric mappings. Args: data_dir (str): where the signal path file is located signal_path_file (str): signal path definition file from_column (str, optional): the name of the electrode column to_column (str, optional): the name of the acquisition column lookup_value (int): match this value in the from_column Returns: int: the corresponding value in the lookup table, or 0 if none is found ''' fullfile = os.path.join(data_dir, excel_file) if fullfile in _cached_dataframes: dataframe = _cached_dataframes[fullfile] else: dataframe = pd.read_excel(fullfile) _cached_dataframes[fullfile] = dataframe row = dataframe.loc[dataframe[from_column] == lookup_value] if len(row) > 0: return row[to_column].to_numpy()[0] else: return 0
[docs]def lookup_acq2elec(data_dir, signal_path_file, acq, zero_index=True): ''' Looks up the electrode number for a given acquisition channel using an excel map file (from Dr. Map) Args: data_dir (str): where the signal path file is located signal_path_file (str): signal path definition file acq (int): which channel to look up zero_index (bool, optional): use 0-indexing for acq and elec (default True) Returns: int: matching electrode number. If no matching electrode is found, returns -1 (or 0 with zero_index=False) ''' value = lookup_excel_value(data_dir, signal_path_file, 'acq', 'electrode', acq + 1*zero_index) return value - 1*zero_index
[docs]def lookup_elec2acq(data_dir, signal_path_file, elec, zero_index=True): ''' Looks up the acquisition channel for a given electrode number using an excel map file (from Dr. Map) Args: data_dir (str): where the signal path file is located signal_path_file (str): signal path definition file elec (int): which electrode to look up zero_index (bool, optional): use 0-indexing for acq and elec (default True) Returns: int: matching acquisition channel. If no matching channel is found, returns -1 (or 0 with zero_index=False) ''' value = lookup_excel_value(data_dir, signal_path_file, 'electrode', 'acq', elec + 1*zero_index) return value - 1*zero_index
[docs]def load_electrode_pos(data_dir, pos_file): ''' Reads an electrode position map file and returns the x and y positions. The file should have the columns 'topdown_x' and 'topdown_y'. Args: data_dir (str): where to find the file pos_file (str): the excel file Returns: tuple: Tuple containing: | **x_pos (nch):** x position of each electrode | **y_pos (nch):** y position of each electrode ''' fullfile = os.path.join(data_dir, pos_file) electrode_pos = pd.read_excel(fullfile) x_pos = electrode_pos['topdown_x'].to_numpy() y_pos = electrode_pos['topdown_y'].to_numpy() return x_pos, y_pos
[docs]def map_acq2elec(signalpath_table, acq_ch_subset=None): ''' Create index mapping from acquisition channel to electrode number. Excel files can be loaded as a pandas dataframe using pd.read_excel Args: signalpath_table (pd dataframe): Signal path information in a pandas dataframe. (Mapping between electrode and acquisition ch) acq_ch_subset (nacq): Subset of acquisition channels to call. If not called, all acquisition channels and connected electrodes will be return. If a requested acquisition channel isn't returned a warned will be displayed Returns: tuple: Tuple containing: | **acq_chs (nelec):** Acquisition channels that map to electrodes (e.g. 240/256 for viventi ECoG array) | **connected_elecs (nelec):** Electrodes used (e.g. 240/244 for viventi ECoG array) ''' # Parse acquisition channels used and the connected electrodes connected_elecs_mask = np.logical_not(np.isnan(signalpath_table['acq'])) connected_elecs = signalpath_table['electrode'][connected_elecs_mask].to_numpy() acq_chs = signalpath_table['acq'][connected_elecs_mask].to_numpy(dtype=int) if acq_ch_subset is not None: acq_chs_mask = np.where(np.in1d(acq_chs, acq_ch_subset))[0] acq_chs = acq_chs[acq_chs_mask] connected_elecs = connected_elecs[acq_chs_mask] if len(acq_chs) < len(acq_ch_subset): missing_acq_chs = acq_ch_subset[np.in1d(acq_ch_subset,acq_chs, invert=True)] warning_str = "Requested acquisition channels " + str(missing_acq_chs) + " are not connected" warnings.warn(warning_str) return acq_chs, connected_elecs
[docs]def map_elec2acq(signalpath_table, elecs): ''' This function finds the acquisition channels that correspond to the input electrode numbers given the signal path table input. This function works by calling aopy.data.map_acq2elec and subsampling the output. If a requested electrode isn't connected to an acquisition channel a warning will be displayed alerting the user and the corresponding index in the output array will be a np.nan value. Args: signalpath_table (pd dataframe): Signal path information in a pandas dataframe. (Mapping between electrode and acquisition ch) elecs (nelec): Electrodes to find the acquisition channels for Returns: acq_chs: Acquisition channels that map to electrodes (e.g. nelec/256 for viventi ECoG array) ''' acq_chs, connected_elecs = map_acq2elec(signalpath_table) elec_idx = np.in1d(connected_elecs, elecs) # Find elements in 'connected_elecs' that are also in 'elecs' # If the output acq_chs are not the same length as the input electodes, 1+ electrodes weren't connected if np.sum(elec_idx) < len(elecs): output_acq_chs = np.zeros(len(elecs)) output_acq_chs[:] = np.nan missing_elecs = [] for ielec, elecid in enumerate(elecs): matched_idx = np.where(connected_elecs == elecid)[0] if len(matched_idx) == 0: missing_elecs.append(elecid) else: output_acq_chs[ielec] = acq_chs[matched_idx] warning_str = 'Electrodes ' + str(missing_elecs) + ' are not connected.' print(warning_str) return output_acq_chs else: return acq_chs[elec_idx]
[docs]def map_acq2pos(signalpath_table, eleclayout_table, acq_ch_subset=None, theta=0, rotation_offset=(0,0), xpos_name='topdown_x', ypos_name='topdown_y'): ''' Create index mapping from acquisition channel to electrode position by calling aopy.data.map_acq2elec Excel files can be loaded as a pandas dataframe using pd.read_excel Args: signalpath_table (pd dataframe): Signal path information in a pandas dataframe. (Mapping between electrode and acquisition ch) eleclayout_table (pd dataframe): Electrode position information in a pandas dataframe. (Mapping between electrode and position on array) acq_ch_subset (nacq): Subset of acquisition channels to call. If not called, all acquisition channels and connected electrodes will be return. If a requested acquisition channel isn't returned a warned will be displayed theta (float): rotation (in degrees) to apply to positions. rotations are applied clockwise, e.g., theta = 90 rotates the map clockwise by 90 degrees, -90 rotates the map anti-clockwise by 90 degrees. Default 0. rotation_offset (tuple): X and Y coordinates of the rotation center. Defaults to (0,0) xpos_name (str): Column name for the electrode 'x' position. Defaults to 'topdown_x' used with the viventi ECoG array ypos_name (str): Column name for the electrode 'y' position. Defaults to 'topdown_y' used with the viventi ECoG array Returns: tuple: Tuple Containing: | **acq_ch_position (nelec, 2):** X and Y coordinates of the electrode each acquisition channel gets data from. X position is in the first column and Y position is in the second column | **acq_chs (nelec):** Acquisition channels that map to electrodes (e.g. 240/256 for viventi ECoG array) | **connected_elecs (nelec):** Electrodes used (e.g. 240/244 for viventi ECoG array) ''' # Get index mapping from acquisition channel to electrode number acq_chs, connected_elecs = map_acq2elec(signalpath_table, acq_ch_subset=acq_ch_subset) nelec = len(connected_elecs) # Map connected electrodes to their position acq_ch_position = np.empty((nelec, 2)) for ielec, elecid in enumerate(connected_elecs): row = eleclayout_table.loc[eleclayout_table['electrode'] == elecid].iloc[0] acq_ch_position[ielec, 0] = float(row[xpos_name]) acq_ch_position[ielec, 1] = float(row[ypos_name]) if theta != 0: theta = np.deg2rad(theta) rot_mat = np.array([[np.cos(theta),-np.sin(theta)],[np.sin(theta),np.cos(theta)]]) acq_ch_position = ((acq_ch_position - rotation_offset) @ rot_mat) + rotation_offset return acq_ch_position, acq_chs, connected_elecs
[docs]def map_data2elec(datain, signalpath_table, acq_ch_subset=None, zero_indexing=False): ''' Map data from its acquisition channel to the electrodes recorded from. Wrapper for aopy.data.map_acq2elec Excel files can be loaded as a pandas dataframe using pd.read_excel Args: datain (nt, nacqch): Data recoded from an array. signalpath_table (pd dataframe): Signal path information in a pandas dataframe. (Mapping between electrode and acquisition ch) acq_ch_subset (nacq): Subset of acquisition channels to call. If not called, all acquisition channels and connected electrodes will be return. If a requested acquisition channel isn't returned a warned will be displayed zero_indexing (bool): Set true if acquisition channel numbers start with 0. Defaults to False. Returns: tuple: Tuple containing: | **dataout (nt, nelec):** Data from the connected electrodes | **acq_chs (nelec):** Acquisition channels that map to electrodes (e.g. 240/256 for viventi ECoG array) | **connected_elecs (nelec):** Electrodes used (e.g. 240/244 for viventi ECoG array) ''' acq_chs, connected_elecs = map_acq2elec(signalpath_table, acq_ch_subset=acq_ch_subset) if zero_indexing: dataout = datain[:,acq_chs] else: dataout = datain[:,acq_chs-1] return dataout, acq_chs, connected_elecs
[docs]def map_data2elecandpos(datain, signalpath_table, eleclayout_table, acq_ch_subset=None, theta=0, rotation_offset=(0,0), xpos_name='topdown_x', ypos_name='topdown_y', zero_indexing=False): ''' Map data from its acquisition channel to the electrodes recorded from and their position. Wrapper for aopy.data.map_acq2pos Excel files can be loaded as a pandas dataframe using pd.read_excel Args: datain (nt, nacqch): Data recoded from an array. signalpath_table (pd dataframe): Signal path information in a pandas dataframe. (Mapping between electrode and acquisition ch) eleclayout_table (pd dataframe): Electrode position information in a pandas dataframe. (Mapping between electrode and position on array) acq_ch_subset (nacq): Subset of acquisition channels to call. If not called, all acquisition channels and connected electrodes will be return. If a requested acquisition channel isn't returned a warned will be displayed theta (float): rotation (in degrees) to apply to positions. rotations are applied clockwise, e.g., theta = 90 rotates the map clockwise by 90 degrees, -90 rotates the map anti-clockwise by 90 degrees. Default 0. rotation_offset (tuple): X and Y coordinates of the rotation center. Defaults to (0,0) xpos_name (str): Column name for the electrode 'x' position. Defaults to 'topdown_x' used with the viventi ECoG array ypos_name (str): Column name for the electrode 'y' position. Defaults to 'topdown_y' used with the viventi ECoG array zero_indexing (bool): Set true if acquisition channel numbers start with 0. Defaults to False. Returns: tuple: Tuple containing: | **dataout (nt, nelec):** Data from the connected electrodes | **acq_ch_position (nelec, 2):** X and Y coordinates of the electrode each acquisition channel gets data from. X position is in the first column and Y position is in the second column | **acq_chs (nelec):** Acquisition channels that map to electrodes (e.g. 240/256 for viventi ECoG array) | **connected_elecs (nelec):** Electrodes used (e.g. 240/244 for viventi ECoG array) ''' acq_ch_position, acq_chs, connected_elecs = map_acq2pos(signalpath_table, eleclayout_table, acq_ch_subset=acq_ch_subset, theta=theta, rotation_offset=rotation_offset, xpos_name=xpos_name, ypos_name=ypos_name) if zero_indexing: dataout = datain[:,acq_chs] else: dataout = datain[:,acq_chs-1] return dataout, acq_ch_position, acq_chs, connected_elecs
[docs]def load_chmap(drive_type='ECoG244', acq_ch_subset=None, theta=0, center=(0,0), **kwargs): ''' Load the centered mapping between acquisition channels and electrode position for supported drives. Currently supports 'ECoG244', 'Opto32', 'NP_Insert72', and 'NP_Insert137' drives. Args: drive_type (str, optional): Drive type of the method used to record neural activity. - 'ECoG244': Viventi 244 channel ECoG array - 'Opto32': Orsborn 32 channel fiber optic array - 'NP_Insert72': Orsborn 72 site Neuropixel grid - 'NP_Insert137': Orsborn 137 site Neuropixel grid acq_ch_subset (nacq, optional): Subset of acquisition channels to call. If not called, all acquisition channels and connected electrodes will be returned. theta (float): rotation (in degrees) to apply to positions. rotations are applied clockwise, e.g., theta = 90 rotates the map clockwise by 90 degrees, -90 rotates the map anti-clockwise by 90 degrees. Default 0. center (2-tuple): chamber coordinates of the center of the drive in mm. This function translates the coordinates of the drive to be centered on this value. Defaults to (0,0). kwargs (dict): Additional keyword arguments to pass to :func:`~aopy.data.map_acq2pos` Returns: tuple: Tuple Containing: | **acq_ch_position (nelec, 2):** X and Y coordinates (in mm) of the electrodes corresponding to each acquisition channel. X position is in the first column and Y position is in the second column | **acq_chs (nelec):** Acquisition channels that map to electrodes (e.g. 240/256 for viventi ECoG array) | **connected_elecs (nelec):** Electrodes used (e.g. 240/244 for viventi ECoG array) Examples: .. code-block:: python plot_ECoG244_data_map(np.zeros(256,), cmap='Greys') annotate_spatial_map_channels(drive_type='ECoG244', color='k') annotate_spatial_map_channels(drive_type='Opto32', color='b') annotate_spatial_map_channels(drive_type='ECoG244', color='r', theta=90) annotate_spatial_map_channels(drive_type='Opto32', color='g', theta=90) .. image:: _images/ecog244_opto32_theta90.png .. code-block:: python plt.figure() plot_spatial_drive_map(np.zeros(64,), drive_type='EMG_GR08MM1305', cmap='Greys', theta=0) annotate_spatial_map_channels(drive_type='EMG_GR08MM1305', color='k', theta=0) .. image:: _images/emg64_gr08mm1305.png ''' config_files = files('aopy').joinpath('config') rotation_offset = (0,0) if drive_type == 'ECoG244': signal_path_filepath = as_file(config_files.joinpath('210910_ecog_signal_path.xlsx')) elec_to_pos_filepath = as_file(config_files.joinpath('244ch_viventi_ecog_elec_to_pos.xlsx')) rotation_offset = (5.625, 5.625) elif drive_type == 'Opto32': signal_path_filepath = as_file(config_files.joinpath('221021_opto_signal_path.xlsx')) elec_to_pos_filepath = as_file(config_files.joinpath('32ch_fiber_optic_assy_elec_to_pos.xlsx')) rotation_offset = (5.625, 5.625) elif drive_type == 'NP_Insert72' or drive_type == 'NP_Insert137': signal_path_filepath = as_file(config_files.joinpath(f'neuropixel_insert_ch_mapping/{drive_type}_signal_path.xlsx')) elec_to_pos_filepath = as_file(config_files.joinpath(f'neuropixel_insert_ch_mapping/{drive_type}_ch_mapping.xlsx')) elif drive_type == 'EMG_GR08MM1305': signal_path_filepath = as_file(config_files.joinpath('251202_64ch_emg_signal_path.xlsx')) elec_to_pos_filepath = as_file(config_files.joinpath('64ch_emg_gr08mm1305_elec_to_pos.xlsx')) rotation_offset = (16, 48) else: raise ValueError('Drive type not supported') with signal_path_filepath as f: signal_path = pd.read_excel(f) with elec_to_pos_filepath as f: layout = pd.read_excel(f) if acq_ch_subset is not None: acq_ch_subset = np.array(acq_ch_subset, dtype='int') elec_pos, acq_ch, elecs = map_acq2pos(signal_path, layout, acq_ch_subset=acq_ch_subset, theta=theta, rotation_offset=rotation_offset, **kwargs) elec_pos = elec_pos - rotation_offset + center # re-center the coordinates return elec_pos, acq_ch, elecs
[docs]def align_neuropixel_recoring_drive(neuropixel_drive, drive2, subject, theta=0, center=(0,0)): ''' This function aligns one drive to another drive type. In the current iteration, this function only supports aligning neuropixels drives ('NP_Insert72'/'NP_Insert137'') to each other or to 'ECoG244'/'Opto32' drives. This function assumes a fixed mapping between subject and alignment is not currently compatible with selecting subsets of channels. The mapping between subject and alignment is defined in aopy/config/neuropixel_insert_ch_mapping/NP_insert_angle_alignment.xlsx. The following images depict the alignment between neuropixels insert grid hole locations and ECoG channel location for two subjects. .. image:: _images/NP_Insert137_ECoG244_alignment.png .. image:: _images/NP_Insert72_ECoG244_alignment.png Args: neuropixel_drive (str): Neuropixel drive to align. Currently supports 'NP_Insert72', and 'NP_Insert137' drive2 (str): Other drive to align. Currently supports 'ECoG244', 'Opto32', 'NP_Insert72', and 'NP_Insert137' subject (str): Subject recordings were performed on. Currently supports 'Affi' and 'Beignet' theta (float): rotation (in degrees) to apply to positions. Rotations are applied clockwise. Default 0. center (2-tuple): chamber coordinates of the center of the drive in mm. Defaults to (0,0). Returns: tuple: Tuple Containing: | **aligned_np_drive_coordinates (nelec, 2):** X and Y coordinates of each neuropixel insert recording site relative to drive2 | **aligned_drive2_coordinates (nelec, 2):** X and Y coordinates of each drive2 recording site | **recording_sites (nelec):** Neuropixel insert recording site numbers | **acq_ch (nelec):** Acquisition channels (0-indexed) for each drive2 recording site ''' NP_drive_list = ['NP_Insert72', 'NP_Insert137'] # Drives this function is compatible with drive2_list = ['NP_Insert72', 'NP_Insert137', 'ECoG244', 'Opto32'] subjects = ['affi', 'beignet'] if neuropixel_drive not in NP_drive_list and drive2 not in drive2_list: raise ValueError('Neuropixel drive type and drive2 type not supported') elif neuropixel_drive not in NP_drive_list: raise ValueError('Neuropixel drive type not supported') elif neuropixel_drive not in drive2_list: raise ValueError('drive2 type not supported') if subject not in subjects: raise ValueError('Subject not supported') # If drive2 is not a neuropixel drive, load alignment angle config_files = files('aopy').joinpath('config') if drive2 not in NP_drive_list: angle_lookup_filepath = as_file(config_files.joinpath(f'neuropixel_insert_ch_mapping/NP_Insert_angle_alignment.xlsx')) with angle_lookup_filepath as f: angle_df = pd.read_excel(f) relative_angle = np.rad2deg(angle_df.loc[angle_df['subject'] == subject, drive2].values[0]) else: relative_angle = 0 # Load ch map for neuropixel drive and drive 2 np_drive_ch_pos, recording_sites , _ = load_chmap(drive_type=neuropixel_drive, theta=theta - relative_angle, center=center) drive2_ch_pos, acq_ch , _ = load_chmap(drive_type=drive2, theta=theta, center=center) return np_drive_ch_pos, drive2_ch_pos, recording_sites, acq_ch
[docs]def parse_str_list(strings, str_include=None, str_avoid=None): ''' This function parses a list of strings to return the strings that include/avoid specific substrings It was designed to parse dictionary keys Args: strings (list of strings): List of strings str_include (list of strings): List of substrings that must be included in a string to keep it str_avoid (list of strings): List of substrings that can not be included in a string to keep it Returns: (list of strings): List of strings fitting the input conditions Example:: >>> str_list = ['sig001i_wf', 'sig001i_wf_ts', 'sig002a_wf', 'sig002a_wf_ts', 'sig002b_wf', 'sig002b_wf_ts', 'sig002i_wf', 'sig002i_wf_ts'] >>> parsed_strings = parse_str_list(str_list, str_include=['sig002', 'wf'], str_avoid=['b_wf', 'i_wf']) >>> print(parsed_strings) ['sig002a_wf', 'sig002a_wf_ts'] ''' parsed_str = [] for str_idx, str_val in enumerate(strings): counter = 0 nconditions = 0 if str_include is not None: for istr_incl, istr_incl_val in enumerate(str_include): nconditions += 1 if istr_incl_val in strings[str_idx]: counter += 1 if str_avoid is not None: for istr_avd, istr_avd_val in enumerate(str_avoid): nconditions += 1 if istr_avd_val not in strings[str_idx]: counter += 1 if counter == nconditions: parsed_str.append(strings[str_idx]) return parsed_str
[docs]def load_matlab_cell_strings(data_dir, hdf_filename, object_name): ''' This function extracts strings from an object within .mat file that was saved from matlab in version -7.3 (-v7.3). example:: >>> testfile = 'matlab_cell_str.mat' >>> strings = load_matlab_cell_strings(data_dir, testfile, 'bmiSessions') >>> print(strings) ['jeev070412j', 'jeev070512g', 'jeev070612d', 'jeev070712e', 'jeev070812d'] Args: data_dir (str): where the matlab file is located hdf_filename (str): .mat filename object_name (str): Name of object to load. This is typically the variable name saved from matlab Returns: (list of strings): List of strings in the hdf file object ''' full_file_name = os.path.join(data_dir, hdf_filename) strings = [] with h5py.File(full_file_name, 'r') as f: objects = f[object_name] if objects.shape[0] == 1: for iobject in objects[0]: string_unicode = f[iobject] temp_string = ''.join(chr(i) for i in string_unicode[:].flatten()) strings.append(temp_string) else: for iobject in objects: string_unicode = f[iobject[0]] temp_string = ''.join(chr(i) for i in string_unicode[:].flatten()) strings.append(temp_string) return strings
[docs]def pkl_write(file_to_write, values_to_dump, write_dir): ''' Write data into a pickle file. Note: H5D5 (HDF) files can not be pickled. Refer :func:`aopy.data.save_hdf` for saving HDF data Args: file_to_write (str): filename with '.pkl' extension values_to_dump (any): values to write in a pickle file write_dir (str): Path - where do you want to write this file Returns: None examples: pkl_write('meta.pkl', data, '/data_dir') ''' file = os.path.join(write_dir, file_to_write) with open(file, 'wb') as pickle_file: pkl.dump(values_to_dump, pickle_file)
[docs]def pkl_read(file_to_read, read_dir): ''' Reads data stored in a pickle file. Args: file_to_read (str): filename with '.pkl' extension read_dir (str): Path to folder where the file is stored Returns: data in a format as it is stored ''' file = os.path.join(read_dir, file_to_read) with open(file, "rb") as f: this_dat = pkl.load(f) return this_dat
# - - -- --- ----- -------- ------------- -------- ----- --- -- - - # # - - -- --- ----- -------- ------------- -------- ----- --- -- - - #
[docs]def yaml_write(filename, data): ''' YAML stands for Yet Another Markup Language. It can be used to save Params or configuration files. Args: filename(str): Filename including the full path data (dict) : Params data to be dumped into a yaml file Returns: None Example: >>>params = [{ 'CENTER_TARGET_ON': 16 , 'CURSOR_ENTER_CENTER_TARGET' : 80 , 'REWARD' : 48 , 'DELAY_PENALTY' : 66 }] >>>params_file = '/test_data/task_codes.yaml' >>>yaml_write(params_file, params) ''' with open(filename, 'w') as file: documents = yaml.dump(data, file)
[docs]def yaml_read(filename): ''' The FullLoader parameter handles the conversion from YAML scalar values to Python the dictionary format Args: filename(str): Filename including the full path Returns: data (dict) : Params data dumped into a yaml file Example: >>>params_file = '/test_data/task_codes.yaml' >>>task_codes = yaml_read(params_file, params) ''' with open(filename) as file: task_codes = yaml.load(file, Loader=yaml.FullLoader) return task_codes
[docs]def load_yaml_config(filename): ''' Load a yaml configuration file into a dictionary Args: config_file (str): path to the yaml configuration file Returns: dict: dictionary containing the configuration parameters ''' config_dir = files('aopy').joinpath('config') params_file = as_file(config_dir.joinpath(filename)) with params_file as f: config = yaml_read(f) return config