Source code for aopy.preproc.neuropixel

# neuropixel.py
# 
# Preprocessing neuropixel data

import os
import glob

import numpy as np
from ibldsp.voltage import destripe_lfp
from pathlib import Path
from ..utils import extract_barcodes_from_times, get_first_last_times, sync_timestamp_offline, convert_port_number
from scipy.io import savemat
import h5py
from .. import preproc
from pathlib import Path
from ..precondition import calc_ks_waveforms, filter_lfp, downsample, filter_spikes
from ..utils import extract_barcodes_from_times, get_first_last_times, sync_timestamp_offline, convert_port_number, multiply_mat_batch
from .. import data as aodata

[docs]def sync_neuropixel_ecube(raw_timestamp,on_times_np,off_times_np,on_times_ecube,off_times_ecube,inter_barcode_interval=20,bar_duration=0.02,verbose=False): ''' This function is specfic to synchronization between neuropixels and ecube. Args: raw_timestamp (nt) : raw timestamp that is not synchronized on_times_np (ndarray) : timings when sync line rises to 1 in neruopixel streams off_times_np (ndarray): timings when sync line returns to 0 in neruopixel streams on_times_ecube (ndarray): timings when sync line rises to 1 in ecube streams off_times_ecube (ndarray): timings when sync line returns to 0 in ecube streams bar_duration (float): duration of each bar that is sent to each stream verbose (bool): print barcode times and barcodes for each stream Returns: tuple: tuple containing: | **sync_timestamps (nt):** synchronized timestamps | **scaling (float):** scaling factor between streams ''' # Get each barcode timing and label barcode_ontimes_np,barcode_np = extract_barcodes_from_times(on_times_np,off_times_np,inter_barcode_interval=inter_barcode_interval,bar_duration=bar_duration) barcode_ontimes_ecube,barcode_ecube = extract_barcodes_from_times(on_times_ecube,off_times_ecube,inter_barcode_interval=inter_barcode_interval,bar_duration=bar_duration) # Get the first and last barcode timing in the recording at each stream first_last_times_np, first_last_times_ecube = get_first_last_times(barcode_ontimes_np,barcode_ontimes_ecube,barcode_np, barcode_ecube) if verbose: print(f'neuropixels barcode times: {first_last_times_np}, ecube barcode times{first_last_times_ecube}') return sync_timestamp_offline(raw_timestamp, first_last_times_np, first_last_times_ecube)
[docs]def classify_ks_unit(spike_times, spike_label): ''' Classify unit activity identified by kilosort into each single unit Args: spike_times (nspikes): spike times generated by kilosort spike_label (nspikes): cluster labels of each spike generated by kilosort Returns: spike_times_unit (dict): spike times for each unit ''' spike_times_unit = {} for unit_label in np.unique(spike_label): spike_times_unit[f'{unit_label}'] = spike_times[spike_label==unit_label.astype(int)] return spike_times_unit
[docs]def parse_ksdata_entries(kilosort_dir, concat_data_dir, port_number, kilosort_version='kilosort4'): ''' Parse concatenated data processed by kilosort into the task entries and save relevant data (spike_indices, spike_clusters, and ks_label) Args: kilosort_dir (str): kilosort directory (ex. '/data/preprocessed/kilosort') concat_data_dir (str): data directory that contains concatenated data and kilosort_output (ex. '2023-06-30_Neuropixel_ks_affi_bottom_port1') port_number (int): port number which a probe connected to. natural number from 1 to 4. kilosort_version (str, optional): kilosort version. This should be kilosort4 or kilosort2.5. Default is kilosort4 Returns: None ''' port_path = Path(kilosort_dir) / concat_data_dir / f'port{port_number}' if kilosort_version == 'kilosort4': ks_output_path = port_path / 'kilosort4' elif kilosort_version == 'kilosort2.5': ks_output_path = port_path / 'kilosort_output' else: raise ValueError('Wrong kilosort version. Choose kilosort4 or kilosort2.5') # Load kilosort data spike_indices = np.load(ks_output_path/'spike_times.npy') spike_clusters = np.load(ks_output_path/'spike_clusters.npy') # Load datasize of each entry and filename datasize_entry = np.load(port_path/'datasize_entry.npy') task_paths = np.load(port_path/'task_path.npy') nentry = datasize_entry.shape[0] datasize_entry = np.cumsum(datasize_entry) for idx in range(nentry): # Get spike indices included in a corresponding entry if idx == 0: ientry_idx = spike_indices < datasize_entry[idx] else: ientry_idx = (spike_indices >= datasize_entry[idx-1])&(spike_indices < datasize_entry[idx]) # Subtract previous entries' datasize so that the first index of each entry can become 0. spike_indices_entry = spike_indices[ientry_idx] if idx > 0: spike_indices_entry -= datasize_entry[idx-1] # Make new directory to save parsed data concat_id = concat_data_dir.split('_')[-1] task_entry_dir = task_paths[idx] + f'_{concat_id}' if kilosort_version == 'kilosort4': parsed_data_save_path = Path(kilosort_dir)/ task_entry_dir / f'port{port_number}' /'kilosort4' else: parsed_data_save_path = Path(kilosort_dir)/ task_entry_dir / f'port{port_number}' /'kilosort_output' if not os.path.exists(parsed_data_save_path): os.makedirs(parsed_data_save_path) # save data np.save(parsed_data_save_path/'spike_times.npy', spike_indices_entry) np.save(parsed_data_save_path/'spike_clusters.npy', spike_clusters[ientry_idx])
[docs]def concat_neuropixels(np_datadir, kilosort_dir, subject, te_ids, date, port_number, concat_number =1, max_memory_gb = 0.1): ''' Concatenate continuous.dat files of different sessions specified by multiple task ids (te_ids) The concatenated data is saved into a port folder of the kilosort preproc directory Args: np_datadir (str): neuropixel data directory where the data files are located (ex. '/data/raw/neuropixels') kilosort_dir (str): kilosort directory (ex. '/data/preprocessed/kilosort') subject (str): subject name te_ids (list): list of task ids to concatenate date (str): date of recording (ex. '2023-04-14') port_number (int): port number which a probe connected to. natural number from 1 to 4. concat_number (int): the nubmer of concatenation used to make a folder to save concatenated data max_memory_gb (float): max memory used to load binary data at one time Returns: None ''' # Extract directory names from task ids concat_path = [] for task_id in te_ids: task_path = f'{date}_Neuropixel_{subject}_te{task_id}' concat_path.append(task_path) assert len(concat_path) > 0, "No Data to Concatenate" dtype = 'int16' savedir_name = f'{date}_Neuropixel_{subject}_concat{concat_number}' # Directory name to save concatenated data savedir_path = Path(kilosort_dir) / savedir_name / f'port{port_number}' if not os.path.exists(savedir_path): os.makedirs(savedir_path) print('\n', 'Working in ', savedir_path) datasize_entry = [] for idx, np_recorddir in enumerate(concat_path): print(f'Processing {np_recorddir}') _,metadata = aodata.load_neuropixel_data(np_datadir, np_recorddir, 'ap', port_number = port_number) nch = metadata['num_channels'] probe_dir = convert_port_number(port_number) data_path = os.path.join(np_datadir, np_recorddir) continuous_data_path = glob.glob(os.path.join(data_path,f'**/*{probe_dir}/continuous.dat'),recursive=True)[0] data = np.memmap(continuous_data_path, dtype = dtype) datasize_entry.append(int(data.shape[0]/nch)) # Save data (concatentate data) chunksize = int(max_memory_gb * 1e9 / np.dtype(dtype).itemsize / nch) nchunk = int(np.ceil(data.shape[0]/chunksize)) for ichunk in range(nchunk): if ichunk != nchunk-1: data_reshape = data[ichunk*chunksize*nch:(ichunk+1)*chunksize*nch].reshape(-1,nch) else: data_reshape = data[ichunk*chunksize*nch:].reshape(-1,nch) if (idx == 0) & (ichunk == 0): save_filename = 'continuous.dat' f = open(os.path.join(savedir_path,save_filename), 'wb') # save data_reshape.tofile(f) f.close() else: f = open(os.path.join(savedir_path,save_filename), 'a') # append data_reshape.tofile(f) f.close() # Save channel position as mat file for kilosort pos = {'xpos': metadata['xpos'],'ypos':metadata['ypos']} savemat(os.path.join(savedir_path, 'channel_pos.mat'), pos) # Save datasize and filename of each entry to parse data after spike sorting np.save(os.path.join(savedir_path, 'datasize_entry'), np.array(datasize_entry)) np.save(os.path.join(savedir_path, 'task_path'), np.array(concat_path))
[docs]def sync_ts_data_timestamps(data, sync_timestamps): ''' Synchronize time series data by padding nan or cropping datapoints based on synchronized timestamps so that data would begin at 0 on the synchronized time axis Args: data (nt, nch): time series data to preprocess sync_timestamp (nt): synchronized timestamps Returns: sync_data (sync_nt, nch): synchronized time series data. The shape of data changes. ''' dt = sync_timestamps[1]-sync_timestamps[0] if data.ndim == 1: data = data[:,np.newaxis] nch = data.shape[1] # When the initial timestamp is more than 0, pad np.nan so that data could begin at time 0 if sync_timestamps[0]>=0: tmp = np.arange(sync_timestamps[0]-dt,0,-dt) padding_datapoints = tmp.shape[0] pad_to_data = np.zeros((padding_datapoints,nch))*np.nan sync_data = np.concatenate([pad_to_data, data],axis=0) sync_timestamps = np.concatenate([pad_to_data[:,0], sync_timestamps],axis=0) # When the initial timestamp is less than 0, crop the head of data so that data could begin at time 0 else: not_crop_datapoints = sync_timestamps >= 0 sync_data = data[not_crop_datapoints,:] sync_timestamps = sync_timestamps[not_crop_datapoints] return np.squeeze(sync_data), sync_timestamps
[docs]def destripe_lfp_batch(lfp_data, save_path, sample_rate, bit_volts, max_memory_gb = 1., dtype='int16', min_batch_size = 21): ''' Destripe LFP data in each batch to save memory. The result is saved in save_path. Args: lfp_data (nt, nch): LFP data. This should be a memory mapping array. save_path (str): file path to save destriped lfp data sample_rate (float): sampling rate in Hz bit_volts (float): volt per bit max_memory_gb (float): memory size in GB to determine batch size. default is 1.0 GB. dtype (str, optional): dtype for data. default is int16. min_batch_size (int): the number of size in integer to ensure that batch size is more than min_batch_size to run destripe_lfp Returns: None ''' # Load data and metadata n_samples, n_channels = lfp_data.shape # Create memmap array to save destriped lfp data lfp_destriped = np.memmap(save_path, dtype=dtype, mode='w+', shape=lfp_data.shape) # Destripe lfp batch_size = int (max_memory_gb*1e9 / (n_channels*np.dtype(type(bit_volts)).itemsize)) batch_size += min_batch_size # ensure that batch size is more than 21 to run destrip_lfp Nbatches = np.ceil(n_samples/batch_size).astype(int) for ibatch in range(Nbatches): tmp = destripe_lfp((lfp_data[ibatch*batch_size:(ibatch+1)*batch_size, :]*bit_volts).T/1e6, sample_rate)*1e6 lfp_destriped[ibatch*batch_size:(ibatch+1)*batch_size, :] = (tmp/bit_volts).T.astype(dtype) lfp_destriped.flush()
[docs]def proc_neuropixel_spikes(datadir,np_recorddir,ecube_files,kilosort_dir,port_number,result_filename,node_idx=0,ex_idx=0,version='kilosort4'): ''' Preprocess neuropixels data saved by openephys. Kilosort must be performed before this functions. Synchronize spikes detected by kilosort with ecube's timestamps. Extract waveforms from drift corrected ap band data. Args: datadir (str): data directory where the data files are located np_recorddir (str): data folder where neuropixels data for a specific task id is saved ecube_files (str): data folder where ecube data for a specific task id is saved kilosort_dir (str): data folder where preprocessed data of kilosort is saved port_number (int): port number which a probe is connected to. This number corresponds to drive number. result_filename (str): where to store the processed result node_idx (int, optional): record node index. this is usually 0. ex_idx (int, optional): experiment index. this is usually 0. version (str, optional): kilosort version, 'kilosort4' or 'kilosort2.5' is permitted Returns: tuple: tuple containing: | **spike_group (HDF5 group):** spike times for each drive saved like nested dictionary (ex. drive1/spikes) | **waveform_group (HDF5 group):** spike waveforms for each drive saved like a nested dictionary (ex. drive1/waveforms) | **metadata (dict):** dictionary of exp metadata ''' # Load data and metadata datatype = 'ap' data_object, metadata = aodata.load_neuropixel_data(datadir,np_recorddir,datatype=datatype,node_idx=node_idx,ex_idx=ex_idx,port_number=port_number) data = data_object.samples metadata['samplerate'] = int(metadata.pop('sample_rate')) # Syncronize timestamps in neuropixels to timestamps in ecube raw_timestamp = np.arange(data.shape[0])/metadata['samplerate'] on_times_np, off_times_np = aodata.get_neuropixel_digital_input_times(datadir,np_recorddir,datatype=datatype,port_number=port_number) on_times_ecube, off_times_ecube = aodata.get_ecube_digital_input_times(datadir,ecube_files,ch=-1) sync_timestamps,_ = preproc.sync_neuropixel_ecube(raw_timestamp,on_times_np,off_times_np,on_times_ecube,off_times_ecube,bar_duration=0.0185,verbose=True) # Path for the kilosort directory metadata['kilosort_version'] = version if version == 'kilosort4': kilosort_output_dir = Path(kilosort_dir)/np_recorddir.split('/')[-1]/f'port{port_number}'/'kilosort4' elif version == 'kilosort2.5': kilosort_output_dir = Path(kilosort_dir)/np_recorddir.split('/')[-1]/f'port{port_number}'/'kilosort_output' else: raise ValueError('Wrong kilosort version. Choose kilosort4 or kilosort2.5') # Synchronize spike times detected by kilosort with ecube timestamps try: spike_path = kilosort_output_dir/'spike_times.npy' spike_indices = np.load(spike_path) except: raise ValueError(f'Spike file {spike_path} is not found. Please run kilosort first.') spike_times = spike_indices/metadata['samplerate'] sync_spike_times, _ = preproc.sync_neuropixel_ecube(spike_times,on_times_np,off_times_np,on_times_ecube,off_times_ecube,bar_duration=0.0185) # Classify spikes into clusters spike_clusters = np.load(kilosort_output_dir/'spike_clusters.npy') sync_unit_times = preproc.classify_ks_unit(sync_spike_times, spike_clusters) # Extract spike waveforms from drift corrected ap band data unit_times = preproc.classify_ks_unit(spike_times, spike_clusters) # Must be not-sync data templates = np.load(kilosort_output_dir/'templates.npy') channel_pos = np.stack([metadata['xpos'],metadata['ypos']]).T if version == 'kilosort4': drift_corrected_data = np.memmap(kilosort_output_dir/'temp_wh.dat', dtype='int16', shape=data.shape) elif version == 'kilosort2.5': drift_corrected_data = np.memmap(kilosort_output_dir.parent/'temp_wh.dat', dtype='int16', shape=data.shape) unit_waveforms, _, unit_pos = calc_ks_waveforms(drift_corrected_data, metadata['samplerate'], unit_times, templates, channel_pos, waveforms_nch=1) # Save kilosort labels in metadata ks_label = np.genfromtxt(kilosort_output_dir/'cluster_KSLabel.tsv',skip_header=1,dtype=h5py.special_dtype(vlen=str)) ks_label = np.array(ks_label, dtype=h5py.special_dtype(vlen=str)) # Save synchonized spike times and waveforms into hdf file hdf = h5py.File(result_filename, 'a', track_order=True) spike_group = hdf.create_group(f'drive{port_number}/spikes', track_order=True) waveform_group = hdf.create_group(f'drive{port_number}/waveforms', track_order=True) for iunit in np.unique(spike_clusters): spike_group.create_dataset(f'{iunit}',data=sync_unit_times[f'{iunit}']) waveform_group.create_dataset(f'{iunit}',data=unit_waveforms[f'{iunit}']) hdf.close() # Organize metadata metadata['n_samples'] = data.shape[0] metadata['n_channels'] = metadata.pop('num_channels') metadata['microvoltsperbit'] = metadata.pop('bit_volts') metadata['voltsperbit'] = list(1e-6*np.array(metadata['microvoltsperbit'])) metadata['ap_samplerate'] = 1/(sync_timestamps[1]-sync_timestamps[0]) metadata['sync_timestamps'] = sync_timestamps metadata['unique_label'] = np.unique(spike_clusters) metadata['spike_pos'] = {unit_number: unit_pos[f'{unit_number}'].tolist() for unit_number in unit_pos.keys()} metadata['ks_label'] = ks_label return spike_group, waveform_group, metadata
[docs]def proc_neuropixel_ts(datadir,np_recorddir,ecube_files,kilosort_dir,datatype,port_number,result_filename,delete_file=True,node_idx=0,ex_idx=0,max_memory_gb=1., max_memory_gb_destriping=1.): ''' Prerocess neuropixels time-series data saved by openephys. To use this function when datatype is 'ap', kilosort must be applied before this function. lfp data is destriped, low-pass filtered with 500 Hz, and then downsampled to 1000 Hz For ap data, the drift corrected whitened data computed by kilosort is used The whitened data is multipled by inverse whitened matrix, band-pass filtered between 300-5000, then downsampled to 10k Hz. Both ap and lfp time-series data are synchronized with ecube timestamps. Args: datadir (str): data directory where the data files are located np_recorddir (str): data folder where neuropixels data for a specific task id is saved ecube_files (str): data folder where ecube data for a specific task id is saved kilosort_dir (str): data folder where preprocessed data of kilosort is saved datatype (str): data type. 'lfp' or 'ap' port_number (int): port number which a probe is connected to. natural number from 1 to 4. result_filename (str): where to store the processed result delete_file (bool): whether to delete the saved file created by destripe_lfp or multiply_mat_batch node_idx (int, optional): record node index. this is usually 0. ex_idx (int, optional): experiment index. this is usually 0. max_memory_gb (float, optional): max memory in performing destripe_lfp or multiply_mat_batch Returns: tuple: tuple containing: | **dset (dict):** time series data of preprocessed lfp or ap | **metadata (dict):** dictionary of exp metadata ''' if (datatype != 'ap')&(datatype != 'lfp'): raise ValueError(f"Unknown datatype {datatype}") # Load data and metadata data_object, metadata = aodata.load_neuropixel_data(datadir,np_recorddir,datatype=datatype,node_idx=node_idx,ex_idx=ex_idx,port_number=port_number) data = data_object.samples metadata['samplerate'] = int(metadata.pop('sample_rate')) metadata['microvoltsperbit'] = metadata.pop('bit_volts') metadata['voltsperbit'] = list(1e-6*np.array(metadata['microvoltsperbit'])) metadata['n_channels'] = metadata.pop('num_channels') if datatype == 'lfp': down_samplerate = 1000 elif datatype == 'ap': down_samplerate = 10000 downsample_factor = metadata['samplerate']/down_samplerate n_samples = int(np.ceil(data.shape[0]/downsample_factor)) # Syncronize downsampled timestamps in neuropixels with timestamps in ecube raw_timestamp = np.arange(n_samples)/down_samplerate # use downsampled sampling rate on_times_np, off_times_np = aodata.get_neuropixel_digital_input_times(datadir,np_recorddir,datatype=datatype,port_number=port_number) on_times_ecube, off_times_ecube = aodata.get_ecube_digital_input_times(datadir,ecube_files,ch=-1) sync_timestamps,_ = aodata.preproc.sync_neuropixel_ecube(raw_timestamp,on_times_np,off_times_np,on_times_ecube,off_times_ecube,bar_duration=0.0185,verbose=True) # Compute batch size to save memory channel_chunk_size = int (max_memory_gb*1e9 / (data.shape[0]*np.dtype(np.int16).itemsize)) Nbatches = np.ceil(metadata['n_channels']/channel_chunk_size).astype(int) kilosort_portdir = Path(kilosort_dir)/np_recorddir.split('/')[-1]/f'port{port_number}' hdf = h5py.File(result_filename, 'a') print('-'*50) if datatype == 'lfp': # Destripe LFP lfp_destriped_path = kilosort_portdir / 'lfp_destriped.dat' print(f'Destriping LFP data') destripe_lfp_batch(data, lfp_destriped_path, metadata['samplerate'], metadata['microvoltsperbit'][0], max_memory_gb=max_memory_gb_destriping) lfp_destriped = np.memmap(lfp_destriped_path, dtype=np.int16, shape=data.shape) # Process data in each channel to save memory print(f'Filtering and synchronizing LFP data') for ibatch in range(Nbatches): # filter_lfp applies both filtering and downsampling data_filt, _ = filter_lfp(lfp_destriped[:,ibatch*channel_chunk_size:(ibatch+1)*channel_chunk_size], \ metadata['samplerate'], lfp_samplerate=down_samplerate, low_cut=500., buttord=4) sync_data, sync_timestamps_new = sync_ts_data_timestamps(data_filt, sync_timestamps) if ibatch == 0: dset = hdf.create_dataset(f'drive{port_number}/{datatype}_data', (sync_data.shape[0], metadata['n_channels']), dtype=np.int16) dset[:,ibatch*channel_chunk_size:(ibatch+1)*channel_chunk_size] = np.squeeze(sync_data) # Delete files created by destripe_lfp_batch if delete_file: del lfp_destriped lfp_destriped_path.unlink() elif datatype == 'ap': # Multiply inverse whitening matrix to whitened spike band time series try: inverse_mat_path = kilosort_portdir / 'kilosort4' / 'whitening_mat_inv.npy' inverse_mat = np.load(inverse_mat_path) # this is an inverse of the whitening matrix used in kilosort temp_wh_path = kilosort_portdir / 'kilosort4' / 'temp_wh.dat' temp_wh = np.memmap(temp_wh_path, dtype=np.int16, mode='r', shape=data.shape) # this is the whitened time-series data except: raise ValueError(f'inv_matrix {inverse_mat_path} or temp_wh {temp_wh_path} is not found. Please run kilosort first.') inv_temp_wh_path = kilosort_portdir / 'inv_temp_wh.dat' print(f'Multiplyng inverse matrix by whitened ap data') multiply_mat_batch(temp_wh, inverse_mat, inv_temp_wh_path, scale = 1/200, max_memory_gb=max_memory_gb) inv_temp_wh = np.memmap(inv_temp_wh_path, dtype=np.int16, shape=data.shape) # Process data in each channel to save memory print(f'Filtering and synchronizing AP data') for ibatch in range(Nbatches): # filter_spikes only applies filtering, not downsampling tmp = filter_spikes(inv_temp_wh[:,ibatch*channel_chunk_size:(ibatch+1)*channel_chunk_size], \ metadata['samplerate'], low_pass=300, high_pass=5000, buttord=4) data_filt = downsample(tmp, metadata['samplerate'], down_samplerate) sync_data, sync_timestamps_new = sync_ts_data_timestamps(data_filt, sync_timestamps) if ibatch == 0: dset = hdf.create_dataset(f'drive{port_number}/{datatype}_data', (sync_data.shape[0], metadata['n_channels']), dtype=np.int16) dset[:,ibatch*channel_chunk_size:(ibatch+1)*channel_chunk_size] = np.squeeze(sync_data) # Delete files created by multiply_mat_batch if delete_file: del inv_temp_wh inv_temp_wh_path.unlink() metadata['sync_timestamps'] = sync_timestamps_new hdf.close() # Organize metadata metadata['n_samples'] = sync_data.shape[0] # shchronization changes the shape of data metadata['down_samplerate'] = down_samplerate metadata[f'{datatype}_samplerate'] = 1/(sync_timestamps[1]-sync_timestamps[0]) return dset, metadata