Source code for funpack.processing_functions

#!/usr/bin/env python
#
# processing_functions.py - Processing functions
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module contains definitions of processing functions - functions which
may be specifeid in the processing table.


A processing function may perform any sort of processing on one or more
variables. A processing function may add, remove, or manipulate the columns of
the :class:`DataTable`.


All processing functions must accept the following as their first two
positional arguments:


 - The :class:`.DataTable` object, containing references to the data, variable,
   and processing table.
 - A list of integer ID of the variables to process.


Furthermore, all processing functions must return one of the following:

 - ``None``, indicating that no columns are to be added or removed.

 - A ``list`` (must be a ``list``) of :class:`.Column` objects describing the
   columns that should be removed from the data.

 - A ``tuple`` (must be a ``tuple``) of length 2, containing:

    - A list of ``pandas.Series`` that should be added to the data.

    - A list of variable IDs to use for each new ``Series``. This list must
      have the same length as the list of new ``Series``, but if they are not
      associated with any specific variable, ``None`` may be used.

 - A ``tuple`` of length 3, containing:

    - List of columns to be removed
    - List of ``Series`` to be added
    - List of variable IDs for each new ``Series``.

 - A ``tuple`` of length 4, containing the above, and:

    - List of dicts associated with each of the new ``Series``. These will be
      passed as keyword arguments to the :class:`.Column` objects that
      represent each of the new ``Series``.

The following processing functions are defined:

 .. autosummary::
   :nosignatures:

   removeIfSparse
   removeIfRedundant
   binariseCategorical
   expandCompound
   createDiagnosisColumns
   removeField
