Source code for aopy.data.peslab

import os
import warnings
import pickle as pkl
import re
import json

import numpy as np
from pandas import DataFrame
import xarray as xr

from ..preproc.quality import high_freq_data_detection, saturated_data_detection

def parse_file_info(file_path):
    """parse_file_info

    Splits a goose_wireless ECoG/LFP data file path into the names of its
    companion files and the recording parameters encoded in the file name.

    The file name (minus its final extension) is split on '.'. With three
    fields they are read as ``rec_id.microdrive_name.rec_type``; otherwise
    four fields are expected and the third one is discarded.

    Args:
        file_path (str): path to the file's location

    Returns:
        exp_file_name (str): JSON experiment data file path
            (``<dir>/<rec_id>.experiment.json``)
        mask_file_name (str): binary data mask file path
            (``<dir>/<file name without extension>.mask.pkl``)
        microdrive_name (str): string name of the microdrive type used to collect data in file_path
        rec_type (str): recording modality reflected in this file ('ECOG', 'LFP', etc.)

    Raises:
        ValueError: if the file name does not split into three or four fields.
    """
    data_dir, file_name = os.path.split(file_path)
    stem, _ = os.path.splitext(file_name)
    fields = stem.split('.')

    if len(fields) == 3:
        rec_id, microdrive_name, rec_type = fields
    else:
        # four-field names carry an extra token between microdrive and type
        rec_id, microdrive_name, _, rec_type = fields

    exp_file_name = os.path.join(data_dir, f"{rec_id}.experiment.json")
    mask_file_name = os.path.join(data_dir, f"{stem}.mask.pkl")
    return exp_file_name, mask_file_name, microdrive_name, rec_type
def load_experiment_data(exp_file_name):
    """load_experiment_data

    Reads experiment metadata from an experiment JSON file. Returns the
    complete decoded structure together with a table describing the
    electrodes of the first microdrive listed in the file.

    Args:
        exp_file_name (str): JSON experiment data file path

    Returns:
        experiment (dict): dict data object containing experiment metadata.
            See lab documentation for more information.
        electrode_df (DataFrame): pandas DataFrame built from
            ``experiment['hardware']['microdrive'][0]['electrodes']``. The
            per-electrode 'position' entries are expanded into their own
            columns and the original 'position' column is removed.

    Raises:
        AssertionError: if exp_file_name does not exist.
    """
    exp_file_found = os.path.exists(exp_file_name)
    assert exp_file_found, f'inferred experiment file not found at {exp_file_name}'

    with open(exp_file_name, 'r') as exp_file:
        experiment = json.load(exp_file)

    # only the first microdrive entry is tabulated
    electrodes = experiment['hardware']['microdrive'][0]['electrodes']
    electrode_df = DataFrame(electrodes)

    # expand each electrode's position record into separate columns
    position_df = DataFrame(list(electrode_df.position))
    electrode_df = electrode_df.join(position_df)
    electrode_df = electrode_df.drop(columns='position')

    return experiment, electrode_df
def load_mask_data(mask_file_name):
    """load_mask_data

    Loads mask data from a pickled recording mask file. Binary True values
    indicate "bad" or noisy data not used in analyses.

    Args:
        mask_file_name (str): file path to binary mask file

    Returns:
        mask: the object unpickled from mask_file_name. Documented as a numpy
            array of binary values with one entry per time point of the
            respective data array.

    Raises:
        AssertionError: if mask_file_name does not exist.
    """
    mask_file_found = os.path.exists(mask_file_name)
    assert mask_file_found, f'inferred mask file not found at {mask_file_name}'

    with open(mask_file_name, 'rb') as mask_file:
        mask = pkl.load(mask_file)
    return mask
