"""
Implementation of variogram-matching procedure.
"""
# Author: Joshua Burt <joshua.burt@yale.edu>
# License: BSD 3 clause
import numpy as np
import numpy.lib.format
from pathlib import Path
from sklearn.base import BaseEstimator
from sklearn.linear_model import LinearRegression
# ----------------------
# ------ Checks --------
# ----------------------
def is_string_like(obj):
    """Duck-type check: return True if `obj` can be concatenated with a str."""
    try:
        obj + ''
        return True
    except (TypeError, ValueError):
        return False
def check_map(x):
    """
    Validate that a brain map is a one-dimensional numpy array.

    Parameters
    ----------
    x : 1D ndarray
        Brain map

    Returns
    -------
    None

    Raises
    ------
    TypeError : `x` is not a ndarray object
    ValueError : `x` is not one-dimensional
    """
    if not isinstance(x, np.ndarray):
        raise TypeError(
            "Brain map must be array-like\ngot type {}".format(type(x)))
    if x.ndim != 1:
        raise ValueError(
            "Brain map must be one-dimensional\ngot shape {}".format(x.shape))
def check_pv(pv):
    """
    Check input argument `pv`.

    Parameters
    ----------
    pv : int
        Percentile of the pairwise distance distribution at which to truncate
        during variogram fitting.

    Returns
    -------
    int
        `pv` coerced to int

    Raises
    ------
    ValueError : `pv` is not coercible to int or lies outside range (0, 100]
    """
    try:
        pv = int(pv)
    # int(None) / int([]) raise TypeError, not ValueError; catch both so the
    # documented ValueError contract holds for every non-integer input
    except (ValueError, TypeError):
        raise ValueError("parameter 'pv' must be an integer in (0,100]")
    if pv <= 0 or pv > 100:
        raise ValueError("parameter 'pv' must be in (0,100]")
    return pv
def check_deltas(deltas):
    """
    Check input argument `deltas`.

    Parameters
    ----------
    deltas : 1D ndarray or List[float]
        Proportions of neighbors to include for smoothing, in (0, 1]

    Returns
    -------
    None

    Raises
    ------
    TypeError : `deltas` is not a List or ndarray object
    ValueError : One or more elements of `deltas` lies outside (0,1]
    """
    if not isinstance(deltas, (list, np.ndarray)):
        raise TypeError("Parameter `deltas` must be a list or ndarray")
    if any(d <= 0 or d > 1 for d in deltas):
        raise ValueError("Each element of `deltas` must lie in (0,1]")
def count_lines(filename):
    """
    Count number of lines (newline characters) in a file.

    Parameters
    ----------
    filename : filename

    Returns
    -------
    int
        number of lines in file
    """
    chunk_size = 1024 * 1024  # read in 1 MiB chunks
    total = 0
    with open(filename, 'rb') as f:
        reader = f.raw.read  # unbuffered read for speed
        for chunk in iter(lambda: reader(chunk_size), b''):
            total += chunk.count(b'\n')
    return total
# ----------------------
# ------ Data I/O ------
# ----------------------
def dataio(x):
    """
    Data I/O for core classes.

    To facilitate flexible user inputs, this function loads data from:
    - txt files
    - npy files (memory-mapped arrays)
    - array_like data (passed through unchanged)

    Parameters
    ----------
    x : filename or ndarray or np.memmap

    Returns
    -------
    ndarray or np.memmap

    Raises
    ------
    FileExistsError : file does not exist
    RuntimeError : file is empty
    ValueError : file type cannot be determined by file extension
    TypeError : input is not a filename or array_like object
    """
    # Duck-typed filename check: string-like objects concatenate with ''
    try:
        x + ''
        stringlike = True
    except (TypeError, ValueError):
        stringlike = False
    if not stringlike:
        if isinstance(x, np.ndarray):
            return x
        raise TypeError(
            "expected filename or array_like obj, got {}".format(type(x)))
    path = Path(x)
    if not path.exists():
        raise FileExistsError("file does not exist: {}".format(x))
    if path.stat().st_size == 0:
        raise RuntimeError("file is empty: {}".format(x))
    if path.suffix == ".npy":  # load as read-only memory map
        return np.load(x, mmap_mode='r')
    if path.suffix == ".txt":  # plain-text array
        return np.loadtxt(x).squeeze()
    raise ValueError(
        "expected npy or txt file, got {}".format(path.suffix))
def txt2memmap(dist_file, output_dir, maskfile=None, delimiter=' '):
    """
    Export distance matrix to memory-mapped array.

    Parameters
    ----------
    dist_file : filename
        Path to `delimiter`-separated distance matrix file
    output_dir : filename
        Path to directory in which output files will be written
    maskfile : filename or ndarray or None, default None
        Path to a neuroimaging/txt file containing a mask, or a mask
        represented as a numpy array. Mask scalars are cast to boolean, and
        all elements not equal to zero will be masked.
    delimiter : str
        Delimiting character in `dist_file`

    Returns
    -------
    dict
        Keys are 'distmat' and 'index'; values are absolute paths to the
        corresponding binary files on disk.

    Notes
    -----
    Each row of the distance matrix is sorted before writing to file. Thus, a
    second mem-mapped array is necessary, the i-th row of which contains
    argsort(d[i]).
    If `maskfile` is not None, a binary mask.txt file will also be written to
    the output directory.

    Raises
    ------
    IOError : `output_dir` doesn't exist
    ValueError : Mask image and distance matrix have inconsistent sizes
    RuntimeError : A row's element count differs from the file's line count
    """
    op = Path(output_dir)
    nlines = count_lines(dist_file)
    if not op.exists():
        raise IOError("Output directory does not exist: {}".format(output_dir))
    # Load mask if one was provided
    if maskfile is not None:
        mask = dataio(maskfile).astype(bool)
        if mask.size != nlines:
            e = "Incompatible input sizes\n"
            e += "{} rows in {}\n".format(nlines, dist_file)
            e += "{} elements in {}".format(mask.size, maskfile)
            raise ValueError(e)
        mask_fileout = str(op.joinpath("mask.txt"))
        np.savetxt(  # Write to text file
            fname=mask_fileout, X=mask.astype(int), fmt="%i", delimiter=',')
        nv = int((~mask).sum())  # number of non-masked elements
        idx = np.arange(nlines)[~mask]  # indices of non-masked elements
    else:
        nv = nlines
        idx = np.arange(nlines)
    # O(1) row-membership tests; `il in idx` on an ndarray is O(n) per line,
    # which made the loop below quadratic in matrix size
    idx_set = frozenset(idx.tolist())
    # Build memory-mapped arrays
    with open(dist_file, 'r') as fp:
        npydfile = str(op.joinpath("distmat.npy"))
        npyifile = str(op.joinpath("index.npy"))
        fpd = numpy.lib.format.open_memmap(
            npydfile, mode='w+', dtype=np.float32, shape=(nv, nv))
        fpi = numpy.lib.format.open_memmap(
            npyifile, mode='w+', dtype=np.int32, shape=(nv, nv))
        ifp = 0  # Build memory-mapped arrays one row of distances at a time
        for il, l in enumerate(fp):  # Loop over lines of file
            if il not in idx_set:  # Keep only non-masked rows
                continue
            line = l.rstrip()
            if line:
                data = np.array(line.split(delimiter), dtype=np.float32)
                if data.size != nlines:
                    raise RuntimeError(
                        "Distance matrix is not square: {}".format(
                            dist_file))
                d = data[idx]  # drop masked columns
                sort_idx = np.argsort(d)
                fpd[ifp, :] = d[sort_idx]  # sorted row of distances
                fpi[ifp, :] = sort_idx  # sort indexes
                ifp += 1
    del fpd  # Flush memory changes to disk
    del fpi
    return {'distmat': npydfile, 'index': npyifile}  # Return filenames