"""


import functools       as ft
import itertools       as it
import                    logging
import                    collections

import numpy            as np
import pandas           as pd
import pandas.api.types as pdtypes

from typing import List, Optional, Any

from . import processing_functions_core as core
from . import                              util
from . import                              custom
from . import                              datatable
from . import                              loadtables


log = logging.getLogger(__name__)


[docs] @custom.processor() def removeIfSparse( dtable : datatable.DataTable, vids : List[int], minpres : Optional[float] = None, minstd : Optional[float] = None, mincat : Optional[float] = None, maxcat : Optional[float] = None, abspres : bool = True, abscat : bool = True, naval : Optional[Any] = None, ignoreType : bool = False ) -> List[datatable.Column]: """removeIfSparse([minpres][, minstd][, mincat][, maxcat][, abspres][, abscat][, naval]) Removes columns deemed to be sparse. Removes columns for all specified variables if they fail a sparsity test. The test is based on the following criteria: - The number/proportion of non-NA values must be greater than or equal to ``minpres``. - The standard deviation of the data must be greater than ``minstd``. - For integer and categorical types, the number/proportion of the largest category must be greater than ``mincat``. - For integer and categorical types, the number/proportion of the largest category must be less than ``maxcat``. If **any** of these criteria are **not met**, the data is considered to be sparse. Each criteria can be disabled by passing in ``None`` for the relevant parameter. The ``minstd`` test is only performed on numeric columns, and the ``mincat``/``maxcat`` tests are only performed on integer/categorical columns. If ``abspres=True`` (the default), ``minpres`` is interpreted as an absolute count. If ``abspress=False``, ``minpres`` is interpreted as a proportion. Similarly, If ``abscat=True`` (the default), ``mincat`` and ``maxcat`` are interpreted as absolute counts. Otherwise ``mincat`` and ``maxcat`` are interpreted as proportions The ``naval`` argument can be used to customise the value to consider as "missing" - it defaults to ``np.nan``. """ # noqa # :arg ignoreType: Defaults to ``False``. If ``True``, all specified tests # are run regardless of the types of the ``vids``. Only # used for testing. cols = [] series = [] vtypes = [] for vid in vids: if ignoreType: vtype = None else: vtype = dtable.vartable.loc[vid, 'Type'] vcols = dtable.columns(vid) cols .extend(vcols) series.extend([dtable[:, c.name] for c in vcols]) vtypes.extend([vtype] * len(vcols)) log.debug('Checking %u columns for sparsity %s ...', len(series), vids[:5]) func = ft.partial(core.isSparse, minpres=minpres, minstd=minstd, mincat=mincat, maxcat=maxcat, abspres=abspres, abscat=abscat, naval=naval) with dtable.pool() as pool: results = pool.starmap(func, zip(series, vtypes)) remove = [] for col, (isSparse, reason, value) in zip(cols, results): if isSparse: log.debug('Dropping sparse column %s (%s: %f)', col.name, reason, value) remove.append(col) if len(remove) > 0: log.debug('Dropping %u sparse columns: %s ...', len(remove), [r.name for r in remove[:5]]) return remove
[docs] @custom.processor() def removeIfRedundant(dtable : datatable.DataTable, vids : List[int], corrthres : float, nathres : float = None, skipUnknowns : bool = False, precision : str = None, pairwise : bool = False): """removeIfRedundant(corrthres, [nathres]) Removes columns deemed to be redundant. Removes columns from the specified group of variables if they are found to be redundant with any other columns in the group. Redundancy is determined by calculating the correlation between all pairs of columns - columns with an absolute correlation greater than ``corrthres`` are identified as redundant. The test can optionally take the patterns oof missing values into account - if ``nathres`` is provided, the missingness correlation is also calculated between all column pairs. Columns must have absolute correlation greater than ``corrthres`` **and** absolute missingness correlation greater than ``nathres`` to be identified as redundant. The ``skipUnknowns`` option defaults to ``False``. If it is set to ``True``, columns which are deemed to be redundant with respect to an unknown or uncategorised column are **not** dropped. The ``precision`` option can be set to either ``'double'`` (the default) or ``'single'`` - this controls whether 32 bit (single) or 64 bit (double) precision floating point is used for the correlation calculation. Double precision is recommended, as the correlation calculation algorithm can be unstable for data with large values (>10e5). """ # :arg pairwise: Use alternative pairwise implementation. If ``pairwise`` # is ``True``, an alternative implementation is used which # may be faster on data sets with high missingness # correlation. # Ignore non-numeric columns cols = list(it.chain(*[dtable.columns(v) for v in vids])) cols = [c for c in cols if pdtypes.is_numeric_dtype(dtable[:, c.name])] colnames = [c.name for c in cols] data = dtable[:, colnames] with np.errstate(divide='ignore'): if pairwise: redundant = _pairwiseRemoveIfRedundant( dtable, data, corrthres, nathres) else: redundant = _removeIfRedundant( dtable, data, corrthres, nathres, precision) redundant = util.dedup(sorted(redundant)) if skipUnknowns: copy = [] for idxa, idxb in redundant: colb = cols[idxa] bvid = colb.vid cats = loadtables.variableCategories(dtable.cattable, [bvid])[bvid] if 'unknown' in cats or 'uncategorised' in cats: namea = colnames[idxa] nameb = colnames[idxb] log.debug('Column %s is redundant with %s, but %s is ' 'unknown / uncategorised, so %s will not be ' 'dropped', namea, nameb, nameb, namea) else: copy.append((idxa, idxb)) redundant = copy if len(redundant) > 0: log.debug('Dropping %u redundant columns', len(redundant)) return [cols[r[0]] for r in redundant]
def _removeIfRedundant(dtable, data, corrthres, nathres=None, precision=None): """Default fast implementation of redundancy check. Used when the ``pairwise`` option to :func:`removeIfRedundant` is ``False``. :arg dtable: The :class:`.DataTable` containing all data :arg data: ``pandas.DataFrame`` containing the data to check :arg corrthres: Correlation threshold - see :func:`.redundantColumns`. :arg nathres: Missingness correlation threshold - see :func:`.redundantColumns`. :arg precision: Floating point precision -``'single'`` or ``'double'``. :returns: Sequence of tuples of column indices, where each tuple ``(a, b)`` indicates that column ``a`` is redundant with respect to column ``b``. """ return core.matrixRedundantColumns(data, corrthres, nathres, precision) def _pairwiseRemoveIfRedundant(dtable, data, corrthres, nathres=None): """Alternative implementation of redundancy check. Used when the ``pairwise`` option to :func:`removeIfRedundant` is ``True``. :arg dtable: The :class:`.DataTable` containing all data :arg data: ``pandas.DataFrame`` containing the data to check :arg corrthres: Correlation threshold - see :func:`.redundantColumns`. :arg nathres: Missingness correlation threshold - see :func:`.redundantColumns`. :returns: Sequence of tuples of column indices, where each tuple ``(a, b)`` indicates that column ``a`` is redundant with respect to column ``b``. """ ncols = len(data.columns) # If we are correlating missingness, # we use the naCorrelation function # to identify all of the column pairs # which are na-correlated - the pairs # which fail this test do not need to # be subjected to the correlation test # (and therefore pass the redundancy # check) if nathres is not None: nacorr = core.naCorrelation(pd.isna(data), nathres) # As the matrix is symmetric, we can # drop column pairs where x >= y. nacorr = np.triu(nacorr, k=1) colpairs = np.where(nacorr) colpairs = np.vstack(colpairs).T # Otherwise we generate an array # containing indices of all column # pairs. else: xs, ys = np.triu_indices(ncols, k=1) colpairs = np.vstack((xs, ys)).T # we need at least # one pair of columns if len(colpairs) == 0: return [] # evaluate all pairs at once if not dtable.parallel: log.debug('Checking %u columns for redundancy', ncols) redundant = core.pairwiseRedundantColumns( data, colpairs, corrthres=corrthres) # evaluate in parallel else: # Split the column pairs # into njobs chunks, and # run in parallel chunksize = int(np.ceil(len(colpairs) / dtable.njobs)) pairchunks = [colpairs[i:i + chunksize] for i in range(0, len(colpairs), chunksize)] log.debug('Checking %u columns for redundancy (%u tasks)', ncols, len(pairchunks)) with dtable.pool() as pool: results = [] for i, chunk in enumerate(pairchunks): # We can pass the full dataframe # to each task, as it should be # read-accessible via shared memory. token = 'task {} / {}'.format(i + 1, len(pairchunks)) result = pool.apply_async( core.pairwiseRedundantColumns, kwds=dict(data=data, colpairs=chunk, corrthres=corrthres, token=token)) results.append(result) # wait for the tasks to complete, # and gather the results (indices # of redundant columns) into a list redundant = [] for result in results: redundant.extend(result.get()) return redundant # auxvids tells the processing runner the # "take" argument refers to other variables # which are not processed, but are needed # to perform the processing. # # "filterMissing" tells the processing # runner *not* to remove variables which # are not present in the data from the list # of vids that are passed in - we do our # own check here.
[docs] @custom.processor(auxvids=['take'], filterMissing=False) def binariseCategorical(dtable, vids, acrossVisits=False, acrossInstances=True, minpres=None, nameFormat=None, replace=True, take=None, fillval=None, replaceTake=True): """binariseCategorical([acrossVisits][, acrossInstances][, minpres][, nameFormat][, replace][, take][, fillval][, replaceTake]) Replace a categorical column with one binary column per category. Binarises categorical variables - replaces their columns with one new column for each unique value, containing ``1`` for subjects with that value, and ``0`` otherwise. Thos procedure is applied independently to all variables that are specified. The ``acrossVisits`` option controls whether the binarisation is applied across visits for each variable. It defaults to ``False``, meaning that the binarisation is applied separately to the columns within each visit. If set to ``True``, the binarisation will be applied to the columns for all visits. Similarly, the ``acrossInstances`` option controls whether the binarisation is applied across instances. This defaults to ``True``, which is usually desirable - for example, data field `41202 <https://biobank.ctsu.ox.ac.uk/crystal/field.cgi?id=41202>`_ contains multiple ICD10 diagnoses, separated across different instances. If the ``minpres`` option is specified, it is used as a threshold - categorical values with less than this many occurrences will not be added as columns. The ``nameFormat`` argument controls how the new data columns should be named - it must be a format string using named replacement fields ``'vid'``, ``'visit'``, ``'instance'``, and ``'value'``. The ``'visit'`` and ``'instance'`` fields may or may not be necessary, depending on the value of the ``acrossVisits`` and ``acrossInstances`` arguments. The default value for the ``nameFormat`` string is as follows: ================ =================== ====================================== ``acrossVisits`` ``acrossInstances`` ``nameFormat`` ================ =================== ====================================== ``False`` ``False`` ``'{vid}-{visit}.{instance}_{value}'`` ``False`` ``True`` ``'{vid}-{visit}.{value}'`` ``True`` ``False`` ``'{vid}-{value}.{instance}'`` ``True`` ``True`` ``'{vid}-{value}'`` ================ =================== ====================================== The ``replace`` option controls whether the original un-binarised columns should be removed - it defaults to ``True``, which will cause them to be removed. By default, the new binary columns (one for each unique value in the input columns) will contain a ``1`` indicating that the value is present, or a ``0`` indicating its absence. As an alternative to this, the ``take`` option can be used to specify *another* variable from which to take values when populating the output columns. ``take`` may be set to a variable ID, or sequence of variable IDs (one for each of the input variables) to take values from. If provided, the generated columns will have values from the column(s) of this variable, instead of containinng binary 0/1 values. A ``take`` variable must have columns that match the columns of the corresponding variable (by both visits and instances). If ``take`` is being used, the ``fillval`` option can be used to specify the the value to use for ``False`` / ``0`` rows. It defaults to ``np.nan``. The ``replaceTake`` option is similar to ``replace`` - it controls whether the columns associated with the ``take`` variables are removed (``True`` - the defailt), or retained. """ # noqa # get groups of columns for vid, grouped # according to acrossVisits/acrossInstances def gatherColumnGroups(vid): colgroups = [] visits = dtable.visits( vid) instances = dtable.instances(vid) if not (acrossVisits or acrossInstances): for visit, instance in it.product(visits, instances): colgroups.append(dtable.columns(vid, visit, instance)) elif acrossInstances and (not acrossVisits): for visit in visits: colgroups.append(dtable.columns(vid, visit)) elif (not acrossInstances) and acrossVisits: for instance in instances: colgroups.append(dtable.columns(vid, instance=instance)) else: colgroups = [dtable.columns(vid)] return colgroups defaultNameFormat = { (False, False) : '{vid}-{visit}.{instance}_{value}', (False, True) : '{vid}-{visit}.{value}', (True, False) : '{vid}-{value}.{instance}', (True, True) : '{vid}-{value}', } if nameFormat is None: nameFormat = defaultNameFormat[acrossVisits, acrossInstances] # if take is a single vid or None, # we turn it into [take] * len(vids) if not isinstance(take, collections.abc.Sequence): take = [take] * len(vids) if len(take) != len(vids): raise ValueError('take must be either None, a single variable ID, ' 'or a list of variable IDs, one for each of the ' 'main vids.') remove = [] newseries = [] newvids = [] newcolargs = [] for vid, takevid in zip(vids, take): if (not dtable.present(vid)) or \ (takevid is not None and not dtable.present(takevid)): log.warning('Variable %u (or take: %s) is not present in the ' 'data set - skipping the binariseCategorical step', vid, takevid) continue colgrps = gatherColumnGroups(vid) if takevid is None: takegrps = [None] * len(colgrps) else: takegrps = gatherColumnGroups(takevid) for cols, takecols in zip(colgrps, takegrps): log.debug('Calling binariseCategorical (vid: %i, ' '%u columns)', vid, len(cols)) if takecols is None: tkdata = None else: tkdata = dtable[:, [c.name for c in takecols]] data = dtable[:, [c.name for c in cols]] binarised, values = core.binariseCategorical(data, minpres=minpres, take=tkdata, token=vid, njobs=dtable.njobs) if replace: remove.extend(cols) if replaceTake and (takecols is not None): remove.extend(takecols) for col, val in zip(binarised.T, values): fmtargs = { 'vid' : str(int(cols[0].vid)), 'visit' : str(int(cols[0].visit)), 'instance' : str(int(cols[0].instance)), 'value' : str(val) } series = pd.Series( col, index=dtable.index, name=nameFormat.format(**fmtargs)) # The value is stored on each Column object as # an attribute "binariseCategorical_value". # This may be used by other processing # functions (see e.g. createDiagnosisColumns # with the binarised=True option). The value # is also stored as "metadata", which is used # by the --description_file cli option - see # funpack.main.generateDescription). colargs = { 'metadata' : val, 'binariseCategorical_value' : val, 'basevid' : takevid, 'fillval' : fillval } newvids .append(vid) newcolargs.append(colargs) newseries .append(series) return remove, newseries, newvids, newcolargs
[docs] @custom.processor() def expandCompound(dtable, vids, nameFormat=None, replace=True): """expandCompound([nameFormat][, replace]) Expand a compound column into a set of columns, one for each value. Expands compound variables into a set of columns, one for each value. Rows with different number of values are padded with ``np.nan``. This procedure is applied independently to each column of each specified variable. The ``nameFormat`` option can be used to control how the new columns should be named - it must be a format string using named replacement fields ``'vid'``, ``'visit'``, ``'instance'``, and ``'index'``. The default value for ``nameFormat`` is ``'{vid}-{visit}.{instance}_{index}'``. The ``replace`` option controls whether the original columns are removed (``True`` - the default), or retained. """ if nameFormat is None: nameFormat = '{vid}-{visit}.{instance}_{index}' columns = list(it.chain(*[dtable.columns(v) for v in vids])) newseries = [] newvids = [] for column in columns: data = dtable[:, column.name] newdata = core.expandCompound(data) for i in range(newdata.shape[1]): coldata = newdata[:, i] name = nameFormat.format(vid=column.vid, visit=column.visit, instance=column.instance, index=i) newvids .append(column.vid) newseries.append(pd.Series(coldata, index=dtable.index, name=name)) if replace: return columns, newseries, newvids else: return newseries, newvids
[docs] @custom.processor(auxvids=["primvid", "secvid"]) def createDiagnosisColumns(dtable, vids, primvid, secvid, replace=True, primfmt=None, secfmt=None, binarised=False): """createDiagnosisColumns(primvid, secvid) Create binary columns for (e.g.) ICD10 codes, denoting them as either primary or secondary. This function is intended for use with data fields containing ICD9, ICD10, OPSC3, and OPSC4 diagnosis codes. The UK Biobank publishes these diagnosis/operative procedure codes twice: - The codes are published in a "main" data field containing all codes - The codes are published again in two other data fields, containing separate "primary" and "secondary" codes. ===== =============== ================== ==================== Code Main data field Primary data field Secondary data field ===== =============== ================== ==================== ICD10 41270 41202 41204 ICD9 41271 41203 41205 OPSC4 41272 41200 41210 OPSC3 41273 41256 41258 ===== =============== ================== ==================== For example, this function may be applied to the ICD10 diagnosis codes like so:: 41270 createDiagnosisColumns(41202, 41204) When applied to one of the main data fields, (e.g. 41270 - ICD10 diagnoses), this function will create two new columns for every unique ICD10 diagnosis code: - the first column contains a 1 if the code corresponds to a primary diagnosis (i.e. is also in 41202). - the second column contains a 1 if the code corresponds to a secondary diagnosis (i.e. is also in 41204). The ``replace`` option defaults to ``True`` - this causes the primary and secondary code columns to be removed from the data set. The ``binarised`` option defaults to ``False``, which causes this function to expect the input columns to be in their raw format, as described in the UKB showcase (e.g. https://biobank.ctsu.ox.ac.uk/crystal/field.cgi?id=41270). The ``binarised`` option can be set to ``True``, which will allow this function to be applied to data fields which have been passed through the :func:`binariseCategorical` function. """ # noqa if len(vids) == 0: return if len(vids) != 1: raise ValueError('createDiagnosisColumns can only be ' f'applied to one datafield [{vids}]') if primfmt is None: primfmt = '{primvid}-{code}.primary' if secfmt is None: secfmt = '{secvid}-{code}.secondary' # create separate data frames containing # only primary/secondary code columns mainvid = vids[0] maincols = dtable.columns(mainvid) pricols = dtable.columns(primvid) seccols = dtable.columns(secvid) pridf = dtable[:, [c.name for c in pricols]] secdf = dtable[:, [c.name for c in seccols]] log.debug('Counting unique values across %u columns...', len(maincols)) # Original data field has already been passed # through binariseCategorical - we have one # input column per unique code (and can get # those codes from the Column objects) if binarised: uniq = [c.binariseCategorical_value for c in maincols] uniq = sorted(np.unique(uniq)) else: uniq = [dtable[:, c.name].dropna() for c in maincols] uniq = sorted(pd.unique(pd.concat(uniq, ignore_index=True))) log.debug('Identifying primary/secondary codes [%s = %s + %s]...', mainvid, primvid, secvid) pribin = np.zeros((dtable.shape[0], len(uniq)), dtype=np.uint8) secbin = np.zeros((dtable.shape[0], len(uniq)), dtype=np.uint8) for i, code in enumerate(uniq): pribin[:, i] = (pridf == code).any(axis=1) secbin[:, i] = (secdf == code).any(axis=1) fmtargs = {'mainvid' : mainvid, 'primvid' : primvid, 'secvid' : secvid} pribincols = [primfmt.format(code=code, **fmtargs) for code in uniq] secbincols = [secfmt .format(code=code, **fmtargs) for code in uniq] pribincols = [pd.Series(d, index=dtable.index, name=c) for d, c in zip(pribin.T, pribincols)] secbincols = [pd.Series(d, index=dtable.index, name=c) for d, c in zip(secbin.T, secbincols)] add = pribincols + secbincols addvids = [primvid] * len(uniq) + [secvid] * len(uniq) colargs = [{'metadata' : c} for c in uniq] * 2 if replace: remove = pricols + seccols else: remove = [] return remove, add, addvids, colargs
[docs] @custom.processor() def removeField(dtable, vids): """removeField() Remove all columns associated with one or more datafields/variables. This function can be used to simply remove columns from the data set. This can be useful if a variable is required for some processing step, but is not needed in the final output file. """ return list(it.chain(*[dtable.columns(v) for v in vids]))