# def read_lfp(file_path,t_range=(0,-1)): # """read_lfp # reads data from a structured binary *lfp file in the goose wireless dataset. # Args: # file_path (str): file path to data file # t_range (listlike, optional): Start and stop times to read data. (0, -1) reads the entire file. Defaults to (0,-1). # Returns: # da (numpy.array): numpy array of multichannel recorded neural activity saved in file_path # mask (numpy.array): numpy array of binary mask values # """ # # get local experiment, mask files # exp_file_name, mask_file_name, microdrive_name, rec_type = parse_file_info(file_path) # # load experiment data # experiment, electrode_df = load_experiment_data(exp_file_name) # # load mask data # mask = load_mask_data(mask_file_name) # # get parameters: srate # dsmatch = re.search(r'clfp_ds(\d+)',rec_type) # if rec_type == 'raw': # srate = experiment['hardware']['acquisition']['samplingrate'] # data_type = np.ushort # reshape_order = 'F' # elif rec_type == 'lfp': # srate = 1000 # data_type = np.float32 # reshape_order = 'F' # elif rec_type == 'clfp': # srate = 1000 # data_type = np.float32 # reshape_order = 'F' # elif dsmatch: # # downsampled data - get srate from name # srate = int(dsmatch.group(1)) # data_type = np.float32 # reshape_order = 'C' # files created with np.tofile which forces C ordering. 
# # get microdrive parameters # microdrive_name_list = [md['name'] for md in experiment['hardware']['microdrive']] # microdrive_idx = [md_idx for md_idx, md in enumerate(microdrive_name_list) if microdrive_name == md][0] # microdrive_dict = experiment['hardware']['microdrive'][microdrive_idx] # num_ch = len(microdrive_dict['electrodes']) # # get file size information # data_type_size = data_type().nbytes # file_size = os.path.getsize(file_path) # n_offset_samples = np.round(t_range[0]*srate) # n_offset_bytes = n_offset_samples*data_type_size # n_all = int(np.floor(file_size/num_ch/data_type_size)) # n_stop = n_all if t_range[1] == -1 else np.min((np.round(t_range[1]*srate),n_all)) # n_read = n_stop-n_offset_samples # # read signal data # data = read_from_file( # file_path, # data_type, # num_ch, # n_read, # n_offset_bytes, # reshape_order=reshape_order # ) # # create xarray from data and channel information # da = xr.DataArray( # data.T, # dims = ('sample','ch'), # coords = { # 'ch': electrode_df.label, # 'x_pos': ('ch', electrode_df.x), # 'y_pos': ('ch', electrode_df.y), # 'row': ('ch', electrode_df.row), # 'col': ('ch', electrode_df.col), # }, # attrs = {'srate': srate} # ) # return da, mask # wrapper to read and handle clfp ECOG data
def load_ecog_clfp_data(data_file_name,t_range=(0,-1),exp_file_name=None,mask_file_name=None,compute_mask=True):
    """load_ecog_clfp_data

    Load ECoG data file from a goose wireless dataset file.

    Args:
        data_file_name (str): file path to data file
        t_range (listlike, optional): Start and stop times to read data.
            (0, -1) reads the entire file. Defaults to (0,-1).
        exp_file_name (str, optional): File path to experiment data JSON file.
            When None, the path is inferred from data_file_name. Defaults to None.
        mask_file_name (str, optional): File path to data quality mask file.
            When None, the path is inferred from data_file_name. Defaults to None.
        compute_mask (bool, optional): Compute (and save to mask_file_name) a
            data quality mask if the mask file does not exist. Forced to False
            for downsampled ('clfp_ds<rate>') files. Defaults to True.

    Raises:
        NameError: If the experiment file cannot be found.
        NameError: If the recording type parsed from the file name is not recognized.
        IndexError: If the microdrive named in the file name is not listed in
            the experiment file.

    Returns:
        data (nt x nch): numpy array of multichannel ECoG data
        exp (dict): dictionary with the sampling rate ("srate") and channel
            count ("num_ch") of the loaded data
        mask: mask data loaded from mask_file_name if it exists, otherwise a
            dict with "hf" and "sat" masks when computed, otherwise an empty list
    """
    # infer ancillary file names, but let explicitly passed paths take priority
    inferred_exp_name, inferred_mask_name, microdrive_name, rec_type = parse_file_info(data_file_name)
    if exp_file_name is None:
        exp_file_name = inferred_exp_name
    if mask_file_name is None:
        mask_file_name = inferred_mask_name

    # check for experiment file, load if valid, exit if not.
    if os.path.exists(exp_file_name):
        with open(exp_file_name, 'r') as f:
            experiment = json.load(f)
    else:
        raise NameError(f'Experiment file {exp_file_name} either invalid or not found. Aborting Process.')

    # get srate, on-disk sample type and memory layout from the recording type
    dsmatch = re.search(r'clfp_ds(\d+)', rec_type)
    if rec_type == 'raw':
        srate = experiment['hardware']['acquisition']['samplingrate']
        data_type = np.ushort
        reshape_order = 'F'
    elif rec_type in ('lfp', 'clfp'):
        srate = 1000
        data_type = np.float32
        reshape_order = 'F'
    elif dsmatch:
        # downsampled data - get srate from name
        srate = int(dsmatch.group(1))
        data_type = np.float32
        compute_mask = False
        reshape_order = 'C'  # files created with np.tofile which forces C ordering. Sorry!
    else:
        raise NameError(f'File type {rec_type}.dat not recognized. Aborting read process.')

    # get microdrive parameters
    microdrive_name_list = [md['name'] for md in experiment['hardware']['microdrive']]
    microdrive_idx = [md_idx for md_idx, md in enumerate(microdrive_name_list) if microdrive_name == md][0]
    microdrive_dict = experiment['hardware']['microdrive'][microdrive_idx]
    num_ch = len(microdrive_dict['electrodes'])
    exp = {"srate": srate, "num_ch": num_ch}

    # translate the requested time window into whole sample counts; np.round
    # returns floats for float inputs, so cast before using them as counts
    data_type_size = data_type().nbytes
    file_size = os.path.getsize(data_file_name)
    n_offset_samples = int(np.round(t_range[0]*srate))
    n_offset = n_offset_samples*data_type_size
    n_all = int(np.floor(file_size/num_ch/data_type_size))
    if t_range[1] == -1:
        n_stop = n_all
    else:
        n_stop = min(int(np.round(t_range[1]*srate)), n_all)
    n_read = n_stop - n_offset_samples

    # load data
    print("Loading data file:")
    # n_offset value is the number of bytes to skip (per channel)
    # n_read value is the number of items to read (by data type, per channel)
    data = read_from_file(data_file_name, data_type, num_ch, n_read, n_offset,
                          reshape_order=reshape_order)

    if rec_type == 'raw':
        # correct uint16 encoding errors
        # read_from_file returns (samples x channels), so the correction is
        # applied to the whole array rather than row-by-row over channel count.
        # NOTE(review): the "> 2**15" threshold and "2**16 - 1" offset are kept
        # from the original code - confirm against the acquisition encoding.
        data = np.array(data, dtype=np.float32)
        is_neg = data > 2**15
        data[is_neg] -= (2**16 - 1)

    # check for mask file, load if valid, compute if not
    if os.path.exists(mask_file_name):
        # NOTE: pickle can execute arbitrary code - only load trusted mask files
        with open(mask_file_name, "rb") as mask_f:
            mask = pkl.load(mask_f)
    elif compute_mask:
        print(f"No mask data file found for {data_file_name}")
        print("Computing data masks:")
        hf_mask, _ = high_freq_data_detection(data, srate)
        _, sat_mask_all = saturated_data_detection(data, srate)
        sat_mask = np.any(sat_mask_all, axis=0)
        mask = {"hf": hf_mask, "sat": sat_mask}
        # save mask data to the mask file path
        print(f"Saving mask data for {data_file_name} to {mask_file_name}")
        with open(mask_file_name, "wb") as mask_f:
            pkl.dump(mask, mask_f)
    else:
        mask = []

    return data, exp, mask
