import argparse
import os
import shutil
from contextlib import contextmanager
from typing import Sequence
import dask.array
import nibabel as nib
import numpy as np
from fsl.utils.path import hasExt
from nibabel.cifti2 import Cifti2Header, Cifti2Image
from nibabel.cifti2.cifti2_axes import Axis, BrainModelAxis, ScalarAxis
from nibabel.gifti import GiftiImage
from numpy.lib.stride_tricks import as_strided
from mcot.surface.all import BrainStructure, get_brain_structure, write_gifti
from . import cifti
[docs]class GreyOrdinates(object):
"""Represents data on voxels or vertices."""
[docs] def __init__(self, data, brain_model_axis: BrainModelAxis, other_axes: Sequence[Axis]=None, parent_file=None):
"""Defines a new dataset in greyordinate space.
:param data: (..., N) array for N greyordinates
:param brain_model_axis: CIFTI axis describing the greyordinate space
:param other_axes: sequence of CIFTI axes describing the other dimensions
:param parent_file: file in which the dataset has been stored
"""
self.data = data
if data.shape[-1] != len(brain_model_axis):
raise ValueError("Last axis of data does not match number of greyordinates")
self.brain_model_axis = brain_model_axis
if other_axes is not None:
if len(other_axes) != self.data.ndim - 1:
raise ValueError("Number of axis does not match dimensionality of the data")
if tuple(len(ax) for ax in other_axes) != self.data.shape[:-1]:
raise ValueError("Size of other axes does not match data size")
self.other_axes = None if other_axes is None else tuple(other_axes)
self.parent_file = parent_file
[docs] def volume(self, ):
"""Get the volumetric data as a Nifti1Image."""
if self.brain_model_axis.volume_mask.sum() == 0:
raise ValueError(f"Can not create volume without voxels in {self}")
data = np.full(self.brain_model_axis.volume_shape + self.data.shape[:-1], np.nan,
dtype=self.data.dtype)
voxels = self.brain_model_axis.voxel[self.brain_model_axis.volume_mask]
data[tuple(voxels.T)] = np.transpose(self.data, (-1, ) + tuple(range(self.data.ndim - 1)))[self.brain_model_axis.volume_mask]
return nib.Nifti1Image(data, affine=self.brain_model_axis.affine)
[docs] def surface(self, anatomy, fill=np.nan, partial=False):
"""Gets a specific surface.
:param anatomy: BrainStructure or string like 'CortexLeft' or 'CortexRight'
:param fill: which value to fill the array with if not all vertices are defined
:param partial: only return the part of the surface defined in the greyordinate file (ignores `fill` if set)
:return:
- if not partial: (..., n_vertices) array
- if partial: (N, ) int array with indices on the surface included in (..., N) array
"""
if isinstance(anatomy, str):
anatomy = BrainStructure.from_string(anatomy, issurface=True)
if anatomy.cifti not in self.brain_model_axis.name:
raise ValueError(f"No surface data for {anatomy.cifti} found")
slc, bm = None, None
arr = np.full(self.data.shape[:-1] + (self.brain_model_axis.nvertices[anatomy.cifti],), fill,
dtype=self.data.dtype)
for name, slc_try, bm_try in self.brain_model_axis.iter_structures():
if name == anatomy.cifti:
if partial:
if bm is not None:
raise ValueError(f"Surface {anatomy} does not form a contiguous block")
slc, bm = slc_try, bm_try
else:
arr[..., bm_try.vertex] = self.data[..., slc_try]
if not partial:
return arr
else:
return bm.vertex, self.data[..., slc]
[docs] def to_hdf5(self, group, compression='gzip'):
"""Stores the image in the HDF5 group."""
other_axes = (None, ) * (self.data.ndim - 1) if self.other_axes is None else self.other_axes
cifti.to_hdf5(group, self.data, other_axes + (self.brain_model_axis, ), compression=compression)
[docs] @classmethod
def from_hdf5(cls, group):
"""Retrieves data from HDF5 group."""
data, axes = cifti.from_hdf5(group)
return cls(data, axes[-1], axes[:-1])
[docs] @classmethod
@contextmanager
def empty_hdf5(cls, filename, axes, dtype=float):
"""Creates an empty greyordinate object based on the axes.
Data will be stored on disk in an HDF5 file
:param filename: filename to store the HDF5 file in or HDF5 group
:param axes: cifti2 axes
:param dtype: data type of array
:return: Greyordinate object where the data is stored in the new HDF5 file
"""
import h5py
if isinstance(filename, str):
group = h5py.File(filename, 'w')
to_close = True
else:
group = filename
to_close = False
data = cifti.empty_hdf5(group, axes, dtype)
try:
yield cls(data, axes[-1], axes[:-1], parent_file=group)
except:
if to_close:
os.remove(filename)
raise
if to_close:
group.close()
[docs] @classmethod
@contextmanager
def empty_zarr(cls, filename, axes, dtype=float):
"""Creates an empty greyordinate object based on the axes.
Data will be stored on disk in an HDF5 file
:param filename: filename to store the HDF5 file in or HDF5 group
:param axes: cifti2 axes
:param dtype: data type of array
:return: Greyordinate object where the data is stored in the new HDF5 file
"""
import zarr
if isinstance(filename, str):
group = zarr.group(filename, 'w')
else:
group = filename
data = cifti.empty_zarr(group, axes, dtype)
try:
yield cls(data, axes[-1], axes[:-1], parent_file=group)
except:
to_delete = filename if group is not filename else group.path
if os.path.isdir(to_delete):
shutil.rmtree(to_delete)
raise
[docs] def to_cifti(self, other_axes=None):
"""Create a CIFTI image from the data.
:param other_axes: defines the axes besides the greyordinate one
:return: nibabel CIFTI image
"""
if other_axes is None:
other_axes = self.other_axes
if other_axes is None:
if self.data.ndim != 1:
raise ValueError("Can not store to CIFTI without defining what is stored along the other dimensions")
other_axes = []
if other_axes is not None:
if len(other_axes) != self.data.ndim - 1:
raise ValueError("Number of axis does not match dimensionality of the data")
if tuple(len(ax) for ax in other_axes) != self.data.shape[:-1]:
raise ValueError("Size of other axes does not match data size")
data = self.data
if data.ndim == 1:
data = data[None, :]
other_axes = [ScalarAxis(['default'])]
return Cifti2Image(
data,
header=Cifti2Header.from_axes(list(other_axes) + [self.brain_model_axis])
)
[docs] @classmethod
def from_cifti(cls, filename, writable=False):
"""Creates new greyordinate object from dense CIFTI file.
:param filename: CIFTI filename
:param writable: if True, opens data array in writable mode
"""
if isinstance(filename, str):
img: Cifti2Image = nib.load(filename)
else:
img = filename
if writable:
data = np.memmap(filename, img.dataobj.dtype, mode='r+',
offset=img.dataobj.offset, shape=img.shape, order='F')
else:
data = np.asanyarray(img.dataobj)
axes = [img.header.get_axis(idx) for idx in range(data.ndim)]
if not isinstance(axes[-1], BrainModelAxis):
raise ValueError("Last axis of dense CIFTI file should be a BrainModelAxis")
return GreyOrdinates(data, axes[-1], axes[:-1])
[docs] @classmethod
@contextmanager
def empty_cifti(cls, filename, axes, dtype=float):
"""Creates an empty greyordinate object based on the axes.
Data will be stored on disk in CIFTI format
:param filename: filename to store the CIFTI file in
:param axes: cifti2 axes
:param dtype: data type of array
:return: Greyordinate object where the data is stored in the new HDF5 file
"""
hdr = cifti.cifti2.Cifti2Header.from_axes(axes)
data = np.zeros(1, dtype=dtype)
shape = tuple(len(ax) for ax in axes)
data_shaped = as_strided(data, shape=shape, strides=tuple(0 for _ in axes),
writeable=False)
cifti.cifti2.Cifti2Image(data_shaped, header=hdr).to_filename(filename)
go = cls.from_cifti(filename, writable=True)
try:
yield go
except:
os.remove(filename)
raise
go.data.flush()
del go
[docs] @classmethod
def from_gifti(cls, filename, mask_values=(0, np.nan)):
"""Creates a new greyordinate object from a GIFTI file.
:param filename: GIFTI filename
:param mask_values: values to mask out
:return: greyordinate object representing the unmasked vertices
"""
if isinstance(filename, str):
img = nib.load(filename)
else:
img = filename
datasets = [darr.data for darr in img.darrays]
if len(datasets) == 1:
data = datasets[0]
else:
data = np.concatenate(
[np.atleast_2d(d) for d in datasets], axis=0
)
mask = np.ones(data.shape, dtype='bool')
for value in mask_values:
if value is np.nan:
mask &= ~np.isnan(data)
else:
mask &= ~(data == value)
while mask.ndim > 1:
mask = mask.any(0)
anatomy = get_brain_structure(img)
bm_axes = BrainModelAxis.from_mask(mask, name=anatomy.cifti)
return GreyOrdinates(data[..., mask], bm_axes)
[docs] @classmethod
def from_nifti(cls, filename, mask_values=(np.nan, 0)):
"""Creates a new greyordinate object from a NIFTI file.
:param filename: NIFTI filename
:param mask_values: which values to mask out
:return: greyordinate object representing the unmasked voxels
"""
if isinstance(filename, str):
img = nib.load(filename)
else:
img = filename
data = img.get_fdata()
mask = np.ones(data.shape, dtype='bool')
for value in mask_values:
if value is np.nan:
mask &= ~np.isnan(data)
else:
mask &= ~(data == value)
while mask.ndim > 3:
mask = mask.any(-1)
inverted_data = np.transpose(data[mask], tuple(range(1, data.ndim - 2)) + (0, ))
bm_axes = BrainModelAxis.from_mask(mask, affine=img.affine)
return GreyOrdinates(inverted_data, bm_axes)
def __add__(self, other):
"""Adds the overlapping part of the arrays.
Only voxels/vertices in common between the greyordinate spaces
are added
"""
if not isinstance(other, GreyOrdinates):
return NotImplemented
if self.other_axes is not None:
if other.other_axes is not None:
if self.other_axes != other.other_axes:
raise ValueError("can not concatenate greyordinates when non-brain-model axes do not match")
other_axes = self.other_axes
else:
other_axes = other.other_axes
new_bm, slices = cifti.combine([self.brain_model_axis, other.brain_model_axis])
return GreyOrdinates(
data=self.data[..., slices[0]] + other.data[..., slices[1]],
brain_model_axis=new_bm,
other_axes=other_axes,
)
[docs] @classmethod
@contextmanager
def empty(cls, filename, axes, dtype=float):
"""Creates an empty file to store the greyordinates with the type
determined by the extension:
- .nii: CIFTI file
- .h5/hdf5/he2/he5: HDF5 file representing CIFTI data
- .zarr: zarr file representing CIFTI data
:param filename: target filename
:param axes: cifti2 axes
:param dtype: data type of array
:return: Greyordinate object where CIFTI data can be stored
"""
formats = {
'zarr': ('.zarr',),
'hdf5': ('.hdf5', '.h5', '.he2', '.he5'),
'cifti': ('.nii',),
}
for format, exts in formats.items():
if hasExt(filename, exts):
break
else:
raise ValueError(f"Extension of {filename} not recognised as CIFTI, HDF5 or zarr file")
with getattr(cls, f'empty_{format}')(filename, axes, dtype) as go:
yield go
[docs] @classmethod
def from_filename(cls, filename, mask_values=(0, np.nan), writable=False):
"""Reads greyordinate data from the given file.
File can be:
- NIFTI mask
- GIFTI mask
- CIFTI file
- HDF5 file representing CIFTI data
- zarr file representing CIFTI data
:param filename: input filename
:param mask_values: which values are outside of the mask for NIFTI or GIFTI input
:param writable: allow to write to disk
:return: greyordinates object
"""
try:
import h5py
except ImportError:
h5py = False
try:
import zarr
except ImportError:
zarr = False
if isinstance(filename, str):
try:
if not h5py:
raise OSError()
img = h5py.File(filename, 'r+' if writable else 'r')
except OSError:
try:
if not zarr:
raise ValueError()
img = zarr.open(filename, mode='r+' if writable else 'r')
except ValueError:
img = nib.load(filename)
else:
img = filename
if h5py and isinstance(img, h5py.Group):
return cls.from_hdf5(img)
if zarr and isinstance(img, zarr.Group):
return cls.from_hdf5(img)
if isinstance(img, nib.Nifti1Image):
if writable:
raise ValueError("Can not open NIFTI file in writable mode")
return cls.from_nifti(filename, mask_values)
if isinstance(img, Cifti2Image):
return cls.from_cifti(filename, writable=writable)
if isinstance(img, GiftiImage):
if writable:
raise ValueError("Can not open GIFTI file in writable mode")
return cls.from_gifti(filename, mask_values)
raise ValueError(f"I do not know how to convert {type(img)} into greyordinates (from {filename})")
[docs] def to_filename(self, filename):
"""Stores the greyordinate data to the given filename.
Type of storage is determined by the extension of the filename:
- .dscalar/dconn/dlabel.nii: CIFTI file
- .h5/hdf5/he2/he5: HDF5 file representing CIFTI data
- .zarr: zarr file representing CIFTI data
- .gii: GIFTI file (only stores surface data;
raises error if more that one surface is represented in the greyordinates)
- .nii: NIFTI file (only stores the volumetric data)
:param filename: target filename
"""
if hasExt(filename, ('.dscalar.nii', '.dconn.nii', '.dlabel.nii')):
self.to_cifti().to_filename(filename)
elif hasExt(filename, ('.h5', '.hdf5', '.he2', 'he5')):
import h5py
with h5py.File(filename, 'w') as f:
self.to_hdf5(f)
elif hasExt(filename, ('.zarr', )):
import zarr
f = zarr.group(filename)
self.to_hdf5(f)
elif hasExt(filename, ('.gii', )):
surfaces = np.unique(self.brain_model_axis.name[self.brain_model_axis.surface_mask])
if len(surfaces) > 1:
raise ValueError(f"Can not write to GIFTI file as more than one surface has been defined: {surfaces}")
if len(surfaces) == 0:
raise ValueError("Can not write to GIFTI file as no surface has been provided")
write_gifti(filename, [self.surface(surfaces[0])], surfaces[0])
elif hasExt(filename, ('.nii.gz', '.nii')):
self.volume().to_filename(filename)
else:
raise IOError(f"Extension of {filename} not recognized for NIFTI, GIFTI, or CIFTI file")
[docs] def as_dask(self, chunks='auto', name='greyordinates'):
"""Returns the greyordinates as a dask array.
:param chunks: shape of the chunks (defaults to chunks of the dataset)
:param name: name of the dask array
:return: dask array
"""
if chunks == 'auto':
chunks = getattr(self.data, 'chunks', 'auto')
if chunks != 'auto':
size_chunks = np.prod(chunks)
if size_chunks < 2e7:
mult_factor = int((2e7 / size_chunks) ** (1. / self.data.ndim))
chunks = tuple(c * mult_factor for c in chunks)
return dask.array.from_array(self.data, chunks, name)
[docs] def transpose(self, ):
"""Transposes a dense connectome."""
if self.data.ndim != 2:
raise ValueError("Can only transpose 2D datasets")
return GreyOrdinates(np.transpose(self.data), brain_model_axis=self.other_axes[0],
other_axes=(self.brain_model_axis, ), parent_file=self.parent_file)
[docs]def stack(greyordinates, axis=0):
"""Stacks a sequene of greyordinates along the given axis.
Resulting GreyOrdinate will only contain voxels/vertices in all GreyOrdinate arrays
:param greyordinates: individual greyordinates to be merged
:return: merged greyordinate object
"""
new_bm, slices = cifti.combine([go.brain_model_axis for go in greyordinates])
new_arr = np.stack([go.data[..., slc] for go, slc in zip(greyordinates, slices)], axis=axis)
ref_axes = set([go.other_axes for go in greyordinates if go.other_axes is not None])
if len(ref_axes) == 0:
other_axes = None
elif len(ref_axes) == 1:
other_axes = list(ref_axes[0])
other_axes.insert(axis, ScalarAxis([f'stacked_{idx + 1}' for idx in range(len(greyordinates))]))
else:
raise ValueError("Failed to merge greyordinates as their other axes did not match")
return GreyOrdinates(new_arr, new_bm, other_axes)
[docs]def concatenate(greyordinates, axis=0):
"""Stacks a sequene of greyordinates along the given axis.
Resulting GreyOrdinate will only contain voxels/vertices in all GreyOrdinate arrays
:param greyordinates: individual greyordinates to be merged
:return: merged greyordinate object
"""
if len(greyordinates) == 1:
return greyordinates[0]
ref_axes = [go.other_axes for go in greyordinates if go.other_axes is not None]
if len(ref_axes) == 0:
other_axes = None
elif any(ra != ref_axes[0] for ra in ref_axes[1:]):
raise ValueError("Failed to merge greyordinates as their other axes did not match")
else:
other_axes = ref_axes[0]
if axis == -1 or axis == greyordinates[0].ndim - 1:
new_bm = greyordinates[0].brain_model_axis
for go in greyordinates[1:]:
new_bm = new_bm + go.brain_model_axis
new_arr = np.concatenate([go.data for go in greyordinates], axis=axis)
return GreyOrdinates(new_arr, new_bm, other_axes)
else:
new_bm, slices = cifti.combine([go.brain_model_axis for go in greyordinates])
new_arr = np.concatenate([go.data[..., slc] for go, slc in zip(greyordinates, slices)], axis=axis)
return GreyOrdinates(new_arr, new_bm, other_axes)
[docs]def parse_greyordinate(filename):
"""Parses a set of filenames as a single greyordinate object.
:param filename: '@'-symbol separated files (NIFTI, GIFTI, and/or CIFTI)
:return: single Greyordinate object representing the full dataset
"""
try:
parts = [GreyOrdinates.from_filename(fn) for fn in filename.split('@')]
except (ValueError, IOError) as e:
raise argparse.ArgumentTypeError(*e.args)
if len(parts) == 0:
raise argparse.ArgumentParser("Can not parse empty string")
return concatenate(parts, axis=-1)