# neuropixel.py
#
# Preprocessing neuropixel data
import os
import glob
import numpy as np
from ibldsp.voltage import destripe_lfp
from pathlib import Path
from ..utils import extract_barcodes_from_times, get_first_last_times, sync_timestamp_offline, convert_port_number
from scipy.io import savemat
import h5py
from .. import preproc
from pathlib import Path
from ..precondition import calc_ks_waveforms, filter_lfp, downsample, filter_spikes
from ..utils import extract_barcodes_from_times, get_first_last_times, sync_timestamp_offline, convert_port_number, multiply_mat_batch
from .. import data as aodata
[docs]def sync_neuropixel_ecube(raw_timestamp,on_times_np,off_times_np,on_times_ecube,off_times_ecube,inter_barcode_interval=20,bar_duration=0.02,verbose=False):
'''
This function is specfic to synchronization between neuropixels and ecube.
Args:
raw_timestamp (nt) : raw timestamp that is not synchronized
on_times_np (ndarray) : timings when sync line rises to 1 in neruopixel streams
off_times_np (ndarray): timings when sync line returns to 0 in neruopixel streams
on_times_ecube (ndarray): timings when sync line rises to 1 in ecube streams
off_times_ecube (ndarray): timings when sync line returns to 0 in ecube streams
bar_duration (float): duration of each bar that is sent to each stream
verbose (bool): print barcode times and barcodes for each stream
Returns:
tuple: tuple containing:
| **sync_timestamps (nt):** synchronized timestamps
| **scaling (float):** scaling factor between streams
'''
# Get each barcode timing and label
barcode_ontimes_np,barcode_np = extract_barcodes_from_times(on_times_np,off_times_np,inter_barcode_interval=inter_barcode_interval,bar_duration=bar_duration)
barcode_ontimes_ecube,barcode_ecube = extract_barcodes_from_times(on_times_ecube,off_times_ecube,inter_barcode_interval=inter_barcode_interval,bar_duration=bar_duration)
# Get the first and last barcode timing in the recording at each stream
first_last_times_np, first_last_times_ecube = get_first_last_times(barcode_ontimes_np,barcode_ontimes_ecube,barcode_np, barcode_ecube)
if verbose:
print(f'neuropixels barcode times: {first_last_times_np}, ecube barcode times{first_last_times_ecube}')
return sync_timestamp_offline(raw_timestamp, first_last_times_np, first_last_times_ecube)
[docs]def classify_ks_unit(spike_times, spike_label):
'''
Classify unit activity identified by kilosort into each single unit
Args:
spike_times (nspikes): spike times generated by kilosort
spike_label (nspikes): cluster labels of each spike generated by kilosort
Returns:
spike_times_unit (dict): spike times for each unit
'''
spike_times_unit = {}
for unit_label in np.unique(spike_label):
spike_times_unit[f'{unit_label}'] = spike_times[spike_label==unit_label.astype(int)]
return spike_times_unit
[docs]def parse_ksdata_entries(kilosort_dir, concat_data_dir, port_number, kilosort_version='kilosort4'):
'''
Parse concatenated data processed by kilosort into the task entries and save relevant data (spike_indices, spike_clusters, and ks_label)
Args:
kilosort_dir (str): kilosort directory (ex. '/data/preprocessed/kilosort')
concat_data_dir (str): data directory that contains concatenated data and kilosort_output (ex. '2023-06-30_Neuropixel_ks_affi_bottom_port1')
port_number (int): port number which a probe connected to. natural number from 1 to 4.
kilosort_version (str, optional): kilosort version. This should be kilosort4 or kilosort2.5. Default is kilosort4
Returns:
None
'''
port_path = Path(kilosort_dir) / concat_data_dir / f'port{port_number}'
if kilosort_version == 'kilosort4':
ks_output_path = port_path / 'kilosort4'
elif kilosort_version == 'kilosort2.5':
ks_output_path = port_path / 'kilosort_output'
else:
raise ValueError('Wrong kilosort version. Choose kilosort4 or kilosort2.5')
# Load kilosort data
spike_indices = np.load(ks_output_path/'spike_times.npy')
spike_clusters = np.load(ks_output_path/'spike_clusters.npy')
# Load datasize of each entry and filename
datasize_entry = np.load(port_path/'datasize_entry.npy')
task_paths = np.load(port_path/'task_path.npy')
nentry = datasize_entry.shape[0]
datasize_entry = np.cumsum(datasize_entry)
for idx in range(nentry):
# Get spike indices included in a corresponding entry
if idx == 0:
ientry_idx = spike_indices < datasize_entry[idx]
else:
ientry_idx = (spike_indices >= datasize_entry[idx-1])&(spike_indices < datasize_entry[idx])
# Subtract previous entries' datasize so that the first index of each entry can become 0.
spike_indices_entry = spike_indices[ientry_idx]
if idx > 0:
spike_indices_entry -= datasize_entry[idx-1]
# Make new directory to save parsed data
concat_id = concat_data_dir.split('_')[-1]
task_entry_dir = task_paths[idx] + f'_{concat_id}'
if kilosort_version == 'kilosort4':
parsed_data_save_path = Path(kilosort_dir)/ task_entry_dir / f'port{port_number}' /'kilosort4'
else:
parsed_data_save_path = Path(kilosort_dir)/ task_entry_dir / f'port{port_number}' /'kilosort_output'
if not os.path.exists(parsed_data_save_path):
os.makedirs(parsed_data_save_path)
# save data
np.save(parsed_data_save_path/'spike_times.npy', spike_indices_entry)
np.save(parsed_data_save_path/'spike_clusters.npy', spike_clusters[ientry_idx])
[docs]def concat_neuropixels(np_datadir, kilosort_dir, subject, te_ids, date, port_number, concat_number =1, max_memory_gb = 0.1):
'''
Concatenate continuous.dat files of different sessions specified by multiple task ids (te_ids)
The concatenated data is saved into a port folder of the kilosort preproc directory
Args:
np_datadir (str): neuropixel data directory where the data files are located (ex. '/data/raw/neuropixels')
kilosort_dir (str): kilosort directory (ex. '/data/preprocessed/kilosort')
subject (str): subject name
te_ids (list): list of task ids to concatenate
date (str): date of recording (ex. '2023-04-14')
port_number (int): port number which a probe connected to. natural number from 1 to 4.
concat_number (int): the nubmer of concatenation used to make a folder to save concatenated data
max_memory_gb (float): max memory used to load binary data at one time
Returns:
None
'''
# Extract directory names from task ids
concat_path = []
for task_id in te_ids:
task_path = f'{date}_Neuropixel_{subject}_te{task_id}'
concat_path.append(task_path)
assert len(concat_path) > 0, "No Data to Concatenate"
dtype = 'int16'
savedir_name = f'{date}_Neuropixel_{subject}_concat{concat_number}' # Directory name to save concatenated data
savedir_path = Path(kilosort_dir) / savedir_name / f'port{port_number}'
if not os.path.exists(savedir_path):
os.makedirs(savedir_path)
print('\n', 'Working in ', savedir_path)
datasize_entry = []
for idx, np_recorddir in enumerate(concat_path):
print(f'Processing {np_recorddir}')
_,metadata = aodata.load_neuropixel_data(np_datadir, np_recorddir, 'ap', port_number = port_number)
nch = metadata['num_channels']
probe_dir = convert_port_number(port_number)
data_path = os.path.join(np_datadir, np_recorddir)
continuous_data_path = glob.glob(os.path.join(data_path,f'**/*{probe_dir}/continuous.dat'),recursive=True)[0]
data = np.memmap(continuous_data_path, dtype = dtype)
datasize_entry.append(int(data.shape[0]/nch))
# Save data (concatentate data)
chunksize = int(max_memory_gb * 1e9 / np.dtype(dtype).itemsize / nch)
nchunk = int(np.ceil(data.shape[0]/chunksize))
for ichunk in range(nchunk):
if ichunk != nchunk-1:
data_reshape = data[ichunk*chunksize*nch:(ichunk+1)*chunksize*nch].reshape(-1,nch)
else:
data_reshape = data[ichunk*chunksize*nch:].reshape(-1,nch)
if (idx == 0) & (ichunk == 0):
save_filename = 'continuous.dat'
f = open(os.path.join(savedir_path,save_filename), 'wb') # save
data_reshape.tofile(f)
f.close()
else:
f = open(os.path.join(savedir_path,save_filename), 'a') # append
data_reshape.tofile(f)
f.close()
# Save channel position as mat file for kilosort
pos = {'xpos': metadata['xpos'],'ypos':metadata['ypos']}
savemat(os.path.join(savedir_path, 'channel_pos.mat'), pos)
# Save datasize and filename of each entry to parse data after spike sorting
np.save(os.path.join(savedir_path, 'datasize_entry'), np.array(datasize_entry))
np.save(os.path.join(savedir_path, 'task_path'), np.array(concat_path))
[docs]def sync_ts_data_timestamps(data, sync_timestamps):
'''
Synchronize time series data by padding nan or cropping datapoints based on synchronized timestamps
so that data would begin at 0 on the synchronized time axis
Args:
data (nt, nch): time series data to preprocess
sync_timestamp (nt): synchronized timestamps
Returns:
sync_data (sync_nt, nch): synchronized time series data. The shape of data changes.
'''
dt = sync_timestamps[1]-sync_timestamps[0]
if data.ndim == 1:
data = data[:,np.newaxis]
nch = data.shape[1]
# When the initial timestamp is more than 0, pad np.nan so that data could begin at time 0
if sync_timestamps[0]>=0:
tmp = np.arange(sync_timestamps[0]-dt,0,-dt)
padding_datapoints = tmp.shape[0]
pad_to_data = np.zeros((padding_datapoints,nch))*np.nan
sync_data = np.concatenate([pad_to_data, data],axis=0)
sync_timestamps = np.concatenate([pad_to_data[:,0], sync_timestamps],axis=0)
# When the initial timestamp is less than 0, crop the head of data so that data could begin at time 0
else:
not_crop_datapoints = sync_timestamps >= 0
sync_data = data[not_crop_datapoints,:]
sync_timestamps = sync_timestamps[not_crop_datapoints]
return np.squeeze(sync_data), sync_timestamps
[docs]def destripe_lfp_batch(lfp_data, save_path, sample_rate, bit_volts, max_memory_gb = 1., dtype='int16', min_batch_size = 21):
'''
Destripe LFP data in each batch to save memory. The result is saved in save_path.
Args:
lfp_data (nt, nch): LFP data. This should be a memory mapping array.
save_path (str): file path to save destriped lfp data
sample_rate (float): sampling rate in Hz
bit_volts (float): volt per bit
max_memory_gb (float): memory size in GB to determine batch size. default is 1.0 GB.
dtype (str, optional): dtype for data. default is int16.
min_batch_size (int): the number of size in integer to ensure that batch size is more than min_batch_size to run destripe_lfp
Returns:
None
'''
# Load data and metadata
n_samples, n_channels = lfp_data.shape
# Create memmap array to save destriped lfp data
lfp_destriped = np.memmap(save_path, dtype=dtype, mode='w+', shape=lfp_data.shape)
# Destripe lfp
batch_size = int (max_memory_gb*1e9 / (n_channels*np.dtype(type(bit_volts)).itemsize))
batch_size += min_batch_size # ensure that batch size is more than 21 to run destrip_lfp
Nbatches = np.ceil(n_samples/batch_size).astype(int)
for ibatch in range(Nbatches):
tmp = destripe_lfp((lfp_data[ibatch*batch_size:(ibatch+1)*batch_size, :]*bit_volts).T/1e6, sample_rate)*1e6
lfp_destriped[ibatch*batch_size:(ibatch+1)*batch_size, :] = (tmp/bit_volts).T.astype(dtype)
lfp_destriped.flush()
[docs]def proc_neuropixel_spikes(datadir,np_recorddir,ecube_files,kilosort_dir,port_number,result_filename,node_idx=0,ex_idx=0,version='kilosort4'):
'''
Preprocess neuropixels data saved by openephys. Kilosort must be performed before this functions.
Synchronize spikes detected by kilosort with ecube's timestamps. Extract waveforms from drift corrected ap band data.
Args:
datadir (str): data directory where the data files are located
np_recorddir (str): data folder where neuropixels data for a specific task id is saved
ecube_files (str): data folder where ecube data for a specific task id is saved
kilosort_dir (str): data folder where preprocessed data of kilosort is saved
port_number (int): port number which a probe is connected to. This number corresponds to drive number.
result_filename (str): where to store the processed result
node_idx (int, optional): record node index. this is usually 0.
ex_idx (int, optional): experiment index. this is usually 0.
version (str, optional): kilosort version, 'kilosort4' or 'kilosort2.5' is permitted
Returns:
tuple: tuple containing:
| **spike_group (HDF5 group):** spike times for each drive saved like nested dictionary (ex. drive1/spikes)
| **waveform_group (HDF5 group):** spike waveforms for each drive saved like a nested dictionary (ex. drive1/waveforms)
| **metadata (dict):** dictionary of exp metadata
'''
# Load data and metadata
datatype = 'ap'
data_object, metadata = aodata.load_neuropixel_data(datadir,np_recorddir,datatype=datatype,node_idx=node_idx,ex_idx=ex_idx,port_number=port_number)
data = data_object.samples
metadata['samplerate'] = int(metadata.pop('sample_rate'))
# Syncronize timestamps in neuropixels to timestamps in ecube
raw_timestamp = np.arange(data.shape[0])/metadata['samplerate']
on_times_np, off_times_np = aodata.get_neuropixel_digital_input_times(datadir,np_recorddir,datatype=datatype,port_number=port_number)
on_times_ecube, off_times_ecube = aodata.get_ecube_digital_input_times(datadir,ecube_files,ch=-1)
sync_timestamps,_ = preproc.sync_neuropixel_ecube(raw_timestamp,on_times_np,off_times_np,on_times_ecube,off_times_ecube,bar_duration=0.0185,verbose=True)
# Path for the kilosort directory
metadata['kilosort_version'] = version
if version == 'kilosort4':
kilosort_output_dir = Path(kilosort_dir)/np_recorddir.split('/')[-1]/f'port{port_number}'/'kilosort4'
elif version == 'kilosort2.5':
kilosort_output_dir = Path(kilosort_dir)/np_recorddir.split('/')[-1]/f'port{port_number}'/'kilosort_output'
else:
raise ValueError('Wrong kilosort version. Choose kilosort4 or kilosort2.5')
# Synchronize spike times detected by kilosort with ecube timestamps
try:
spike_path = kilosort_output_dir/'spike_times.npy'
spike_indices = np.load(spike_path)
except:
raise ValueError(f'Spike file {spike_path} is not found. Please run kilosort first.')
spike_times = spike_indices/metadata['samplerate']
sync_spike_times, _ = preproc.sync_neuropixel_ecube(spike_times,on_times_np,off_times_np,on_times_ecube,off_times_ecube,bar_duration=0.0185)
# Classify spikes into clusters
spike_clusters = np.load(kilosort_output_dir/'spike_clusters.npy')
sync_unit_times = preproc.classify_ks_unit(sync_spike_times, spike_clusters)
# Extract spike waveforms from drift corrected ap band data
unit_times = preproc.classify_ks_unit(spike_times, spike_clusters) # Must be not-sync data
templates = np.load(kilosort_output_dir/'templates.npy')
channel_pos = np.stack([metadata['xpos'],metadata['ypos']]).T
if version == 'kilosort4':
drift_corrected_data = np.memmap(kilosort_output_dir/'temp_wh.dat', dtype='int16', shape=data.shape)
elif version == 'kilosort2.5':
drift_corrected_data = np.memmap(kilosort_output_dir.parent/'temp_wh.dat', dtype='int16', shape=data.shape)
unit_waveforms, _, unit_pos = calc_ks_waveforms(drift_corrected_data, metadata['samplerate'], unit_times, templates, channel_pos, waveforms_nch=1)
# Save kilosort labels in metadata
ks_label = np.genfromtxt(kilosort_output_dir/'cluster_KSLabel.tsv',skip_header=1,dtype=h5py.special_dtype(vlen=str))
ks_label = np.array(ks_label, dtype=h5py.special_dtype(vlen=str))
# Save synchonized spike times and waveforms into hdf file
hdf = h5py.File(result_filename, 'a', track_order=True)
spike_group = hdf.create_group(f'drive{port_number}/spikes', track_order=True)
waveform_group = hdf.create_group(f'drive{port_number}/waveforms', track_order=True)
for iunit in np.unique(spike_clusters):
spike_group.create_dataset(f'{iunit}',data=sync_unit_times[f'{iunit}'])
waveform_group.create_dataset(f'{iunit}',data=unit_waveforms[f'{iunit}'])
hdf.close()
# Organize metadata
metadata['n_samples'] = data.shape[0]
metadata['n_channels'] = metadata.pop('num_channels')
metadata['microvoltsperbit'] = metadata.pop('bit_volts')
metadata['voltsperbit'] = list(1e-6*np.array(metadata['microvoltsperbit']))
metadata['ap_samplerate'] = 1/(sync_timestamps[1]-sync_timestamps[0])
metadata['sync_timestamps'] = sync_timestamps
metadata['unique_label'] = np.unique(spike_clusters)
metadata['spike_pos'] = {unit_number: unit_pos[f'{unit_number}'].tolist() for unit_number in unit_pos.keys()}
metadata['ks_label'] = ks_label
return spike_group, waveform_group, metadata
[docs]def proc_neuropixel_ts(datadir,np_recorddir,ecube_files,kilosort_dir,datatype,port_number,result_filename,delete_file=True,node_idx=0,ex_idx=0,max_memory_gb=1., max_memory_gb_destriping=1.):
'''
Prerocess neuropixels time-series data saved by openephys.
To use this function when datatype is 'ap', kilosort must be applied before this function.
lfp data is destriped, low-pass filtered with 500 Hz, and then downsampled to 1000 Hz
For ap data, the drift corrected whitened data computed by kilosort is used
The whitened data is multipled by inverse whitened matrix, band-pass filtered between 300-5000, then downsampled to 10k Hz.
Both ap and lfp time-series data are synchronized with ecube timestamps.
Args:
datadir (str): data directory where the data files are located
np_recorddir (str): data folder where neuropixels data for a specific task id is saved
ecube_files (str): data folder where ecube data for a specific task id is saved
kilosort_dir (str): data folder where preprocessed data of kilosort is saved
datatype (str): data type. 'lfp' or 'ap'
port_number (int): port number which a probe is connected to. natural number from 1 to 4.
result_filename (str): where to store the processed result
delete_file (bool): whether to delete the saved file created by destripe_lfp or multiply_mat_batch
node_idx (int, optional): record node index. this is usually 0.
ex_idx (int, optional): experiment index. this is usually 0.
max_memory_gb (float, optional): max memory in performing destripe_lfp or multiply_mat_batch
Returns:
tuple: tuple containing:
| **dset (dict):** time series data of preprocessed lfp or ap
| **metadata (dict):** dictionary of exp metadata
'''
if (datatype != 'ap')&(datatype != 'lfp'):
raise ValueError(f"Unknown datatype {datatype}")
# Load data and metadata
data_object, metadata = aodata.load_neuropixel_data(datadir,np_recorddir,datatype=datatype,node_idx=node_idx,ex_idx=ex_idx,port_number=port_number)
data = data_object.samples
metadata['samplerate'] = int(metadata.pop('sample_rate'))
metadata['microvoltsperbit'] = metadata.pop('bit_volts')
metadata['voltsperbit'] = list(1e-6*np.array(metadata['microvoltsperbit']))
metadata['n_channels'] = metadata.pop('num_channels')
if datatype == 'lfp':
down_samplerate = 1000
elif datatype == 'ap':
down_samplerate = 10000
downsample_factor = metadata['samplerate']/down_samplerate
n_samples = int(np.ceil(data.shape[0]/downsample_factor))
# Syncronize downsampled timestamps in neuropixels with timestamps in ecube
raw_timestamp = np.arange(n_samples)/down_samplerate # use downsampled sampling rate
on_times_np, off_times_np = aodata.get_neuropixel_digital_input_times(datadir,np_recorddir,datatype=datatype,port_number=port_number)
on_times_ecube, off_times_ecube = aodata.get_ecube_digital_input_times(datadir,ecube_files,ch=-1)
sync_timestamps,_ = aodata.preproc.sync_neuropixel_ecube(raw_timestamp,on_times_np,off_times_np,on_times_ecube,off_times_ecube,bar_duration=0.0185,verbose=True)
# Compute batch size to save memory
channel_chunk_size = int (max_memory_gb*1e9 / (data.shape[0]*np.dtype(np.int16).itemsize))
Nbatches = np.ceil(metadata['n_channels']/channel_chunk_size).astype(int)
kilosort_portdir = Path(kilosort_dir)/np_recorddir.split('/')[-1]/f'port{port_number}'
hdf = h5py.File(result_filename, 'a')
print('-'*50)
if datatype == 'lfp':
# Destripe LFP
lfp_destriped_path = kilosort_portdir / 'lfp_destriped.dat'
print(f'Destriping LFP data')
destripe_lfp_batch(data, lfp_destriped_path, metadata['samplerate'], metadata['microvoltsperbit'][0], max_memory_gb=max_memory_gb_destriping)
lfp_destriped = np.memmap(lfp_destriped_path, dtype=np.int16, shape=data.shape)
# Process data in each channel to save memory
print(f'Filtering and synchronizing LFP data')
for ibatch in range(Nbatches):
# filter_lfp applies both filtering and downsampling
data_filt, _ = filter_lfp(lfp_destriped[:,ibatch*channel_chunk_size:(ibatch+1)*channel_chunk_size], \
metadata['samplerate'], lfp_samplerate=down_samplerate, low_cut=500., buttord=4)
sync_data, sync_timestamps_new = sync_ts_data_timestamps(data_filt, sync_timestamps)
if ibatch == 0:
dset = hdf.create_dataset(f'drive{port_number}/{datatype}_data', (sync_data.shape[0], metadata['n_channels']), dtype=np.int16)
dset[:,ibatch*channel_chunk_size:(ibatch+1)*channel_chunk_size] = np.squeeze(sync_data)
# Delete files created by destripe_lfp_batch
if delete_file:
del lfp_destriped
lfp_destriped_path.unlink()
elif datatype == 'ap':
# Multiply inverse whitening matrix to whitened spike band time series
try:
inverse_mat_path = kilosort_portdir / 'kilosort4' / 'whitening_mat_inv.npy'
inverse_mat = np.load(inverse_mat_path) # this is an inverse of the whitening matrix used in kilosort
temp_wh_path = kilosort_portdir / 'kilosort4' / 'temp_wh.dat'
temp_wh = np.memmap(temp_wh_path, dtype=np.int16, mode='r', shape=data.shape) # this is the whitened time-series data
except:
raise ValueError(f'inv_matrix {inverse_mat_path} or temp_wh {temp_wh_path} is not found. Please run kilosort first.')
inv_temp_wh_path = kilosort_portdir / 'inv_temp_wh.dat'
print(f'Multiplyng inverse matrix by whitened ap data')
multiply_mat_batch(temp_wh, inverse_mat, inv_temp_wh_path, scale = 1/200, max_memory_gb=max_memory_gb)
inv_temp_wh = np.memmap(inv_temp_wh_path, dtype=np.int16, shape=data.shape)
# Process data in each channel to save memory
print(f'Filtering and synchronizing AP data')
for ibatch in range(Nbatches):
# filter_spikes only applies filtering, not downsampling
tmp = filter_spikes(inv_temp_wh[:,ibatch*channel_chunk_size:(ibatch+1)*channel_chunk_size], \
metadata['samplerate'], low_pass=300, high_pass=5000, buttord=4)
data_filt = downsample(tmp, metadata['samplerate'], down_samplerate)
sync_data, sync_timestamps_new = sync_ts_data_timestamps(data_filt, sync_timestamps)
if ibatch == 0:
dset = hdf.create_dataset(f'drive{port_number}/{datatype}_data', (sync_data.shape[0], metadata['n_channels']), dtype=np.int16)
dset[:,ibatch*channel_chunk_size:(ibatch+1)*channel_chunk_size] = np.squeeze(sync_data)
# Delete files created by multiply_mat_batch
if delete_file:
del inv_temp_wh
inv_temp_wh_path.unlink()
metadata['sync_timestamps'] = sync_timestamps_new
hdf.close()
# Organize metadata
metadata['n_samples'] = sync_data.shape[0] # shchronization changes the shape of data
metadata['down_samplerate'] = down_samplerate
metadata[f'{datatype}_samplerate'] = 1/(sync_timestamps[1]-sync_timestamps[0])
return dset, metadata