Source code for funpack.merging

#!/usr/bin/env python
#
# merging.py - Merging data from multiple input files.
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module contains functions which can merge data from multiple input
files.


.. autosummary::
   :nosignatures:

   mergeDataFrames

"""


import itertools as it
import              logging
import              collections

import numpy     as np
import pandas    as pd


# Module-level logger, named after this module.
log = logging.getLogger(__name__)


def _firstColumnPerName(columns, names=None):
    """Return the first column found for each distinct column name.

    :arg columns: Iterable of objects with a (hashable) ``name`` attribute.
    :arg names:   Optional set of names to retain. If ``None``, every name
                  is retained.
    :returns:     A list of columns, in order of first appearance, with at
                  most one entry per name.
    """
    seen = set()
    kept = []
    for col in columns:
        name = col.name
        if name in seen or (names is not None and name not in names):
            continue
        seen.add(name)
        kept.append(col)
    return kept


def mergeDataFrames(data, cols, axis, strategy, dryrun=False):
    """Merges one or more ``pandas.DataFrames`` according to the given
    ``axis`` and ``strategy``.

    :arg data:     List of ``DataFrame`` objects to merge.

    :arg cols:     List of lists - :class:`.Column` objects representing the
                   columns in each data set in ``data``. Columns with a
                   ``vid`` of 0 are treated as index columns.

    :arg axis:     Axis to merge on - either ``subjects`` (or ``0``, ``'0'``,
                   ``rows``) or ``variables`` (or ``1``, ``'1'``, ``cols``,
                   ``columns``).

    :arg strategy: Strategy to use for merging ``data``, either ``union``
                   / ``outer`` (an outer join), ``intersection`` / ``inner``
                   (inner join), or ``naive`` (naive concatenation along
                   ``axis``).

    :arg dryrun:   If ``True``, only ``cols`` is merged.

    :returns:      A tuple containing:

                    - a new ``DataFrame`` containing the merged ``data``,
                      or ``None`` if ``dryrun is True``.

                    - A list of :class:`.Column` objects representing the
                      columns that were kept. The index column is at the
                      beginning of the list.

    :raises ValueError: If ``axis`` or ``strategy`` is not recognised, or
                        if ``data`` is empty.

    .. note:: If ``data`` contains exactly one item, that item is returned
              as-is (regardless of ``dryrun``), along with its columns.

    .. note:: With the ``naive`` strategy, the row and column labels of the
              input data frames are reset in place before concatenation.

    .. warning:: A dry run may produce different results with the naive
                 merge strategy.
    """

    if axis not in (0, '0', 'rows', 'subjects',
                    1, '1', 'cols', 'columns', 'variables'):
        raise ValueError('Invalid axis: {}'.format(axis))
    if strategy not in ('naive', 'union', 'intersection', 'inner', 'outer'):
        raise ValueError('Invalid merge strategy: {}'.format(strategy))

    # Normalise the axis/strategy aliases
    axis = {0           : 0,
            '0'         : 0,
            'rows'      : 0,
            'subjects'  : 0,
            1           : 1,
            '1'         : 1,
            'cols'      : 1,
            'columns'   : 1,
            'variables' : 1}[axis]
    strategy = {'naive'        : 'naive',
                'inner'        : 'inner',
                'intersection' : 'inner',
                'outer'        : 'outer',
                'union'        : 'outer'}[strategy]

    if len(data) == 0:
        raise ValueError('No data!')

    naive = strategy == 'naive'

    # A naive merge is passed to pandas as an
    # outer join, but with labels reset (below).
    join = 'inner' if strategy == 'inner' else 'outer'

    # Separate out the index and non-
    # index columns for each data frame
    idxcols  = [[] for _ in cols]
    datacols = [[] for _ in cols]
    for dfi, dfcols in enumerate(cols):
        for col in dfcols:
            if col.vid == 0:
                idxcols[dfi].append(col)
            else:
                datacols[dfi].append(col)

    # Only one file - no merging required
    if len(data) == 1:
        return data[0], idxcols[0] + datacols[0]

    # Build a list of Column objects
    # which describe what the merged
    # data frame will look like.
    # This list will be assigned
    # back to the cols variable.

    # naive concatenation.
    if naive:

        # if performing a dry run, we can't look
        # at the data dimensions, so we take the
        # first file as the definitive one.
        if dryrun:
            if axis == 0:
                cols = cols[0]
            else:
                cols = cols[0] + list(it.chain.from_iterable(datacols[1:]))

        # If concatenating rows, we assume that
        # the columns are aligned in each file.
        # We take the column names from the file
        # with the most columns.
        elif axis == 0:
            lens = [len(d.columns) for d in data]
            rows = list(it.chain.from_iterable(d.index for d in data))
            cols = cols[lens.index(max(lens))]

        # If concatenating columns, we assume
        # that the subjects are aligned in each
        # file. We take the index columns from
        # the file with the largest number of
        # rows.
        #
        # We re-generate the list of expected
        # rows and columns in the data, dropping
        # the index column
        else:
            lens   = [len(d.index) for d in data]
            maxi   = lens.index(max(lens))
            idxcol = idxcols[maxi]
            rows   = data[maxi].index
            cols   = idxcol + list(it.chain.from_iterable(datacols))

    # concatenate rows with inner join - only
    # retain the columns that are present in
    # every dataframe. Identify the Column
    # objects which represent these retained
    # columns (the first occurrence of each
    # name, in order of appearance).
    elif axis == 0 and strategy == 'inner':
        idxcol    = idxcols[0]
        innercols = set.intersection(
            *[{c.name for c in dc} for dc in datacols])
        cols      = idxcol + _firstColumnPerName(
            it.chain.from_iterable(datacols), innercols)

    # concatenate rows with outer join - all
    # unique column names will be retained.
    elif axis == 0 and strategy == 'outer':
        idxcol = idxcols[0]
        cols   = idxcol + _firstColumnPerName(
            it.chain.from_iterable(datacols))

    # concatenate columns (horizontally) -
    # all columns are retained even if
    # there are duplicate column names
    else:
        idxcol = idxcols[0]
        cols   = idxcol + list(it.chain.from_iterable(datacols))

    if dryrun:
        return None, cols

    log.debug('Merging %u data sets (axis: %s, strategy: %s)',
              len(data), axis, strategy)

    # We have to reset row/column labels,
    # otherwise pandas will try to do a
    # real join. Note that this modifies
    # the input data frames in place.
    if naive:
        for d in data:
            d.columns = range(len(d.columns))
            d.reset_index(drop=True, inplace=True)

    merged = pd.concat(data,
                       axis=axis,
                       join=join,
                       ignore_index=naive,
                       sort=True,
                       copy=False)

    if len(merged) == 0:
        log.warning('Merged dataframe is empty! Are '
                    'index column positions correct?')

    # Re-label subjects and variables,
    # make sure that dataframe column
    # order matches cols list order.
    if naive:
        merged.index   = rows
        merged.columns = [c.name for c in cols if c.vid != 0]

    # If the input data frames
    # have different index names,
    # the merged index will not
    # have a name. So make sure
    # it has a name.
    idxnames = [c.name for c in cols if c.vid == 0]
    if merged.index.nlevels == 1:
        merged.index.name = idxnames[0]
    else:
        merged.index.names = idxnames

    # Warn if we concatenated rows, and
    # there are duplicate subject IDs.
    if axis == 0:
        idxs, counts = np.unique(merged.index, return_counts=True)
        notuniq      = counts > 1
        if np.any(notuniq):
            log.warning('Duplicate subject IDs in data: %s',
                        idxs[notuniq][:5])

    # Or if we concatenated columns, and
    # there are duplicate column names
    # or VIDs
    else:
        # names
        counts   = collections.Counter(merged.columns)
        dupnames = [n for n, c in counts.items() if c > 1]
        if dupnames:
            log.warning('Duplicate column names in data: %s', dupnames)

        # vids
        allcols = [(c.vid, c.visit, c.instance) for c in cols if c.vid != 0]
        counts  = collections.Counter(allcols)
        dupvids = [n for n, c in counts.items() if c > 1]
        if dupvids:
            log.warning('Duplicate column VIDs in data: %s', dupvids)

    return merged, cols