fsl_pipe package

Declarative pipeline definition based on filetrees.

Typical usage:

from fsl_pipe import pipe, In, Out, Ref, Var

@pipe
def job(input_file: In, output_file: Out):
    # code to convert `input_file` to `output_file`
    pass

pipe.cli()  # runs command line interface

fsl_pipe.pipeline module

Defines the pipeline before a FileTree is provided.

A pipeline is a collection of functions with mapping from the function parameters to input/output/reference/placeholder basenames.

class fsl_pipe.pipeline.Pipeline(scripts=None, default_output=None, default_submit=None)[source]

Bases: object

Collection of python functions forming a pipeline.

You can either create a new pipeline (from fsl_pipe import Pipeline; pipe = Pipeline()) or use a pre-existing one (from fsl_pipe import pipe).

Scripts are added to a pipeline by using the pipeline as a decorator (see __call__()).

To run the pipeline based on instructions from the command line run pipe.cli(tree), where tree is a FileTree defining the directory structure of the pipeline input & output files.
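For example, a pipeline with a single job could be defined and run as follows. This is a minimal sketch: the tree definition and filenames are made up, and FileTree.from_string from the file-tree package is assumed to be available.

from file_tree import FileTree
from fsl_pipe import Pipeline, In, Out

pipe = Pipeline()

# hypothetical file-tree with one input and one output file per subject
tree = FileTree.from_string("""
subject = 01, 02
sub-{subject}
    in_file.txt
    out_file.txt
""")

@pipe
def copy_input(in_file: In, out_file: Out):
    # toy job that turns the input file into the output file
    with open(in_file) as src, open(out_file, 'w') as dest:
        dest.write(src.read())

pipe.cli(tree)  # parse the command line and run/submit the requested jobs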

Variables:

scripts – list of PipedFunction, which define the python functions forming the pipeline and their input/output templates

__init__(scripts=None, default_output=None, default_submit=None)[source]

Create a new empty pipeline.

__call__(function=None, *, kwargs=None, no_iter=None, placeholders=None, as_path=False, submit=None, batch=None)[source]

Add a python function as a PipedFunction to the pipeline.

This is the main route through which jobs are added to the pipeline.

from fsl_pipe import pipe, In, Out, Ref, Var

@pipe(submit=dict(jobtime=40))
def func(in_path: In, out_path: Out, ref_path: Ref, placeholder_key: Var):
    pass

Parameters:
  • function

    Optionally provide the function directly. This can be useful when not using pipe as a decorator, e.g.:

    from fsl_pipe import pipe, In, Out, Ref, Var
    from shutil import copyfile
    
    pipe(copyfile, kwargs={'src': In('template_in'), 'dst': Out('template_out')})
    

  • kwargs – maps function keyword arguments to template names (In/Out/Ref), placeholder names (Var), or anything else. Templates or placeholders are replaced by their values based on the file-tree. Anything else is passed on unaltered.

  • no_iter – optional set of parameters not to iterate over.

  • as_path – if set to true, provide template filenames to the function as pathlib.Path objects instead of strings.

  • placeholders – dictionary overriding the placeholders set in the filetree for this specific function. This can, for example, be used to run this function on a sub-set of subjects.

  • submit – dictionary with the flags to pass on to fsl_sub.submit, when submitting jobs to a cluster queue.

  • batch – label used to batch multiple jobs into one when submitting to the cluster
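A sketch combining several of these options (the template, placeholder, and keyword-argument names are made up for illustration):

from fsl_pipe import pipe, In, Out, Var

@pipe(
    kwargs=dict(smoothing=3.0),              # passed on to the function unaltered
    placeholders={'subject': ['01', '02']},  # only run this function for these subjects
    no_iter=['session'],                     # do not iterate over sessions
    submit=dict(jobtime=30, jobram=8),       # flags forwarded to fsl_sub.submit
    batch='preproc',                         # batch label used when submitting
    as_path=True,                            # pass filenames as pathlib.Path objects
)
def preprocess(raw: In, cleaned: Out, session: Var, smoothing):
    ...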

generate_jobs(tree: FileTree)[source]

Split the pipeline into individual jobs.

Produces an unsorted JobList, which can be sorted by running filter on it.

Parameters:

tree – set of templates for the input/output/reference files with all possible placeholder values
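A minimal sketch of using the resulting JobList directly (pipe, tree, and the 'out_file' template are assumed to be defined as in the earlier example):

jobs = pipe.generate_jobs(tree)   # unsorted JobList covering every possible job
jobs = jobs.filter(['out_file'])  # keep and sort only the jobs needed for 'out_file'
jobs.run()                        # run with the default RunMethod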

default_parser(tree: FileTree, parser: ArgumentParser | None = None, include_vars=None, exclude_vars=()) ArgumentParser[source]

Add default fsl-pipe arguments to an argument parser (will create a new argument parser if needed).

Parameters:
  • tree – file_tree.FileTree object describing the directory structure for the input/output files (defaults to datalad tree).

  • parser – Optional argument parser that will be updated with the default fsl-pipe arguments (by default a new one is created).

  • include_vars – if provided, only expose placeholders in this list to the command line

  • exclude_vars – exclude placeholders in this list from the command line

Returns:

Argument parser with the default fsl-pipe arguments. If one was provided as input, that one will be returned.

run_cli(args: Namespace, tree: FileTree)[source]

Run the pipeline based on arguments extracted from the default argument parser.

Parameters:
  • args

    Namespace consisting of arguments extracted from the command line (produced by argparse.ArgumentParser.parse_args). This is expected to contain:

    • templates (defaults to self.default_output): list of which templates the pipeline should produce

    • pipeline_method: string with method used to run the jobs

    • overwrite: boolean, which if true overwrites any requested files

    • overwrite_dependencies: boolean, which if true also overwrites dependencies of requested files

    • job-hold: comma-separated string listing the job ID(s) to wait for before running

    • skip-missing: whether to skip jobs depending on missing data instead of raising an error

    • datalad (defaults to False): if true, run datalad get on all input data before running/submitting the jobs

    • {placeholder} (defaults to values in FileTree): sequence of strings overwriting the possible values for a particular placeholder

  • tree – Definition of pipeline input/output files
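default_parser and run_cli are typically used together, roughly as in this sketch (tree is assumed to be a predefined FileTree):

parser = pipe.default_parser(tree)
args = parser.parse_args()  # regular argparse.Namespace
pipe.run_cli(args, tree)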

cli(tree: FileTree | None = None, include_vars=None, exclude_vars=(), cli_arguments=None)[source]

Run the pipeline from the command line.

Parameters:
  • tree – file_tree.FileTree object describing the directory structure for the input/output files (defaults to datalad tree).

  • include_vars – if provided, only expose placeholders in this list to the command line

  • exclude_vars – exclude variables in this list from the command line

  • cli_arguments – list of command line arguments. If not set the arguments used in the python call (sys.argv) are used.

gui(tree: FileTree, **kwargs)[source]

Run the fsl-pipe-gui interface to select pipeline output.

Parameters:
  • tree – file_tree.FileTree object describing the directory structure for the input/output files.

  • overwrite_dependencies – set to True to overwrite dependencies

  • run_method – overrides the default method to run the jobs

move_to_subtree(sub_tree=None, other_mappings=None)[source]

Create a new pipeline that runs in a sub-tree of a larger tree rather than at the top level.

Parameters:
  • sub_tree – name of the sub-tree in the FileTree

  • other_mappings – other mappings between templates or placeholder names and their new values
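For example (the sub-tree and template names are hypothetical):

# run the whole pipeline inside the 'preproc' sub-tree of a larger study tree,
# mapping the pipeline's 'in_file' template onto the parent tree's 'raw_data' template
sub_pipe = pipe.move_to_subtree('preproc', other_mappings={'in_file': 'raw_data'})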

classmethod merge(pipelines: Collection[Pipeline])[source]

Combine multiple pipelines into a single one.

Parameters:

pipelines – pipelines containing part of the jobs

Returns:

parent pipeline containing all of the jobs in pipelines
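For example (a sketch assuming two previously defined Pipeline objects):

from fsl_pipe import Pipeline

# anatomy_pipe and diffusion_pipe are assumed to be existing Pipeline instances
full_pipe = Pipeline.merge([anatomy_pipe, diffusion_pipe])
full_pipe.cli(tree)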

find(function: str | Callable)[source]

Iterate over any pipeline scripts that run the provided function.

Either the function itself or the name of the function can be given.

remove(function: str | Callable)[source]

Remove any pipeline scripts that run the provided function from the pipeline.

Either the function itself or the name of the function can be given.

configure(kwargs)[source]

Override the values passed on to the keyword arguments of all the scripts.

Any keywords not expected by a script will be silently skipped for that script.
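For example (the keyword name is hypothetical):

# every script that accepts an `n_jobs` keyword argument will receive 4;
# scripts without that keyword are silently left unchanged
pipe.configure({'n_jobs': 4})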

add_to_graph(graph: Digraph | None = None, tree: FileTree | None = None)[source]

Add all the pipeline functions to the provided graph.

Parameters:
  • graph – GraphViz graph object (will be altered)

  • tree – concrete FileTree

class fsl_pipe.pipeline.PipedFunction(function, submit_params: Dict, kwargs=None, no_iter=None, override=None, as_path=True, batch=None)[source]

Bases: object

Represents a function stored in a pipeline.

__init__(function, submit_params: Dict, kwargs=None, no_iter=None, override=None, as_path=True, batch=None)[source]

Wrap a function with additional information to run it in a pipeline.

Parameters:
  • function – python function that will be run in pipeline.

  • submit_params – parameters to submit job running python function to cluster using fsl_sub.

  • kwargs – maps function keyword arguments to templates, variables, or actual values.

  • no_iter – which parameters to not iterate over (i.e., they are passed to the function in an array).

  • override – dictionary overriding the placeholders set in the filetree.

  • as_path – whether to pass on pathlib.Path objects instead of strings to the functions (default: True).

  • batch – label used to batch multiple jobs into one when submitting to the cluster.

copy()[source]

Create a copy of this PipedFunction for pipeline merging.

property no_iter: Set[str]

Sequence of placeholder names that should not be iterated over.

configure(kwargs, allow_new_keywords=True, check=True)[source]

Override the values passed on to the keyword arguments of the script.

Parameters:
  • kwargs – new placeholders/templates/values for keyword arguments

  • allow_new_keywords – if set to False, don’t allow new keywords

property placeholders

Return dictionary with placeholder values overridden for this function.

property templates

Return dictionary with templates used as input, output, or reference for this function.

property kwargs

Return dictionary with keyword arguments that will be passed on to the function.

filter_templates(output=False, all_templates=None) Set[str][source]

Find all input or output template keys.

Parameters:
  • output – if set to True select the output rather than the input templates

  • all_templates – sequence of all possible templates (required if any Template keys use globbing)

Returns:

set of input or output templates

iter_over(tree: FileTree) Tuple[str, ...][source]

Find all the placeholders that should be iterated over before calling the function.

These are all the placeholders that affect the input templates, but are not part of self.no_iter.

Parameters:

tree – set of templates with placeholder values

Returns:

placeholder names to be iterated over sorted by name

get_jobs(tree: FileTree, all_targets: Dict)[source]

Get a list of all individual jobs defined by this function.

Parameters:
  • tree – set of templates with placeholder values

  • all_targets – mapping from filenames to Target objects used to match input/output filenames between jobs

Returns:

sequence of jobs

all_placeholders(tree: FileTree, output=False) Set[str][source]

Identify the multi-valued placeholders affecting the input/output templates of this function.

Parameters:
  • tree – set of templates with placeholder values

  • output – if set to True returns the placeholders for the output rather than the input templates

Returns:

set of all placeholders that affect the input/output templates

move_to_subtree(sub_tree=None, other_mappings=None)[source]

Create a new wrapped function that runs in a sub-tree of a larger tree rather than at the top level.

Parameters:
  • sub_tree – name of the sub-tree in the FileTree

  • other_mappings – other mappings between templates or placeholder names and their new values

add_node(graph: Graph, index, tree: FileTree, placeholder_color: Dict[str, str])[source]

Add a node representing this function for a pipeline diagram.

Parameters:
  • graph – input pipeline diagram (will be altered)

  • index – unique integer identifier to use within the graph

class fsl_pipe.pipeline.Template(key: str | None = None, input: bool = False, output: bool = False, optional: bool = False)[source]

Bases: object

Represents a keyword argument that will be mapped to a template path in the FileTree.

Parameters:
  • key – template key in FileTree.

  • input – Set to true if file should be considered as input (i.e., it should exist before the job is run).

  • output – Set to true if file should be considered as output (i.e., it is expected to exist after the job is run).

  • optional – Set to true if input/output file should be considered for creating the dependency graph, but still might not exist.

key: str | None = None
input: bool = False
output: bool = False
optional: bool = False
__call__(key=None, optional=False)[source]

Override the template key and whether it is optional.

__init__(key: str | None = None, input: bool = False, output: bool = False, optional: bool = False) None
class fsl_pipe.pipeline.PlaceHolder(key: str | None = None, no_iter: bool = False)[source]

Bases: object

Represents a keyword argument that will be mapped to a placeholder value.

Parameters:
  • key – placeholder key in FileTree.

  • no_iter – if True the pipeline will not iterate over individual placeholder values, but rather run a single job with all possible placeholder values at once.

key: str | None = None
no_iter: bool = False
__call__(key=None, no_iter=None)[source]

Override the Placeholder.

__init__(key: str | None = None, no_iter: bool = False) None
fsl_pipe.pipeline.In = Template(key=None, input=True, output=False, optional=False)

Use to mark keyword arguments that represent input filenames of the wrapped function.

These filenames are expected to exist before the function is run.

The actual filename is extracted from the FileTree based on the template key. This template key will by default match the keyword argument name, but can be overridden by calling In (e.g., In("other_key")).

fsl_pipe.pipeline.Out = Template(key=None, input=False, output=True, optional=False)

Use to mark keyword arguments that represent output filenames of the wrapped function.

These filenames are expected to exist after the function is run. An error is raised if they do not.

The actual filename is extracted from the FileTree based on the template key. This template key will by default match the keyword argument name, but can be overridden by calling Out (e.g., Out("other_key")).

fsl_pipe.pipeline.Ref = Template(key=None, input=False, output=False, optional=False)

Use to mark keyword arguments that represent reference filenames of the wrapped function.

These filenames might or might not exist before or after the job is run.

The actual filename is extracted from the FileTree based on the template key. This template key will by default match the keyword argument name, but can be overridden by calling Ref (e.g., Ref("other_key")).

fsl_pipe.pipeline.Var = PlaceHolder(key=None, no_iter=False)

Use to mark keyword arguments that represent placeholder values in the pipeline.

Placeholder values are returned as PlaceholderValue objects.

The placeholder key will by default match the keyword argument name, but can be overridden by calling Var (e.g., Var("other_key")).
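A sketch combining the four markers and showing how the default keys can be overridden (the template and placeholder names are made up):

from fsl_pipe import pipe, In, Out, Ref, Var

@pipe
def register(moving: In, standard: Ref, warped: Out("moving_in_standard"),
             subject: Var, session: Var(no_iter=True)):
    # `moving` and `standard` use their keyword names as template keys;
    # `warped` is mapped to the "moving_in_standard" template instead;
    # `subject` arrives as a PlaceholderValue(key, index, value);
    # `session` receives all session values at once because of no_iter=True
    ...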

fsl_pipe.pipeline.to_templates_dict(input_files=(), output_files=(), reference_files=())[source]

Convert a sequence of input/output/reference files into a template dictionary.

Parameters:
  • input_files (sequence, optional) – Template keys representing input files. Defaults to ().

  • output_files (sequence, optional) – Template keys representing output files. Defaults to ().

  • reference_files (sequence, optional) – Template keys representing reference paths. Defaults to ().

Raises:

KeyError – If the same template key is used as more than one of the input/output/reference options

Returns:

mapping of the keyword argument names to the Template objects

Return type:

dict
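A sketch of building the same mapping programmatically instead of using annotations (the function and template names are hypothetical):

from fsl_pipe.pipeline import to_templates_dict

def brain_extract(t1, brain_mask):
    ...

# equivalent to annotating t1 with In and brain_mask with Out
kwargs = to_templates_dict(input_files=['t1'], output_files=['brain_mask'])
pipe(brain_extract, kwargs=kwargs)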

class fsl_pipe.pipeline.PlaceholderValue(key, index, value)

Bases: tuple

index

Alias for field number 1

key

Alias for field number 0

value

Alias for field number 2

fsl_pipe.pipeline.check_submit_parameters(submit_params: Dict)[source]

Check that the submit parameters are actually valid.

Raises a ValueError if there are any submit parameters set not expected by fsl_sub.

fsl_pipe.job module

Defines pipeline interface after it has been split into individual jobs based on a FileTree.

exception fsl_pipe.job.OutputMissing[source]

Bases: OSError

Raised if a job misses the output files.

exception fsl_pipe.job.InputMissingRun[source]

Bases: OSError

Raised if a job misses the input files when it is being launched.

exception fsl_pipe.job.InputMissingPipe(target, missing, dependency_graph=None)[source]

Bases: OSError

Raised if a job misses input files while it is being added to the pipeline.

__init__(target, missing, dependency_graph=None)[source]
class fsl_pipe.job.RunMethod(value)[source]

Bases: Enum

How to run the individual jobs.

local = 1
submit = 2
dask = 3
default()[source]

Return default RunMethod, which is submit on a cluster and local otherwise.

class fsl_pipe.job.JobList(tree: FileTree, jobs: Sequence[SingleJob], targets: Dict[str, FileTarget])[source]

Bases: object

Pipeline with a concrete set of jobs.

Produced from a fsl_pipe.pipeline.Pipeline based on the provided FileTree.

The default JobList produced by Pipeline.generate_jobs is unsorted and needs to be sorted by running filter before running.

__init__(tree: FileTree, jobs: Sequence[SingleJob], targets: Dict[str, FileTarget])[source]

Create a new concrete set of jobs.

filter(templates: Collection[str] | None = None, overwrite=False, overwrite_dependencies=False, skip_missing=False) JobList[source]

Filter out a subset of jobs.

This does a total of three things:

  1. Only include jobs needed to create the templates or file patterns given in templates.

  2. Exclude jobs for which the output files already exist, unless requested by overwrite or overwrite_dependencies.

  3. Sort the jobs, so that dependencies are run before the jobs that depend on them.

Parameters:
  • templates – target template keys (default: all templates); Any jobs required to produce those files will be kept in the returned pipeline.

  • overwrite – if True overwrite the files matching the template even if they already exist, defaults to False

  • overwrite_dependencies – if True rerun any dependent jobs even if the output of those jobs already exists, defaults to False

  • skip_missing – if True remove any jobs missing input dependencies. If not set, an error is raised whenever inputs are missing.

Returns:

Concrete pipeline with the jobs needed to produce the requested templates (with optional overwriting)
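For example (the 'warped' template name is hypothetical):

jobs = pipe.generate_jobs(tree)

# keep only the jobs needed to (re)create the 'warped' template,
# rerunning them even if their output already exists
to_run = jobs.filter(['warped'], overwrite=True)
to_run.run()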

batch(use_label=False, only_connected=False, split_on_ram=False, use_placeholders=True, batch_unlabeled=False)[source]

Batch groups of jobs into single jobs.

Two jobs will be batched if:

  1. There are no intermediate jobs that need to run between the two and cannot be included in the same batch (because of #3 below).

  2. One of the jobs depends on the other. This is disabled by default. Enable it by setting only_connected to True.

  3. The jobs are similar. Jobs are similar if they match all of the (enabled) properties listed below:

    1. They have the same batch label (set during pipeline creation using @pipe(batch=<batch label>)). This check can be enabled by setting the use_label keyword to True. If enabled, jobs without a batch label will never be merged.

    2. The jobs have the same submit parameters (e.g., "architecture", "coprocessor"), except for "jobtime", "name", and "export_vars". By default "jobram" is also ignored; set the split_on_ram keyword to True to take it into account as well.

    3. The jobs have the same placeholder values. This will prevent jobs from different subjects from being merged with each other. It can be disabled by setting use_placeholders to False. Alternatively, only a subset of placeholders can be considered by setting use_placeholders to a sequence of placeholder names (e.g., use_placeholders=["subject", "session"] will not merge jobs with different "subject" or "session" values, but will ignore any other placeholders).

A new JobList with the batched jobs will be returned.
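For example, a sketch that merges similar jobs while never batching across subjects:

from fsl_pipe.job import RunMethod

jobs = pipe.generate_jobs(tree).filter()
batched = jobs.batch(use_placeholders=['subject'])  # new JobList with merged jobs
batched.run(RunMethod.submit)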

split_pipeline(use_label=False, split_on_ram=False)[source]

Split the pipeline into multiple stages that require different hardware.

This uses the same rules as batch(), except that placeholder values are always ignored.

copy()[source]

Create a new, independent copy of the JobList.

report(console=None)[source]

Produce tree reports with the relevant input/output templates.

run_datalad()[source]

Make sure we can run the pipeline.

Calls datalad.get on all input files and datalad.unlock on all output files.

run(method: RunMethod | None = None, wait_for=(), clean_script='on_success')[source]

Run all the jobs that are required to produce the given templates.

Parameters:
  • method – defines how to run the job

  • wait_for – job IDs to wait for before running pipeline

  • clean_script – Sets whether the script produced in the log directory when submitting a job to the cluster should be kept after the job finishes. Only used if method is "submit". Options:

    • "never": the script is kept

    • "on_success" (default): only remove the script if it finishes successfully (i.e., no error is raised)

    • "always": always remove the script, even if it raises an error
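For example, to submit everything to the cluster while keeping the generated scripts for debugging (the job ID in wait_for is hypothetical):

from fsl_pipe.job import RunMethod

jobs.run(method=RunMethod.submit, wait_for=('12345',), clean_script='never')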

scale_jobtime(scaling)[source]

Scale the submit job times of all jobs by scaling.

This will only affect jobs submitted to the cluster.

class fsl_pipe.job.JobParent[source]

Bases: object

Parent for SingleJob and BatchJob.

input_targets: Set[FileTarget]
output_targets: Set[FileTarget]
optional_targets: Set[FileTarget]
dependencies(only_missing=True) Set[SingleJob | None][source]

Return jobs on which this job depends.

By default it only returns those related to missing input files.

Parameters:

only_missing – set to False to also return dependencies that produce files that already exist on disk

missing_output(reset_cache=False)[source]

Create a list of output filenames that do not exist on disk.

Optional outputs are not considered.

Parameters:

reset_cache – set to True to not rely on an existing cached existence check

missing_input(reset_cache=False, ignore_expected=False)[source]

Create a list of input filenames that do not exist on disk.

Optional inputs are not considered.

Parameters:
  • reset_cache – set to True to not rely on an existing cached existence check

  • ignore_expected – set to True to ignore any missing files that have a job that will produce them in the pipeline

add_to_jobs(all_jobs, overwrite=False, overwrite_dependencies=False, only_sort=False)[source]

Mark this job and all of its dependencies to run.

This job is marked to run if any of its output does not yet exist or if overwrite is True. The dependencies are marked to run if this job runs and either their output does not exist or overwrite_dependencies is True.

Parameters:
  • all_jobs – list and set of individual jobs. This job and all required jobs are added to this list.

  • overwrite – if True mark this job even if the output already exists, defaults to False

  • overwrite_dependencies – if True mark the dependencies of this job even if their output already exists, defaults to False

__call__(method: RunMethod, wait_for=(), clean_script='on_success')[source]

Run the job using the specified method.

prepare_run()[source]

Prepare to run this job.

Steps: 1. Creates output directory

expected()[source]

Return true if this job is expected to be able to run.

class fsl_pipe.job.SingleJob(function: Callable, kwargs, submit_params, input_targets, output_targets, optionals, set_parameters=None, batch=None)[source]

Bases: JobParent

A single job within a larger pipeline.

__init__(function: Callable, kwargs, submit_params, input_targets, output_targets, optionals, set_parameters=None, batch=None)[source]

Create a single job that can be run locally or submitted.

Parameters:
  • function – python function

  • kwargs – keyword arguments

  • submit_params – instructions to submit job to cluster using fsl_sub

  • input_targets – set of all filenames expected to exist before this job runs

  • output_targets – set of all filenames expected to exist after this job runs

  • optionals – set of filenames that are used to generate the dependency graph and yet might not exist

  • set_parameters – dictionary with placeholder values used to distinguish this SingleJob with all those produced from the same function

  • batch – label used to batch multiple jobs into one when submitting to the cluster

copy(targets: Dict[str, FileTarget])[source]

Create a copy of the SingleJob to be included in a new JobList.

targets contain the set of FileTargets recognised by this new JobList. This will be updated based on the input/output targets of this job.

job_name()[source]

Return a string representation of this job.

to_job_list(tree: FileTree)[source]

Convert single job into its own JobList.

scale_jobtime(scaling)[source]

Scale the submit job time in place by scaling.

This will only affect jobs submitted to the cluster.

fsl_pipe.job.get_target(filename: Path, all_targets, from_template=None) FileTarget[source]

Return a FileTarget matching the input filename.

If the FileTarget for filename is already in all_targets, it will be returned. Otherwise a new FileTarget will be added to all_targets and returned.

Parameters:
  • filename – path to the input/intermediate/output filename

  • all_targets – dictionary of all FileTargets

  • from_template – template key used to obtain the filename

fsl_pipe.job.get_matching_targets(pattern: str, all_targets) List[FileTarget][source]

Return all FileTarget that match the given input pattern.

Parameters:
  • pattern – filename definition supporting Unix shell-style wildcards

  • all_targets – dictionary of all FileTargets

class fsl_pipe.job.FileTarget(filename: Path)[source]

Bases: object

Input, intermediate, or output file.

See get_target() for instructions on creating a new FileTarget.

If a specific SingleJob produces a filename, this can be indicated by setting producer:

get_target(filename, all_targets).producer = job

This will raise a ValueError if the filename is already produced by another job.

If a specific SingleJob requires a filename, this can be indicated by adding it to required_by:

get_target(filename, all_targets).required_by.add(job)

Filename existence can be checked using exists(). This method uses caching. To reset the cache run reset_existence().

To check if the filename can be created by this pipeline (or already exists) run expected().
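Putting this together in a sketch (filename, all_targets, and the two jobs are assumed to exist already):

from fsl_pipe.job import get_target

target = get_target(filename, all_targets)  # re-uses an existing FileTarget if present
target.producer = producing_job             # raises ValueError if another producer is set
target.required_by.add(consuming_job)       # register a job that needs this file
if not target.exists():                     # lazy, cached check of the file system
    print('can be produced by the pipeline:', target.expected())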

__init__(filename: Path)[source]

Create a new target based on the provided filename.

Do not call this method directly. Instead use get_target().

Parameters:

filename – filename

exists() bool[source]

Test whether file exists on disk.

This function is lazy; once it has been checked once it will keep returning the same result.

To reset use reset_existence().

reset_existence()[source]

Ensure existence is checked again when running exists().

expected()[source]

Return whether the file can be produced by the pipeline (or already exists).

Returns False if the file does not exist and there is no way to produce it. Otherwise, True is returned

property producer: SingleJob

Job that can produce this file.

fsl_pipe.job.update_closure(*dictionaries, **kwargs)[source]

Add the provided dictionaries to the globals dictionary.

Inside the with block all the dictionary keys will be available in the local environment. After the with block ends the local environment will be cleaned up again.

Use like this:

def my_func():
    with update_closure({'a': 3}):
        print(a)  # prints 3
    print(a)  # raises a NameError (or prints whatever `a` is set to in the global environment)

fsl_pipe.job.call_batched_jobs(funcs, kwargs)[source]
class fsl_pipe.job.BatchJob(*sub_jobs)[source]

Bases: JobParent

Batched combination of multiple SingleJob instances that will be submitted together.

static function(funcs, kwargs)
__init__(*sub_jobs)[source]

Creates a new BatchJob from sub_jobs.

sub_jobs can be either SingleJob or BatchJob. In either case they will be run in the order in which they are supplied.

copy(targets: Dict[str, FileTarget])[source]

Create a copy of the BatchJob to be included in a new JobList.

targets contain the set of FileTargets recognised by this new JobList. This will be updated based on the input/output targets of each job within this batch.

property batch
property kwargs
property submit_params
job_name()[source]

Return a string representation of this job.

scale_jobtime(scaling)[source]

Scale the submit job time in place by scaling.

This will only affect jobs submitted to the cluster.

to_job_list(tree: FileTree)[source]

Convert batched jobs back into a JobList.

fsl_pipe.job.has_dependencies(possible_parent: JobParent, possible_children: Set[JobParent], all_jobs)[source]
fsl_pipe.job.batch_connected_jobs(to_batch: Sequence[SingleJob], other_jobs: Collection[SingleJob]) List[JobParent][source]

Iteratively combine pairs of connected jobs.

Two jobs will be batched if:

  1. They are both in to_batch.

  2. One of the jobs depends on the other.

  3. There is no other job that needs to run between these two jobs.

Iteration through the jobs will continue until no further batching is possible.

fsl_pipe.job.batch_unconnected_jobs(to_batch: Sequence[JobParent], other_jobs: Collection[JobParent]) List[BatchJob][source]

Batch jobs into generational sets

All jobs that do not depend on any other job in to_batch will be added to the first generational batch. All jobs that only depend on the first generational batch will be added to the second. And so forth, until all jobs have been added to a generation.

Only dependencies through other_jobs will be considered.

fsl_pipe.datalad module

Define interface of fsl-pipe with datalad.

fsl_pipe.datalad.get_tree(relative_dir='.', full_dir=None)[source]

Get the FileTree defined as "data.tree" at the top level of the datalad dataset.

fsl_pipe.datalad.get_dataset(directory='.')[source]

Get datalad dataset containing given directory (default: current directory).