# read T seconds of data from the start of the recording:
def read_from_start(data_file_path,data_type,n_ch,n_read):
    """read_from_start

    Read data from goose wireless data file. Reads a fixed number of samples
    from the start of the recording.

    Args:
        data_file_path (str): file path to data file
        data_type (numeric type): numpy numeric type reflecting the variable encoding in data_file_path
        n_ch (int): number of channels saved in data_file_path
        n_read (int): number of time points to read from data_file_path

    Returns:
        data (np.array): (n_read x n_ch) numpy array of neural recording data
            saved in data_file_path

    Raises:
        ValueError: if the file holds fewer than n_read*n_ch values, so the
            data cannot be reshaped.
    """
    # counts may arrive as integral floats (e.g. from np.round); fromfile and
    # reshape both require true integers
    n_ch = int(n_ch)
    n_read = int(n_read)

    # context manager guarantees the handle is released even if the read fails
    with open(data_file_path, "rb") as data_file:
        data = np.fromfile(data_file, dtype=data_type, count=n_read*n_ch)

    # samples are stored channel-interleaved: unpack column-major into
    # (channels x samples), then transpose to (samples x channels)
    data = np.reshape(data, (n_ch, n_read), order='F')
    return data.T
# read some time from a given offset
def read_from_file(data_file_path,data_type,n_ch,n_read,n_offset,reshape_order='F'):
    """read_from_file

    Reads recorded neural activity from a goose_wireless file.

    Args:
        data_file_path (str): file path to data file
        data_type (numeric type): numpy numeric type reflecting the variable encoding in data_file_path
        n_ch (int): Number of channels in data_file_path
        n_read (int): Number of data samples (per channel) read from data_file_path
        n_offset (int): Offset, in bytes per channel, defining where data
            reading starts. The read begins n_offset*n_ch bytes into the file.
        reshape_order (str, optional): Data reshaping order. Defaults to 'F'

    Returns:
        data (np.array): (n_read x n_ch) numpy array of neural activity stored
            in data_file_path

    Raises:
        ValueError: if fewer than n_read*n_ch values are available after the
            offset, so the data cannot be reshaped.
    """
    # counts may arrive as integral floats (e.g. from np.round); fromfile and
    # reshape both require true integers
    n_ch = int(n_ch)
    n_read = int(n_read)
    n_skip_bytes = int(n_offset*n_ch)

    with open(data_file_path, "rb") as data_file:
        # Seeking the handle works on every numpy version, unlike the
        # `offset` keyword of np.fromfile, so no version check is needed and
        # the offset is never silently ignored.
        data_file.seek(n_skip_bytes)
        data = np.fromfile(data_file, dtype=data_type, count=n_read*n_ch)

    # unpack into (channels x samples) using the file's storage order, then
    # transpose to (samples x channels)
    data = np.reshape(data, (n_ch, n_read), order=reshape_order)
    return data.T
# read variables from the "experiment.mat" files
def get_exp_var(exp_data,*args):
    """get_exp_var

    Walks into .MAT formatted experiment data along a sequence of variable
    names and returns whatever is stored at the end of that path.

    The first two names are applied as plain lookups. Every later name is
    applied after indexing the current value with ``[None][0][None][0]``
    (presumably to unwrap nested MATLAB struct arrays - confirm against the
    loader that produced exp_data).

    Args:
        exp_data (dict): MAT file data dict
        *args (str): variable names, outermost first

    Returns:
        out: the value reached by following args; a shallow copy of exp_data
            when no names are given
    """
    out = exp_data.copy()

    # the first two levels are indexed directly
    for var_name in args[:2]:
        out = out[var_name]

    # deeper levels are unwrapped before each lookup
    for var_name in args[2:]:
        unwrapped = out[None][0][None][0]
        out = unwrapped[var_name]

    return out
# data filtration code