Source code for funpack.processing

#!/usr/bin/env python
#
# processing.py - Cleaning and processing parsing and functions.
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module contains functionality for running "processes", processing
steps which are applied to one or more variables, and which may result in
columns being removed from, or new columns being added to, the data
table. These processes are specified in the processing table.


The functionality in this module is also used to manage cleaning functions,
which are specified in the ``Clean`` column of the variable table. Definitions
of all available (pre-)processing functions are in the
:mod:`.cleaning_functions` and :mod:`.processing_functions` modules.


The :func:`processData` function is also defined here - it executes the
processes defined in the processing table.


Logic for parsing processing step strings can be found in the
:mod:`funpack.parsing.process` module.


Special processing functions can be applied to a variable's data by adding
them to the ``Clean`` and ``Process`` columns of the variable or processing
table respectively.  Processing is specified as a comma-separated list of
process functions - for example::


    process1, process2(), process3('arg1', arg2=1234)


The :func:`.parseProcesses` function parses such a line, and returns a list of
:class:`.Process` objects which can be used to query the process name and
arguments, and to run each process.


The processing table
--------------------


Within the processing table, the above process specification is preceded by a
variable definition, which states which variables the process should be
applied to, and how they should be applied - either independently, or
together. This information is used to parallelise the processing where
possible.


For each process, the processing table contains a "process variable type", a
list of vids, and the process itself.  The process variable type is one of
the following:

  - No process variable type, simply a comma-separated list of vids. The
    process is applied to the specified vids::

        1,2,3 processName

  - ``'independent'``: The process is applied independently to the specified
    vids::

        independent,1,2,3 processName

  - ``'all'``: The process is applied to all vids::

        all processName

  - ``'all_independent'``: The process is applied independently to all vids::

       all_independent processName

  - ``'all_except'``: The process is applied to all vids except the specified
    ones::

       all_except,1,2,3 processName

  - ``'all_independent_except'``: The process is applied independently to all
    vids except the specified ones::

       all_independent_except,1,2,3 processName


For example, the :func:`.processing_functions.binariseCategorical` function
applies its logic independently to each variable, so it makes sense to specify
that it should be applied independently to a set of variables::

    independent,1,2,3,4,5 binariseCategorical

However, the :func:`.removeIfRedundant` function works on a collection of
variables (probably all variables in the data set), and this must be specified
in the processing table::

    all,1,2,3,4,5 removeIfRedundant(0.99)


Broadcasting arguments
----------------------

.. warning:: Broadcasting arguments are deprecated, and will be removed in
             FUNPACK 4.0.0. The alternative to broadcasting is for processing
             functions to perform their own parallelisation.

When a process is applied independently to more than one variable, the input
arguments to the process may need to be different for each variable. This can
be accomplished by using a *broadcast* argument - simply prefix the argument
name with ``'broadcast_'``, and then specify a list containing the argument
values for each variable. For example, the following specification will result
in the :func:`.processing_functions.binariseCategorical` process being applied
independently to variables 1, 2, and 3, with values taken from variables 4, 5,
and 6 respectively::

    independent,1,2,3 binariseCategorical(broadcast_take=[4, 5, 6])

Note that broadcast arguments are useful as a performance optimisation - the
above specification is functionally equivalent to::

    1 binariseCategorical(take=4)
    2 binariseCategorical(take=5)
    3 binariseCategorical(take=6)

