# utils.py
# Any extra utility functions belong here
# Helper functions, math, other things that don't really pertain to neural data analysis
import re
from datetime import datetime, timedelta
import os
import sys
import math
import matplotlib.pyplot as plt
import numpy as np
from .. import visualization
'''
Test signals
'''
[docs]def generate_test_signal(duration, samplerate, frequencies, amplitudes, noise_amplitude=0.):
'''
Generates a test time series signal with multiple frequencies, specified in freq, for T timelength at a sampling rate of fs
Args:
duration (float): time period in seconds
samplerate (int): sampling frequency in Hz
frequencies (1D array): list of frequencies to be mixed in the test signal
amplitudes (1D array): list of amplitudes for each frequency
noise_amplitude (float, optional): amplitude of noise added on top of test signal
Returns:
tuple: Tuple containing:
| **x (1D array):** cosine wave with multiple frequencies (and noise)
| **t (1D array):** time vector for x
'''
n_samples = int(duration * samplerate)
t = np.linspace(0, duration, n_samples, endpoint=False)
x = np.random.normal(0,noise_amplitude,n_samples) # start with some noise
for i in range(len(frequencies)):
x += amplitudes[i] * np.cos(2 * np.pi * frequencies[i] * t)
return x, t
[docs]def generate_multichannel_test_signal(duration, samplerate, n_channels, frequency, amplitude):
'''
Generate sine waves offset in phase by 2*pi/n_channels at the given amplitude and frequency
Args:
duration (float): time in seconds
samplerate (int): sampling rate of the signal in Hz
n_channels (int): number of channels to generate
frequency (float): frequency in Hz
amplitude (float): amplitude of each sine wave
Returns:
(nt, nch) array: timeseries data across channels
'''
time = np.arange(0, duration, 1/samplerate)
data = []
for i in range(n_channels):
theta = 2*i*np.pi/n_channels # shift phase for each channel
sinewave = amplitude * np.sin(2 * np.pi * frequency * time + theta)
data.append(sinewave)
data = np.array(data).T
return data
[docs]def save_test_signal_ecube(data, save_dir, voltsperbit, datasource='Headstages'):
'''
Create a binary file with eCube formatting using the given data
Args:
data (nt, nch): test_signal to save
save_dir (str): where to save the file
voltsperbit (float): gain of the data you are creating
datasource (str): eCube source from which you want the data to be
labeled (i.e. Headstages, AnalogPanel, or DigitalPanel)
Returns:
str: filename of the new data
'''
intdata = np.array(data/voltsperbit, dtype='<i2') # turn into integer data
flatdata = intdata.reshape(-1)
timestamp = np.array([1, 2, 3, 4], dtype='<i2')
# Save it to the test file
datestr = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") # e.g. 2021-05-06_11-47-02
filename = f"{datasource}_{data.shape[1]}_Channels_int16_{datestr}.bin"
filepath = os.path.join(save_dir, filename)
with open(filepath, 'wb') as f:
f.write(timestamp)
f.write(flatdata.tobytes())
return filename
[docs]def count_unique_symbols(files):
'''
Utility for counting how many times each unique symbol is listed in the given list
and ranking them by descending number of uses.
Args:
files (list): list of filenames containing symbols generated by vscode 'List Symbols'
Returns:
tuple: tuple containing:
| **unique_symbols (list):** list of unique symbols
| **counts (list):** list of counts for each unique symbol
'''
symbols = []
for filename in files:
with open(filename, mode='r') as f:
text = f.read()
symbols += re.findall(r"variable (\w+)", text)
unique_symbols, counts = np.unique(symbols, return_counts=True)
order = np.argsort(-counts)
return unique_symbols[order], counts[order]
'''
Digital calc
'''
[docs]def convert_analog_to_digital(analog_data, thresh=.3):
'''
This function takes analog data and converts it to digital data given a
threshold. It scales the analog to between 0 and 1 and uses thres as a
Args:
analog_data (nt, 1): Time series array of analog data
thresh (float, optional): Minimum threshold value to use in conversion
Returns:
(nt, nch): Array of 1's or 0's indicating if the analog input was above threshold
'''
# Scale data between 0 and 1 so that threshold is a percentange
minval = np.min(analog_data)
maxval = np.max(analog_data)
if maxval == minval:
analog_data_scaled = (analog_data - minval)
else:
analog_data_scaled = (analog_data - minval)/(maxval - minval)
# Initialize digital_data
digital_data = np.zeros(analog_data_scaled.shape, dtype='bool') # Default to zero
digital_data[analog_data_scaled >= thresh] = 1
return digital_data
[docs]def detect_edges(digital_data, samplerate, rising=True, falling=True, check_alternating=True,
min_pulse_width=None):
'''
Finds the timestamp and corresponding value of all the bit flips in data. Assumes
the first element in data isn't a transition
By default, also enforces that rising and falling edges must alternate, always taking the
last edge as the most valid one. For example::
>>> data = [0, 0, 3, 0, 3, 2, 2, 0, 1, 7, 3, 2, 2, 0]
>>> ts, values = detect_edges(data, fs)
>>> print(values)
[3, 0, 3, 0, 7, 0]
Args:
digital_data (ntime x 1): masked binary data array
samplerate (int): sampling rate of the data used to calculate timestamps
rising (bool, optional): include low to high transitions
falling (bool, optional): include high to low transitions
check_alternating (bool, optional): if True, enforces that rising and falling
edges must be alternating. An edge is valid when it is no longer rising or falling.
min_pulse_width (float, optional): if not None, makes sure rising edges are
followed by a minimum pulse width before calculating edge values
Returns:
tuple: tuple containing:
| **timestamps (nbitflips):** when the bits flipped
| **values (nbitflips):** corresponding values for each change
'''
digital_data = np.squeeze(np.uint64(digital_data)) # important conversion for binary math
if min_pulse_width:
channels = convert_digital_to_channels(digital_data)
channels = copy_edges_forwards(channels, int(samplerate*min_pulse_width)-1, axis=0)
digital_data = convert_channels_to_digital(channels)
# Find edges
rising_idx = (~digital_data[:-1] & digital_data[1:]) > 0 # find low->high transitions
falling_idx = (~digital_data[1:] & digital_data[:-1]) > 0
# Find any non-alternating edges
invalid = np.zeros((len(digital_data)-1,), dtype='?')
if check_alternating:
all_edges = np.where(rising_idx | falling_idx)[0]
next_edge_rising = True
next_edge_falling = True
for idx in range(len(all_edges)):
this_idx = all_edges[idx]
if next_edge_rising and rising_idx[this_idx]:
# Expected rising and found rising
next_edge_rising = False
next_edge_falling = True
elif next_edge_falling and falling_idx[this_idx]:
# Expected falling and found falling
next_edge_rising = True
next_edge_falling = False
elif idx > 0: # skip the first edge since there is no previous edge
# Unexpected; there must be an extra edge somewhere.
# We will count this one as valid and the previous one as invalid
prev_idx = all_edges[idx-1]
invalid[prev_idx] = True
# Assemble final index
logical_idx = np.zeros((len(digital_data)-1,), dtype='?')
if rising:
logical_idx |= rising_idx
if falling:
logical_idx |= falling_idx
logical_idx &= np.logical_not(invalid)
logical_idx = np.insert(logical_idx, 0, False) # first element never a transition
time = np.arange(np.size(digital_data))/samplerate
return time[logical_idx], digital_data[logical_idx]
[docs]def convert_channels_to_mask(channels):
'''
Helper function to take a range of channels into a bitmask
Args:
channels (int array): 0-indexed channels to be masked
Returns:
int: binary mask of the given channels
'''
try:
# Range of channels
_ = iter(channels)
channels = np.array(channels)
flags = np.zeros(64, dtype=int)
flags[channels] = 1
return int(np.dot(np.array([2**i for i in range(64)]), flags))
except:
# Single channel
return int(1 << channels)
[docs]def convert_digital_to_channels(data_64_bit):
'''
Converts 64-bit digital data from eCube into channels.
Args:
data_64_bit (n): masked 64-bit data, little-endian
Returns:
(n, 64): where channel 0 is least significant bit
'''
# Take the input, split into bytes, then unpack each byte, all little endian
packed = np.squeeze(np.uint64(data_64_bit)) # required conversion to unsigned int
unpacked = np.unpackbits(packed.view(np.dtype('<u1')), bitorder='little')
return unpacked.reshape((packed.size, 64))
[docs]def convert_channels_to_digital(data_channels):
'''
Converts binary channels from eCube into 64-bit digital data.
Args:
data_channels (n, 64): where channel 0 is least significant bit
Returns:
(n): masked 64-bit data, little-endian
'''
# Take the input, split into bytes, then unpack each byte, all little endian
packed = np.packbits(data_channels, bitorder='little').view(np.dtype('<u8'))
return np.squeeze(packed)
[docs]def get_edges_from_onsets(onsets, pulse_width):
'''
This function calculates the values and timepoints corresponding to a given time series
of pulse onsets (timestamp corresponding to the rising edge of a pulse).
Args:
onsets (nonsets): Time point corresponding to a pulse onset.
pulse_width (float): Pulse duration
Returns:
tuple: tuple containing:
| **timestampes (2*nonsets + 1):** Timestamps of the rising and falling edges. Always starts at 0.
| **values (2*nonsets + 1):** Values corresponding to the output timestamps.
'''
timestamps = np.zeros((1+len(onsets)*2,))
values = np.zeros((1+len(onsets)*2,))
for t in range(len(onsets)):
timestamps[1+2*t] = onsets[t]
values[1+2*t] = 1
timestamps[2+2*t] = onsets[t]+pulse_width
values[2+2*t] = 0
return timestamps, values
[docs]def get_pulse_edge_times( digital_data, samplerate ):
"""get_pulse_edge_times
Args:
digital_data (nt, 1): array of data from ecube digital panel
samplerate (numeric): data sampling rate (Hz)
Returns:
edge_times (npulse, 2): start and end times from each detected pulse
"""
edge_times, edge_val = detect_edges(digital_data, samplerate)
start_idx = np.where(edge_val == 1)[0][0]
end_idx = np.where(edge_val == 0)[0][-1]
edge_times = edge_times[start_idx:(end_idx+1)]
edge_val = edge_val[start_idx:(end_idx+1)]
edge_pairs = edge_times.reshape(-1,2)
return edge_pairs
[docs]def compute_pulse_duty_cycles( edge_pairs ):
"""compute_pulse_duty_cycles
Args:
edge_pairs (npulse, 2): start, end times from a series of pulses
Returns:
duty_cycle (npulse): duty cycle of each pulse. Pulse period assumed to be constant.
"""
pulse_times = edge_pairs[:,0]
pulse_period = np.diff(pulse_times,axis=0)
duty_cycle = np.squeeze(np.diff(edge_pairs,axis=-1))/pulse_period[0]
return duty_cycle
[docs]def max_repeated_nans(a):
'''
Utility to calculate the maximum number of consecutive nans
Args:
a (ndarray): input sequence
Returns:
int: max consecutive nans
'''
mask = np.concatenate(([False],np.isnan(a),[False]))
if ~mask.any():
return 0
else:
idx = np.nonzero(mask[1:] != mask[:-1])[0]
return (idx[1::2] - idx[::2]).max()
[docs]def copy_edges_forwards(data, n_steps, truncate_edges=False, copy_per_step=False, axis=0):
'''
Forces pulses to have a fixed width of eactly n_steps. First, find the rising edges of the data,
then copy them forwards n_steps times. Works across multiple channels simulatenously.
Args:
data ((nt,) or (nt, nch)): digital data
n_steps (int): how many timesteps should pulses be
truncate_edges (bool, optional): if True, then edges will always be set to n_steps length.
If false, then edges that are longer than n_steps will remain the same length. Default False.
copy_per_step (bool, optional): copy edges one step at a time or one edge at a time; changes
processing time but output stays the same. If there are long edges, the default option False
is faster. If there are a lot of short edges, then setting copy_per_step=True will be faster.
Default False.
axis (int, optional): along which axis to copy edges. Default 0.
Returns:
(nt,) or (nt, nch): digital data but with fixed pulse widths
Note:
Only works for 1- or 2-D arrays
'''
if data.ndim == 1:
data = np.expand_dims(data, 1)
if axis != 0:
data = data.T
nt = data.shape[0]
edges = (~data[:-1] & data[1:]) > 0 # find low->high transitions
edges = np.insert(edges, 0, False, axis=0) # first element never a transition
if not copy_per_step:
# Find the edges, set their pulses directly
idx = np.where(edges)
for e_idx in zip(idx[0],idx[1]):
edges[e_idx[0]:min(nt,e_idx[0]+n_steps+1),e_idx[1]] = 1
else:
# For all of the data at once, shift to the right by n_steps, or-ing with edges at each step
for n in range(n_steps):
edges_shifted = np.roll(edges, 1, axis=0)
edges_shifted[0,:] = 0
edges = np.logical_or(edges, edges_shifted)
if axis != 0:
edges = edges.T
data = data.T
if truncate_edges:
return np.squeeze(edges)
return np.squeeze(edges | data)
def count_repetitions(arr, diff_thr=0):
"""
Counts the number of repetitions in an array. Always counts the first and
last element of the array as different from before and after the array.
Args:
arr (nt,): The input array. Only supports 1d arrays.
diff_thr (numeric, optional): Minimum step size in the data.
Returns:
tuple: A tuple of two numpy arrays:
| **repetitions (nt,):** Lengths of the repetitions in the input array,
| **change_idx (nt,):** Indices where the repetitions start
"""
assert np.array(arr).ndim == 1, "Only supports 1d arrays"
if len(arr) == 0:
return [], []
# Calculate the absolute difference between adjacent elements in the array
diff = np.abs(np.diff(arr))
change = diff > diff_thr
change = np.insert(change, 0, True)
# Find the indices where the 'change' array is True
change_idx = np.where(change)[0]
# Count the length of the gaps between those changes to find the repetitions
change = np.append(change_idx, len(arr))
repetitions = np.diff(change)
return repetitions, change_idx
[docs]def segment_array(arr, category, duplicate_endpoints=False):
'''
Segments an array into subarrays based on a corresponding category array.
Args:
arr (nt,): The array to segment.
category (nt,): An array of the same length as `arr` containing a
category label for each element in the corresponding array.
duplicate_endpoints (bool, optional): if True, each subsequent subarray
will start with the last element of the preceding subarray.
Returns:
tuple: Tuple containing:
| **segments (list of arrays):** A list of subarrays of `arr`, where each subarray corresponds to a unique value in `category`.
| **segmented_category (list of arrays):** An array of the same length as `segments`, where each element corresponds to the category label for the corresponding subarray in `segments`.
'''
run_arr, idx = count_repetitions(category)
end_indices = np.cumsum(run_arr)
if duplicate_endpoints:
segments = []
start_idx = 0
for end_idx in end_indices:
if start_idx == 0:
segments.append(arr[:end_idx])
else:
segments.append(arr[start_idx-1:end_idx])
start_idx = end_idx
else:
segments = np.split(arr, end_indices[:-1])
return segments, [category[i] for i in idx]
[docs]def count_repetitions(arr, diff_thr=0):
"""
Counts the number of repetitions in an array. Always counts the first and
last element of the array as different from before and after the array.
Args:
arr (nt,): The input array. Only supports 1d arrays.
diff_thr (numeric, optional): Minimum step size in the data.
Returns:
tuple: A tuple of two numpy arrays:
| **repetitions (nt,):** Lengths of the repetitions in the input array,
| **change_idx (nt,):** Indices where the repetitions start
"""
assert np.array(arr).ndim == 1, "Only supports 1d arrays"
if len(arr) == 0:
return [], []
# Calculate the absolute difference between adjacent elements in the array
diff = np.abs(np.diff(arr))
change = diff > diff_thr
change = np.insert(change, 0, True)
# Find the indices where the 'change' array is True
change_idx = np.where(change)[0]
# Count the length of the gaps between those changes to find the repetitions
change = np.append(change_idx, len(arr))
repetitions = np.diff(change)
return repetitions, change_idx
[docs]def nextpow2(x):
'''
Next higher power of 2. It is often useful for finding the nearest power of two
sequence length for FFT operations.
Args:
x (int or float): input number
Returns:
int: the first P such that 2**P >= abs(x).
'''
return 1 if x == 0 else math.ceil(math.log2(x))
'''
Other utils
'''
# simple progressbar, not tied to the iterator
[docs]def print_progress_bar(count, total, status=''):
"""print_progress_bar
Args:
count (num): current progress count
total (int): total count, i.e. what count is at 100%
status (str, optional): printed status message. Defaults to ''.
"""
bar_len = 60
filled_len = int(round(bar_len * count / float(total)))
percents = round(100.0 * count / float(total), 1)
bar = '=' * filled_len + '-' * (bar_len - filled_len)
sys.stdout.write('[%s] %s%s ...%s\r' % (bar, percents, '%', status))
sys.stdout.flush()
[docs]def derivative(x, y, norm=True):
'''
Computes the derivative of y along x.
Args:
x (nt): independent variable, e.g. time
y (nt, ...): dependent variable, e.g. position
norm (bool, optional): also compute the norm of y if it is multidimensional (default True). Set to false to output component wise derivative.
Returns:
nt: derivative of y
'''
dy = np.gradient(y, axis=0, edge_order=2)
dx = np.gradient(x)
if norm and dy.ndim > 1:
dy = np.linalg.norm(dy, axis=1)
dydx = dy/dx
elif not norm and dy.ndim > 1:
dydx = dy/np.expand_dims(dx, len(dy.shape)-1)
else:
dydx = dy/dx
return dydx
[docs]def calc_euclid_dist_mat(pos):
'''
Calculates a matrix of euclidean distance. Each entry in the matrix is the distance between ith and jth position
Args:
pos (nch,2): x, y position list, e.g. for each electrode
Returns:
(nch, nch) array: distance between each given position
'''
n_channels = np.shape(pos)[0]
dist = np.zeros((n_channels, n_channels))
x_pos = pos[:,0]
y_pos = pos[:,1]
for index, _ in np.ndenumerate(dist):
p1 = index[0]
p2 = index[1]
dist[p1, p2] = math.dist([x_pos[p1], y_pos[p1]], [x_pos[p2], y_pos[p2]])
return dist
[docs]def calc_radial_dist(pos, origin=(0,0)):
'''
Calculates a matrix of radial distance from a given origin. Each entry in the matrix is the distance between ith and jth electrode channel
Args:
pos (nch,2): x, y position list, e.g. for each electrode
origin (2,): point from which to calculate radial distance
Returns:
(nch,) array: radius between each given position and the origin
'''
n_channels = np.shape(pos)[0]
dist = np.zeros((n_channels,))
for i, p in enumerate(pos):
dist[i] = math.dist(origin, p)
return dist
[docs]def first_nonzero(arr, axis=0, all_zeros_val=-1):
'''
Helper function to find the first non-zero element in an array
Args:
arr (ndarray): array containing zeros
axis (int, optional): axis along which to compute the first nonzero. Defaults to 0.
all_zeros_val (float, optional): value to indicate no nonzero elements were found. Defaults to -1.
Returns:
ndarray: array of indices with one less dimension than the input
'''
mask = (arr != 0)
return np.where(mask.any(axis=axis), mask.argmax(axis=axis), all_zeros_val)
'''
Synchronization
'''
[docs]def get_first_last_times(barcode_on_times, barcode_on_times_main, barcode, barcode_main):
'''
Get the first and last time when barcodes (sync pulses) come to each stream.
Args:
barcode_on_times (n_times) : the times at which barcode comes to the auxiliary stream
barcode_on_times_main (k_times) : the times at which barcode comes to the main stream
barcode (n-length list) : Unique barcode number in the auxiliary stream
barcode_main (k-length list) : Unique barcode number in the main stream
Return:
tuple: tuple containing:
| **first_last_times (2):** barcode on_times that corresponds to the first and last barcode in the recording
| **first_last_times (2):** barcode on_times in the main stream that corresponds to the first and last barcode in the recording
'''
# Get barcode index that is shared across both streams
index_main = []
index = []
if barcode_on_times.shape[0] == barcode_on_times_main.shape[0]:
# if shape of barcode timing is same, use the first and last indices
index_main.append(0)
index_main.append(-1)
index.append(0)
index.append(-1)
else:
for idx, i_barcode in enumerate(barcode):
if i_barcode in barcode_main:
index_main.append(barcode_main.index(i_barcode))
index.append(idx)
# Get the indices that correspond to the first and last barcode in the recording
first_last_idx = [index[0],index[-1]]
first_last_idx_main = [index_main[0],index_main[-1]]
# Get the times that correspond to the first and last barcode in the recording
first_last_times = barcode_on_times[first_last_idx]
first_last_times_main = barcode_on_times_main[first_last_idx_main]
return first_last_times,first_last_times_main
[docs]def sync_timestamp_offline(timestamp, on_times, on_times_main):
'''
Synchroniza timestamps with timestamps in another stream
Args
timestamps (nt) : timestamps in the auxiliary stream that should be synchronized to main stream
on_time (2) : the first and last times when sync pulses come to the auxiliary stream in the recording
on_time_main (2) : the first and last times when sync pulses come to the main stream in the recording
Retuen:
tuple: tuple containing:
| **sync_timestamps (nt):** synchronized timestamps
| **scaling (float):** scaling factor between streams
'''
scaling = (on_times_main[-1] - on_times_main[0])/(on_times[-1] - on_times[0])
sync_timestamp = (timestamp - on_times[0]) * scaling + on_times_main[0]
return sync_timestamp, scaling
[docs]def convert_port_number(port_number, datatype='ap'):
'''
convert port_number to directory name made by openephys
Args:
port_number (int): port number which a probe connected to. natural number from 1 to 4.
datatyoe (str, optional): datatype of neuropixel. 'ap' or 'lfp'
Returns:
probe_dir (str): Probe directory name that contains AP data
'''
letter = chr(ord('@')+port_number)
probe_dir = f'Neuropix-PXI-100.Probe{letter}-{datatype.upper()}'
return probe_dir
[docs]def multiply_mat_batch(data, mat, save_path, scale = 1, max_memory_gb = 1., dtype='int16', min_batch_size=0):
'''
Multiply a matrix to data in each batch to save memory. The result is saved in save_path.
This function can be used to multiply an inverse matrix by spike band time series.
Args:
data (nt, nch): neural data. This should be a memory mapping array.
mat (anysize, nch): matrix to multiply by data
save_path (str): file path to save destriped lfp data
scale (float, optional): Scaling factor to multiply by data. 1/200 is necessary for whitened data in kilosort4. default is 1.
max_memory_gb (float): memory size in GB to determine batch size. default is 1.0 GB.
dtype (str, optional): dtype for data. default is int16.
min_batch_size (int): the number of size in integer to ensure that batch size is more than min_batch_size. default is 0.
Returns:
None
'''
n_samples, n_channels = data.shape
batch_size = int (max_memory_gb*1e9 / (n_channels*np.dtype(dtype).itemsize))
batch_size += min_batch_size
Nbatches = np.ceil(n_samples/batch_size).astype(int)
# Multiply matrix in each batch to save memory
x = np.memmap(save_path, dtype=dtype, mode='w+', shape=(n_samples, n_channels))
for ibatch in range(Nbatches):
tmp = mat @ (scale*data[ibatch*batch_size:(ibatch+1)*batch_size, :].T)
x[ibatch*batch_size:(ibatch+1)*batch_size, :] = (tmp.T).astype(dtype)
x.flush()
[docs]def generate_poisson_timestamps(mu, max_time, min_time=0., refractory_period=0.):
"""
Generate timestamps following a Poisson process with mean time between events mu,
with a specified minimum refractory period, and that fall within a specified time window.
The number of timestamps generated is determined by the time window and the mean time between
events and cannot be specified directly. The generated timestamps are random but can be
repeated by setting the random seed using `np.random.seed()`.
Args:
mu (float): Mean time between events in seconds.
max_time (float): End time of the window in seconds.
min_time (float, optional): Start time of the window in seconds. Default 0.
refractory_period (float, optional): Minimum refractory period between events in seconds. Default 0.
Returns:
np.ndarray: Array of timestamps within the specified time window.
Note:
The distribution is not guaranteed to be poisson when the refractory period is nonzero. As
the refractory period increases, the distribution will approach a uniform distribution.
"""
# Initial estimate of required number of events (overestimate to ensure coverage)
estimated_events = int((max_time - min_time) / mu * 1.5)
# Generate inter-event times
inter_event_times = np.random.exponential(mu, estimated_events)
inter_event_times = np.maximum(inter_event_times, refractory_period)
# Use a cumulative sum to transform intervals into times
event_times = np.cumsum(inter_event_times)
event_times += min_time
event_times = event_times[event_times < max_time]
return event_times
'''
datetime utils
'''
[docs]def get_consecutive_days(dates):
'''
Find consecutive days in a list of dates.
Args:
dates (list of datetime): list of dates to check for consecutive days
Returns:
list of lists: each sublist contains a list of consecutive dates
'''
if not dates:
return []
consecutive_dates = []
temp_consec_dates = [dates[0]]
for i in range(1, len(dates)):
if dates[i] - dates[i-1] == timedelta(days=1):
temp_consec_dates.append(dates[i])
else:
if len(temp_consec_dates) > 1:
consecutive_dates.append(temp_consec_dates)
temp_consec_dates = [dates[i]]
if len(temp_consec_dates) > 1:
consecutive_dates.append(temp_consec_dates)
return consecutive_dates
[docs]def scale_data_by_p_value(data, p, k=100, p0=0.08):
'''
Scale data by a sigmoid function of p-value. Useful for visualizing data maps
generated with p-values, to emphasize significant values.
See https://www.science.org/doi/full/10.1126/scitranslmed.aay4682 for an example.
Args:
data (nch,): per-channel data to scale
p (nch,): p-values corresponding to data
k (float, optional): steepness of the sigmoid. Default 100.
p0 (float, optional): midpoint of the sigmoid. Default 0.08.
Returns:
(nch,) array: scaled data
Examples:
Given a 240-channel map of p-values and corresponding data from an ECoG array,
plot the original data, p-values, and scaled data
.. code-block:: python
p = np.linspace(0, 1, 240)
data = np.random.randn(240)
scaled_data = scale_data_by_p_value(data, p, k=100, p0=0.08)
plt.figure(figsize=(9,2.5))
plt.subplot(1,3,1)
im = aopy.visualization.plot_ECoG244_data_map(data, elec_data=True)
im.set_clim(-3, 3)
plt.colorbar(im)
plt.title("Original data")
plt.subplot(1,3,2)
im = aopy.visualization.plot_ECoG244_data_map(p, cmap='viridis', elec_data=True)
plt.colorbar(im)
plt.title("p-values")
plt.subplot(1,3,3)
im = aopy.visualization.plot_ECoG244_data_map(scaled_data, elec_data=True)
im.set_clim(-3, 3)
plt.colorbar(im)
plt.title("Scaled data")
plt.tight_layout()
.. image:: _images/scale_by_p_value.png
'''
w = 1. / (1. + np.exp(-k * (p0 - p)))
return data * w
[docs]def digitize_by_angle(vectors, start_angle=math.pi/4, clockwise=True, bins=4):
'''
Bin 2D vectors into angular bins.
Args:
vectors (ntarg): List or array of 2D vectors.
start_angle (float, optional): Starting angle for binning in radians. Default is -pi/4.
clockwise (bool, optional): If True, bins are assigned in clockwise order. The
first bin is ahead of the start angle in this direction. Default is True.
bins (int, optional): Number of angular bins. Default is 4.
Returns:
(ntarg,) int: Array of bin indices corresponding to each vector.
'''
vectors = np.array(vectors)
angles = np.arctan2(vectors[:,1], vectors[:,0])
angles = (angles - start_angle) % (2 * np.pi) - np.pi
if clockwise:
angles = -angles
bin_edges = np.linspace(-np.pi, np.pi, bins+1)
return np.digitize(angles, bin_edges)
[docs]def reindex_targets(target_locations, target_idxs, start_angle=5*math.pi/8,
clockwise=True, bins=8, debug=True):
'''
Reindex target indices based on their angular location. Default behavior
is to place target 1 at the top and index a total of 8 targets clockwise.
Args:
target_locations (ntarg, 2): List or array of 2D target locations
target_idxs (ntarg,): Original target indices
start_angle (float, optional): Starting angle for binning in radians. Default is -3pi/4.
clockwise (bool, optional): If True, bins are assigned in clockwise order. The
first bin is ahead of the start angle in this direction. Default is True.
bins (int, optional): Number of angular bins. Default is 8.
debug (bool, optional): If True, plot the original and new target indices. Default is True.
Returns:
(ntarg,) int: Array of new bin indices corresponding to each target location.
Examples:
Given a set of target locations and their original indices, reindex them based on their angular location
and plot the original and new indices.
.. code-block:: python
target_locations = [[5,0], [3.53, 3.53], [0,5], [-3.53,3.53],
[-5,0], [-3.53,-3.53], [0,-5], [3.53,-3.53],
[8,0], [0,8], [-8,0], [0,-8]]
target_idxs = np.array([3,2,1,8,7,6,5,4,1,2,3,4])
reindex_targets(target_locations, target_idxs)
.. image:: _images/test_reindex_targets.png
.. code-block:: python
target_locations = [[4,0], [0,4], [-4,0], [0,-4],
[7,0], [0,7], [-7,0], [0,-7]]
target_idxs = np.array([0,1,2,3,1,2,3,4])
reindex_targets(target_locations, target_idxs,
start_angle=np.pi/4, clockwise=False, bins=4)
.. image:: _images/test_reindex_targets_ccw.png
'''
new_idx = digitize_by_angle(target_locations, start_angle=start_angle,
clockwise=clockwise, bins=bins)
if debug:
plt.figure(figsize=(8,4))
# Original indices
plt.subplot(1,2,1)
visualization.plot_targets(target_locations, target_radius=2, bounds=(-10,10,-10,10))
locs = np.array([np.array(t) for t in target_locations])
target_locs_and_idx = np.hstack([locs, np.array(target_idxs).reshape(-1,1)])
unique_targs = np.unique(target_locs_and_idx, axis=0)
for idx in range(len(unique_targs)):
plt.text(*unique_targs[idx][:2], int(unique_targs[idx][-1]), fontsize=10, ha='center', va='center')
plt.title('Original indices')
# Vector bins
for idx in range(bins):
if clockwise:
angle = start_angle - (2*np.pi/bins)*idx
else:
angle = start_angle + (2*np.pi/bins)*idx
plt.plot([0, 10*np.cos(angle)], [0, 10*np.sin(angle)],
'k--' if idx > 0 else 'r--', linewidth=1)
# New indices
plt.subplot(1,2,2)
visualization.plot_targets(target_locations, target_radius=2, bounds=(-10,10,-10,10))
locs = np.array([np.array(t) for t in target_locations])
target_locs_and_idx = np.hstack([locs, new_idx.reshape(-1,1)])
unique_targs = np.unique(target_locs_and_idx, axis=0)
for idx in range(len(unique_targs)):
plt.text(*unique_targs[idx][:2], int(unique_targs[idx][-1]), fontsize=10, ha='center', va='center')
plt.title('New indices')
return new_idx