fsl.utils.fslsub

This module submits jobs to a computing cluster using FSL's fsl_sub command-line tool. The cluster is assumed to be managed by SGE (Sun Grid Engine).

Note

All of the functionality in this module is deprecated and will be removed in a future version of fslpy. Equivalent functionality is available in the fsl_sub project, in the fsl.utils.run module, and in the wrappers.fsl_sub wrapper function.

Example usage, building a short pipeline:

from fsl.utils.fslsub import submit

# submits bet to veryshort queue unless <mask_filename> already exists
bet_job = submit('bet <input_filename> -m',
                 queue='veryshort.q',
                 output='<mask_filename>')

# submits another job
other_job = submit('some other pre-processing step', queue='short.q')

# submits cuda job, that should only start after both preparatory jobs are
# finished. This will work if bet_job and other_job are single job-ids
# (i.e., strings) or a sequence of multiple job-ids
cuda_job = submit('expensive job',
                  wait_for=(bet_job, other_job),
                  queue='cuda.q')

submit

Submits a given command to the cluster

info

Gets information on a given job id

output

Returns the output of the given job.

func_to_cmd

Defines the command needed to run the function from the command line

hold

Waits until all jobs have finished

class fsl.utils.fslsub.SubmitParams(minutes: float | None = None, queue: str | None = None, architecture: str | None = None, priority: int | None = None, email: str | None = None, wait_for: str | None | Collection[str] = None, job_name: str | None = None, ram: int | None = None, logdir: str | None = None, mail_options: str | None = None, flags: bool = False, multi_threaded: Tuple[str, str] | None = None, verbose: bool = False, env: dict | None = None)[source]

Bases: object

Represents the fsl_sub parameters

The SubmitParams class is deprecated - you should use fsl.wrappers.fsl_sub instead, or use the fsl_sub Python library, which is installed as part of FSL.

Any command line script can be submitted with these parameters by calling the SubmitParams object:

submit = SubmitParams(minutes=1, logdir='log', wait_for=['108023', '108019'])
submit('echo finished')

This will run "echo finished" with a maximum runtime of 1 minute, after the jobs with IDs 108023 and 108019 have finished. It is the equivalent of

fsl_sub -T 1 -l log -j 108023,108019 "echo finished"

For Python scripts that submit themselves to the cluster, it can be useful to give the user control over some of the submission parameters. This can be done using:

import argparse
parser = argparse.ArgumentParser("my script doing awesome stuff")
parser.add_argument("input_file")
parser.add_argument("output_file")
SubmitParams.add_to_parser(parser, include=('wait_for', 'logdir'))
args = parser.parse_args()

submitter = SubmitParams.from_args(args).update(minutes=10)
from fsl import wrappers
wrappers.bet(args.input_file, args.output_file, fslsub=submitter)

This submits a BET job using the -j and -l flags set by the user and a maximum time of 10 minutes.

minutes: float | None = None
queue: str | None = None
architecture: str | None = None
priority: int | None = None
email: str | None = None
wait_for: str | None | Collection[str] = None
job_name: str | None = None
ram: int | None = None
logdir: str | None = None
mail_options: str | None = None
flags: bool = False
multi_threaded: Tuple[str, str] | None = None
verbose: bool = False
env: dict | None = None
cmd_line_flags = {'-M': 'email', '-N': 'job_name', '-R': 'ram', '-T': 'minutes', '-a': 'architecture', '-l': 'logdir', '-m': 'mail_options', '-p': 'priority', '-q': 'queue'}
__post_init__()[source]

If not set explicitly by the user, don't alter the environment in which the script will be submitted

as_flags()[source]

Creates flags for submission using fsl_sub

All parameters changed from their default value (typically None) will be included in the flags.

Returns:

tuple with the flags
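
The flag-building logic follows directly from the cmd_line_flags mapping shown above: every parameter that has been changed from its None default is paired with its flag. The following is an illustrative sketch, not the fslpy implementation itself:

```python
# Illustrative sketch of as_flags(), based on the cmd_line_flags mapping
# documented above. Details (e.g. ordering) may differ from fslpy.
CMD_LINE_FLAGS = {'-T': 'minutes', '-q': 'queue', '-a': 'architecture',
                  '-p': 'priority', '-M': 'email', '-N': 'job_name',
                  '-R': 'ram', '-l': 'logdir', '-m': 'mail_options'}

def as_flags(params):
    """Pair every parameter changed from its default (None) with the
    corresponding fsl_sub flag, and return them as a flat tuple."""
    flags = []
    for flag, name in CMD_LINE_FLAGS.items():
        value = params.get(name)
        if value is not None:          # only non-default parameters
            flags.extend([flag, str(value)])
    return tuple(flags)
```

For example, as_flags({'minutes': 1, 'logdir': 'log'}) yields ('-T', '1', '-l', 'log'), matching the fsl_sub -T 1 -l log invocation shown earlier.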

__str__()[source]

Return str(self).

__call__(*command, **kwargs)[source]

Submits the command to the cluster.

Parameters:
  • command – string or tuple of strings with the command to submit

  • kwargs – Keyword arguments can override any parameters set in self

Returns:

job ID

update(**kwargs)[source]

Creates a new SubmitParams with the updated parameters

classmethod add_to_parser(parser: ArgumentParser, as_group='fsl_sub commands', include=('wait_for', 'logdir', 'email', 'mail_options'))[source]

Adds submission parameters to the parser

Parameters:
  • parser – parser that should understand submission commands

  • as_group – name of the argument group to which the parameters are added

  • include – sequence of argument flags/names that should be added to the parser (set to None to include everything)

Returns:

the group the arguments got added to

classmethod from_args(args)[source]

Create a SubmitParams from the command line arguments

__eq__(other)

Return self==value.

__hash__ = None
__init__(minutes: float | None = None, queue: str | None = None, architecture: str | None = None, priority: int | None = None, email: str | None = None, wait_for: str | None | Collection[str] = None, job_name: str | None = None, ram: int | None = None, logdir: str | None = None, mail_options: str | None = None, flags: bool = False, multi_threaded: Tuple[str, str] | None = None, verbose: bool = False, env: dict | None = None) → None
__match_args__ = ('minutes', 'queue', 'architecture', 'priority', 'email', 'wait_for', 'job_name', 'ram', 'logdir', 'mail_options', 'flags', 'multi_threaded', 'verbose', 'env')
__module__ = 'fsl.utils.fslsub'
__repr__()

Return repr(self).

__weakref__

list of weak references to the object (if defined)

fsl.utils.fslsub.submit(*command, **kwargs)[source]

Submits a given command to the cluster

The submit function is deprecated - you should use fsl.wrappers.fsl_sub instead, or use the fsl_sub Python library, which is available in FSL 6.0.5 and newer.

You can pass the command and arguments as a single string, or as a regular or unpacked sequence.

Parameters:
  • command – string or regular/unpacked sequence of strings with the job command

  • minutes – Estimated job length in minutes, used to auto-set queue name

  • queue – Explicitly sets the queue name

  • architecture – e.g., darwin or lx24-amd64

  • priority – Lower priority, in the range [0:-1024]; default = 0

  • email – Who to email after job completion

  • wait_for – Place a hold on this task until the job-ids in this string or tuple are complete

  • job_name – Specify job name as it will appear on the queue

  • ram – Max total RAM to use for job (integer in MB)

  • logdir – where to output logfiles

  • mail_options – Change the SGE mail options, see qsub for details

  • output – If <output> image or file already exists, do nothing and exit

  • flags – If True, use flags embedded in scripts to set SGE queuing options

  • multi_threaded

    Submit a multi-threaded task - Set to a tuple containing two elements:

    • <pename>: a PE configured for the requested queues

    • <threads>: number of threads to run

  • verbose – If True, use verbose mode

  • env – Dict containing environment variables

Returns:

string of submitted job id

fsl.utils.fslsub.info(job_ids) → Dict[str, Dict[str, str] | None][source]

Gets information on a given job id

The info function is deprecated - you should use the fsl_sub.report function from the fsl_sub Python library, which is available in FSL 6.0.5 and newer.

Uses qstat -j <job_ids>

Parameters:

job_ids – string with job id or (nested) sequence with jobs

Returns:

dictionary of jobid -> another dictionary with job information (or None if job does not exist)

fsl.utils.fslsub._parse_qstat(job_ids_string, qstat_stdout)[source]

Parses the qstat output into a dictionary of dictionaries

Parameters:
  • job_ids_string – input job ids

  • qstat_stdout – qstat output

Returns:

dictionary of jobid -> another dictionary with job information (or None if job does not exist)
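
qstat -j prints one block per job, consisting of "key: value" lines, with blocks separated by rows of = characters. The parsing idea can be sketched in a self-contained form (the real function lives in fslpy and may differ in detail):

```python
def parse_qstat(job_ids, qstat_stdout):
    """Illustrative sketch: parse ``qstat -j`` output into a dictionary of
    {job_id: {key: value}}; jobs absent from the output map to None."""
    res = {job_id: None for job_id in job_ids}
    current = None
    for line in qstat_stdout.splitlines():
        if line.startswith('='):        # '=====' rows separate job blocks
            current = None
            continue
        key, sep, value = line.partition(':')
        if not sep:
            continue
        key, value = key.strip(), value.strip()
        if key == 'job_number':         # first line of each job block
            current = value
            res[current] = {}
        if current is not None:
            res[current][key] = value
    return res
```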

fsl.utils.fslsub.output(job_id, logdir='.', command=None, name=None)[source]

Returns the output of the given job.

Parameters:
  • job_id – String containing job ID.

  • logdir – Directory containing the log - defaults to the current directory.

  • command – Command that was run. Not currently used.

  • name – Job name if it was specified. Not currently used.

Returns:

A tuple containing the standard output and standard error.

fsl.utils.fslsub._flatten_job_ids(job_ids)[source]

Returns a potentially nested sequence of job ids as a single comma-separated string

Parameters:

job_ids – possibly nested sequence of job ids. The job ids themselves should be strings.

Returns:

comma-separated string of job ids
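
The flattening behaviour can be sketched with a small recursive generator; whether duplicates are removed and how ids are ordered are illustrative choices here, and may differ from the fslpy implementation:

```python
def flatten_job_ids(job_ids):
    """Illustrative sketch: collect job-id strings from an arbitrarily
    nested sequence and join them into the comma-separated form that
    fsl_sub's -j flag expects."""
    def collect(ids):
        if isinstance(ids, str):
            yield ids
        else:
            for sub in ids:
                yield from collect(sub)
    # sorted(set(...)) gives a deterministic, de-duplicated order;
    # the fslpy version may behave differently here
    return ','.join(sorted(set(collect(job_ids))))
```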

fsl.utils.fslsub.hold(job_ids, hold_filename=None)[source]

Waits until all jobs have finished

Internally this works by submitting an extra job that only runs once all jobs in job_ids have finished; that job then creates a file named hold_filename.

This function only returns once hold_filename has been created.

Parameters:
  • job_ids – possibly nested sequence of job ids. The job ids themselves should be strings.

  • hold_filename – filename to use as a hold file. The containing directory should exist, but the file itself should not. Defaults to ./.<random characters>.hold in the current directory.

Returns:

only returns when all the jobs have finished
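
The client side of this mechanism reduces to waiting for the hold file to appear. The wait loop might look like the following polling sketch (function name, interval, and timeout handling are illustrative assumptions):

```python
import os
import time

def wait_for_file(filename, interval=1.0, timeout=None):
    """Illustrative sketch: block until ``filename`` exists, polling every
    ``interval`` seconds. The cluster-side hold job is what eventually
    creates the file."""
    start = time.time()
    while not os.path.exists(filename):
        if timeout is not None and time.time() - start > timeout:
            raise TimeoutError(f'{filename} was not created in time')
        time.sleep(interval)
```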

fsl.utils.fslsub.func_to_cmd(func, args=None, kwargs=None, tmp_dir=None, clean='never', verbose=False)[source]

Defines the command needed to run the function from the command line

WARNING: if submitting a function defined in the __main__ script, the script will be run again to retrieve this function. Make sure there is an if __name__ == '__main__' guard to prevent the full script from being rerun.

Parameters:
  • func – function to be run

  • args – positional arguments

  • kwargs – keyword arguments

  • tmp_dir – directory where to store the temporary file

  • clean

    Whether the script should be removed after running. There are three options:

    • "never" (default): script is kept

    • "on_success": only remove if the script successfully finished (i.e., no error is raised)

    • "always": always remove the script, even if it raises an error

  • verbose – If set to True, the script will print its own filename before running

Returns:

string which will run the function
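
The core idea is to pickle the function together with its arguments, and to write a tiny wrapper script that unpickles and calls them; the script's path then forms the returned command. A simplified sketch, omitting the clean and verbose handling (and so not the fslpy implementation):

```python
import pickle
import tempfile
import textwrap

def func_to_cmd(func, args=(), kwargs=None):
    """Illustrative sketch: serialise ``func(*args, **kwargs)`` to disk and
    return a shell command that replays the call."""
    # Pickle the call; note that unpickling re-imports the defining module,
    # which is why the __main__ warning above applies.
    fd, pickle_file = tempfile.mkstemp(suffix='.pickle')
    with open(fd, 'wb') as f:
        pickle.dump((func, args, kwargs or {}), f)
    # Write a wrapper script that loads the pickle and makes the call
    fd, script_file = tempfile.mkstemp(suffix='.py')
    with open(fd, 'w') as f:
        f.write(textwrap.dedent(f"""\
            import pickle
            with open({pickle_file!r}, 'rb') as f:
                func, args, kwargs = pickle.load(f)
            func(*args, **kwargs)
            """))
    return f'python {script_file}'
```

The returned string could then be handed to submit(), e.g. submit(func_to_cmd(myfunc), queue='short.q'), where myfunc is any picklable function.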