Source code for funpack.datatable

#!/usr/bin/env python
#
# datatable.py - The DataTable class.
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module provides the :class:`DataTable` class, a container
class which holds a reference to the loaded data.
"""


import itertools             as it
import multiprocessing       as mp
import multiprocessing.dummy as mpd
import                          random
import                          string
import                          logging
import                          contextlib
import                          collections
import collections.abc       as abc

import pandas                as pd

import funpack.util as util


log = logging.getLogger(__name__)


AUTO_VARIABLE_ID = 5000000
"""Starting variable ID to use for unknown data. Automatically generated
variable IDs should not conflict with actual UKB variable IDs.
"""


MODIFIED_COLUMN =  ''.join(random.choices(string.ascii_letters, k=20))
"""Flag used internally by the :class:`DataTable` when merging subtables.
"""


class Column:
    """The ``Column`` is a simple container class containing metadata
    about a single column in a data file.

    See the :func:`.parseColumnName` function for important information
    about column naming conventions in the UK BioBank.

    A fundamental assumption made throughout much of the ``funpack``
    code-base is that columns with a variable ID (``vid``) equal to 0 are
    index columns (e.g. subject ID).

    A ``Column`` object has the following attributes:

    - ``datafile``: Input file that the column originates from (``None``
      for generated columns).

    - ``name``: Column name.

    - ``origname``: Column name as it appears in the input/output file(s).

    - ``index``: Location of the column in the input/output file(s).

    - ``vid``: Variable ID.

    - ``visit``: Visit number.

    - ``instance``: Instance number/code.

    - ``metadata``: Metadata which may be used to generate descriptive
      information about the column (see
      :func:`funpack.main.generateDescription`).

    - ``basevid``: ID of the variable that the data in this column was
      derived/generated from (and has the same data type as), if not the
      original ``vid``. For example, see the ``take`` argument to
      :func:`.processing_functions.binariseCategorical`.

    - ``fillval``: Fill value to use for missing values, instead of the
      global default (e.g. as specified by ``--tsv_missing_values``).

    Any other keyword arguments given when a ``Column`` is created will be
    added as attributes - this feature may be used by cleaning/processing
    steps to pass information on to subsequent steps.
    """

    def __init__(self,
                 datafile,
                 name,
                 index,
                 vid=None,
                 visit=0,
                 instance=0,
                 metadata=None,
                 basevid=None,
                 fillval=None,
                 origname=None,
                 **kwargs):

        if basevid  is None: basevid  = vid
        if origname is None: origname = name

        self.datafile = datafile
        self.name     = name
        self.origname = origname
        self.index    = index
        self.vid      = vid
        self.visit    = visit
        self.instance = instance
        self.metadata = metadata
        self.basevid  = basevid
        self.fillval  = fillval

        for k, v in kwargs.items():
            setattr(self, k, v)

    def __str__(self):
        return 'Column({}, {} -> {}, {}, {}, {}, {}, {}, {})'.format(
            self.datafile, self.name, self.origname, self.index, self.vid,
            self.visit, self.instance, self.basevid, self.fillval)

    def __repr__(self):
        return str(self)

    def __hash__(self):
        return hash(str(self))

    def __eq__(self, other):
        return (isinstance(other, Column)       and
                self.datafile == other.datafile and
                self.name     == other.name     and
                self.origname == other.origname and
                self.index    == other.index    and
                self.vid      == other.vid      and
                self.visit    == other.visit    and
                self.instance == other.instance and
                self.basevid  == other.basevid  and
                self.fillval  == other.fillval)

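# A minimal usage sketch (not part of the module): constructing a Column
# by hand. The file/column names below are hypothetical - in normal use,
# Column objects are created during data import.
#
#     col = Column('data.tsv', '1-0.0', 1, vid=1, visit=0, instance=0)
#     col.name                                   # '1-0.0'
#     col.basevid                                # defaults to vid (1)
#     col == Column('data.tsv', '1-0.0', 1, vid=1)
#                                                # True - equality compares
#                                                # all column metadata
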
class DataTable(util.Singleton):
    """The ``DataTable`` is a simple container class. It keeps references to
    the variable and processing tables, and the data itself. The
    :func:`importData` function creates and returns a ``DataTable``.

    A ``DataTable`` has the following attributes and helper methods:

    .. autosummary::
       :nosignatures:

       pool
       manager
       parallel
       vartable
       proctable
       cattable
       columns
       allColumns
       present
       visits
       instances
       variables

    Data should be accessed/modified via these methods:

    .. autosummary::
       :nosignatures:

       index
       __getitem__
       __setitem__

    Columns can be added/removed, and rows removed, via these methods:

    .. autosummary::
       :nosignatures:

       maskSubjects
       addColumns
       removeColumns

    Columns can be "flagged" with metadata labels via the :meth:`addFlag`
    method. All of the flags on a column can be retrieved via the
    :meth:`getFlags` method.

    The :meth:`subtable` method can be used to generate a replica
    ``DataTable`` with a specific subset of columns. It is intended for
    parallelisation, so that child processes are given a view of only the
    columns that are relevant to them, and as little copying between
    processes as possible takes place.

    The :meth:`subtable` and :meth:`merge` methods are intended to be used
    like so:

    1. Create subtables which only contain data for specific columns::

           cols      = ['1-0.0', '2-0.0', '3-0.0']
           subtables = [dtable.subtable([c]) for c in cols]

    2. Use multiprocessing to perform parallel processing on each column::

           def mytask(dtable, col):
               dtable[:, col] += 5
               return dtable

           with dtable.pool() as pool:
               subtables = pool.starmap(mytask, zip(subtables, cols))

    3. Merge the results back into the main table::

           dtable.merge(subtables)

    Modifications must occur through the :meth:`DataTable.__setitem__`
    interface, so it can keep track of which columns have been modified.
    Addition or removal of columns or rows on subtables is not supported.
    """

    def __init__(self,
                 data,
                 columns,
                 vartable,
                 proctable,
                 cattable,
                 njobs=1,
                 mgr=None,
                 subtable=False):
        """Create a ``DataTable``.

        :arg data:      ``pandas.DataFrame`` containing the data.
        :arg columns:   List of :class:`.Column` objects, representing the
                        columns that are in the data.
        :arg vartable:  ``pandas.DataFrame`` containing the variable table.
        :arg proctable: ``pandas.DataFrame`` containing the processing table.
        :arg cattable:  ``pandas.DataFrame`` containing the category table.
        :arg njobs:     Number of jobs to use for parallelising tasks.
        :arg mgr:       :class:`multiprocessing.Manager` object for
                        parallelisation.
        :arg subtable:  For internal use. Used to differentiate between the
                        main ``DataTable``, and child ``DataTable`` objects
                        created via :meth:`subtable`.
        """
        self.__data      = data
        self.__vartable  = vartable
        self.__proctable = proctable
        self.__cattable  = cattable
        self.__njobs     = njobs
        self.__mgr       = mgr
        self.__subtable  = subtable
        self.__flags     = collections.defaultdict(set)

        # The varmap is a dictionary of
        # { vid : [Column] } mappings,
        # and the colmap is a dictionary
        # of { name : Column } mappings
        self.__varmap = collections.OrderedDict()
        self.__colmap = collections.OrderedDict()

        for col in columns:
            self.__colmap[col.name] = col
            if col.vid in self.__varmap:
                self.__varmap[col.vid].append(col)
            else:
                self.__varmap[col.vid] = [col]

    def __getstate__(self):
        """Returns the state of this :class:`.DataTable` for pickling. """
        return (self.__data,
                self.__vartable,
                self.__proctable,
                self.__cattable,
                self.__varmap,
                self.__colmap,
                self.__subtable,
                self.__flags)

    def __setstate__(self, state):
        """Set the state of this :class:`.DataTable` for unpickling. """
        self.__data      = state[0]
        self.__vartable  = state[1]
        self.__proctable = state[2]
        self.__cattable  = state[3]
        self.__varmap    = state[4]
        self.__colmap    = state[5]
        self.__subtable  = state[6]
        self.__flags     = state[7]
        self.__njobs     = 1
        self.__mgr       = None

    @contextlib.contextmanager
    def pool(self):
        """Return a ``multiprocessing.Pool`` object for performing tasks in
        parallel on the data. If the ``njobs`` argument passed to
        :meth:`__init__` was ``1``, or if this is a ``DataTable`` instance
        that has been unpickled (i.e. instances which are running in a
        sub-process), a ``multiprocessing.dummy.Pool`` instance is created
        and returned.
        """
        if self.__njobs == 1: Pool = mpd.Pool
        else:                 Pool = mp.Pool

        with Pool(self.__njobs) as pool:
            yield pool
            pool.close()
            pool.join()

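    # Usage sketch (hypothetical task function, assuming ``dtable`` is a
    # DataTable created by importData): the pool is a normal
    # multiprocessing(.dummy) Pool, so standard map/starmap calls work.
    #
    #     def summarise(subtable):
    #         return subtable.shape
    #
    #     with dtable.pool() as pool:
    #         shapes = pool.map(summarise, [dtable.subtable([c])
    #                                       for c in dtable.dataColumns])
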
    @property
    def isSubtable(self):
        """Returns ``True`` if this ``DataTable`` was created by a
        :meth:`subtable` call.
        """
        return self.__subtable

    @property
    def manager(self):
        """Returns a ``multiprocessing.Manager`` for sharing data between
        processes.
        """
        if self.__mgr is None: return mpd.Manager()
        else:                  return self.__mgr

    @property
    def njobs(self):
        """Returns the number of jobs that will be used by the :attr:`pool`
        for parallelising tasks.
        """
        return self.__njobs

    @property
    def parallel(self):
        """Returns ``True`` if this invocation of ``funpack`` has the
        ability to run tasks in parallel, ``False`` otherwise.
        """
        return self.__njobs > 1

    @property
    def vartable(self):
        """Returns the ``pandas.DataFrame`` containing the variable table.
        """
        return self.__vartable

    @property
    def proctable(self):
        """Returns the ``pandas.DataFrame`` containing the processing table.
        """
        return self.__proctable

    @property
    def cattable(self):
        """Returns the ``pandas.DataFrame`` containing the category table.
        """
        return self.__cattable

    @property
    def index(self):
        """Returns the subject indices."""
        return self.__data.index

    @property
    def shape(self):
        """Returns the (nrows, ncolumns) shape of the data."""
        return self.__data.shape

    @property
    def variables(self):
        """Returns a list of all integer variable IDs present in the data.
        The list includes the index variable (which has an ID of ``0``).
        """
        return list(self.__varmap.keys())

    @property
    def allColumns(self):
        """Returns a list of all columns present in the data, including
        index columns.
        """
        return list(it.chain(*[self.columns(v) for v in self.variables]))

    @property
    def indexColumns(self):
        """Returns a list of all index columns present in the data. """
        return list(self.columns(0))

    @property
    def dataColumns(self):
        """Returns a list of all non-index columns present in the data. """
        return [c for c in self.allColumns if c.vid != 0]

    def present(self, variable, visit=None, instance=None):
        """Returns ``True`` if the specified variable (and optionally
        visit/instance) is present in the data, ``False`` otherwise.
        """
        try:
            self.columns(variable, visit, instance)
            return True
        except KeyError:
            return False

    def columns(self, variable, visit=None, instance=None):
        """Return the data columns corresponding to the specified
        ``variable``, ``visit`` and ``instance``.

        :arg variable: Integer variable ID.
        :arg visit:    Visit number. If ``None``, columns for all visits
                       are returned.
        :arg instance: Instance number. If ``None``, columns for all
                       instances are returned.
        :returns:      A list of :class:`.Column` objects.
        """
        cols = list(self.__varmap[variable])

        if visit is not None:
            cols = [c for c in cols if c.visit == visit]
        if instance is not None:
            cols = [c for c in cols if c.instance == instance]

        return cols

    def visits(self, variable):
        """Returns the visit IDs for the given ``variable``. """
        cols = self.columns(variable)
        return list({c.visit for c in cols})

    def instances(self, variable):
        """Returns the instance IDs for the given ``variable``. """
        cols = self.columns(variable)
        return list({c.instance for c in cols})

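    # Query sketch (hypothetical variable ID 31, assuming it is present
    # in the data):
    #
    #     if dtable.present(31):
    #         cols = dtable.columns(31, visit=0)   # [Column] for visit 0
    #         dtable.visits(31)                    # e.g. [0, 1, 2]
    #         dtable.instances(31)                 # e.g. [0]
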
    def maskSubjects(self, mask):
        """Remove subjects where ``mask is False``. """
        self.__data = self.__data[mask]

    def removeColumns(self, cols):
        """Remove the columns described by ``cols``.

        :arg cols: Sequence of :class:`Column` objects to remove.
        """
        names = [c.name for c in cols]
        self.__data.drop(names, axis=1, inplace=True)

        for col in cols:
            vcols = self.__varmap[col.vid]
            vcols.remove(col)
            self.__colmap.pop(col.name)

            if len(vcols) > 0: self.__varmap[col.vid] = vcols
            else:              self.__varmap.pop(col.vid)

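    # Removal sketch (hypothetical variable ID 31): drop every column
    # associated with one variable.
    #
    #     cols = dtable.columns(31)
    #     dtable.removeColumns(cols)
    #     dtable.present(31)          # now False
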
    def addColumns(self, series, vids=None, kwargs=None):
        """Adds one or more new columns to the data set.

        .. note:: It is assumed that each ``pandas.Series`` object shares
                  the same row indices as this ``DataTable``.

        :arg series: Sequence of ``pandas.Series`` objects containing the
                     new column data to add.

        :arg vids:   Sequence of variable IDs with which each new column is
                     associated. If ``None`` (the default), variable IDs are
                     automatically assigned.

        :arg kwargs: Sequence of dicts, one for each series, containing
                     arguments to pass to the :class:`Column` constructor
                     (e.g. ``visit``, ``metadata``, etc).
        """

        import funpack.loadtables as loadtables  # noqa: E501 pylint: disable=import-outside-toplevel

        if vids   is None: vids   = [None] * len(series)
        if kwargs is None: kwargs = [None] * len(series)

        for s in series:
            if s.name in self.__data.columns:
                raise ValueError(
                    'A column with name {} already exists - remove '
                    'it, or assign to it directly'.format(s.name))

        if len(vids) != len(series):
            raise ValueError('length of vids does not match series')
        if len(kwargs) != len(series):
            raise ValueError('length of kwargs does not match series')

        startidx = len(self.__data.columns)
        idxs     = range(startidx, startidx + len(series))

        # if vids are not provided, auto-generate
        # a vid for each column starting from here.
        startvid = max(max(self.variables) + 1, AUTO_VARIABLE_ID)

        # It's much faster to concatenate two
        # DataFrames together than to sequentially
        # concatenate each series to our existing
        # dataframe. We've already checked for
        # column conflicts, and we assume that
        # row indices are aligned.
        log.debug('Merging %u new columns into main data table.',
                  len(series))

        series      = pd.concat(series, axis=1, verify_integrity=False)
        self.__data = pd.concat((self.__data, series),
                                axis=1,
                                copy=False,
                                verify_integrity=False)

        # create a new Column object
        # describing each new column
        for name, idx, vid, kw in zip(series, idxs, vids, kwargs):

            if vid is None:
                vid      = startvid
                startvid = startvid + 1
            if kw is None:
                kw = {}

            col = Column(None, name, idx, vid, **kw)

            self.__colmap[name] = col

            # new column on existing variable.
            # We assume the data type is the
            # same as the existing columns for
            # this variable
            if vid in self.__varmap:
                self.__varmap[vid].append(col)

            # new variable - the addNewVariable
            # function will sort things out
            else:
                dtype = self.__data[name].dtype
                self.__varmap[vid] = [col]
                loadtables.addNewVariable(
                    self.__vartable, vid, name, dtype)

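    # Addition sketch (hypothetical column name/values, assuming the
    # series index matches dtable.index): new columns are given
    # auto-generated variable IDs unless vids is passed.
    #
    #     import pandas as pd
    #     newcol = pd.Series(0, index=dtable.index, name='mycol')
    #     dtable.addColumns([newcol],
    #                       kwargs=[{'metadata': 'my new column'}])
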
    def getFlags(self, col):
        """Return any flags associated with the specified column.

        :arg col: :class:`Column` object
        :returns: A ``set`` containing any flags associated with ``col``
        """
        return set(self.__flags[col])

    def addFlag(self, col, flag):
        """Adds a flag for the specified column.

        :arg col:  :class:`Column` object
        :arg flag: Flag to add
        """
        self.__flags[col].add(flag)

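    # Flagging sketch (hypothetical flag label): flags are arbitrary
    # labels attached to Column objects, e.g. by cleaning/processing
    # steps.
    #
    #     col = dtable.columns(31)[0]
    #     dtable.addFlag(col, 'cleaned')
    #     'cleaned' in dtable.getFlags(col)    # True
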
    def __getitem__(self, slc):
        """Get the specified slice from the data. This method has the same
        interface as the ``pandas.DataFrame.loc`` accessor.
        """
        return self.__data.loc[slc]

    def __setitem__(self, slc, value):
        """Set the specified slice in the data. This method supports a
        limited form of the ``pandas.DataFrame.loc`` interface.
        Slices/labels for both rows and columns must be specified, and
        columns may only be specified via a slice, list, tuple, or
        individual label.
        """

        if not isinstance(slc, tuple) or len(slc) != 2:
            raise RuntimeError('DataTable.__setitem__ requires both '
                               'rows and columns to be indexed.')

        rows, cols = slc

        # In pandas >= 2.0.0, "data.loc[:, slc] = value" will
        # attempt to update values in-place, which can cause
        # problems if we want to change the type of a column.
        if isinstance(rows, slice) and rows == slice(None):
            self.__data[cols] = value
        else:
            self.__data.loc[slc] = value

        # Flag the column as modified. This is
        # detected by the merge method when
        # it merges subtables back in.
        if self.isSubtable:
            cols = slc[1]
            if isinstance(cols, slice):
                cols = self.__data.columns[cols]
            elif not isinstance(cols, (list, tuple)):
                cols = [cols]
            for col in cols:
                self.addFlag(self.__colmap[col], MODIFIED_COLUMN)

    def __len__(self):
        """Returns the number of rows in the data set. """
        return len(self.__data)

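    # Indexing sketch (hypothetical column name '31-0.0'): reads and
    # writes use a pandas .loc style interface; writes must index both
    # rows and columns so that modified columns can be tracked.
    #
    #     values = dtable[:, '31-0.0']          # read a column
    #     dtable[:, '31-0.0'] = values * 2      # write it back
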
    def subtable(self, columns=None, rows=None):
        """Return a new ``DataTable`` which only contains the data for the
        specified ``columns`` and/or ``rows``.

        This method can be used to create a replica ``DataTable`` where the
        underlying ``pandas.DataFrame`` only contains the specified columns.
        It is intended to be used when parallelising tasks, so that child
        processes are given a view of only the relevant columns.

        .. note:: The :meth:`merge` method cannot be used to merge in a
                  subtable that contains a subset of rows.

        :arg columns: List of :class:`Column` objects.
        :arg rows:    Sequence of row indices.
        :returns:     A new :class:`DataTable`.
        """
        if columns is None:
            columns  = self.dataColumns
            colnames = slice(None)
        else:
            colnames = [c.name for c in columns]

        columns = [self.allColumns[0]] + columns

        if rows is None:
            rows = slice(None)

        return DataTable(self.__data.loc[rows, colnames],
                         columns,
                         self.__vartable,
                         self.__proctable,
                         self.__cattable,
                         self.__njobs,
                         subtable=True)

    def merge(self, subtables):
        """Merge the data from the given ``subtables`` into this
        ``DataTable``. It is assumed that the ``subtables`` each contain a
        sub-set of the columns in this ``DataTable``.

        .. note:: The :meth:`merge` method cannot be used to merge in a
                  subtable that contains a subset of rows.

        :arg subtables: A single ``DataTable``, or a sequence of
                        ``DataTable`` instances, returned by
                        :meth:`subtable`.
        """

        if not isinstance(subtables, abc.Sequence):
            subtables = [subtables]

        # Gather a list of all subtable dataframes,
        # and a list of all columns to be copied.
        # We only copy modified columns - we assume
        # that all changes to the subtable occurred
        # via DataTable.__setitem__
        subdfs  = []
        subcols = []
        for subtable in subtables:
            subtcols = [c.name for c in subtable.dataColumns
                        if MODIFIED_COLUMN in subtable.getFlags(c)]
            subdfs .append(subtable[:, subtcols])
            subcols.extend(subtcols)

        # if there are columns to merge from
        # any subtable, create a single dataframe
        # containing all of them - quicker than
        # merging them separately
        if len(subcols) > 0:
            log.debug('merging %u subtable dataframes (%u columns)',
                      len(subdfs), len(subcols))

            if len(subdfs) > 1:
                subdf = pd.concat(subdfs,
                                  axis='columns',
                                  verify_integrity=False,
                                  copy=False)
            else:
                subdf = subdfs[0]

            # merge subtable data into the
            # main dataframe, preserving
            # column ordering
            colorder    = self.__data.columns
            self.__data = pd.concat((self.__data.drop(columns=subcols),
                                     subdf.loc[:, subcols]),
                                    axis='columns',
                                    verify_integrity=False,
                                    copy=False)
            self.__data = self.__data[colorder]

        # copy column metadata
        # over from subtables
        for subtable in subtables:
            for subcol in subtable.dataColumns:
                mycol    = self.__colmap[subcol.name]
                myflags  = self.__flags[mycol]
                subflags = subtable.getFlags(subcol)
                subflags = subflags.difference((MODIFIED_COLUMN,))

                self.__flags[mycol] = myflags.union(subflags)

                if subcol.metadata is not None:
                    mycol.metadata = subcol.metadata