flux.job.Jobspec module
- class flux.job.Jobspec.Jobspec(resources, tasks, attributes, version)
Bases: object
- add_file(path, data, perms=None, encoding=None)
Add a file to the RFC 14 "files" dictionary in Jobspec. If data contains newlines or an encoding is explicitly provided, then it is presumed to be the file content. Otherwise, data is a local filesystem path, the contents of which are to be loaded into jobspec. For filesystem
- Parameters
path (str) -- path or file name to encode data as in Jobspec
data (dict, str) -- content of file or a local path to load
perms (int) -- file permissions, default 0o0600 (octal). If data is a filesystem path, then the permissions of the local filesystem object are used.
encoding (str) -- RFC 37 compatible encoding for data. Use None if data is a dict, or to determine the encoding from a file when data specifies a filesystem path. Otherwise, if encoding is set, data is a string encoded in the specified encoding.
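The rule add_file() uses to decide whether data is content or a path can be sketched in plain Python. classify_file_data is a hypothetical helper, not part of flux.job, and it models only the documented decision. Dicts and strings that contain newlines or come with an explicit encoding are treated as content. Any other string is treated as a local path.

```python
def classify_file_data(data, encoding=None):
    """Return "content" or "path" following the documented add_file() rule.

    Hypothetical helper for illustration only; not part of flux.job.
    """
    if isinstance(data, dict):
        # A dict is always file content (encoding should then be None)
        return "content"
    if encoding is not None or "\n" in data:
        # An explicit encoding or embedded newlines mark literal content
        return "content"
    # Otherwise the string names a file on the local filesystem
    return "path"


print(classify_file_data("#!/bin/sh\necho hi\n"))          # content
print(classify_file_data("/etc/hostname"))                 # path
print(classify_file_data("aGVsbG8=", encoding="base64"))   # content
print(classify_file_data({"key": "value"}))                # content
```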
- property attributes
Jobspec attributes section
- property cwd
Working directory of job.
- dumps(**kwargs)
- property duration
Job's time limit.
The duration may be:
an int or float in seconds
a string in Flux Standard Duration (see RFC 23)
a Python datetime.timedelta
A duration of zero is interpreted as "not set".
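The three accepted duration forms can be normalized to seconds as shown below. This is a minimal sketch, not the conversion Flux performs internally. It assumes the Flux Standard Duration suffixes ms, s, m, h and d, treats a bare number as seconds, and does not handle special values such as infinity.

```python
import datetime
import re

# Suffix multipliers assumed from RFC 23 (Flux Standard Duration).
_FSD_UNITS = {"ms": 0.001, "s": 1.0, "m": 60.0, "h": 3600.0, "d": 86400.0}
_FSD_RE = re.compile(r"^(\d+(?:\.\d*)?|\.\d+)(ms|s|m|h|d)?$")


def duration_to_seconds(value):
    """Normalize an accepted duration type to float seconds (0.0 means "not set")."""
    if isinstance(value, datetime.timedelta):
        seconds = value.total_seconds()
    elif isinstance(value, (int, float)):
        seconds = float(value)
    elif isinstance(value, str):
        match = _FSD_RE.match(value.strip())
        if match is None:
            raise ValueError(f"not a Flux Standard Duration: {value!r}")
        number, unit = match.groups()
        # A bare number without a suffix is interpreted as seconds
        seconds = float(number) * _FSD_UNITS[unit or "s"]
    else:
        raise TypeError(f"unsupported duration type: {type(value).__name__}")
    if seconds < 0:
        raise ValueError("duration must not be negative")
    return seconds


for example in (90, "1.5h", "250ms", datetime.timedelta(minutes=2), 0):
    print(example, "->", duration_to_seconds(example))
```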
- property environment
Environment of job.
- property error
Path to use for stderr.
- classmethod from_yaml_file(filename)
Create a jobspec from a path to a YAML file.
- classmethod from_yaml_stream(yaml_stream)
Create a jobspec from a YAML file-like object.
- getattr(key)
Get an attribute from the jobspec using dotted key notation, e.g. system.duration or, optionally, attributes.system.duration.
Raises KeyError if a component of key does not exist.
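The dotted-key lookup works on a nested attributes dict as in the sketch below. get_dotted is a hypothetical stand-in for Jobspec.getattr() and takes the attributes dict directly. Like the documented method, it accepts an optional leading attributes. component and raises KeyError when any component is missing.

```python
def get_dotted(attributes, key):
    """Look up a dotted key such as "system.duration" in a nested attributes dict.

    Hypothetical sketch of the documented Jobspec.getattr() behaviour.
    """
    if key.startswith("attributes."):
        # The "attributes." prefix is optional and simply dropped
        key = key[len("attributes."):]
    value = attributes
    for part in key.split("."):
        if not isinstance(value, dict) or part not in value:
            raise KeyError(key)
        value = value[part]
    return value


attrs = {"system": {"duration": 3600, "job": {"name": "sim"}}}
print(get_dotted(attrs, "system.duration"))             # 3600
print(get_dotted(attrs, "attributes.system.job.name"))  # sim
```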
- property input
Path to use for stdin.
- property output
Path to use for stdout.
- property queue
Target queue of job submission
- resource_counts()
Compute the counts of each resource type in the jobspec
The following resources section would return { "slot": 12, "core": 18, "memory": 242 }:
- type: slot
  count: 2
  with:
    - type: core
      count: 4
    - type: memory
      count: 1
      unit: GB
- type: slot
  count: 10
  with:
    - type: core
      count: 1
    - type: memory
      count: 24
      unit: GB
Note
The current implementation ignores the unit label and assumes units are consistent across resources.
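The arithmetic behind the example above can be checked with a pure-Python sketch. This is not the Jobspec.resource_counts() source; it only reproduces the documented result. Each count is multiplied by the counts of its ancestors, totals are summed per type, and, like the current implementation, the sketch ignores unit.

```python
from collections import Counter


def resource_counts(resources, multiplier=1, totals=None):
    """Sum counts per resource type, multiplying through nested "with" lists."""
    if totals is None:
        totals = Counter()
    for res in resources:
        count = multiplier * res["count"]
        totals[res["type"]] += count
        # Children inherit this resource's effective count as their multiplier
        resource_counts(res.get("with", []), count, totals)
    return totals


# The resources section from the example above, as Python data
resources = [
    {"type": "slot", "count": 2, "with": [
        {"type": "core", "count": 4},
        {"type": "memory", "count": 1, "unit": "GB"},
    ]},
    {"type": "slot", "count": 10, "with": [
        {"type": "core", "count": 1},
        {"type": "memory", "count": 24, "unit": "GB"},
    ]},
]
print(dict(resource_counts(resources)))
# {'slot': 12, 'core': 18, 'memory': 242}
```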
- resource_walk()
Traverse the resources in the resources section of the jobspec.
Performs a depth-first, pre-order traversal. Yields a tuple containing (parent, resource, count). parent is None when resource is a top-level resource. count is the number of that resource, including the multiplicative effects of the with clause in ancestor resources. For example, the following resources section will yield a count of 2 for the slot and a count of 8 for the core resource:
- type: slot
  count: 2
  with:
    - type: core
      count: 4
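A generator with the documented (parent, resource, count) contract can be sketched as follows. This is an illustrative reimplementation, not the method's source. Each resource is yielded before its children (pre-order), and the effective count is passed down as a multiplier.

```python
def resource_walk(resources, parent=None, multiplier=1):
    """Yield (parent, resource, count) in depth-first, pre-order."""
    for res in resources:
        count = multiplier * res["count"]
        yield parent, res, count  # visit the node before its children
        yield from resource_walk(res.get("with", []), res, count)


resources = [{"type": "slot", "count": 2, "with": [{"type": "core", "count": 4}]}]
for parent, res, count in resource_walk(resources):
    print(parent["type"] if parent else None, res["type"], count)
# None slot 2
# slot core 8
```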
- property resources
Jobspec resources section
- setattr(key, val)
set job attribute
- setattr_shell_option(key, val)
set job attribute: shell option
- property stderr
Path to use for stderr.
- property stdin
Path to use for stdin.
- property stdout
Path to use for stdout.
- property tasks
Jobspec tasks section
- top_level_keys = {'attributes', 'resources', 'tasks', 'version'}
- property unbuffered
True if standard output and standard error are unbuffered.
- property version
Jobspec version section
- class flux.job.Jobspec.JobspecV1(resources, tasks, attributes, version=1)
Bases: Jobspec
- apply_options(args=None, *, prog=None, _preinit=True, env=None, rlimit=None, signal=None, taskmap=None, dependency=None, requires=None, shell_options=None, time_limit=None, attributes=None, add_file=None, **kwargs)
Apply submission options to this jobspec.
Modifies the jobspec in-place and returns self for method chaining. Options may be passed as explicit keyword arguments, via args (an argparse.Namespace or any object with matching attributes), or both; keyword arguments take precedence over args attributes. All options are optional; absent options are silently ignored.
The options correspond to flux submit command-line flags; see flux-submit(1) or run flux submit --help for general option semantics. Options with non-obvious Python-specific behavior are described in full below.
Note
env and rlimit are only applied when explicitly passed as keyword arguments, or when args is provided (the CLI path). Pass env=[] or rlimit=[] to propagate the full environment or the default resource-limit set without any filtering rules. To suppress env propagation entirely, pass env=["-*"].
Note
shell_options and attributes accept plain Python dicts. The CLI-compatible string forms setopt and setattr are supported when passed via an args namespace (for use by the CLI internally), but are not accepted as keyword arguments. Python code should use shell_options and attributes, or use Jobspec.setattr_shell_option() and Jobspec.setattr().
- Parameters
args -- Optional namespace object (e.g. argparse.Namespace) whose attributes supply option values. Explicit keyword arguments override any same-named attribute in args. This parameter is mainly used internally by the CLI; most Python code should use keyword arguments directly.
prog (str) -- CLI plugin context name: "submit", "run", "batch", or "alloc". When set, plugins registered for that command are loaded and their modify_jobspec hooks are invoked. Defaults to None (no plugin processing).
env (list[str]) -- Environment filter rules applied to the submitter's environment. Each rule is one of:
"VAR=VALUE": set VAR to VALUE in the job environment, expanding $VAR references against the environment built so far.
"VAR": import VAR from the submitter's environment (glob patterns are accepted, e.g. "SLURM_*").
"-PATTERN": remove variables matching the glob PATTERN, e.g. "-LD_*".
"^/path/to/file": read additional rules from a file.
Values containing {{…}} (e.g. "MY_RANK={{rank}}") are stored as mustache templates and expanded by the job shell at startup, not at submission time.
rlimit (list[str]) -- Resource-limit propagation rules. Each rule is one of:
"NAME": propagate RLIMIT_NAME at its current value (glob patterns accepted, e.g. "*").
"NAME=VALUE": set RLIMIT_NAME to VALUE; use "unlimited" or "infinity" for RLIM_INFINITY.
"-NAME": remove RLIMIT_NAME from propagation.
Pass rlimit=[] to propagate the default set of rlimits (stack, cpu, fsize, etc.) without any filtering rules. When rlimit is omitted and no args namespace is provided, no rlimits are applied (see the Note above).
signal (str) -- Send a signal to the job before its time limit expires. Format: "[SIG][@TIME]", where SIG is a signal name or number (default SIGUSR1) and TIME is a duration in Flux Standard Duration (default 60s). Examples: "USR1@30s", "TERM@2m", "@2m" (SIGUSR1, 2-minute warning).
time_limit (str or float) -- Job wall-clock limit as a Flux Standard Duration string (e.g. "30s", "1.5h", "2d") or a plain float number of seconds. See flux-standard-duration(7). A value of 0 means no limit.
taskmap (str) -- Task-to-node mapping scheme, e.g. "block" (default) or "cyclic". Corresponds to --taskmap.
dependency (list[str]) -- Job dependency URIs, e.g. ["afterok:f1234abcd", "afterany:f5678ef01"]. Corresponds to --dependency.
requires (list[str]) -- Node constraint expressions, e.g. ["host:node1", "rank:0"]. Multiple entries are AND-combined. Corresponds to --requires.
shell_options (dict) -- Job-shell options as a plain Python dict, e.g. {"verbose": 1, "pty": 1}. Keys and values must be JSON-serializable.
attributes (dict) -- Jobspec attributes as a plain Python dict. Keys may be fully qualified (e.g. "system.foo") or bare (e.g. "foo"), in which case the system. namespace is implied. A leading "." addresses the attributes. root directly (e.g. ".user.comment" sets attributes.user.comment).
add_file (list[str]) -- Files to attach to the job. Each entry uses one of the following forms:
"/path/to/file": attach a file from the filesystem; the name in the jobspec is the basename of the path.
"name=/path/to/file": attach a file with an explicit name.
"name=line1\nline2\n": attach inline text content (use "\n" for newlines in the string).
"name:0755=/path/to/script": attach a file with explicit octal permissions.
Corresponds to --add-file.
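The env rule forms listed above are applied in order to a copy of the submitter's environment. The sketch below models that process; it is a simplified, hypothetical helper, not Flux's implementation. It uses fnmatch for glob patterns and string.Template for $VAR expansion. "^file" rules are not modeled, and {{…}} mustache templates are stored unexpanded, as the job shell would expect.

```python
import fnmatch
import string


def apply_env_rules(source, rules):
    """Apply env filter rules, in order, starting from the full source environment."""
    env = dict(source)
    for rule in rules:
        if rule.startswith("^"):
            raise NotImplementedError("^file rules are not modeled in this sketch")
        if rule.startswith("-"):
            # "-PATTERN": drop every variable whose name matches the glob
            pattern = rule[1:]
            for name in [n for n in env if fnmatch.fnmatchcase(n, pattern)]:
                del env[name]
        elif "=" in rule:
            # "VAR=VALUE": expand $VAR against the environment built so far
            name, value = rule.split("=", 1)
            env[name] = string.Template(value).safe_substitute(env)
        else:
            # "VAR" (glob allowed): re-import matching variables from the submitter
            for name, value in source.items():
                if fnmatch.fnmatchcase(name, rule):
                    env[name] = value
    return env


submitter_env = {
    "PATH": "/usr/bin",
    "HOME": "/home/alice",
    "LD_PRELOAD": "libfoo.so",
    "SLURM_JOB_ID": "42",
    "SLURM_NNODES": "2",
}
job_env = apply_env_rules(submitter_env, ["-*", "PATH", "SLURM_*", "MYPATH=$PATH:/opt/bin"])
print(job_env)
```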
- Returns
self, to allow method chaining.
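The signal option's "[SIG][@TIME]" format and its defaults can be illustrated with the parser below. parse_signal_spec is a hypothetical helper that only splits and defaults the documented fields. It returns TIME as an unparsed Flux Standard Duration string and resolves signal names with Python's signal module.

```python
import signal


def parse_signal_spec(spec):
    """Split "[SIG][@TIME]" into (signal number, time string), applying defaults."""
    sig, _, time = spec.partition("@")
    time = time or "60s"  # documented default warning time
    if not sig:
        return int(signal.SIGUSR1), time  # documented default signal
    if sig.isdigit():
        return int(sig), time
    name = sig.upper()
    if not name.startswith("SIG"):
        name = "SIG" + name
    try:
        return int(signal.Signals[name]), time
    except KeyError:
        raise ValueError(f"unknown signal: {sig!r}") from None


for spec in ("USR1@30s", "TERM@2m", "@2m", "15"):
    print(spec, "->", parse_signal_spec(spec))
```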
Example
Build a jobspec and apply several options at once:
from flux.job import JobspecV1

jobspec = JobspecV1.from_command(
    ["myapp", "--input", "data.h5"],
    num_tasks=16,
    cores_per_task=4,
).apply_options(
    time_limit="2h",
    env=["-LD_PRELOAD", "MY_RANK={{rank}}"],
    dependency=["afterok:f1234abcd"],
    shell_options={"verbose": 1},
    attributes={"system.queue": "gpu"},
)
Attach a helper script and set resource limits:
jobspec.apply_options(
    add_file=["setup.sh=/path/to/setup.sh"],
    rlimit=["-*", "nofile=65536"],
)
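The documented key rules for the attributes option can be sketched as a small nested-dict setter. set_attribute is hypothetical and models only the three forms described for apply_options(). A leading "." addresses the attributes root, a "system."-qualified key is used as given, and any other key goes under the implied system. namespace. The real method may treat other prefixes differently.

```python
def set_attribute(attributes, key, value):
    """Store value in a nested attributes dict using the documented key rules."""
    if key.startswith("."):
        path = key[1:]              # address the attributes root directly
    elif key.startswith("system."):
        path = key                  # already fully qualified
    else:
        path = "system." + key      # bare key: system. namespace implied
    *parents, leaf = path.split(".")
    node = attributes
    for part in parents:
        node = node.setdefault(part, {})  # create intermediate dicts on demand
    node[leaf] = value
    return attributes


attrs = {}
set_attribute(attrs, "foo", 1)
set_attribute(attrs, "system.queue", "gpu")
set_attribute(attrs, ".user.comment", "nightly run")
print(attrs)
# {'system': {'foo': 1, 'queue': 'gpu'}, 'user': {'comment': 'nightly run'}}
```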
- classmethod from_alloc(*, nslots=None, nodes=None, cores_per_slot=1, gpus_per_slot=None, exclusive=False, broker_opts=None, conf=None, bg=False, name=None, queue=None, bank=None, cwd=None, output=None, error=None, input=None, label_io=False, unbuffered=False, **kwargs)
Create a jobspec for a nested Flux instance with options applied.
Equivalent to from_nest_command() followed by apply_options() with prog="alloc". Either nslots or nodes must be provided; if only nodes is given, nslots defaults to nodes and exclusive is forced True.
- Parameters
nslots (int) -- Number of resource slots.
nodes (int) -- Number of nodes. Sets nslots when nslots is not given and forces exclusive True.
cores_per_slot (int) -- Cores per slot. Default 1.
gpus_per_slot (int) -- GPUs per slot.
exclusive (bool) -- Allocate nodes exclusively.
broker_opts (list[str]) -- Options passed to the child broker.
conf -- Broker configuration as a dict, BatchConfig, or a string accepted by BatchConfig.update() (key=val, JSON, TOML, file path).
bg (bool) -- Start instance in the background without attaching. Appends -Sbroker.rc2_none=1 to broker_opts and sets the pty.capture shell option.
name (str) -- Job name.
queue (str) -- Target queue.
bank (str) -- Target bank.
cwd (str) -- Working directory. Defaults to os.getcwd().
output (str) -- Path for standard output.
error (str) -- Path for standard error.
input (str) -- Path for standard input.
label_io (bool) -- Label output lines with task IDs.
unbuffered (bool) -- Disable output buffering.
**kwargs -- Forwarded to apply_options().
- Returns
Note
CLI plugin preinit() callbacks are invoked (via apply_options()) after the jobspec structure is built. Mutations to resource-sizing attributes such as nslots or nodes inside preinit() are therefore ignored; use modify_jobspec() for structural changes.
Example
Start a nested Flux instance on four nodes with a broker config:
import flux
import flux.job
from flux.job import JobspecV1

js = JobspecV1.from_alloc(
    nodes=4,
    time_limit="1h",
    conf={"resource": {"noverify": True}},
    shell_options={"verbose": 1},
)
jobid = flux.job.submit(flux.Flux(), js)
See apply_options() for a full description of all accepted keyword arguments, including CLI plugin options.
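The sizing rules from_alloc() documents can be expressed as a small resolution step. resolve_alloc_sizing is a hypothetical helper, not part of flux.job. Raising ValueError when neither nslots nor nodes is given is an assumption; the text only says one of them is required. The pty.capture shell option that bg also sets is not modeled.

```python
def resolve_alloc_sizing(nslots=None, nodes=None, exclusive=False, bg=False, broker_opts=None):
    """Resolve nslots, exclusive and broker_opts following the documented rules."""
    if nslots is None and nodes is None:
        raise ValueError("either nslots or nodes must be provided")  # assumed error type
    if nslots is None:
        # Only nodes given: one slot per node, and exclusive is forced True
        nslots = nodes
        exclusive = True
    opts = list(broker_opts or [])  # copy so the caller's list is not mutated
    if bg:
        # Documented broker option appended for background instances
        opts.append("-Sbroker.rc2_none=1")
    return {"nslots": nslots, "nodes": nodes, "exclusive": exclusive, "broker_opts": opts}


print(resolve_alloc_sizing(nodes=4))
print(resolve_alloc_sizing(nslots=8, bg=True))
```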
- classmethod from_batch(script=None, *, content=None, nslots=None, nodes=None, cores_per_slot=1, gpus_per_slot=None, exclusive=False, broker_opts=None, conf=None, wrap=False, name=None, queue=None, bank=None, cwd=None, output=None, error=None, input=None, label_io=False, unbuffered=False, **kwargs)
Create a jobspec for a batch script job with options applied.
Equivalent to from_batch_command() followed by apply_options() with prog="batch". The script may be supplied as a file path via script or as inline string content via content; exactly one must be provided.
If neither nslots nor nodes is provided, nslots defaults to 1. If only nodes is given, nslots defaults to nodes and exclusive is forced True.
- Parameters
script (str or os.PathLike) -- Path to the batch script file. The file is read and name defaults to the filename. Mutually exclusive with content.
content (str) -- Inline script content. name defaults to "batch". Mutually exclusive with script.
nslots (int) -- Number of resource slots. Default 1.
nodes (int) -- Number of nodes. Sets nslots when nslots is not given and forces exclusive True.
cores_per_slot (int) -- Cores per slot. Default 1.
gpus_per_slot (int) -- GPUs per slot.
exclusive (bool) -- Allocate nodes exclusively.
broker_opts (list[str]) -- Options passed to the child broker.
conf -- Broker configuration; see from_alloc().
wrap (bool) -- If True and the script has no shebang, prepend #!/bin/sh.
name (str) -- Job name. Defaults to the script filename when script is given, otherwise "batch".
queue (str) -- Target queue.
bank (str) -- Target bank.
cwd (str) -- Working directory. Defaults to os.getcwd().
output (str) -- Path for standard output. Defaults to "flux-{{id}}.out".
error (str) -- Path for standard error.
input (str) -- Path for standard input.
label_io (bool) -- Label output lines with task IDs.
unbuffered (bool) -- Disable output buffering.
**kwargs -- Forwarded to apply_options().
- Returns
- Raises
TypeError -- If both script and content are given, or if neither is given.
Note
CLI plugin preinit() callbacks are invoked (via apply_options()) after the jobspec structure is built. Mutations to resource-sizing attributes such as nslots or nodes inside preinit() are therefore ignored; use modify_jobspec() for structural changes.
Example
Submit a batch script from a file:
import flux
import flux.job
from flux.job import JobspecV1

js = JobspecV1.from_batch(
    "/path/to/script.sh",
    nodes=4,
    time_limit="2h",
    attributes={"system.queue": "batch"},
)
jobid = flux.job.submit(flux.Flux(), js)
Submit an inline script:
js = JobspecV1.from_batch(
    content="#!/bin/bash\nflux run -n8 myapp\n",
    nslots=8,
    output="myapp-{{id}}.out",
    error="myapp-{{id}}.err",
)
jobid = flux.job.submit(flux.Flux(), js)
See apply_options() for a full description of all accepted keyword arguments, including how to discover and use CLI plugin options.
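The script/content rules from_batch() documents (exactly one source, name defaulting, optional shebang wrapping) can be sketched as a standalone function. resolve_batch_script is hypothetical. Using os.path.basename for "the filename" and adding a trailing newline after the #!/bin/sh line are assumptions.

```python
import os


def resolve_batch_script(script=None, content=None, name=None, wrap=False):
    """Return (job name, script text) following the documented from_batch() rules."""
    if (script is None) == (content is None):
        raise TypeError("exactly one of script or content must be given")
    if script is not None:
        with open(script) as f:
            text = f.read()
        default_name = os.path.basename(os.fspath(script))  # assumed: basename
    else:
        text = content
        default_name = "batch"
    if wrap and not text.startswith("#!"):
        # Prepend a shell shebang only when the script lacks one
        text = "#!/bin/sh\n" + text
    return name or default_name, text


print(resolve_batch_script(content="flux run -n8 myapp\n", wrap=True))
# ('batch', '#!/bin/sh\nflux run -n8 myapp\n')
```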
- classmethod from_batch_command(script, jobname, args=None, num_slots=1, cores_per_slot=1, gpus_per_slot=None, num_nodes=None, broker_opts=None, exclusive=False, conf=None, **kwargs)
Create a Jobspec describing a nested Flux instance controlled by a script.
The nested Flux instance will execute the script with the given command-line arguments after copying it and setting the executable bit. Conceptually, this differs from from_nest_command(), which also creates a nested Flux instance, in that it (a) requires the initial program of the new instance to be an executable text file and (b) creates the initial program from a string rather than using an executable that already exists on the filesystem.
Use setters to assign additional properties.
- Parameters
script (str) -- contents of the script to execute, as a string. The script should have a shebang (e.g. #!/bin/sh) at the top.
jobname (str) -- name to use for the system.job.name attribute. This will be the default job name reported by Flux.
args (iterable of str) -- arguments to pass to script
num_slots (int) -- number of resource slots to create. Slots are an abstraction, and are only used (along with cores_per_slot and gpus_per_slot) to determine the nested instance's allocation size and layout.
cores_per_slot (int) -- number of cores to allocate per slot
gpus_per_slot (int) -- number of GPUs to allocate per slot
num_nodes (int) -- distribute allocated resource slots across N individual nodes
broker_opts (iterable of str) -- options to pass to the new Flux broker
conf (dict) -- optional broker configuration to pass to the child instance brokers. If set, conf will be set in the jobspec 'files' (RFC 37 File Archive) attribute as conf.json, and a -c{{tmpdir}}/conf.json entry is appended to the broker command line.
**kwargs -- Extra named arguments as accepted in from_command()
Note
This is a low-level builder. For most Python code, from_batch() is preferred: it wraps this method and apply_options() in a single call and accepts a file path or inline script content directly.
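The conf handling described for this method involves two steps: storing conf as conf.json and pointing the child broker at it. The sketch below models both with plain Python containers. files is a hypothetical flat name-to-string dict standing in for the RFC 37 File Archive, whose actual entry format is not modeled here.

```python
import json


def attach_broker_conf(files, broker_opts, conf):
    """Record conf as conf.json and append the documented -c option (sketch)."""
    files["conf.json"] = json.dumps(conf)  # hypothetical stand-in for the files archive
    # {{tmpdir}} is a mustache template left for the job shell to expand
    broker_opts.append("-c{{tmpdir}}/conf.json")
    return files, broker_opts


files, opts = attach_broker_conf({}, ["-Sbroker.rc2_none=1"], {"resource": {"noverify": True}})
print(files)
print(opts)
```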
- classmethod from_command(command, num_tasks=1, cores_per_task=1, gpus_per_task=None, num_nodes=None, exclusive=False, duration=None, environment=None, env_expand=None, cwd=None, rlimits=None, name=None, input=None, output=None, error=None, label_io=False, unbuffered=False, queue=None, bank=None)
Factory function that builds the minimum legal v1 jobspec.
- Parameters
command (iterable of str) -- command to execute
num_tasks (int) -- number of tasks to create
cores_per_task (int) -- number of cores to allocate per task
gpus_per_task (int) -- number of GPUs to allocate per task
num_nodes (int) -- distribute allocated tasks across N individual nodes.
exclusive (bool) -- always allocate nodes exclusively
duration (Number, str) -- assign a time limit to the job in Flux Standard Duration (if str), datetime.timedelta, or Number in seconds. If not provided, the duration will be unlimited unless set via the duration setter.
environment (Mapping) -- Set the environment for the job via a mapping of environment variable name to value. If not provided, the environment will be initialized using os.environ.
env_expand (Mapping) -- A mapping of environment variables that contain mustache templates to be expanded by the job shell at runtime. (See the flux-run(1) MUSTACHE TEMPLATES section for more info.)
rlimits (Mapping) -- Set process resource limits for the job via a mapping of limit name to value, where a value of -1 is taken as unlimited, e.g. {"nofile": 12000}.
cwd (str) -- Set the current working directory for the job. If unset, a working directory may be set using the cwd setter.
name (str) -- Set a job name.
input (str, os.PathLike) -- Set job input to a file path.
output (str, os.PathLike) -- Set job output to a file path. stderr will be copied to the same path as stdout by default unless it is set separately.
error (str, os.PathLike) -- Set job stderr to a file path.
label_io (bool) -- For file output, label output with the source task ids. Default is False.
unbuffered (bool) -- Disable output buffering as much as practical.
queue (str) -- Set the queue for the job.
bank (str) -- Set the bank for the job.
Note
This method is a low-level factory function that builds a minimal RFC 14 jobspec directly. See from_submit() for a more full-featured alternative that wraps this method and apply_options() in a single call, and therefore supports most options offered by the flux-submit(1) CLI utility, including options provided by configured CLI plugins.
- classmethod from_nest_command(command, num_slots=1, cores_per_slot=1, gpus_per_slot=None, num_nodes=None, broker_opts=None, exclusive=False, conf=None, **kwargs)
Create a Jobspec describing a nested Flux instance controlled by command.
Conceptually, this differs from from_batch_command() in that (a) the initial program of the nested Flux instance can be any executable on the filesystem, not just a text file, and (b) the executable is not copied at submission time.
Use setters to assign additional properties.
- Parameters
command (iterable of str) -- initial program for the nested Flux instance
num_slots (int) -- number of resource slots to create. Slots are an abstraction, and are only used (along with cores_per_slot and gpus_per_slot) to determine the nested instance's allocation size and layout.
cores_per_slot (int) -- number of cores to allocate per slot
gpus_per_slot (int) -- number of GPUs to allocate per slot
num_nodes (int) -- distribute allocated resource slots across N individual nodes
broker_opts (iterable of str) -- options to pass to the new Flux broker
conf (dict) -- optional broker configuration to pass to the child instance brokers. If set, conf will be set in the jobspec 'files' (RFC 37 File Archive) attribute as conf.json, and a -c{{tmpdir}}/conf.json entry is appended to the broker command line.
**kwargs -- Extra named arguments as accepted in from_command()
Note
This is a low-level builder. For most Python code, from_alloc() is preferred: it wraps this method and apply_options() in a single call, and therefore supports most flux-alloc(1) options, including options provided by configured CLI plugins.
- classmethod from_submit(command, *, ntasks=1, nodes=None, cores_per_task=1, gpus_per_task=None, exclusive=False, name=None, queue=None, bank=None, cwd=None, output=None, error=None, input=None, label_io=False, unbuffered=False, **kwargs)
Create a jobspec for a command job with options applied.
Equivalent to from_command() followed by apply_options() with prog="submit". Resource layout and basic job arguments are listed below; all apply_options() keyword arguments (env, rlimit, time_limit, dependency, etc.) and CLI plugin option dests are forwarded via **kwargs.
- Parameters
command (list[str]) -- Command to execute.
ntasks (int) -- Number of tasks. Default 1.
nodes (int) -- Number of nodes to distribute tasks across.
cores_per_task (int) -- Cores per task. Default 1.
gpus_per_task (int) -- GPUs per task.
exclusive (bool) -- Allocate nodes exclusively.
name (str) -- Job name.
queue (str) -- Target queue.
bank (str) -- Target bank.
cwd (str) -- Working directory.
output (str) -- Path for standard output.
error (str) -- Path for standard error.
input (str) -- Path for standard input.
label_io (bool) -- Label output lines with task IDs.
unbuffered (bool) -- Disable output buffering.
**kwargs -- Forwarded to apply_options().
- Returns
Note
CLI plugin preinit() callbacks are invoked (via apply_options()) after the jobspec structure is built. Mutations to resource-sizing attributes such as ntasks or nodes inside preinit() are therefore ignored; use modify_jobspec() for structural changes.
Example
Build and submit a 16-task job with shell options and attributes:
import flux
import flux.job
from flux.job import JobspecV1

js = JobspecV1.from_submit(
    ["myapp", "--input", "data.h5"],
    ntasks=16,
    cores_per_task=4,
    time_limit="2h",
    shell_options={"verbose": 1},
    attributes={"system.queue": "gpu"},
    dependency=["afterok:f1234abcd"],
)
jobid = flux.job.submit(flux.Flux(), js)
Pass options registered by CLI plugins using their prefixed dest name. Run flux submit --help to see which plugins are loaded; plugin options appear under "Options provided by plugins". The dest for each option is the flag name with the leading -- removed and dashes replaced by underscores (e.g. --amd-gpumode → amd_gpumode):
# --amd-gpumode is listed in "flux submit --help" output
js = JobspecV1.from_submit(["myapp"], ntasks=8, amd_gpumode="TPX")
For programmatic discovery of installed plugin option dests:
from flux.cli.plugin import CLIPluginRegistry

for opt in CLIPluginRegistry("submit").options:
    print(f"{opt.name} -> dest: {opt.dest}")
See apply_options() for a full description of all accepted keyword arguments.
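The flag-to-dest rule described above matches what Python's argparse does for long options. The sketch below shows a hypothetical option_dest helper and confirms the result against argparse itself. --amd-gpumode is used only because it appears in the example above.

```python
import argparse


def option_dest(flag):
    """Drop the leading "--" and replace remaining dashes with underscores."""
    if not flag.startswith("--"):
        raise ValueError(f"expected a long option, got {flag!r}")
    return flag[2:].replace("-", "_")


# argparse derives the same dest name for a long option
parser = argparse.ArgumentParser()
parser.add_argument("--amd-gpumode")
ns = parser.parse_args(["--amd-gpumode", "TPX"])

print(option_dest("--amd-gpumode"))  # amd_gpumode
print(vars(ns))                      # {'amd_gpumode': 'TPX'}

# The resulting name can be used as a keyword, e.g. from_submit(..., **kwargs)
kwargs = {option_dest("--amd-gpumode"): "TPX"}
```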
- classmethod per_resource(command, ncores=None, nnodes=None, per_resource_type=None, per_resource_count=None, gpus_per_node=None, exclusive=False, **kwargs)
Factory function that builds a v1 jobspec from an explicit count of nodes or cores and a number of tasks per one of these resources.
Use setters to assign additional properties.
- Parameters
ncores -- Total number of cores to allocate
nnodes -- Total number of nodes to allocate
per_resource_type -- (optional) Type of resource over which to schedule a count of tasks. Only "node" or "core" are currently supported.
per_resource_count -- (optional) Count of tasks per per_resource_type
gpus_per_node -- With nnodes, request a number of GPUs per node
exclusive -- With nnodes, request whole nodes exclusively
**kwargs -- Extra named arguments as accepted in from_command()
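The relationship between the per_resource() arguments and the resulting number of tasks can be estimated with simple arithmetic. per_resource_task_count is hypothetical. It assumes one group of per_resource_count tasks per allocated node or core. For "core", it requires an explicit ncores, because otherwise the real total depends on the cores actually allocated.

```python
def per_resource_task_count(per_resource_type, per_resource_count, ncores=None, nnodes=None):
    """Estimate total tasks as (number of resources) x (tasks per resource)."""
    if per_resource_type == "node":
        if nnodes is None:
            raise ValueError("per_resource_type='node' needs nnodes")
        return nnodes * per_resource_count
    if per_resource_type == "core":
        if ncores is None:
            raise ValueError("per_resource_type='core' needs ncores in this sketch")
        return ncores * per_resource_count
    raise ValueError("only 'node' or 'core' are supported")


print(per_resource_task_count("node", 4, nnodes=2))   # 8
print(per_resource_task_count("core", 1, ncores=16))  # 16
```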
- flux.job.Jobspec.validate_counts(resources)
Validate resource counts. Each count must be a positive integer.
- Parameters
resources -- dictionary following the specification in RFC 14 for the resources top-level key
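The positive-integer rule can be checked recursively over nested with lists, as in the sketch below. This is not the function's source. It takes a list of resource dicts like the resources sections shown earlier and handles only plain integer counts.

```python
def validate_counts(resources):
    """Raise ValueError unless every "count" in the resource tree is a positive int."""
    for res in resources:
        count = res.get("count")
        # bool is a subclass of int, so reject it explicitly
        if isinstance(count, bool) or not isinstance(count, int) or count < 1:
            raise ValueError(f"invalid count for {res.get('type')!r}: {count!r}")
        validate_counts(res.get("with", []))


validate_counts([{"type": "slot", "count": 2, "with": [{"type": "core", "count": 4}]}])
try:
    validate_counts([{"type": "slot", "count": 1, "with": [{"type": "core", "count": 0}]}])
except ValueError as exc:
    print(exc)  # invalid count for 'core': 0
```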
- flux.job.Jobspec.validate_jobspec(jobspec, require_version=None)
Validates the jobspec by attempting to construct a Jobspec object. If no exceptions are raised during construction, the jobspec is assumed to be valid and this function returns True. If the jobspec is invalid, the relevant exception is raised (e.g., TypeError, ValueError, EnvironmentError).
By default, the validation function will read the version key in the jobspec to determine which Jobspec object to instantiate. An optional require_version is included to override this behavior and force a particular class to be used.
- Parameters
jobspec -- a Jobspec object or JSON string
require_version -- jobspec version to use. If not provided, the value of jobspec['version'] is used.
- Raises
ValueError --
TypeError --
EnvironmentError --