fsl.utils.fslsub
This module submits jobs to a computing cluster using FSL's fsl_sub command-line tool. The computing cluster is assumed to be managed by SGE (Sun Grid Engine).
Note
All of the functionality in this module is deprecated and will be removed in a future version of fslpy. Equivalent functionality is available in the fsl_sub project, the fsl.utils.run module, and the wrappers.fsl_sub wrapper function.
Example usage, building a short pipeline:
from fsl.utils.fslsub import submit

# submits bet to the veryshort queue unless <mask_filename> already exists
bet_job = submit('bet <input_filename> -m',
                 queue='veryshort.q',
                 output='<mask_filename>')

# submits another job
other_job = submit('some other pre-processing step', queue='short.q')

# submits a cuda job, which should only start after both preparatory jobs
# have finished. This works whether bet_job and other_job are single
# job ids (i.e., strings) or sequences of multiple job ids
cuda_job = submit('expensive job',
                  wait_for=(bet_job, other_job),
                  queue='cuda.q')
submit | Submits a given command to the cluster
info | Gets information on a given job id
output | Returns the output of the given job
func_to_cmd | Defines the command needed to run the function from the command line
hold | Waits until all jobs have finished
- class fsl.utils.fslsub.SubmitParams(minutes: float | None = None, queue: str | None = None, architecture: str | None = None, priority: int | None = None, email: str | None = None, wait_for: str | None | Collection[str] = None, job_name: str | None = None, ram: int | None = None, logdir: str | None = None, mail_options: str | None = None, flags: bool = False, multi_threaded: Tuple[str, str] | None = None, verbose: bool = False, env: dict | None = None)[source]
Bases:
object
Represents the fsl_sub parameters
The SubmitParams class is deprecated - you should use fsl.wrappers.fsl_sub instead, or use the fsl_sub Python library, which is installed as part of FSL.
Any command line script can be submitted with these parameters by calling the SubmitParams object:
submit = SubmitParams(minutes=1, logdir='log', wait_for=['108023', '108019'])
submit('echo finished')
This will run “echo finished” with a maximum runtime of 1 minute after the jobs with IDs 108023 and 108019 are finished. It is the equivalent of
fsl_sub -T 1 -l log -j 108023,108019 "echo finished"
For Python scripts that submit themselves to the cluster, it can be useful to give the user control over at least some of the submission parameters. This can be done using:
import argparse

parser = argparse.ArgumentParser("my script doing awesome stuff")
parser.add_argument("input_file")
parser.add_argument("output_file")
SubmitParams.add_to_parser(parser, include=('wait_for', 'logdir'))
args = parser.parse_args()

submitter = SubmitParams.from_args(args).update(minutes=10)
from fsl import wrappers
wrappers.bet(input_file, output_file, fslsub=submitter)
This submits a BET job using the -j and -l flags set by the user and a maximum time of 10 minutes.
- minutes: float | None = None
- queue: str | None = None
- architecture: str | None = None
- priority: int | None = None
- email: str | None = None
- wait_for: str | None | Collection[str] = None
- job_name: str | None = None
- ram: int | None = None
- logdir: str | None = None
- mail_options: str | None = None
- flags: bool = False
- multi_threaded: Tuple[str, str] | None = None
- verbose: bool = False
- env: dict = None
- cmd_line_flags = {'-M': 'email', '-N': 'job_name', '-R': 'ram', '-T': 'minutes', '-a': 'architecture', '-l': 'logdir', '-m': 'mail_options', '-p': 'priority', '-q': 'queue'}
- __post_init__()[source]
If not set explicitly by the user, don't alter the environment in which the script will be submitted.
- as_flags()[source]
Creates flags for submission using fsl_sub
All parameters changed from their default value (typically None) will be included in the flags.
- Returns:
tuple with the flags
- __call__(*command, **kwargs)[source]
Submits the command to the cluster.
- Parameters:
command – string or tuple of strings with the command to submit
kwargs – Keyword arguments can override any parameters set in self
- Returns:
job ID
- classmethod add_to_parser(parser: ArgumentParser, as_group='fsl_sub commands', include=('wait_for', 'logdir', 'email', 'mail_options'))[source]
Adds submission parameters to the parser
- Parameters:
parser – parser that should understand submission commands
as_group – add as a new group
include – sequence of argument flags/names that should be added to the parser (set to None to include everything)
- Returns:
the group the arguments got added to
- fsl.utils.fslsub.submit(*command, **kwargs)[source]
Submits a given command to the cluster
The submit function is deprecated - you should use fsl.wrappers.fsl_sub instead, or use the fsl_sub Python library, which is available in FSL 6.0.5 and newer.
You can pass the command and arguments as a single string, or as a regular or unpacked sequence.
- Parameters:
command – string or regular/unpacked sequence of strings with the job command
minutes – Estimated job length in minutes, used to auto-set queue name
queue – Explicitly sets the queue name
architecture – e.g., darwin or lx24-amd64
priority – Lower priority [0:-1024] default = 0
email – Who to email after job completion
wait_for – Place a hold on this task until the job-ids in this string or tuple are complete
job_name – Specify job name as it will appear on the queue
ram – Max total RAM to use for job (integer in MB)
logdir – where to output logfiles
mail_options – Change the SGE mail options, see qsub for details
output – If <output> image or file already exists, do nothing and exit
flags – If True, use flags embedded in scripts to set SGE queuing options
multi_threaded –
Submit a multi-threaded task - set to a tuple containing two elements:
<pename>: a parallel environment (PE) configured for the requested queues
<threads>: number of threads to run
verbose – If True, use verbose mode
env – Dict containing environment variables
- Returns:
string of submitted job id
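To illustrate how the parameters listed above map onto fsl_sub flags, here is a minimal sketch based on the cmd_line_flags mapping documented for SubmitParams. This is not the actual fslpy implementation (flag ordering or formatting in the real tool may differ); `as_flags` here is a stand-in helper written for illustration.

```python
# Flag-to-parameter mapping, as documented in SubmitParams.cmd_line_flags
CMD_LINE_FLAGS = {
    '-T': 'minutes', '-q': 'queue', '-a': 'architecture', '-p': 'priority',
    '-M': 'email', '-N': 'job_name', '-R': 'ram', '-l': 'logdir',
    '-m': 'mail_options',
}

def as_flags(**params):
    """Return a tuple of fsl_sub flags for every parameter that was
    changed from its default (None) value."""
    flags = []
    for flag, name in CMD_LINE_FLAGS.items():
        value = params.get(name)
        if value is not None:
            flags.extend([flag, str(value)])
    return tuple(flags)
```

Calling `as_flags(minutes=1, logdir='log')` would produce the flag sequence corresponding to `fsl_sub -T 1 -l log`.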
- fsl.utils.fslsub.info(job_ids) Dict[str, Dict[str, str] | None] [source]
Gets information on a given job id
The info function is deprecated - you should use the fsl_sub.report function from the fsl_sub Python library, which is available in FSL 6.0.5 and newer.
Uses qstat -j <job_ids>.
- Parameters:
job_ids – string with job id or (nested) sequence with jobs
- Returns:
dictionary of jobid -> another dictionary with job information (or None if job does not exist)
- fsl.utils.fslsub._parse_qstat(job_ids_string, qstat_stdout)[source]
Parses the qstat output into a dictionary of dictionaries
- Parameters:
job_ids_string – input job ids
qstat_stdout – qstat output
- Returns:
dictionary of jobid -> another dictionary with job information (or None if job does not exist)
- fsl.utils.fslsub.output(job_id, logdir='.', command=None, name=None)[source]
Returns the output of the given job.
- Parameters:
job_id – String containing job ID.
logdir – Directory containing the log - defaults to the current directory.
command – Command that was run. Not currently used.
name – Job name if it was specified. Not currently used.
- Returns:
A tuple containing the standard output and standard error.
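Under SGE's default naming convention, a job's logs are written to `<job_name>.o<job_id>` (standard output) and `<job_name>.e<job_id>` (standard error) in the log directory. A small sketch of locating such files, assuming that default convention (`find_job_logs` is an illustrative helper, not part of this module):

```python
import glob
import os

def find_job_logs(job_id, logdir='.'):
    """Locate SGE-style log files (<name>.o<jobid> and <name>.e<jobid>)
    for a given job id in the given log directory."""
    stdout_files = glob.glob(os.path.join(logdir, f'*.o{job_id}'))
    stderr_files = glob.glob(os.path.join(logdir, f'*.e{job_id}'))
    return stdout_files, stderr_files
```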
- fsl.utils.fslsub._flatten_job_ids(job_ids)[source]
Returns a potentially nested sequence of job ids as a single comma-separated string
- Parameters:
job_ids – possibly nested sequence of job ids. The job ids themselves should be strings.
- Returns:
comma-separated string of job ids
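A minimal sketch of such flattening is shown below. It de-duplicates and sorts the ids for determinism; the real function's ordering and duplicate handling may differ.

```python
def flatten_job_ids(job_ids):
    """Flatten a possibly nested sequence of job-id strings into a
    single comma-separated string of unique job ids."""
    found = set()

    def _collect(ids):
        # A bare string is a single job id; anything else is a sequence
        if isinstance(ids, str):
            found.add(ids)
        else:
            for sub in ids:
                _collect(sub)

    _collect(job_ids)
    return ','.join(sorted(found))
```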
- fsl.utils.fslsub.hold(job_ids, hold_filename=None)[source]
Waits until all jobs have finished
Internally this works by submitting a new job which creates a file named hold_filename, and which will only run after all of the jobs in job_ids have finished.
This function returns only once hold_filename has been created.
- Parameters:
job_ids – possibly nested sequence of job ids. The job ids themselves should be strings.
hold_filename – filename to use as a hold file. The containing directory should exist, but the file itself should not. Defaults to ./.<random characters>.hold in the current directory.
- Returns:
only returns when all the jobs have finished
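The waiting step of this mechanism amounts to polling for the hold file. A minimal sketch, with an added timeout for safety (`wait_for_file` is an illustrative helper, not part of this module):

```python
import os
import time

def wait_for_file(hold_filename, interval=1.0, timeout=None):
    """Poll until hold_filename exists (i.e., until the held job has
    run and created it). Raises TimeoutError if a timeout is given
    and exceeded."""
    start = time.time()
    while not os.path.exists(hold_filename):
        if timeout is not None and time.time() - start > timeout:
            raise TimeoutError(f'{hold_filename} was not created')
        time.sleep(interval)
```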
- fsl.utils.fslsub.func_to_cmd(func, args=None, kwargs=None, tmp_dir=None, clean='never', verbose=False)[source]
Defines the command needed to run the function from the command line
WARNING: if submitting a function defined in the __main__ script, the script will be run again to retrieve this function. Make sure there is an `if __name__ == '__main__'` guard to prevent the full script from being rerun.
- Parameters:
func – function to be run
args – positional arguments
kwargs – keyword arguments
tmp_dir – directory where to store the temporary file
clean –
Whether the script should be removed after running. There are three options:
"never" (default): the script is kept
"on_success": only remove the script if it finished successfully (i.e., no error was raised)
"always": always remove the script, even if it raises an error
verbose – If set to True, the script will print its own filename before running
- Returns:
string which will run the function
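The general idea of turning a function call into a command line can be sketched as follows: serialise the function, its arguments, and its keyword arguments to disk, then generate a small script that loads and invokes them in a fresh Python process. This is an illustration only (`func_to_cmd_sketch` is a hypothetical name), not the fslpy implementation, and it works only for picklable functions.

```python
import pickle
import sys
import tempfile
import textwrap

def func_to_cmd_sketch(func, args=None, kwargs=None, tmp_dir=None):
    """Serialise a function call to disk and return a command line
    string that executes it in a fresh Python process."""
    args, kwargs = args or (), kwargs or {}

    # Pickle (func, args, kwargs) to a temporary file
    payload = tempfile.NamedTemporaryFile(
        suffix='.pickle', dir=tmp_dir, delete=False)
    pickle.dump((func, args, kwargs), payload)
    payload.close()

    # Generate a small driver script that unpickles and calls the function
    script = textwrap.dedent(f"""
        import pickle
        with open({payload.name!r}, 'rb') as f:
            func, args, kwargs = pickle.load(f)
        func(*args, **kwargs)
    """)
    script_file = tempfile.NamedTemporaryFile(
        mode='w', suffix='.py', dir=tmp_dir, delete=False)
    script_file.write(script)
    script_file.close()

    return f'{sys.executable} {script_file.name}'
```

Because the pickled function is looked up by module and name when the command runs, a function defined in `__main__` forces the submitting script to be imported again - which is exactly why the `if __name__ == '__main__'` guard mentioned above is needed.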