Source code for funpack.util

#!/usr/bin/env python
#
# util.py - Miscellaneous utility functions.
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module contains a collection of miscellaneous utility functions,
classes, and constants.
"""


import                    os
import                    re
import                    sys
import                    enum
import                    site
import                    time
import                    shutil
import                    logging
import                    warnings
import                    tempfile
import                    functools
import                    contextlib
import itertools       as it
import os.path         as op
import subprocess      as sp
import multiprocessing as mp

import numpy           as np
import pandas          as pd

from typing import Any

# The stdlib resource module is only
# available on unix-like platforms.
try:
    import resource
except ImportError:
    resource = None


# Module-level logger. It is the default logger used by timed(), and is
# used directly by wc(), cat() and logIfError().
log = logging.getLogger(__name__)


# The members are created with the functional Enum API, so they are
# numbered automatically in the order in which they are listed here.
CTYPES = enum.Enum(
    'CTYPES',
    ['sequence',
     'integer',
     'continuous',
     'categorical_single',
     'categorical_single_non_numeric',
     'categorical_multiple',
     'categorical_multiple_non_numeric',
     'time',
     'date',
     'text',
     'compound',
     'unknown'])
"""The ``CTYPES`` enum defines all the types that ``funpack`` is aware of.
"""


# Maps CTYPES members to the numpy/python type used to store them.
# Note that CTYPES.time, CTYPES.date and CTYPES.unknown have no
# entry in this table.
DATA_TYPES = {

    # We have to use floating point for
    # integer types because pandas uses
    # nan to represent missing data.
    CTYPES.integer                          : np.float32,
    CTYPES.continuous                       : np.float32,
    CTYPES.categorical_single               : np.float32,
    CTYPES.categorical_multiple             : np.float32,
    CTYPES.sequence                         : np.uint32,
    CTYPES.categorical_single_non_numeric   : str,
    CTYPES.categorical_multiple_non_numeric : str,
    CTYPES.text                             : str,
    CTYPES.compound                         : str,
}
"""Default internal data type to use for the different variable types.
Used by the :func:`columnTypes` function. These types may be overridden
by the ``InternalType`` column of the variable table, which is populated
from the ``funpack/schema/type.txt`` file (see :func:`.loadTableBases`).
"""


def parseColumnName(name):
    """Split a UK Biobank column name into its components.

    The following naming formats are recognised::

        variable-visit.instance
        variable.instance
        f.variable.visit.instance

    ``variable`` and ``visit`` must be integers. ``instance`` is usually an
    integer as well, but in the first two forms it may be any non-empty
    string - this, together with the second form, allows derived columns to
    be parsed (see e.g. the
    :func:`.processing_functions.binariseCategorical` processing function).
    Names of the second form are given a visit of ``0``.

    A negative visit may be written as ``variable--visit.instance`` or as::

        f.variable..visit.instance

    .. note:: For the vast majority of biobank variables, the second number
              in a column name (``visit`` above) corresponds to the
              assessment visit. However, there are a small number of
              variables which are not associated with a specific visit, and
              for which this number corresponds to some other coding (e.g.
              variable 40006).

              Confusingly, the UK Biobank showcase refers to the coding that
              a variable adheres to as an "instancing", whilst also using
              the term "instance" to refer to the columns of multi-valued
              variables - the ``instance`` element of the column name.

              The "instancing" that a variable uses is contained in the
              ``Instancing`` column of the variable table. Variables for
              which the ``visit`` component of their column names do
              correspond to an actual visit have an instancing equal to 2.

    :arg name: Column name

    :returns:  A tuple containing:

                - variable ID
                - visit number
                - instance (an integer where possible, otherwise a string)

    :raises ValueError: if ``name`` does not have one of the above forms.
    """

    def fromStandard(groups):
        # groups: (variable, negation marker or None, visit, instance)
        varid, negated, visit, inst = groups
        varid = int(varid)
        visit = int(visit)
        if negated is not None:
            visit = -visit
        return varid, visit, inst

    def fromDerived(groups):
        # groups: (variable, instance) - there is no visit component
        varid, inst = groups
        return int(varid), 0, inst

    # Tried in order - the first format that
    # matches the whole name is the one used.
    formats = (
        (r'([0-9]+)-(-)?([0-9]+)\.(.+)',           fromStandard),
        (r'([0-9]+)\.(.+)',                        fromDerived),
        (r'f\.([0-9]+)\.(\.)?([0-9]+)\.([0-9]+)', fromStandard),
    )

    for regex, parser in formats:

        found = re.fullmatch(regex, name)
        if found is None:
            continue

        vid, visit, instance = parser(found.groups())

        # The instance is converted to an integer
        # where possible, and left as a string
        # otherwise.
        try:
            instance = int(instance)
        except ValueError:
            pass

        return (vid, visit, instance)

    raise ValueError('Invalid column name: {}'.format(name))
def generateColumnName(variable, visit, instance):
    """Build a column name of the form ``variable-visit.instance``.

    :arg variable: Integer variable ID
    :arg visit:    Visit number
    :arg instance: Instance number
    :returns:      The column name, as a string.
    """
    return f'{variable}-{visit}.{instance}'
def findConfigDir(dirname='configs'):
    """Return the highest-precedence directory found by
    :func:`findConfigDirs`.

    This is simply the first entry of the list returned by
    :func:`findConfigDirs` - see that function for the search order, and
    for the error that is raised when no directory can be found.

    :arg dirname: Passed through to :func:`findConfigDirs`.
    """
    candidates = findConfigDirs(dirname)
    return candidates[0]
def findConfigDirs(dirname='configs'):
    """Return a list of all FUNPACK configuration directories that exist.

    The following locations are considered, in order of precedence:

     1. ``$FUNPACK_CONFIG_DIR``, if that environment variable is set.

     2. ``<sitedir>/funpack/<dirname>/`` for every directory returned by
        ``site.getsitepackages``. This allows configuration files which
        have been installed into the running Python environment to be
        found when FUNPACK itself is being run from a source checkout.

     3. ``<dirname>/`` alongside this source file.

    Only locations which are existing directories are returned.

    :arg dirname: Name of the directory to look for within the ``funpack``
                  package directory, e.g. ``configs`` (the default) or
                  ``plugins``.

    :returns:     List of directories, highest precedence first.

    :raises RuntimeError: if none of the candidate locations exist.
    """

    searchpath = []

    envdir = os.environ.get('FUNPACK_CONFIG_DIR')
    if envdir is not None:
        searchpath.append(envdir)

    for sitedir in site.getsitepackages():
        searchpath.append(op.join(sitedir, 'funpack', dirname))

    searchpath.append(op.join(op.dirname(__file__), dirname))

    found = [path for path in searchpath if op.isdir(path)]

    if not found:
        raise RuntimeError('Cannot find FUNPACK configuration directory!')

    return found
def findTableFile(filename):
    """Search for a FUNPACK table file.

    Equivalent to calling :func:`findConfigFile` with a suffix of
    ``'.tsv'``.

    :arg filename: Name of file to search for
    """
    return findConfigFile(filename, suffix='.tsv')
def findPluginFile(filename):
    """Search for a FUNPACK plugin file.

    Equivalent to calling :func:`findConfigFile` with a suffix of ``'.py'``
    and a directory name of ``'plugins'``.

    :arg filename: Name of file to search for
    """
    return findConfigFile(filename, suffix='.py', dirname='plugins')
def findConfigFile(filename, suffix='.cfg', dirname='configs'):
    """Locate a FUNPACK configuration file.

    If ``filename`` refers to an existing file (relative to the current
    directory, or absolute), its absolute path is returned. Otherwise each
    of the directories returned by :func:`findConfigDirs` is searched, in
    order, for (in order of precedence):

      - the file name as given (e.g. ``fmrib/categories.tsv``)
      - the file name with ``suffix`` appended (e.g. ``fmrib/categories``)
      - the file name with every dot replaced by the path separator, and
        ``suffix`` appended (e.g. ``fmrib.categories``)

    :arg filename: Name of file to search for. May be ``None``, in which
                   case ``None`` is returned.

    :arg suffix:   Suffix to append, if the filename is specified without
                   one (must include the leading period). ``None`` is
                   treated as an empty string.

    :arg dirname:  Name of internal/built-in directory to search - assumed
                   to be within the ``funpack`` package directory, e.g.
                   ``funpack/configs/``.

    :returns:      Absolute path to the found file, or ``filename``
                   unmodified if a match was not found.
    """

    # Allow callers to pass None straight through
    if filename is None:
        return None

    # The suffix is appended verbatim,
    # so an empty string is a no-op
    suffix = '' if suffix is None else suffix

    # The name may already be a path
    # to a file at an arbitrary location
    if op.isfile(filename):
        return op.abspath(filename)

    searchdirs = findConfigDirs(dirname)

    names = (filename,
             f'{filename}{suffix}',
             f'{filename.replace(".", op.sep)}{suffix}')

    for searchdir in searchdirs:
        for name in names:
            path = op.abspath(op.join(searchdir, name))
            if op.isfile(path):
                return path

    # Nothing was found. The name is returned
    # as-is, which will result in an error
    # at some other point.
    return filename
def parseMatlabRange(r):
    """Expand a MATLAB-style range string into a list of integers.

    The string may have the form ``start``, ``start:stop`` or
    ``start:step:stop``. In contrast to Python ranges, ``stop`` is
    inclusive.

    :arg r:   String containing MATLAB-style range.
    :returns: List of integers in the fully expanded range.

    :raises ValueError: if the string has more than three ``:``-separated
                        components, or if any component is not an integer.
    """

    parts  = [int(tok) for tok in r.split(':')]
    nparts = len(parts)

    if nparts > 3:
        raise ValueError('Invalid range string: {}'.format(r))

    if nparts == 1:
        first = parts[0]
        last  = first
        step  = 1
    elif nparts == 2:
        first, last = parts
        step        = 1
    else:
        first, step, last = parts

    # Python ranges exclude the end point, so
    # it is moved one past the MATLAB end
    # point, in the direction of travel.
    if step > 0:
        last += 1
    elif step < 0:
        last -= 1

    return list(range(first, last, step))
def dedup(seq):
    """Remove duplicates from a sequence, preserving order.

    The first occurrence of each item is kept. Hashable items are tracked
    in a set, so de-duplicating them takes linear rather than quadratic
    time. Unhashable items (e.g. lists) are still supported, but each one
    is compared against every item that has been kept so far.

    .. note:: An unhashable item is compared against all previously kept
              items, but a hashable item is only compared against
              previously kept *hashable* items.

    :arg seq: Any iterable.
    :returns: A new list containing the unique items of ``seq``.
    """
    seen   = set()
    newseq = []

    for item in seq:
        try:
            if item in seen:
                continue
            seen.add(item)

        # item is unhashable - fall back to
        # a linear scan of the items kept so far
        except TypeError:
            if item in newseq:
                continue

        newseq.append(item)

    return newseq
def wc(fname):
    """Count the number of lines in ``fname``.

    The ``wc`` command is used if it is available on the ``$PATH``.
    Otherwise the lines are counted in Python.

    .. note:: ``wc -l`` counts newline characters, so it does not count a
              final line which is not newline-terminated. The Python
              fallback does count such a line.

    :arg fname: Name of the file to check
    :returns:   Number of lines in ``fname``.
    """
    with timed('Row count', log):
        if shutil.which('wc'):
            nrows = sp.check_output(['wc', '-l', fname]).split()[0]
        else:
            # The file is read as bytes - counting lines does
            # not require the contents to be decoded, and doing
            # so could fail on data which is not valid in the
            # default encoding. As with wc, only '\n' is
            # considered to be a line terminator.
            with open(fname, 'rb') as f:
                nrows = sum(1 for _ in f)

    return int(nrows)
def cat(files, outfile):
    """Concatenate ``files``, saving the output to ``outfile``.

    The ``cat`` command is used if it is available on the ``$PATH``.
    Otherwise the files are copied in Python. In both cases the data is
    copied as bytes, without any decoding or newline translation.

    :arg files:   Sequence of files to concatenate.
    :arg outfile: Name of file to save output to. It is overwritten if it
                  already exists.

    :raises subprocess.CalledProcessError: if ``cat`` is used, and exits
                                           with a non-zero status.
    """

    # Size of the chunks in which
    # file contents are copied
    bufsize = 16777216

    with timed('Concatenate files', log):
        with open(outfile, 'wb') as outf:
            if shutil.which('cat'):
                cmd = ['cat'] + list(files)
                sp.run(cmd, check=True, stdout=outf)
            else:
                for infile in files:
                    with open(infile, 'rb') as inf:
                        shutil.copyfileobj(inf, outf, bufsize)
def inMainProcess():
    """Test whether the running process is the main (parent) process.

    :returns: ``True`` if the ID of the running process is the same as the
              ID which was recorded when this module was first imported,
              ``False`` otherwise (e.g. when called from within a
              ``multiprocessing`` worker process).
    """
    current = mp.current_process()
    return inMainProcess.pid == current.pid


# The ID of the process which executes this
# module is stored as an attribute of the
# inMainProcess function, so that it can
# be compared against later on.
inMainProcess.pid = mp.current_process().pid
@contextlib.contextmanager
def timed(op=None, logger=None, lvl=None, fmt=None):
    """Context manager which times a section of code, and emits a log
    message afterwards.

    The message reports the elapsed wall-clock time, and the change in
    the peak resident set size of the process (``ru_maxrss``) in
    megabytes. The latter is always reported as zero on platforms where
    the ``resource`` module is not available.

    No completion message is emitted if the timed section raises an
    exception.

    :arg op:     Name of operation which is being timed. If provided, a
                 message is also emitted before the section is run.

    :arg logger: Logger object to use - defaults to :attr:`log`.

    :arg lvl:    Log level - defaults to ``logging.INFO``.

    :arg fmt:    Custom message. If not provided, a default message is used.
                 Must be a ``'%'``-style format string which accepts two
                 parameters: the elapsed time (``%s``), and the memory
                 usage (``%i``).
    """

    if fmt is None:
        # The operation name becomes part of a %-style
        # format string, so any '%' characters within
        # it must be escaped.
        opname = str(op).replace('%', '%%')
        fmt    = '[{}] completed in %s (%+iMB)'.format(opname)

    if logger is None:
        logger = log

    if lvl is None:
        lvl = logging.INFO

    if op is not None:
        logger.log(lvl, 'Running task [%s]', op)

    # ru_maxrss appears to be bytes under
    # macos, and kilobytes under linux
    if sys.platform == 'darwin':
        memdenom = 1048576.0
    else:
        memdenom = 1024.0

    def maxrss():
        # Peak memory usage so far, or 0 on
        # platforms without the resource module
        if resource is None:
            return 0
        return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

    startmem  = maxrss()
    starttime = time.time()

    yield

    elapsed = time.time() - starttime
    endmem  = maxrss()

    hours   = int( elapsed         / 3600)
    minutes = int((elapsed % 3600) / 60)
    seconds = int((elapsed % 3600) % 60)

    # Larger units are only included when non-zero
    timestr = '{:d} seconds'.format(seconds)

    if minutes > 0:
        timestr = '{} minutes, {}'.format(minutes, timestr)

    if hours > 0:
        timestr = '{} hours, {}'.format(hours, timestr)

    mbytes = (endmem - startmem) / memdenom

    logger.log(lvl, fmt, timestr, mbytes)
def logIfError(label):
    """Create a decorator which logs ``label``, along with the exception
    details, whenever the decorated function raises an ``Exception``.

    The message is emitted at the ``ERROR`` level via the module-level
    :attr:`log`, and the exception is then re-raised.

    :arg label: Message to log.
    """

    def decorate(func):

        @functools.wraps(func)
        def logged(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                log.error(label, exc_info=e)
                raise e

        return logged

    return decorate
def deprecated(message):
    """Create a decorator which marks a function or method as deprecated.

    A ``DeprecationWarning`` containing ``message`` is emitted every time
    the decorated function is called, attributed to the calling code.

    :arg message: Text of the warning.
    """

    def decorate(func):

        # Make sure that deprecation warnings are shown.
        # Note that this changes the warning filters for
        # the whole process, at the time of decoration.
        warnings.filterwarnings('default', category=DeprecationWarning)

        @functools.wraps(func)
        def warned(*args, **kwargs):
            warnings.warn(message, DeprecationWarning, stacklevel=2)
            return func(*args, **kwargs)

        return warned

    return decorate
@contextlib.contextmanager
def tempdir(root=None, changeto=True):
    """Context manager which creates a temporary directory, and deletes it
    (along with all of its contents) on exit.

    By default the directory is made the current working directory for
    the duration of the context, and the previous working directory is
    restored afterwards.

    :arg root:     Create the directory as a sub-directory of ``root``
                   (default: ``$TMPDIR``)

    :arg changeto: Change into the directory (default: ``True``)

    :returns:      Yields the path to the temporary directory.
    """

    newdir  = tempfile.mkdtemp(dir=root)
    origdir = os.getcwd()

    try:
        if changeto:
            os.chdir(newdir)

        yield newdir

    finally:
        # We must leave the directory
        # before attempting to remove it
        if changeto:
            os.chdir(origdir)

        shutil.rmtree(newdir)
def isna(val : Any) -> bool:
    """Test whether ``val`` is NaN.

    The test is performed with ``pandas.isna``. If it produces a single
    boolean, that boolean is returned. Otherwise ``val`` is treated as a
    collection, and is only considered to be NaN if every value contained
    within it is NaN.

    ``False`` is returned if a ``ValueError`` is raised while performing
    the test.
    """
    try:
        missing = pd.isna(val)

        # Element-wise result - reduce it to a single value
        if not isinstance(missing, bool):
            return missing.all()

        return missing

    except ValueError:
        return False
class Singleton:
    """Manages a reference to a single instance of a class.

    This is not a true singleton - there are no restrictions against
    multiple instances being created. However, a reference is only held to
    the first created instance.

    The reference is stored separately for each class - every sub-class of
    ``Singleton`` keeps track of its own instance, which can be retrieved
    by calling :meth:`instance` on that sub-class.

    The ``Singleton`` class is used as the base class for
    :class:`.DataTable`, to allow for shared-memory access to the
    ``DataTable`` by worker processes.
    """

    def __new__(cls, *args, **kwargs):
        """Create a new instance and save a ref to it, if one does not
        yet exist.

        Any arguments are accepted, and ignored, so that sub-classes are
        free to define their own ``__init__`` signature.
        """
        new = super().__new__(cls)

        # The reference is looked up, and stored, via the
        # class being instantiated rather than via Singleton,
        # so that it can be found by calling instance() on
        # that class.
        if cls.instance() is None:
            cls.setInstance(new)
        return new

    @classmethod
    def instance(cls):
        """Return a reference to the singleton instance, or ``None`` if one
        does not exist.
        """
        # The attribute name contains the class name, so
        # that a sub-class does not pick up the instance
        # of one of its base classes via attribute lookup.
        return getattr(cls, '_{}_singleton'.format(cls.__name__), None)

    @classmethod
    def setInstance(cls, inst):
        """Set/override the singleton instance. """
        setattr(cls, '_{}_singleton'.format(cls.__name__), inst)