Source code for aopy.utils.base

# utils.py
# Any extra utility functions belong here
# Helper functions, math, other things that don't really pertain to neural data analysis

import re
from datetime import datetime, timedelta
import os
import sys
import math
import matplotlib.pyplot as plt
import numpy as np

from .. import visualization

'''
Test signals
'''
[docs]def generate_test_signal(duration, samplerate, frequencies, amplitudes, noise_amplitude=0.): ''' Generates a test time series signal with multiple frequencies, specified in freq, for T timelength at a sampling rate of fs Args: duration (float): time period in seconds samplerate (int): sampling frequency in Hz frequencies (1D array): list of frequencies to be mixed in the test signal amplitudes (1D array): list of amplitudes for each frequency noise_amplitude (float, optional): amplitude of noise added on top of test signal Returns: tuple: Tuple containing: | **x (1D array):** cosine wave with multiple frequencies (and noise) | **t (1D array):** time vector for x ''' n_samples = int(duration * samplerate) t = np.linspace(0, duration, n_samples, endpoint=False) x = np.random.normal(0,noise_amplitude,n_samples) # start with some noise for i in range(len(frequencies)): x += amplitudes[i] * np.cos(2 * np.pi * frequencies[i] * t) return x, t
[docs]def generate_multichannel_test_signal(duration, samplerate, n_channels, frequency, amplitude): ''' Generate sine waves offset in phase by 2*pi/n_channels at the given amplitude and frequency Args: duration (float): time in seconds samplerate (int): sampling rate of the signal in Hz n_channels (int): number of channels to generate frequency (float): frequency in Hz amplitude (float): amplitude of each sine wave Returns: (nt, nch) array: timeseries data across channels ''' time = np.arange(0, duration, 1/samplerate) data = [] for i in range(n_channels): theta = 2*i*np.pi/n_channels # shift phase for each channel sinewave = amplitude * np.sin(2 * np.pi * frequency * time + theta) data.append(sinewave) data = np.array(data).T return data
[docs]def save_test_signal_ecube(data, save_dir, voltsperbit, datasource='Headstages'): ''' Create a binary file with eCube formatting using the given data Args: data (nt, nch): test_signal to save save_dir (str): where to save the file voltsperbit (float): gain of the data you are creating datasource (str): eCube source from which you want the data to be labeled (i.e. Headstages, AnalogPanel, or DigitalPanel) Returns: str: filename of the new data ''' intdata = np.array(data/voltsperbit, dtype='<i2') # turn into integer data flatdata = intdata.reshape(-1) timestamp = np.array([1, 2, 3, 4], dtype='<i2') # Save it to the test file datestr = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") # e.g. 2021-05-06_11-47-02 filename = f"{datasource}_{data.shape[1]}_Channels_int16_{datestr}.bin" filepath = os.path.join(save_dir, filename) with open(filepath, 'wb') as f: f.write(timestamp) f.write(flatdata.tobytes()) return filename
[docs]def count_unique_symbols(files): ''' Utility for counting how many times each unique symbol is listed in the given list and ranking them by descending number of uses. Args: files (list): list of filenames containing symbols generated by vscode 'List Symbols' Returns: tuple: tuple containing: | **unique_symbols (list):** list of unique symbols | **counts (list):** list of counts for each unique symbol ''' symbols = [] for filename in files: with open(filename, mode='r') as f: text = f.read() symbols += re.findall(r"variable (\w+)", text) unique_symbols, counts = np.unique(symbols, return_counts=True) order = np.argsort(-counts) return unique_symbols[order], counts[order]
''' Digital calc '''
[docs]def convert_analog_to_digital(analog_data, thresh=.3): ''' This function takes analog data and converts it to digital data given a threshold. It scales the analog to between 0 and 1 and uses thres as a Args: analog_data (nt, 1): Time series array of analog data thresh (float, optional): Minimum threshold value to use in conversion Returns: (nt, nch): Array of 1's or 0's indicating if the analog input was above threshold ''' # Scale data between 0 and 1 so that threshold is a percentange minval = np.min(analog_data) maxval = np.max(analog_data) if maxval == minval: analog_data_scaled = (analog_data - minval) else: analog_data_scaled = (analog_data - minval)/(maxval - minval) # Initialize digital_data digital_data = np.zeros(analog_data_scaled.shape, dtype='bool') # Default to zero digital_data[analog_data_scaled >= thresh] = 1 return digital_data
[docs]def detect_edges(digital_data, samplerate, rising=True, falling=True, check_alternating=True, min_pulse_width=None): ''' Finds the timestamp and corresponding value of all the bit flips in data. Assumes the first element in data isn't a transition By default, also enforces that rising and falling edges must alternate, always taking the last edge as the most valid one. For example:: >>> data = [0, 0, 3, 0, 3, 2, 2, 0, 1, 7, 3, 2, 2, 0] >>> ts, values = detect_edges(data, fs) >>> print(values) [3, 0, 3, 0, 7, 0] Args: digital_data (ntime x 1): masked binary data array samplerate (int): sampling rate of the data used to calculate timestamps rising (bool, optional): include low to high transitions falling (bool, optional): include high to low transitions check_alternating (bool, optional): if True, enforces that rising and falling edges must be alternating. An edge is valid when it is no longer rising or falling. min_pulse_width (float, optional): if not None, makes sure rising edges are followed by a minimum pulse width before calculating edge values Returns: tuple: tuple containing: | **timestamps (nbitflips):** when the bits flipped | **values (nbitflips):** corresponding values for each change ''' digital_data = np.squeeze(np.uint64(digital_data)) # important conversion for binary math if min_pulse_width: channels = convert_digital_to_channels(digital_data) channels = copy_edges_forwards(channels, int(samplerate*min_pulse_width)-1, axis=0) digital_data = convert_channels_to_digital(channels) # Find edges rising_idx = (~digital_data[:-1] & digital_data[1:]) > 0 # find low->high transitions falling_idx = (~digital_data[1:] & digital_data[:-1]) > 0 # Find any non-alternating edges invalid = np.zeros((len(digital_data)-1,), dtype='?') if check_alternating: all_edges = np.where(rising_idx | falling_idx)[0] next_edge_rising = True next_edge_falling = True for idx in range(len(all_edges)): this_idx = all_edges[idx] if next_edge_rising and rising_idx[this_idx]: # Expected rising and found rising next_edge_rising = False next_edge_falling = True elif next_edge_falling and falling_idx[this_idx]: # Expected falling and found falling next_edge_rising = True next_edge_falling = False elif idx > 0: # skip the first edge since there is no previous edge # Unexpected; there must be an extra edge somewhere. # We will count this one as valid and the previous one as invalid prev_idx = all_edges[idx-1] invalid[prev_idx] = True # Assemble final index logical_idx = np.zeros((len(digital_data)-1,), dtype='?') if rising: logical_idx |= rising_idx if falling: logical_idx |= falling_idx logical_idx &= np.logical_not(invalid) logical_idx = np.insert(logical_idx, 0, False) # first element never a transition time = np.arange(np.size(digital_data))/samplerate return time[logical_idx], digital_data[logical_idx]
[docs]def extract_bits(data, mask): ''' Apply bit mask and shift data to the least significant set bit in the mask. For example, extract_bits(0001000011110000, 1111111100000000) => 00010000 extract_bits(0001000011110000, 0000000011111111) => 11110000 extract_bits(0001000011001100, 0000001111001111) => 00111100 Args: data (ntime): digital data mask (int): which bits to filter Returns: (nt): masked and shifted data ''' data = np.array(data).astype(int) result = np.zeros(shape=data.shape, dtype=data.dtype) mask_bit_positions = [i for i, bit in enumerate(reversed(bin(mask)[2:])) if bit == '1'] for pos in mask_bit_positions[::-1]: result <<= 1 result |= ((data >> pos) & 1).astype(data.dtype) return result
[docs]def convert_channels_to_mask(channels): ''' Helper function to take a range of channels into a bitmask Args: channels (int array): 0-indexed channels to be masked Returns: int: binary mask of the given channels ''' try: # Range of channels _ = iter(channels) channels = np.array(channels) flags = np.zeros(64, dtype=int) flags[channels] = 1 return int(np.dot(np.array([2**i for i in range(64)]), flags)) except: # Single channel return int(1 << channels)
[docs]def convert_digital_to_channels(data_64_bit): ''' Converts 64-bit digital data from eCube into channels. Args: data_64_bit (n): masked 64-bit data, little-endian Returns: (n, 64): where channel 0 is least significant bit ''' # Take the input, split into bytes, then unpack each byte, all little endian packed = np.squeeze(np.uint64(data_64_bit)) # required conversion to unsigned int unpacked = np.unpackbits(packed.view(np.dtype('<u1')), bitorder='little') return unpacked.reshape((packed.size, 64))
[docs]def convert_channels_to_digital(data_channels): ''' Converts binary channels from eCube into 64-bit digital data. Args: data_channels (n, 64): where channel 0 is least significant bit Returns: (n): masked 64-bit data, little-endian ''' # Take the input, split into bytes, then unpack each byte, all little endian packed = np.packbits(data_channels, bitorder='little').view(np.dtype('<u8')) return np.squeeze(packed)
[docs]def get_edges_from_onsets(onsets, pulse_width): ''' This function calculates the values and timepoints corresponding to a given time series of pulse onsets (timestamp corresponding to the rising edge of a pulse). Args: onsets (nonsets): Time point corresponding to a pulse onset. pulse_width (float): Pulse duration Returns: tuple: tuple containing: | **timestampes (2*nonsets + 1):** Timestamps of the rising and falling edges. Always starts at 0. | **values (2*nonsets + 1):** Values corresponding to the output timestamps. ''' timestamps = np.zeros((1+len(onsets)*2,)) values = np.zeros((1+len(onsets)*2,)) for t in range(len(onsets)): timestamps[1+2*t] = onsets[t] values[1+2*t] = 1 timestamps[2+2*t] = onsets[t]+pulse_width values[2+2*t] = 0 return timestamps, values
[docs]def get_pulse_edge_times( digital_data, samplerate ): """get_pulse_edge_times Args: digital_data (nt, 1): array of data from ecube digital panel samplerate (numeric): data sampling rate (Hz) Returns: edge_times (npulse, 2): start and end times from each detected pulse """ edge_times, edge_val = detect_edges(digital_data, samplerate) start_idx = np.where(edge_val == 1)[0][0] end_idx = np.where(edge_val == 0)[0][-1] edge_times = edge_times[start_idx:(end_idx+1)] edge_val = edge_val[start_idx:(end_idx+1)] edge_pairs = edge_times.reshape(-1,2) return edge_pairs
[docs]def compute_pulse_duty_cycles( edge_pairs ): """compute_pulse_duty_cycles Args: edge_pairs (npulse, 2): start, end times from a series of pulses Returns: duty_cycle (npulse): duty cycle of each pulse. Pulse period assumed to be constant. """ pulse_times = edge_pairs[:,0] pulse_period = np.diff(pulse_times,axis=0) duty_cycle = np.squeeze(np.diff(edge_pairs,axis=-1))/pulse_period[0] return duty_cycle
[docs]def max_repeated_nans(a): ''' Utility to calculate the maximum number of consecutive nans Args: a (ndarray): input sequence Returns: int: max consecutive nans ''' mask = np.concatenate(([False],np.isnan(a),[False])) if ~mask.any(): return 0 else: idx = np.nonzero(mask[1:] != mask[:-1])[0] return (idx[1::2] - idx[::2]).max()
[docs]def copy_edges_forwards(data, n_steps, truncate_edges=False, copy_per_step=False, axis=0): ''' Forces pulses to have a fixed width of eactly n_steps. First, find the rising edges of the data, then copy them forwards n_steps times. Works across multiple channels simulatenously. Args: data ((nt,) or (nt, nch)): digital data n_steps (int): how many timesteps should pulses be truncate_edges (bool, optional): if True, then edges will always be set to n_steps length. If false, then edges that are longer than n_steps will remain the same length. Default False. copy_per_step (bool, optional): copy edges one step at a time or one edge at a time; changes processing time but output stays the same. If there are long edges, the default option False is faster. If there are a lot of short edges, then setting copy_per_step=True will be faster. Default False. axis (int, optional): along which axis to copy edges. Default 0. Returns: (nt,) or (nt, nch): digital data but with fixed pulse widths Note: Only works for 1- or 2-D arrays ''' if data.ndim == 1: data = np.expand_dims(data, 1) if axis != 0: data = data.T nt = data.shape[0] edges = (~data[:-1] & data[1:]) > 0 # find low->high transitions edges = np.insert(edges, 0, False, axis=0) # first element never a transition if not copy_per_step: # Find the edges, set their pulses directly idx = np.where(edges) for e_idx in zip(idx[0],idx[1]): edges[e_idx[0]:min(nt,e_idx[0]+n_steps+1),e_idx[1]] = 1 else: # For all of the data at once, shift to the right by n_steps, or-ing with edges at each step for n in range(n_steps): edges_shifted = np.roll(edges, 1, axis=0) edges_shifted[0,:] = 0 edges = np.logical_or(edges, edges_shifted) if axis != 0: edges = edges.T data = data.T if truncate_edges: return np.squeeze(edges) return np.squeeze(edges | data)
def count_repetitions(arr, diff_thr=0): """ Counts the number of repetitions in an array. Always counts the first and last element of the array as different from before and after the array. Args: arr (nt,): The input array. Only supports 1d arrays. diff_thr (numeric, optional): Minimum step size in the data. Returns: tuple: A tuple of two numpy arrays: | **repetitions (nt,):** Lengths of the repetitions in the input array, | **change_idx (nt,):** Indices where the repetitions start """ assert np.array(arr).ndim == 1, "Only supports 1d arrays" if len(arr) == 0: return [], [] # Calculate the absolute difference between adjacent elements in the array diff = np.abs(np.diff(arr)) change = diff > diff_thr change = np.insert(change, 0, True) # Find the indices where the 'change' array is True change_idx = np.where(change)[0] # Count the length of the gaps between those changes to find the repetitions change = np.append(change_idx, len(arr)) repetitions = np.diff(change) return repetitions, change_idx
[docs]def segment_array(arr, category, duplicate_endpoints=False): ''' Segments an array into subarrays based on a corresponding category array. Args: arr (nt,): The array to segment. category (nt,): An array of the same length as `arr` containing a category label for each element in the corresponding array. duplicate_endpoints (bool, optional): if True, each subsequent subarray will start with the last element of the preceding subarray. Returns: tuple: Tuple containing: | **segments (list of arrays):** A list of subarrays of `arr`, where each subarray corresponds to a unique value in `category`. | **segmented_category (list of arrays):** An array of the same length as `segments`, where each element corresponds to the category label for the corresponding subarray in `segments`. ''' run_arr, idx = count_repetitions(category) end_indices = np.cumsum(run_arr) if duplicate_endpoints: segments = [] start_idx = 0 for end_idx in end_indices: if start_idx == 0: segments.append(arr[:end_idx]) else: segments.append(arr[start_idx-1:end_idx]) start_idx = end_idx else: segments = np.split(arr, end_indices[:-1]) return segments, [category[i] for i in idx]
[docs]def count_repetitions(arr, diff_thr=0): """ Counts the number of repetitions in an array. Always counts the first and last element of the array as different from before and after the array. Args: arr (nt,): The input array. Only supports 1d arrays. diff_thr (numeric, optional): Minimum step size in the data. Returns: tuple: A tuple of two numpy arrays: | **repetitions (nt,):** Lengths of the repetitions in the input array, | **change_idx (nt,):** Indices where the repetitions start """ assert np.array(arr).ndim == 1, "Only supports 1d arrays" if len(arr) == 0: return [], [] # Calculate the absolute difference between adjacent elements in the array diff = np.abs(np.diff(arr)) change = diff > diff_thr change = np.insert(change, 0, True) # Find the indices where the 'change' array is True change_idx = np.where(change)[0] # Count the length of the gaps between those changes to find the repetitions change = np.append(change_idx, len(arr)) repetitions = np.diff(change) return repetitions, change_idx
[docs]def nextpow2(x): ''' Next higher power of 2. It is often useful for finding the nearest power of two sequence length for FFT operations. Args: x (int or float): input number Returns: int: the first P such that 2**P >= abs(x). ''' return 1 if x == 0 else math.ceil(math.log2(x))
''' Other utils ''' # simple progressbar, not tied to the iterator
[docs]def derivative(x, y, norm=True): ''' Computes the derivative of y along x. Args: x (nt): independent variable, e.g. time y (nt, ...): dependent variable, e.g. position norm (bool, optional): also compute the norm of y if it is multidimensional (default True). Set to false to output component wise derivative. Returns: nt: derivative of y ''' dy = np.gradient(y, axis=0, edge_order=2) dx = np.gradient(x) if norm and dy.ndim > 1: dy = np.linalg.norm(dy, axis=1) dydx = dy/dx elif not norm and dy.ndim > 1: dydx = dy/np.expand_dims(dx, len(dy.shape)-1) else: dydx = dy/dx return dydx
[docs]def calc_euclid_dist_mat(pos): ''' Calculates a matrix of euclidean distance. Each entry in the matrix is the distance between ith and jth position Args: pos (nch,2): x, y position list, e.g. for each electrode Returns: (nch, nch) array: distance between each given position ''' n_channels = np.shape(pos)[0] dist = np.zeros((n_channels, n_channels)) x_pos = pos[:,0] y_pos = pos[:,1] for index, _ in np.ndenumerate(dist): p1 = index[0] p2 = index[1] dist[p1, p2] = math.dist([x_pos[p1], y_pos[p1]], [x_pos[p2], y_pos[p2]]) return dist
[docs]def calc_radial_dist(pos, origin=(0,0)): ''' Calculates a matrix of radial distance from a given origin. Each entry in the matrix is the distance between ith and jth electrode channel Args: pos (nch,2): x, y position list, e.g. for each electrode origin (2,): point from which to calculate radial distance Returns: (nch,) array: radius between each given position and the origin ''' n_channels = np.shape(pos)[0] dist = np.zeros((n_channels,)) for i, p in enumerate(pos): dist[i] = math.dist(origin, p) return dist
[docs]def first_nonzero(arr, axis=0, all_zeros_val=-1): ''' Helper function to find the first non-zero element in an array Args: arr (ndarray): array containing zeros axis (int, optional): axis along which to compute the first nonzero. Defaults to 0. all_zeros_val (float, optional): value to indicate no nonzero elements were found. Defaults to -1. Returns: ndarray: array of indices with one less dimension than the input ''' mask = (arr != 0) return np.where(mask.any(axis=axis), mask.argmax(axis=axis), all_zeros_val)
''' Synchronization '''
[docs]def extract_barcodes_from_times(on_times,off_times,inter_barcode_interval=30,bar_duration=0.017,barcode_duration_ceiling=2,nbits=32,): """ Read barcodes from timestamped rising and falling edges. This function came from the openephys repository Notes: ignores first code in prod (ok, but not intended) ignores first on pulse (intended - this is needed to identify that a barcode is starting) Args: on_times (ndarray): Timestamps of rising edges on the barcode line off_times (ndarray): Timestamps of falling edges on the barcode line inter_barcode_interval (float) : Minimun duration of time between barcodes. bar_duration (float): A value slightly shorter than the expected duration of each bar barcode_duration_ceiling (float) : The maximum duration of a single barcode nbits (int): The bit-depth of each barcode Returns: tuple: tuple containing: | **barcode_start_times (list):** For each detected barcode, the time at which that barcode started | **barcodes (list of int):** For each detected barcode, the value of that barcode as an integer. """ start_indices = np.diff(on_times) a = np.where(start_indices > inter_barcode_interval)[0] barcode_start_times = on_times[a + 1] barcodes = [] for i, t in enumerate(barcode_start_times): oncode = on_times[np.where(np.logical_and(on_times > t, on_times < t + barcode_duration_ceiling))[0]] offcode = off_times[np.where(np.logical_and(off_times > t, off_times < t + barcode_duration_ceiling))[0]] currTime = offcode[0] bits = np.zeros((nbits,)) for bit in range(0, nbits): nextOn = np.where(oncode > currTime)[0] nextOff = np.where(offcode > currTime)[0] if nextOn.size > 0: nextOn = oncode[nextOn[0]] else: nextOn = t + inter_barcode_interval if nextOff.size > 0: nextOff = offcode[nextOff[0]] else: nextOff = t + inter_barcode_interval if nextOn < nextOff: bits[bit] = 1 currTime += bar_duration barcode = 0 # least sig left for bit in range(0, nbits): barcode += bits[bit] * pow(2, bit) barcodes.append(int(barcode)) return barcode_start_times, barcodes
[docs]def get_first_last_times(barcode_on_times, barcode_on_times_main, barcode, barcode_main): ''' Get the first and last time when barcodes (sync pulses) come to each stream. Args: barcode_on_times (n_times) : the times at which barcode comes to the auxiliary stream barcode_on_times_main (k_times) : the times at which barcode comes to the main stream barcode (n-length list) : Unique barcode number in the auxiliary stream barcode_main (k-length list) : Unique barcode number in the main stream Return: tuple: tuple containing: | **first_last_times (2):** barcode on_times that corresponds to the first and last barcode in the recording | **first_last_times (2):** barcode on_times in the main stream that corresponds to the first and last barcode in the recording ''' # Get barcode index that is shared across both streams index_main = [] index = [] if barcode_on_times.shape[0] == barcode_on_times_main.shape[0]: # if shape of barcode timing is same, use the first and last indices index_main.append(0) index_main.append(-1) index.append(0) index.append(-1) else: for idx, i_barcode in enumerate(barcode): if i_barcode in barcode_main: index_main.append(barcode_main.index(i_barcode)) index.append(idx) # Get the indices that correspond to the first and last barcode in the recording first_last_idx = [index[0],index[-1]] first_last_idx_main = [index_main[0],index_main[-1]] # Get the times that correspond to the first and last barcode in the recording first_last_times = barcode_on_times[first_last_idx] first_last_times_main = barcode_on_times_main[first_last_idx_main] return first_last_times,first_last_times_main
[docs]def sync_timestamp_offline(timestamp, on_times, on_times_main): ''' Synchroniza timestamps with timestamps in another stream Args timestamps (nt) : timestamps in the auxiliary stream that should be synchronized to main stream on_time (2) : the first and last times when sync pulses come to the auxiliary stream in the recording on_time_main (2) : the first and last times when sync pulses come to the main stream in the recording Retuen: tuple: tuple containing: | **sync_timestamps (nt):** synchronized timestamps | **scaling (float):** scaling factor between streams ''' scaling = (on_times_main[-1] - on_times_main[0])/(on_times[-1] - on_times[0]) sync_timestamp = (timestamp - on_times[0]) * scaling + on_times_main[0] return sync_timestamp, scaling
[docs]def convert_port_number(port_number, datatype='ap'): ''' convert port_number to directory name made by openephys Args: port_number (int): port number which a probe connected to. natural number from 1 to 4. datatyoe (str, optional): datatype of neuropixel. 'ap' or 'lfp' Returns: probe_dir (str): Probe directory name that contains AP data ''' letter = chr(ord('@')+port_number) probe_dir = f'Neuropix-PXI-100.Probe{letter}-{datatype.upper()}' return probe_dir
[docs]def multiply_mat_batch(data, mat, save_path, scale = 1, max_memory_gb = 1., dtype='int16', min_batch_size=0): ''' Multiply a matrix to data in each batch to save memory. The result is saved in save_path. This function can be used to multiply an inverse matrix by spike band time series. Args: data (nt, nch): neural data. This should be a memory mapping array. mat (anysize, nch): matrix to multiply by data save_path (str): file path to save destriped lfp data scale (float, optional): Scaling factor to multiply by data. 1/200 is necessary for whitened data in kilosort4. default is 1. max_memory_gb (float): memory size in GB to determine batch size. default is 1.0 GB. dtype (str, optional): dtype for data. default is int16. min_batch_size (int): the number of size in integer to ensure that batch size is more than min_batch_size. default is 0. Returns: None ''' n_samples, n_channels = data.shape batch_size = int (max_memory_gb*1e9 / (n_channels*np.dtype(dtype).itemsize)) batch_size += min_batch_size Nbatches = np.ceil(n_samples/batch_size).astype(int) # Multiply matrix in each batch to save memory x = np.memmap(save_path, dtype=dtype, mode='w+', shape=(n_samples, n_channels)) for ibatch in range(Nbatches): tmp = mat @ (scale*data[ibatch*batch_size:(ibatch+1)*batch_size, :].T) x[ibatch*batch_size:(ibatch+1)*batch_size, :] = (tmp.T).astype(dtype) x.flush()
[docs]def generate_poisson_timestamps(mu, max_time, min_time=0., refractory_period=0.): """ Generate timestamps following a Poisson process with mean time between events mu, with a specified minimum refractory period, and that fall within a specified time window. The number of timestamps generated is determined by the time window and the mean time between events and cannot be specified directly. The generated timestamps are random but can be repeated by setting the random seed using `np.random.seed()`. Args: mu (float): Mean time between events in seconds. max_time (float): End time of the window in seconds. min_time (float, optional): Start time of the window in seconds. Default 0. refractory_period (float, optional): Minimum refractory period between events in seconds. Default 0. Returns: np.ndarray: Array of timestamps within the specified time window. Note: The distribution is not guaranteed to be poisson when the refractory period is nonzero. As the refractory period increases, the distribution will approach a uniform distribution. """ # Initial estimate of required number of events (overestimate to ensure coverage) estimated_events = int((max_time - min_time) / mu * 1.5) # Generate inter-event times inter_event_times = np.random.exponential(mu, estimated_events) inter_event_times = np.maximum(inter_event_times, refractory_period) # Use a cumulative sum to transform intervals into times event_times = np.cumsum(inter_event_times) event_times += min_time event_times = event_times[event_times < max_time] return event_times
''' datetime utils '''
[docs]def get_consecutive_days(dates): ''' Find consecutive days in a list of dates. Args: dates (list of datetime): list of dates to check for consecutive days Returns: list of lists: each sublist contains a list of consecutive dates ''' if not dates: return [] consecutive_dates = [] temp_consec_dates = [dates[0]] for i in range(1, len(dates)): if dates[i] - dates[i-1] == timedelta(days=1): temp_consec_dates.append(dates[i]) else: if len(temp_consec_dates) > 1: consecutive_dates.append(temp_consec_dates) temp_consec_dates = [dates[i]] if len(temp_consec_dates) > 1: consecutive_dates.append(temp_consec_dates) return consecutive_dates
[docs]def scale_data_by_p_value(data, p, k=100, p0=0.08): ''' Scale data by a sigmoid function of p-value. Useful for visualizing data maps generated with p-values, to emphasize significant values. See https://www.science.org/doi/full/10.1126/scitranslmed.aay4682 for an example. Args: data (nch,): per-channel data to scale p (nch,): p-values corresponding to data k (float, optional): steepness of the sigmoid. Default 100. p0 (float, optional): midpoint of the sigmoid. Default 0.08. Returns: (nch,) array: scaled data Examples: Given a 240-channel map of p-values and corresponding data from an ECoG array, plot the original data, p-values, and scaled data .. code-block:: python p = np.linspace(0, 1, 240) data = np.random.randn(240) scaled_data = scale_data_by_p_value(data, p, k=100, p0=0.08) plt.figure(figsize=(9,2.5)) plt.subplot(1,3,1) im = aopy.visualization.plot_ECoG244_data_map(data, elec_data=True) im.set_clim(-3, 3) plt.colorbar(im) plt.title("Original data") plt.subplot(1,3,2) im = aopy.visualization.plot_ECoG244_data_map(p, cmap='viridis', elec_data=True) plt.colorbar(im) plt.title("p-values") plt.subplot(1,3,3) im = aopy.visualization.plot_ECoG244_data_map(scaled_data, elec_data=True) im.set_clim(-3, 3) plt.colorbar(im) plt.title("Scaled data") plt.tight_layout() .. image:: _images/scale_by_p_value.png ''' w = 1. / (1. + np.exp(-k * (p0 - p))) return data * w
[docs]def digitize_by_angle(vectors, start_angle=math.pi/4, clockwise=True, bins=4): ''' Bin 2D vectors into angular bins. Args: vectors (ntarg): List or array of 2D vectors. start_angle (float, optional): Starting angle for binning in radians. Default is -pi/4. clockwise (bool, optional): If True, bins are assigned in clockwise order. The first bin is ahead of the start angle in this direction. Default is True. bins (int, optional): Number of angular bins. Default is 4. Returns: (ntarg,) int: Array of bin indices corresponding to each vector. ''' vectors = np.array(vectors) angles = np.arctan2(vectors[:,1], vectors[:,0]) angles = (angles - start_angle) % (2 * np.pi) - np.pi if clockwise: angles = -angles bin_edges = np.linspace(-np.pi, np.pi, bins+1) return np.digitize(angles, bin_edges)
[docs]def reindex_targets(target_locations, target_idxs, start_angle=5*math.pi/8, clockwise=True, bins=8, debug=True): ''' Reindex target indices based on their angular location. Default behavior is to place target 1 at the top and index a total of 8 targets clockwise. Args: target_locations (ntarg, 2): List or array of 2D target locations target_idxs (ntarg,): Original target indices start_angle (float, optional): Starting angle for binning in radians. Default is -3pi/4. clockwise (bool, optional): If True, bins are assigned in clockwise order. The first bin is ahead of the start angle in this direction. Default is True. bins (int, optional): Number of angular bins. Default is 8. debug (bool, optional): If True, plot the original and new target indices. Default is True. Returns: (ntarg,) int: Array of new bin indices corresponding to each target location. Examples: Given a set of target locations and their original indices, reindex them based on their angular location and plot the original and new indices. .. code-block:: python target_locations = [[5,0], [3.53, 3.53], [0,5], [-3.53,3.53], [-5,0], [-3.53,-3.53], [0,-5], [3.53,-3.53], [8,0], [0,8], [-8,0], [0,-8]] target_idxs = np.array([3,2,1,8,7,6,5,4,1,2,3,4]) reindex_targets(target_locations, target_idxs) .. image:: _images/test_reindex_targets.png .. code-block:: python target_locations = [[4,0], [0,4], [-4,0], [0,-4], [7,0], [0,7], [-7,0], [0,-7]] target_idxs = np.array([0,1,2,3,1,2,3,4]) reindex_targets(target_locations, target_idxs, start_angle=np.pi/4, clockwise=False, bins=4) .. image:: _images/test_reindex_targets_ccw.png ''' new_idx = digitize_by_angle(target_locations, start_angle=start_angle, clockwise=clockwise, bins=bins) if debug: plt.figure(figsize=(8,4)) # Original indices plt.subplot(1,2,1) visualization.plot_targets(target_locations, target_radius=2, bounds=(-10,10,-10,10)) locs = np.array([np.array(t) for t in target_locations]) target_locs_and_idx = np.hstack([locs, np.array(target_idxs).reshape(-1,1)]) unique_targs = np.unique(target_locs_and_idx, axis=0) for idx in range(len(unique_targs)): plt.text(*unique_targs[idx][:2], int(unique_targs[idx][-1]), fontsize=10, ha='center', va='center') plt.title('Original indices') # Vector bins for idx in range(bins): if clockwise: angle = start_angle - (2*np.pi/bins)*idx else: angle = start_angle + (2*np.pi/bins)*idx plt.plot([0, 10*np.cos(angle)], [0, 10*np.sin(angle)], 'k--' if idx > 0 else 'r--', linewidth=1) # New indices plt.subplot(1,2,2) visualization.plot_targets(target_locations, target_radius=2, bounds=(-10,10,-10,10)) locs = np.array([np.array(t) for t in target_locations]) target_locs_and_idx = np.hstack([locs, new_idx.reshape(-1,1)]) unique_targs = np.unique(target_locs_and_idx, axis=0) for idx in range(len(unique_targs)): plt.text(*unique_targs[idx][:2], int(unique_targs[idx][-1]), fontsize=10, ha='center', va='center') plt.title('New indices') return new_idx