Source code for nltools.file_reader

"""
NeuroLearn File Reading Tools
=============================

"""

__all__ = ["onsets_to_dm"]
__author__ = ["Eshin Jolly"]
__license__ = "MIT"

import pandas as pd
import numpy as np
from nltools.data import Design_Matrix
import warnings
from pathlib import Path


[docs]def onsets_to_dm( F, sampling_freq, run_length, header="infer", sort=False, keep_separate=True, add_poly=None, unique_cols=None, fill_na=None, **kwargs, ): """ This function can assist in reading in one or several in a 2-3 column onsets files, specified in seconds and converting it to a Design Matrix organized as samples X Stimulus Classes. sampling_freq should be specified in hertz; for TRs use hertz = 1/TR. Onsets files **must** be organized with columns in one of the following 4 formats: 1) 'Stim, Onset' 2) 'Onset, Stim' 3) 'Stim, Onset, Duration' 4) 'Onset, Duration, Stim' No other file organizations are currently supported. *Note:* Stimulus offsets (onset + duration) that fall into an adjacent TR include that full TR. E.g. offset of 10.16s with TR = 2 has an offset of TR 5, which spans 10-12s, rather than an offset of TR 4, which spans 8-10s. Args: F (str/Path/pd.DataFrame): filepath or pandas dataframe sampling_freq (float): samping frequency in hertz, i.e 1 / TR run_length (int): run length in number of TRs header (str/None, optional): whether there's an additional header row in the supplied file/dataframe. See `pd.read_csv` for more details. Defaults to `"infer"`. sort (bool, optional): whether to sort dataframe columns alphabetically. Defaults to False. keep_separate (bool, optional): if a list of files or dataframes is supplied, whether to create separate polynomial columns per file. Defaults to `True`. add_poly (bool/int, optional): whether to add Nth order polynomials to design matrix. Defaults to None. unique_cols (list/None, optional): if a list of files or dataframes is supplied, what additional columns to keep separate per file (e.g. spikes). Defaults to None. fill_na (Any, optional): what to replace NaNs with. Defaults to None (no filling). Returns: nltools.data.Design_Matrix: design matrix organized as TRs x Stims """ if not isinstance(F, list): F = [F] if not isinstance(sampling_freq, (float, np.floating)): raise TypeError("sampling_freq must be a float") out = [] TR = 1.0 / sampling_freq for f in F: if isinstance(f, str) or isinstance(f, Path): df = pd.read_csv(f, header=header, **kwargs) elif isinstance(f, pd.core.frame.DataFrame): df = f.copy() else: raise TypeError("Input needs to be file path or pandas dataframe!") # Keep an unaltered copy of the original dataframe for checking purposes below data = df.copy() if df.shape[1] == 2: warnings.warn( "Only 2 columns in file, assuming all stimuli are the same duration" ) elif df.shape[1] == 1 or df.shape[1] > 3: raise ValueError("Can only handle files with 2 or 3 columns!") # Try to infer the header if header is None: possibleHeaders = ["Stim", "Onset", "Duration"] if isinstance(df.iloc[0, 0], str): df.columns = possibleHeaders[: df.shape[1]] elif isinstance(df.iloc[0, df.shape[1] - 1], str): df.columns = possibleHeaders[1:] + [possibleHeaders[0]] else: raise ValueError( "Can't figure out onset file organization. Make sure file has no more than 3 columns specified as 'Stim,Onset,Duration' or 'Onset,Duration,Stim'" ) # Compute an offset in seconds if a Duration is provided if df.shape[1] == 3: df["Offset"] = df["Onset"] + df["Duration"] # Onset always starts at the closest TR rounded down, e.g. # with TR = 2, and onset = 10.1 or 11.7 will both have onset of TR 5 as it spans the window 10-12s df["Onset"] = df["Onset"].apply(lambda x: int(np.floor(x / TR))) # Offset includes the subsequent if Offset falls within window covered by that TR # but not if it falls exactly on the subsequent TR, e.g. if TR = 2, and offset = 10.16, then TR 5 will be included but if offset = 10.00, TR 5 will not be included, as it covers the window 10-12s if "Offset" in df.columns: def conditional_round(x, TR): """Conditional rounding to the next TR if offset falls within window, otherwise not""" dur_in_TRs = x / TR dur_in_TRs_rounded_down = np.floor(dur_in_TRs) # If in the future we wanted to enable the ability to include a TR based on a % of that TR we can change the next line to compare to some value, e.g. at least 0.5s into that TR: dur_in_TRs - dur_in_TRs_rounded_down > 0.5 if dur_in_TRs > dur_in_TRs_rounded_down: return dur_in_TRs_rounded_down else: return dur_in_TRs_rounded_down - 1 # Apply function df["Offset"] = df["Offset"].apply(conditional_round, args=(TR,)) # Build dummy codes X = Design_Matrix( np.zeros([run_length, df["Stim"].nunique()]), columns=df["Stim"].unique(), sampling_freq=sampling_freq, ) for i, row in df.iterrows(): if "Offset" in df.columns: X.loc[row["Onset"] : row["Offset"], row["Stim"]] = 1 else: X.loc[row["Onset"], row["Stim"]] = 1 # DISABLED cause this isn't quite accurate for stimuli of different durations # Run a check # if "Offset" in df.columns: # onsets = X.sum().values # stim_counts = data.Stim.value_counts(sort=False)[X.columns] # durations = data.groupby("Stim").Duration.mean().values # for i, (o, c, d) in enumerate(zip(onsets, stim_counts, durations)): # if c * (d / TR) <= o <= c * ((d / TR) + 1): # pass # else: # warnings.warn( # f"Computed onsets for {data.Stim.unique()[i]} are inconsistent ({o}) with expected values ({c * (d / TR)} to {c * ((d / TR) + 1)}). Please manually verify the outputted Design_Matrix!" # ) if sort: X = X.reindex(sorted(X.columns), axis=1) out.append(X) if len(out) > 1: if add_poly is not None: out = [e.add_poly(add_poly) for e in out] out_dm = out[0].append( out[1:], keep_separate=keep_separate, unique_cols=unique_cols, fill_na=fill_na, ) else: out_dm = out[0] if add_poly is not None: out_dm = out_dm.add_poly(add_poly) if fill_na is not None: out_dm = out_dm.fill_na(fill_na) return out_dm