Source code for nltools.data.brain_data

"""
NeuroLearn Brain Data
=====================

Classes to represent brain image data.

"""

# Notes:
# Need to figure out how to speed up loading and resampling of data

__author__ = ["Luke Chang"]
__license__ = "MIT"

from nilearn.signal import clean
from scipy.stats import ttest_1samp, pearsonr, spearmanr
from scipy.spatial.distance import cdist
from scipy.stats import t as t_dist
from scipy.signal import detrend
from scipy.interpolate import pchip
import os
import shutil
import nibabel as nib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tempfile
from copy import deepcopy
from sklearn.metrics import balanced_accuracy_score
from sklearn.metrics.pairwise import pairwise_distances, cosine_similarity
from sklearn.utils import check_random_state
from sklearn.preprocessing import scale
from pynv import Client
from joblib import Parallel, delayed
from nltools.mask import expand_mask
from nltools.analysis import Roc
from nilearn.maskers import NiftiMasker
from nilearn.plotting import plot_stat_map
from nilearn.image import smooth_img, resample_to_img
from nilearn.masking import intersect_masks
from nilearn.regions import connected_regions, connected_label_regions
from nltools.utils import (
    set_algorithm,
    attempt_to_import,
    concatenate,
    _bootstrap_apply_func,
    set_decomposition_algorithm,
    check_brain_data,
    check_brain_data_is_single,
    _roi_func,
    get_mni_from_img_resolution,
    to_h5,
)
from nltools.cross_validation import set_cv
from nltools.plotting import scatterplot, plot_interactive_brain, plot_brain
from nltools.stats import (
    pearson,
    fdr,
    holm_bonf,
    threshold,
    fisher_r_to_z,
    fisher_z_to_r,
    transform_pairwise,
    summarize_bootstrap,
    procrustes,
    find_spikes,
    regress_permutation,
)
from nltools.stats import regress as regression
from .adjacency import Adjacency
from nltools.prefs import MNI_Template, resolve_mni_path
from nilearn.decoding import SearchLight
from pathlib import Path
import warnings
from h5py import File as h5File


# Optional dependencies
nx = attempt_to_import("networkx", "nx")
tables = attempt_to_import("tables")
MAX_INT = np.iinfo(np.int32).max


