pipe_tree package

Declarative pipeline definition based on filetrees.

Typical usage:

from pipe_tree import pipe, In, Out, Ref, Var

@pipe
def job(input_file: In, output_file: Out):
    # code to convert `input_file` to `output_file`
    ...

pipe.cli()  # runs command line interface

pipe_tree.pipeline module

Defines the pipeline before a FileTree is provided.

A pipeline is a collection of functions with mapping from the function parameters to input/output/reference/placeholder basenames.

class pipe_tree.pipeline.Pipeline(scripts=None, default_output=None, default_submit=None)[source]

Bases: object

Collection of python functions forming a pipeline.

You can either create a new pipeline (from pipe_tree import Pipeline; pipe = Pipeline()) or use the pre-existing default one (from pipe_tree import pipe).

Scripts are added to a pipeline by using the pipeline as a decorator (see __call__()).

To run the pipeline based on instructions from the command line run pipe.cli(tree), where tree is a FileTree defining the directory structure of the pipeline input & output files.
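For example, a minimal runner script could look like this (loading the tree via file_tree.FileTree.read and the "data.tree" filename are assumptions; the module name is hypothetical):

from file_tree import FileTree
from pipe_tree import pipe

import my_pipeline_steps  # hypothetical module whose functions are registered with `pipe` via the decorator

tree = FileTree.read("data.tree")
pipe.cli(tree)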

Variables:

scripts – list of PipedFunction, which define the python functions forming the pipeline and their input/output templates

__init__(scripts=None, default_output=None, default_submit=None)[source]

Create a new empty pipeline.

__call__(function=None, *, kwargs=None, no_iter=None, placeholders=None, as_path=False, submit=None, batch=None)[source]

Add a python function as a PipedFunction to the pipeline.

This is the main route through which jobs are added to the pipeline.

from pipe_tree import pipe, In, Out, Ref, Var

@pipe(submit=dict(jobtime=40))
def func(in_path: In, out_path: Out, ref_path: Ref, placeholder_key: Var):
    pass
Parameters:
  • function

    Optionally provide the function directly. This can be useful when not using pipe as a decorator, e.g.:

    from pipe_tree import pipe, In, Out, Ref, Var
    from shutil import copyfile
    
    pipe(copyfile, kwargs={'src': In('template_in'), 'dst': Out('template_out')})
    

  • kwargs – maps function keyword arguments to template names (In/Out/Ref), placeholder names (Var), or anything else. Templates or placeholders are replaced by their values based on the file-tree. Anything else is passed on unaltered.

  • no_iter – optional set of parameters not to iterate over.

  • as_path – if set to true, provide template filenames to the function as pathlib.Path objects instead of strings.

  • placeholders – dictionary overriding the placeholders set in the filetree for this specific function. This can, for example, be used to run this function on a sub-set of subjects.

  • submit – dictionary with the flags to pass on to fsl_sub.submit, when submitting jobs to a cluster queue.

  • batch – label used to batch multiple jobs into one when submitting to the cluster
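As a further illustration of these options (template and placeholder names are hypothetical):

from pipe_tree import pipe, In, Out, Var

@pipe(placeholders=dict(subject=["01", "02"]),  # only run for these subjects
      no_iter=["session"],                      # pass all sessions to a single job
      as_path=True,                             # receive pathlib.Path objects
      submit=dict(jobtime=60))                  # flags passed on to fsl_sub.submit
def summarise(bold: In, summary: Out, session: Var):
    ...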

generate_jobs(tree: FileTree)[source]

Split the pipeline into individual jobs.

Produces an unsorted JobList, which can be sorted by running filter on it.

Parameters:

tree – set of templates for the input/output/reference files with all possible placeholder values

default_parser(tree: FileTree, parser: ArgumentParser | None = None, include_vars=None, exclude_vars=()) ArgumentParser[source]

Add default pipe-tree arguments to an argument parser (will create a new one if needed).

Parameters:
  • tree – file_tree.FileTree object describing the directory structure for the input/output files.

  • parser – Optional argument parser that will be updated with the default pipe-tree arguments (by default a new one is created).

  • include_vars – if provided, only expose placeholders in this list to the command line

  • exclude_vars – exclude placeholders in this list from the command line

Returns:

Argument parser with the default pipe-tree arguments. If one was provided as input, that one will be returned.

run_cli(args: Namespace, tree: FileTree)[source]

Run the pipeline based on arguments extracted from the default argument parser.

Parameters:
  • args

    Namespace consisting of arguments extracted from the command line (produced by argparse.ArgumentParser.parse_args). This is expected to contain:

    • templates (defaults to self.default_output): list of which templates the pipeline should produce

    • pipeline_method: string with method used to run the jobs

    • overwrite: boolean, which if true overwrite any requested files

    • overwrite_dependencies: boolean, which if true also overwrites dependencies of requested files

    • job-hold: comma-separated string of job(s) that will be waited for before running

    • skip-missing: whether to skip jobs depending on missing data instead of raising an error

    • datalad (defaults to False): if true, run datalad get on all input data before running/submitting the jobs

    • {placeholder} (defaults to values in FileTree): sequence of strings overwriting the possible values for a particular placeholder

  • tree – Definition of pipeline input/output files
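A minimal sketch of how this fits together with default_parser (this is roughly what cli below does in one call; loading the tree from "data.tree" is an assumption):

from file_tree import FileTree
from pipe_tree import pipe

tree = FileTree.read("data.tree")   # assumed tree definition file
parser = pipe.default_parser(tree)  # ArgumentParser with the default pipe-tree arguments
args = parser.parse_args()
pipe.run_cli(args, tree)            # filter, sort, and run/submit the jobs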

cli(tree: FileTree | None = None, include_vars=None, exclude_vars=(), cli_arguments=None)[source]

Run the pipeline from the command line.

Parameters:
  • tree – file_tree.FileTree object describing the directory structure for the input/output files (defaults to the datalad tree).

  • include_vars – if provided, only expose variables in this list to the command line

  • exclude_vars – exclude variables in this list from the command line

move_to_subtree(sub_tree=None, other_mappings=None)[source]

Create a new pipeline that runs in a sub-tree of a larger tree rather than at the top level.

Parameters:
  • sub_tree – name of the sub-tree in the FileTree

  • other_mappings – other mappings between templates or placeholder names and their new values
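For example (sub-tree and template names are hypothetical):

# run the whole pipeline inside the "preproc" sub-tree of a larger study tree,
# mapping the template "t1w" to "T1w_brain" in the process
sub_pipe = pipe.move_to_subtree("preproc", {"t1w": "T1w_brain"})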

classmethod merge(pipelines: Collection[Pipeline])[source]

Combine multiple pipelines into a single one.

Parameters:

pipelines – pipelines containing part of the jobs

Returns:

parent pipeline containing all of the jobs in pipelines
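For example, pipelines defined in separate modules can be combined into one (module names are hypothetical):

from pipe_tree import Pipeline
from my_study.preproc import pipe as preproc_pipe    # hypothetical module
from my_study.analysis import pipe as analysis_pipe  # hypothetical module

full_pipe = Pipeline.merge([preproc_pipe, analysis_pipe])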

find(function: str | Callable)[source]

Iterate over any pipeline scripts that run the provided function.

Either the function itself or the name of the function can be given.

remove(function: str | Callable)[source]

Remove any pipeline scripts that run the provided function from the pipeline.

Either the function itself or the name of the function can be given.

configure(kwargs)[source]

Override the values passed on to the keyword arguments of all the scripts.

Any keywords not expected by a script will be silently skipped for that script.
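For example, assuming several scripts accept an n_threads keyword argument (the name is hypothetical):

# every script with an `n_threads` keyword argument will receive 4;
# scripts without that keyword are silently skipped
pipe.configure({"n_threads": 4})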

add_to_graph(graph: Digraph = None, tree: FileTree = None)[source]

Add all the pipeline functions to the provided graph.

Parameters:
  • graph – GraphViz graph object (will be altered)

  • tree – concrete FileTree

class pipe_tree.pipeline.PipedFunction(function, submit_params: Dict, kwargs=None, no_iter=None, override=None, as_path=True, batch=None)[source]

Bases: object

Represents a function stored in a pipeline.

__init__(function, submit_params: Dict, kwargs=None, no_iter=None, override=None, as_path=True, batch=None)[source]

Wrap a function with additional information to run it in a pipeline.

Parameters:
  • function – python function that will be run in the pipeline.

  • submit_params – parameters to submit job running python function to cluster using fsl_sub.

  • kwargs – maps function keyword arguments to templates in the FileTree (with additional information on whether the file is input/output/reference), placeholders, or actual values.

  • no_iter – which parameters to not iterate over (i.e., they are passed to the function in an array).

  • override – dictionary overriding the placeholders set in the filetree.

  • as_path – whether to pass on pathlib.Path objects instead of strings to the functions (default: True).

  • batch – label used to batch multiple jobs into one when submitting to the cluster.

copy()[source]

Create a copy of this PipedFunction for pipeline merging.

property no_iter: Set[str]

Set of placeholder names that should not be iterated over.

configure(kwargs, allow_new_keywords=True, check=True)[source]

Override the values passed on to the keyword arguments of the script.

Parameters:
  • kwargs – new placeholders/templates/values for keyword arguments

  • allow_new_keywords – if set to False, don’t allow new keywords

property placeholders

Return dictionary with placeholder values overridden for this function.

property templates

Return dictionary with templates used as input, output, or reference for this function.

property kwargs

Return dictionary with keyword arguments that will be passed on to the function.

filter_templates(output=False, all_templates=None) Set[str][source]

Find all input or output template keys.

Parameters:
  • output – if set to True select the output rather than the input templates

  • all_templates – sequence of all possible templates (required if any Template keys use globbing)

Returns:

set of input or output templates

iter_over(tree: FileTree) Tuple[str, ...][source]

Find all the placeholders that should be iterated over before calling the function.

These are all the placeholders that affect the input templates, but are not part of self.no_iter.

Parameters:

tree – set of templates with placeholder values

Returns:

placeholder names to be iterated over, sorted by name

get_jobs(tree: FileTree, all_targets: Dict)[source]

Get a list of all individual jobs defined by this function.

Parameters:
  • tree – set of templates with placeholder values

  • all_targets – mapping from filenames to Target objects used to match input/output filenames between jobs

Returns:

sequence of jobs

all_placeholders(tree: FileTree, output=False) Set[str][source]

Identify the multi-valued placeholders affecting the input/output templates of this function.

Parameters:
  • tree – set of templates with placeholder values

  • output – if set to True return the placeholders for the output rather than the input templates

Returns:

set of all placeholders that affect the input/output templates

move_to_subtree(sub_tree=None, other_mappings=None)[source]

Create a new wrapped function that runs in a sub-tree of a larger tree rather than at the top level.

Parameters:
  • sub_tree – name of the sub-tree in the FileTree

  • other_mappings – other mappings between templates or placeholder names and their new values

add_node(graph: Graph, index, tree: FileTree, placeholder_color: Dict[str, str])[source]

Add a node representing this function for a pipeline diagram.

Parameters:
  • graph – input pipeline diagram (will be altered)

  • index – unique integer identifier to use within the graph

class pipe_tree.pipeline.Template(key: str | None = None, input: bool = False, output: bool = False, optional: bool = False)[source]

Bases: object

Represents a keyword argument that will be mapped to a template path in the FileTree.

Parameters:
  • key – template key in FileTree.

  • input – Set to true if file should be considered as input (i.e., it should exist before the job is run).

  • output – Set to true if file should be considered as output (i.e., it is expected to exist after the job is run).

  • optional – Set to true if input/output file should be considered for creating the dependency graph, but still might not exist.

key: str | None = None
input: bool = False
output: bool = False
optional: bool = False
__call__(key=None, optional=False)[source]

Override the template key and whether it is optional.

__init__(key: str | None = None, input: bool = False, output: bool = False, optional: bool = False) None
class pipe_tree.pipeline.PlaceHolder(key: str | None = None, no_iter: bool = False)[source]

Bases: object

Represents a keyword argument that will be mapped to a placeholder value.

Parameters:
  • key – placeholder key in FileTree.

  • no_iter – if True the pipeline will not iterate over individual placeholder values, but rather run a single job with all possible placeholder values at once.

key: str | None = None
no_iter: bool = False
__call__(key=None, no_iter=None)[source]

Override the Placeholder.

__init__(key: str | None = None, no_iter: bool = False) None
pipe_tree.pipeline.In = Template(key=None, input=True, output=False, optional=False)

Use to mark keyword arguments that represent input filenames of the wrapped function.

These filenames are expected to exist before the function is run.

The actual filename is extracted from the FileTree based on the template key. This template key will by default match the keyword argument name, but can be overridden by calling In (e.g., In("other_key")).

pipe_tree.pipeline.Out = Template(key=None, input=False, output=True, optional=False)

Use to mark keyword arguments that represent output filenames of the wrapped function.

These filenames are expected to exist after the function is run. An error is raised if they do not.

The actual filename is extracted from the FileTree based on the template key. This template key will by default match the keyword argument name, but can be overridden by calling Out (e.g., Out("other_key")).

pipe_tree.pipeline.Ref = Template(key=None, input=False, output=False, optional=False)

Use to mark keyword arguments that represent reference filenames of the wrapped function.

These filenames might or might not exist before or after the job is run.

The actual filename is extracted from the FileTree based on the template key. This template key will by default match the keyword argument name, but can be overridden by calling Ref (e.g., Ref("other_key")).

pipe_tree.pipeline.Var = PlaceHolder(key=None, no_iter=False)

Use to mark keyword arguments that represent placeholder values in the pipeline.

Placeholder values are passed on as PlaceholderValue objects.
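Putting the markers together (template and placeholder names are hypothetical):

from pipe_tree import pipe, In, Out, Ref, Var

@pipe
def segment(t1w: In,                  # input: must exist before the job runs
            mask: Out("brain_mask"),  # output stored under a different template key
            atlas: Ref,               # reference: existence is not enforced
            subject: Var):            # placeholder value (a PlaceholderValue)
    ...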

pipe_tree.pipeline.to_templates_dict(input_files=(), output_files=(), reference_files=())[source]

Convert a sequence of input/output/reference files into a template dictionary.

Parameters:
  • input_files (sequence, optional) – Template keys representing input files. Defaults to ().

  • output_files (sequence, optional) – Template keys representing output files. Defaults to ().

  • reference_files (sequence, optional) – Template keys representing reference paths. Defaults to ().

Raises:

KeyError – If the same template key is used as more than one of the input/output/reference options

Returns:

mapping of the keyword argument names to the Template objects

Return type:

dict
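A sketch of wrapping an existing function without the decorator (run_brain_extraction and the template names are hypothetical):

from pipe_tree import pipe
from pipe_tree.pipeline import to_templates_dict

# assumed to map keyword argument names to Template objects, e.g. "t1w" -> In("t1w")
templates = to_templates_dict(input_files=["t1w"], output_files=["brain_mask"])
pipe(run_brain_extraction, kwargs=templates)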

class pipe_tree.pipeline.PlaceholderValue(key, index, value)

Bases: tuple

index

Alias for field number 1

key

Alias for field number 0

value

Alias for field number 2

pipe_tree.pipeline.check_submit_parameters(submit_params: Dict)[source]

Check that the submit parameters are actually valid.

Raises a ValueError if there are any submit parameters set not expected by fsl_sub.

pipe_tree.job module

Defines pipeline interface after it has been split into individual jobs based on a FileTree.

exception pipe_tree.job.OutputMissing[source]

Bases: OSError

Raised if a job is missing its output files after running.

exception pipe_tree.job.InputMissingRun[source]

Bases: OSError

Raised if a job is missing input files when it is being launched.

exception pipe_tree.job.InputMissingPipe(target, missing, dependency_graph=None)[source]

Bases: OSError

Raised if a job is missing input files while it is being added to the pipeline.

__init__(target, missing, dependency_graph=None)[source]
class pipe_tree.job.RunMethod(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

How to run the individual jobs.

local = 1
submit = 2
dask = 3
default()[source]

Return the default RunMethod: submit when running on a cluster, local otherwise.

class pipe_tree.job.JobList(tree: FileTree, jobs: Sequence[SingleJob], targets: Dict[str, FileTarget])[source]

Bases: object

Pipeline with a concrete set of jobs.

Produced from a pipe_tree.pipeline.Pipeline based on the provided FileTree.

The default JobList produced by Pipeline.generate_jobs is unsorted and needs to be sorted by running filter before running.

__init__(tree: FileTree, jobs: Sequence[SingleJob], targets: Dict[str, FileTarget])[source]

Create a new concrete set of jobs.

filter(templates: Collection[str], overwrite=False, overwrite_dependencies=False, skip_missing=False) JobList[source]

Filter out a subset of jobs.

This does a total of three things:

  • Only include jobs needed to create the templates or file patterns given in templates.

  • Exclude jobs for which output files already exist, unless requested by overwrite or overwrite_dependencies.

  • Sort the jobs, so that dependencies are run before the jobs that depend on them.

Parameters:
  • templates – target template keys; Any jobs required to produce those files will be kept in the returned pipeline

  • overwrite – if True overwrite the files matching the template even if they already exist, defaults to False

  • overwrite_dependencies – if True rerun any dependent jobs even if the output of those jobs already exists, defaults to False

  • skip_missing – if True skip jobs that depend on missing data instead of raising an error, defaults to False

Returns:

Concrete pipeline with the jobs needed to produce the requested templates (with optional overwriting)
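The normal entry points (cli/run_cli) take care of this, but the same flow can be written out explicitly (the "brain_mask" template is hypothetical):

jobs = pipe.generate_jobs(tree)     # unsorted JobList
jobs = jobs.filter(["brain_mask"])  # keep and sort the jobs needed to produce "brain_mask"
jobs.run()                          # run locally or submit, based on RunMethod.default()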

batch(use_label=False, only_connected=False, split_on_ram=False, use_placeholders=True)[source]

Batch groups of jobs into single combined jobs (a usage sketch follows the rules below).

Two jobs will be batched if:

  1. There is no intermediate job that needs to run between the two and cannot be included in the same batch (because of rule 3 below).

  2. One of the jobs depends on the other. This requirement is disabled by default; enable it by setting only_connected to True.

  3. The jobs are similar. Jobs are similar if they match all of the (enabled) properties listed below:

    1. They have the same batch label (set during pipeline creation using @pipe(batch=<batch label>)). This check is enabled by setting the use_label keyword to True. If enabled, jobs without a batch label will never be merged.

    2. The jobs have the same submit parameters (e.g., “architecture”, “coprocessor”), except for “jobtime” and “name”. “jobram” can also be ignored by setting the split_on_ram keyword to True.

    3. The jobs have the same placeholder values. This prevents jobs from different subjects from being merged with each other. It can be disabled by setting use_placeholders to False. Alternatively, only a subset of placeholders can be considered by setting use_placeholders to a sequence of placeholder names (e.g., use_placeholders=[“subject”, “session”] will not merge jobs with different “subject” or “session” placeholder values, but will ignore any other placeholders).
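A sketch of batching before running, assuming a "subject" placeholder exists in the tree and that batch returns the batched JobList:

jobs = pipe.generate_jobs(tree).filter(["brain_mask"])  # "brain_mask" is hypothetical
batched = jobs.batch(use_placeholders=["subject"])      # only merge jobs sharing the same subject
batched.run()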

report()[source]

Produce tree reports with the relevant input/output templates.

run_datalad()[source]

Make sure we can run the pipeline.

Calls datalad.get on all input files and datalad.unlock on all output files.

run(method: RunMethod = None, wait_for=())[source]

Run all the jobs that are required to produce the given templates.

Parameters:
  • method – defines how to run the job

  • wait_for – job IDs to wait for before running pipeline

Returns:

set of all the jobs that need to be run to produce the template files

class pipe_tree.job.JobParent[source]

Bases: object

Parent for SingleJob and BatchJob.

input_targets: Set[FileTarget]
output_targets: Set[FileTarget]
optional_targets: Set[FileTarget]
dependencies(only_missing=True) Set[SingleJob | None][source]

Return jobs on which this job depends.

By default it only returns those related to missing input files.

Parameters:

only_missing – set to False to also return dependencies that produce files that already exist on disk

missing_output(reset_cache=False)[source]

Create a list of output filenames that do not exist on disk.

Optional outputs are not considered.

Parameters:

reset_cache – set to True to not rely on an existing cached existence check

missing_input(reset_cache=False, ignore_expected=False)[source]

Create a list of input filenames that do not exist on disk.

Optional inputs are not considered.

Parameters:
  • reset_cache – set to True to not rely on an existing cached existence check

  • ignore_expected – set to True to ignore any missing files that have a job that will produce them in the pipeline

add_to_jobs(all_jobs, overwrite=False, overwrite_dependencies=False)[source]

Mark this job and all of its dependencies to run.

This job is marked to run, if any of the output does not yet exist or overwrite is True. The dependencies are marked to run, if this job runs and either their output does not exist or overwrite_dependencies is True.

Parameters:
  • all_jobs – list of individual jobs. This job and all required jobs are added to this list.

  • overwrite – if True mark this job even if the output already exists, defaults to False

  • overwrite_dependencies – if True mark the dependencies of this job even if their output already exists, defaults to False

__call__(method: RunMethod, wait_for=())[source]

Run the job.

prepare_run()[source]

Prepare to run this job.

Steps: 1. Creates output directory

class pipe_tree.job.SingleJob(function: Callable, kwargs, submit_params, input_targets, output_targets, optionals, set_parameters=None, batch=None)[source]

Bases: JobParent

A single job within a larger pipeline.

__init__(function: Callable, kwargs, submit_params, input_targets, output_targets, optionals, set_parameters=None, batch=None)[source]

Create a single job that can be run locally or submitted.

Parameters:
  • function – python function

  • kwargs – keyword arguments

  • submit_params – instructions to submit job to cluster using fsl_sub

  • input_targets – set of all filenames expected to exist before this job runs

  • output_targets – set of all filenames expected to exist after this job runs

  • optionals – set of filenames that are used to generate the dependency graph and yet might not exist

  • set_parameters – dictionary with placeholder values used to distinguish this SingleJob from all those produced for the same function

  • batch – label used to batch multiple jobs into one when submitting to the cluster

job_name()[source]

Return a string representation of this job.

pipe_tree.job.get_target(filename: Path, all_targets, from_template=None) FileTarget[source]

Return a FileTarget matching the input filename.

If the FileTarget for filename is already in all_targets, it will be returned. Otherwise a new FileTarget will be added to all_targets and returned.

Parameters:
  • filename – path to the input/intermediate/output filename

  • all_targets – dictionary of all FileTargets

  • from_template – template key used to obtain the filename

pipe_tree.job.get_matching_targets(pattern: str, all_targets) List[FileTarget][source]

Return all FileTargets that match the given input pattern.

Parameters:
  • pattern – file pattern matched against the target filenames

  • all_targets – dictionary of all FileTargets

class pipe_tree.job.FileTarget(filename: Path)[source]

Bases: object

Input, intermediate, or output file.

See get_target() for instructions on creating a new FileTarget.

If a specific SingleJob produces a filename, this can be indicated by setting producer:

get_target(filename, all_targets).producer = job

This will raise a ValueError if the filename is already produced by another job.

If a specific SingleJob requires a filename, this can be indicated by adding it to required_by:

get_target(filename, all_targets).required_by.add(job)

Filename existence can be checked using exists(). This method uses caching. To reset the cache run reset_existence().

To check if the filename can be created by this pipeline (or already exists) run expected().

__init__(filename: Path)[source]

Create a new target based on the provided filename.

Do not call this method directly. Instead use get_target().

Parameters:

filename – filename

exists() bool[source]

Test whether file exists on disk.

This function is lazy; once it has been checked once it will keep returning the same result.

To reset use reset_existence().

reset_existence()[source]

Ensure existence is checked again when running exists().

expected()[source]

Return whether the file can be produced by the pipeline (or already exists).

Returns False if the file does not exist and there is no way to produce it. Otherwise, True is returned.

property producer: SingleJob

Job that can produce this file.

pipe_tree.job.update_closure(*dictionaries, **kwargs)[source]

Add the provided dictionaries to the globals dictionary.

Inside the with block all the dictionary keys will be available in the local environment. After the with block ends the local environment will be cleaned up again.

Use like this:

def my_func():
    with update_closure({'a': 3}):
        print(a)  # prints 3
    print(a)  # raises a NameError (or prints whatever `a` is set to in the global environment)
pipe_tree.job.call_batched_jobs(funcs, kwargs)[source]
class pipe_tree.job.BatchJob(*sub_jobs)[source]

Bases: JobParent

Batched combination of multiple SingleJob instances that will be submitted together.

static function(funcs, kwargs)
__init__(*sub_jobs)[source]

Creates a new BatchJob from sub_jobs.

sub_jobs can be either SingleJob or BatchJob. In either case they will be run in the order in which they are supplied.

property batch
property kwargs
property submit_params
job_name()[source]

Return a string representation of this job.

pipe_tree.job.has_dependencies(possible_parent: JobParent, possible_children: Set[JobParent], all_jobs)[source]
pipe_tree.job.batch_connected_jobs(to_batch: Sequence[SingleJob], other_jobs: Collection[SingleJob]) List[JobParent][source]

Iteratively combine pairs of connected jobs.

Two jobs will be batched if:

  1. They are both in to_batch.

  2. One of the jobs depends on the other.

  3. There is no other job that needs to run between these two jobs.

Iteration through the jobs will continue until no further batching is possible.

pipe_tree.job.batch_unconnected_jobs(to_batch: Sequence[JobParent], other_jobs: Collection[JobParent]) List[BatchJob][source]

Batch jobs into generational sets.

All jobs that do not depend on any other job in to_batch will be added to the first generational batch. All jobs that only depend on the first generational batch will be added to the second. And so forth, until all jobs have been added to a generation.

Only dependencies through other_jobs will be considered.

pipe_tree.datalad module

Define interface of pipe-tree with datalad.

pipe_tree.datalad.get_tree(relative_dir='.', full_dir=None)[source]

Get FileTree defined as “data.tree” at the top-level of datalad dataset.

pipe_tree.datalad.get_dataset(directory='.')[source]

Get datalad dataset containing given directory (default: current directory).
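For example, inside a datalad dataset the top-level tree can be loaded and passed straight to the command line interface:

from pipe_tree import pipe
from pipe_tree.datalad import get_tree

tree = get_tree()  # FileTree defined in "data.tree" at the top level of the dataset
pipe.cli(tree)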