however the latter example cannot take advantage of parallelism in a
multi-core environment.
"""


import functools       as ft
import itertools       as it
import os.path         as op
import                    os
import                    glob
import                    logging
import                    tempfile

import pandas as pd

from . import util


log = logging.getLogger(__name__)


def processData(dtable):
    """Runs every entry of the processing table against the data.

    Entries are visited in the order given by the index of the processing
    table. An entry which resolves to no variables at all is skipped.

    :arg dtable: The :class:`DataTable` instance.
    """

    for procIdx in dtable.proctable.index:

        procs, vidGroups, parallel = retrieveProcess(dtable, procIdx)
        flatVids                   = list(it.chain(*vidGroups))

        # Nothing to do for this entry
        if len(flatVids) == 0:
            continue

        # Only the first few vids are
        # included in log messages
        preview = flatVids[:5]

        # The processes of one entry are run
        # one after another - parallelisation,
        # if any, takes place inside runProcess.
        for proc in procs.values():

            log.debug('Running process %s on %u variables %s ...',
                      proc.name, len(flatVids), preview)

            fmt = '[{} {} ...] completed in %s (%+iMB)'.format(
                proc.name, preview)

            # Every process gets its own scratch
            # directory, which is deleted when
            # the with-block is left.
            timer = util.timed(proc.name, log, logging.DEBUG, fmt=fmt)
            with timer, tempfile.TemporaryDirectory() as workDir:
                runProcess(proc, dtable, vidGroups, workDir, parallel)
def retrieveProcess(dtable, procIdx):
    """Used by :func:`processData`. Looks up the entry at index ``procIdx``
    in the processing table, and works out which groups of variables its
    processes are to be applied to.

    Each entry of the processing table holds a "process variable type"
    (``pvtype``), a list of vids, and the processes themselves. The
    ``pvtype`` is one of:

      - ``vids``:                   apply to the specified vids
      - ``independent``:            apply independently to the specified
                                    vids
      - ``all``:                    apply to all vids
      - ``all_independent``:        apply independently to all vids
      - ``all_except``:             apply to all vids except the specified
                                    ones
      - ``all_independent_except``: apply independently to all vids except
                                    the specified ones

    Variable 0 is never included when "all vids" are requested.

    :arg dtable:  The :class:`.DataTable`
    :arg procIdx: Index into the processing table

    :returns: A tuple containing:

               - A dict of ``{ name : Process }`` mappings containing the
                 :class:`.Process` objects to be executed.

               - A list of lists, each list containing a group of variable
                 IDs that the process should be applied to

               - ``True`` if the process can be applied in parallel across
                 the variable groups, ``False`` otherwise.
    """

    proctable = dtable.proctable
    candidates = [vid for vid in dtable.variables if vid != 0]

    pvtype, listed = proctable.loc[procIdx, 'Variable']
    procs          = proctable.loc[procIdx, 'Process']

    everyVariable = ('all',
                     'all_except',
                     'all_independent',
                     'all_independent_except')
    oneByOne      = ('independent',
                     'all_independent',
                     'all_independent_except')

    # Step 1: decide which variables are involved - either
    # every variable in the data set (minus any exclusions),
    # or just the ones listed in the processing table.
    if pvtype in everyVariable:
        excluded = listed if pvtype.endswith('except') else []
        selected = [vid for vid in candidates if vid not in excluded]
    else:
        selected = listed

    # Step 2: decide how the variables are grouped - one
    # group per variable, or one group holding all of them.
    if pvtype in oneByOne:
        groups = [[vid] for vid in selected]
    else:
        groups = [selected]

    return procs, groups, 'independent' in pvtype
def runProcess(proc, dtable, vids, workDir, parallel):
    """Called by :func:`processData`. Runs the given process, and updates
    the :class:`.DataTable` as needed.

    The process is run either once per variable group in the calling
    process, or via :func:`runParallelProcess` in the pool provided by
    ``dtable.pool()``. In both cases the results are passed through
    :func:`unpackResults`, after which columns flagged for removal are
    removed from ``dtable``, and new columns are added to it.

    :arg proc:     :class:`.Process` to run.
    :arg dtable:   :class:`.DataTable` containing the data.
    :arg vids:     List of lists, groups of variable IDs to run the
                   process on.
    :arg workDir:  Directory to save/load new columns to/from, if the
                   processing is performed in a worker process.
    :arg parallel: If ``True``, each variable group is processed in
                   parallel. Otherwise they are processed sequentially.
    """

    def filterMissing(vids):
        """Takes a list of variable IDs and removes those that are not
        present in the data set, emitting a warning for each ID that is
        removed. The list is returned unchanged if ``proc.filterMissing``
        is not set.
        """
        if not proc.filterMissing:
            return vids

        filtered = []
        for vid in vids:
            if dtable.present(vid):
                filtered.append(vid)
            else:
                log.warning('Process %s refers to missing variable %u! '
                            '(%s)', proc.name, vid, proc.processString)
        return filtered

    results = []

    # run process serially
    if not parallel:
        for vg in vids:
            results.append(proc.run(dtable, filterMissing(vg)))

    # or run in parallel across vid groups
    else:
        with dtable.pool() as pool:

            # Note. This code is awkward for a number of
            # reasons, including that parallelisation in
            # older versions of FUNPACK worked differently,
            # and for the sake of preserving backwards
            # compatibility with respect to the use of the
            # (deprecated) argument broadcast feature.

            # Gather all variables required by this process -
            # the ones which are specified in the processing
            # table, along with any auxiliary ones specified
            # as arguments to the process. "allvids" is used
            # to create the sub-table, and "vids" is the list
            # of vids that we ask the processing function to
            # process. "bcastIdxs" records the original index
            # of each retained group, so that broadcast
            # arguments still line up after empty groups
            # have been dropped.
            unfiltered = vids
            vids       = []
            allvids    = []
            bcastIdxs  = []

            for i, vg in enumerate(unfiltered):
                vg      = filterMissing(vg)
                auxvids = filterMissing(proc.auxillaryVariables(i))
                if len(vg) > 0:
                    vids     .append(vg)
                    allvids  .append(vg + auxvids)
                    bcastIdxs.append(i)

            # If every group was filtered out there is
            # nothing to dispatch - and unpacking the
            # (empty) starmap output below would fail.
            if len(vids) > 0:

                # generate a subtable for each variable group -
                # this is to minimise the amount of data that
                # needs to be transferred to worker processes,
                # if we are parallelising.
                allcols   = [list(it.chain(*[dtable.columns(v) for v in vg]))
                             for vg in allvids]
                subtables = [dtable.subtable(cols) for cols in allcols]

                # separate work dir for each variable group
                workDirs = [op.join(workDir, str(i))
                            for i in range(len(vids))]

                func       = ft.partial(runParallelProcess, proc)
                parresults = pool.starmap(
                    func, zip(subtables, vids, workDirs, bcastIdxs))

                # each worker returns a (subtable, result) pair
                subtables, parresults = zip(*parresults)
                results.extend(parresults)

                # Merge results back in - this
                # includes in-place modifications
                # to columns, and column flags/
                # metadata. Added/removed columns
                # are handled below.
                log.debug('Processing for %u vid groups complete - merging '
                          'results into main data table.', len(subtables))
                dtable.merge(subtables)

    remove    = []
    add       = []
    addvids   = []
    addkwargs = []

    for result in results:
        rremove, radd, raddvids, raddkwargs = unpackResults(proc, result)
        remove   .extend(rremove)
        add      .extend(radd)
        addvids  .extend(raddvids)
        addkwargs.extend(raddkwargs)

    # Parallelised processes save new
    # series to disk - load them back in.
    # The directory scan exists purely to
    # produce a debug message, so it is
    # skipped unless debug logging is on.
    if log.isEnabledFor(logging.DEBUG):
        savedSeries = glob.glob(
            op.join(workDir, '**', '*.pkl'), recursive=True)
        if len(savedSeries) > 0:
            log.debug('[%s]: Reading %u new columns from %s %s ...',
                      proc.name, len(savedSeries), workDir, vids[:5])

    # Entries which are strings are paths to pickle
    # files written by runParallelProcess into workDir.
    add = [pd.read_pickle(series) if isinstance(series, str) else series
           for series in add]

    # remove columns first, in case
    # there is a name clash between
    # the old and new columns.
    if len(remove) > 0:
        dtable.removeColumns(remove)
    if len(add) > 0:
        dtable.addColumns(add, addvids, addkwargs)
def runParallelProcess(proc, dtable, vids, workDir, broadcastIndex=None):
    """Used by :func:`runProcess`. Creates ``workDir``, calls ``proc.run``,
    and returns the (possibly modified) ``dtable`` along with the unpacked
    result.

    :arg proc:           :class:`.Process` to run
    :arg dtable:         :class:`.DataTable` instance (probably a subtable)
    :arg vids:           List of variable IDs
    :arg workDir:        Directory to save new columns to if running in a
                         worker process - this is used to reduce the amount
                         of data that must be transferred between processes.
    :arg broadcastIndex: Deprecated. Index to use for broadcast arguments -
                         passed through to the :meth:`.Process.run` method.

    :returns: A tuple containing:

               - A reference to ``dtable``

               - A tuple ``(remove, add, addvids, addkwargs)``, as produced
                 by :func:`unpackResults` from the value returned by
                 ``proc.run()``. When new columns have been saved to disk,
                 the entries of ``add`` are file paths rather than series.
    """

    os.mkdir(workDir)

    outcome = proc.run(dtable, vids, broadcastIndex=broadcastIndex)
    remove, add, addvids, addkwargs = unpackResults(proc, outcome)

    # In the main process, or when no new columns
    # were created, the result is returned as-is.
    if util.inMainProcess() or len(add) == 0:
        return dtable, (remove, add, addvids, addkwargs)

    # Otherwise we are in a worker process - new
    # columns are written to disk rather than being
    # copied back to the parent process, and are
    # replaced in the result by their file names.
    log.debug('[%s]: Saving %u new columns to %s %s...',
              proc.name, len(add), workDir, vids[:5])

    for idx, column in enumerate(add):
        path     = op.join(workDir, '{}.pkl'.format(idx))
        add[idx] = path
        column.to_pickle(path)

    return dtable, (remove, add, addvids, addkwargs)
def unpackResults(proc, result):
    """Normalises the value returned by a :class:`.Process` into a fixed
    four-element form. See the :mod:`.processing_functions` module for
    details on what can be returned by a processing function.

    The accepted forms of ``result`` are:

      - ``None``:    nothing to remove or add
      - a list:      columns to remove
      - a 2-tuple:   ``(add, addvids)``
      - a 3-tuple:   ``(remove, add, addvids)``
      - a 4-tuple:   ``(remove, add, addvids, addkwargs)``

    Where ``addvids`` or ``addkwargs`` is ``None`` (or absent), it is
    replaced by a list of ``None`` with one entry per new series.

    :arg proc:   The :class:`.Process`
    :arg result: The value returned by :meth:`.Process.run`.

    :returns:    A tuple containing:

                  - List of columns to remove
                  - List of new series to add
                  - List of variable IDs for new series
                  - List of :class:`.Column` keyword arguments for new
                    series

    :raises ValueError: If ``result`` is not in one of the forms above.
    """

    if result is None:
        return [], [], [], []

    invalid = ValueError('Invalid return value from '
                         'process {}'.format(proc.name))

    def padded(values, count):
        """Returns ``values``, or ``count`` ``None`` values if ``values``
        is ``None``.
        """
        return [None] * count if values is None else values

    # columns to remove
    if isinstance(result, list):
        return list(result), [], [], []

    if not isinstance(result, tuple):
        raise invalid

    nitems = len(result)

    # series/vids to add
    if nitems == 2:
        series, seriesVids = result
        add                = list(series)
        addvids            = list(padded(seriesVids, len(series)))
        addkwargs          = list(padded(None,       len(series)))
        return [], add, addvids, addkwargs

    # columns to remove, and series/vids
    # to add, with optional column kwargs
    if nitems in (3, 4):
        columns, series, seriesVids = result[:3]
        seriesKwargs                = result[3] if nitems == 4 else None
        remove                      = list(columns)
        add                         = list(series)
        addvids                     = list(padded(seriesVids,   len(series)))
        addkwargs                   = list(padded(seriesKwargs, len(series)))
        return remove, add, addvids, addkwargs

    raise invalid