Source code for nltools.utils

"""
NeuroLearn Utilities
====================

handy utilities.

"""
__all__ = [
    "get_resource_path",
    "get_anatomical",
    "set_algorithm",
    "attempt_to_import",
    "all_same",
    "concatenate",
    "_bootstrap_apply_func",
    "set_decomposition_algorithm",
]
__author__ = ["Luke Chang"]
__license__ = "MIT"

from os.path import dirname, join, sep as pathsep
import nibabel as nib
import importlib
import os
from sklearn.pipeline import Pipeline
from sklearn.utils import check_random_state
from scipy.spatial.distance import squareform
import numpy as np
import pandas as pd
import collections
from types import GeneratorType
from h5py import File as h5File


def to_h5(obj, file_name, obj_type="brain_data", h5_compression="gzip"):
    """User a combination of pandas and h5py to save objects to h5 files. Replaces
    deepdish. File loading is handled by class-specific methods"""

    if obj_type not in ["brain_data", "adjacency"]:
        raise TypeError("obj_type must be one of 'brain_data' or 'adjacency'")

    if obj_type == "brain_data":
        with pd.HDFStore(file_name, "w") as f:
            f["X"] = obj.X
            f["Y"] = obj.Y

        with h5File(file_name, "a") as f:
            f.create_dataset("data", data=obj.data, compression=h5_compression)
            f.create_dataset(
                "mask_affine", data=obj.mask.affine, compression=h5_compression
            )
            f.create_dataset(
                "mask_data", data=obj.mask.get_fdata(), compression=h5_compression
            )
            f.create_dataset("mask_file_name", data=obj.mask.get_filename())
    else:
        with pd.HDFStore(file_name, "w") as f:
            f["Y"] = obj.Y

        with h5File(file_name, "a") as f:
            f.create_dataset("data", data=obj.data, compression=h5_compression)
            f.create_dataset("matrix_type", data=obj.matrix_type)
            f.create_dataset("issymmetric", data=obj.issymmetric)
            f.create_dataset("labels", data=obj.labels)
            f.create_dataset("is_single_matrix", data=obj.is_single_matrix)


