Source code for funpack.parsing.process

#!/usr/bin/env python
#
# process.py - The Process class, and functions for parsing processing steps.
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""The :class:`Process` class, and functions for parsing processing steps.

This module defines the :class:`Process` class which is used by the
:mod:`funpack.processing` module to run FUNPACK processing steps.
"""


import functools       as ft
import itertools       as it
import                    logging
import                    warnings
import                    collections
import collections.abc as abc

import pyparsing as pp

from funpack import custom


log = logging.getLogger(__name__)


class NoSuchProcessError(Exception):
    """Exception raised when an unknown process name is specified.

    This is raised by :func:`parseProcesses` when a process call refers to
    a name which is not registered for the requested process type.
    """
class Process:
    """Simple class which represents a single processing step.

    The :meth:`run` method can be used to run the process on the data for
    one or more variables.
    """

    def __init__(self, ptype, name, args, kwargs, procstr):
        """Create a ``Process``.

        :arg ptype:   Process type - either ``cleaner`` or ``processor``
                      (see the :mod:`.custom` module).
        :arg name:    Process name
        :arg args:    Positional arguments to pass to the process function.
        :arg kwargs:  Keyword arguments to pass to the process function.
        :arg procstr: Input string containing the process specification.

        Any keyword arguments which begin with ``'broadcast_'`` are separated
        out from the other keyword arguments (and stored with that prefix
        removed), although note that broadcasting is deprecated and will be
        removed in FUNPACK 4.0.0. See the :meth:`run` method for more
        details.

        A ``metaproc`` keyword argument, if present, is also removed from
        the regular keyword arguments - it is used by :meth:`run` to
        post-process column metadata, and is not passed to the process
        function.
        """

        bcastKwargs  = collections.OrderedDict()
        normalKwargs = collections.OrderedDict()

        for k, v in kwargs.items():
            if k.startswith('broadcast_'):
                warnings.warn(f'[{k}]: Broadcasting is deprecated, and '
                              'will be removed in FUNPACK 4.0.0 ',
                              DeprecationWarning)
                # strip the "broadcast_" prefix, keeping
                # any underscores in the argument name
                k = '_'.join(k.split('_')[1:])
                bcastKwargs[k] = v
            else:
                normalKwargs[k] = v

        self.__ptype       = ptype
        self.__name        = name
        self.__args        = args
        self.__metaproc    = normalKwargs.pop('metaproc', None)
        self.__kwargs      = normalKwargs
        self.__procstr     = procstr
        self.__bcastKwargs = bcastKwargs

    def __repr__(self):
        """Return a string representation of this ``Process``."""
        args    = ','.join([str(v) for v in self.__args])
        kwargs  = ','.join([f'{k}={v}' for k, v in
                            it.chain(self.__kwargs.items(),
                                     self.__bcastKwargs.items())])
        # omit the positional/keyword section if it is empty
        allargs = ', '.join([a for a in (args, kwargs) if a != ''])

        return f'{self.__name}[{self.__ptype}]({allargs})'

    @property
    def name(self):
        """Returns the name of this ``Process``. """
        return self.__name

    @property
    def args(self):
        """Returns the positional arguments for this ``Process``. """
        return self.__args

    @property
    def kwargs(self):
        """Returns the keyword arguments for this ``Process``. """
        return self.__kwargs

    @property
    def processString(self):
        """Returns the original string, from the processing table/
        command-line, which defines this ``Process``.
        """
        return self.__procstr

    @property
    def broadcastKwargs(self):
        """Returns the keyword arguments for this ``Process`` which will be
        broadcast across all variable IDs that are passed to an invocation
        of :meth:`run`.
        """
        return self.__bcastKwargs

    @property
    def filterMissing(self):
        """Return ``True`` if this processing function expects that the list
        of variable IDs which it is given will not contain the IDs of
        variables which are not present in the data.

        This property is set via a ``filterMissing`` argument passed to the
        processor decorator function. Its default value is ``True``.
        """
        return custom.args(self.__ptype, self.__name).get('filterMissing',
                                                          True)

    @staticmethod
    def _asVidList(val):
        """Return ``val`` as a list of variable IDs.

        Sequences (e.g. lists/tuples) are copied into a new list. Any other
        value, including a ``str`` or ``bytes`` (which are sequences, but
        must not be split into individual characters), is wrapped in a
        single-element list.
        """
        if isinstance(val, abc.Sequence) and \
           not isinstance(val, (str, bytes)):
            return list(val)
        return [val]

    def auxillaryVariables(self, broadcastIndex=None):
        """Returns a list of "auxiliary" variables for this process.

        Auxiliary variables are variables which a process is not being
        applied to, but which are needed by the process. These variables
        are passed in as arguments to the process. The names of any arguments
        which contain auxiliary variables are specified via the ``auxvids``
        argument to the processor decorator function.

        :arg broadcastIndex: Deprecated. If provided, auxiliary variables
                             which were given as broadcast arguments are
                             looked up at this index.

        :returns: A flat list containing the values of all auxiliary
                  variable arguments that were specified for this process.
        """
        auxargs = custom.args(self.__ptype, self.__name).get('auxvids', [])
        auxvids = []

        for arg in auxargs:
            # regular keyword arguments take
            # precedence over broadcast arguments
            if arg in self.kwargs:
                val = self.kwargs[arg]
            elif ((broadcastIndex is not None) and
                  (arg in self.broadcastKwargs)):
                val = self.broadcastKwargs[arg][broadcastIndex]
            else:
                continue

            auxvids.extend(self._asVidList(val))

        return auxvids

    def run(self, *args, broadcastIndex=None):
        """Run the process on the data, passing it the given arguments,
        and any arguments that were passed to :meth:`__init__`.

        :arg broadcastIndex: Deprecated. If provided, and if any broadcast
                             arguments were specified for this process, this
                             index is used to retrieve one value from each
                             broadcast argument list - this value is then
                             passed to the process function.

        :returns: Whatever the process function returns. If a ``metaproc``
                  was specified, and the process function returns a tuple of
                  four elements, the fourth element is replaced with a new
                  list in which the ``'metadata'`` entry of each item has
                  been passed through the metadata processing function.
        """

        kwargs = self.__kwargs.copy()
        bcast  = self.__bcastKwargs

        # retrieve the value for each broadcast argument,
        # and pass them in as regular keyword arguments
        if broadcastIndex is not None:
            for k, v in bcast.items():
                kwargs[k] = v[broadcastIndex]

        result = custom.run(self.__ptype,
                            self.__name,
                            *args,
                            *self.__args,
                            **kwargs)

        mproc = self.__metaproc

        # metadata post-processing is only applied
        # to results of the form (_, _, vids, kwargs)
        if mproc is None              or \
           not isinstance(result, tuple) or \
           len(result) != 4:
            return result

        # The first argument to a process
        # should be the data table
        dtable = args[0]

        # The 3rd/4th args returned from a
        # process should be a list of vids,
        # and a list of Column kwargs for
        # each of them
        vids      = result[2]
        colkwargs = result[3]
        newkwargs = []

        for vid, vkwargs in zip(vids, colkwargs):

            if vkwargs is not None and 'metadata' in vkwargs:
                # A failure in the metadata function is
                # not fatal - the original metadata is
                # retained, and a warning is emitted.
                try:
                    vkwargs['metadata'] = custom.runMetaproc(
                        mproc, dtable, vid, vkwargs['metadata'])
                except Exception as e:
                    log.warning('Metadata processing function '
                                'failed (vid %u): %s', vid, e)

            newkwargs.append(vkwargs)

        return (*result[:3], newkwargs)
def parseProcesses(procs, ptype):
    """Parses the given string containing one or more comma-separated process
    calls, as defined in the processing table. Returns a list of
    :class:`Process` objects.

    :arg procs: String containing one or more comma-separated
                (pre-)processing steps.

    :arg ptype: either ``cleaner`` or ``processor``

    :returns:   A list of :class:`Process` objects.

    :raises NoSuchProcessError: if a process name is not known to the
                                :mod:`.custom` module for the given
                                ``ptype``.

    Any error raised while parsing is logged and then re-raised.
    """

    def makeProcess(toks):
        """Parse action which converts the tokens for one process call
        (``name [, args] [, kwargs]``) into a :class:`Process`.
        """
        name   = toks[0]
        args   = ()
        kwargs = {}

        # One extra token: either positional
        # arguments (a tuple) or keyword
        # arguments (a dict)
        if len(toks) == 2:
            if isinstance(toks[1], tuple):
                args = toks[1]
            elif isinstance(toks[1], dict):
                kwargs = toks[1]

        # Two extra tokens: positional
        # arguments followed by keyword
        # arguments
        elif len(toks) == 3:
            args, kwargs = toks[1:]

        if not custom.exists(ptype, name):
            raise NoSuchProcessError(name)

        return Process(ptype, name, args, kwargs, procs)

    # makeProcessParser is cached, so the same parser
    # object is returned on every call. Its parse action
    # is therefore (re-)assigned on every call, so that
    # it is bound to the ptype/procs of this invocation.
    # NOTE(review): this mutates shared state, so is not
    # safe if called concurrently from multiple threads.
    parser = pp.delimitedList(makeProcessParser().setParseAction(makeProcess))

    try:
        parsed = parser.parseString(procs, parseAll=True)
    except Exception as e:
        log.error('Error parsing process list "%s": %s', procs, e)
        # bare raise preserves the original traceback
        raise

    return list(parsed)
@ft.lru_cache()
def makeProcessParser():
    """Build a ``pyparsing`` parser for a single process call, as found in
    the processing table.

    A process call is a function name, optionally followed by a
    parenthesised list of positional and/or keyword arguments. The parser
    is only constructed once - the result is cached.
    """

    # Punctuation is matched, but
    # dropped from the parse results.
    openParen, closeParen, openBracket, closeBracket = [
        pp.Literal(char).suppress() for char in '()[]']

    def convertBoolean(tok):
        # 'True'/'False' -> bool, anything
        # else is passed through untouched
        text = tok[0]
        return {'True': True, 'False': False}.get(text, text)

    def parseValList(toks):
        # wrap in an outer list so that the
        # values are kept as a single token
        return [list(toks)]

    def parseArgs(toks):
        # all positional arguments
        # become one tuple token
        return [tuple(toks)]

    def parseKwargs(toks):
        # tokens alternate name, value, name, value, ...
        return collections.OrderedDict(
            (toks[idx], toks[idx + 1]) for idx in range(0, len(toks), 2))

    funcName = pp.pyparsing_common.identifier

    # scalar values: quoted strings,
    # numbers, booleans, or None
    boolean = pp.oneOf(['True', 'False']).setParseAction(convertBoolean)
    none    = pp.Literal('None').setParseAction(pp.replaceWith(None))
    scalar  = (pp.QuotedString('"') ^
               pp.QuotedString("'") ^
               pp.pyparsing_common.number ^
               boolean ^
               none)

    # argument values are either a
    # scalar, or a list of scalars
    # within square brackets
    valueList = (openBracket + pp.delimitedList(scalar) + closeBracket)
    valueList = valueList.setParseAction(parseValList)
    value     = scalar ^ valueList

    # arguments are either
    # positional or keyword
    oneKwarg   = (pp.pyparsing_common.identifier +
                  pp.Literal('=').suppress() +
                  value)
    positional = pp.delimitedList(value).setParseAction(parseArgs)
    keyword    = pp.delimitedList(oneKwarg).setParseAction(parseKwargs)
    arguments  = pp.delimitedList(pp.Optional(positional) +
                                  pp.Optional(keyword))

    # function can be called as:
    #   function
    #   function()
    #   function(args)
    call = openParen + pp.Optional(arguments) + closeParen

    return funcName + pp.Optional(call)