[docs]class Brain_Data(object): """ Brain_Data is a class to represent neuroimaging data in python as a vector rather than a 3-dimensional matrix.This makes it easier to perform data manipulation and analyses. Args: data: nibabel data instance or list of files Y: Pandas DataFrame of training labels X: Pandas DataFrame Design Matrix for running univariate models mask: binary nifiti file to mask brain data **kwargs: Additional keyword arguments to pass to the prediction algorithm """ def __init__(self, data=None, Y=None, X=None, mask=None, **kwargs): # Flag to support hdf5 files saved using nltools <= 0.4.8 legacy_h5 = kwargs.pop("legacy_h5", False) self._h5_compression = kwargs.pop("h5_compression", "gzip") # Setup default or specified nifti masker if mask is None: # Load default mask self.mask = nib.load(resolve_mni_path(MNI_Template)["mask"]) elif isinstance(mask, (str, Path)): self.mask = nib.load(str(mask)) elif isinstance(mask, nib.Nifti1Image): self.mask = mask else: raise TypeError("mask is not a nibabel instance or a " "valid file name") # Learn transformation on mask self.nifti_masker = NiftiMasker(mask_img=self.mask, **kwargs) # Setup data if data is None: self.data = np.array([]) # Filepath or URL elif isinstance(data, (str, Path)): # Convert Path objects to string; converting string is a no-op so this won't affect real strings to_load = str(data) # Web download if "http://" in to_load or "https://" in to_load: from nltools.datasets import download_nifti tmp_dir = os.path.join(tempfile.gettempdir(), str(os.times()[-1])) os.makedirs(tmp_dir) data = nib.load(download_nifti(to_load, data_dir=tmp_dir)) # HDF5 elif (".h5" in to_load) or (".hdf5" in to_load): # <= 0.4.8 compatibility if legacy_h5: with tables.open_file(to_load, mode="r") as f: # Setup data self.data = np.array(f.root["data"]) # Setup X if len(list(f.root["X_columns"])): self.X = pd.DataFrame( np.array(f.root["X"]).squeeze(), columns=[ e.decode("utf-8") if isinstance(e, bytes) else e for e in np.array(f.root["X_columns"]) ], index=[ e.decode("utf-8") if isinstance(e, bytes) else e for e in np.array(f.root["X_index"]) ], ) else: self.X = pd.DataFrame() # Setup Y if len(list(f.root["Y_columns"])): self.Y = pd.DataFrame( np.array(f.root["Y"]).squeeze(), columns=[ e.decode("utf-8") if isinstance(e, bytes) else e for e in np.array(f.root["Y_columns"]) ], index=[ e.decode("utf-8") if isinstance(e, bytes) else e for e in np.array(f.root["Y_index"]) ], ) else: self.Y = pd.DataFrame() # Setup mask if mask is None: # User didn't request a mask so try to load it from the h5 file, # i.e. overwrite the default mask loaded above filename = ( f.root["mask_file_name"] if "mask_file_name" in f.root else self.mask.get_filename() ) self.mask = nib.Nifti1Image( np.array(f.root["mask_data"]), affine=np.array(f.root["mask_affine"]), file_map={"image": nib.FileHolder(filename=filename)}, ) nifti_masker = NiftiMasker(self.mask) self.nifti_masker = nifti_masker.fit(self.mask) else: # Mask is already set above so use the default or user requested # mask rather than the one in h5 (if it exists) if "mask_data" in f.root: warnings.warn( "Existing mask found in HDF5 file but is being ignored because you passed a value for mask. Set mask=None to use existing mask in the HDF5 file" ) # We're done initializing return else: # Load X and Y dataframes with pd.HDFStore(to_load, "r") as f: self.X = f["X"] self.Y = f["Y"] # Load data and masker stuffs with h5File(to_load, "r") as f: self.data = np.array(f["data"]) if mask is None: # User didn't request a mask so try to load it from the h5 file, # i.e. overwrite the default mask loaded above self.mask = nib.Nifti1Image( np.array(f["mask_data"]), affine=np.array(f["mask_affine"]), file_map={ "image": nib.FileHolder( filename=f["mask_file_name"][()].decode() ) }, ) nifti_masker = NiftiMasker(self.mask) self.nifti_masker = nifti_masker.fit(self.mask) else: # Mask is already set above so use the default or user requested # mask rather than the one in h5 (if it exists) if "mask_data" in f: warnings.warn( "Existing mask found in HDF5 file but is being ignored because you passed a value for mask. Set mask=None to use existing mask in the HDF5 file" ) # We're done initializing return # .nii/.nii.gz file else: data = nib.load(data) self.data = self.nifti_masker.fit_transform(data) # Nibabel/Nilearn object elif isinstance(data, nib.Nifti1Image): self.data = np.array(self.nifti_masker.fit_transform(data)) # List of brain data objects or string filenames elif isinstance(data, list): # Brain datas if isinstance(data[0], Brain_Data): tmp = concatenate(data) for item in ["data", "Y", "X", "mask", "nifti_masker"]: setattr(self, item, getattr(tmp, item)) # File paths or niftis # NOTE: We don't support list of hdf5 filepaths! Only .nii/.nii.gz else: if all(isinstance(x, data[0].__class__) for x in data): self.data = [] for i in data: # Filepath if isinstance(i, (str, Path)): to_load = str(i) self.data.append( self.nifti_masker.fit_transform(nib.load(to_load)) ) # Loaded nifti object elif isinstance(i, nib.Nifti1Image): self.data.append(self.nifti_masker.fit_transform(i)) self.data = np.concatenate(self.data) else: raise ValueError( "Make sure all objects in the list are the same type." ) else: raise TypeError( "data must be a single or list of: Brain_Data, filepath, or nib.Nifti1Image" ) # Collapse any extra data dimension if 1 in self.data.shape: self.data = self.data.squeeze() # Setup Y dataframe if Y is None: self.Y = pd.DataFrame() elif isinstance(Y, (str, Path)): self.Y = pd.read_csv(Y, header=None, index_col=None) elif isinstance(Y, pd.DataFrame): self.Y = Y else: raise TypeError("Make sure Y filepath or pandas data frame.") # Setup X dataframe if X is None: self.X = pd.DataFrame() elif isinstance(X, (str, Path)): self.X = pd.read_csv(X, header=None, index_col=None) elif isinstance(X, pd.DataFrame): self.X = X else: raise TypeError("Make sure X filepath or pandas data frame.") # Ensure consistency if not self.Y.empty and self.data.shape[0] != self.Y.shape[0]: raise ValueError( f"Y rows ({self.Y.shape[0]}) do not match data rows ({self.data.shape[0]})" ) if not self.X.empty and self.data.shape[0] != self.X.shape[0]: raise ValueError( f"X rows ({self.X.shape[0]}) do not match data rows ({self.data.shape[0]})" ) def __repr__(self): return "%s.%s(data=%s, Y=%s, X=%s, mask=%s)" % ( self.__class__.__module__, self.__class__.__name__, self.shape(), self.Y.shape, self.X.shape, os.path.basename(self.mask.get_filename()), ) def __getitem__(self, index): new = deepcopy(self) if isinstance(index, (int, np.integer)): new.data = np.array(self.data[index, :]).squeeze() else: if isinstance(index, slice): new.data = self.data[index, :] else: index = np.array(index).flatten() new.data = np.array(self.data[index, :]).squeeze() if not self.Y.empty: new.Y = self.Y.iloc[index] if isinstance(new.Y, pd.Series): new.Y.reset_index(inplace=True, drop=True) if not self.X.empty: new.X = self.X.iloc[index] if len(new.X) > 1: new.X.reset_index(inplace=True, drop=True) return new def __setitem__(self, index, value): if not isinstance(value, Brain_Data): raise ValueError( "Make sure the value you are trying to set is a " "Brain_Data() instance." ) self.data[index, :] = value.data if not value.Y.empty: self.Y.values[index] = value.Y if not value.X.empty: if self.X.shape[1] != value.X.shape[1]: raise ValueError("Make sure self.X is the same size as " "value.X.") self.X.values[index] = value.X def __len__(self): return self.shape()[0] def __add__(self, y): new = deepcopy(self) if isinstance(y, (int, np.integer, float, np.floating)): new.data = new.data + y elif isinstance(y, Brain_Data): if self.shape() != y.shape(): raise ValueError( "Both Brain_Data() instances need to be the " "same shape." ) new.data = new.data + y.data else: raise ValueError("Can only add int, float, or Brain_Data") return new def __radd__(self, y): new = deepcopy(self) if isinstance(y, (int, np.integer, float, np.floating)): new.data = y + new.data elif isinstance(y, Brain_Data): if self.shape() != y.shape(): raise ValueError( "Both Brain_Data() instances need to be the " "same shape." ) new.data = y.data + new.data else: raise ValueError("Can only add int, float, or Brain_Data") return new def __sub__(self, y): new = deepcopy(self) if isinstance(y, (int, np.integer, float, np.floating)): new.data = new.data - y elif isinstance(y, Brain_Data): if self.shape() != y.shape(): raise ValueError( "Both Brain_Data() instances need to be the " "same shape." ) new.data = new.data - y.data else: raise ValueError("Can only add int, float, or Brain_Data") return new def __rsub__(self, y): new = deepcopy(self) if isinstance(y, (int, np.integer, float, np.floating)): new.data = y - new.data elif isinstance(y, Brain_Data): if self.shape() != y.shape(): raise ValueError( "Both Brain_Data() instances need to be the " "same shape." ) new.data = y.data - new.data else: raise ValueError("Can only add int, float, or Brain_Data") return new def __mul__(self, y): new = deepcopy(self) if isinstance(y, (int, np.integer, float, np.floating)): new.data = new.data * y elif isinstance(y, Brain_Data): if self.shape() != y.shape(): raise ValueError( "Both Brain_Data() instances need to be the " "same shape." ) new.data = np.multiply(new.data, y.data) elif isinstance(y, (list, np.ndarray)): if len(y) != len(self): raise ValueError( "Vector multiplication requires that the " "length of the vector match the number of " "images in Brain_Data instance." ) else: new.data = np.dot(new.data.T, y).T else: raise ValueError("Can only multiply int, float, list, or Brain_Data") return new def __rmul__(self, y): new = deepcopy(self) if isinstance(y, (int, np.integer, float, np.floating)): new.data = y * new.data elif isinstance(y, Brain_Data): if self.shape() != y.shape(): raise ValueError( "Both Brain_Data() instances need to be the " "same shape." ) new.data = np.multiply(y.data, new.data) else: raise ValueError("Can only multiply int, float, or Brain_Data") return new def __truediv__(self, y): new = deepcopy(self) if isinstance(y, (int, np.integer, float, np.floating)): new.data = new.data / y elif isinstance(y, Brain_Data): if self.shape() != y.shape(): raise ValueError( "Both Brain_Data() instances need to be the " "same shape." ) new.data = np.divide(new.data, y.data) else: raise ValueError("Can only divide int, float, list, or Brain_Data") return new def __iter__(self): for x in range(len(self)): yield self[x]
[docs] def shape(self): """Get images by voxels shape.""" return self.data.shape
[docs] def mean(self, axis=0): """Get mean of each voxel or image Args: axis: (int) across images=0 (default), within images=1 Returns: out: (float/np.array/Brain_Data) """ out = deepcopy(self) if check_brain_data_is_single(self): out = np.mean(self.data) else: if axis == 0: out.data = np.mean(self.data, axis=0) out.X = pd.DataFrame() out.Y = pd.DataFrame() elif axis == 1: out = np.mean(self.data, axis=1) else: raise ValueError("axis must be 0 or 1") return out
[docs] def median(self, axis=0): """Get median of each voxel or image Args: axis: (int) across images=0 (default), within images=1 Returns: out: (float/np.array/Brain_Data) """ out = deepcopy(self) if check_brain_data_is_single(self): out = np.median(self.data) else: if axis == 0: out.data = np.median(self.data, axis=0) out.X = pd.DataFrame() out.Y = pd.DataFrame() elif axis == 1: out = np.median(self.data, axis=1) else: raise ValueError("axis must be 0 or 1") return out
[docs] def std(self, axis=0): """Get standard deviation of each voxel or image. Args: axis: (int) across images=0 (default), within images=1 Returns: out: (float/np.array/Brain_Data) """ out = deepcopy(self) if check_brain_data_is_single(self): out = np.std(self.data) else: if axis == 0: out.data = np.std(self.data, axis=0) out.X = pd.DataFrame() out.Y = pd.DataFrame() elif axis == 1: out = np.std(self.data, axis=1) else: raise ValueError("axis must be 0 or 1") return out
[docs] def sum(self): """Sum over voxels.""" out = deepcopy(self) if len(self.shape()) > 1: out.data = np.sum(out.data, axis=0) out.X = pd.DataFrame() out.Y = pd.DataFrame() else: out = np.sum(self.data) return out
[docs] def to_nifti(self): """Convert Brain_Data Instance into Nifti Object""" return self.nifti_masker.inverse_transform(self.data)
[docs] def write(self, file_name): """Write out Brain_Data object to Nifti or HDF5 File. Args: file_name: (str) name of nifti file including path """ if isinstance(file_name, Path): file_name = str(file_name) if (".h5" in file_name) or (".hdf5" in file_name): to_h5( self, file_name, obj_type="brain_data", h5_compression=self._h5_compression, ) else: self.to_nifti().to_filename(file_name)
[docs] def scale(self, scale_val=100.0): """Scale all values such that they are on the range [0, scale_val], via grand-mean scaling. This is NOT global-scaling/intensity normalization. This is useful for ensuring that data is on a common scale (e.g. good for multiple runs, participants, etc) and if the default value of 100 is used, can be interpreted as something akin to (but not exactly) "percent signal change." This is consistent with default behavior in AFNI and SPM. Change this value to 10000 to make consistent with FSL. Args: scale_val: (int/float) what value to send the grand-mean to; default 100 """ out = deepcopy(self) out.data = out.data / out.data.mean() * scale_val return out
[docs] def plot( self, limit=5, anatomical=None, view="axial", colorbar=False, black_bg=True, draw_cross=False, threshold_upper=None, threshold_lower=None, figsize=(15, 2), axes=None, **kwargs, ): """Create a quick plot of self.data. Will plot each image separately Args: limit: (int) max number of images to return anatomical: (nifti, str) nifti image or file name to overlay view: (str) 'axial' for limit number of axial slices; 'glass' for ortho-view glass brain; 'mni' for multi-slice view mni brain; 'full' for both glass and mni views threshold_upper: (str/float) threshold if view is 'glass', 'mni', or 'full' threshold_lower: (str/float)threshold if view is 'glass', 'mni', or 'full' save: (str/bool): optional string file name or path for saving; only applies if view is 'mni', 'glass', or 'full'. Filenames will appended with the orientation they belong to """ if view == "axial": if threshold_upper is not None or threshold_lower is not None: print("threshold is ignored for simple axial plots") if anatomical is not None: if not isinstance(anatomical, nib.Nifti1Image): if isinstance(anatomical, str) or isinstance(anatomical, str): anatomical = nib.load(anatomical) else: raise ValueError("anatomical is not a nibabel instance") else: # anatomical = nib.load(resolve_mni_path(MNI_Template)['plot']) anatomical = get_mni_from_img_resolution(self, img_type="plot") if self.data.ndim == 1: if axes is None: _, axes = plt.subplots(nrows=1, figsize=figsize) plot_stat_map( self.to_nifti(), anatomical, cut_coords=range(-40, 60, 10), display_mode="z", black_bg=black_bg, colorbar=colorbar, draw_cross=draw_cross, axes=axes, **kwargs, ) else: if axes is not None: print("axes is ignored when plotting multiple images") n_subs = np.minimum(self.data.shape[0], limit) _, a = plt.subplots( nrows=n_subs, figsize=(figsize[0], len(self) * figsize[1]) ) for i in range(n_subs): plot_stat_map( self[i].to_nifti(), anatomical, cut_coords=range(-40, 60, 10), display_mode="z", black_bg=black_bg, colorbar=colorbar, draw_cross=draw_cross, axes=a[i], **kwargs, ) return elif view in ["glass", "mni", "full"]: if self.data.ndim == 1: return plot_brain( self, how=view, thr_upper=threshold_upper, thr_lower=threshold_lower, **kwargs, ) else: raise ValueError( "Plotting in 'glass', 'mni', or 'full' views only works with a 3D image" ) else: raise ValueError("view must be one of: 'axial', 'glass', 'mni', 'full'.")
[docs] def iplot(self, threshold=0, surface=False, anatomical=None, **kwargs): """Create an interactive brain viewer for the current brain data instance. Args: threshold: (float/str) two-sided threshold to initialize the visualization, maybe be a percentile string; default 0 surface: (bool) whether to create a surface-based plot; default False anatomical: nifti image or filename to overlay kwargs: optional arguments to nilearn.view_img or nilearn.view_img_on_surf Returns: interactive brain viewer widget """ if anatomical is not None: if not isinstance(anatomical, nib.Nifti1Image): if isinstance(anatomical, str) or isinstance(anatomical, Path): anatomical = nib.load(anatomical) else: raise ValueError("anatomical is not a nibabel instance") else: # anatomical = nib.load(resolve_mni_path(MNI_Template)['brain']) anatomical = get_mni_from_img_resolution(self, img_type="brain") return plot_interactive_brain( self, threshold=threshold, surface=surface, anatomical=anatomical, **kwargs )
[docs] def regress(self, mode="ols", **kwargs): """Run a mass-univariate regression across voxels. Three types of regressions can be run: 1) Standard OLS (default) 2) Robust OLS (heteroscedasticty and/or auto-correlation robust errors), i.e. OLS with "sandwich estimators" 3) ARMA (auto-regressive and moving-average lags = 1 by default; experimental) For more information see the help for nltools.stats.regress ARMA notes: This experimental mode is similar to AFNI's 3dREMLFit but without spatial smoothing of voxel auto-correlation estimates. It can be **very computationally intensive** so parallelization is used by default to try to speed things up. Speed is limited because a unique ARMA model is fit to *each voxel* (like AFNI/FSL), but unlike SPM, which assumes the same AR parameters (~0.2) at each voxel. While coefficient results are typically very similar to OLS, std-errors and so t-stats, dfs and and p-vals can differ greatly depending on how much auto-correlation is explaining the response in a voxel relative to other regressors in the design matrix. Args: mode (str): kind of model to fit; must be one of 'ols' (default), 'robust', or 'arma' kwargs (dict): keyword arguments to nltools.stats.regress Returns: out: dictionary of regression statistics in Brain_Data instances {'beta','t','p','df','residual'} """ if not isinstance(self.X, pd.DataFrame): raise ValueError("Make sure self.X is a pandas DataFrame.") if self.X.empty: raise ValueError("Make sure self.X is not empty.") if self.data.shape[0] != self.X.shape[0]: raise ValueError("self.X does not match the correct size of " "self.data") b, se, t, p, df, res = regression(self.X, self.data, mode=mode, **kwargs) # Prevent copy of all data in self multiple times; instead start with an empty instance and copy only needed attributes from self, and use this as a template for other outputs b_out = self.__class__() b_out.mask = deepcopy(self.mask) b_out.nifti_masker = deepcopy(self.nifti_masker) # Use this as template for other outputs before setting data se_out = b_out.copy() t_out = b_out.copy() p_out = b_out.copy() df_out = b_out.copy() res_out = b_out.copy() ( b_out.data, se_out.data, t_out.data, p_out.data, df_out.data, res_out.data, ) = ( b, se, t, p, df, res, ) return { "beta": b_out, "t": t_out, "p": p_out, "df": df_out, "sigma": se_out, "residual": res_out, }
[docs] def randomise( self, n_permute=5000, threshold_dict=None, return_mask=False, **kwargs ): """ Run mass-univariate regression at each voxel with inference performed via permutation testing ala randomise in FSL. Operates just like .regress(), but intended to be used for second-level analyses. Args: n_permute (int): number of permutations threshold_dict: (dict) a dictionary of threshold parameters {'unc':.001} or {'fdr':.05} return_mask: (bool) optionally return the thresholding mask Returns: out: dictionary of maps for betas, tstats, and pvalues """ if not isinstance(self.X, pd.DataFrame): raise ValueError("Make sure self.X is a pandas DataFrame.") if self.X.empty: raise ValueError("Make sure self.X is not empty.") if self.data.shape[0] != self.X.shape[0]: raise ValueError("self.X does not match the correct size of " "self.data") b, t, p = regress_permutation(self.X, self.data, n_permute=n_permute, **kwargs) # Prevent copy of all data in self multiple times; instead start with an empty instance and copy only needed attributes from self, and use this as a template for other outputs b_out = self.__class__() b_out.mask = deepcopy(self.mask) b_out.nifti_masker = deepcopy(self.nifti_masker) # Use this as template for other outputs before setting data t_out = b_out.copy() p_out = b_out.copy() b_out.data, t_out.data, p_out.data = (b, t, p) if threshold_dict is not None: if isinstance(threshold_dict, dict): if "unc" in threshold_dict: thr = threshold_dict["unc"] elif "fdr" in threshold_dict: thr = fdr(p_out.data, q=threshold_dict["fdr"]) elif "holm-bof" in threshold_dict: thr = holm_bonf(p.data, alpha=threshold_dict["holm-bonf"]) elif "permutation" in threshold_dict: thr = 0.05 if return_mask: thr_t_out, thr_mask = threshold(t_out, p_out, thr, True) out = { "beta": b_out, "t": t_out, "p": p_out, "thr_t": thr_t_out, "thr_mask": thr_mask, } else: thr_t_out = threshold(t_out, p_out, thr) out = {"beta": b_out, "t": t_out, "p": p_out, "thr_t": thr_t_out} else: raise ValueError( "threshold_dict is not a dictionary. " "Make sure it is in the form of {'unc': .001} " "or {'fdr': .05}" ) else: out = {"beta": b_out, "t": t_out, "p": p_out} return out
[docs] def ttest(self, threshold_dict=None, return_mask=False): """Calculate one sample t-test across each voxel (two-sided) Args: threshold_dict: (dict) a dictionary of threshold parameters {'unc':.001} or {'fdr':.05} return_mask: (bool) if thresholding is requested, optionall return the mask of voxels that exceed threshold, e.g. for use with another map Returns: out: (dict) dictionary of regression statistics in Brain_Data instances {'t','p'} """ t = deepcopy(self) p = deepcopy(self) t.data, p.data = ttest_1samp(self.data, 0, 0) if threshold_dict is not None: if isinstance(threshold_dict, dict): if "unc" in threshold_dict: thr = threshold_dict["unc"] elif "fdr" in threshold_dict: thr = fdr(p.data, q=threshold_dict["fdr"]) elif "holm-bonf" in threshold_dict: thr = holm_bonf(p.data, alpha=threshold_dict["holm-bonf"]) if return_mask: thr_t, thr_mask = threshold(t, p, thr, True) out = {"t": t, "p": p, "thr_t": thr_t, "thr_mask": thr_mask} else: thr_t = threshold(t, p, thr) out = {"t": t, "p": p, "thr_t": thr_t} else: raise ValueError( "threshold_dict is not a dictionary. " "Make sure it is in the form of {'unc': .001} " "or {'fdr': .05}" ) else: out = {"t": t, "p": p} return out
[docs] def append(self, data, **kwargs): """Append data to Brain_Data instance Args: data: (Brain_Data) Brain_Data instance to append kwargs: optional inputs to Design_Matrix append Returns: out: (Brain_Data) new appended Brain_Data instance """ data = check_brain_data(data) if self.isempty(): out = deepcopy(data) else: error_string = ( "Data to append has different number of voxels " "then Brain_Data instance." ) if len(self.shape()) == 1 & len(data.shape()) == 1: if self.shape()[0] != data.shape()[0]: raise ValueError(error_string) elif len(self.shape()) == 1 & len(data.shape()) > 1: if self.shape()[0] != data.shape()[1]: raise ValueError(error_string) elif len(self.shape()) > 1 & len(data.shape()) == 1: if self.shape()[1] != data.shape()[0]: raise ValueError(error_string) elif self.shape()[1] != data.shape()[1]: raise ValueError(error_string) out = deepcopy(self) out.data = np.vstack([self.data, data.data]) if out.Y.size: out.Y = pd.concat([self.Y, data.Y]) if self.X.size: if isinstance(self.X, pd.DataFrame): out.X = pd.concat([self.X, data.X], **kwargs) else: out.X = np.vstack([self.X, data.X]) return out
[docs] def empty(self, data=True, Y=True, X=True): """Initalize Brain_Data.data as empty""" tmp = deepcopy(self) if data: tmp.data = np.array([]) if Y: tmp.Y = pd.DataFrame() if X: tmp.X = pd.DataFrame() return tmp
[docs] def isempty(self): """Check if Brain_Data.data is empty""" if isinstance(self.data, np.ndarray): boolean = False if self.data.size else True if isinstance(self.data, list): boolean = True if not self.data else False return boolean
[docs] def similarity(self, image, method="correlation"): """Calculate similarity of Brain_Data() instance with single Brain_Data or Nibabel image Args: image: (Brain_Data, nifti) image to evaluate similarity method: (str) Type of similarity ['correlation','dot_product','cosine'] Returns: pexp: (list) Outputs a vector of pattern expression values """ supported_metrics = [ "correlation", "pearson", "rank_correlation", "spearman", "dot_product", "cosine", ] if method not in supported_metrics: raise ValueError(f"method must be one of {supported_metrics}") image = check_brain_data(image) # Check to make sure masks are the same for each dataset and if not # create a union mask # This might be handy code for a new Brain_Data method if np.sum(self.nifti_masker.mask_img.get_fdata() == 1) != np.sum( image.nifti_masker.mask_img.get_fdata() == 1 ): new_mask = intersect_masks( [self.nifti_masker.mask_img, image.nifti_masker.mask_img], threshold=1, connected=False, ) new_nifti_masker = NiftiMasker(mask_img=new_mask) data2 = new_nifti_masker.fit_transform(self.to_nifti()) image2 = new_nifti_masker.fit_transform(image.to_nifti()) else: data2 = self.data image2 = image.data if method == "dot_product": func = lambda x, y: np.dot(x, y) elif method in ["pearson", "correlation"]: func = lambda x, y: pearsonr(x, y)[0] elif method in ["spearman", "rank_correlation"]: func = lambda x, y: spearmanr(x, y)[0] elif method == "cosine": func = method out = cdist(np.atleast_2d(data2), np.atleast_2d(image2), func).squeeze() # cdist metric argument returns distances by default (unless we specific a # custom function like above) so flip it to similarity out = 1 - out if method == "cosine" else out return out
[docs] def distance(self, metric="euclidean", **kwargs): """Calculate distance between images within a Brain_Data() instance. Args: metric: (str) type of distance metric (can use any scikit learn or sciypy metric) Returns: dist: (Adjacency) Outputs a 2D distance matrix. """ return Adjacency( pairwise_distances(self.data, metric=metric, **kwargs), matrix_type="Distance", )
[docs] def multivariate_similarity(self, images, method="ols"): """Predict spatial distribution of Brain_Data() instance from linear combination of other Brain_Data() instances or Nibabel images Args: self: Brain_Data instance of data to be applied images: Brain_Data instance of weight map Returns: out: dictionary of regression statistics in Brain_Data instances {'beta','t','p','df','residual'} """ # Notes: Should add ridge, and lasso, elastic net options options if len(self.shape()) > 1: raise ValueError("This method can only decompose a single brain " "image.") images = check_brain_data(images) # Check to make sure masks are the same for each dataset and if not create a union mask # This might be handy code for a new Brain_Data method if np.sum(self.nifti_masker.mask_img.get_fdata() == 1) != np.sum( images.nifti_masker.mask_img.get_fdata() == 1 ): new_mask = intersect_masks( [self.nifti_masker.mask_img, images.nifti_masker.mask_img], threshold=1, connected=False, ) new_nifti_masker = NiftiMasker(mask_img=new_mask) data2 = new_nifti_masker.fit_transform(self.to_nifti()) image2 = new_nifti_masker.fit_transform(images.to_nifti()) else: data2 = self.data image2 = images.data # Add intercept and transpose image2 = np.vstack((np.ones(image2.shape[1]), image2)).T # Calculate pattern expression if method == "ols": b = np.dot(np.linalg.pinv(image2), data2) res = data2 - np.dot(image2, b) sigma = np.std(res, axis=0) stderr = np.dot( np.matrix( np.diagonal(np.linalg.inv(np.dot(image2.T, image2))) ** 0.5 ).T, np.matrix(sigma), ) t_out = b / stderr df = image2.shape[0] - image2.shape[1] p = 2 * (1 - t_dist.cdf(np.abs(t_out), df)) else: raise NotImplementedError return { "beta": b, "t": t_out, "p": p, "df": df, "sigma": sigma, "residual": res, }
[docs] def predict(self, algorithm=None, cv_dict=None, plot=True, verbose=True, **kwargs): """Run prediction Args: algorithm: Algorithm to use for prediction. Must be one of 'svm', 'svr', 'linear', 'logistic', 'lasso', 'ridge', 'ridgeClassifier','pcr', or 'lassopcr' cv_dict: Type of cross_validation to use. A dictionary of {'type': 'kfolds', 'n_folds': n}, {'type': 'kfolds', 'n_folds': n, 'stratified': Y}, {'type': 'kfolds', 'n_folds': n, 'subject_id': holdout}, or {'type': 'loso', 'subject_id': holdout} where 'n' = number of folds, and 'holdout' = vector of subject ids that corresponds to self.Y plot: Boolean indicating whether or not to create plots. verbose (bool): print performance; Default True **kwargs: Additional keyword arguments to pass to the prediction algorithm Returns: output: a dictionary of prediction parameters """ # Set algorithm if algorithm is not None: predictor_settings = set_algorithm(algorithm, **kwargs) else: # Use SVR as a default predictor_settings = set_algorithm("svr", **{"kernel": "linear"}) # Initialize output dictionary output = {"Y": np.array(self.Y).flatten()} predictor = predictor_settings["predictor"] # Overall Fit for weight map predictor.fit(self.data, np.ravel(output["Y"])) output["yfit_all"] = predictor.predict(self.data) if predictor_settings["prediction_type"] == "classification": if predictor_settings["algorithm"] not in [ "svm", "ridgeClassifier", "ridgeClassifierCV", ]: output["prob_all"] = predictor.predict_proba(self.data) else: output["dist_from_hyperplane_all"] = predictor.decision_function( self.data ) if predictor_settings["algorithm"] == "svm" and predictor.probability: output["prob_all"] = predictor.predict_proba(self.data) # Intercept if predictor_settings["algorithm"] == "pcr": output["intercept"] = predictor_settings["_regress"].intercept_ elif predictor_settings["algorithm"] == "lassopcr": output["intercept"] = predictor_settings["_lasso"].intercept_ else: output["intercept"] = predictor.intercept_ # Weight map output["weight_map"] = self.empty() if predictor_settings["algorithm"] == "lassopcr": output["weight_map"].data = np.dot( predictor_settings["_pca"].components_.T, predictor_settings["_lasso"].coef_, ) elif predictor_settings["algorithm"] == "pcr": output["weight_map"].data = np.dot( predictor_settings["_pca"].components_.T, predictor_settings["_regress"].coef_, ) else: output["weight_map"].data = predictor.coef_.squeeze() # Cross-Validation Fit from sklearn.base import clone if cv_dict is not None: cv = set_cv(Y=self.Y, cv_dict=cv_dict) predictor_cv = predictor_settings["predictor"] output["yfit_xval"] = output["yfit_all"].copy() output["intercept_xval"] = [] # Multi-class classification, init weightmaps as list if (predictor_settings["prediction_type"] == "classification") and ( len(np.unique(self.Y)) > 2 ): output["weight_map_xval"] = [] else: # Otherwise we'll have a single weightmap output["weight_map_xval"] = output["weight_map"].copy() output["cv_idx"] = [] wt_map_xval = [] # Initialize zero'd arrays that will be filled during cross-validation and fitting # These will need change shape if doing multi-class or probablistic predictions if (predictor_settings["algorithm"] == "logistic") or ( predictor_settings["algorithm"] == "svm" and predictor.probability ): # If logistic or svm prob, probs == number of classes probs_init = np.zeros((len(self.Y), len(np.unique(self.Y)))) # however if num classes == 2 decision function == 1, but if num class > 2, decision function == num classes (sklearn weirdness) if len(np.unique(self.Y)) == 2: dec_init = np.zeros(len(self.Y)) else: dec_init = np.zeros((len(self.Y), len(np.unique(self.Y)))) # else: # # if len(np.unique(self.Y)) == 2: # dec_init = np.zeros(len(self.Y)) # else: # dec_init = np.zeros((len(self.Y), len(np.unique(self.Y)))) if predictor_settings["prediction_type"] == "classification": if predictor_settings["algorithm"] not in [ "svm", "ridgeClassifier", "ridgeClassifierCV", ]: output["prob_xval"] = probs_init else: output["dist_from_hyperplane_xval"] = dec_init if ( predictor_settings["algorithm"] == "svm" and predictor_cv.probability ): output["prob_xval"] = probs_init for train, test in cv: # Ensure estimators are always indepedent across folds predictor_cv = clone(predictor_settings["predictor"]) predictor_cv.fit(self.data[train], np.ravel(self.Y.iloc[train])) output["yfit_xval"][test] = predictor_cv.predict( self.data[test] ).ravel() if predictor_settings["prediction_type"] == "classification": if predictor_settings["algorithm"] not in [ "svm", "ridgeClassifier", "ridgeClassifierCV", ]: output["prob_xval"][test] = predictor_cv.predict_proba( self.data[test] ) else: output["dist_from_hyperplane_xval"][ test ] = predictor_cv.decision_function(self.data[test]) if ( predictor_settings["algorithm"] == "svm" and predictor_cv.probability ): output["prob_xval"][test] = predictor_cv.predict_proba( self.data[test] ) # Intercept if predictor_settings["algorithm"] == "pcr": output["intercept_xval"].append(predictor_cv["regress"].intercept_) elif predictor_settings["algorithm"] == "lassopcr": output["intercept_xval"].append(predictor_cv["lasso"].intercept_) else: output["intercept_xval"].append(predictor_cv.intercept_) output["cv_idx"].append((train, test)) # Weight map # Multi-class classification, weightmaps as list if (predictor_settings["prediction_type"] == "classification") and ( len(np.unique(self.Y)) > 2 ): tmp = output["weight_map"].empty() tmp.data = predictor_cv.coef_.squeeze() output["weight_map_xval"].append(tmp) # Regression or binary classification else: if predictor_settings["algorithm"] == "lassopcr": wt_map_xval.append( np.dot( predictor_cv["pca"].components_.T, predictor_cv["lasso"].coef_, ) ) elif predictor_settings["algorithm"] == "pcr": wt_map_xval.append( np.dot( predictor_cv["pca"].components_.T, predictor_cv["regress"].coef_, ) ) else: wt_map_xval.append(predictor_cv.coef_.squeeze()) output["weight_map_xval"].data = np.array(wt_map_xval) # Print Results if predictor_settings["prediction_type"] == "classification": output["mcr_all"] = balanced_accuracy_score( self.Y.values, output["yfit_all"] ) if verbose: print("overall accuracy: %.2f" % output["mcr_all"]) if cv_dict is not None: output["mcr_xval"] = np.mean( output["yfit_xval"] == np.array(self.Y).flatten() ) if verbose: print("overall CV accuracy: %.2f" % output["mcr_xval"]) elif predictor_settings["prediction_type"] == "prediction": output["rmse_all"] = np.sqrt( np.mean((output["yfit_all"] - output["Y"]) ** 2) ) output["r_all"] = pearsonr(output["Y"], output["yfit_all"])[0] if verbose: print("overall Root Mean Squared Error: %.2f" % output["rmse_all"]) print("overall Correlation: %.2f" % output["r_all"]) if cv_dict is not None: output["rmse_xval"] = np.sqrt( np.mean((output["yfit_xval"] - output["Y"]) ** 2) ) output["r_xval"] = pearsonr(output["Y"], output["yfit_xval"])[0] if verbose: print( "overall CV Root Mean Squared Error: %.2f" % output["rmse_xval"] ) print("overall CV Correlation: %.2f" % output["r_xval"]) # Plot if plot: if cv_dict is not None: if predictor_settings["prediction_type"] == "prediction": scatterplot( pd.DataFrame( {"Y": output["Y"], "yfit_xval": output["yfit_xval"]} ) ) elif predictor_settings["prediction_type"] == "classification": if len(np.unique(self.Y)) > 2: print("Skipping ROC plot because num_classes > 2") else: if predictor_settings["algorithm"] not in [ "svm", "ridgeClassifier", "ridgeClassifierCV", ]: output["roc"] = Roc( input_values=output["prob_xval"][:, 1], binary_outcome=output["Y"].astype("bool"), ) else: output["roc"] = Roc( input_values=output["dist_from_hyperplane_xval"], binary_outcome=output["Y"].astype("bool"), ) if ( predictor_settings["algorithm"] == "svm" and predictor_cv.probability ): output["roc"] = Roc( input_values=output["prob_xval"][:, 1], binary_outcome=output["Y"].astype("bool"), ) output["roc"].plot() output["weight_map"].plot() return output
[docs] def predict_multi( self, algorithm=None, cv_dict=None, method="searchlight", rois=None, process_mask=None, radius=2.0, scoring=None, n_jobs=1, verbose=0, **kwargs, ): """Perform multi-region prediction. This can be a searchlight analysis or multi-roi analysis if provided a Brain_Data instance with labeled non-overlapping rois. Args: algorithm (string): algorithm to use for prediction Must be one of 'svm', 'svr', 'linear', 'logistic', 'lasso', 'ridge', 'ridgeClassifier','pcr', or 'lassopcr' cv_dict: Type of cross_validation to use. Default is 3-fold. A dictionary of {'type': 'kfolds', 'n_folds': n}, {'type': 'kfolds', 'n_folds': n, 'stratified': Y}, {'type': 'kfolds', 'n_folds': n, 'subject_id': holdout}, or {'type': 'loso', 'subject_id': holdout} where 'n' = number of folds, and 'holdout' = vector of subject ids that corresponds to self.Y method (string): one of 'searchlight' or 'roi' rois (string/nltools.Brain_Data): nifti file path or Brain_data instance containing non-overlapping regions-of-interest labeled by integers process_mask (nib.Nifti1Image/nltools.Brain_Data): mask to constrain where to perform analyses; only applied if method = 'searchlight' radius (float): radius of searchlight in mm; default 2mm scoring (function): callable scoring function; see sklearn documentation; defaults to estimator's default scoring function n_jobs (int): The number of CPUs to use to do permutation; default 1 because this can be very memory intensive verbose (int): whether parallelization progress should be printed; default 0 Returns: output: image of results """ if method not in ["searchlight", "rois"]: raise ValueError("method must be one of 'searchlight' or 'roi'") if method == "roi" and rois is None: raise ValueError( "With method = 'roi' a file path, or nibabel/nltools instance with roi labels must be provided" ) # Set algorithm if algorithm is not None: predictor_settings = set_algorithm(algorithm, **kwargs) else: # Use SVR as a default predictor_settings = set_algorithm("svr", **{"kernel": "linear"}) estimator = predictor_settings["predictor"] if cv_dict is not None: cv = set_cv(Y=self.Y, cv_dict=cv_dict, return_generator=False) groups = cv_dict["subject_id"] if cv_dict["type"] == "loso" else None else: cv = None groups = None if method == "rois": if isinstance(rois, str) or isinstance(rois, Path): if os.path.isfile(rois): rois_img = Brain_Data(rois, mask=self.mask) elif isinstance(rois, Brain_Data): rois_img = rois.copy() else: raise TypeError("rois must be a file path or a Brain_Data instance") if len(rois_img.shape()) == 1: rois_img = expand_mask(rois_img, custom_mask=self.mask) if len(rois_img.shape()) != 2: raise ValueError( "rois cannot be coerced into a mask. Make sure nifti file or Brain_Data is 3d with non-overlapping integer labels or 4d with non-overlapping boolean masks" ) out = Parallel(n_jobs=n_jobs, verbose=verbose)( delayed(_roi_func)(self, r, algorithm, cv_dict, **kwargs) for r in rois_img ) elif method == "searchlight": # Searchlight if process_mask is None: process_mask_img = None elif isinstance(process_mask, nib.Nifti1Image): process_mask_img = process_mask elif isinstance(process_mask, Brain_Data): process_mask_img = process_mask.to_nifti() elif isinstance(process_mask, str) or isinstance(process_mask, Path): if os.path.isfile(process_mask): process_mask_img = nib.load(process_mask) else: raise ValueError( "process mask file path specified but can't be found" ) else: raise TypeError( "process_mask is not a valid nibabel instance, Brain_Data instance or file path" ) sl = SearchLight( mask_img=self.mask, process_mask_img=process_mask_img, estimator=estimator, n_jobs=n_jobs, scoring=scoring, cv=cv, verbose=verbose, radius=radius, ) in_image = self.to_nifti() sl.fit(in_image, self.Y, groups=groups) out = nib.Nifti1Image(sl.scores_, affine=self.nifti_masker.affine_) out = Brain_Data(out, mask=self.mask) return out
[docs] def apply_mask(self, mask, resample_mask_to_brain=False): """Mask Brain_Data instance Note target data will be resampled into the same space as the mask. If you would like the mask resampled into the Brain_Data space, then set resample_mask_to_brain=True. Args: mask: (Brain_Data or nifti object) mask to apply to Brain_Data object. resample_mask_to_brain: (bool) Will resample mask to brain space before applying mask (default=False). Returns: masked: (Brain_Data) masked Brain_Data object """ masked = deepcopy(self) mask = check_brain_data(mask) if not check_brain_data_is_single(mask): raise ValueError("Mask must be a single image") n_vox = len(self) if check_brain_data_is_single(self) else self.shape()[1] if resample_mask_to_brain: mask = resample_to_img(mask.to_nifti(), masked.to_nifti()) mask = check_brain_data(mask, masked.mask) nifti_masker = NiftiMasker(mask_img=mask.to_nifti()).fit() if n_vox == len(mask): if check_brain_data_is_single(masked): masked.data = masked.data[mask.data.astype(bool)] else: masked.data = masked.data[:, mask.data.astype(bool)] else: masked.data = nifti_masker.fit_transform(masked.to_nifti()) masked.nifti_masker = nifti_masker if (len(masked.shape()) > 1) & (masked.shape()[0] == 1): masked.data = masked.data.flatten() return masked
[docs] def extract_roi(self, mask, metric="mean", n_components=None): """Extract activity from mask Args: mask: (nifti) nibabel mask can be binary or numbered for different rois metric: type of extraction method ['mean', 'median', 'pca'], (default=mean) NOTE: Only mean currently works! n_components: if metric='pca', number of components to return (takes any input into sklearn.Decomposition.PCA) Returns: out: mean within each ROI across images """ metrics = ["mean", "median", "pca"] mask = check_brain_data(mask) ma = mask.copy() if metric not in metrics: raise NotImplementedError if len(np.unique(ma.data)) == 2: masked = self.apply_mask(ma) if check_brain_data_is_single(masked): if metric == "mean": out = masked.mean() elif metric == "median": out = masked.median() else: raise ValueError("Not possible to run PCA on a single image") else: if metric == "mean": out = masked.mean(axis=1) elif metric == "median": out = masked.median(axis=1) else: output = masked.decompose( algorithm="pca", n_components=n_components, axis="images" ) out = output["weights"].T elif len(np.unique(ma.data)) > 2: # make sure each ROI id is an integer ma.data = np.round(ma.data).astype(int) all_mask = expand_mask(ma) if check_brain_data_is_single(self): if metric == "mean": out = np.array([self.apply_mask(m).mean() for m in all_mask]) elif metric == "median": out = np.array([self.apply_mask(m).median() for m in all_mask]) else: raise ValueError("Not possible to run PCA on a single image") else: if metric == "mean": out = np.array([self.apply_mask(m).mean(axis=1) for m in all_mask]) elif metric == "median": out = np.array( [self.apply_mask(m).median(axis=1) for m in all_mask] ) else: out = [] for m in all_mask: masked = self.apply_mask(m) output = masked.decompose( algorithm="pca", n_components=n_components, axis="images" ) out.append(output["weights"].T) else: raise ValueError("Mask must be binary or integers") return out
[docs] def icc(self, icc_type="icc2"): """Calculate intraclass correlation coefficient for data within Brain_Data class ICC Formulas are based on: Shrout, P. E., & Fleiss, J. L. (1979). Intraclass correlations: uses in assessing rater reliability. Psychological bulletin, 86(2), 420. icc1: x_ij = mu + beta_j + w_ij icc2/3: x_ij = mu + alpha_i + beta_j + (ab)_ij + epsilon_ij Code modifed from nipype algorithms.icc https://github.com/nipy/nipype/blob/master/nipype/algorithms/icc.py Args: icc_type: type of icc to calculate (icc: voxel random effect, icc2: voxel and column random effect, icc3: voxel and column fixed effect) Returns: ICC: (np.array) intraclass correlation coefficient """ Y = self.data.T [n, k] = Y.shape # Degrees of Freedom dfc = k - 1 dfe = (n - 1) * (k - 1) dfr = n - 1 # Sum Square Total mean_Y = np.mean(Y) SST = ((Y - mean_Y) ** 2).sum() # create the design matrix for the different levels x = np.kron(np.eye(k), np.ones((n, 1))) # sessions x0 = np.tile(np.eye(n), (k, 1)) # subjects X = np.hstack([x, x0]) # Sum Square Error predicted_Y = np.dot( np.dot(np.dot(X, np.linalg.pinv(np.dot(X.T, X))), X.T), Y.flatten("F") ) residuals = Y.flatten("F") - predicted_Y SSE = (residuals**2).sum() MSE = SSE / dfe # Sum square column effect - between colums SSC = ((np.mean(Y, 0) - mean_Y) ** 2).sum() * n MSC = SSC / dfc / n # Sum Square subject effect - between rows/subjects SSR = SST - SSC - SSE MSR = SSR / dfr if icc_type == "icc1": # ICC(2,1) = (mean square subject - mean square error) / # (mean square subject + (k-1)*mean square error + # k*(mean square columns - mean square error)/n) # ICC = (MSR - MSRW) / (MSR + (k-1) * MSRW) NotImplementedError("This method isn't implemented yet.") elif icc_type == "icc2": # ICC(2,1) = (mean square subject - mean square error) / # (mean square subject + (k-1)*mean square error + # k*(mean square columns - mean square error)/n) ICC = (MSR - MSE) / (MSR + (k - 1) * MSE + k * (MSC - MSE) / n) elif icc_type == "icc3": # ICC(3,1) = (mean square subject - mean square error) / # (mean square subject + (k-1)*mean square error) ICC = (MSR - MSE) / (MSR + (k - 1) * MSE) return ICC
[docs] def detrend(self, method="linear"): """Remove linear trend from each voxel Args: type: ('linear','constant', optional) type of detrending Returns: out: (Brain_Data) detrended Brain_Data instance """ if len(self.shape()) == 1: raise ValueError( "Make sure there is more than one image in order " "to detrend." ) out = deepcopy(self) out.data = detrend(out.data, type=method, axis=0) return out
[docs] def copy(self): """Create a copy of a Brain_Data instance.""" return deepcopy(self)
[docs] def upload_neurovault( self, access_token=None, collection_name=None, collection_id=None, img_type=None, img_modality=None, **kwargs, ): """Upload Data to Neurovault. Will add any columns in self.X to image metadata. Index will be used as image name. Args: access_token: (str, Required) Neurovault api access token collection_name: (str, Optional) name of new collection to create collection_id: (int, Optional) neurovault collection_id if adding images to existing collection img_type: (str, Required) Neurovault map_type img_modality: (str, Required) Neurovault image modality Returns: collection: (pd.DataFrame) neurovault collection information """ if access_token is None: raise ValueError("You must supply a valid neurovault access token") api = Client(access_token=access_token) # Check if collection exists if collection_id is not None: collection = api.get_collection(collection_id) else: try: collection = api.create_collection(collection_name) except ValueError: print( "Collection Name already exists. Pick a " "different name or specify an existing collection id" ) tmp_dir = os.path.join(tempfile.gettempdir(), str(os.times()[-1])) os.makedirs(tmp_dir) def add_image_to_collection( api, collection, dat, tmp_dir, index_id=0, **kwargs ): """Upload image to collection Args: api: pynv Client instance collection: collection information dat: Brain_Data instance to upload tmp_dir: temporary directory index_id: (int) index for file naming """ if (len(dat.shape()) > 1) & (dat.shape()[0] > 1): raise ValueError('"dat" must be a single image.') if not dat.X.empty and isinstance(dat.X.name, str): img_name = dat.X.name else: img_name = collection["name"] + "_" + str(index_id) + ".nii.gz" f_path = os.path.join(tmp_dir, img_name) dat.write(f_path) if not dat.X.empty: kwargs.update(dict([(k, dat.X.loc[k]) for k in dat.X.keys()])) api.add_image( collection["id"], f_path, name=img_name, modality=img_modality, map_type=img_type, **kwargs, ) if len(self.shape()) == 1: add_image_to_collection( api, collection, self, tmp_dir, index_id=0, **kwargs ) else: for i, x in enumerate(self): add_image_to_collection( api, collection, x, tmp_dir, index_id=i, **kwargs ) shutil.rmtree(tmp_dir, ignore_errors=True) return collection
[docs] def r_to_z(self): """Apply Fisher's r to z transformation to each element of the data object.""" out = self.copy() out.data = fisher_r_to_z(out.data) return out
[docs] def z_to_r(self): """Convert z score back into r value for each element of data object""" out = self.copy() out.data = fisher_z_to_r(out.data) return out
[docs] def filter(self, sampling_freq=None, high_pass=None, low_pass=None, **kwargs): """Apply 5th order butterworth filter to data. Wraps nilearn functionality. Does not default to detrending and standardizing like nilearn implementation, but this can be overridden using kwargs. Args: sampling_freq: sampling freq in hertz (i.e. 1 / TR) high_pass: high pass cutoff frequency low_pass: low pass cutoff frequency kwargs: other keyword arguments to nilearn.signal.clean Returns: Brain_Data: Filtered Brain_Data instance """ if sampling_freq is None: raise ValueError("Need to provide sampling rate (TR)!") if high_pass is None and low_pass is None: raise ValueError("high_pass and/or low_pass cutoff must be" "provided!") standardize = kwargs.get("standardize", False) detrend = kwargs.get("detrend", False) out = self.copy() out.data = clean( out.data, t_r=1.0 / sampling_freq, detrend=detrend, standardize=standardize, high_pass=high_pass, low_pass=low_pass, **kwargs, ) return out
[docs] def dtype(self): """Get data type of Brain_Data.data.""" return self.data.dtype
[docs] def astype(self, dtype): """Cast Brain_Data.data as type. Args: dtype: datatype to convert Returns: Brain_Data: Brain_Data instance with new datatype """ out = self.copy() out.data = out.data.astype(dtype) return out
[docs] def standardize(self, axis=0, method="center"): """Standardize Brain_Data() instance. Args: axis: 0 for observations 1 for voxels method: ['center','zscore'] Returns: Brain_Data Instance """ if axis == 1 and len(self.shape()) == 1: raise IndexError( "Brain_Data is only 3d but standardization was requested over observations" ) out = self.copy() if method == "zscore": with_std = True elif method == "center": with_std = False else: raise ValueError('method must be ["center","zscore"') out.data = scale(out.data, axis=axis, with_std=with_std) return out
[docs] def groupby(self, mask): """Create groupby instance""" return Groupby(self, mask)
[docs] def aggregate(self, mask, func): """Create new Brain_Data instance that aggregages func over mask""" dat = self.groupby(mask) values = dat.apply(func) return dat.combine(values)
[docs] def threshold(self, upper=None, lower=None, binarize=False, coerce_nan=True): """Threshold Brain_Data instance. Provide upper and lower values or percentages to perform two-sided thresholding. Binarize will return a mask image respecting thresholds if provided, otherwise respecting every non-zero value. Args: upper: (float or str) Upper cutoff for thresholding. If string will interpret as percentile; can be None for one-sided thresholding. lower: (float or str) Lower cutoff for thresholding. If string will interpret as percentile; can be None for one-sided thresholding. binarize (bool): return binarized image respecting thresholds if provided, otherwise binarize on every non-zero value; default False coerce_nan (bool): coerce nan values to 0s; default True Returns: Thresholded Brain_Data object. """ b = self.copy() if coerce_nan: b.data = np.nan_to_num(b.data) if isinstance(upper, str) and upper[-1] == "%": upper = np.percentile(b.data, float(upper[:-1])) if isinstance(lower, str) and lower[-1] == "%": lower = np.percentile(b.data, float(lower[:-1])) if upper and lower: b.data[(b.data < upper) & (b.data > lower)] = 0 elif upper: b.data[b.data < upper] = 0 elif lower: b.data[b.data > lower] = 0 if binarize: b.data[b.data != 0] = 1 return b
[docs] def regions( self, min_region_size=1350, extract_type="local_regions", smoothing_fwhm=6, is_mask=False, ): """Extract brain connected regions into separate regions. Args: min_region_size (int): Minimum volume in mm3 for a region to be kept. extract_type (str): Type of extraction method ['connected_components', 'local_regions']. If 'connected_components', each component/region in the image is extracted automatically by labelling each region based upon the presence of unique features in their respective regions. If 'local_regions', each component/region is extracted based on their maximum peak value to define a seed marker and then using random walker segementation algorithm on these markers for region separation. smoothing_fwhm (scalar): Smooth an image to extract more sparser regions. Only works for extract_type 'local_regions'. is_mask (bool): Whether the Brain_Data instance should be treated as a boolean mask and if so, calls connected_label_regions instead. Returns: Brain_Data: Brain_Data instance with extracted ROIs as data. """ if is_mask: regions, _ = connected_label_regions(self.to_nifti()) else: regions, _ = connected_regions( self.to_nifti(), min_region_size, extract_type, smoothing_fwhm ) return Brain_Data(regions, mask=self.mask)
[docs] def transform_pairwise(self): """Extract brain connected regions into separate regions. Args: Returns: Brain_Data: Brain_Data instance tranformed into pairwise comparisons """ out = self.copy() out.data, new_Y = transform_pairwise(self.data, self.Y) out.Y = pd.DataFrame(new_Y) out.Y.replace(-1, 0, inplace=True) return out
[docs] def bootstrap( self, function, n_samples=5000, save_weights=False, n_jobs=-1, random_state=None, *args, **kwargs, ): """Bootstrap a `Brain_Data` method. Args: function: (str) method to apply to data for each bootstrap n_samples: (int) number of samples to bootstrap with replacement save_weights: (bool) Save each bootstrap iteration (useful for aggregating many bootstraps on a cluster) n_jobs: (int) The number of CPUs to use to do the computation. -1 means all CPUs.Returns: Returns: output: summarized studentized bootstrap output Examples: >>> b = dat.bootstrap('mean', n_samples=5000) >>> b = dat.bootstrap('predict', n_samples=5000, algorithm='ridge') >>> b = dat.bootstrap('predict', n_samples=5000, save_weights=True) """ random_state = check_random_state(random_state) seeds = random_state.randint(MAX_INT, size=n_samples) bootstrapped = Parallel(n_jobs=n_jobs)( delayed(_bootstrap_apply_func)( self, function, random_state=seeds[i], *args, **kwargs ) for i in range(n_samples) ) if function == "predict": bootstrapped = [x["weight_map"] for x in bootstrapped] bootstrapped = Brain_Data(bootstrapped, mask=self.mask) return summarize_bootstrap(bootstrapped, save_weights=save_weights)
[docs] def decompose( self, algorithm="pca", axis="voxels", n_components=None, *args, **kwargs ): """Decompose Brain_Data object Args: algorithm: (str) Algorithm to perform decomposition types=['pca','ica','nnmf','fa','dictionary','kernelpca'] axis: dimension to decompose ['voxels','images'] n_components: (int) number of components. If None then retain as many as possible. Returns: output: a dictionary of decomposition parameters """ out = { "decomposition_object": set_decomposition_algorithm( *args, algorithm=algorithm, n_components=n_components, **kwargs ) } if axis == "images": out["decomposition_object"].fit(self.data.T) out["components"] = self.empty() out["components"].data = ( out["decomposition_object"].transform(self.data.T).T ) out["weights"] = out["decomposition_object"].components_.T elif axis == "voxels": out["decomposition_object"].fit(self.data) out["weights"] = out["decomposition_object"].transform(self.data) out["components"] = self.empty() out["components"].data = out["decomposition_object"].components_ return out
[docs] def align(self, target, method="procrustes", axis=0, *args, **kwargs): """Align Brain_Data instance to target object using functional alignment Alignment type can be hyperalignment or Shared Response Model. When using hyperalignment, `target` image can be another subject or an already estimated common model. When using SRM, `target` must be a previously estimated common model stored as a numpy array. Transformed data can be back projected to original data using Tranformation matrix. See nltools.stats.align for aligning multiple Brain_Data instances Args: target: (Brain_Data) object to align to. method: (str) alignment method to use ['probabilistic_srm','deterministic_srm','procrustes'] axis: (int) axis to align on Returns: out: (dict) a dictionary containing transformed object, transformation matrix, and the shared response matrix Examples: - Hyperalign using procrustes transform: >>> out = data.align(target, method='procrustes') - Align using shared response model: >>> out = data.align(target, method='probabilistic_srm', n_features=None) - Project aligned data into original data: >>> original_data = np.dot(out[‘transformed’].data,out[‘transformation_matrix’].T) """ if method not in ["probabilistic_srm", "deterministic_srm", "procrustes"]: raise ValueError( "Method must be ['probabilistic_srm','deterministic_srm','procrustes']" ) source = self.copy() data1 = self.data.copy() if method == "procrustes": target = check_brain_data(target) data2 = target.data.copy() # pad columns if different shapes sizes_1 = [x.shape[1] for x in [data1, data2]] C = max(sizes_1) y = data1[:, 0:C] missing = C - y.shape[1] add = np.zeros((y.shape[0], missing)) data1 = np.append(y, add, axis=1) else: data2 = target.copy() if axis == 1: data1 = data1.T data2 = data2.T out = {} if method in ["deterministic_srm", "probabilistic_srm"]: if not isinstance(target, np.ndarray): raise ValueError( "Common Model must be a numpy array for ['deterministic_srm', 'probabilistic_srm']" ) if data2.shape[0] != data1.shape[0]: raise ValueError( "The number of timepoints(TRs) does not match the model." ) A = data1.T.dot(data2) # # Solve the Procrustes problem U, _, V = np.linalg.svd(A, full_matrices=False) out["transformation_matrix"] = source out["transformation_matrix"].data = U.dot(V).T out["transformed"] = data1.dot(out["transformation_matrix"].data.T) out["common_model"] = target elif method == "procrustes": _, transformed, out["disparity"], tf_mtx, out["scale"] = procrustes( data2, data1 ) source.data = transformed out["transformed"] = source out["common_model"] = target out["transformation_matrix"] = source.copy() out["transformation_matrix"].data = tf_mtx if axis == 1: if method == "procrustes": out["transformed"].data = out["transformed"].data.T else: out["transformed"] = out["transformed"].T return out
[docs] def smooth(self, fwhm): """Apply spatial smoothing using nilearn smooth_img() Args: fwhm: (float) full width half maximum of gaussian spatial filter Returns: Brain_Data instance """ out = self.copy() out.data = out.nifti_masker.fit_transform(smooth_img(self.to_nifti(), fwhm)) if 1 in out.data.shape: out.data = out.data.squeeze() return out
[docs] def find_spikes(self, global_spike_cutoff=3, diff_spike_cutoff=3): """Function to identify spikes from Time Series Data Args: global_spike_cutoff: (int,None) cutoff to identify spikes in global signal in standard deviations, None indicates do not calculate. diff_spike_cutoff: (int,None) cutoff to identify spikes in average frame difference in standard deviations, None indicates do not calculate. Returns: pandas dataframe with spikes as indicator variables """ return find_spikes( self, global_spike_cutoff=global_spike_cutoff, diff_spike_cutoff=diff_spike_cutoff, )
[docs] def temporal_resample(self, sampling_freq=None, target=None, target_type="hz"): """ Resample Brain_Data timeseries to a new target frequency or number of samples using Piecewise Cubic Hermite Interpolating Polynomial (PCHIP) interpolation. This function can up- or down-sample data. Note: this function can use quite a bit of RAM. Args: sampling_freq: (float) sampling frequency of data in hertz target: (float) upsampling target target_type: (str) type of target can be [samples,seconds,hz] Returns: upsampled Brain_Data instance """ out = self.copy() if target_type == "samples": n_samples = target elif target_type == "seconds": n_samples = target * sampling_freq elif target_type == "hz": n_samples = float(sampling_freq) / float(target) else: raise ValueError('Make sure target_type is "samples", "seconds", or "hz".') orig_spacing = np.arange(0, self.shape()[0], 1) new_spacing = np.arange(0, self.shape()[0], n_samples) out.data = np.zeros([len(new_spacing), self.shape()[1]]) for i in range(self.shape()[1]): interpolate = pchip(orig_spacing, self.data[:, i]) out.data[:, i] = interpolate(new_spacing) return out
[docs]class Groupby(object): def __init__(self, data, mask): data = check_brain_data(data) mask = check_brain_data(mask) mask.data = np.round(mask.data).astype(int) if len(mask.shape()) <= 1: if len(np.unique(mask.data)) > 2: mask = expand_mask(mask) else: raise ValueError("mask does not have enough groups.") self.mask = mask self.split(data, mask) def __repr__(self): return "%s.%s(len=%s)" % ( self.__class__.__module__, self.__class__.__name__, len(self), ) def __len__(self): return len(self.data) def __iter__(self): for x in self.data: yield (x, self.data[x]) def __getitem__(self, index): if isinstance(index, int): return self.data[index] else: raise ValueError("Groupby currently only supports integer indexing")
[docs] def split(self, data, mask): """Split Brain_Data instance into separate masks and store as a dictionary. """ self.data = {} for i, m in enumerate(mask): self.data[i] = data.apply_mask(m)
[docs] def apply(self, method): """Apply Brain_Data instance methods to each element of Groupby object. """ return dict([(i, getattr(x, method)()) for i, x in self])
[docs] def combine(self, value_dict): """Combine value dictionary back into masks""" out = self.mask.copy().astype(float) for i in iter(value_dict.keys()): if isinstance(value_dict[i], Brain_Data): if value_dict[i].shape()[0] == np.sum(self.mask[i].data): out.data[i, out.data[i, :] == 1] = value_dict[i].data else: raise ValueError("Brain_Data instances are different " "shapes.") elif isinstance(value_dict[i], (float, int, bool, np.number)): out.data[i, :] = out.data[i, :] * value_dict[i] else: raise ValueError( "No method for aggregation implented for %s " "yet." % type(value_dict[i]) ) return out.sum()