[docs]def get_resource_path(): """Get path to nltools resource directory.""" return join(dirname(__file__), "resources") + pathsep
[docs]def get_anatomical(): """Get nltools default anatomical image. DEPRECATED. See MNI_Template and resolve_mni_path from nltools.prefs """ return nib.load(os.path.join(get_resource_path(), "MNI152_T1_2mm.nii.gz"))
def get_mni_from_img_resolution(brain, img_type="plot"): """ Get the path to the resolution MNI anatomical image that matches the resolution of a Brain_Data instance. Used by Brain_Data.plot() and .iplot() to set backgrounds appropriately. Args: brain: Brain_Data instance Returns: file_path: path to MNI image """ if img_type not in ["plot", "brain"]: raise ValueError("img_type must be 'plot' or 'brain' ") res_array = np.abs(np.diag(brain.nifti_masker.affine_)[:3]) voxel_dims = np.unique(abs(res_array)) if len(voxel_dims) != 1: raise ValueError( "Voxels are not isometric and cannot be visualized in standard space" ) else: dim = str(int(voxel_dims[0])) + "mm" if img_type == "brain": mni = f"MNI152_T1_{dim}_brain.nii.gz" else: mni = f"MNI152_T1_{dim}.nii.gz" return os.path.join(get_resource_path(), mni)
[docs]def set_algorithm(algorithm, *args, **kwargs): """Setup the algorithm to use in subsequent prediction analyses. Args: algorithm: The prediction algorithm to use. Either a string or an (uninitialized) scikit-learn prediction object. If string, must be one of 'svm','svr', linear','logistic','lasso', 'lassopcr','lassoCV','ridge','ridgeCV','ridgeClassifier', 'randomforest', or 'randomforestClassifier' kwargs: Additional keyword arguments to pass onto the scikit-learn clustering object. Returns: predictor_settings: dictionary of settings for prediction """ # NOTE: function currently located here instead of analysis.py to avoid circular imports predictor_settings = {} predictor_settings["algorithm"] = algorithm def load_class(import_string): class_data = import_string.split(".") module_path = ".".join(class_data[:-1]) class_str = class_data[-1] module = importlib.import_module(module_path) return getattr(module, class_str) algs_classify = { "svm": "sklearn.svm.SVC", "logistic": "sklearn.linear_model.LogisticRegression", "ridgeClassifier": "sklearn.linear_model.RidgeClassifier", "ridgeClassifierCV": "sklearn.linear_model.RidgeClassifierCV", "randomforestClassifier": "sklearn.ensemble.RandomForestClassifier", } algs_predict = { "svr": "sklearn.svm.SVR", "linear": "sklearn.linear_model.LinearRegression", "lasso": "sklearn.linear_model.Lasso", "lassoCV": "sklearn.linear_model.LassoCV", "ridge": "sklearn.linear_model.Ridge", "ridgeCV": "sklearn.linear_model.RidgeCV", "randomforest": "sklearn.ensemble.RandomForest", } if algorithm in algs_classify.keys(): predictor_settings["prediction_type"] = "classification" alg = load_class(algs_classify[algorithm]) predictor_settings["predictor"] = alg(*args, **kwargs) elif algorithm in algs_predict: predictor_settings["prediction_type"] = "prediction" alg = load_class(algs_predict[algorithm]) predictor_settings["predictor"] = alg(*args, **kwargs) elif algorithm == "lassopcr": predictor_settings["prediction_type"] = "prediction" from sklearn.linear_model import Lasso from sklearn.decomposition import PCA predictor_settings["_lasso"] = Lasso() predictor_settings["_pca"] = PCA() predictor_settings["predictor"] = Pipeline( steps=[ ("pca", predictor_settings["_pca"]), ("lasso", predictor_settings["_lasso"]), ] ) elif algorithm == "pcr": predictor_settings["prediction_type"] = "prediction" from sklearn.linear_model import LinearRegression from sklearn.decomposition import PCA predictor_settings["_regress"] = LinearRegression() predictor_settings["_pca"] = PCA() predictor_settings["predictor"] = Pipeline( steps=[ ("pca", predictor_settings["_pca"]), ("regress", predictor_settings["_regress"]), ] ) else: raise ValueError( """Invalid prediction/classification algorithm name. Valid options are 'svm','svr', 'linear', 'logistic', 'lasso', 'lassopcr','lassoCV','ridge','ridgeCV','ridgeClassifier', 'randomforest', or 'randomforestClassifier'.""" ) return predictor_settings
[docs]def set_decomposition_algorithm(algorithm, n_components=None, *args, **kwargs): """Setup the algorithm to use in subsequent decomposition analyses. Args: algorithm: The decomposition algorithm to use. Either a string or an (uninitialized) scikit-learn decomposition object. If string must be one of 'pca','nnmf', ica','fa', 'dictionary', 'kernelpca'. kwargs: Additional keyword arguments to pass onto the scikit-learn clustering object. Returns: predictor_settings: dictionary of settings for prediction """ # NOTE: function currently located here instead of analysis.py to avoid circular imports def load_class(import_string): class_data = import_string.split(".") module_path = ".".join(class_data[:-1]) class_str = class_data[-1] module = importlib.import_module(module_path) return getattr(module, class_str) algs = { "pca": "sklearn.decomposition.PCA", "ica": "sklearn.decomposition.FastICA", "nnmf": "sklearn.decomposition.NMF", "fa": "sklearn.decomposition.FactorAnalysis", "dictionary": "sklearn.decomposition.DictionaryLearning", "kernelpca": "sklearn.decomposition.KernelPCA", } if algorithm in algs.keys(): alg = load_class(algs[algorithm]) alg = alg(n_components, *args, **kwargs) else: raise ValueError( """Invalid prediction/classification algorithm name. Valid options are 'pca','ica', 'nnmf', 'fa'""" ) return alg
def isiterable(obj): """Returns True if the object is one of allowable iterable types.""" return isinstance(obj, (list, tuple, GeneratorType)) module_names = {} Dependency = collections.namedtuple("Dependency", "package value") def attempt_to_import(dependency, name=None, fromlist=None): if name is None: name = dependency try: mod = __import__(dependency, fromlist=fromlist) except ImportError: mod = None module_names[name] = Dependency(dependency, mod) return mod def all_same(items): return np.all(x == items[0] for x in items)
[docs]def concatenate(data): """Concatenate a list of Brain_Data() or Adjacency() objects""" if not isinstance(data, list): raise ValueError("Make sure you are passing a list of objects.") if all([isinstance(x, data[0].__class__) for x in data]): # Temporarily Removing this for circular imports (LC) # if not isinstance(data[0], (Brain_Data, Adjacency)): # raise ValueError('Make sure you are passing a list of Brain_Data' # ' or Adjacency objects.') out = data[0].__class__() for i in data: out = out.append(i) else: raise ValueError("Make sure all objects in the list are the same type.") return out
def _bootstrap_apply_func(data, function, random_state=None, *args, **kwargs): """Bootstrap helper function. Sample with replacement and apply function""" random_state = check_random_state(random_state) data_row_id = range(data.shape()[0]) new_dat = data[ random_state.choice(data_row_id, size=len(data_row_id), replace=True) ] return getattr(new_dat, function)(*args, **kwargs) def check_square_numpy_matrix(data): """Helper function to make sure matrix is square and numpy array""" from nltools.data import Adjacency if isinstance(data, Adjacency): data = data.squareform() elif isinstance(data, pd.DataFrame): data = data.values else: data = np.array(data) if len(data.shape) != 2: try: data = squareform(data) except ValueError: raise ValueError( "Array does not contain the correct number of elements to be square" ) return data def check_brain_data(data, mask=None): """Check if data is a Brain_Data Instance.""" from nltools.data import Brain_Data if not isinstance(data, Brain_Data): if isinstance(data, nib.Nifti1Image): data = Brain_Data(data, mask=mask) else: raise ValueError("Make sure data is a Brain_Data instance.") else: if mask is not None: data = data.apply_mask(mask) return data def check_brain_data_is_single(data): """Logical test if Brain_Data instance is a single image Args: data: brain data Returns: (bool) """ data = check_brain_data(data) if len(data.shape()) > 1: return False else: return True def _roi_func(brain, roi, algorithm, cv_dict, **kwargs): """Brain_Data.predict_multi() helper function""" return brain.apply_mask(roi).predict( algorithm=algorithm, cv_dict=cv_dict, plot=False, **kwargs ) class AmbiguityError(Exception): pass def generate_jitter(n_trials, mean_time=5, min_time=2, max_time=12, atol=0.2): """Generate jitter from exponential distribution with constraints Draws from exponential distribution until the distribution satisfies the constraints: np.abs(np.mean(min_time > data < max_time) - mean_time) <= atol Args: n_trials: (int) number of trials to generate jitter mean_time: (float) desired mean of distribution min_time: (float) desired min of distribution max_time: (float) desired max of distribution atol: (float) precision of deviation from mean Returns: data: (np.array) jitter for each trial """ def generate_data(n_trials, scale=5, min_time=2, max_time=12): data = [] i = 0 while i < n_trials: datam = np.random.exponential(scale=5) if (datam > min_time) & (datam < max_time): data.append(datam) i += 1 return data mean_diff = False while ~mean_diff: data = generate_data(n_trials, min_time=min_time, max_time=max_time) mean_diff = np.isclose(np.mean(data), mean_time, rtol=0, atol=atol) return data