# Source code for nltools.simulator

```"""
NeuroLearn Simulator Tools
==========================

Tools to simulate multivariate data.

"""

__all__ = ["Simulator", "SimulateGrid"]
__author__ = ["Sam Greydanus", "Luke Chang"]

import os
import numpy as np
import nibabel as nib
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import multivariate_normal, binom, ttest_1samp
from nltools.data import Brain_Data
from nltools.stats import fdr, one_sample_permutation
from nltools.prefs import MNI_Template, resolve_mni_path
import csv
from copy import deepcopy
from sklearn.utils import check_random_state

[docs]class Simulator:
def __init__(
):  # no scoring param
# self.resource_folder = os.path.join(os.getcwd(),'resources')
if output_dir is None:
self.output_dir = os.path.join(os.getcwd())
else:
self.output_dir = output_dir

raise ValueError("brain_mask is not a string or a nibabel instance")
self.random_state = check_random_state(random_state)

[docs]    def gaussian(self, mu, sigma, i_tot):
"""create a 3D gaussian signal normalized to a given intensity

Args:
mu: average value of the gaussian signal (usually set to 0)
sigma: standard deviation
i_tot: sum total of activation (numerical integral over the gaussian returns this value)
"""
x, y, z = np.mgrid[
]

# Need an (N, 3) array of (x, y) pairs.
xyz = np.column_stack([x.flat, y.flat, z.flat])

covariance = np.diag(sigma**2)
g = multivariate_normal.pdf(xyz, mean=mu, cov=covariance)

# Reshape back to a 3D grid.
g = g.reshape(x.shape).astype(float)

# select only the regions within the brain mask
# adjust total intensity of gaussian
g = np.multiply(i_tot / np.sum(g), g)

return g

[docs]    def sphere(self, r, p):
"""create a sphere of given radius at some point p in the brain mask

Args:
p: point (in coordinates of the brain mask) of the center of the sphere
"""

x, y, z = np.ogrid[
-p[0] : dims[0] - p[0], -p[1] : dims[1] - p[1], -p[2] : dims[2] - p[2]
]
mask = x * x + y * y + z * z <= r * r

activation = np.zeros(dims)
activation = nib.Nifti1Image(activation, affine=np.eye(4))

# return the 3D numpy matrix of zeros containing the sphere as a region of ones
return activation.get_fdata()

[docs]    def normal_noise(self, mu, sigma):
"""produce a normal noise distribution for all all points in the brain mask

Args:
mu: average value of the gaussian signal (usually set to 0)
sigma: standard deviation
"""

if sigma != 0:
n = self.random_state.normal(mu, sigma, vlength)
else:
n = [mu] * vlength

# return the 3D numpy matrix of zeros containing the brain mask filled with noise produced over a normal distribution
return m.get_data()

[docs]    def to_nifti(self, m):
"""convert a numpy matrix to the nifti format and assign to it the brain_mask's affine matrix

Args:
m: the 3D numpy matrix we wish to convert to .nii
"""
if not (type(m) == np.ndarray and len(m.shape) >= 3):  # try 4D
# if not (type(m) == np.ndarray and len(m.shape) == 3):
raise ValueError(
"ERROR: need 3D np.ndarray matrix to create the nifti file"
)
m = m.astype(np.float32)
return ni

"""generate a set of spheres in the brain mask space

Args:
centers: a vector of sphere centers of the form [px, py, pz] or [[px1, py1, pz1], ..., [pxn, pyn, pzn]]
"""
# initialize useful values

# Initialize Spheres with options for multiple radii and centers of the spheres (or just an int and a 3D list)
if center is None:
center = [
[dims[0] / 2, dims[1] / 2, dims[2] / 2] * len(radius)
]  # default value for centers
elif (
isinstance(center, list) and isinstance(center[0], int) and len(radius) == 1
):
centers = [center]
if (
and (type(center) is list)
):
return A
else:
raise ValueError(
"Data type for sphere or radius(ii) or center(s) not recognized."
)

[docs]    def create_data(
self, levels, sigma, radius=5, center=None, reps=1, output_dir=None
):
"""create simulated data with integers

Args:
levels: vector of intensities or class labels
sigma: amount of noise to add
center: center(s) of sphere(s) of the form [px, py, pz] or [[px1, py1, pz1], ..., [pxn, pyn, pzn]]
reps: number of data repetitions useful for trials or subjects
output_dir: string path of directory to output data.  If None, no data will be written
**kwargs: Additional keyword arguments to pass to the prediction algorithm

"""

# Create reps
nlevels = len(levels)
y = levels
rep_id = [1] * len(levels)
for i in range(reps - 1):
y = y + levels
rep_id.extend([i + 2] * nlevels)

# Initialize Spheres with options for multiple radii and centers of the spheres (or just an int and a 3D list)

# for each intensity
A_list = []
for i in y:
A_list.append(np.multiply(A, i))

# generate a different gaussian noise profile for each mask
mu = 0  # values centered around 0
N_list = []
for i in range(len(y)):
N_list.append(self.normal_noise(mu, sigma))

# add noise and signal together, then convert to nifti files
NF_list = []
for i in range(len(y)):
NF_list = Brain_Data(NF_list)

# Assign variables to object
self.data = NF_list
self.y = pd.DataFrame(data=y)
self.rep_id = pd.DataFrame(data=rep_id)

dat = self.data
dat.Y = self.y

# Write Data to files if requested
if output_dir is not None and isinstance(output_dir, str):
NF_list.write(os.path.join(output_dir, "data.nii.gz"))
self.rep_id.to_csv(
)
return dat

[docs]    def create_cov_data(
self, cor, cov, sigma, mask=None, reps=1, n_sub=1, output_dir=None
):
"""create continuous simulated data with covariance

Args:
cor: amount of covariance between each voxel and Y variable
cov: amount of covariance between voxels
sigma: amount of noise to add
center: center(s) of sphere(s) of the form [px, py, pz] or [[px1, py1, pz1], ..., [pxn, pyn, pzn]]
reps: number of data repetitions
n_sub: number of subjects to simulate
output_dir: string path of directory to output data.  If None, no data will be written
**kwargs: Additional keyword arguments to pass to the prediction algorithm

"""

# Initialize Spheres with options for multiple radii and centers of the spheres (or just an int and a 3D list)
A = self.n_spheres(10, None)  # parameters are (radius, center)

# Create n_reps with cov for each voxel within sphere
# Build covariance matrix with each variable correlated with y amount 'cor' and each other amount 'cov'

n_vox = np.sum(flat_sphere == 1)
cov_matrix = np.ones([n_vox + 1, n_vox + 1]) * cov
cov_matrix[0, :] = cor  # set covariance with y
cov_matrix[:, 0] = cor  # set covariance with all other voxels
np.fill_diagonal(cov_matrix, 1)  # set diagonal to 1
mv_sim = self.random_state.multivariate_normal(
np.zeros([n_vox + 1]), cov_matrix, size=reps
)
print(mv_sim)
y = mv_sim[:, 0]
self.y = y
mv_sim = mv_sim[:, 1:]
new_dat = np.ones([mv_sim.shape[0], flat_sphere.shape[1]])
new_dat[:, np.where(flat_sphere == 1)[1]] = mv_sim
new_dat, self.random_state.standard_normal(size=new_dat.shape) * sigma
)
)  # add noise scaled by sigma
self.rep_id = [1] * len(y)
if n_sub > 1:
self.y = list(self.y)
for s in range(1, n_sub):
self.data = nib.concat_images(
[
self.data,
new_dat,
self.random_state.standard_normal(size=new_dat.shape)
* sigma,
)
),
],
axis=3,
)  # add noise scaled by sigma
noise_y = list(y + self.random_state.randn(len(y)) * sigma)
self.y = self.y + noise_y
self.rep_id = self.rep_id + [s + 1] * len(mv_sim[:, 0])
self.y = np.array(self.y)

# # Old method in 4 D space - much slower
# x,y,z = np.where(A==1)
# cov_matrix = np.ones([len(x)+1,len(x)+1]) * cov
# cov_matrix[0,:] = cor # set covariance with y
# cov_matrix[:,0] = cor # set covariance with all other voxels
# np.fill_diagonal(cov_matrix,1) # set diagonal to 1
# mv_sim = self.random_state.multivariate_normal(np.zeros([len(x)+1]),cov_matrix, size=reps) # simulate data from multivariate covar
# self.y = mv_sim[:,0]
# mv_sim = mv_sim[:,1:]
# A_4d = np.resize(A,(reps,A.shape[0],A.shape[1],A.shape[2]))
# for i in range(len(x)):
#     A_4d[:,x[i],y[i],z[i]]=mv_sim[:,i]
# A_4d = np.rollaxis(A_4d,0,4) # reorder shape of matrix so that time is in 4th dimension
# self.rep_id = ???  # need to add this later

# Write Data to files if requested
if output_dir is not None:
if isinstance(output_dir, str):
if not os.path.isdir(output_dir):
os.makedirs(output_dir)
self.data.to_filename(
os.path.join(
output_dir,
+ str(cor)
+ "_cov"
+ str(cov)
+ "_sigma"
+ str(sigma)
+ ".nii.gz",
)
)
y_file = open(os.path.join(output_dir, "y.csv"), "wb")
wr = csv.writer(y_file, quoting=csv.QUOTE_ALL)
wr.writerow(self.y)

rep_id_file = open(os.path.join(output_dir, "rep_id.csv"), "wb")
wr = csv.writer(rep_id_file, quoting=csv.QUOTE_ALL)
wr.writerow(self.rep_id)

[docs]    def create_ncov_data(
self, cor, cov, sigma, masks=None, reps=1, n_sub=1, output_dir=None
):
"""create continuous simulated data with covariance

Args:
cor: amount of covariance between each voxel and Y variable (an int or a vector)
cov: amount of covariance between voxels (an int or a matrix)
sigma: amount of noise to add
mask: region(s) where we will have activations (list if more than one)
reps: number of data repetitions
n_sub: number of subjects to simulate
output_dir: string path of directory to output data.  If None, no data will be written
**kwargs: Additional keyword arguments to pass to the prediction algorithm

"""

# Initialize Spheres with options for multiple radii and centers of the spheres (or just an int and a 3D list)
A = self.n_spheres(10, None)  # parameters are (radius, center)

if type(cor) is float or type(cor) is int:
cor = [cor]
if type(cov) is float or type(cor) is int:
cov = [cov]
raise ValueError(
"cor matrix has incompatible dimensions for mask list of length "
)
if (
):
raise ValueError(
"cov matrix has incompatible dimensions for mask list of length "
)

# Create n_reps with cov for each voxel within sphere
# Build covariance matrix with each variable correlated with y amount 'cor' and each other amount 'cov'

print("Building correlation/covariation matrix...")
n_vox = np.sum(
)  # this is a list, each entry contains number voxels for given mask
if 0 in n_vox:
raise ValueError(
"one or more processing mask does not fit inside the brain mask"
)

cov_matrix = np.zeros(
[np.sum(n_vox) + 1, np.sum(n_vox) + 1]
)  # one big covariance matrix
for i, nv in enumerate(n_vox):
cstart = np.sum(n_vox[:i]) + 1
cstop = cstart + nv
cov_matrix[0, cstart:cstop] = cor[i]  # set covariance with y
cov_matrix[cstart:cstop, 0] = cor[i]  # set covariance with all other voxels
rstart = np.sum(n_vox[:j]) + 1
rstop = rstart + nv
cov_matrix[cstart:cstop, rstart:rstop] = cov[i][
j
]  # set covariance of this mask's voxels with each of other masks
np.fill_diagonal(cov_matrix, 1)  # set diagonal to 1

# these operations happen in one vector that we'll later split into the separate regions
print("Generating multivariate normal distribution...")
mv_sim_l = self.random_state.multivariate_normal(
np.zeros([np.sum(n_vox) + 1]), cov_matrix, size=reps
)
print(mv_sim_l)

self.y = mv_sim_l[:, 0]
mv_sim = mv_sim_l[:, 1:]

for rep in range(reps):
print(rep, start, stop)
rep, start:stop
]

noise = self.random_state.standard_normal(size=new_dats.shape[1]) * sigma
)  # append 3d simulated data to list
self.rep_id = [1] * len(self.y)

print("Generating subject-level noise...")
print("y == " + str(self.y.shape))
if n_sub > 1:
self.y = list(self.y)
y = list(self.y)
for s in range(1, n_sub):
noise = (
self.random_state.standard_normal(size=new_dats.shape[1]) * sigma
)
self.data = nib.concat_images([self.data, next_subj], axis=3)

y += list(self.y + self.random_state.randn(len(self.y)) * sigma)
print("y == " + str(len(y)))
self.rep_id += [s + 1] * len(mv_sim[:, 0])
self.y = np.array(y)

print("Saving to " + str(output_dir))
print("dat == " + str(self.data.shape))
print("y == " + str(self.y.shape))
if output_dir is not None:
if type(output_dir) is str:
if not os.path.isdir(output_dir):
os.makedirs(output_dir)
self.data.to_filename(
os.path.join(
output_dir,
"simulated_data_"
+ str(sigma)
+ "sigma_"
+ str(n_sub)
+ "subj.nii.gz",
)
)
y_file = open(os.path.join(output_dir, "y.csv"), "wb")
wr = csv.writer(y_file, quoting=csv.QUOTE_ALL)
wr.writerow(self.y)

rep_id_file = open(os.path.join(output_dir, "rep_id.csv"), "wb")
wr = csv.writer(rep_id_file, quoting=csv.QUOTE_ALL)
wr.writerow(self.rep_id)

class SimulateGrid(object):
def __init__(
self,
grid_width=100,
signal_width=20,
n_subjects=20,
sigma=1,
signal_amplitude=None,
random_state=None,
):

self.isfit = False
self.thresholded = None
self.threshold = None
self.threshold_type = None
self.correction = None
self.t_values = None
self.p_values = None
self.n_subjects = n_subjects
self.sigma = sigma
self.grid_width = grid_width
self.random_state = check_random_state(random_state)
self.data = self._create_noise()

if signal_amplitude is not None:
signal_amplitude=signal_amplitude, signal_width=signal_width
)
else:
self.signal_amplitude = None

def _create_noise(self):
"""Generate simualted data using object parameters

Returns:
simulated_data (np.array): simulated noise using object parameters
"""
return (
self.random_state.randn(self.grid_width, self.grid_width, self.n_subjects)
* self.sigma
)

Args:
signal_width (int): width of signal box
signal_amplitude (int): intensity of signal
"""
if signal_width >= self.grid_width:
raise ValueError("Signal width must be smaller than total grid.")

self.signal_amplitude = signal_amplitude
signal = np.repeat(
)
self.data = deepcopy(self.data) + signal * self.signal_amplitude

"""Create a mask for where the signal is located in grid."""

int((np.floor((self.grid_width / 2) - (signal_width / 2)))) : int(
np.ceil((self.grid_width / 2) + (signal_width / 2))
),
int((np.floor((self.grid_width / 2) - (signal_width / 2)))) : int(
np.ceil((self.grid_width / 2) + (signal_width / 2))
),
] = 1
self.signal_width = signal_width

def _run_ttest(self, data):
"""Helper function to run ttest on data"""
flattened = data.reshape(self.grid_width * self.grid_width, self.n_subjects)
t, p = ttest_1samp(flattened.T, 0)
t = np.reshape(t, (self.grid_width, self.grid_width))
p = np.reshape(p, (self.grid_width, self.grid_width))
return (t, p)

def _run_permutation(self, data):
"""Helper function to run a nonparametric one-sample permutation test"""
flattened = data.reshape(self.grid_width * self.grid_width, self.n_subjects)
stats_all = []
for i in range(flattened.shape[0]):
stats = one_sample_permutation(flattened[i, :])
stats_all.append(stats)
mean = np.reshape(
np.array([x["mean"] for x in stats_all]), (self.grid_width, self.grid_width)
)
p = np.reshape(
np.array([x["p"] for x in stats_all]), (self.grid_width, self.grid_width)
)
return (mean, p)

def fit(self):
"""Run ttest on self.data"""
if self.isfit:
raise ValueError("Can't fit because ttest has already been run.")
self.t_values, self.p_values = self._run_ttest(self.data)
self.isfit = True

def _threshold_simulation(self, t, p, threshold, threshold_type, correction=None):
"""Helper function to threshold simulation

Args:
threshold (float): threshold to apply to simulation
threshhold_type (str): type of threshold to use can be a specific t-value or p-value ['t', 'p']

Returns:
threshold_data (np.array): thresholded data
"""
if correction == "fdr":
if threshold_type != "q":
raise ValueError("Must specify a q value when using fdr")

if correction == "permutation":
if threshold_type != "p":
raise ValueError("Must specify a p value when using permutation")

thresholded = deepcopy(t)
if threshold_type == "t":
thresholded[np.abs(t) < threshold] = 0
elif threshold_type == "p":
thresholded[p > threshold] = 0
elif threshold_type == "q":
fdr_threshold = fdr(p.flatten(), q=threshold)
if fdr_threshold < 0:
thresholded = np.zeros(thresholded.shape)
else:
thresholded[p > fdr_threshold] = 0
else:
raise ValueError("Threshold type must be ['t','p','q']")
return thresholded

def threshold_simulation(self, threshold, threshold_type, correction=None):
"""Threshold simulation

Args:
threshold (float): threshold to apply to simulation
threshhold_type (str): type of threshold to use can be a specific t-value or p-value ['t', 'p', 'q']
"""

if not self.isfit:
raise ValueError("Must fit model before thresholding.")

if correction == "fdr":
self.corrected_threshold = fdr(self.p_values.flatten())

self.correction = correction
self.thresholded = self._threshold_simulation(
self.t_values, self.p_values, threshold, threshold_type, correction
)
self.threshold = threshold
self.threshold_type = threshold_type

self.fp_percent = self._calc_false_positives(self.thresholded)
self.tp_percent = self._calc_true_positives(self.thresholded)

def _calc_false_positives(self, thresholded):
"""Calculate percent of grid containing false positives

Args:
thresholded (np.array): thresholded grid
Returns:
fp_percent (float): percentage of grid that contains false positives
"""

fp_percent = np.sum(thresholded != 0) / (self.grid_width**2)
else:
fp_percent = np.sum(thresholded[self.signal_mask != 1] != 0) / (
self.grid_width**2 - self.signal_width**2
)
return fp_percent

def _calc_true_positives(self, thresholded):
"""Calculate percent of mask containing true positives

Args:
thresholded (np.array): thresholded grid
Returns:
tp_percent (float): percentage of grid that contains true positives
"""

tp_percent = np.sum(thresholded[self.signal_mask == 1] != 0) / (
self.signal_width**2
)
return tp_percent

def _calc_false_discovery_rate(self, thresholded):
"""Calculate percent of activated voxels that are false positives

Args:
thresholded (np.array): thresholded grid
Returns:
fp_percent (float): percentage of activated voxels that are false positives
"""
fp_percent = np.sum(thresholded[self.signal_mask == 0] > 0) / np.sum(
thresholded > 0
)
return fp_percent

def run_multiple_simulations(
self, threshold, threshold_type, n_simulations=100, correction=None
):
"""This method will run multiple simulations to calculate overall false positive rate"""

simulations = [
self._run_ttest(self._create_noise()) for _ in range(n_simulations)
]
else:
signal = (
np.repeat(
)
* self.signal_amplitude
)
simulations = [
self._run_ttest(self._create_noise() + signal)
for _ in range(n_simulations)
]

self.multiple_thresholded = [
self._threshold_simulation(
s[0], s[1], threshold, threshold_type, correction=correction
)
for s in simulations
]
self.multiple_fp = np.array(
[self._calc_false_positives(x) for x in self.multiple_thresholded]
)
self.fpr = np.mean(np.array([x for x in self.multiple_fp]) > 0)
self.multiple_tp = np.array(
[self._calc_true_positives(x) for x in self.multiple_thresholded]
)
self.multiple_fdr = np.array(
[self._calc_false_discovery_rate(x) for x in self.multiple_thresholded]
)

def plot_grid_simulation(
self, threshold, threshold_type, n_simulations=100, correction=None
):
"""Create a plot of the simulations"""
if not self.isfit:
self.fit()
self.threshold_simulation(
threshold=threshold,
threshold_type=threshold_type,
correction=correction,
)
self.run_multiple_simulations(
threshold=threshold,
threshold_type=threshold_type,
n_simulations=n_simulations,
)

_, a = plt.subplots(ncols=3, figsize=(15, 5))
else:
_, a = plt.subplots(ncols=4, figsize=(18, 5))
a[3].hist(self.multiple_tp)
a[3].set_ylabel("Frequency", fontsize=18)
a[3].set_xlabel("Percent Signal Recovery", fontsize=18)
a[3].set_title("Average Signal Recovery", fontsize=18)

a[0].imshow(self.t_values)
a[0].set_title("Random Noise", fontsize=18)
a[0].axes.get_xaxis().set_visible(False)
a[0].axes.get_yaxis().set_visible(False)
a[1].imshow(self.thresholded)
a[1].set_title(f"Threshold: {threshold_type} = {threshold}", fontsize=18)
a[1].axes.get_xaxis().set_visible(False)
a[1].axes.get_yaxis().set_visible(False)
a[2].plot(
binom.pmf(
np.arange(0, n_simulations, 1),
n_simulations,
np.mean(self.multiple_fp > 0),
)
)
a[2].axvline(
x=np.mean(self.fpr) * n_simulations,
color="r",
linestyle="dashed",
linewidth=2,
)
a[2].set_title(f"False Positive Rate = {self.fpr:.2f}", fontsize=18)
a[2].set_ylabel("Probability", fontsize=18)
a[2].set_xlabel("False Positive Rate", fontsize=18)
plt.tight_layout()
```