#!/usr/bin/env python
#
# main.py - funpack entry point
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module contains the ``funpack`` entry point. """
import multiprocessing as mp
import os.path as op
import sys
import logging
import warnings
import datetime
import calendar
import pandas as pd
import pandas.api.types as pdtypes
import threadpoolctl
import funpack
import funpack.util as util
import funpack.icd10 as icd10
import funpack.config as config
import funpack.custom as custom
import funpack.dryrun as dryrun
import funpack.fileinfo as fileinfo
import funpack.cleaning as cleaning
import funpack.datatable as datatable
import funpack.exporting as exporting
import funpack.importing as importing
import funpack.loadtables as loadtables
import funpack.processing as processing
import funpack.schema as schema
import funpack.schema.hierarchy as hierarchy
log = logging.getLogger(__name__)
def main(argv=None):
"""``funpack`` entry point. """
# Make sure built in plugins are
# registered, as they are queried
# in the command-line help. Set
# logging to critical until we've
# parsed command-line args.
logging.getLogger().setLevel(logging.CRITICAL)
custom.registerBuiltIns()
args, argv = config.parseArgsWithConfigFile(argv)
date = datetime.date.today()
# Set the number of threads
# that numpy should use
threadpoolctl.threadpool_limits(args.num_jobs)
# disable schema download if requested
schema.DOWNLOAD = not args.no_download
# Now that args are parsed,
# we can set up logging properly.
configLogging(args)
log.info('funpack %s', funpack.version())
log.info('Date: %s (%s)', date, calendar.day_name[date.weekday()])
log.info('Command-line arguments: %s', ' '.join(argv))
for e in args.echo:
log.info(e)
log.debug('Running with the following options')
for name, val in args.__dict__.items():
if val is not None:
val = str(val)
if len(val) <= 30: log.debug(' %s: %s', name, val)
else: log.debug(' %s: %s...', name, val[:30])
# Load any custom plugins
# that have been specified.
if args.plugin_file is not None:
for p in args.plugin_file:
custom.loadPluginFile(p)
# default output format inferred
# from output filename or, failing
# that, tsv
if args.format is None:
fmt = op.splitext(args.outfile)[1].lower().strip('.')
if fmt in ('h5', 'hdf'):
fmt = 'hdf5'
if not custom.exists('exporter', fmt):
fmt = 'tsv'
args.format = fmt
# error if any loaders/formats are
# invalid (we can only perform this
# check after plugins have been
# loaded)
if args.loader is not None:
for f, l in args.loader.items():
if not custom.exists('loader', l):
raise ValueError('Unknown loader {} [{}]'.format(l, f))
if not custom.exists('exporter', args.format):
raise ValueError('Unknown output format {}'.format(args.format))
if args.date_format is not None and \
not custom.exists('formatter', args.date_format):
raise ValueError('Unknown date format {}'.format(args.date_format))
if args.time_format is not None and \
not custom.exists('formatter', args.time_format):
raise ValueError('Unknown time format {}'.format(args.time_format))
if args.var_format is not None:
for v, f in args.var_format.items():
if not custom.exists('formatter', f):
raise ValueError('Unknown formatter {} [{}]'.format(f, v))
if args.num_jobs > 1:
log.debug('Running up to %i jobs in parallel', args.num_jobs)
# Funpack relies on fork() to share
# the dataset to child processes
# when parallelising tasks. This
# is potentially dangerous on macOS
# - see the "macOS issues" note in
# README, and the python bug report:
# https://bugs.python.org/issue33725
if mp.get_start_method(allow_none=True) is None:
mp.set_start_method('fork')
# The icd10 module maintains information
# which is potentially shared (read from
# and written to) by multiple processes,
# so we use a mp.Manager to handle
# shared state.
mgr = mp.Manager()
icd10.initialise(mgr)
else:
mgr = None
with util.timed(None, log, fmt='Total time: %s (%+iMB)'):
dtable, unknowns, uncategorised, drop = doImport(args, mgr)
if args.dry_run:
dryrun.doDryRun(dtable, unknowns, uncategorised, drop, args)
else:
doCleanAndProcess( dtable, args)
doExport( dtable, args)
doUnknownsExport( dtable, args, unknowns, uncategorised)
doICD10Export( args)
doDescriptionExport(dtable, args)
doSummaryExport( dtable, args)
return 0
def doImport(args, mgr):
"""Data import stage.
:arg args: :class:`argparse.Namespace` object containing command line
arguments
:arg mgr: :class:`multiprocessing.Manager` object for parallelisation (may
be ``None``)
:returns: A tuple containing:
- A :class:`.DataTable` containing the data
- A sequence of :class:`.Column` objects representing the
unknown columns.
- A sequence of :class:`.Column` objects representing columns
which are uncategorised, and have no processing or cleaning
rules specified on them.
- A list of :class:`.Column` objects that were not loaded from
each input file.
"""
# if --remove_duplicates, we append
# an identifying suffix to the names
# of columns to be removed. This is
# then passed through as an exclusion
# pattern to the importData function
# via its excludeColnames option.
if args.remove_duplicates:
suffix = importing.REMOVE_DUPLICATE_COLUMN_IDENTIFIER
renameDuplicates = True
else:
suffix = None
renameDuplicates = args.rename_duplicates
finfo = fileinfo.FileInfo(args.infile,
indexes=args.index,
loaders=args.loader,
encodings=args.encoding,
renameDuplicates=renameDuplicates,
renameSuffix=suffix)
with util.timed('Table import', log):
vartable, proctable, cattable, unknowns, uncategorised = \
loadtables.loadTables(
finfo,
args.variable_file,
args.datacoding_file,
args.type_file,
args.processing_file,
args.category_file,
noBuiltins=args.no_builtins,
naValues=args.na_values,
childValues=args.child_values,
recoding=args.recoding,
clean=args.clean,
typeClean=args.type_clean,
globalClean=args.global_clean,
skipProcessing=args.skip_processing,
prependProcess=args.prepend_process,
appendProcess=args.append_process)
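# --subject arguments are parsed into a list
# of explicit subject IDs, and a list of
# subject inclusion expressions (passed to
# importData as subjects/subjectExprs below)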
subjects, exprs = args.subject
if suffix is None: excludeColnames = []
else: excludeColnames = [suffix]
# Import data
with util.timed('Data import', log):
dtable, drop = importing.importData(
fileinfo=finfo,
vartable=vartable,
proctable=proctable,
cattable=cattable,
variables=args.variable,
colnames=args.column,
excludeColnames=excludeColnames,
categories=args.category,
subjects=subjects,
subjectExprs=exprs,
exclude=args.exclude,
excludeVariables=args.exclude_variable,
excludeCategories=args.exclude_category,
trustTypes=args.trust_types,
mergeAxis=args.merge_axis,
mergeStrategy=args.merge_strategy,
indexVisits=args.index_visits,
dropNaRows=args.drop_na_rows,
addAuxVars=args.add_aux_vars,
njobs=args.num_jobs,
mgr=mgr,
dryrun=args.dry_run,
failIfMissing=args.fail_if_missing)
# Filter unknown/uncategorised
# column lists to only contain
# those that were loaded
allcols = dtable.dataColumns
unknowns = [c for c in unknowns if c in allcols]
uncategorised = [c for c in uncategorised if c in allcols]
# if it appears that we're doing
# a full run on a large data set,
# emit warnings about unknown/
# uncategorised variables.
bigrun = any((args.variable_file is not None,
args.datacoding_file is not None,
args.processing_file is not None,
args.category_file is not None))
if bigrun:
for u in unknowns:
log.warning('Variable %s [file %s, column %s, assigned '
'variable ID %s] is unknown.',
u.name, u.datafile, u.index, u.vid)
for u in uncategorised:
log.warning('Variable %s [file %s, column %s, assigned '
'variable ID %s] is uncategorised.',
u.name, u.datafile, u.index, u.vid)
return dtable, unknowns, uncategorised, drop
def doCleanAndProcess(dtable, args):
"""Data cleaning and processing stage.
:arg dtable: :class:`.DataTable` containing the data
:arg args: :class:`argparse.Namespace` object containing command line
arguments
"""
# Clean data (cleanData times each step individually)
cleaning.cleanData(
dtable,
skipNAInsertion=args.skip_insertna,
skipCleanFuncs=args.skip_clean_funcs,
skipChildValues=args.skip_childvalues,
skipRecoding=args.skip_recoding)
# Process data
with util.timed('Data processing', log):
processing.processData(dtable)
def splitDataTable(dtable, args):
"""Splits the .:class:`DataTable` into separate numeric/non-numeric tables.
Called by :func:`doExport`. If the ``--suppress_non_numerics`` and/or
``--write_non_numerics`` options are active, non-numeric columns need to
be separated from numeric columns, and possibly saved to a separate output
file.
:arg dtable: :class:`.DataTable` containing the data
:arg args: :class:`argparse.Namespace` object containing command line
arguments
:returns: A list of ``(DataTable, filename)`` tuples, containing the
:class:`.DataTable` instances and corresponding output
file names.
"""
# We're outputting one main output file
# with all columns - no need to split
if not (args.suppress_non_numerics or args.write_non_numerics):
return [(dtable, args.outfile)]
# we need to separate out numeric from
# non-numeric columns, and potentially
# create two separate datatables
dtables = []
ncols = []
nncols = []
# we run a single value from each
# column through formatting in
# order to determine whether each
# column is numeric or non-numeric
for col in dtable.dataColumns:
idx = dtable[:, col.name].first_valid_index()
# column is all nan
if idx is None:
continue
# format a subtable containing
# just the column and the first
# non-na value
series = exporting.formatColumn(
col,
dtable.subtable([col], [idx]),
dateFormat=args.date_format,
timeFormat=args.time_format,
formatters=args.var_format)[:, col.name]
# separate accordingly
if pdtypes.is_numeric_dtype(series): ncols .append(col)
else: nncols.append(col)
# if suppress, only numeric columns
# are saved to main output file
if args.suppress_non_numerics:
log.debug('Separating out %u / %u numeric columns for export',
len(ncols), len(dtable.dataColumns))
dtables.append((dtable.subtable(ncols), args.outfile))
else:
dtables.append((dtable, args.outfile))
# if write, non-numeric columns
# are saved to an auxiliary file
if args.write_non_numerics:
log.debug('Separating out %u / %u non-numeric columns for export',
len(nncols), len(dtable.dataColumns))
dtables.append((dtable.subtable(nncols), args.non_numerics_file))
return dtables
def doExport(dtable, args):
"""Data export stage.
:arg dtable: :class:`.DataTable` containing the data
:arg args: :class:`argparse.Namespace` object containing command line
arguments
"""
# if ids_only is specified, all we
# need to do is output the index
if args.ids_only:
with open(args.outfile, 'wt') as f:
for i in dtable.index:
f.write(f'{i}\n')
return
# Otherwise we are exporting the full
# data set, and things become more
# complicated...
# List of data tables and file names
# for export (we may split the dtable
# up into two - numeric/non-numeric)
dtables = splitDataTable(dtable, args)
# If not parallelising, we export the
# entire file in one go. Because what's
# the point in chunked export if we're
# not parallelising across chunks?
if args.num_jobs <= 1:
args.num_rows = len(dtable)
with util.timed('Data export', log):
for dtable, outfile in dtables:
# set the DataTable singleton so it
# can be shared with child processes
# during export.
datatable.DataTable.setInstance(dtable)
exporting.exportData(
dtable,
outfile,
# General export options
fileFormat=args.format,
numRows=args.num_rows,
dropNaRows=args.drop_na_rows,
dateFormat=args.date_format,
timeFormat=args.time_format,
formatters=args.var_format,
# TSV options
escapeNewlines=args.escape_newlines,
sep=args.tsv_sep,
missingValues=args.tsv_missing_values,
# HDF5 options
key=args.hdf5_key,
style=args.hdf5_style)
def doICD10Export(args):
"""If a ``--icd10_map_file`` has been specified, the ICD10 codes present
in the data (and their converted values) are saved out to the file.
"""
if args.icd10_map_file is None:
return
with util.timed('ICD10 mapping export', log):
try:
ihier = hierarchy.loadHierarchyFile(name='icd10')
icd10.saveCodes(args.icd10_map_file, ihier)
except Exception as e:
log.warning('Failed to export ICD10 mappings: {}'.format(e),
exc_info=True)
def doDescriptionExport(dtable, args):
"""If a ``--description_file`` has been specified, a description for every
column is saved out to the file.
"""
if args.description_file is None:
return
with util.timed('Description export', log):
cols = dtable.dataColumns
try:
with open(args.description_file, 'wt') as f:
for col in cols:
desc = generateDescription(dtable, col)
f.write('{}\t{}\n'.format(col.name, desc))
except Exception as e:
log.warning('Failed to export descriptions: {}'.format(e),
exc_info=True)
def generateDescription(dtable, col):
"""Called by :func:`doDescriptionExport`. Generates and returns a
suitable description for the given column.
:arg dtable: :class:`.DataTable` instance
:arg col: :class:`.Column` instance
"""
vartable = dtable.vartable
desc = vartable.loc[col.vid, 'Description']
if pd.isna(desc) or (desc == col.name):
desc = 'n/a'
# If metadata has been added to the column,
# we add it to the description. See the
# binariseCategorical processing function
# for an example of this.
if col.metadata is not None:
suffix = ' ({})'.format(col.metadata)
else:
suffix = ' ({}.{})'.format(col.visit, col.instance)
return '{}{}'.format(desc, suffix)
def doUnknownsExport(dtable, args, unknowns, uncategorised):
"""If the ``--unknown_vars_file`` argument was used, the unknown/
uncategorised columns are saved out to a file.
:arg dtable: :class:`.DataTable` containing the data
:arg args: :class:`argparse.Namespace` object containing command
line arguments
:arg unknowns: List of :class:`.Column` objects representing the
unknown columns.
:arg uncategorised: A sequence of :class:`.Column` objects representing
columns which are uncategorised, and have no processing
or cleaning rules specified on them.
"""
if args.unknown_vars_file is None:
return
if len(unknowns) + len(uncategorised) == 0:
return
# Save unknown/uncategorised
# vars list to file. Columns:
# - name - column name
# - file - originating input file
# - class - unknown or uncategorised
# - exported - whether column passed processing and was exported
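# For example, one row (with hypothetical
# column/file names) might look like:
#   1234-0.0\tdata.csv\tunknown\t1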
allcols = list(dtable.dataColumns)
allunknowns = list(unknowns + uncategorised)
names = [u.name for u in allunknowns]
files = [u.datafile for u in allunknowns]
classes = ['unknown' for u in unknowns] + \
['uncategorised' for u in uncategorised]
exported = [int(u in allcols) for u in allunknowns]
rows = ['{}\t{}\t{}\t{}'.format(n, f, c, e)
for n, f, c, e in zip(names, files, classes, exported)]
log.debug('Saving unknown/uncategorised variables to %s',
args.unknown_vars_file)
try:
with open(args.unknown_vars_file, 'wt') as f:
f.write('name\tfile\tclass\texported\n')
f.write('\n'.join(rows))
except Exception as e:
log.warning('Error saving unknown variables to {}: '
'{}'.format(args.unknown_vars_file, e),
exc_info=True)
def doSummaryExport(dtable, args):
"""If a ``--summary_file`` has been specified, a summary of the cleaning
steps that have been applied to each variable is saved out to the file.
"""
if args.summary_file is None:
return
vartable = dtable.vartable
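# Build a summary table containing one row
# per variable, skipping the first variable
# (the index / subject ID column).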
vids = sorted(dtable.variables)[1:]
sumdf = pd.DataFrame(columns=['NAValues', 'RawLevels', 'NewLevels',
'ParentValues', 'ChildValues',
'Clean', 'Flags'], index=vids)
sumdf.index.name = 'ID'
with util.timed('Summary export', log):
for vid in vids:
sumdf.at[vid, 'NAValues'] = vartable.at[vid, 'NAValues']
sumdf.at[vid, 'RawLevels'] = vartable.at[vid, 'RawLevels']
sumdf.at[vid, 'NewLevels'] = vartable.at[vid, 'NewLevels']
sumdf.at[vid, 'ParentValues'] = vartable.at[vid, 'ParentValues']
sumdf.at[vid, 'ChildValues'] = vartable.at[vid, 'ChildValues']
clean = vartable.at[vid, 'Clean']
if pd.notna(clean):
sumdf.at[vid, 'Clean'] = list(clean.values())
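# Gather the flags attached to this variable's
# columns. Flags present on every column are
# reported once; flags only present on some
# columns are annotated with those column names.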
flagstr = []
cols = dtable.columns(vid)
colflags = {c : dtable.getFlags(c) for c in cols}
flags = set.union(*colflags.values())
for flag in flags:
if all([flag in colflags[c] for c in cols]):
flagstr.append(flag)
else:
names = [c.name for c in cols if flag in colflags[c]]
flagstr.append('{} [{}]'.format(flag, ', '.join(names)))
sumdf.at[vid, 'Flags'] = ';'.join(flagstr)
sumdf.to_csv(args.summary_file, sep='\t')
def configLogging(args):
"""Configures ``funpack`` logging.
:arg args: ``argparse.Namespace`` object containing parsed command line
arguments.
"""
# Custom log handler which
# colours messages
class LogHandler(logging.StreamHandler):
def emit(self, record):
levelno = record.levelno
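# ANSI escape codes - bold red for warnings/
# errors, bold default colour for info, and
# bold grey for debug messages.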
if levelno >= logging.WARNING: colour = '\x1b[31;1m'
elif levelno >= logging.INFO: colour = '\x1b[39;1m'
elif levelno >= logging.DEBUG: colour = '\x1b[90;1m'
else: colour = ''
# Reset terminal attributes
# after each message.
record.msg = '{}{}\x1b[0m'.format(colour, record.msg)
return super(LogHandler, self).emit(record)
logger = logging.getLogger('funpack')
fmt = logging.Formatter('%(asctime)s '
'%(levelname)8.8s '
'%(filename)20.20s '
'%(lineno)4d: '
'%(funcName)-15.15s - '
'%(message)s',
'%H:%M:%S')
if args.log_file is None: handler = LogHandler()
else: handler = logging.FileHandler(args.log_file)
handler.setFormatter(fmt)
logger.addHandler(handler)
# configure verbosity
if args.quiet: loglevel = logging.CRITICAL
elif args.noisy == 0: loglevel = logging.INFO
else: loglevel = logging.DEBUG
logging.getLogger('funpack').setLevel(loglevel)
if args.noisy < 3:
warnings.filterwarnings('ignore', module='pandas')
warnings.filterwarnings('ignore', module='numpy')
warnings.filterwarnings('ignore', module='tables')
# show deprecation warnings at
# any non-zero verbosity level
if args.noisy >= 1:
warnings.filterwarnings('default', category=DeprecationWarning)
if args.noisy == 1:
makequiet = ['funpack.expression',
'funpack.custom',
'funpack.cleaning_functions',
'funpack.processing_functions']
elif args.noisy == 2:
makequiet = ['funpack.expression',
'funpack.custom']
else:
makequiet = []
for mod in makequiet:
logging.getLogger(mod).setLevel(logging.INFO)
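# Support direct execution of this module
# (e.g. 'python -m funpack.main'); the return
# value of main() is used as the exit code.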
if __name__ == '__main__':
sys.exit(main())