"""Defines pipeline interface after it has been split into individual jobs based on a FileTree."""
from functools import lru_cache
from file_tree.file_tree import FileTree
from typing import Optional, Set, Dict, Collection, Any, Callable, Sequence, List, Tuple
from enum import Enum
from fsl.utils.fslsub import func_to_cmd
from loguru import logger
from pathlib import Path
from fsl_sub import submit, config
import re
import inspect
from contextlib import contextmanager
from wcmatch.glob import globmatch, GLOBSTAR
from warnings import warn
import numpy as np
class OutputMissing(IOError):
"""Raised if a job misses the output files."""
pass
class RunMethod(Enum):
"""How to run the individual jobs."""
local = 1
submit = 2
dask = 3
@staticmethod
def default():
"""Return default RunMethod, which is `submit` on a cluster and `local` otherwise."""
return RunMethod.submit if config.has_queues() else RunMethod.local
class JobList:
"""Pipeline with a concrete set of jobs.
Produced from a :class:`fsl_pipe.pipeline.Pipeline` based on the provided `FileTree`.
The default `JobList` produced by `Pipeline.generate_jobs` is unsorted and
needs to be sorted by running `filter` before running.
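Typical usage (an illustrative sketch; `pipe`, `tree`, and `"some_template"` are placeholders for an existing :class:`fsl_pipe.pipeline.Pipeline`, `FileTree`, and template key):
.. code-block:: python
    jobs = pipe.generate_jobs(tree)        # full, unfiltered JobList
    jobs = jobs.filter(["some_template"])  # keep only the jobs that are needed
    jobs.run()                             # run locally or submit to the cluster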
"""
def __init__(self, tree: FileTree, jobs: Sequence["SingleJob"], targets: Dict[str, "FileTarget"]):
"""Create a new concrete set of jobs."""
self.tree = tree
self.jobs = jobs
self.targets = targets
self._sort()
def __len__(self, ):
return len(self.jobs)
def filter(self, templates: Collection[str]=None, overwrite=False, overwrite_dependencies=False, skip_missing=False) -> "JobList":
"""Filter out a subset of jobs.
This does a total of three things:
- Only include jobs needed to create the templates or file patterns given in `templates`.
- Exclude jobs for which output files already exist, unless requested by `overwrite` or `overwrite_dependencies`.
- Sort the jobs, so that dependencies are run before the jobs that depend on them.
:param templates: target template keys (default: all templates); Any jobs required to produce those files will be kept in the returned pipeline.
:param overwrite: if True overwrite the files matching the template even if they already exist, defaults to False
:param overwrite_dependencies: if True rerun any dependent jobs even if the output of those jobs already exists, defaults to False
:param skip_missing: if True remove any jobs missing input dependencies. If not set, an error is raised whenever inputs are missing.
:return: Concrete pipeline with the jobs needed to produce `templates` (with optional overwriting)
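Example (illustrative sketch; `jobs` is assumed to be a :class:`JobList` and `"brain_mask"` stands in for any template key in the FileTree):
.. code-block:: python
    # keep only the jobs needed to (re)create the "brain_mask" files,
    # rerunning them even if the output already exists
    to_run = jobs.filter(["brain_mask"], overwrite=True)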
"""
if isinstance(templates, str):
templates = [templates]
def add_target(target: "FileTarget", warn_msg=""):
if target.exists() and (not overwrite or target.producer is None):
return
if target.producer is None:
warn(f"No job found that produces {target.filename} {warn_msg}")
return
try:
target.producer.add_to_jobs(all_jobs, overwrite=overwrite, overwrite_dependencies=overwrite_dependencies)
except InputMissingPipe as e:
if skip_missing:
return
e.dependency_graph.insert(0, e.target)
e.target = target.filename
raise
all_jobs: Tuple[List[SingleJob], Set[SingleJob]] = ([], set())
if templates is None:
for target in self.targets.values():
if target.producer is None:
continue
add_target(target)
else:
for template in templates:
if template in self.tree.template_keys():
for filename in self.tree.get_mult(template).data.flatten():
target = get_target(filename, self.targets, from_template=template)
add_target(target, f"(part of template {template})")
else:
matches = get_matching_targets(template, self.targets)
if len(matches) == 0:
warn(f"No files were found that match {template}. It is not a template in the FileTree nor does it match any files that can be produced by this pipeline.")
for target in matches:
add_target(target, f"(matches {template})")
return JobList(self.tree, all_jobs[0], self.targets)
def _sort(self, ):
"""Sorts the job in the `jobs` list in place based on dependencies.
A job dependencies should always come before the job itself.
This sorting is done purely based on the job inputs/outputs and does not depend on which files actually exist on disk.
"""
all_jobs: Tuple[List[SingleJob], Set[SingleJob]] = ([], set())
for job in self.jobs:
job.add_to_jobs(all_jobs, only_sort=True)
# Remove jobs that were already filtered out
sorted_jobs = [j for j in all_jobs[0] if j in self.jobs]
assert len(self.jobs) == len(sorted_jobs)
self.jobs = sorted_jobs
def batch(self, use_label=False, only_connected=False, split_on_ram=False, use_placeholders=True, batch_unlabeled=False):
"""Batches groups of jobs into a single job.
Two jobs will be batched if:
1. There are no intermediate jobs that need to run between the two and cannot be included in the same batch (because of #3 below).
2. One of the jobs depend on the other. This is disabled by default. Enable it by setting `only_connected` to True.
3. The jobs are similar. Jobs are similar if they match all of the (enabled) properties listed below:
a) They have the same batch label (set during pipeline creating using the `@pipe(batch=<batch label>)`. This can be enabled by setting the `use_label` keyword to True. If enabled, jobs without a batch label will never be merged.
b) The jobs have the same submit parameters (e.g., "architecture", "coprocessor"), except for "jobtime", "name", and "export_vars". "jobram" can also be ignored by setting the `split_on_ram` keyword to True.
c) The jobs have the same placeholder values. This will prevent jobs from different subjects to be merged with each other. It can be disabled by setting the `use_placeholders` to False. Alternatively, a subset of placeholder values could be considered by passing `use_placeholders` to a sequence of placeholders to consider (e.g., `use_placeholders=["subject", "session"]` will not merge jobs with different "subject" or "session" placeholder values, but will ignore any other placeholders).
A new `JobList` with the batched jobs will be returned.
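Example (illustrative sketch; `jobs` is assumed to be a :class:`JobList` and "subject" stands in for a placeholder defined in the FileTree):
.. code-block:: python
    # merge similar jobs, keeping jobs from different subjects separate
    batched = jobs.batch(use_placeholders=["subject"])
    batched.run("submit")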
"""
reference = self.copy()
all_flags = {key for job in reference.jobs for key in job.submit_params.keys()}.difference(["jobtime", "name", "export_vars"])
if not split_on_ram and "jobram" in all_flags:
all_flags.remove("jobram")
all_flags = sorted(all_flags)
job_labels = []
for job in reference.jobs:
label = []
label.append(job.batch if use_label else 1)
label.extend(job.submit_params.get(key, None) for key in all_flags)
if use_placeholders:
if use_placeholders is True:
parameters = job.set_parameters
else:
parameters = {key: value for key, value in job.set_parameters.items() if key in use_placeholders}
label.append(frozenset(parameters.items()))
job_labels.append(tuple(label))
to_batch = {
tb: set(job for (job, label) in zip(reference.jobs, job_labels) if label == tb)
for tb in set(job_labels)
}
jobs = set(reference.jobs)
for label, batching in to_batch.items():
if label[0] is None and not batch_unlabeled:
# do not batch jobs without a label set (if `use_label` is True)
continue
other_jobs = jobs.difference(batching)
new_jobs = batch_connected_jobs(batching, other_jobs)
if not only_connected:
new_jobs = batch_unconnected_jobs(new_jobs, other_jobs)
jobs = set(new_jobs).union(other_jobs)
return JobList(reference.tree, jobs, reference.targets)
def split_pipeline(self, use_label=False, split_on_ram=False):
"""Split the pipeline into multiple stages that require different hardware.
This uses the same rules as :meth:`batch`, except that placeholder values are always ignored.
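Example (illustrative sketch of iterating over the returned stages; `jobs` is assumed to be a :class:`JobList`):
.. code-block:: python
    for batch_label, submit_params, stage in jobs.split_pipeline():
        print(batch_label, submit_params, len(stage))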
"""
batched_jobs = self.batch(
use_label=use_label, split_on_ram=split_on_ram,
only_connected=False, use_placeholders=False, batch_unlabeled=True
)
return [stage.to_job_list(batched_jobs.tree) for stage in batched_jobs.jobs]
def copy(self, ):
"""Create a new, independent copy of the JobList."""
targets = dict()
new_jobs = [job.copy(targets) for job in self.jobs]
return JobList(self.tree, new_jobs, targets)
def report(self, console=None):
"""Produce tree reports with the relevant input/output templates."""
from rich import color, tree
if console is None:
from rich.console import Console
console = Console()
if len(self.jobs) == 0:
console.print("No jobs will be run.")
return
all_inputs = set.union(*[set(j.input_targets) for j in self.jobs])
all_outputs = set.union(*[set(j.output_targets) for j in self.jobs])
templates = {
template
for fns in (all_inputs, all_outputs)
for target in fns
for template in target.from_templates
}
def proc_line(tree_obj):
for t in tree_obj.children:
proc_line(t)
line = tree_obj.label
start = line.index('[cyan]') + 6
end = line.index('[/cyan]')
key = line[start:end]
if key not in templates:
return
input_count = 0
output_count = 0
overwrite_count = 0
exists_count = 0
for fn in self.tree.get_mult(key).data.flatten():
target = get_target(fn, self.targets)
is_output = False
is_input = False
if target in all_outputs:
is_output = True
if target in all_inputs:
is_input = True
if target.exists() and is_output:
overwrite_count += 1
elif is_output:
output_count += 1
elif target.exists():
exists_count += 1
if is_input:
input_count += 1
if (input_count + output_count + overwrite_count) == 0:
return
counter = "/".join([str(number) if color is None else f"[{color}]{number}[/{color}]"
for number, color in [
(overwrite_count, 'red'),
(output_count, 'yellow'),
(input_count, 'blue'),
]])
tree_obj.label = f"{line} [{counter}]"
for rich_obj in self.tree.filter_templates(templates).fill()._generate_rich_report():
if isinstance(rich_obj, tree.Tree):
proc_line(rich_obj)
console.print(rich_obj)
def run_datalad(self, ):
"""Make sure we can run the pipeline.
Calls `datalad.get` on all input files and `datalad.unlock` on all output files.
"""
input_targets = set()
output_targets = set()
for job in self.jobs:
input_targets.update(job.input_targets)
output_targets.update(job.output_targets)
input_targets.difference_update(output_targets)
input_fns = [t.filename for t in input_targets]
output_fns = [t.filename for t in output_targets if t.exists()]
from .datalad import get_dataset
ds = get_dataset()
ds.get(input_fns)
ds.unlock(output_fns)
def run(self, method: RunMethod=None, wait_for=(), clean_script="on_success"):
"""Run all the jobs that are required to produce the given templates.
:param method: defines how to run the job
:param wait_for: job IDs to wait for before running pipeline
:param clean_script: Sets whether the script produced in the log directory when submitting a job to the cluster should be kept after the job finishes. Only used if `method` is "submit". Options:
- "never": Script is kept
- "on_success": (default) Only remove if script successfully finishes (i.e., no error is raised)
- "always": Always remove the script, even if the script raises an error
"""
if method is None:
method = RunMethod.default()
elif not isinstance(method, RunMethod):
method = RunMethod[method]
if len(self.jobs) == 0:
logger.info("No new jobs being run/submitted")
return
prev_count = 0
run_jobs: Dict[SingleJob, Any] = {}
while len(run_jobs) < len(self.jobs):
for job in self.jobs:
if job in run_jobs or any(j in self.jobs and j not in run_jobs for _, j in job.dependencies(only_missing=False)):
continue
dependencies = [run_jobs[j] for _, j in job.dependencies(only_missing=False) if j in run_jobs]
if len(dependencies) == 0:
dependencies = wait_for
run_jobs[job] = job(
method=method,
wait_for=dependencies,
clean_script=clean_script,
)
if len(run_jobs) == prev_count:
raise ValueError("Unable to run/submit all jobs. Are there circular dependencies?")
prev_count = len(run_jobs)
if method == RunMethod.dask:
import dask
def last_dask_job(*all_jobs):
if any(a != 0 for a in all_jobs):
return 1
return 0
if dask.delayed(last_dask_job, name="last_job")(*run_jobs.values()).compute() == 1:
raise ValueError("One or more of the jobs have failed.")
logger.info("Successfully finished running all jobs using Dask.")
def scale_jobtime(self, scaling):
"""
Scale the submit job times of all jobs by `scaling`.
This will only affect jobs submitted to the cluster.
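Example (illustrative sketch; `jobs` is assumed to be a :class:`JobList`):
.. code-block:: python
    # request twice the job time for every job before submitting
    jobs.scale_jobtime(2).run("submit")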
"""
for job in self.jobs:
job.scale_jobtime(scaling)
return self
class JobParent:
"""Parent for `SingleJob` and `BatchJob`.
"""
input_targets: Set["FileTarget"]
output_targets: Set["FileTarget"]
optional_targets: Set["FileTarget"]
#@lru_cache(None)
def dependencies(self, only_missing=True) -> Set[Tuple[bool, Optional["SingleJob"]]]:
"""Return the jobs on which this job depends, as (optional, producer) tuples.
By default only dependencies related to missing input files are returned.
:param only_missing: set to False to also return dependencies that produce files that already exist on disk
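Example (illustrative sketch of iterating over the returned (optional, producer) tuples; `job` is assumed to be an existing job):
.. code-block:: python
    for optional, producer in job.dependencies(only_missing=False):
        if producer is not None and not optional:
            print(f"{job} requires {producer}")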
"""
jobs = set()
for target in self.input_targets:
if not (only_missing and target.exists()):
jobs.add((target in self.optional_targets, target.producer))
return jobs
def missing_output(self, reset_cache=False):
"""
Return the set of output targets that do not exist on disk.
Optional outputs are not considered.
:param reset_cache: set to True to not rely on an existing cached existence check
"""
missing = set()
for to_check in self.output_targets:
if to_check in self.optional_targets:
continue
if reset_cache:
to_check.reset_existence()
if not to_check.exists():
missing.add(to_check)
return missing
def add_to_jobs(self, all_jobs, overwrite=False, overwrite_dependencies=False, only_sort=False):
"""Mark this job and all of its dependencies to run.
This job is marked to run if any of its outputs do not yet exist or `overwrite` is True.
Its dependencies are marked to run if this job runs and either their output does not exist or `overwrite_dependencies` is True.
:param all_jobs: list and set of individual jobs. This job and all required jobs are added to this list.
:param overwrite: if True mark this job even if the output already exists, defaults to False
:param overwrite_dependencies: if True mark the dependencies of this job even if their output already exists, defaults to False
:param only_sort: if True ignore which files exist on disk and mark all jobs and dependencies (used only for sorting), defaults to False
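Example (illustrative sketch; `all_jobs` mirrors the (list, set) pair used internally and `job` is assumed to exist):
.. code-block:: python
    all_jobs = ([], set())
    job.add_to_jobs(all_jobs, overwrite_dependencies=True)
    ordered_jobs = all_jobs[0]  # dependencies come before the jobs that need them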
"""
if self in all_jobs[1]:
return
if not only_sort:
if not overwrite and len(self.missing_output()) == 0:
return
missing = self.missing_input(ignore_expected=True)
if len(missing) > 0:
raise InputMissingPipe(self, {m.filename for m in missing})
subjobs = ([], set(all_jobs[1]))
for optional, job in self.dependencies(only_missing=not (overwrite_dependencies or only_sort)):
try:
if job is not None:
job.add_to_jobs(subjobs, overwrite_dependencies, overwrite_dependencies, only_sort)
except InputMissingPipe as e:
if optional:
continue
e.dependency_graph.insert(0, e.target)
e.target = self
raise
all_jobs[0].extend(subjobs[0])
all_jobs[1].update(subjobs[0])
all_jobs[0].append(self)
all_jobs[1].add(self)
def __call__(self, method: RunMethod, wait_for=(), clean_script="on_success"):
"""Run the job using the specified `method`."""
if method == RunMethod.local:
self.prepare_run()
missing = self.missing_input()
if len(missing) > 0:
raise InputMissingRun(f"{self} can not run as it misses required input files: {missing}")
logger.info(f"running {self}")
self.job = self.function(**self.kwargs)
missing = self.missing_output(reset_cache=True)
if len(missing) > 0:
raise OutputMissing(f"{self} failed to produce required output files: {missing}")
elif method == RunMethod.submit:
from .pipeline import Template
self.prepare_run()
local_submit = dict(self.submit_params)
if 'logdir' not in local_submit:
local_submit['logdir'] = 'log'
Path(local_submit['logdir']).mkdir(exist_ok=True, parents=True)
if 'name' not in local_submit:
local_submit['name'] = self.job_name()
cmd = func_to_cmd(self.function, (), self.kwargs, local_submit['logdir'], clean=clean_script)
if len(wait_for) == 0:
wait_for = None
self.job = submit(cmd, jobhold=wait_for, **local_submit)
logger.debug(f"submitted {self} with job ID {self.job}")
elif method == RunMethod.dask:
import dask
def dask_job(*other_jobs):
if any(a != 0 for a in other_jobs):
logger.debug(f"{self} skipped because dependencies failed")
return 1
try:
logger.debug(f"Running {self} using dask")
self(RunMethod.local)
except Exception as e:
logger.exception(f"{self} failed: {e}")
return 1
logger.debug(f"Running {self} using dask")
return 0
self.job = dask.delayed(dask_job, name=str(self))(*wait_for)
return self.job
def prepare_run(self):
"""
Prepare to run this job.
Steps:
1. Creates the output directories for all output targets
"""
for target in self.output_targets:
target.filename.parent.mkdir(parents=True, exist_ok=True)
def expected(self, ):
"""
Return true if this job is expected to be able to run.
"""
for target in self.input_targets:
if not target.expected():
return False
return True
class SingleJob(JobParent):
"""A single job within a larger pipeline."""
def __init__(self, function: Callable, kwargs, submit_params, input_targets, output_targets, optionals, set_parameters=None, batch=None):
"""
Create a single job that can be run locally or submitted.
:param function: python function
:param kwargs: keyword arguments
:param submit_params: instructions to submit job to cluster using `fsl_sub`
:param input_targets: set of all filenames expected to exist before this job runs
:param output_targets: set of all filenames expected to exist after this job runs
:param optionals: set of filenames that are used to generate the dependency graph and yet might not exist
:param set_parameters: dictionary with placeholder values used to distinguish this SingleJob from all others produced from the same function
:param batch: label used to batch multiple jobs into one when submitting to the cluster
"""
self.function = function
if set_parameters is None:
set_parameters = {}
self.set_parameters = set_parameters
self.kwargs = kwargs
self.input_targets = input_targets
self.output_targets = output_targets
self.optional_targets = set(optionals)
self.submit_params = dict(submit_params)
self.batch = batch
for target in self.input_targets:
target.required_by.add(self)
for target in self.output_targets:
target.producer = self
def copy(self, targets: Dict[str, "FileTarget"]):
"""
Create a copy of the `SingleJob` to be included in a new :class:`JobList`.
`targets` contain the set of FileTargets recognised by this new `JobList`.
This will be updated based on the input/output targets of this job.
"""
def copy_targets(job_targets):
new_targets = set()
for target in job_targets:
new_target = get_target(target.filename, targets)
new_target.from_templates = target.from_templates
new_targets.add(new_target)
return new_targets
return SingleJob(
self.function,
self.kwargs,
self.submit_params,
copy_targets(self.input_targets),
copy_targets(self.output_targets),
copy_targets(self.optional_targets),
self.set_parameters,
self.batch,
)
def job_name(self, ):
"""Return a string representation of this job."""
if len(self.set_parameters) > 0:
parameter_string = '_'.join([f"{key}-{value}" for key, value in self.set_parameters.items()])
name = f"{self.function.__name__}_{parameter_string}"
else:
name = self.function.__name__
value = re.sub(r'[^\w\s-]', '', name).strip().lower()
return re.sub(r'[-\s]+', '-', value)
def __repr__(self, ):
"""Print job as a function call."""
parameter_string = ', '.join([f"{key}={value}" for key, value in self.set_parameters.items()])
return f"{self.function.__name__}({parameter_string})"
def to_job_list(self, tree:FileTree):
"""Convert single job into its own :class:`JobList`."""
result = JobList(tree, [self], {}).copy()
return self.batch, self.submit_params, result
def scale_jobtime(self, scaling):
"""
Scale the submit job time in place by `scaling`.
This will only affect jobs submitted to the cluster.
"""
if "jobtime" in self.submit_params.keys():
self.submit_params["jobtime"] = int(np.ceil(scaling * self.submit_params["jobtime"]))
def get_target(filename: Path, all_targets, from_template=None) -> "FileTarget":
"""
Return a :class:`FileTarget` matching the input `filename`.
If the `FileTarget` for `filename` is already in `all_targets`, it will be returned.
Otherwise a new `FileTarget` will be added to `all_targets` and returned.
:param filename: path to the input/intermediate/output filename
:param all_targets: dictionary of all FileTargets
:param from_template: template key used to obtain the filename
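Example (illustrative sketch; the filename and template key are hypothetical):
.. code-block:: python
    all_targets = {}
    target = get_target("sub-01/T1w.nii.gz", all_targets, from_template="T1w")
    # asking for the same filename again returns the same FileTarget
    assert get_target("sub-01/T1w.nii.gz", all_targets) is target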
"""
abs_path = Path(filename).absolute()
if abs_path not in all_targets:
all_targets[abs_path] = FileTarget(filename)
if from_template is not None:
all_targets[abs_path].from_templates.add(from_template)
return all_targets[abs_path]
def get_matching_targets(pattern: str, all_targets) -> List["FileTarget"]:
"""
Return all :class:`FileTarget` that match the given input pattern.
:param pattern: filename definition supporting Unix shell-style wildcards
:param all_targets: dictionary of all FileTargets
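Example (illustrative sketch; the pattern is hypothetical):
.. code-block:: python
    # all known targets matching a Unix-style pattern ("**" is enabled via GLOBSTAR)
    masks = get_matching_targets("sub-*/**/*_mask.nii.gz", all_targets)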
"""
abs_pattern = str(Path(pattern).absolute())
matches = []
for path, target in all_targets.items():
if globmatch(str(path), abs_pattern, flags=GLOBSTAR):
matches.append(target)
return matches
class FileTarget:
"""Input, intermediate, or output file.
See :func:`get_target` for instructions on creating a new `FileTarget`.
If a specific :class:`SingleJob` produces a filename, this can be indicated by setting :attr:`producer`:
.. code-block:: python
get_target(filename, all_targets).producer = job
This will raise a `ValueError` if the filename is already produced by another job.
If a specific :class:`SingleJob` requires a filename, this can be indicated by adding it to :attr:`required_by`:
.. code-block:: python
get_target(filename, all_targets).required_by.add(job)
Filename existence can be checked using :meth:`exists`.
This method uses caching. To reset the cache run :meth:`reset_existence`.
To check if the filename can be created by this pipeline (or already exists) run :meth:`expected`.
"""
def __init__(self, filename: Path):
"""
Create a new target based on the provided filename.
Do not call this method directly.
Instead use :func:`get_target`.
:param filename: filename
"""
self.filename = Path(filename)
self._producer = None
self.required_by: Set[SingleJob] = set()
self.from_templates: Set[str] = set()
def exists(self) -> bool:
"""
Test whether file exists on disk.
This check is cached; once it has run once it will keep returning the same result.
To reset use :meth:`reset_existence`.
"""
if not hasattr(self, "_exists"):
self._exists = self.filename.is_symlink() or self.filename.exists()
return self._exists
def reset_existence(self, ):
"""Ensure existence is checked again when running :meth:`exists`."""
if hasattr(self, "_exists"):
del self._exists
def expected(self, ):
"""
Return whether the file can be produced by the pipeline (or already exists).
Returns False if the file does not exist and there is no way to produce it; otherwise, True is returned.
"""
if self.exists():
return True
if self.producer is None:
return False
return self.producer.expected()
@property
def producer(self, ) -> SingleJob:
"""Job that can produce this file."""
return self._producer
@producer.setter
def producer(self, new_value):
if self._producer is not None:
if self._producer is new_value:
return
raise ValueError(f"{self} can be produced by both {self.producer} and {new_value}")
self._producer = new_value
def __repr__(self, ):
"""Print filename of target."""
return f"FileTarget({str(self.filename)})"
@contextmanager
def update_closure(*dictionaries, **kwargs):
"""Add the provided dictionaries to the globals dictionary.
Inside the `with` block all the dictionary keys will be available in the local environment.
After the `with` block ends the local environment will be cleaned up again.
Use like this:
.. code-block:: python
def my_func():
with update_closure({'a': 3}):
print(a) # prints 3
print(a) # raises a NameError (or prints whatever `a` is set to in the global environment)
"""
func_globals = inspect.stack()[2].frame.f_globals
# merge input dictionaries
new_kwargs = {}
for d in dictionaries:
new_kwargs.update(d)
new_kwargs.update(kwargs)
to_restore = {}
to_delete = set()
for key in new_kwargs:
if key in func_globals:
to_restore[key] = func_globals[key]
else:
to_delete.add(key)
func_globals.update(new_kwargs)
yield # return to run the code within the with-block
# clean up the environment
func_globals.update(to_restore)
for key in to_delete:
del func_globals[key]
del func_globals
def call_batched_jobs(funcs, kwargs):
"""Call each function in `funcs` with the matching keyword arguments from `kwargs`, in order."""
for f, k in zip(funcs, kwargs):
f(**k)
class BatchJob(JobParent):
"""Batched combination of multiple `SingleJob` instances that will be submitted together.
"""
function = staticmethod(call_batched_jobs)
def __init__(self, *sub_jobs):
"""Creates a new `BatchJob` from sub_jobs.
`sub_jobs` can be either `SingleJob` or `BatchJob`. In either case they will be run in the order in which they are supplied.
"""
self.torun: Sequence[SingleJob] = []
for job in sub_jobs:
if isinstance(job, BatchJob):
self.torun.extend(job.torun)
else:
assert isinstance(job, SingleJob)
self.torun.append(job)
self.torun_set = set(self.torun)
self.output_targets = {t for job in self.torun for t in job.output_targets}
for t in self.output_targets:
t._producer = self
self.input_targets = {t for job in self.torun for t in job.input_targets if t not in self.output_targets}
for t in self.input_targets:
t.required_by.add(self)
self.optional_targets = set.union(
{
t for t in self.output_targets
if not any(t in job.output_targets and t not in job.optional_targets for job in self.torun)
},
{
t for t in self.input_targets
if not any(t in job.input_targets and t not in job.optional_targets for job in self.torun)
},
)
as_tuples = set.intersection(*[{(key, value) for key, value in job.set_parameters.items()} for job in self.torun])
self.set_parameters = {key: value for key, value in as_tuples}
def copy(self, targets: Dict[str, "FileTarget"]):
"""
Create a copy of the `BatchJob` to be included in a new :class:`JobList`.
`targets` contain the set of FileTargets recognised by this new `JobList`.
This will be updated based on the input/output targets of each job within this batch.
"""
return BatchJob(*[job.copy(targets) for job in self.torun])
@property
def batch(self, ):
b = self.torun[0].batch
if all(job.batch is not None and job.batch == b for job in self.torun):
return b
return None
@property
def kwargs(self, ):
return {
'funcs': [job.function for job in self.torun],
'kwargs': [job.kwargs for job in self.torun],
}
@property
def submit_params(self, ):
def sum_value(name, *times):
total = sum(t for t in times if t is not None)
return None if total == 0 else total
def max_value(name, *rams):
if any(r is not None for r in rams):
return max(int(r) for r in rams if r is not None)
return None
def extend(name, *vars):
all_vars = set()
for v in vars:
if v is not None:
all_vars.update(v)
return list(all_vars)
def merge_name(_, *names):
if len(names) <= 3:
return "-".join(names)
return "batch_job"
def unique_param(name, *flags):
not_none = {f for f in flags if f is not None}
if len(not_none) == 0:
return None
elif len(not_none) == 1:
return not_none.pop()
else:
raise ValueError(f"Multiple different values for submit parameter flag '{name}' found. fsl-pipe does not know which one to choose out of: {not_none}")
to_combine = {
'jobtime': sum_value,
'jobram': max_value,
'threads': max_value,
'export_vars': extend,
'name': merge_name,
}
all_flags = {flag for job in self.torun for flag in job.submit_params.keys()}
return {
flag: to_combine.get(flag, unique_param)(flag, *[job.submit_params.get(flag, None) for job in self.torun])
for flag in all_flags
}
def job_name(self, ):
"""Return a string representation of this job."""
base = "" if self.batch is None else self.batch + '_'
func_name = "-".join(sorted({job.function.__name__ for job in self.torun}))
if len(self.set_parameters) > 0:
parameter_string = '_'.join([f"{key}-{value}" for key, value in self.set_parameters.items()])
name = f"{func_name}_{parameter_string}"
else:
name = func_name
value = re.sub(r'[^\w\s-]', '', name).strip().lower()
return base + re.sub(r'[-\s]+', '-', value)
def scale_jobtime(self, scaling):
"""
Scale the submit job time in place by `scaling`.
This will only affect jobs submitted to the cluster.
"""
for job in self.torun:
job.scale_jobtime(scaling)
def __repr__(self, ):
"""Print job as a function call."""
return f"Batch({self.torun})"
def to_job_list(self, tree:FileTree):
"""Convert batched jobs back into a :class:`JobList`."""
result = JobList(tree, self.torun, {}).copy()
return self.batch, self.submit_params, result
def has_dependencies(possible_parent: JobParent, possible_children: Set[JobParent], all_jobs):
"""Return True if `possible_parent` is in `possible_children` or (indirectly) depends on any of them through jobs in `all_jobs`."""
if possible_parent in possible_children:
return True
for _, child in possible_parent.dependencies(only_missing=False):
if child in all_jobs and has_dependencies(child, possible_children, all_jobs):
return True
return False
def batch_connected_jobs(to_batch: Sequence[SingleJob], other_jobs: Collection[SingleJob]) -> List[JobParent]:
"""Iteratively combines two jobs
Two jobs will be batched if:
1. They are both in `to_batch`
2. One of the jobs depends on the other
3. There is no other job that needs to run between these two jobs.
Iteration through the jobs will continue until no further batching is possible.
"""
other_jobs = set(other_jobs)
batches = set(to_batch)
nbatches_prev = len(batches) * 2
while len(batches) != nbatches_prev:
nbatches_prev = len(batches)
for job in set(batches):
if job not in batches:
continue
for _, dependency in job.dependencies(only_missing=False):
if job in batches and dependency in batches:
# properties 1 and 2 are met; now checking property 3
all_jobs = set.union(batches, other_jobs)
indirect_dependency = any(has_dependencies(d, {dependency}, all_jobs) for _, d in job.dependencies(only_missing=False) if d in all_jobs and d != dependency)
if not indirect_dependency:
batches.remove(job)
batches.remove(dependency)
batches.add(BatchJob(dependency, job))
break
return list(batches)
def batch_unconnected_jobs(to_batch: Sequence[JobParent], other_jobs: Collection[JobParent]) -> List[BatchJob]:
"""Batch jobs into generational sets
All jobs that do not depend on any other job in `to_batch` will be added to the first generational batch.
All jobs that only depend on the first generational batch will be added to the second.
And so forth, until all jobs have been added to a generation.
Only dependencies through jobs in `to_batch` or `other_jobs` are considered.
"""
to_batch = set(to_batch)
all_jobs = set.union(to_batch, other_jobs)
generations = []
while len(to_batch) > 0:
generations.append(set())
for job in to_batch:
if not any(has_dependencies(dependency, to_batch, all_jobs) for _, dependency in job.dependencies(only_missing=False) if dependency in all_jobs):
generations[-1].add(job)
to_batch.difference_update(generations[-1])
return [jobs.pop() if len(jobs) == 1 else BatchJob(*jobs) for jobs in generations]