Source code for brainspace.utils.parcellation

"""
Utility functions for parcellations/labelings.
"""

# Author: Oualid Benkarim <oualid.benkarim@mcgill.ca>
# License: BSD 3 clause


import numpy as np
from scipy.stats import mode
from scipy.optimize import linear_sum_assignment

from sklearn.utils.extmath import weighted_mode


[docs]def relabel_consecutive(lab, start_from=0): """Relabel array with consecutive values. Parameters ---------- lab : ndarray Array to relabel. start_from : int, optional Initial label. The default is 0. Returns ------- new_lab : ndarray Array with consecutive labels. """ new_lab = np.empty_like(lab) new_lab[:] = np.unique(lab, return_inverse=True)[1] new_lab += start_from return new_lab
[docs]def relabel(lab, new_labels=None): """Relabel array. Parameters ---------- lab : array_like Array to relabel. new_labels: array_like or dict, optional New labels. If dict, provide new label for each label in input array. If array_like, mapping is performed in ascending order. If None, relabel consecutively, starting from 0. Default is None. Returns ------- new_lab : ndarray Array with new labels. """ if isinstance(new_labels, dict): new_lab = lab.copy() for l1, l2 in new_labels.items(): new_lab[lab == l1] = l2 return new_lab if new_labels is None: return relabel_consecutive(lab) keys = np.unique(lab)[:new_labels.size] return relabel(lab, dict(zip(keys, new_labels)))
def find_label_correspondence(lab1, lab2): """Find label correspondences. Parameters ---------- lab1 : ndarray, shape = (n_labels,) First array of labels. lab2 : ndarray, shape = (n_labels,) Second array of labels. Returns ------- dict Dictionary with label correspondences between first and second arrays. Notes ----- Correspondences are based on largest overlap using the Hungarian algorithm. """ u1, idx1 = np.unique(lab1, return_inverse=True) u2, idx2 = np.unique(lab2, return_inverse=True) upairs, n_overlap = np.unique(list(zip(idx1, idx2)), axis=0, return_counts=True) cost = np.full((u1.size, u2.size), max(lab1.size, lab2.size), dtype=np.float32) cost[tuple([*upairs.T])] -= n_overlap ridx, cidx = linear_sum_assignment(cost) return dict(zip(u1[ridx], u2[cidx]))
[docs]def relabel_by_overlap(lab, ref_lab): """Relabel according to overlap with reference. Parameters ---------- lab : ndarray, shape = (n_labels,) Array of labels. ref_lab : ndarray, shape = (n_labels,) Reference array of labels. Returns ------- new_lab : ndarray, shape = (n_labels,) Array relabeled using the reference array. Notes ----- Correspondences between labels are based on largest overlap using the Hungarian algorithm. """ u1 = np.unique(lab) u2 = np.unique(ref_lab) if u1.size > u2.size: thresh = lab.max() + 1 lab_shifted = lab + thresh lab_corr = find_label_correspondence(lab_shifted, ref_lab) lab_shifted = relabel(lab_shifted, new_labels=lab_corr) ulab = np.unique(lab_shifted) ulab = ulab[ulab >= thresh] map_seq = dict(zip(ulab, np.arange(ulab.size) + ref_lab.max() + 1)) return relabel(lab_shifted, new_labels=map_seq) lab_corr = find_label_correspondence(lab, ref_lab) return relabel(lab, new_labels=lab_corr)
[docs]def map_to_mask(values, mask, fill=0, axis=0): """Assign data to mask. Parameters ---------- values : ndarray, shape = (n_values,) or (n_samples, n_values) Source array of values. mask : ndarray, shape = (n_mask,) Mask of boolean values. Data is mapped to mask. If `values` is 2D, the mask is applied according to `axis`. fill : float, optional Value used to fill elements outside the mask. Default is 0. axis : {0, 1}, optional If ``axis == 0`` map rows. Otherwise, map columns. Default is 0. Returns ------- output : ndarray Values mapped to mask. If `values` is 1D, shape (n_mask,). When `values` is 2D, shape (n_samples, n_mask) if ``axis == 0`` and (n_mask, n_samples) otherwise. """ if np.issubdtype(values.dtype, np.integer) and not np.isfinite(fill): raise ValueError("Cannot use non-finite 'fill' with integer arrays.") if axis == 1 and values.ndim > 1: values = values.T values2d = np.atleast_2d(values) if values2d.shape[1] > np.count_nonzero(mask): raise ValueError("Mask cannot allocate values.") mapped = np.full((values2d.shape[0], mask.size), fill, dtype=values.dtype) mapped[:, mask] = values2d if values.ndim == 1: return mapped[0] if axis == 1: return mapped.T return mapped
[docs]def map_to_labels(source_val, target_lab, mask=None, fill=0, axis=0, source_lab=None): """Map data in source to target according to their labels. Target labels are sorted in ascending order, such that the smallest label indexes the value at position 0 in `source_val`. If `source_lab` is specified, any label in `target_lab` must be in `source_lab`. Parameters ---------- source_val : ndarray, shape = (n_values,) or (n_samples, n_values) Source array of values. target_lab : ndarray, shape = (n_labels,) Target labels. mask : int or ndarray, shape = (n_labels,), optional If mask is not None, only consider target labels in mask. If int, it indicates the label denoting label to discard. Default is None. fill : float, optional Value used to fill elements outside the mask. Only used if mask is not None. Default is 0. axis : {0, 1}, optional If ``axis == 0``, map rows. Otherwise, map columns. Default is 0. source_lab : ndarray, shape = (n_values,), optional Source labels for source values. If None, use unique labels in `target_lab` in ascending order. Default is None. Returns ------- target_val : ndarray, shape = (n_labels,) or (n_samples, n_labels) Target array with corresponding source values. """ if mask is not None: if not isinstance(mask, np.ndarray): mask = target_lab != mask target_lab = target_lab[mask] mapped = map_to_labels(source_val, target_lab, axis=axis, source_lab=source_lab) return map_to_mask(mapped, mask, fill=fill, axis=axis) if axis == 1 and source_val.ndim > 1: source_val = source_val.T values2d = np.atleast_2d(source_val) ulab, idx = np.unique(target_lab, return_inverse=True) if ulab.size > values2d.shape[1]: raise ValueError('There are more target labels than source values.') if source_lab is not None: if source_lab.size != values2d.shape[1]: raise ValueError('Source values and labels must have same size.') if not np.isin(ulab, source_lab).all(): raise ValueError('Cannot find target labels in source labels.') uq_sl, idx_sl = np.unique(source_lab, return_inverse=True) if source_lab.size != uq_sl.size: raise ValueError('Source labels must have distinct labels.') values2d = values2d[:, idx_sl] mapped = values2d[:, idx] if source_val.ndim == 1: return mapped[0] if axis == 1: return mapped.T return mapped
def _get_redop(red_op, weights=None, axis=None): if red_op in ['mean', 'average']: if weights is None: def fred(x, w): return np.mean(x, axis=axis) else: def fred(x, w): return np.average(x, weights=w, axis=axis) elif red_op == 'median': def fred(x, w): return np.median(x, axis=axis) elif red_op == 'mode': if weights is None: def fred(x, w): return mode(x, axis=axis)[0].ravel() else: def fred(x, w): return weighted_mode(x, w, axis=axis) elif red_op == 'sum': def fred(x, w): return np.sum(x if w is None else w * x, axis=axis) elif red_op == 'max': def fred(x, w): return np.max(x, axis=axis) elif red_op == 'min': def fred(x, w): return np.min(x, axis=axis) else: raise ValueError("Unknown reduction operation '{0}'".format(red_op)) return fred
[docs]def reduce_by_labels(values, labels, weights=None, target_labels=None, red_op='mean', axis=0, dtype=None): """Summarize data in `values` according to `labels`. Parameters ---------- values : 1D or 2D ndarray Array of values. labels : 1D ndarray, shape = (n_lab,) Labels used summarize values. weights : 1D ndarray, shape = (n_lab,), optional Weights associated with labels. Only used when `red_op` is 'average', 'mean', 'sum' or 'mode'. Weights are not normalized. Default is None. target_labels : 1D ndarray, optional Target labels. Arrange new array following the ordering of labels in the `target_labels`. When None, new array is arranged in ascending order of `labels`. Default is None. red_op : str or callable, optional How to summarize data. If str, options are: {'min', 'max', 'sum', 'mean', 'median', 'mode', 'average'}. If callable, it should receive a 1D array of values, array of weights (or None) and return a scalar value. Default is 'mean'. dtype : dtype, optional Data type of output array. When None, if `red_op` in {'min', 'max', 'sum', 'mode'}, output is same type as `values`, otherwise output is float. Default is None. axis : {0, 1}, optional If ``axis == 0``, apply to each row (reduce number of columns per row). Otherwise, apply to each column (reduce number of rows per column). Default is 0. Returns ------- target_values : ndarray Summarized target values. """ if axis == 1 and values.ndim == 1: axis = 0 if target_labels is None: uq_tl = np.unique(labels) idx_back = None else: uq_tl, idx_back = np.unique(target_labels, return_inverse=True) if weights is not None: weights = np.atleast_2d(weights) v2d = np.atleast_2d(values) if axis == 1: v2d = v2d.T if isinstance(red_op, str): fred = _get_redop(red_op, weights=weights, axis=1) else: fred = red_op if dtype is None: dtype = np.float if red_op in {'min', 'max', 'sum', 'mode'}: dtype = values.dtype mapped = np.empty((v2d.shape[0], uq_tl.size), dtype=dtype) for i, lab in enumerate(uq_tl): mask = labels == lab wm = None if weights is None else weights[:, mask] if isinstance(red_op, str): mapped[:, i] = fred(v2d[:, mask], wm) else: for idx in range(v2d.shape[0]): mapped[idx, i] = fred(v2d[idx, mask], wm) if idx_back is not None: mapped = mapped[:, idx_back] if axis == 1: mapped = mapped.T if values.ndim == 1: return mapped[0] return mapped