# ----------------------
# - Smoothing kernels --
# ----------------------
def gaussian(d):
    """
    Gaussian kernel which truncates at one standard deviation.

    Parameters
    ----------
    d : ndarray, shape (N,) or (M,N)
        one- or two-dimensional array of distances

    Returns
    -------
    ndarray, shape (N,) or (M,N)
        Gaussian kernel weights

    Raises
    ------
    TypeError : `d` is not array_like
    """
    try:
        scale = d.max(axis=-1)[:, np.newaxis]  # per-row max (2-dim input)
    except IndexError:  # max() returned a 0-d scalar => 1-dim input
        scale = d.max()
    except AttributeError:
        raise TypeError("expected array_like, got {}".format(type(d)))
    return np.exp(-1.25 * np.square(d / scale))
def exp(d):
    """
    Exponentially decaying kernel which truncates at e^{-1}.

    Parameters
    ----------
    d : ndarray, shape (N,) or (M,N)
        one- or two-dimensional array of distances

    Returns
    -------
    ndarray, shape (N,) or (M,N)
        Exponential kernel weights

    Notes
    -----
    Characteristic length scale is set to d.max(axis=-1), i.e. the maximum
    distance within each row.

    Raises
    ------
    TypeError : `d` is not array_like
    """
    try:
        length_scale = d.max(axis=-1)[:, np.newaxis]  # per-row max (2-dim)
    except IndexError:  # max() returned a 0-d scalar => 1-dim input
        length_scale = d.max()
    except AttributeError:
        raise TypeError("expected array_like, got {}".format(type(d)))
    return np.exp(-d / length_scale)
def invdist(d):
    """
    Inverse distance kernel.

    Parameters
    ----------
    d : ndarray, shape (N,) or (M,N)
        One- or two-dimensional array of distances

    Returns
    -------
    ndarray, shape (N,) or (M,N)
        Inverse distance, i.e. d^{-1}

    Raises
    ------
    ZeroDivisionError : `d` includes zero value (scalar input only; ndarrays
        containing zeros produce inf with a runtime warning instead)
    TypeError : `d` is not array_like
    """
    try:
        weights = 1. / d
    except ZeroDivisionError as err:
        raise ZeroDivisionError(err)
    except AttributeError:
        raise TypeError("expected array_like, got {}".format(type(d)))
    return weights
def uniform(d):
    """
    Uniform (i.e., distance independent) kernel.

    Parameters
    ----------
    d : ndarray, shape (N,) or (M,N)
        One- or two-dimensional array of distances

    Returns
    -------
    ndarray, shape (N,) or (M,N)
        Uniform kernel weights

    Notes
    -----
    Each element is normalized to 1/N such that columns sum to unity.

    Raises
    ------
    TypeError : `d` is not array_like
    """
    try:
        row_len = d.shape[-1]  # works for both 1- and 2-dim arrays
    except IndexError:  # 0-dimensional array: shape == ()
        return np.ones(d.size) / d.size
    except AttributeError:
        raise TypeError("expected array_like, got {}".format(type(d)))
    return np.ones(d.shape) / row_len
def check_kernel(kernel):
    """
    Check that a valid kernel was specified and return callable.

    Parameters
    ----------
    kernel : 'exp' or 'gaussian' or 'invdist' or 'uniform'
        Kernel selection

    Returns
    -------
    Callable

    Raises
    ------
    NotImplementedError : kernel is not implemented
    """
    kernels = {'exp': exp,
               'gaussian': gaussian,
               'invdist': invdist,
               'uniform': uniform}
    try:
        return kernels[kernel]
    except KeyError:
        e = "'{}' is not a valid kernel\n".format(kernel)
        e += "Valid kernels: {}".format(", ".join(kernels))
        raise NotImplementedError(e)
