fsl_pipe package¶
Declarative pipeline definition based on filetrees.
Typical usage:
    from fsl_pipe import pipe, In, Out, Ref, Var

    @pipe
    def job(input_file: In, output_file: Out):
        # code to convert `input_file` to `output_file`
        ...

    pipe.cli()  # runs command line interface
fsl_pipe.pipeline module¶
Defines the pipeline before a FileTree is provided.
A pipeline is a collection of functions with a mapping from the function parameters to input/output/reference/placeholder basenames.
- class fsl_pipe.pipeline.Pipeline(scripts=None, default_output=None, default_submit=None)[source]¶
Bases:
object
Collection of python functions forming a pipeline.
You can either create a new pipeline (from fsl_pipe import Pipeline; pipe = Pipeline()) or use a pre-existing one (from fsl_pipe import pipe).
Scripts are added to a pipeline by using the pipeline as a decorator (see __call__()). To run the pipeline based on instructions from the command line, run pipe.cli(tree), where tree is a FileTree defining the directory structure of the pipeline input & output files.
- Variables:
scripts – list of PipedFunction, which define the python functions forming the pipeline and their input/output templates
- __init__(scripts=None, default_output=None, default_submit=None)[source]¶
Create a new empty pipeline.
- __call__(function=None, *, kwargs=None, no_iter=None, placeholders=None, as_path=False, submit=None, batch=None)[source]¶
Add a python function as a PipedFunction to the pipeline. This is the main route through which jobs are added to the pipeline.

    from fsl_pipe import pipe, In, Out, Ref, Var

    @pipe(submit=dict(jobtime=40))
    def func(in_path: In, out_path: Out, ref_path: Ref, placeholder_key: Var):
        pass
- Parameters:
function –
Optionally provide the function directly. This can be useful when not using pipe as a decorator, e.g.:
    from fsl_pipe import pipe, In, Out, Ref, Var
    from shutil import copyfile

    pipe(copyfile, kwargs={'src': In('template_in'), 'dst': Out('template_out')})
kwargs – maps function keyword arguments to template names (In/Out/Ref), placeholder names (Var), or anything else. Templates or placeholders are replaced by their values based on the file-tree. Anything else is passed on unaltered.
no_iter – optional set of parameters not to iterate over.
as_path – if set to true, provide template filenames to the function as pathlib.Path objects instead of strings.
placeholders – dictionary overriding the placeholders set in the filetree for this specific function. This can, for example, be used to run this function on a sub-set of subjects.
submit – dictionary with the flags to pass on to fsl_sub.submit, when submitting jobs to a cluster queue.
batch – label used to batch multiple jobs into one when submitting to the cluster
- generate_jobs(tree: FileTree)[source]¶
Split the pipeline into individual jobs.
Produces an unsorted JobList, which can be sorted by running filter on it.
- Parameters:
tree – set of templates for the input/output/reference files with all possible placeholder values
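A minimal sketch of the programmatic route (as an alternative to cli()): generate the jobs from a FileTree, filter them down to the requested templates, and run them. The tree filename and the template name below are purely illustrative.

    from file_tree import FileTree
    from fsl_pipe import pipe

    tree = FileTree.read("my_pipeline.tree")   # hypothetical tree definition file
    jobs = pipe.generate_jobs(tree)            # unsorted JobList
    jobs = jobs.filter(["some_output"])        # keep and sort the jobs needed for this template
    jobs.run()                                 # run the jobs locally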
- default_parser(tree: FileTree, parser: ArgumentParser | None = None, include_vars=None, exclude_vars=()) ArgumentParser [source]¶
Add default fsl-pipe arguments to an argument parser (will create a new argument parser if needed).
- Parameters:
tree – file_tree.FileTree object describing the directory structure for the input/output files (defaults to datalad tree).
parser – Optional argument parser that will be updated with the default fsl-pipe arguments (by default a new one is created).
include_vars – if provided, only expose placeholders in this list to the command line
exclude_vars – exclude placeholders in this list from the command line
- Returns:
Argument parser with the default fsl-pipe arguments. If one was provided as input, that one will be returned.
- run_cli(args: Namespace, tree: FileTree)[source]¶
Run the pipeline based on arguments extracted from the default argument parser.
- Parameters:
args –
Namespace consisting of arguments extracted from the command line (produced by argparse.ArgumentParser.parse_args). This is expected to contain:
templates (defaults to self.default_output): list of which templates the pipeline should produce
pipeline_method: string with method used to run the jobs
overwrite: boolean; if true, overwrite any requested files
overwrite_dependencies: boolean; if true, also overwrite dependencies of requested files
job-hold: string with a comma-separated list of job(s) that will be waited for
skip-missing: whether to skip jobs depending on missing data instead of raising an error
datalad (defaults to False): if true, run datalad get on all input data before running/submitting the jobs
{placeholder} (defaults to values in FileTree): sequence of strings overwriting the possible values for a particular placeholder
tree – Definition of pipeline input/output files
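For pipelines that need extra command-line flags, default_parser and run_cli can be wired together by hand instead of calling cli(); a hedged sketch (the --smoothing flag and the tree filename are assumptions for illustration):

    import argparse
    from file_tree import FileTree
    from fsl_pipe import pipe

    tree = FileTree.read("my_pipeline.tree")                      # assumed tree definition file
    parser = argparse.ArgumentParser("my-pipeline")
    parser.add_argument("--smoothing", type=float, default=3.0)   # pipeline-specific flag
    pipe.default_parser(tree, parser=parser)                      # add the standard fsl-pipe arguments

    args = parser.parse_args()
    # ... use args.smoothing to configure the pipeline before generating jobs ...
    pipe.run_cli(args, tree)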
- cli(tree: FileTree | None = None, include_vars=None, exclude_vars=(), cli_arguments=None)[source]¶
Run the pipeline from the command line.
- Parameters:
tree – file_tree.FileTree object describing the directory structure for the input/output files (defaults to datalad tree).
include_vars – if provided, only expose variables in this list to the command line
exclude_vars – exclude variables in this list from the command line
cli_arguments – list of command line arguments. If not set the arguments used in the python call (sys.argv) are used.
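In the common case a pipeline script just defines its jobs and hands control to cli; a minimal, hypothetical example:

    from file_tree import FileTree
    from fsl_pipe import pipe, In, Out

    @pipe
    def brain_extract(t1w: In, t1w_brain: Out):
        ...  # illustrative job; the template names are assumptions

    if __name__ == "__main__":
        tree = FileTree.read("my_pipeline.tree")  # assumed tree definition file
        pipe.cli(tree)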
- gui(tree: FileTree, **kwargs)[source]¶
Run the fsl-pipe-gui interface to select pipeline output.
- Parameters:
tree – file_tree.FileTree object describing the directory structure for the input/output files.
overwrite_dependencies – set to True to overwrite dependencies
run_method – overrides the default method to run the jobs
- move_to_subtree(sub_tree=None, other_mappings=None)[source]¶
Create a new pipeline that runs in a sub-tree of a larger tree rather than at the top level.
- Parameters:
sub_tree – name of the sub-tree in the FileTree
other_mappings – other mappings between templates or placeholder names and their new values
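A sketch of nesting a pipeline inside a larger study tree; the sub-tree name, template names, and placeholder remapping are assumptions for illustration:

    from fsl_pipe import Pipeline, In, Out

    dwi_pipe = Pipeline()

    @dwi_pipe
    def fit_model(dwi: In, fit: Out):
        ...

    # run the diffusion pipeline within the "diffusion" sub-tree of a larger tree,
    # mapping its "subject" placeholder onto the parent tree's "participant"
    nested = dwi_pipe.move_to_subtree("diffusion", other_mappings={"subject": "participant"})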
- classmethod merge(pipelines: Collection[Pipeline])[source]¶
Combine multiple pipelines into a single one.
- Parameters:
pipelines – pipelines containing part of the jobs
- Returns:
parent pipeline containing all of the jobs in pipelines
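A sketch of combining independently defined pipelines into a single parent pipeline before running it; the job and template names are hypothetical:

    from fsl_pipe import Pipeline, In, Out

    struct_pipe = Pipeline()
    func_pipe = Pipeline()

    @struct_pipe
    def segment(t1w: In, segmentation: Out):
        ...

    @func_pipe
    def clean_timeseries(bold: In, bold_clean: Out):
        ...

    # combine the two modality-specific pipelines into one
    full_pipe = Pipeline.merge([struct_pipe, func_pipe])
    full_pipe.cli()  # expose the combined pipeline on the command line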
- find(function: str | Callable)[source]¶
Iterate over any pipeline scripts that run the provided function.
Either the function itself or the name of the function can be given.
- remove(function: str | Callable)[source]¶
Remove any pipeline scripts that run the provided function from the pipeline.
Either the function itself or the name of the function can be given.
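find and remove accept either the function object or its name, which makes it easy to adapt an imported pipeline; a brief sketch with a hypothetical job name:

    from fsl_pipe import pipe

    # inspect how an existing job is wired to templates
    for script in pipe.find("brain_extract"):
        print(script.kwargs)

    # drop it, e.g. to substitute a custom implementation
    pipe.remove("brain_extract")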
- class fsl_pipe.pipeline.PipedFunction(function, submit_params: Dict, kwargs=None, no_iter=None, override=None, as_path=True, batch=None)[source]¶
Bases:
object
Represents a function stored in a pipeline.
- __init__(function, submit_params: Dict, kwargs=None, no_iter=None, override=None, as_path=True, batch=None)[source]¶
Wrap a function with additional information to run it in a pipeline.
- Parameters:
function – python function that will be run in pipeline.
submit_params – parameters to submit job running python function to cluster using fsl_sub.
kwargs – maps function keyword arguments to templates, variables, or actual values.
no_iter – which parameters to not iterate over (i.e., they are passed to the function in an array).
override – dictionary overriding the placeholders set in the filetree.
as_path – whether to pass on pathlib.Path objects instead of strings to the functions (default: True).
batch – label used to batch multiple jobs into one when submitting to the cluster.
- property no_iter: Set[str]¶
Set of placeholder names that should not be iterated over.
- configure(kwargs, allow_new_keywords=True, check=True)[source]¶
Override the values passed on to the keyword arguments of the script.
- Parameters:
kwargs – new placeholders/templates/values for keyword arguments
allow_new_keywords – if set to False, don’t allow new keywords
- property placeholders¶
Return dictionary with placeholder values overridden for this function.
- property templates¶
Return dictionary with templates used as input, output, or reference for this function.
- property kwargs¶
Return dictionary with keyword arguments that will be passed on to the function.
- filter_templates(output=False, all_templates=None) Set[str] [source]¶
Find all input or output template keys.
- Parameters:
output – if set to True select the output rather than the input templates
all_templates – sequence of all possible templates (required if any Template keys use globbing)
- Returns:
set of input or output templates
- iter_over(tree: FileTree) Tuple[str, ...] [source]¶
Find all the placeholders that should be iterated over before calling the function.
These are all the placeholders that affect the input templates, but are not part of self.no_iter.
- Parameters:
tree – set of templates with placeholder values
- Returns:
placeholder names to be iterated over sorted by name
- get_jobs(tree: FileTree, all_targets: Dict)[source]¶
Get a list of all individual jobs defined by this function.
- Parameters:
tree – set of templates with placeholder values
all_targets – mapping from filenames to Target objects used to match input/output filenames between jobs
- Returns:
sequence of jobs
- all_placeholders(tree: FileTree, output=False) Set[str] [source]¶
Identify the multi-valued placeholders affecting the input/output templates of this function.
- Parameters:
tree – set of templates with placeholder values
output – if set to True, returns the placeholders for the output rather than the input templates
- Returns:
set of all placeholders that affect the input/output templates
- move_to_subtree(sub_tree=None, other_mappings=None)[source]¶
Create a new wrapped function that runs in a sub-tree of a larger tree rather than at the top level.
- Parameters:
sub_tree – name of the sub-tree in the FileTree
other_mappings – other mappings between templates or placeholder names and their new values
- class fsl_pipe.pipeline.Template(key: str | None = None, input: bool = False, output: bool = False, optional: bool = False)[source]¶
Bases:
object
Represents a keyword argument that will be mapped to a template path in the FileTree.
- Parameters:
key – template key in FileTree.
input – Set to true if file should be considered as input (i.e., it should exist before the job is run).
output – Set to true if file should be considered as output (i.e., it is expected to exist after the job is run).
optional – Set to true if input/output file should be considered for creating the dependency graph, but still might not exist.
- key: str | None = None¶
- input: bool = False¶
- output: bool = False¶
- optional: bool = False¶
- __init__(key: str | None = None, input: bool = False, output: bool = False, optional: bool = False) None ¶
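Template instances are normally created through In, Out, and Ref (defined below); calling them with an explicit key decouples the keyword-argument name from the template name in the FileTree. A sketch with hypothetical template keys:

    from fsl_pipe import pipe, In, Out

    @pipe
    def register(moving: In("t1w"), reference: In("standard"), warped: Out("t1w_to_standard")):
        ...  # argument names are local; the strings are the FileTree template keys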
- class fsl_pipe.pipeline.PlaceHolder(key: str | None = None, no_iter: bool = False)[source]¶
Bases:
object
Represents a keyword argument that will be mapped to a placeholder value.
- Parameters:
key – placeholder key in FileTree.
no_iter – if True the pipeline will not iterate over individual placeholder values, but rather run a single job with all possible placeholder values at once.
- key: str | None = None¶
- no_iter: bool = False¶
- __init__(key: str | None = None, no_iter: bool = False) None ¶
- fsl_pipe.pipeline.In = Template(key=None, input=True, output=False, optional=False)¶
Use to mark keyword arguments that represent input filenames of the wrapped function.
These filenames are expected to exist before the function is run.
The actual filename is extracted from the FileTree based on the template key. This template key will by default match the keyword argument name, but can be overridden by calling In (e.g., In("other_key")).
- fsl_pipe.pipeline.Out = Template(key=None, input=False, output=True, optional=False)¶
Use to mark keyword arguments that represent output filenames of the wrapped function.
These filenames are expected to exist after the function is run. An error is raised if they do not.
The actual filename is extracted from the FileTree based on the template key. This template key will by default match the keyword argument name, but can be overridden by calling Out (e.g., Out("other_key")).
- fsl_pipe.pipeline.Ref = Template(key=None, input=False, output=False, optional=False)¶
Use to mark keyword arguments that represent reference filenames of the wrapped function.
These filenames might or might not exist before or after the job is run.
The actual filename is extracted from the FileTree based on the template key. This template key will by default match the keyword argument name, but can be overridden by calling Ref (e.g., Ref("other_key")).
- fsl_pipe.pipeline.Var = PlaceHolder(key=None, no_iter=False)¶
Use to mark keyword arguments that represent placeholder values in the pipeline.
Placeholder values are returned as PlaceholderValue objects.
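A sketch of a group-level job using Var with no_iter, so a single job receives every value of an assumed "subject" placeholder at once (per the no_iter behaviour described above, the matching input files are then passed as a sequence). It assumes Var can be called with no_iter=True, mirroring the PlaceHolder parameters:

    from fsl_pipe import pipe, In, Out, Var

    @pipe
    def group_average(subject_map: In, average: Out, subject: Var(no_iter=True)):
        # subject_map is a sequence of filenames (one per subject value);
        # subject is a PlaceholderValue containing the corresponding subject IDs
        ...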
- fsl_pipe.pipeline.to_templates_dict(input_files=(), output_files=(), reference_files=())[source]¶
Convert a sequence of input/output/reference files into a template dictionary.
- Parameters:
input_files (sequence, optional) – Template keys representing input files. Defaults to ().
output_files (sequence, optional) – Template keys representing output files. Defaults to ().
reference_files (sequence, optional) – Template keys representing reference paths. Defaults to ().
- Raises:
KeyError – If the same template key is used as more than one of the input/output/reference options
- Returns:
mapping of the keyword argument names to the Template objects
- Return type:
dict
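A sketch of how the resulting dictionary could be passed as kwargs when registering a plain function (see __call__ above); the wrapper function and template keys are hypothetical, and it assumes the template keys double as keyword-argument names:

    from fsl_pipe import pipe
    from fsl_pipe.pipeline import to_templates_dict

    def run_tool(in_file, out_file, ref_file):
        ...  # hypothetical wrapper around an external tool

    kwargs = to_templates_dict(
        input_files=["in_file"],
        output_files=["out_file"],
        reference_files=["ref_file"],
    )
    pipe(run_tool, kwargs=kwargs)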
fsl_pipe.job module¶
Defines pipeline interface after it has been split into individual jobs based on a FileTree.
- exception fsl_pipe.job.OutputMissing[source]¶
Bases:
OSError
Raised if a job fails to produce its expected output files.
- exception fsl_pipe.job.InputMissingRun[source]¶
Bases:
OSError
Raised if a job is missing input files when it is being launched.
- exception fsl_pipe.job.InputMissingPipe(target, missing, dependency_graph=None)[source]¶
Bases:
OSError
Raised if a job is missing input files while it is being added to the pipeline.
- class fsl_pipe.job.RunMethod(value)[source]¶
Bases:
Enum
How to run the individual jobs.
- local = 1¶
- submit = 2¶
- dask = 3¶
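JobList.run (below) accepts one of these values; the same choice is exposed on the command line as pipeline_method. A minimal sketch:

    from fsl_pipe import pipe
    from fsl_pipe.job import RunMethod

    jobs = pipe.generate_jobs(tree).filter()  # tree: a FileTree defined elsewhere
    jobs.run(RunMethod.submit)                # submit to the cluster instead of running locally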
- class fsl_pipe.job.JobList(tree: FileTree, jobs: Sequence[SingleJob], targets: Dict[str, FileTarget])[source]¶
Bases:
object
Pipeline with a concrete set of jobs.
Produced from a fsl_pipe.pipeline.Pipeline based on the provided FileTree.
The default JobList produced by Pipeline.generate_jobs is unsorted and needs to be sorted by running filter before it can be run.
- __init__(tree: FileTree, jobs: Sequence[SingleJob], targets: Dict[str, FileTarget])[source]¶
Create a new concrete set of jobs.
- filter(templates: Collection[str] | None = None, overwrite=False, overwrite_dependencies=False, skip_missing=False) JobList [source]¶
Filter out a subset of jobs.
This does a total of three things:
- Only include jobs needed to create the templates or file patterns given in templates.
- Exclude jobs for which the output files already exist, unless requested by overwrite or overwrite_dependencies.
- Sort the jobs, so that dependencies are run before the jobs that depend on them.
- Parameters:
templates – target template keys (default: all templates); Any jobs required to produce those files will be kept in the returned pipeline.
overwrite – if True overwrite the files matching the template even if they already exist, defaults to False
overwrite_dependencies – if True rerun any dependent jobs even if the output of those jobs already exists, defaults to False
skip_missing – if True remove any jobs missing input dependencies. If not set, an error is raised whenever inputs are missing.
- Returns:
Concrete pipeline with the jobs needed to produce the requested templates (with optional overwriting)
- batch(use_label=False, only_connected=False, split_on_ram=False, use_placeholders=True, batch_unlabeled=False)[source]¶
Batch groups of jobs into single jobs.
Two jobs will be batched only if:
1. There are no intermediate jobs that need to run between the two and cannot be included in the same batch (because of rule 3 below).
2. One of the jobs depends on the other. This requirement is disabled by default; enable it by setting only_connected to True.
3. The jobs are similar. Jobs are similar if they match all of the (enabled) properties listed below:
- They have the same batch label (set during pipeline creation using @pipe(batch=<batch label>)). This check is enabled by setting the use_label keyword to True. If enabled, jobs without a batch label will never be merged.
- The jobs have the same submit parameters (e.g., "architecture", "coprocessor"), except for "jobtime", "name", and "export_vars". "jobram" is also ignored unless the split_on_ram keyword is set to True.
- The jobs have the same placeholder values. This prevents jobs from different subjects from being merged with each other. It can be disabled by setting use_placeholders to False. Alternatively, a subset of placeholders can be considered by setting use_placeholders to a sequence of placeholder names (e.g., use_placeholders=["subject", "session"] will not merge jobs with different "subject" or "session" placeholder values, but will ignore any other placeholders).
A new JobList with the batched jobs will be returned.
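A hedged sketch of batching before cluster submission, assuming the jobs were given batch labels with @pipe(batch=...) at pipeline-creation time:

    from fsl_pipe import pipe
    from fsl_pipe.job import RunMethod

    jobs = pipe.generate_jobs(tree).filter()   # tree: a FileTree defined elsewhere
    batched = jobs.batch(
        use_label=True,                        # only merge jobs sharing a batch label
        use_placeholders=["subject"],          # never merge jobs from different subjects
    )
    batched.run(RunMethod.submit)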
- split_pipeline(use_label=False, split_on_ram=False)[source]¶
Split the pipeline into multiple stages that require different hardware.
This uses the same rules as batch(), except that placeholder values are always ignored.
- run_datalad()[source]¶
Make sure we can run the pipeline.
Calls datalad.get on all input files and datalad.unlock on all output files.
- run(method: RunMethod | None = None, wait_for=(), clean_script='on_success')[source]¶
Run all the jobs that are required to produce the given templates.
- Parameters:
method – defines how to run the job
wait_for – job IDs to wait for before running pipeline
clean_script – Sets whether the script produced in the log directory when submitting a job to the cluster should be kept after the job finishes. Only used if method is "submit". Options:
- "never": the script is kept
- "on_success" (default): only remove the script if it finishes successfully (i.e., no error is raised)
- "always": always remove the script, even if it raises an error
- class fsl_pipe.job.JobParent[source]¶
Bases:
object
Parent for SingleJob and BatchJob.
- input_targets: Set[FileTarget]¶
- output_targets: Set[FileTarget]¶
- optional_targets: Set[FileTarget]¶
- dependencies(only_missing=True) Set[SingleJob | None] [source]¶
Return jobs on which this job depends.
By default it only returns the jobs producing input files that are still missing.
- Parameters:
only_missing – set to False to also return dependencies that produce files that already exist on disk
- missing_output(reset_cache=False)[source]¶
Create a list of output filenames that do not exist on disk.
Optional outputs are not considered.
- Parameters:
reset_cache – set to True to not rely on an existing cached existence check
- missing_input(reset_cache=False, ignore_expected=False)[source]¶
Create a list of input filenames that do not exist on disk.
Optional inputs are not considered.
- Parameters:
reset_cache – set to True to not rely on an existing cached existence check
ignore_expected – set to True to ignore any missing files that have a job that will produce them in the pipeline
- add_to_jobs(all_jobs, overwrite=False, overwrite_dependencies=False, only_sort=False)[source]¶
Mark this job and all of its dependencies to run.
This job is marked to run, if any of the output does not yet exist or overwrite is True. The dependencies are marked to run, if this job runs and either their output does not exist or overwrite_dependencies is True.
- Parameters:
all_jobs – list and set of individual jobs. This job and all required jobs are added to this list.
overwrite – if True mark this job even if the output already exists, defaults to False
overwrite_dependencies – if True mark the dependencies of this job even if their output already exists, defaults to False
- class fsl_pipe.job.SingleJob(function: Callable, kwargs, submit_params, input_targets, output_targets, optionals, set_parameters=None, batch=None)[source]¶
Bases:
JobParent
A single job within a larger pipeline.
- __init__(function: Callable, kwargs, submit_params, input_targets, output_targets, optionals, set_parameters=None, batch=None)[source]¶
Create a single job that can be run locally or submitted.
- Parameters:
function – python function
kwargs – keyword arguments
submit_params – instructions to submit job to cluster using fsl_sub
input_targets – set of all filenames expected to exist before this job runs
output_targets – set of all filenames expected to exist after this job runs
optionals – set of filenames that are used to generate the dependency graph and yet might not exist
set_parameters – dictionary with placeholder values used to distinguish this SingleJob from all others produced from the same function
batch – label used to batch multiple jobs into one when submitting to the cluster
- copy(targets: Dict[str, FileTarget])[source]¶
Create a copy of the SingleJob to be included in a new JobList.
targets contains the set of FileTargets recognised by this new JobList. It will be updated based on the input/output targets of this job.
- fsl_pipe.job.get_target(filename: Path, all_targets, from_template=None) FileTarget [source]¶
Return a FileTarget matching the input filename.
If the FileTarget for filename is already in all_targets, it will be returned. Otherwise a new FileTarget will be added to all_targets and returned.
- Parameters:
filename – path to the input/intermediate/output filename
all_targets – dictionary of all FileTargets
from_template – template key used to obtain the filename
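A brief sketch of how get_target deduplicates FileTarget objects across jobs; the job objects here are hypothetical SingleJob instances, and the dictionary would normally be owned by a JobList:

    from pathlib import Path
    from fsl_pipe.job import get_target

    all_targets = {}
    target = get_target(Path("sub-01/T1w_brain.nii.gz"), all_targets)
    target.producer = producer_job            # hypothetical SingleJob that writes the file
    target.required_by.add(consumer_job)      # hypothetical SingleJob that reads it

    # looking up the same filename again returns the very same FileTarget
    assert get_target(Path("sub-01/T1w_brain.nii.gz"), all_targets) is target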
- fsl_pipe.job.get_matching_targets(pattern: str, all_targets) List[FileTarget] [source]¶
Return all FileTarget objects that match the given input pattern.
- Parameters:
pattern – filename definition supporting Unix shell-style wildcards
all_targets – dictionary of all FileTargets
- class fsl_pipe.job.FileTarget(filename: Path)[source]¶
Bases:
object
Input, intermediate, or output file.
See get_target() for instructions on creating a new FileTarget.
If a specific SingleJob produces a filename, this can be indicated by setting producer:

    get_target(filename, all_targets).producer = job

This will raise a ValueError if the filename is already produced by another job.
If a specific SingleJob requires a filename, this can be indicated by adding it to required_by:

    get_target(filename, all_targets).required_by.add(job)

Filename existence can be checked using exists(). This method uses caching. To reset the cache run reset_existence().
To check if the filename can be created by this pipeline (or already exists) run expected().
- __init__(filename: Path)[source]¶
Create a new target based on the provided filename.
Do not call this method directly. Instead use get_target().
- Parameters:
filename – filename
- exists() bool [source]¶
Test whether file exists on disk.
This function is lazy; once it has been checked once it will keep returning the same result.
To reset use reset_existence().
- fsl_pipe.job.update_closure(*dictionaries, **kwargs)[source]¶
Add the provided dictionaries to the globals dictionary.
Inside the with block all the dictionary keys will be available in the local environment. After the with block ends the local environment will be cleaned up again.
Use like this:
    def my_func():
        with update_closure({'a': 3}):
            print(a)  # prints 3
        print(a)  # raises a NameError (or prints whatever `a` is set to in the global environment)
- class fsl_pipe.job.BatchJob(*sub_jobs)[source]¶
Bases:
JobParent
Batched combination of multiple SingleJob instances that will be submitted together.
- static function(funcs, kwargs)¶
- __init__(*sub_jobs)[source]¶
Create a new BatchJob from sub_jobs.
sub_jobs can be either SingleJob or BatchJob. In either case they will be run in the order in which they are supplied.
- copy(targets: Dict[str, FileTarget])[source]¶
Create a copy of the BatchJob to be included in a new JobList.
targets contains the set of FileTargets recognised by this new JobList. It will be updated based on the input/output targets of each job within this batch.
- property batch¶
- property kwargs¶
- property submit_params¶
- fsl_pipe.job.has_dependencies(possible_parent: JobParent, possible_children: Set[JobParent], all_jobs)[source]¶
- fsl_pipe.job.batch_connected_jobs(to_batch: Sequence[SingleJob], other_jobs: Collection[SingleJob]) List[JobParent] [source]¶
Iteratively combine pairs of connected jobs.
Two jobs will be batched if:
1. They are both in to_batch.
2. One of the jobs depends on the other.
3. There is no other job that needs to run between these two jobs.
Iteration through the jobs will continue until no further batching is possible.
- fsl_pipe.job.batch_unconnected_jobs(to_batch: Sequence[JobParent], other_jobs: Collection[JobParent]) List[BatchJob] [source]¶
Batch jobs into generational sets
All jobs that do not depend on any other job in to_batch will be added to the first generational batch. All jobs that only depend on the first generational batch will be added to the second. And so forth, until all jobs have been added to a generation.
Only dependencies through other_jobs will be considered.
fsl_pipe.datalad module¶
Define interface of fsl-pipe with datalad.