Source code for aopy.postproc.base

# base.py
#
# Code for post-processing neural data, including separating neural features such as 
# LFP bands or spikes detection / binning

import math
import warnings

import numpy as np
from scipy.signal import convolve

from .. import precondition

[docs]def translate_spatial_data(spatial_data, new_origin): ''' Shifts 2D or 3D spatial data to a new location. Args: spatial_data (nt, ndim): Spatial data in 2D or 3D new_origin (ndim): Location of point that will become the origin in cartesian coordinates Returns: new_spatial_data (nt, ndim): new reach trajectory translated to the new origin ''' new_spatial_data = np.subtract(spatial_data, new_origin) return new_spatial_data
[docs]def rotate_spatial_data(spatial_data, new_axis, current_axis): ''' Rotates data about the origin into a new coordinate system based on the relationship between 'new_axis' and 'current_axis'. If 'current_axis' and 'new_axis' point in the same direction, the code will return 'spatial_data' with a warning that the vectors point in the same direction. This function was written to rotate spatial data but can be applied to other data of similar form. Args: spatial_data (nt, ndim): Array of spatial data in 2D or 3D new_axis (ndim): vector pointing along the desired orientation of the data current_axis (ndim): vector pointing along the current orientation of the dat Returns: output_spatial_data (nt, ndim): new reach trajectory rotated to the new axis Updates: July 2023 : updated function to work when new_axis is not a unit vector ''' # Check if input data is a single point and enfore that it is a row vector if len(spatial_data.shape) == 1: spatial_data.shape = (1,len(spatial_data)) # Initialize output array output_spatial_data = np.empty((spatial_data.shape[0], 3)) # Check for a 2D or 3D trajectory and convert to 3D points if spatial_data.shape[1] == 2: spatial_data3d = np.concatenate((spatial_data, np.zeros((spatial_data.shape[0],1))), axis = 1) new_axis3d = np.concatenate((new_axis, np.array([0]))) current_axis3d = np.concatenate((current_axis, np.array([0]))) elif spatial_data.shape[1] == 3: spatial_data3d = spatial_data new_axis3d = new_axis current_axis3d = current_axis # Calcualte angle between 'new_axis3d' and target trajectory via dot product angle = np.arccos(np.dot(new_axis3d, current_axis3d)/(np.linalg.norm(new_axis3d)*np.linalg.norm(current_axis3d))) # If angle is 0, return the original data and warn if np.isclose(angle, 0, atol = 1e-8): warnings.warn("Starting and desired vector are the same. No rotation applied") output_spatial_data = spatial_data3d return output_spatial_data # If the angle is exactly 180 degrees, slightly nudge the starting vector by 1e-7 elif (spatial_data.shape[1] == 2) and np.isclose(angle, np.pi, atol = 1e-8): current_axis3d = current_axis3d.astype('float64') current_axis3d[:2] += 1e-7 elif np.isclose(angle, np.pi, atol = 1e-8): raise ValueError("180 degree rotation cannot be resolved in 3D") # Calculate unit vector axis to rotate about via cross product rotation_axis = np.cross(current_axis3d, new_axis3d) unit_rotation_axis = rotation_axis/np.linalg.norm(rotation_axis) # Calculate quaternion # https://en.wikipedia.org/wiki/Quaternions_and_spatial_rotation#Using_quaternions_as_rotations qr = np.cos(angle/2) qi = np.sin(angle/2)*unit_rotation_axis[0] qj = np.sin(angle/2)*unit_rotation_axis[1] qk = np.sin(angle/2)*unit_rotation_axis[2] # Convert quaternion to rotation matrix # https://en.wikipedia.org/wiki/Quaternions_and_spatial_rotation#Quaternion-derived_rotation_matrix rotation_matrix = np.array([[1-2*(qj**2 + qk**2), 2*(qi*qj - qk*qr), 2*(qi*qk + qj*qr)], [2*(qi*qj + qk*qr), 1-2*(qi**2 + qk**2), 2*(qj*qk - qi*qr)], [2*(qi*qk - qj*qr), 2*(qj*qk + qi*qr), 1-2*(qi**2 + qj**2)]]) # Apply rotation matrix to each point in the trajectory output_spatial_data = rotation_matrix @ spatial_data3d.T output_spatial_data = output_spatial_data.T # Check that we did the correct rotation output_axis = rotation_matrix @ current_axis3d output_axis = output_axis/np.linalg.norm(output_axis) assert np.allclose(output_axis, new_axis3d/np.linalg.norm(new_axis3d), atol=1e-4), "Something went wrong!" # Return trajectories the the same dimensions as the input if spatial_data.shape[1] == 2: return output_spatial_data[:,:2] elif spatial_data.shape[1] == 3: return output_spatial_data
[docs]def calc_reward_intervals(timestamps, values): ''' Given timestamps and values corresponding to reward times and reward state, calculate the intervals (start, end) during which the reward was active Args: timestamps (nt): when the reward transitioned state values (nt): what the state was at each corresponding timestamp Returns: (nt/2): during which the reward was active ''' reward_ts_on = timestamps[values == 1] reward_ts_off = timestamps[values == 0] if len(reward_ts_on) == len(reward_ts_off): return list(zip(reward_ts_on, reward_ts_off)) else: raise ValueError("Invalid reward timestamps or values")
[docs]def get_trial_targets(trials, targets): ''' Organizes targets from each trial into a trial array of targets. Essentially reshapes the array, but sometimes? there can be more or fewer targets in certain trials than in others Args: trials (ntargets): trial number for each target presented targets (ntargets, 3): target locations Returns: (ntrials list of (ntargets, 3)): list of targets in each trial ''' n_trials = np.max(trials) + 1 trial_targets = [[] for _ in range(n_trials)] for idx in range(len(trials)): trial = trials[idx] trial_targets[trial].append(targets[idx]) return trial_targets
[docs]def get_minimum_trials_per_target(target_idx, cond_mask=None): ''' Get the minimum number of trials per target after restricting trials Args: target_index (ntr): target index cond_mask (ntr): boolean array to remove trials Returns: (int): minimum number of trials per target ''' if cond_mask is None: min_trial = min([sum(target_idx == itarget) for itarget in np.unique(target_idx)]) else: min_trial = min([sum(target_idx[cond_mask] == itarget) for itarget in np.unique(target_idx[cond_mask])]) return min_trial
[docs]def get_conditioned_trials_per_target(target_idx, trials_per_cond, cond_mask=None, replacement=False, seed=None): ''' Get trial index to choose the same number of trials per target in removing trials by a certain condition. The trial index is evenly aligned like [1,2,3,1,2,3,1,2,3,...]. trials_per_cond can be taken from 'get_minimum_trials_per_target' function. Args: target_index (ntr): target index trials_per_cond (int): minimum trial across conditions to get the same number of trials per condition cond_mask (ntr): boolean array to remove trials replacement (bool): whether to allow replacement in choosing trials. This can be used for bootstrapping. seed (int): random seed Returns: (ntr): trial index to extract the same number of conditioned trials for each target ''' if seed is not None: np.random.seed(seed) # Get trial index to get the same number of trials per target trial_mask = [] for idx, itarget in enumerate(np.unique(target_idx)): if cond_mask is None: trial_mask_targ = np.where(target_idx == itarget)[0] else: trial_mask_targ = np.where(cond_mask * (target_idx == itarget))[0] if trial_mask_targ.size: chosen_trials = np.random.choice(trial_mask_targ, trials_per_cond, replace=replacement) trial_mask.append(chosen_trials[np.newaxis,:]) # add new axis for np.vstack later trial_mask = np.vstack(trial_mask) # reshape using 'F' so that trial index would be aligned in the pseudorandom order trial_mask = trial_mask.reshape(-1, order='F') return trial_mask
[docs]def get_relative_point_location(ref_point_pos, new_point_pos): ''' This function calculates the relative location (angle and position) of a point compared to a reference point. Assumes angle 0 starts from the direciton of vector [1, 0]. This function is specific to the 2D case at the moment but can handle a single point entry or multiple. Args: ref_point_pos (nxpts, nypts): Point position of reference point in spatial coordinates new_point_pos (nxpts, nypts): Point position of new point in spatial coordinates Returns: A tuple containing: | **relative_new_point_angle (float):** Absolute angle between cursor and target position [rad] | **relative_new_point_pos (float):** Absolute position of the target relative to the cursor ''' # Handle single point case relative_new_point_pos = new_point_pos - ref_point_pos if len(ref_point_pos.shape) == 1 or relative_new_point_pos.shape[1] == 1: relative_new_point_angle = np.arctan2(relative_new_point_pos[1], relative_new_point_pos[0]) if relative_new_point_angle < 0: relative_new_point_angle = 2*np.pi + relative_new_point_angle # Handle multi-point case else: relative_new_point_angle = np.arctan2(relative_new_point_pos[:,1], relative_new_point_pos[:,0]) relative_new_point_angle_mask = relative_new_point_angle < 0 relative_new_point_angle[relative_new_point_angle_mask] = 2*np.pi + relative_new_point_angle[relative_new_point_angle_mask] return relative_new_point_angle, relative_new_point_pos
[docs]def get_inst_target_dir(trial_aligned_pos, targetpospertrial): ''' This function calculates the instantaneous direction from the cursor to the target at each time point across each trial. This function is specific to the 2D case at the moment with X coordinates being the horizontal dimension and Y coordinates being the vertical dimension. Args: trial_aligned_pos (ntime, ntrials, 2): Position of the cursor in X and Y coordinates.trial_aligned_pos[:,:,0] corresponds to the X cursor positions and trial_aligned_pos[:,:,1] to the Y cursor positions. targetpospertrial (ntrials, 2): X and Y pos of target. Returns: (ntime, ntrials): Array including instantaneous direction to the target from the cursor [rad] ''' ntime = trial_aligned_pos.shape[0] ntrials = trial_aligned_pos.shape[1] inst_target_dir = np.zeros((ntime, ntrials))*np.nan for itrial in range(ntrials): cursor_location = trial_aligned_pos[:, itrial,:] target_location = targetpospertrial[itrial, :] rel_target_angle, _ = get_relative_point_location(cursor_location, target_location) inst_target_dir[:, itrial] = rel_target_angle return inst_target_dir
[docs]def mean_fr_inst_dir(data, trial_aligned_pos, targetpos, data_binwidth, ntarget_directions, data_samplerate, cursor_samplerate): ''' This function takes trial aligned neural data, cursor position, and target position then calculates the mean firing rate per target location. Each target location is the instantaneous target direction from the current cursor position, and therefore has multiple values during a single trial. The target locaitons are determined by calling aopy.postproc.get_inst_target_dir. The target directions are assumed to be evenly spaced around the origin and the 0'th target starts directly horizontal from the origin. This function is specific to the 2D case at the moment with X coordinates being the horizontal dimension and Y coordinates being the vertical dimension. Args: data (ntime, nunit, ntrial): Trial aligned data trial_aligned_pos (ntime, ntrials, 2): Position of the cursor in X and Y coordinates.trial_aligned_pos[:,:,0] corresponds to the X cursor positions and trial_aligned_pos[:,:,1] to the Y cursor positions. targetpos (ntrial, 2): Target position for each trial. First column is x target position, second column is y target position. data_binwidth (float): Bin size for neural data and cursor position. Can not be smaller than allowed by cursor position sampling rate. ntarget_directions (float): Number of directions to bin instantaneous direction into. data_samplerate (int): Sampling rate for data cursor_samplerate (int): Sampling rate for cursor position Returns: (nunit, ntarget_directions): Average firing rate per unit per direction bin. [spikes/s] ''' # Check that binwidths are not lower than allowed by sampling rate max_cursor_binwidth = 1/cursor_samplerate #[s/sample] if data_binwidth < max_cursor_binwidth: data_binwidth = max_cursor_binwidth ndatatime, nunit, ntrial = data.shape nbins = math.ceil(ndatatime/(data_samplerate*data_binwidth)) # the number of bins # Bin neural data and cursor pos for each trial to make them the same samplingrate binned_data = np.zeros((nbins, nunit, ntrial))*np.nan for itrial in range(ntrial): binned_data[:,:,itrial] = precondition.bin_spikes(data[:,:,itrial], data_samplerate, data_binwidth) # (use precondition.bin_spikes to get average value in each bin) binned_cursorxpos = precondition.bin_spikes(trial_aligned_pos[:,:,0], cursor_samplerate, data_binwidth)/cursor_samplerate binned_cursorypos = precondition.bin_spikes(trial_aligned_pos[:,:,1], cursor_samplerate, data_binwidth)/cursor_samplerate binned_cursorpos = np.concatenate((np.expand_dims(binned_cursorxpos,axis=2),np.expand_dims(binned_cursorypos,axis=2)), axis=2) # Get instantaneous target location for each cursor pos (ntime, ntrial) inst_target_dir = get_inst_target_dir(binned_cursorpos, targetpos) # Match the instantaneous direction to the correct target direction bin and ensure the bin # range starts entered on target 0 (directly horizontal from the origin) targetloc_binwidth = (2*np.pi)/ntarget_directions # [rad] Angular bin size of each target direction target_binid = 1 + (inst_target_dir-(targetloc_binwidth/2))//targetloc_binwidth target_binid[target_binid==ntarget_directions] = 0 #combine first and last bins # Average data and place into correct points in array mean_dir_fr = np.zeros((nunit, ntarget_directions)) for iunit in range(nunit): for idir in range(ntarget_directions): temp_data = binned_data[:,iunit,:] mean_dir_fr[iunit, idir] = np.nanmean(temp_data[target_binid==idir]) return mean_dir_fr
[docs]def sample_events(events, times, samplerate): ''' Converts a list of events and timestamps to a matrix of events where each column is a different event and each row is a sample in time. For example, if we have events 'reward' and 'penalty', and we want them as separate rasters:: >>> events = ["reward", "reward", "penalty", "reward"] >>> times = [0.3, 0.5, 0.7, 1.0] >>> samplerate = 10 >>> frame_events, event_names = sample_events(events, times, samplerate) >>> print(frame_events) [[False, False], [False, False], [False, False], [False, True ], [False, False], [False, True ], [False, False], [ True, False], [False, False], [False, False], [False, True ]] >>> print(event_names) ["penalty", "reward"] Args: events (list): list of event names or numbers times (list): list of timestamps for each event samplerate (float): rate at which you want to sample the events Returns: tuple: tuple containing: | **frame_events (nt, n_events):** logical index of 'events' at the given sampling rate | **event_names (n_events):** list of event column names (sorted alphabetically) ''' n_samples = round(times[-1]*samplerate) + 1 unique_events = np.unique(events) frame_events = np.zeros((n_samples, len(unique_events)), dtype='bool') for idx_event in range(len(events)): unique_idx = unique_events == events[idx_event] event_time = times[idx_event] event_frame = round(event_time * samplerate) frame_events[event_frame,unique_idx] = True return frame_events, unique_events
[docs]def get_calibrated_eye_data(eye_data, coefficients): """ Apply the least square fitting coefficients to segments of eye data Args: eye_data (nt, nch): data to calibrate. Typically 4 channels (left eye x, left eye y, right eye x, right eye y) coefficients (nch, 2): coefficients to use for calibration for each channel of data returns: (nt, nch) ndarray: calibrated data """ #caliberated_eye_data_segments = np.empty((num_time_points, num_dims)) return eye_data * coefficients[:,0] + coefficients[:,1]
[docs]def smooth_timeseries_gaus(timeseries_data, sd, samplerate, nstd=3, conv_mode='same', return_kernel=False): """ Smooths a time series by convolving it with a user-defined Gaussian kernel. The length of the kernel is computed using integers in the following range: .. math:: x = [\\frac{-nstd*sd*samplerate}{1000}, \\frac{nstd*sd*samplerate}{1000}] The Gaussian kernel is then defined by the following equation: .. math:: g(x) = \\frac{1}{\\sigma\\sqrt{2\\pi}}e^{\\frac{-x^{2}}{2\\sigma^{2}}} Args: timeseries_data (ndarray): The input time series data to be smoothed (i.e. binned firing rates). This can be an array with any number of dimensions, but the smoothing is applied along the first axis (time). sd (float): The width of the standard deviation of the Gaussian filter in time [ms]. samplerate (int): The sample rate of the time series data [Hz]. nstd (float or int, optional): The number of standard deviations to be used in the filter calculation. Default is 3. conv_mode (str, optional): The convolution mode, which defines the size of the output. If the default input of 'same' is used, the edges will be computed with zero padding. Accepts 'full', 'valid', or 'same'. Default is 'same'. See `scipy.signal.convolve` for full documentation on the modes. return_kernel (bool, optional): If True, this function returns the kernel used in convolution. Otherwise, only the smoothed data is returned. Returns: ndarray: The smoothed time series data. The shape of the output matches the input data, except in the case of 'valid' or 'full' convolution modes where the output may be smaller depending on the filter size. Example: timeseries_data = np.random.randn(1000) # Example time series data smoothed_data = smooth_timeseries_gaus(timeseries_data, 1000, 50) .. image:: _images/gaus_smoothing_example.png """ sample_std = sd * samplerate / 1000 # Convert from ms to samples x = np.arange(-sample_std * nstd, (nstd * sample_std) + 1) gaus_filter = (1 / (sample_std * np.sqrt(2 * np.pi))) * np.exp(-(x ** 2) / (2 * sample_std ** 2)) if return_kernel: return np.apply_along_axis(convolve, 0, timeseries_data, gaus_filter, mode=conv_mode, method='direct'), gaus_filter else: return np.apply_along_axis(convolve, 0, timeseries_data, gaus_filter, mode=conv_mode, method='direct')