# ----------------------
# ---- Core classes ----
# ----------------------
# class Base:
# """ Base implementation of map generator.
#
# Parameters
# ----------
# x : filename or 1D ndarray
# Target brain map
# D : filename or ndarray, shape (N,N)
# Pairwise distance matrix
# deltas : 1D ndarray or List[float], default [0.1,0.2,...,0.9]
# Proportion of neighbors to include for smoothing, in (0, 1]
# kernel : str, default 'exp'
# Kernel with which to smooth permuted maps:
# 'gaussian' : Gaussian function.
# 'exp' : Exponential decay function.
# 'invdist' : Inverse distance.
# 'uniform' : Uniform weights (distance independent).
# pv : int, default 25
# Percentile of the pairwise distance distribution at which to
# truncate during variogram fitting
# nh : int, default 25
# Number of uniformly spaced distances at which to compute variogram
# resample : bool, default False
# Resample surrogate maps' values from target brain map
# b : float or None, default None
# Gaussian kernel bandwidth for variogram smoothing. If None, set to
# three times the spacing between variogram x-coordinates.
#
# Notes
# -----
# Passing resample=True preserves the distribution of values in the target
# map, with the possibility of worsening the simulated surrogate maps'
# variograms fits.
#
# """
#
# def __init__(self, x, D, deltas=np.linspace(0.1, 0.9, 9),
# kernel='exp', pv=25, nh=25, resample=False, b=None):
#
# self.x = x
# self.D = D
# n = self._x.size
# self.resample = resample
# self.nh = nh
# self.deltas = deltas
# self.pv = pv
# self.nmap = n
# self.kernel = kernel # Smoothing kernel selection
# self._ikn = np.arange(n)[:, None]
# self._triu = np.triu_indices(self._nmap, k=1) # upper triangular inds
# self._u = self._D[self._triu] # variogram X-coordinate
# self._v = self.compute_variogram(self._x) # variogram Y-coord
#
# # Get indices of pairs with u < pv'th percentile
# self._uidx = np.where(self._u < np.percentile(self._u, self._pv))[0]
# self._uisort = np.argsort(self._u[self._uidx])
#
# # Find sorted indices of first `kmax` elements of each row of dist. mat.
# self._disort = np.argsort(self._D, axis=-1)
# self._jkn = dict.fromkeys(deltas)
# self._dkn = dict.fromkeys(deltas)
# for delta in deltas:
# k = int(delta*n)
# # find index of k nearest neighbors for each area
# self._jkn[delta] = self._disort[:, 1:k+1] # prevent self-coupling
# # find distance to k nearest neighbors for each area
# self._dkn[delta] = self._D[(self._ikn, self._jkn[delta])]
#
# # Smoothed variogram and variogram _b
# utrunc = self._u[self._uidx]
# self._h = np.linspace(utrunc.min(), utrunc.max(), self._nh)
# self.b = b
# self._smvar = self.smooth_variogram(self._v)
#
# # Linear regression model
# self._lm = LinearRegression(fit_intercept=True)
#
# def __call__(self, n=1):
# """
# Randomly generate new surrogate map(s).
#
# Parameters
# ----------
# n : int, default 1
# Number of surrogate maps to randomly generate
#
# Returns
# -------
# ndarray, shape (n,N)
# Randomly generated map(s) with matched spatial autocorrelation
#
# Notes
# -----
# Chooses a level of smoothing that produces a smoothed variogram which
# best approximates the true smoothed variogram. Selecting resample='True'
# preserves the original map's value distribution at the expense of
# worsening the surrogate maps' variogram fit.
#
# """
# print("Generating {} maps...".format(n))
# surrs = np.empty((n, self._nmap))
# for i in range(n): # generate random maps
#
# xperm = self.permute_map() # Randomly permute values
# res = dict.fromkeys(self._deltas)
#
# for delta in self.deltas: # foreach neighborhood size
# # Smooth the permuted map using delta proportion of
# # neighbors to reintroduce spatial autocorrelation
# sm_xperm = self.smooth_map(x=xperm, delta=delta)
#
# # Calculate empirical variogram of the smoothed permuted map
# vperm = self.compute_variogram(sm_xperm)
#
# # Calculate smoothed variogram of the smoothed permuted map
# smvar_perm = self.smooth_variogram(vperm)
#
# # Fit linear regression btwn smoothed variograms
# res[delta] = self.regress(smvar_perm, self._smvar)
#
# alphas, betas, residuals = np.array(
# [res[d] for d in self._deltas]).T
#
# # Select best-fit model and regression parameters
# iopt = np.argmin(residuals)
# dopt = self._deltas[iopt]
# aopt = alphas[iopt]
# bopt = betas[iopt]
#
# # Transform and smooth permuted map using best-fit parameters
# sm_xperm_best = self.smooth_map(x=xperm, delta=dopt)
# surr = (np.sqrt(np.abs(bopt)) * sm_xperm_best +
# np.sqrt(np.abs(aopt)) * np.random.randn(self._nmap))
# surrs[i] = surr
#
# if self._resample: # resample values from empirical map
# sorted_map = np.sort(self._x)
# for i, surr in enumerate(surrs):
# ii = np.argsort(surr)
# np.put(surr, ii, sorted_map)
#
# return surrs.squeeze()
#
# def compute_variogram(self, x):
# """
# Compute variogram values (i.e., one-half squared pairwise differences).
#
# Parameters
# ----------
# x : 1D ndarray
# Brain map scalar array
#
# Returns
# -------
# v : ndarray, shape (N(N-1)/2,)
# Variogram y-coordinates, i.e. 0.5 * (x_i - x_j) ^ 2
#
# """
# diff_ij = np.subtract.outer(x, x)
# v = 0.5 * np.square(diff_ij)[self._triu]
# return v
#
# def permute_map(self):
# """
# Return randomly permuted brain map.
#
# Returns
# -------
# 1D ndarray
# Random permutation of target brain map
#
# """
# perm_idx = np.random.permutation(np.arange(self._x.size))
# mask_perm = self._x.mask[perm_idx]
# x_perm = self._x.data[perm_idx]
# return np.ma.masked_array(data=x_perm, mask=mask_perm)
#
# def smooth_map(self, x, delta):
# """
# Smooth `x` using `delta` proportion of nearest neighbors.
#
# Parameters
# ----------
# x : 1D ndarray
# Brain map scalars
# delta : float
# Proportion of neighbors to include for smoothing, in (0, 1)
#
# Returns
# -------
# 1D ndarray
# Smoothed brain map
#
# """
# # Values of k nearest neighbors for each brain area
# xkn = x[self._jkn[delta]]
# weights = self._kernel(self._dkn[delta]) # Distance-weight kernel
# # Kernel-weighted sum
# return (weights * xkn).sum(axis=1) / weights.sum(axis=1)
#
# def smooth_variogram(self, v, return_h=False):
# """
# Smooth a variogram.
#
# Parameters
# ----------
# v : 1D ndarray
# Variogram values, i.e. 0.5 * (x_i - x_j) ^ 2
# return_h : bool, default False
# Return distances at which the smoothed variogram was computed
#
# Returns
# -------
# 1D ndarray, shape (nh,)
# Smoothed variogram values
# 1D ndarray, shape (nh,)
# Distances at which smoothed variogram was computed (returned only if
# `return_h` is True)
#
# Raises
# ------
# ValueError : `v` has unexpected size.
#
# """
# u = self._u[self._uidx]
# v = v[self._uidx]
# if len(u) != len(v):
# raise ValueError(
# "argument v: expected size {}, got {}".format(len(u), len(v)))
# # Subtract each h from each pairwise distance u
# # Each row corresponds to a unique h
# du = np.abs(u - self._h[:, None])
# w = np.exp(-np.square(2.68 * du / self._b) / 2)
# denom = w.sum(axis=1)
# wv = w * v[None, :]
# num = wv.sum(axis=1)
# output = num / denom
# if not return_h:
# return output
# return output, self._h
#
# def regress(self, x, y):
# """
# Linearly regress `x` onto `y`.
#
# Parameters
# ----------
# x : 1D ndarray
# Independent variable
# y : 1D ndarray
# Dependent variable
#
# Returns
# -------
# alpha : float
# Intercept term (offset parameter)
# beta : float
# Regression coefficient (scale parameter)
# res : float
# Sum of squared residuals
#
# """
# self._lm.fit(X=np.expand_dims(x, -1), y=y)
# beta = self._lm.coef_
# alpha = self._lm.intercept_
# y_pred = self._lm.predict(X=np.expand_dims(x, -1))
# res = np.sum(np.square(y-y_pred))
# return alpha, beta, res
#
# @property
# def x(self):
# """ 1D ndarray : brain map scalar array """
# return self._x
#
# @x.setter
# def x(self, x):
# x_ = dataio(x)
# check_map(x=x_)
# brain_map = np.ma.masked_array(data=x_, mask=np.isnan(x_))
# self._x = brain_map
#
# @property
# def D(self):
# """ ndarray, shape (N,N) : Pairwise distance matrix """
# return self._D
#
# @D.setter
# def D(self, x):
# d_ = dataio(x)
# if not np.allclose(d_, d_.T):
# raise ValueError("Distance matrix must be symmetric")
# n = self._x.size
# if d_.shape != (n, n):
# e = "Distance matrix must have dimensions consistent with brain map"
# e += "\nDistance matrix shape: {}".format(d_.shape)
# e += "\nBrain map size: {}".format(n)
# raise ValueError(e)
# self._D = d_
#
# @property
# def nmap(self):
# """ int : length of brain map """
# return self._nmap
#
# @nmap.setter
# def nmap(self, x):
# self._nmap = int(x)
#
# @property
# def pv(self):
# """ int : percentile of pairwise distances at which to truncate """
# return self._pv
#
# @pv.setter
# def pv(self, x):
# pv = check_pv(x)
# self._pv = pv
#
# @property
# def deltas(self):
# """ 1D ndarray or List[float] : proportions of nearest neighbors """
# return self._deltas
#
# @deltas.setter
# def deltas(self, x):
# check_deltas(deltas=x)
# self._deltas = x
#
# @property
# def nh(self):
# """ int : number of variogram distance intervals """
# return self._nh
#
# @nh.setter
# def nh(self, x):
# self._nh = x
#
# @property
# def h(self):
# """ 1D ndarray : distances at which smoothed variogram is computed """
# return self._h
#
# @property
# def kernel(self):
# """ Callable : smoothing kernel function """
# return self._kernel
#
# @kernel.setter
# def kernel(self, x):
# kernel_callable = check_kernel(x)
# self._kernel = kernel_callable
#
# @property
# def resample(self):
# """ bool : whether to resample surrogate maps from target map """
# return self._resample
#
# @resample.setter
# def resample(self, x):
# if not isinstance(x, bool):
# e = "parameter `resample`: expected bool, got {}".format(type(x))
# raise TypeError(e)
# self._resample = x
#
# @property
# def b(self):
# """ numeric : Gaussian kernel bandwidth """
# return self._b
#
# @b.setter
# def b(self, x):
# if x is not None:
# try:
# self._b = float(x)
# except (ValueError, TypeError):
# e = "bandwidth b: expected numeric, got {}".format(type(x))
# raise ValueError(e)
# else: # set bandwidth equal to 3x bin spacing
# self._b = 3.*np.mean(self._h[1:] - self._h[:-1])
#
#
# class Sampled:
# """
# Sampling implementation of map generator.
#
# Parameters
# ----------
# x : 1D ndarray
# Target brain map
# D : ndarray or memmap, shape (N,N)
# Pairwise distance matrix between elements of `x`. Each row of `D` should
# be sorted. Indices used to sort each row are passed to the `index`
# argument. See :func:`brainsmash.mapgen.memmap.txt2memmap` or the online
# documentation for more details (brainsmash.readthedocs.io)
# index : filename or ndarray or memmap, shape(N,N)
# See above
# ns : int, default 500
# Take a subsample of `ns` rows from `D` when fitting variograms
# deltas : ndarray or List[float], default [0.3, 0.5, 0.7, 0.9]
# Proportions of neighbors to include for smoothing, in (0, 1]
# kernel : str, default 'exp'
# Kernel with which to smooth permuted maps
# - 'gaussian' : gaussian function
# - 'exp' : exponential decay function
# - 'invdist' : inverse distance
# - 'uniform' : uniform weights (distance independent)
# pv : int, default 70
# Percentile of the pairwise distance distribution (in `D`) at
# which to truncate during variogram fitting
# nh : int, default 25
# Number of uniformly spaced distances at which to compute variogram
# knn : int, default 1000
# Number of nearest regions to keep in the neighborhood of each region
# b : float or None, default None
# Gaussian kernel bandwidth for variogram smoothing. if None,
# three times the distance interval spacing is used.
# resample : bool, default False
# Resample surrogate map values from the target brain map
# verbose : bool, default False
# Print surrogate count each time new surrogate map created
#
# Notes
# -----
# Passing resample=True will preserve the distribution of values in the
# target map, at the expense of worsening simulated surrogate maps'
# variograms fits. This worsening will increase as the empirical map
# more strongly deviates from normality.
#
# Raises
# ------
# ValueError : `x` and `D` have inconsistent sizes
#
# """
#
# def __init__(self, x, D, index, ns=500, pv=70, nh=25, knn=1000, b=None,
# deltas=np.arange(0.3, 1., 0.2), kernel='exp', resample=False,
# verbose=False):
#
# self._verbose = verbose
# self.x = x
# n = self._x.size
# self.nmap = int(n)
# self.knn = knn
# self.D = D
# self.index = index
# self.resample = resample
# self.nh = int(nh)
# self.deltas = deltas
# self.ns = int(ns)
# self.b = b
# self.pv = pv
# self._ikn = np.arange(self._nmap)[:, None]
#
# # Store k nearest neighbors from distance and index matrices
# self.kernel = kernel # Smoothing kernel selection
# self._dmax = np.percentile(self._D, self._pv)
# self.h = np.linspace(self._D.min(), self._dmax, self._nh)
# if not self._b:
# self.b = 3 * (self.h[1] - self.h[0])
#
# # Linear regression model
# self._lm = LinearRegression(fit_intercept=True)
#
# def __call__(self, n=1):
# """
# Randomly generate new surrogate map(s).
#
# Parameters
# ----------
# n : int, default 1
# Number of surrogate maps to randomly generate
#
# Returns
# -------
# ndarray, shape (n,N)
# Randomly generated map(s) with matched spatial autocorrelation
#
# Notes
# -----
# Chooses a level of smoothing that produces a smoothed variogram which
# best approximates the true smoothed variogram. Selecting resample='True'
# preserves the map value distribution at the expense of worsening the
# surrogate maps' variogram fits.
#
# """
# if self._verbose:
# print("Generating {} maps...".format(n))
# surrs = np.empty((n, self._nmap))
# for i in range(n): # generate random maps
# if self._verbose:
# print(i+1)
#
# # Randomly permute map
# x_perm = self.permute_map()
#
# # Randomly select subset of regions to use for variograms
# idx = self.sample()
#
# # Compute empirical variogram
# v = self.compute_variogram(self._x, idx)
#
# # Variogram ordinates; use nearest neighbors because local effect
# u = self._D[idx, :]
# uidx = np.where(u < self._dmax)
#
# # Smooth empirical variogram
# smvar, u0 = self.smooth_variogram(u[uidx], v[uidx], return_h=True)
#
# res = dict.fromkeys(self._deltas)
#
# for d in self._deltas: # foreach neighborhood size
#
# k = int(d * self._knn)
#
# # Smooth the permuted map using k nearest neighbors to
# # reintroduce spatial autocorrelation
# sm_xperm = self.smooth_map(x=x_perm, k=k)
#
# # Calculate variogram values for the smoothed permuted map
# vperm = self.compute_variogram(sm_xperm, idx)
#
# # Calculate smoothed variogram of the smoothed permuted map
# smvar_perm = self.smooth_variogram(u[uidx], vperm[uidx])
#
# # Fit linear regression btwn smoothed variograms
# res[d] = self.regress(smvar_perm, smvar)
#
# alphas, betas, residuals = np.array(
# [res[d] for d in self._deltas]).T
#
# # Select best-fit model and regression parameters
# iopt = np.argmin(residuals)
# dopt = self._deltas[iopt]
# self._dopt = dopt
# kopt = int(dopt * self._knn)
# aopt = alphas[iopt]
# bopt = betas[iopt]
#
# # Transform and smooth permuted map using best-fit parameters
# sm_xperm_best = self.smooth_map(x=x_perm, k=kopt)
# surr = (np.sqrt(np.abs(bopt)) * sm_xperm_best +
# np.sqrt(np.abs(aopt)) * np.random.randn(self._nmap))
# surrs[i] = surr
#
# if self._resample: # resample values from empirical map
# sorted_map = np.sort(self._x)
# for i, surr in enumerate(surrs):
# ii = np.argsort(surr)
# np.put(surr, ii, sorted_map)
#
# if self._ismasked:
# return np.ma.masked_array(
# data=surrs, mask=np.isnan(surrs)).squeeze()
# return surrs.squeeze()
#
# def compute_variogram(self, x, idx):
# """
# Compute variogram of `x` using pairs of regions indexed by `idx`.
#
# Parameters
# ----------
# x : 1Dndarray
# Brain map
# idx : ndarray[int], shape (ns,)
# Indices of randomly sampled brain regions
#
# Returns
# -------
# v : ndarray, shape (ns,ns)
# Variogram y-coordinates, i.e. 0.5 * (x_i - x_j) ^ 2, for i,j in idx
#
# """
# diff_ij = x[idx][:, None] - x[self._index[idx, :]]
# return 0.5 * np.square(diff_ij)
#
# def permute_map(self):
# """
# Return a random permutation of the target brain map.
#
# Returns
# -------
# 1D ndarray
# Random permutation of target brain map
#
# """
# perm_idx = np.random.permutation(self._nmap)
# if self._ismasked:
# mask_perm = self._x.mask[perm_idx]
# x_perm = self._x.data[perm_idx]
# return np.ma.masked_array(data=x_perm, mask=mask_perm)
# return self._x[perm_idx]
#
# def smooth_map(self, x, k):
# """
# Smooth `x` using `k` nearest neighboring regions.
#
# Parameters
# ----------
# x : 1D ndarray
# Brain map
# k : float
# Number of nearest neighbors to include for smoothing
#
# Returns
# -------
# x_smooth : 1D ndarray
# Smoothed brain map
#
# Notes
# -----
# Assumes `D` provided at runtime has been sorted.
#
# """
# jkn = self._index[:, :k] # indices of k nearest neighbors
# xkn = x[jkn] # values of k nearest neighbors
# dkn = self._D[:, :k] # distances to k nearest neighbors
# weights = self._kernel(dkn) # distance-weighted kernel
# # Kernel-weighted sum
# return (weights * xkn).sum(axis=1) / weights.sum(axis=1)
#
# def smooth_variogram(self, u, v, return_h=False):
# """
# Smooth a variogram.
#
# Parameters
# ----------
# u : 1D ndarray
# Pairwise distances, ie variogram x-coordinates
# v : 1D ndarray
# Squared differences, ie variogram y-coordinates
# return_h : bool, default False
# Return distances at which smoothed variogram is computed
#
# Returns
# -------
# ndarray, shape (nh,)
# Smoothed variogram samples
# ndarray, shape (nh,)
# Distances at which smoothed variogram was computed (returned if
# `return_h` is True)
#
# Raises
# ------
# ValueError : `u` and `v` are not identically sized
#
# """
# if len(u) != len(v):
# raise ValueError("u and v must have same number of elements")
#
# # Subtract each element of h from each pairwise distance `u`.
# # Each row corresponds to a unique h.
# du = np.abs(u - self._h[:, None])
# w = np.exp(-np.square(2.68 * du / self._b) / 2)
# denom = w.sum(axis=1)
# wv = w * v[None, :]
# num = wv.sum(axis=1)
# output = num / denom
# if not return_h:
# return output
# return output, self._h
#
# def regress(self, x, y):
# """
# Linearly regress `x` onto `y`.
#
# Parameters
# ----------
# x : 1D ndarray
# Independent variable
# y : 1D ndarray
# Dependent variable
#
# Returns
# -------
# alpha : float
# Intercept term (offset parameter)
# beta : float
# Regression coefficient (scale parameter)
# res : float
# Sum of squared residuals
#
# """
# self._lm.fit(X=np.expand_dims(x, -1), y=y)
# beta = self._lm.coef_.item()
# alpha = self._lm.intercept_
# ypred = self._lm.predict(np.expand_dims(x, -1))
# res = np.sum(np.square(y-ypred))
# return alpha, beta, res
#
# def sample(self):
# """
# Randomly sample (without replacement) brain areas for variogram
# computation.
#
# Returns
# -------
# ndarray, shape (ns,)
# Indices of randomly sampled areas
#
# """
# return np.random.choice(
# a=self._nmap, size=self._ns, replace=False).astype(np.int32)
#
# @property
# def x(self):
# """1D ndarray : brain map scalars """
# if self._ismasked:
# return np.ma.copy(self._x)
# return np.copy(self._x)
#
# @x.setter
# def x(self, x):
# self._ismasked = False
# x_ = dataio(x)
# check_map(x=x_)
# mask = np.isnan(x_)
# if mask.any():
# self._ismasked = True
# brain_map = np.ma.masked_array(data=x_, mask=mask)
# else:
# brain_map = x_
# self._x = brain_map
#
# @property
# def D(self):
# """ndarray or memmap, shape (N,N) : Pairwise distance matrix """
# return np.copy(self._D)
#
# @D.setter
# def D(self, x):
# x_ = dataio(x)
# n = self._x.size
# if x_.shape[0] != n:
# raise ValueError(
# "D size along axis=0 must equal brain map size")
# self._D = x_[:, 1:self._knn + 1] # prevent self-coupling
#
# @property
# def index(self):
# """ndarray or memmap : indexes used to sort each row of dist. matrix """
# return np.copy(self._index)
#
# @index.setter
# def index(self, x):
# x_ = dataio(x)
# n = self._x.size
# if x_.shape[0] != n:
# raise ValueError(
# "index size along axis=0 must equal brain map size")
# self._index = x_[:, 1:self._knn+1].astype(np.int32)
#
# @property
# def nmap(self):
# """ int : length of brain map """
# return self._nmap
#
# @nmap.setter
# def nmap(self, x):
# self._nmap = int(x)
#
# @property
# def pv(self):
# """ int : percentile of pairwise distances at which to truncate """
# return self._pv
#
# @pv.setter
# def pv(self, x):
# pv = check_pv(x)
# self._pv = pv
#
# @property
# def deltas(self):
# """ 1D ndarray or List[float] : proportions of nearest neighbors """
# return self._deltas
#
# @deltas.setter
# def deltas(self, x):
# check_deltas(deltas=x)
# self._deltas = x
#
# @property
# def nh(self):
# """ int : number of variogram distance intervals """
# return self._nh
#
# @nh.setter
# def nh(self, x):
# self._nh = x
#
# @property
# def kernel(self):
# """ Callable : smoothing kernel function
#
# Notes
# -----
# When setting kernel, use name of kernel as defined in ``config.py``.
#
# """
# return self._kernel
#
# @kernel.setter
# def kernel(self, x):
# kernel_callable = check_kernel(x)
# self._kernel = kernel_callable
#
# @property
# def resample(self):
# """ bool : whether to resample surrogate map values from target maps """
# return self._resample
#
# @resample.setter
# def resample(self, x):
# if not isinstance(x, bool):
# raise TypeError("expected bool, got {}".format(type(x)))
# self._resample = x
#
# @property
# def knn(self):
# """ int : number of nearest neighbors included in distance matrix """
# return self._knn
#
# @knn.setter
# def knn(self, x):
# if x > self._nmap:
# raise ValueError('knn must be less than len(X)')
# self._knn = int(x)
#
# @property
# def ns(self):
# """ int : number of randomly sampled regions used to construct map """
# return self._ns
#
# @ns.setter
# def ns(self, x):
# self._ns = int(x)
#
# @property
# def b(self):
# """ numeric : Gaussian kernel bandwidth """
# return self._b
#
# @b.setter
# def b(self, x):
# self._b = x
#
# @property
# def h(self):
# """ 1D ndarray : distances at which variogram is evaluated """
# return self._h
#
# @h.setter
# def h(self, x):
# self._h = x
class SurrogateMaps(BaseEstimator):
""" Spatial autocorrelation-preserving surrogate brain maps.
Parameters
----------
deltas : 1D ndarray or List[float], optional
Proportion of neighbors to include for smoothing, in (0, 1]
Default is [0.1,0.2,...,0.9].
kernel : str, optional
Kernel with which to smooth permuted maps:
'gaussian' : Gaussian function.
'exp' : Exponential decay function.
'invdist' : Inverse distance.
'uniform' : Uniform weights (distance independent).
Default is 'exp'.
pv : int, optional
Percentile of the pairwise distance distribution at which to
truncate during variogram fitting. Default is 25.
nh : int, optional
Number of uniformly spaced distances at which to compute variogram.
Default is 25.
resample : bool, optional
Resample surrogate maps' values from target brain map.
Default is False.
b : float or None, optional
Gaussian kernel bandwidth for variogram smoothing. If None, set to
three times the spacing between variogram x-coordinates.
Default is None.
n_rep : int, optional
Number of randomizations (i.e., surrogate maps). Default is 100.
random_state : int or None, optional
Random state. Default is None.
See Also
--------
:class:`.SampledSurrogateMaps`
:class:`.SpinPermutations`
:class:`.MoranRandomization`
Notes
-----
Passing resample=True preserves the distribution of values in the target
map, with the possibility of worsening the simulated surrogate maps'
variograms fits.
"""
[docs] def __init__(self, deltas=None, kernel='exp', pv=25, nh=25, resample=False,
b=None, n_rep=100, random_state=None):
self.deltas = np.linspace(0.1, 0.9, 9) if deltas is None else deltas
check_deltas(deltas=self.deltas)
self.resample = resample
self.nh = nh
self.pv = check_pv(pv)
self.kernel = check_kernel(kernel) # Smoothing kernel selection
self.b = b
self.n_rep = n_rep
self.random_state = random_state
self._rs = np.random.RandomState(self.random_state)
# Linear regression model
self._lm = LinearRegression(fit_intercept=True)
def _check_distance(self, dist):
d = dataio(dist)
if not np.allclose(d, d.T):
raise ValueError("Distance matrix must be symmetric")
return d
def _check_map(self, x):
x_ = dataio(x)
check_map(x=x_)
brain_map = np.ma.masked_array(data=x_, mask=np.isnan(x_))
if self._dist.shape[0] != brain_map.size:
e = "Brain map must have dimensions consistent with distance matrix"
e += "\nDistance matrix shape: {}".format(self._dist.shape)
e += "\nBrain map size: {}".format(brain_map.size)
raise ValueError(e)
return brain_map
[docs] def fit(self, dist):
""" Prepare data for sorrugate map generation..
Parameters
----------
dist : filename or ndarray, shape (N,N)
Pairwise (geodesic) distance matrix.
Returns
-------
self : object
Returns self.
"""
self._dist = self._check_distance(dist)
self.nmap = n = dist.shape[0]
self._ikn = np.arange(n)[:, None]
self._triu = np.triu_indices(n, k=1) # upper triangular inds
self._u = self._dist[self._triu] # variogram X-coordinate
# self._v = self.compute_variogram(self._x) # variogram Y-coord
# Get indices of pairs with u < pv'th percentile
self._uidx = np.where(self._u < np.percentile(self._u, self.pv))[0]
self._uisort = np.argsort(self._u[self._uidx])
# Find sorted indices of first `kmax` elements of each row of dist.
self._disort = np.argsort(self._dist, axis=-1)
self._jkn = dict.fromkeys(self.deltas)
self._dkn = dict.fromkeys(self.deltas)
for delta in self.deltas:
k = int(delta*n)
# find index of k nearest neighbors for each area
self._jkn[delta] = self._disort[:, 1:k+1] # prevent self-coupling
# find distance to k nearest neighbors for each area
self._dkn[delta] = self._dist[(self._ikn, self._jkn[delta])]
# Smoothed variogram and variogram _b
utrunc = self._u[self._uidx]
self._h = np.linspace(utrunc.min(), utrunc.max(), self.nh)
if self.b is None:
self.b = 3.*np.mean(self._h[1:] - self._h[:-1])
return self
[docs] def randomize(self, x, n_rep=None):
""" Generate surrogate maps from `x`.
Parameters
----------
x : filename or 1D ndarray
Target brain map
n_rep : int or None, optional
Number of surrogates maps to randomly generate. If None, use
the default `n_rep`.
Returns
-------
output : ndarray, shape = (n_rep, n_verts)
Randomly generated map(s) with matched spatial autocorrelation.
"""
x = self._check_map(x)
nmap = x.size
n_rep = self.n_rep if n_rep is None else n_rep
v = self.compute_variogram(x) # variogram Y-coord
smvar = self.smooth_variogram(v)
surrs = np.empty((n_rep, nmap))
for i in range(n_rep): # generate random maps
xperm = self.permute_map(x) # Randomly permute values
res = dict.fromkeys(self.deltas)
for delta in self.deltas: # foreach neighborhood size
# Smooth the permuted map using delta proportion of
# neighbors to reintroduce spatial autocorrelation
sm_xperm = self.smooth_map(x=xperm, delta=delta)
# Calculate empirical variogram of the smoothed permuted map
vperm = self.compute_variogram(sm_xperm)
# Calculate smoothed variogram of the smoothed permuted map
smvar_perm = self.smooth_variogram(vperm)
# Fit linear regression btwn smoothed variograms
res[delta] = self.regress(smvar_perm, smvar)
alphas, betas, residuals = np.array(
[res[d] for d in self.deltas]).T
# Select best-fit model and regression parameters
iopt = np.argmin(residuals)
dopt = self.deltas[iopt]
aopt = alphas[iopt]
bopt = betas[iopt]
# Transform and smooth permuted map using best-fit parameters
sm_xperm_best = self.smooth_map(x=xperm, delta=dopt)
surr = (np.sqrt(np.abs(bopt)) * sm_xperm_best +
np.sqrt(np.abs(aopt)) * self._rs.randn(nmap))
surrs[i] = surr
if self.resample: # resample values from empirical map
sorted_map = np.sort(x)
for i, surr in enumerate(surrs):
ii = np.argsort(surr)
np.put(surr, ii, sorted_map)
return surrs.squeeze()
[docs] def compute_variogram(self, x):
"""
Compute variogram values (i.e., one-half squared pairwise differences).
Parameters
----------
x : 1D ndarray
Brain map scalar array
Returns
-------
v : ndarray, shape (N(N-1)/2,)
Variogram y-coordinates, i.e. 0.5 * (x_i - x_j) ^ 2
"""
diff_ij = np.subtract.outer(x, x)
v = 0.5 * np.square(diff_ij)[self._triu]
return v
[docs] def permute_map(self, x):
"""
Return randomly permuted brain map.
Parameters
----------
x : 1D masked ndarray
Brain map scalars
Returns
-------
1D ndarray
Random permutation of target brain map
"""
pidx = self._rs.permutation(x.size)
return np.ma.masked_array(data=x.data[pidx], mask=x.mask[pidx])
[docs] def smooth_map(self, x, delta):
"""
Smooth `x` using `delta` proportion of nearest neighbors.
Parameters
----------
x : 1D ndarray
Brain map scalars
delta : float
Proportion of neighbors to include for smoothing, in (0, 1)
Returns
-------
1D ndarray
Smoothed brain map
"""
# Values of k nearest neighbors for each brain area
xkn = x[self._jkn[delta]]
weights = self.kernel(self._dkn[delta]) # Distance-weight kernel
return (weights * xkn).sum(axis=1) / weights.sum(axis=1)
[docs] def smooth_variogram(self, v, return_h=False):
"""
Smooth a variogram.
Parameters
----------
v : 1D ndarray
Variogram values, i.e. 0.5 * (x_i - x_j) ^ 2
return_h : bool, default False
Return distances at which the smoothed variogram was computed
Returns
-------
1D ndarray, shape (nh,)
Smoothed variogram values
1D ndarray, shape (nh,)
Distances at which smoothed variogram was computed (returned only
if `return_h` is True)
Raises
------
ValueError : `v` has unexpected size.
"""
u = self._u[self._uidx]
v = v[self._uidx]
if len(u) != len(v):
raise ValueError(
"argument v: expected size {}, got {}".format(len(u), len(v)))
# Subtract each h from each pairwise distance u
# Each row corresponds to a unique h
du = np.abs(u - self._h[:, None])
w = np.exp(-np.square(2.68 * du / self.b) / 2)
denom = w.sum(axis=1)
wv = w * v[None, :]
num = wv.sum(axis=1)
output = num / denom
if not return_h:
return output
return output, self._h
[docs] def regress(self, x, y):
"""
Linearly regress `x` onto `y`.
Parameters
----------
x : 1D ndarray
Independent variable
y : 1D ndarray
Dependent variable
Returns
-------
alpha : float
Intercept term (offset parameter)
beta : float
Regression coefficient (scale parameter)
res : float
Sum of squared residuals
"""
self._lm.fit(X=np.expand_dims(x, -1), y=y)
beta = self._lm.coef_.item()
alpha = self._lm.intercept_
y_pred = self._lm.predict(X=np.expand_dims(x, -1))
res = np.sum(np.square(y-y_pred))
return alpha, beta, res
@property
def h(self):
""" 1D ndarray : distances at which smoothed variogram is computed """
return self._h
class SampledSurrogateMaps(BaseEstimator):
    """
    Spatial autocorrelation-preserving surrogate brain maps with sampling.

    Parameters
    ----------
    ns : int, optional
        Take a subsample of `ns` rows from `D` when fitting variograms.
        Default is 500.
    deltas : 1D ndarray or List[float], optional
        Proportion of neighbors to include for smoothing, in (0, 1].
        Default is [0.3, 0.5, 0.7, 0.9].
    kernel : str, optional
        Kernel with which to smooth permuted maps:
            'gaussian' : Gaussian function.
            'exp' : Exponential decay function.
            'invdist' : Inverse distance.
            'uniform' : Uniform weights (distance independent).
        Default is 'exp'.
    pv : int, optional
        Percentile of the pairwise distance distribution at which to
        truncate during variogram fitting. Default is 70.
    nh : int, optional
        Number of uniformly spaced distances at which to compute variogram.
        Default is 25.
    knn : int, optional
        Number of nearest regions to keep in the neighborhood of each region.
        Default is 1000.
    b : float or None, default None
        Gaussian kernel bandwidth for variogram smoothing. If None,
        three times the distance interval spacing is used.
    resample : bool, optional
        Resample surrogate maps' values from target brain map.
        Default is False.
    n_rep : int, optional
        Number of randomizations (i.e., surrogate maps). Default is 100.
    random_state : int or None, optional
        Random state. Default is None.
    verbose : bool, default False
        Print surrogate count each time new surrogate map created

    See Also
    --------
    :class:`.SurrogateMaps`
    :class:`.SpinPermutations`
    :class:`.MoranRandomization`

    Notes
    -----
    Passing resample=True will preserve the distribution of values in the
    target map, at the expense of worsening simulated surrogate maps'
    variograms fits. This worsening will increase as the empirical map
    more strongly deviates from normality.
    """

    def __init__(self, ns=500, pv=70, nh=25, knn=1000, b=None, deltas=None,
                 kernel='exp', resample=False, n_rep=100, random_state=None,
                 verbose=False):
        # Default deltas: [0.3, 0.5, 0.7, 0.9]
        self.deltas = np.arange(0.3, 1., 0.2) if deltas is None else deltas
        check_deltas(deltas=self.deltas)
        self.ns = int(ns)
        self.resample = resample
        self.nh = nh
        self.pv = check_pv(pv)
        self.kernel = check_kernel(kernel)  # Smoothing kernel selection
        self.knn = knn
        self.b = b
        self.verbose = verbose
        self.n_rep = n_rep
        self.random_state = random_state
        self._rs = np.random.RandomState(self.random_state)
        # Linear regression model used to fit surrogate variograms to the
        # empirical variogram in :meth:`regress`.
        self._lm = LinearRegression(fit_intercept=True)

    def _check_distance(self, dist):
        """ Load `dist` and keep each region's `knn` nearest neighbors,
        skipping column 0 (self-distance) to prevent self-coupling. """
        dist = dataio(dist)
        return dist[:, 1:self.knn + 1]

    def _check_index(self, index):
        """ Load `index` and keep the columns matching `_check_distance`. """
        index = dataio(index)
        return index[:, 1:self.knn + 1].astype(np.int32)

    def _check_map(self, x):
        """ Load `x`, validate it, and mask it if it contains NaNs. """
        self._ismasked = False
        x = dataio(x)
        check_map(x=x)
        mask = np.isnan(x)
        if mask.any():
            self._ismasked = True
            return np.ma.masked_array(data=x, mask=mask)
        return x

    def fit(self, dist, index):
        """ Prepare data for surrogate map generation.

        Parameters
        ----------
        dist : ndarray or memmap, shape (N,N)
            Pairwise distance matrix. Each row of `dist` should be sorted.
            Indices used to sort each row are passed through the `index`
            argument. See :func:`brainspace.variogram.txt2memmap`.
        index : filename or ndarray or memmap, shape(N,N)
            Indices used to sort each row of `dist`.

        Returns
        -------
        self : object
            Returns self.
        """
        self._dist = self._check_distance(dist)
        self._index = self._check_index(index)
        if self._dist.shape != self._index.shape:
            raise ValueError('Dimensions of distance and index matrices are '
                             'inconsistent: distance shape = {}, index '
                             'shape = {}.'.format(self._dist.shape,
                                                  self._index.shape))
        n = self._dist.shape[0]
        self._ikn = np.arange(n)[:, None]
        # Truncate variograms at the pv'th distance percentile
        self._dmax = np.percentile(self._dist, self.pv)
        self._h = np.linspace(self._dist.min(), self._dmax, self.nh)
        # Only derive the default bandwidth when none was supplied; `is None`
        # (rather than truthiness) keeps an explicit user value intact and
        # matches :class:`.SurrogateMaps`.
        if self.b is None:
            self.b = 3 * (self._h[1] - self._h[0])
        return self

    def randomize(self, x, n_rep=None):
        """ Generate surrogate maps from `x`.

        Parameters
        ----------
        x : filename or 1D ndarray
            Target brain map
        n_rep : int or None, optional
            Number of surrogate maps to randomly generate. If None, use
            the default `n_rep`.

        Returns
        -------
        output : ndarray, shape = (n_rep, n_verts)
            Randomly generated map(s) with matched spatial autocorrelation.
        """
        x = self._check_map(x)
        n = x.size
        if n != self._dist.shape[0]:
            raise ValueError("Size of distance matrix along axis=0 must equal "
                             "brain map size.")
        n_rep = self.n_rep if n_rep is None else n_rep
        if self.verbose:
            print("Generating {} maps...".format(n_rep))
        surrs = np.empty((n_rep, n))
        for i in range(n_rep):  # generate random maps
            if self.verbose:
                print(i + 1)
            # Randomly permute map
            x_perm = self.permute_map(x)
            # Randomly select subset of regions to use for variograms
            idx = self.sample(n)
            # Compute empirical variogram
            v = self.compute_variogram(x, idx)
            # Variogram ordinates; use nearest neighbors because local effect
            u = self._dist[idx, :]
            uidx = np.where(u < self._dmax)
            # Smooth empirical variogram
            smvar, u0 = self.smooth_variogram(u[uidx], v[uidx], return_h=True)
            res = dict.fromkeys(self.deltas)
            for d in self.deltas:  # foreach neighborhood size
                k = int(d * self.knn)
                # Smooth the permuted map using k nearest neighbors to
                # reintroduce spatial autocorrelation
                sm_xperm = self.smooth_map(x=x_perm, k=k)
                # Calculate variogram values for the smoothed permuted map
                vperm = self.compute_variogram(sm_xperm, idx)
                # Calculate smoothed variogram of the smoothed permuted map
                smvar_perm = self.smooth_variogram(u[uidx], vperm[uidx])
                # Fit linear regression btwn smoothed variograms
                res[d] = self.regress(smvar_perm, smvar)
            alphas, betas, residuals = np.array(
                [res[d] for d in self.deltas]).T
            # Select best-fit model and regression parameters
            iopt = np.argmin(residuals)
            dopt = self.deltas[iopt]
            kopt = int(dopt * self.knn)
            aopt = alphas[iopt]
            bopt = betas[iopt]
            # Transform and smooth permuted map using best-fit parameters
            sm_xperm_best = self.smooth_map(x=x_perm, k=kopt)
            surr = (np.sqrt(np.abs(bopt)) * sm_xperm_best +
                    np.sqrt(np.abs(aopt)) * self._rs.randn(n))
            surrs[i] = surr
        if self.resample:  # resample values from empirical map
            sorted_map = np.sort(x)
            # NOTE: no enumerate here -- reusing `i` would shadow the outer
            # generation index above.
            for surr in surrs:
                ii = np.argsort(surr)
                np.put(surr, ii, sorted_map)
        if self._ismasked:
            return np.ma.masked_array(
                data=surrs, mask=np.isnan(surrs)).squeeze()
        return surrs.squeeze()

    def compute_variogram(self, x, idx):
        """
        Compute variogram of `x` using pairs of regions indexed by `idx`.

        Parameters
        ----------
        x : 1D ndarray
            Brain map
        idx : ndarray[int], shape (ns,)
            Indices of randomly sampled brain regions

        Returns
        -------
        v : ndarray, shape (ns,ns)
            Variogram y-coordinates, i.e. 0.5 * (x_i - x_j) ^ 2,
            for i,j in idx
        """
        # Differences between each sampled region and its knn neighbors
        diff_ij = x[idx][:, None] - x[self._index[idx, :]]
        return 0.5 * np.square(diff_ij)

    def permute_map(self, x):
        """
        Return a random permutation of `x`.

        Parameters
        ----------
        x : 1D ndarray
            Brain map

        Returns
        -------
        1D ndarray
            Random permutation of target brain map
        """
        pidx = self._rs.permutation(x.size)
        if self._ismasked:
            # Permute mask alongside data so NaN positions follow their values
            return np.ma.masked_array(data=x.data[pidx], mask=x.mask[pidx])
        return x[pidx]

    def smooth_map(self, x, k):
        """
        Smooth `x` using `k` nearest neighboring regions.

        Parameters
        ----------
        x : 1D ndarray
            Brain map
        k : float
            Number of nearest neighbors to include for smoothing

        Returns
        -------
        x_smooth : 1D ndarray
            Smoothed brain map

        Notes
        -----
        Assumes `dist` provided at runtime has been sorted.
        """
        jkn = self._index[:, :k]  # indices of k nearest neighbors
        xkn = x[jkn]  # values of k nearest neighbors
        dkn = self._dist[:, :k]  # distances to k nearest neighbors
        weights = self.kernel(dkn)  # distance-weighted kernel
        # Weighted average of each region's neighborhood
        return (weights * xkn).sum(axis=1) / weights.sum(axis=1)

    def smooth_variogram(self, u, v, return_h=False):
        """
        Smooth a variogram.

        Parameters
        ----------
        u : 1D ndarray
            Pairwise distances, ie variogram x-coordinates
        v : 1D ndarray
            Squared differences, ie variogram y-coordinates
        return_h : bool, default False
            Return distances at which smoothed variogram is computed

        Returns
        -------
        ndarray, shape (nh,)
            Smoothed variogram samples
        ndarray, shape (nh,)
            Distances at which smoothed variogram was computed (returned if
            `return_h` is True)

        Raises
        ------
        ValueError : `u` and `v` are not identically sized
        """
        if len(u) != len(v):
            raise ValueError("u and v must have same number of elements")
        # Subtract each element of h from each pairwise distance `u`.
        # Each row corresponds to a unique h.
        du = np.abs(u - self._h[:, None])
        # Gaussian weights; 2.68 scales bandwidth `b` so that it acts as a
        # full width at half maximum -- TODO confirm against original method
        w = np.exp(-np.square(2.68 * du / self.b) / 2)
        denom = w.sum(axis=1)
        wv = w * v[None, :]
        num = wv.sum(axis=1)
        output = num / denom
        if not return_h:
            return output
        return output, self._h

    def regress(self, x, y):
        """
        Linearly regress `x` onto `y`.

        Parameters
        ----------
        x : 1D ndarray
            Independent variable
        y : 1D ndarray
            Dependent variable

        Returns
        -------
        alpha : float
            Intercept term (offset parameter)
        beta : float
            Regression coefficient (scale parameter)
        res : float
            Sum of squared residuals
        """
        self._lm.fit(X=np.expand_dims(x, -1), y=y)
        beta = self._lm.coef_.item()
        alpha = self._lm.intercept_
        ypred = self._lm.predict(np.expand_dims(x, -1))
        res = np.sum(np.square(y - ypred))
        return alpha, beta, res

    def sample(self, n):
        """
        Randomly sample (without replacement) brain areas for variogram
        computation.

        Parameters
        ----------
        n : int
            Number of brain areas to sample from

        Returns
        -------
        ndarray, shape (ns,)
            Indices of randomly sampled areas
        """
        return self._rs.choice(a=n, size=self.ns,
                               replace=False).astype(np.int32)

    @property
    def h(self):
        """ 1D ndarray : distances at which variogram is evaluated """
        return self._h