Flux Job Submission and Monitoring

The Flux Python bindings provide synchronous and asynchronous functions for:

  • Submitting jobs with arbitrary attributes and resources

  • Waiting for jobs to complete

  • Cancelling jobs

  • Viewing job info and events

Job submission

Job submission is performed by creating a flux.job.Jobspec object, populating it with attributes, and then passing it to one of the submission functions, e.g. flux.job.submit. Jobspec objects define everything about a job, including the job's resources, executable, working directory, environment, and stdio streams.

JobspecV1 provides two layers of factory methods. The low-level methods — from_command(), from_batch_command(), and from_nest_command() — build the RFC 14 jobspec structure directly, accepting raw values for environment, resource limits, and duration. Further customization of the resulting jobspec is done via various setters, getters, and methods of JobspecV1(), or further processing by apply_options(), which is a convenience method used by command-line tools to process common options and CLI plugin arguments.

High-level methods (from_submit(), from_alloc(), and from_batch()) are also available. They give callers full access to the same options as the command-line tools flux-submit(1), flux-alloc(1), and flux-batch(1), respectively. Each is equivalent to calling a low-level method and then apply_options() with the appropriate prog in a single step. They use parameter names aligned with the command-line flags (ntasks, nodes, time_limit) and pass any remaining keyword arguments to apply_options(). The result is a jobspec equivalent to what flux submit, flux alloc, or flux batch would produce.

class flux.job.Jobspec(resources, tasks, attributes, version)
add_file(path, data, perms=None, encoding=None)

Add a file to the RFC 14 "files" dictionary in Jobspec. If data contains newlines or an encoding is explicitly provided, then it is presumed to be the file content. Otherwise, data is a local filesystem path, the contents of which are to be loaded into jobspec. For filesystem

Parameters
  • path (str) -- path or file name to encode data as in Jobspec

  • data (dict, str) -- content of file or a local path to load

  • perms (int) -- file permissions, default 0o0600 (octal). If data is a file system path, then permissions of the local file system object will be used.

  • encoding (str) -- RFC 37 compatible encoding for data. Use None if data is a dict, or to determine the encoding from the file when data specifies a filesystem path. Otherwise, if encoding is set, data is a string encoded in the specified encoding.

property attributes

Jobspec attributes section

property cwd

Working directory of job.

property duration

Job's time limit.

The duration may be:

  • an int or float in seconds

  • a string in Flux Standard Duration (see RFC 23)

  • a python datetime.timedelta

A duration of zero is interpreted as "not set".
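The three accepted forms can be illustrated with a small normalizer. This is a minimal sketch, not the parser used by Flux: the helper name duration_to_seconds is hypothetical, and it handles only a simplified subset of Flux Standard Duration (a number with an optional ms, s, m, h, or d suffix).

```python
from datetime import timedelta

# Suffix multipliers for a simplified subset of Flux Standard Duration.
# (Assumption: only these suffixes are handled; see RFC 23 for the full rules.)
_FSD_UNITS = {"ms": 0.001, "s": 1.0, "m": 60.0, "h": 3600.0, "d": 86400.0}


def duration_to_seconds(duration):
    """Normalize an int/float, FSD-like string, or timedelta to float seconds."""
    if isinstance(duration, timedelta):
        return duration.total_seconds()
    if isinstance(duration, (int, float)):
        return float(duration)
    text = duration.strip()
    # Check the two-character suffix first so "ms" is not read as "s".
    for suffix in ("ms", "s", "m", "h", "d"):
        if text.endswith(suffix):
            return float(text[: -len(suffix)]) * _FSD_UNITS[suffix]
    return float(text)  # a bare number is taken as seconds


print(duration_to_seconds("2h"))
print(duration_to_seconds(timedelta(minutes=1)))
```

In real code the value would simply be assigned to the duration property, which accepts all three forms directly.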

property environment

Environment of job.

property error

Path to use for stderr.

classmethod from_yaml_file(filename)

Create a jobspec from a path to a YAML file.

classmethod from_yaml_stream(yaml_stream)

Create a jobspec from a YAML file-like object.

getattr(key)

Get an attribute from the jobspec using dotted key notation, e.g. system.duration or optionally attributes.system.duration.

Raises KeyError if a component of key does not exist.
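The dotted-key lookup can be pictured as a walk through nested dictionaries. The following is a sketch under stated assumptions: get_dotted and the sample attrs dict are hypothetical and stand in for the jobspec's attributes section; this is not the library's implementation.

```python
def get_dotted(attributes, key):
    """Look up a dotted key such as "system.duration" in a nested dict."""
    # Accept the optional "attributes." prefix as well.
    if key.startswith("attributes."):
        key = key[len("attributes."):]
    node = attributes
    for part in key.split("."):
        # A missing component (or a non-dict parent) is reported as KeyError.
        if not isinstance(node, dict) or part not in node:
            raise KeyError(key)
        node = node[part]
    return node


# Hypothetical attributes section for illustration only.
attrs = {"system": {"duration": 3600, "queue": "batch"}}
print(get_dotted(attrs, "system.duration"))
print(get_dotted(attrs, "attributes.system.queue"))
```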

property input

Path to use for stdin.

property output

Path to use for stdout.

property queue

Target queue of job submission

resource_counts()

Compute the counts of each resource type in the jobspec

The following jobspec would return { "slot": 12, "core": 18, "memory": 242 }

- type: slot
  count: 2
  with:
    - type: core
      count: 4
    - type: memory
      count: 1
      unit: GB
- type: slot
  count: 10
  with:
    - type: core
      count: 1
    - type: memory
      count: 24
      unit: GB

Note

The current implementation ignores the unit label and assumes units are consistent across resources.
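The arithmetic behind the example can be reproduced with a short recursive sum. This is an illustrative sketch only: count_resources is a hypothetical helper, it assumes every count is a plain integer, and, like the documented behavior, it ignores the unit label.

```python
from collections import Counter


def count_resources(resources, multiplier=1, totals=None):
    """Sum resource counts by type, multiplying through nested "with" lists."""
    totals = Counter() if totals is None else totals
    for res in resources:
        count = res["count"] * multiplier
        totals[res["type"]] += count
        # Children are multiplied by the effective count of their parent.
        count_resources(res.get("with", []), count, totals)
    return dict(totals)


# The resources section from the example above, as Python data.
resources = [
    {"type": "slot", "count": 2, "with": [
        {"type": "core", "count": 4},
        {"type": "memory", "count": 1, "unit": "GB"},
    ]},
    {"type": "slot", "count": 10, "with": [
        {"type": "core", "count": 1},
        {"type": "memory", "count": 24, "unit": "GB"},
    ]},
]
print(count_resources(resources))
```

The totals come from 2 + 10 slots, 2×4 + 10×1 cores, and 2×1 + 10×24 memory units.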

resource_walk()

Traverse the resources in the resources section of the jobspec.

Performs a depth-first, pre-order traversal. Yields a tuple containing (parent, resource, count). parent is None when resource is a top-level resource. count is the number of that resource, including the multiplicative effects of the with clause in ancestor resources. For example, the following resource section will yield a count of 2 for the slot and a count of 8 for the core resource:

- type: slot
  count: 2
  with:
    - type: core
      count: 4
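
The traversal described above can be sketched as a recursive generator. This is an assumption-labeled illustration rather than the library's code: walk_resources is a hypothetical name and counts are assumed to be plain integers.

```python
def walk_resources(resources, parent=None, multiplier=1):
    """Depth-first, pre-order walk yielding (parent, resource, count)."""
    for res in resources:
        # Effective count includes the counts of all ancestor resources.
        count = res["count"] * multiplier
        yield parent, res, count
        yield from walk_resources(res.get("with", []), res, count)


# The resource section from the example above, as Python data.
resources = [{"type": "slot", "count": 2, "with": [{"type": "core", "count": 4}]}]

for parent, res, count in walk_resources(resources):
    parent_type = parent["type"] if parent else None
    print(parent_type, res["type"], count)
```

The slot is visited first with parent None, followed by the core with the slot as its parent.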

property resources

Jobspec resources section

setattr(key, val)

set job attribute

setattr_shell_option(key, val)

set job attribute: shell option

property stderr

Path to use for stderr.

property stdin

Path to use for stdin.

property stdout

Path to use for stdout.

property tasks

Jobspec tasks section

property unbuffered

True if std output and error are unbuffered

property version

Jobspec version section

class flux.job.JobspecV1(resources, tasks, attributes, version=1)

Bases: Jobspec

apply_options(args=None, *, prog=None, _preinit=True, env=None, rlimit=None, signal=None, taskmap=None, dependency=None, requires=None, shell_options=None, time_limit=None, attributes=None, add_file=None, **kwargs)

Apply submission options to this jobspec.

Modifies the jobspec in-place and returns self for method chaining. Options may be passed as explicit keyword arguments, via args (an argparse.Namespace or any object with matching attributes), or both — keyword arguments take precedence over args attributes. All options are optional; absent options are silently ignored.

The options correspond to flux submit command-line flags; see flux-submit(1) or run flux submit --help for general option semantics. Options with non-obvious Python-specific behavior are described in full below.

Note

env and rlimit are only applied when explicitly passed as keyword arguments, or when args is provided (the CLI path). Pass env=[] or rlimit=[] to propagate the full environment or the default resource-limit set without any filtering rules. To suppress env propagation entirely, pass env=["-*"].

Note

shell_options and attributes accept plain Python dicts. The CLI-compatible string forms setopt and setattr are supported when passed via an args namespace (for use by the CLI internally), but are not accepted as keyword arguments. Python code should use shell_options and attributes, or call Jobspec.setattr_shell_option() and Jobspec.setattr().

Parameters
  • args -- Optional namespace object (e.g. argparse.Namespace) whose attributes supply option values. Explicit keyword arguments override any same-named attribute in args. This parameter is mainly used internally by the CLI; most Python code should use keyword arguments directly.

  • prog (str) -- CLI plugin context name — "submit", "run", "batch", or "alloc". When set, plugins registered for that command are loaded and their modify_jobspec hooks are invoked. Defaults to None (no plugin processing).

  • env (list[str]) --

    Environment filter rules applied to the submitter's environment. Each rule is one of:

    • "VAR=VALUE" — set VAR to VALUE in the job environment, expanding $VAR references against the environment built so far.

    • "VAR" — import VAR from the submitter's environment (glob patterns are accepted, e.g. "SLURM_*").

    • "-PATTERN" — remove variables matching the glob PATTERN, e.g. "-LD_*".

    • "^/path/to/file" — read additional rules from a file.

    Values containing {{…}} (e.g. "MY_RANK={{rank}}") are stored as mustache templates and expanded by the job shell at startup, not at submission time.

  • rlimit (list[str]) --

    Resource-limit propagation rules. Each rule is one of:

    • "NAME" — propagate RLIMIT_NAME at its current value (glob patterns accepted, e.g. "*").

    • "NAME=VALUE" — set RLIMIT_NAME to VALUE; use "unlimited" or "infinity" for RLIM_INFINITY.

    • "-NAME" — remove RLIMIT_NAME from propagation.

    Pass rlimit=[] to propagate the default set of rlimits (stack, cpu, fsize, etc.) without any filtering rules. When rlimit is omitted and no args namespace is provided, no rlimits are applied (see the Note above).

  • signal (str) -- Send a signal to the job before its time limit expires. Format: "[SIG][@TIME]" where SIG is a signal name or number (default SIGUSR1) and TIME is a duration in Flux Standard Duration (default 60s). Examples: "USR1@30s", "TERM@2m", "@2m" (SIGUSR1, 2-minute warning).

  • time_limit (str or float) -- Job wall-clock limit as a Flux Standard Duration string (e.g. "30s", "1.5h", "2d") or a plain float number of seconds. See flux-standard-duration(7). A value of 0 means no limit.

  • taskmap (str) -- Task-to-node mapping scheme, e.g. "block" (default) or "cyclic". Corresponds to --taskmap.

  • dependency (list[str]) -- Job dependency URIs, e.g. ["afterok:f1234abcd", "afterany:f5678ef01"]. Corresponds to --dependency.

  • requires (list[str]) -- Node constraint expressions, e.g. ["host:node1", "rank:0"]. Multiple entries are AND-combined. Corresponds to --requires.

  • shell_options (dict) -- Job-shell options as a plain Python dict, e.g. {"verbose": 1, "pty": 1}. Keys and values must be JSON-serializable.

  • attributes (dict) -- Jobspec attributes as a plain Python dict. Keys may be fully qualified (e.g. "system.foo") or bare (e.g. "foo"), in which case the system. namespace is implied. A leading "." addresses the attributes. root directly (e.g. ".user.comment" sets attributes.user.comment).

  • add_file (list[str]) --

    Files to attach to the job. Each entry uses one of the following forms:

    • "/path/to/file" — attach a file from the filesystem; the name in the jobspec is the basename of the path.

    • "name=/path/to/file" — attach a file with an explicit name.

    • "name=line1\nline2\n" — attach inline text content (use "\n" for newlines in the string).

    • "name:0755=/path/to/script" — attach a file with explicit octal permissions.

    Corresponds to --add-file.

Returns

self, to allow method chaining.
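
To make the "[SIG][@TIME]" format of the signal option concrete, here is a minimal sketch of how such a string splits into a signal and a lead time. The function parse_signal_spec is hypothetical and is not the parser used by Flux; it only mirrors the defaults stated above (SIGUSR1 and 60s).

```python
def parse_signal_spec(spec, default_signal="SIGUSR1", default_time="60s"):
    """Split "[SIG][@TIME]" into (signal, time), filling in the defaults."""
    name, sep, time = spec.partition("@")
    if not name:
        signal_name = default_signal
    elif name.isdigit():
        signal_name = name  # numeric signals are kept as given
    else:
        # Accept both "USR1" and "SIGUSR1" spellings.
        signal_name = name if name.startswith("SIG") else "SIG" + name
    return signal_name, (time if sep and time else default_time)


for spec in ("USR1@30s", "TERM@2m", "@2m"):
    print(spec, "->", parse_signal_spec(spec))
```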

Example

Build a jobspec and apply several options at once:

from flux.job import JobspecV1

jobspec = JobspecV1.from_command(
    ["myapp", "--input", "data.h5"],
    num_tasks=16,
    cores_per_task=4,
).apply_options(
    time_limit="2h",
    env=["-LD_PRELOAD", "MY_RANK={{rank}}"],
    dependency=["afterok:f1234abcd"],
    shell_options={"verbose": 1},
    attributes={"system.queue": "gpu"},
)

Attach a helper script and set resource limits:

jobspec.apply_options(
    add_file=["setup.sh=/path/to/setup.sh"],
    rlimit=["-*", "nofile=65536"],
)

classmethod from_alloc(*, nslots=None, nodes=None, cores_per_slot=1, gpus_per_slot=None, exclusive=False, broker_opts=None, conf=None, bg=False, name=None, queue=None, bank=None, cwd=None, output=None, error=None, input=None, label_io=False, unbuffered=False, **kwargs)

Create a jobspec for a nested Flux instance with options applied.

Equivalent to from_nest_command() followed by apply_options() with prog="alloc". Either nslots or nodes must be provided; if only nodes is given, nslots defaults to nodes and exclusive is forced True.

Parameters
  • nslots (int) -- Number of resource slots.

  • nodes (int) -- Number of nodes. Sets nslots when nslots is not given and forces exclusive True.

  • cores_per_slot (int) -- Cores per slot. Default 1.

  • gpus_per_slot (int) -- GPUs per slot.

  • exclusive (bool) -- Allocate nodes exclusively.

  • broker_opts (list[str]) -- Options passed to the child broker.

  • conf -- Broker configuration as a dict, BatchConfig, or a string accepted by BatchConfig.update() (key=val, JSON, TOML, file path).

  • bg (bool) -- Start instance in the background without attaching. Appends -Sbroker.rc2_none=1 to broker_opts and sets the pty.capture shell option.

  • name (str) -- Job name.

  • queue (str) -- Target queue.

  • bank (str) -- Target bank.

  • cwd (str) -- Working directory. Defaults to os.getcwd().

  • output (str) -- Path for standard output.

  • error (str) -- Path for standard error.

  • input (str) -- Path for standard input.

  • label_io (bool) -- Label output lines with task IDs.

  • unbuffered (bool) -- Disable output buffering.

  • **kwargs -- Forwarded to apply_options().

Returns

JobspecV1

Note

CLI plugin preinit() callbacks are invoked (via apply_options()) after the jobspec structure is built. Mutations to resource-sizing attributes such as nslots or nodes inside preinit() are therefore ignored; use modify_jobspec() for structural changes.

Example

Start a nested Flux instance on four nodes with a broker config:

import flux
import flux.job
from flux.job import JobspecV1

js = JobspecV1.from_alloc(
    nodes=4,
    time_limit="1h",
    conf={"resource": {"noverify": True}},
    shell_options={"verbose": 1},
)
jobid = flux.job.submit(flux.Flux(), js)

See apply_options() for a full description of all accepted keyword arguments, including CLI plugin options.

classmethod from_batch(script=None, *, content=None, nslots=None, nodes=None, cores_per_slot=1, gpus_per_slot=None, exclusive=False, broker_opts=None, conf=None, wrap=False, name=None, queue=None, bank=None, cwd=None, output=None, error=None, input=None, label_io=False, unbuffered=False, **kwargs)

Create a jobspec for a batch script job with options applied.

Equivalent to from_batch_command() followed by apply_options() with prog="batch". The script may be supplied as a file path via script or as inline string content via content; exactly one must be provided.

If neither nslots nor nodes is provided, nslots defaults to 1. If only nodes is given, nslots defaults to nodes and exclusive is forced True.
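
The defaulting rules for nslots, nodes, and exclusive can be written out as a small function. This is a sketch for illustration, not the code in from_batch() or from_alloc(): resolve_slots and its default_nslots parameter are hypothetical, and the ValueError raised when nothing is provided is an assumption.

```python
def resolve_slots(nslots=None, nodes=None, exclusive=False, default_nslots=None):
    """Apply the documented nslots/nodes defaulting rules."""
    if nslots is None:
        if nodes is not None:
            # Only nodes given: one slot per node, nodes allocated exclusively.
            nslots, exclusive = nodes, True
        elif default_nslots is not None:
            # Batch-style behavior: fall back to a default slot count.
            nslots = default_nslots
        else:
            # Alloc-style behavior: one of the two is required.
            # (Assumption: the real exception type may differ.)
            raise ValueError("either nslots or nodes must be provided")
    return nslots, nodes, exclusive


print(resolve_slots(nodes=4))            # only nodes given
print(resolve_slots(default_nslots=1))   # neither given, batch-style default
print(resolve_slots(nslots=8, nodes=2))  # both given, exclusive left as passed
```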

Parameters
  • script (str or os.PathLike) -- Path to the batch script file. The file is read and name defaults to the filename. Mutually exclusive with content.

  • content (str) -- Inline script content. name defaults to "batch". Mutually exclusive with script.

  • nslots (int) -- Number of resource slots. Default 1.

  • nodes (int) -- Number of nodes. Sets nslots when nslots is not given and forces exclusive True.

  • cores_per_slot (int) -- Cores per slot. Default 1.

  • gpus_per_slot (int) -- GPUs per slot.

  • exclusive (bool) -- Allocate nodes exclusively.

  • broker_opts (list[str]) -- Options passed to the child broker.

  • conf -- Broker configuration; see from_alloc().

  • wrap (bool) -- If True and the script has no shebang, prepend #!/bin/sh.

  • name (str) -- Job name. Defaults to the script filename when script is given, otherwise "batch".

  • queue (str) -- Target queue.

  • bank (str) -- Target bank.

  • cwd (str) -- Working directory. Defaults to os.getcwd().

  • output (str) -- Path for standard output. Defaults to "flux-{{id}}.out".

  • error (str) -- Path for standard error.

  • input (str) -- Path for standard input.

  • label_io (bool) -- Label output lines with task IDs.

  • unbuffered (bool) -- Disable output buffering.

  • **kwargs -- Forwarded to apply_options().

Returns

JobspecV1

Raises

TypeError -- If both script and content are given, or if neither is given.

Note

CLI plugin preinit() callbacks are invoked (via apply_options()) after the jobspec structure is built. Mutations to resource-sizing attributes such as nslots or nodes inside preinit() are therefore ignored; use modify_jobspec() for structural changes.

Example

Submit a batch script from a file:

import flux
import flux.job
from flux.job import JobspecV1

js = JobspecV1.from_batch(
    "/path/to/script.sh",
    nodes=4,
    time_limit="2h",
    attributes={"system.queue": "batch"},
)
jobid = flux.job.submit(flux.Flux(), js)

Submit an inline script:

js = JobspecV1.from_batch(
    content="#!/bin/bash\nflux run -n8 myapp\n",
    nslots=8,
    output="myapp-{{id}}.out",
    error="myapp-{{id}}.err",
)
jobid = flux.job.submit(flux.Flux(), js)

See apply_options() for a full description of all accepted keyword arguments, including how to discover and use CLI plugin options.

classmethod from_batch_command(script, jobname, args=None, num_slots=1, cores_per_slot=1, gpus_per_slot=None, num_nodes=None, broker_opts=None, exclusive=False, conf=None, **kwargs)

Create a Jobspec describing a nested Flux instance controlled by a script.

The nested Flux instance will execute the script with the given command-line arguments after copying it and setting the executable bit. Conceptually, this differs from the from_nest_command, which also creates a nested Flux instance, in that it a) requires the initial program of the new instance to be an executable text file and b) creates the initial program from a string rather than using an executable existing somewhere on the filesystem.

Use setters to assign additional properties.

Parameters
  • script (str) -- contents of the script to execute, as a string. The script should have a shebang (e.g. #!/bin/sh) at the top.

  • jobname (str) -- name to use for the system.job.name attribute. This will be the default job name reported by Flux.

  • args (iterable of str) -- arguments to pass to script

  • num_slots (int) -- number of resource slots to create. Slots are an abstraction, and are only used (along with cores_per_slot and gpus_per_slot) to determine the nested instance's allocation size and layout.

  • cores_per_slot (int) -- number of cores to allocate per slot

  • gpus_per_slot (int) -- number of GPUs to allocate per slot

  • num_nodes (int) -- distribute allocated resource slots across N individual nodes

  • broker_opts (iterable of str) -- options to pass to the new Flux broker

  • conf (dict) -- optional broker configuration to pass to the child instance brokers. If set, conf will be set in the jobspec 'files' (RFC 37 File Archive) attribute as conf.json, and a -c{{tmpdir}}/conf.json entry is appended to the broker command line.

  • **kwargs -- Extra named arguments as accepted in from_command()

Note

This is a low-level builder. For most Python code, from_batch() is preferred — it wraps this method and apply_options() in a single call and accepts a file path or inline script content directly.

classmethod from_command(command, num_tasks=1, cores_per_task=1, gpus_per_task=None, num_nodes=None, exclusive=False, duration=None, environment=None, env_expand=None, cwd=None, rlimits=None, name=None, input=None, output=None, error=None, label_io=False, unbuffered=False, queue=None, bank=None)

Factory function that builds the minimum legal v1 jobspec.

Parameters
  • command (iterable of str) -- command to execute

  • num_tasks (int) -- number of tasks to create

  • cores_per_task (int) -- number of cores to allocate per task

  • gpus_per_task (int) -- number of GPUs to allocate per task

  • num_nodes (int) -- distribute allocated tasks across N individual nodes.

  • exclusive (bool) -- always allocate nodes exclusively

  • duration (Number, str) -- assign a time limit to the job in Flux Standard Duration (if str), datetime.timedelta or Number in seconds. If not provided, the duration will be unlimited unless set via the duration setter.

  • environment (Mapping) -- Set the environment for the job via a mapping of environment variable name to value. If not provided then the environment will be initialized using os.environ.

  • env_expand (Mapping) -- A mapping of environment variables that contain mustache templates to be expanded by the job shell at runtime. (See the flux-run(1) MUSTACHE TEMPLATES section for more info)

  • rlimits (Mapping) -- Set process resource limits for the job via a mapping of limit name to value, where a value of -1 is taken as unlimited. E.g. {"nofile": 12000}.

  • cwd (str) -- Set the current working directory for the job. If unset, then a working directory may be set using the cwd setter.

  • name (str) -- Set a job name.

  • input (str, os.PathLike) -- Set job input to a file path.

  • output (str, os.PathLike) -- Set job output to a file path. stderr will be copied to the same path as stdout by default unless it is set separately.

  • error (str, os.PathLike) -- Set job stderr to a file path.

  • label_io (bool) -- For file output, label output with the source task ids. Default is False.

  • unbuffered (bool) -- Disable output buffering as much as practical.

  • queue (str) -- Set the queue for the job.

  • bank (str) -- Set the bank for the job.

Note

This method is a low-level factory function which builds a minimum RFC 14 jobspec directly. See from_submit() for a more full-featured alternative which wraps this method and apply_options() in a single call, and therefore supports most options offered by the flux-submit(1) CLI utility, including options provided by configured CLI plugins.

classmethod from_nest_command(command, num_slots=1, cores_per_slot=1, gpus_per_slot=None, num_nodes=None, broker_opts=None, exclusive=False, conf=None, **kwargs)

Create a Jobspec describing a nested Flux instance controlled by command.

Conceptually, this differs from the from_batch_command method in that a) the initial program of the nested Flux instance can be any executable on the file system, not just a text file and b) the executable is not copied at submission time.

Use setters to assign additional properties.

Parameters
  • command (iterable of str) -- initial program for the nested Flux instance

  • num_slots (int) -- number of resource slots to create. Slots are an abstraction, and are only used (along with cores_per_slot and gpus_per_slot) to determine the nested instance's allocation size and layout.

  • cores_per_slot (int) -- number of cores to allocate per slot

  • gpus_per_slot (int) -- number of GPUs to allocate per slot

  • num_nodes (int) -- distribute allocated resource slots across N individual nodes

  • broker_opts (iterable of str) -- options to pass to the new Flux broker

  • conf (dict) -- optional broker configuration to pass to the child instance brokers. If set, conf will be set in the jobspec 'files' (RFC 37 File Archive) attribute as conf.json, and a -c{{tmpdir}}/conf.json entry is appended to the broker command line.

  • **kwargs -- Extra named arguments as accepted in from_command()

Note

This is a low-level builder. For most Python code, from_alloc() is preferred: it wraps this method and apply_options() in a single call, and therefore supports most flux-alloc(1) options, including options provided by configured CLI plugins.

classmethod from_submit(command, *, ntasks=1, nodes=None, cores_per_task=1, gpus_per_task=None, exclusive=False, name=None, queue=None, bank=None, cwd=None, output=None, error=None, input=None, label_io=False, unbuffered=False, **kwargs)

Create a jobspec for a command job with options applied.

Equivalent to from_command() followed by apply_options() with prog="submit". Resource layout and basic job arguments are listed below; all apply_options() keyword arguments (env, rlimit, time_limit, dependency, etc.) and CLI plugin option dests are forwarded via **kwargs.

Parameters
  • command (list[str]) -- Command to execute.

  • ntasks (int) -- Number of tasks. Default 1.

  • nodes (int) -- Number of nodes to distribute tasks across.

  • cores_per_task (int) -- Cores per task. Default 1.

  • gpus_per_task (int) -- GPUs per task.

  • exclusive (bool) -- Allocate nodes exclusively.

  • name (str) -- Job name.

  • queue (str) -- Target queue.

  • bank (str) -- Target bank.

  • cwd (str) -- Working directory.

  • output (str) -- Path for standard output.

  • error (str) -- Path for standard error.

  • input (str) -- Path for standard input.

  • label_io (bool) -- Label output lines with task IDs.

  • unbuffered (bool) -- Disable output buffering.

  • **kwargs -- Forwarded to apply_options().

Returns

JobspecV1

Note

CLI plugin preinit() callbacks are invoked (via apply_options()) after the jobspec structure is built. Mutations to resource-sizing attributes such as ntasks or nodes inside preinit() are therefore ignored; use modify_jobspec() for structural changes.

Example

Build and submit a 16-task job with shell options and attributes:

import flux
import flux.job
from flux.job import JobspecV1

js = JobspecV1.from_submit(
    ["myapp", "--input", "data.h5"],
    ntasks=16,
    cores_per_task=4,
    time_limit="2h",
    shell_options={"verbose": 1},
    attributes={"system.queue": "gpu"},
    dependency=["afterok:f1234abcd"],
)
jobid = flux.job.submit(flux.Flux(), js)

Pass options registered by CLI plugins using their prefixed dest name. Run flux submit --help to see what plugins are loaded; plugin options appear under "Options provided by plugins". The dest for each option is the flag name with the leading -- removed and dashes replaced by underscores (e.g. --amd-gpumode becomes amd_gpumode):

# --amd-gpumode is listed in "flux submit --help" output
js = JobspecV1.from_submit(["myapp"], ntasks=8, amd_gpumode="TPX")
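
The flag-to-dest naming rule is the same convention that Python's standard argparse module applies to long options, so it can be checked without Flux installed. The sketch below assumes nothing about how Flux plugins register their options; flag_to_dest is a hypothetical helper and the parser is a stand-alone demo.

```python
import argparse


def flag_to_dest(flag):
    """Derive a keyword-argument name from a long option flag."""
    # Strip the leading dashes, then replace inner dashes with underscores.
    return flag.lstrip("-").replace("-", "_")


# argparse derives the same attribute name for a long option by default.
parser = argparse.ArgumentParser(prog="demo")
parser.add_argument("--amd-gpumode")
args = parser.parse_args(["--amd-gpumode", "TPX"])

print(flag_to_dest("--amd-gpumode"))
print(args.amd_gpumode)
```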

For programmatic discovery of installed plugin option dests:

from flux.cli.plugin import CLIPluginRegistry

for opt in CLIPluginRegistry("submit").options:
    print(f"{opt.name} -> dest: {opt.dest}")

See apply_options() for a full description of all accepted keyword arguments.

classmethod per_resource(command, ncores=None, nnodes=None, per_resource_type=None, per_resource_count=None, gpus_per_node=None, exclusive=False, **kwargs)

Factory function that builds a v1 jobspec from an explicit count of nodes or cores and a number of tasks per one of these resources.

Use setters to assign additional properties.

Parameters
  • ncores -- Total number of cores to allocate

  • nnodes -- Total number of nodes to allocate

  • per_resource_type -- (optional) Type of resource over which to schedule a count of tasks. Only "node" or "core" are currently supported.

  • per_resource_count -- (optional) Count of tasks per per_resource_type

  • gpus_per_node -- With nnodes, request a number of gpus per node

  • exclusive -- with nnodes, request whole nodes exclusively

  • **kwargs -- Extra named arguments as accepted in from_command()

High-level factory methods

from_submit(), from_alloc(), and from_batch() are the recommended starting point for most Python code. Each wraps the corresponding low-level method and calls apply_options() internally, so all apply_options() keyword arguments (env, rlimit, time_limit, dependency, shell_options, attributes, and more) can be passed directly, as can any CLI plugin option dests. The resulting jobspec is passed to submit() or submit_async() to actually run the job.

Build a jobspec for a 16-task command and submit it:

import flux
import flux.job
from flux.job import JobspecV1

js = JobspecV1.from_submit(
    ["myapp", "--input", "data.h5"],
    ntasks=16,
    cores_per_task=4,
    time_limit="2h",
    dependency=["afterok:f1234abcd"],
    shell_options={"verbose": 1},
    attributes={"system.queue": "gpu"},
)
jobid = flux.job.submit(flux.Flux(), js)

Build a jobspec for a nested Flux instance on four nodes:

js = JobspecV1.from_alloc(
    nodes=4,
    time_limit="1h",
    conf={"resource": {"noverify": True}},
)
jobid = flux.job.submit(flux.Flux(), js)

Build a jobspec for a batch script from a file or inline content:

# From a file — job name defaults to the filename
js = JobspecV1.from_batch("/path/to/script.sh", nodes=4)

# Inline script content — use the content= keyword argument
js = JobspecV1.from_batch(
    content="#!/bin/bash\nflux run -n8 myapp\n",
    nslots=8,
    time_limit="30m",
    output="job-{{id}}.out",
)
jobid = flux.job.submit(flux.Flux(), js)

See Applying options to jobspecs below for details on passing shell options, jobspec attributes, environment rules, resource limits, and CLI plugin options through these methods.

Applying options to jobspecs

apply_options() can be called on any jobspec, including those created with the lower-level factory methods. It modifies the jobspec in-place and returns self so calls can be chained.

Shell options and jobspec attributes

shell_options sets job-shell options; attributes sets jobspec attributes. Both accept plain Python dicts.

from flux.job import JobspecV1

js = JobspecV1.from_command(["hostname"]).apply_options(
    shell_options={"verbose": 1},
    attributes={"system.queue": "batch", ".user.comment": "my job"},
)

Note that the above is equivalent to:

from flux.job import JobspecV1

js = JobspecV1.from_submit(
    ["hostname"],
    shell_options={"verbose": 1},
    attributes={"system.queue": "batch", ".user.comment": "my job"},
)
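
The key forms accepted by attributes (bare, fully qualified, and leading ".") can be summarized as a mapping onto full jobspec paths. This is a hedged sketch: qualify_attribute_key is a hypothetical helper, and treating a "user." prefix as already qualified is an assumption not stated above.

```python
def qualify_attribute_key(key):
    """Map an attributes key onto its full path under "attributes"."""
    if key.startswith("."):
        # A leading "." addresses the attributes root directly.
        return "attributes" + key
    if key.startswith(("system.", "user.")):
        # Already qualified ("user." handling is an assumption).
        return "attributes." + key
    # Bare keys imply the system namespace.
    return "attributes.system." + key


for key in ("queue", "system.queue", ".user.comment"):
    print(key, "->", qualify_attribute_key(key))
```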

Environment and resource limits

env accepts filter rules applied to the submitter's environment. rlimit propagates resource limits. The high-level factory methods (from_submit(), from_alloc(), from_batch()) always propagate the full environment and default resource limits even when neither is specified — matching CLI behavior. When calling apply_options() directly, env and rlimit are only applied when explicitly passed; a call that omits both leaves any previously set environment and rlimits unchanged.

js.apply_options(
    env=["MY_RANK={{rank}}", "-LD_PRELOAD", "IMPORTANT_VAR"],
    rlimit=["-*", "nofile=65536"],
)
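
The effect of a list of env rules can be approximated in pure Python. The sketch below is illustrative only and is not Flux's implementation: apply_env_rules is a hypothetical function, it starts from a copy of the submitter's environment, and it does not support the "^/path/to/file" rule form.

```python
from fnmatch import fnmatchcase
from string import Template


def apply_env_rules(base_env, rules):
    """Apply VAR=VALUE, VAR (glob import), and -PATTERN rules in order."""
    env = dict(base_env)
    for rule in rules:
        if rule.startswith("^"):
            raise NotImplementedError("rule files are not handled in this sketch")
        if rule.startswith("-"):
            # Remove every variable whose name matches the glob.
            pattern = rule[1:]
            env = {k: v for k, v in env.items() if not fnmatchcase(k, pattern)}
        elif "=" in rule:
            # Set a variable, expanding $VAR against the environment so far.
            # Mustache templates such as {{rank}} contain no "$" and pass through.
            name, _, value = rule.partition("=")
            env[name] = Template(value).safe_substitute(env)
        else:
            # Import matching variables from the submitter's environment.
            for k, v in base_env.items():
                if fnmatchcase(k, rule):
                    env[k] = v
    return env


# Hypothetical submitter environment for illustration.
base = {"HOME": "/home/u", "LD_PRELOAD": "x.so", "PATH": "/bin"}
print(apply_env_rules(base, ["-LD_*", "NEWPATH=$PATH:/opt/bin", "MY_RANK={{rank}}"]))
print(apply_env_rules(base, ["-*", "HOME"]))
```

Rule order matters: removing everything with "-*" and then naming a variable re-imports only that variable.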

Using CLI plugin options

When prog is set, CLI plugins registered for that command are loaded and their modify_jobspec() hooks are invoked. Run flux <cmd> --help to see what plugins are loaded; their options appear under "Options provided by plugins". The kwarg dest for each option is the flag name with the leading -- removed and dashes replaced by underscores (e.g. --amd-gpumode becomes amd_gpumode):

# --amd-gpumode is listed under "Options provided by plugins" in
# "flux submit --help" output; its dest is amd_gpumode
js = JobspecV1.from_submit(
    ["myapp"],
    ntasks=8,
    amd_gpumode="TPX",
)

# Or apply to an existing jobspec:
js.apply_options(prog="submit", amd_gpumode="TPX")

For programmatic discovery of available plugin option dests:

from flux.cli.plugin import CLIPluginRegistry

for opt in CLIPluginRegistry("submit").options:
    print(f"{opt.name} -> dest: {opt.dest}")

Job manipulation

After a job has been submitted, it is assigned an ID. That ID can then be used to get information about the job or to manipulate it; see the synchronous and asynchronous sections below.

To translate job ID representations, use the flux.job.JobID class:

class flux.job.JobID(value)

Class used to represent a Flux JOBID

JobID is a subclass of int, so it may be used in place of an integer. However, a JobID may be created from any valid RFC 19 FLUID encoding, including:

  • decimal integer (no prefix)

  • hexadecimal integer (prefix 0x)

  • dotted hex (dothex) (xxxx.xxxx.xxxx.xxxx)

  • kvs dir (dotted hex with job. prefix)

  • RFC 19 F58 (Base58 encoding with prefix ƒ or f)

  • basemoji (emoji encoding)

A JobID object also has properties for encoding a JOBID into each of the above representations, e.g. jobid.f58, jobid.words, jobid.dothex...

property dec

Return decimal integer representation of a JobID

property dothex

Return dotted hexadecimal representation of a JobID

property emoji

Return emoji representation of a JobID

encode(encoding='dec')

Encode a JobID to alternate supported format

property f58

Return RFC19 F58 representation of a JobID

property f58plain

Return RFC19 F58 representation of a JobID with ASCII prefix

property hex

Return 0x-prefixed hexadecimal representation of a JobID

property kvs

Return KVS directory path of a JobID

property orig

Return the original string used to create the JobID

property words

Return words (mnemonic) representation of a JobID
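
The integer-based encodings (dec, hex, dothex, kvs) are simple enough to reproduce by hand, which can help when reading job IDs in logs. The following is a sketch for illustration, not the JobID implementation: encode_jobid is a hypothetical function, it treats the ID as a 64-bit integer split into four 16-bit groups, and it omits the F58, words, and emoji encodings.

```python
def encode_jobid(value, encoding="dec"):
    """Encode an integer job ID as dec, hex, dothex, or kvs."""
    if encoding == "dec":
        return str(value)
    if encoding == "hex":
        return hex(value)  # 0x-prefixed hexadecimal
    # Split the 64-bit value into four 16-bit groups, most significant first.
    groups = [(value >> shift) & 0xFFFF for shift in (48, 32, 16, 0)]
    dothex = ".".join(f"{group:04x}" for group in groups)
    if encoding == "dothex":
        return dothex
    if encoding == "kvs":
        return "job." + dothex  # dotted hex with the "job." prefix
    raise ValueError(f"unsupported encoding in this sketch: {encoding}")


for enc in ("dec", "hex", "dothex", "kvs"):
    print(enc, encode_jobid(1234, enc))
```

With the real class, the equivalent values are available as properties, for example JobID(1234).dothex.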

Synchronous interface

The simplest way to interact with Flux is with the synchronous functions listed below. However, these functions can introduce significant overhead: any function that waits for a job to reach a certain state, such as result, may block for an indeterminate amount of time. They may therefore be unsuitable for interacting with large numbers of jobs in time-sensitive applications.

To spend less time blocking in the Flux reactor, consider using one of the asynchronous interfaces.

flux.job.submit(flux_handle, jobspec, urgency=_flux._core.lib.FLUX_JOB_URGENCY_DEFAULT, waitable=False, debug=False, pre_signed=False)

Submit a job to Flux

Ask Flux to run a job, blocking until a job ID is assigned.

Parameters
  • flux_handle (Flux) -- handle for Flux broker from flux.Flux()

  • jobspec (Jobspec or its string encoding) -- jobspec defining the job request

  • urgency (int) -- job urgency 0 (lowest) through 31 (highest) (default is 16). Urgencies 17 through 31 are restricted to the instance owner.

  • waitable (bool) -- allow result to be fetched with job.wait() (default is False). waitable=True is restricted to the instance owner.

  • debug (bool) -- enable job manager debugging events to job eventlog (default is False)

  • pre_signed (bool) -- jobspec argument is already signed (default is False)

Returns

job ID

Return type

int

flux.job.event_watch(flux_handle, jobid, eventlog='eventlog')

Python generator to watch all events for a job

Synchronously watch events in a job eventlog via a simple generator.

Example

>>> for event in job.event_watch(flux_handle, jobid):
...     print(event)  # do something with event

See also

21/Job States and Events Version 1

Documentation for the events in the main eventlog

Parameters
  • flux_handle (Flux) -- handle for Flux broker from flux.Flux()

  • jobid -- the job ID on which to watch events

  • eventlog -- eventlog path in job kvs directory (default: eventlog)

flux.job.event_wait(flux_handle, jobid, name, eventlog='eventlog', raiseJobException=True)

Wait for a job eventlog entry 'name'

Wait synchronously for an eventlog entry named "name" and return the entry to the caller. Raises OSError with ENODATA if the event never occurred.

See also

21/Job States and Events Version 1

Documentation for the events in the main eventlog

Parameters
  • flux_handle (Flux) -- handle for Flux broker from flux.Flux()

  • jobid -- the job ID on which to wait for eventlog events

  • name -- The event name for which to wait

  • eventlog -- eventlog path in job kvs directory (default: eventlog)

  • raiseJobException -- if True, watch for job exception events and raise a JobException if one is seen before event 'name' (default=True)

Returns

an EventLogEvent object, or raises OSError if eventlog ended before matching event was found

Return type

EventLogEvent
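
A minimal sketch of how event_wait and event_watch might be wrapped, assuming a running Flux instance and the Flux Python bindings. The helper names wait_for_event and event_names are hypothetical, and the use of the event's name attribute is an assumption about EventLogEvent. A JobException raised by event_wait is deliberately left to propagate.

```python
import errno


def wait_for_event(handle, jobid, name="start"):
    """Return the named eventlog entry, or None if the eventlog ended first."""
    import flux.job  # deferred import: needs the Flux Python bindings

    try:
        return flux.job.event_wait(handle, jobid, name)
    except OSError as exc:
        # ENODATA signals that the event never occurred.
        if exc.errno == errno.ENODATA:
            return None
        raise


def event_names(handle, jobid):
    """Collect the names of all events in the job's main eventlog.

    The generator is consumed to the end, so this blocks until the
    eventlog is complete.
    """
    import flux.job

    return [event.name for event in flux.job.event_watch(handle, jobid)]
```

Both helpers take the handle returned by flux.Flux() and a job ID returned by flux.job.submit.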

flux.job.kill(flux_handle: Flux, jobid: Union[JobID, int], signum: Optional[int] = None)

Send a signal to a running job.

Parameters
  • flux_handle -- handle for Flux broker from flux.Flux()

  • jobid -- the job ID of the job to kill

  • signum -- signal to send (default SIGTERM)

flux.job.cancel(flux_handle: Flux, jobid: Union[JobID, int], reason: Optional[str] = None)

Cancel a pending or running job

Parameters
  • flux_handle -- handle for Flux broker from flux.Flux()

  • jobid -- the job ID of the job to cancel

  • reason -- the textual reason associated with the cancelation
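
The two functions above could be used along these lines. This is a sketch only, assuming a running Flux instance; the helper names cancel_and_confirm and interrupt_job and the default reason string are hypothetical.

```python
import signal


def cancel_and_confirm(handle, jobid, reason="no longer needed"):
    """Cancel a pending or running job, then report how it ended."""
    import flux.job  # deferred import: needs the Flux Python bindings

    flux.job.cancel(handle, jobid, reason)
    # result() waits for the terminal state, so it returns once the
    # cancellation has taken effect.
    info = flux.job.result(handle, jobid)
    return info.result, info.returncode


def interrupt_job(handle, jobid):
    """Send SIGINT instead of the default SIGTERM to a running job."""
    import flux.job

    flux.job.kill(handle, jobid, int(signal.SIGINT))
```

Note that kill is documented for running jobs only, whereas cancel also applies to jobs that are still pending.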

flux.job.result(flux_handle, jobid, flags=0)

Wait for a job to reach its terminal state and return job result

This function waits for job completion by watching the eventlog. Because this function must process the eventlog, it is a little more heavyweight than flux.job.wait.wait(). However, it may be used for non-waitable jobs, jobs that have already completed, and works multiple times on the same jobid.

This function will wait until the job result is available and returns a flux.job.info.JobInfo object filled with the available information.

Note: The JobInfo object returned from this method is only capable of computing a small subset of job information, including, but possibly not limited to:

  • id

  • t_submit, t_run, t_cleanup

  • returncode

  • waitstatus

  • runtime

  • result

  • result_id

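Parameters
  • flux_handle -- handle for Flux broker from flux.Flux()

  • jobid -- the job ID to wait for

  • flags -- optional flags (default is 0)

Returns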

A limited JobInfo object which can be used to fetch the final job result, returncode, etc.

Return type

JobInfo

flux.job.wait(flux_handle, jobid=_flux._core.lib.FLUX_JOBID_ANY)

Wait for a job to complete

Submit a request to wait for job completion, blocking until a response is received, then return the job status.

Only jobs submitted with waitable=True can be waited for.

Parameters
  • flux_handle (Flux) -- handle for Flux broker from flux.Flux()

  • jobid -- the job ID to wait for (default is any waitable job)

Returns

job status, a tuple of: Job ID (int), success (bool), and an error (string) if success=False

Return type

JobWaitResult

flux.job.get_job(flux_handle, jobid)

Get job information dictionary based on a jobid

This is a courtesy, blocking function for users looking for details about a job after submission. The dictionary includes the job identifier, the userid that submitted it, urgency, priority, t_submit, t_depend (and others when finished), state, name, ntasks, ncores, duration, nnodes, result, runtime, returncode, waitstatus, nodelist, and exception type, severity, and note.

get_job

After submitting a job, if you quickly want to see information for it, you can use flux.job.get_job. Here is an example:

import flux
import flux.job

# It's encouraged to create a handle to use across commands
handle = flux.Flux()

jobspec = flux.job.JobspecV1.from_command(["sleep", "60"])
jobid = flux.job.submit(handle, jobspec)
job_meta = flux.job.get_job(handle, jobid)

An example of the returned dictionary:

{
    "job": {
        "id": 676292747853824,
        "userid": 0,
        "urgency": 16,
        "priority": 16,
        "t_submit": 1667760398.4034982,
        "t_depend": 1667760398.4034982,
        "state": "SCHED",
        "name": "sleep",
        "ntasks": 1,
        "ncores": 1,
        "duration": 0.0
    }
}

If the requested jobid does not exist, None is returned. Internally, this courtesy function uses the identifier to create an RPC object, serializes the response to a string, and loads it as JSON. It lives in flux.job so that it can be found alongside the other functions used to interact with jobs.

result vs wait

Both flux.job.result and flux.job.wait return when a job has completed. However, wait only works on jobs which have been submitted with the waitable flag, and the ability to set that flag is restricted to instance owners.
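
The difference might look as follows in practice. This is a sketch under the assumption of a running Flux instance; the helper names run_and_get_result and run_and_wait are hypothetical. The wait variant only works for the instance owner because it submits with waitable=True.

```python
def run_and_get_result(handle, jobspec):
    """Works for any job, including non-waitable ones."""
    import flux.job  # deferred import: needs the Flux Python bindings

    jobid = flux.job.submit(handle, jobspec)
    info = flux.job.result(handle, jobid)  # a limited JobInfo object
    return info.result, info.returncode


def run_and_wait(handle, jobspec):
    """Lighter-weight, but restricted to the instance owner."""
    import flux.job

    jobid = flux.job.submit(handle, jobspec, waitable=True)
    # The wait result is a tuple of job ID, success flag, and error string.
    _waited_id, success, error = flux.job.wait(handle, jobid)
    return success, error
```

Unlike result, wait is described above as working only for waitable jobs, so run_and_get_result is the safer default when the caller's role is unknown.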

Asynchronous interfaces

There are two primary asynchronous interfaces to job manipulations. The first is an event-loop interface, which is closer to the native C interface, and consists of functions like flux.job.submit_async and flux.job.result_async (note the functions are the same as in the synchronous interface, only with an "_async" suffix). The second is an interface which is almost identical to the concurrent.futures interface in Python's standard library, and it consists of the flux.job.FluxExecutor class. Both interfaces deal in callbacks and futures, the difference being that the FluxExecutor is designed so that all futures fulfill in the background, and there is no need for user code to enter the Flux event loop, while the event-loop-based interface requires the user to call into the Flux event loop in order for futures to fulfill and for callbacks to trigger.

Our general recommendation is that you use the FluxExecutor interface unless you are familiar with event-loop based programming.

The FluxExecutor interface

Basic FluxExecutor usage consists of creating an executor, submitting jobspecs to it, and then attaching callbacks to the returned futures or waiting for them to complete. Executors must have .shutdown() called when they are no longer needed. However, they also support the context-manager protocol (i.e. with executor ...:), which will call shutdown upon leaving the with block.

Example usage:

import concurrent.futures
import flux.job

jobspec = flux.job.JobspecV1.from_command(["/bin/true"])
with flux.job.FluxExecutor() as executor:
    futs = [executor.submit(jobspec) for _ in range(5)]
    for f in concurrent.futures.as_completed(futs):
        print(f.result())

class flux.job.FluxExecutor(threads=1, thread_name_prefix='', poll_interval=0.1, handle_args=(), handle_kwargs={})

Provides a method to submit and monitor Flux jobs asynchronously.

Forks threads to complete futures and fetch event updates in the background.

Inspired by the concurrent.futures.Executor class, with the following interface differences:

  • the submit method takes a flux.job.Jobspec instead of a callable and its arguments, and returns a FluxExecutorFuture representing that job.

  • the map method is not supported, given that the executor consumes Jobspecs rather than callables.

Otherwise, the FluxExecutor is faithful to its inspiration. In addition to methods and behavior defined by concurrent.futures, FluxExecutor provides its futures with event updates and the jobid of the underlying job.

Futures returned by submit have their jobid set as soon as it is available, which is always before the future completes.

The executor can also monitor existing jobs through the attach method, which takes a job ID and returns a future representing the job.

Futures may receive event updates even after they complete. The names of valid events are contained in the EVENTS class attribute.

The result of a future is the highest process exit status of the underlying job (in which case the result is an integer greater than or equal to 0), or -signum where signum is the number of the signal that caused the process to terminate (in which case the result is an integer less than 0).

A future is marked as "running" (and can no longer be canceled using the .cancel() method) once it reaches a certain point in the Executor---a point which is completely unrelated to the status of the underlying Flux job. The underlying Flux job may still be canceled at any point before it terminates, however, using the flux.job.cancel and flux.job.kill functions, in which case a JobException will be set.

If the jobspec is invalid, an OSError is set.

Parameters
  • threads -- the number of worker threads to fork.

  • thread_name_prefix -- used to control the names of threading.Thread objects created by the executor, for easier debugging.

  • poll_interval -- the interval (in seconds) in which to break out of the flux event loop to check for new job submissions.

  • handle_args -- positional arguments to the flux.Flux instances used by the executor.

  • handle_kwargs -- keyword arguments to the flux.Flux instances used by the executor.

EVENTS = frozenset({'alloc', 'clean', 'debug', 'depend', 'exception', 'finish', 'flux-restart', 'free', 'priority', 'release', 'start', 'submit', 'urgency'})

A set containing valid event names for attaching to futures.

attach(jobid)

Attach a FluxExecutorFuture to an existing job ID and return it.

Returned futures will behave identically to futures returned by the FluxExecutor.submit method. If the job ID is not accepted by Flux an exception will be set on the future.

This method is primarily useful for monitoring jobs that have been submitted through other mechanisms.

Parameters

jobid (int) -- jobid to attach to.

Raises

RuntimeError -- if shutdown has been called or if an error has occurred and new jobs cannot be submitted (e.g. a remote Flux instance can no longer be communicated with).

shutdown(wait=True, *, cancel_futures=False)

Clean up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Parameters
  • wait -- If True, then this method will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures -- If True, this method will cancel all pending futures that the executor has not started running. Any futures that are completed or running won't be cancelled, regardless of the value of cancel_futures.

submit(*args, **kwargs)

Submit a jobspec to Flux and return a FluxExecutorFuture.

Accepts the same positional and keyword arguments as flux.job.submit, except for the flux.job.submit function's first argument, flux_handle.

Parameters
  • jobspec (Jobspec or its string encoding) -- jobspec defining the job request

  • urgency (int) -- job urgency 0 (lowest) through 31 (highest) (default is 16). Urgencies 17 through 31 are restricted to the instance owner.

  • waitable (bool) -- allow result to be fetched with flux.job.wait() (default is False). waitable=True is restricted to the instance owner.

  • debug (bool) -- enable job manager debugging events to job eventlog (default is False)

  • pre_signed (bool) -- jobspec argument is already signed (default is False)

Raises

RuntimeError -- if shutdown has been called or if an error has occurred and new jobs cannot be submitted (e.g. a remote Flux instance can no longer be communicated with).

Futures should not be created directly by user code. Since the futures are subclasses of concurrent.futures.Future, you can invoke concurrent.futures functions like wait or as_completed on them.
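
To illustrate the result convention described for FluxExecutor (a non-negative exit status, or -signum for a signal), here is a small sketch that runs without Flux. It uses plain concurrent.futures.Future objects as stand-ins for the futures an executor would return, and the helper name describe_result is hypothetical.

```python
import concurrent.futures
import signal


def describe_result(value: int) -> str:
    """Interpret a future's integer result using the convention above."""
    if value >= 0:
        return f"exit status {value}"
    return f"terminated by {signal.Signals(-value).name}"


# Stand-ins only: with Flux these would come from FluxExecutor.submit().
futures = [concurrent.futures.Future() for _ in range(3)]
for fut, value in zip(futures, (0, 2, -signal.SIGTERM)):
    fut.set_result(value)

# The standard-library helpers accept these futures directly.
done, not_done = concurrent.futures.wait(futures)
summary = sorted(describe_result(f.result()) for f in done)
print(summary)
```

With real executor futures, the same wait call would block until the underlying jobs finish.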

class flux.job.FluxExecutorFuture(owning_thread_id, *args, **kwargs)

A concurrent.futures.Future subclass that represents a single Flux job.

In addition to all of the concurrent.futures.Future functionality, FluxExecutorFuture instances offer:

  • The jobid and add_jobid_callback methods for retrieving the Flux jobid of the underlying job.

  • The add_event_callback method to invoke callbacks when particular job-state events occur.

Valid events are contained in the EVENTS class attribute.

EVENTS = frozenset({'alloc', 'clean', 'debug', 'depend', 'exception', 'finish', 'flux-restart', 'free', 'priority', 'release', 'start', 'submit', 'urgency'})

A set containing the names of valid events.

add_done_callback(*args, **kwargs)

Attaches a callable that will be called when the future finishes.

Parameters

fn -- A callable that will be called with this future as its only argument when the future completes or is cancelled. The callable will always be called by a thread in the same process in which it was added. If the future has already completed or been cancelled then the callable will be called immediately. These callables are called in the order that they were added.

Returns

self

add_event_callback(event, callback)

Add a callback to be invoked when an event occurs.

The callback will be invoked, with the future as the first argument and the flux.job.EventLogEvent as the second, whenever the event occurs. If the event occurs multiple times, the callback will be invoked with each different EventLogEvent instance. If the event never occurs, the callback will never be invoked.

Added callables are called in the order that they were added and may be called in another thread. If the callable raises an Exception subclass, it will be logged and ignored. If the callable raises a BaseException subclass, the behavior is undefined.

If the event has already occurred, the callback will be called immediately.

Parameters
  • event -- the name of the event to add the callback to.

  • callback -- a callable taking the future and the event as arguments.

Returns

self

add_jobid_callback(callback)

Attaches a callable that will be called when the jobid is ready.

Added callables are called in the order that they were added and may be called in another thread. If the callable raises an Exception subclass, it will be logged and ignored. If the callable raises a BaseException subclass, the behavior is undefined.

Parameters

callback -- a callable taking the future as its only argument.

Returns

self

cancel(*args, **kwargs)

Cancel the future if possible.

Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed.

exception(*args, **kwargs)

Return the exception raised by the call that the future represents.

Parameters

timeout -- The number of seconds to wait for the exception if the future isn't done. If None, then there is no limit on the wait time.

Returns

The exception raised by the call that the future represents or None if the call completed without raising.

Raises
  • CancelledError -- If the future was cancelled.

  • TimeoutError -- If the future didn't finish executing before the given timeout.

jobid(timeout=None)

Return the jobid of the Flux job that the future represents.

Parameters

timeout -- The number of seconds to wait for the jobid. If None, then there is no limit on the wait time.

Returns

a positive integer jobid.

Raises
  • concurrent.futures.TimeoutError -- If the jobid is not available before the given timeout.

  • concurrent.futures.CancelledError -- If the future was cancelled.

  • RuntimeError -- If the job could not be submitted (e.g. if the jobspec was invalid).

result(*args, **kwargs)

Return the result of the call that the future represents.

Parameters

timeout -- The number of seconds to wait for the result if the future isn't done. If None, then there is no limit on the wait time.

Returns

The result of the call that the future represents.

Raises
  • CancelledError -- If the future was cancelled.

  • TimeoutError -- If the future didn't finish executing before the given timeout.

  • Exception -- If the call raised then that exception will be raised.

set_exception(exception)

Sets the result of the future as being the given exception.

Should only be used by Executor implementations and unit tests.
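
The jobid and event callbacks described above might be combined as in the following sketch. The names track_job, main, and log are hypothetical, and reading the event's name attribute is an assumption about EventLogEvent. Since callbacks may run in another thread, the sketch only appends to a list.

```python
def track_job(executor, jobspec, log):
    """Submit a jobspec and record its jobid and selected events in log."""
    future = executor.submit(jobspec)
    # Fires once the jobid is ready.
    future.add_jobid_callback(lambda fut: log.append(("jobid", fut.jobid())))
    # Fires each time one of the named events occurs.
    for name in ("start", "finish"):
        future.add_event_callback(
            name, lambda fut, event: log.append((event.name, fut.jobid()))
        )
    return future


def main():
    """Example driver; needs a running Flux instance."""
    import flux.job  # deferred import: needs the Flux Python bindings

    log = []
    jobspec = flux.job.JobspecV1.from_command(["/bin/true"])
    with flux.job.FluxExecutor() as executor:
        future = track_job(executor, jobspec, log)
        print("result:", future.result())
    print(log)
```

Calling main() inside a Flux instance would submit one job and print whatever was recorded for it.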

Asynchronous event-loop interface

General usage consists of initiating some events, then entering the Flux reactor and writing all code thereafter as callbacks.

For instance, the example below submits five jobs, then enters the reactor and fires off callbacks as futures complete.

import flux
import flux.job

def submit_cb(fut, flux_handle):
    # when this callback fires, the jobid will be ready
    jobid = fut.get_id()
    # Create a future representing the result of the job
    result_fut = flux.job.result_async(flux_handle, jobid)
    # attach a callback to fire when the job finishes
    result_fut.then(result_cb)

def result_cb(fut):
    job = fut.get_info()
    result = job.result.lower()
    print(f"{job.id}: {result} with returncode {job.returncode}")

flux_handle = flux.Flux()
jobspec = flux.job.JobspecV1.from_command(["/bin/true"])
for _ in range(5):
    # submit 5 futures and attach callbacks to each one
    submit_future = flux.job.submit_async(flux_handle, jobspec)
    submit_future.then(submit_cb, flux_handle)
# enter the flux event loop (the 'reactor') to trigger the callbacks
# once the futures complete
flux_handle.reactor_run()

flux.job.submit_async(flux_handle, jobspec, urgency=_flux._core.lib.FLUX_JOB_URGENCY_DEFAULT, waitable=False, debug=False, pre_signed=False, novalidate=False)

Ask Flux to run a job, without waiting for a response

Submit a job to Flux. This method returns immediately with a Flux Future, which can be used to obtain the job ID later.

Parameters
  • flux_handle (Flux) -- handle for Flux broker from flux.Flux()

  • jobspec (Jobspec or its string encoding) -- jobspec defining the job request

  • urgency (int) -- job urgency 0 (lowest) through 31 (highest) (default is 16). Urgencies 17 through 31 are restricted to the instance owner.

  • waitable (bool) -- allow result to be fetched with job.wait() (default is False). waitable=True is restricted to the instance owner.

  • debug (bool) -- enable job manager debugging events to job eventlog (default is False)

  • pre_signed (bool) -- jobspec argument is already signed (default is False)

  • novalidate (bool) -- jobspec does not need to be validated. (default is False) novalidate=True is restricted to the instance owner.

Returns

a Flux Future object for obtaining the assigned jobid

Return type

SubmitFuture

flux.job.event_watch_async(flux_handle, jobid, eventlog='eventlog')

Asynchronously get eventlog updates for a job

Asynchronously watch the events of a job eventlog.

Returns a JobEventWatchFuture. Call .get_event() from the then callback to get the currently returned event from the Future object.

See also

21/Job States and Events Version 1

Documentation for the events in the main eventlog

Parameters
  • flux_handle (Flux) -- handle for Flux broker from flux.Flux()

  • jobid -- the job ID on which to watch events

  • eventlog -- eventlog path in job kvs directory (default: eventlog)

Returns

a JobEventWatchFuture object

Return type

JobEventWatchFuture
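
A possible way to use event_watch_async is sketched below. The helper name watch_events and the on_event parameter are hypothetical, and the sketch assumes that get_event() returns None once the eventlog has ended; check this against the installed version of the bindings.

```python
def watch_events(handle, jobid, on_event):
    """Call on_event(event) for each eventlog entry of jobid.

    Nothing happens until the caller enters the reactor, for example
    with handle.reactor_run().
    """
    import flux.job  # deferred import: needs the Flux Python bindings

    def _event_cb(future):
        event = future.get_event()
        if event is None:
            # Assumption: None marks the end of the eventlog.
            return
        on_event(event)

    watcher = flux.job.event_watch_async(handle, jobid)
    watcher.then(_event_cb)
    return watcher
```

A caller could pass print as on_event and then run the reactor to see events as they arrive.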

flux.job.cancel_async(flux_handle: Flux, jobid: Union[JobID, int], reason: Optional[str] = None)

Cancel a pending or running job asynchronously

Parameters
  • flux_handle -- handle for Flux broker from flux.Flux()

  • jobid -- the job ID of the job to cancel

  • reason -- the textual reason associated with the cancelation

Returns

a future fulfilled when the cancelation completes

Return type

Future

flux.job.kill_async(flux_handle: Flux, jobid: Union[JobID, int], signum: Optional[int] = None)

Send a signal to a running job asynchronously

Parameters
  • flux_handle -- handle for Flux broker from flux.Flux()

  • jobid -- the job ID of the job to kill

  • signum -- signal to send (default SIGTERM)

Returns

a future fulfilled when the signal is delivered

Return type

Future

flux.job.result_async(flux_handle, jobid, flags=0)

Wait for a job to reach its terminal state and return job result

This function waits for job completion by watching the eventlog. Because this function must process the eventlog, it is a little more heavyweight than flux.job.wait.wait_async(). However, it may be used for non-waitable jobs, jobs that have already completed, and works multiple times on the same jobid.

Once the eventlog terminal state is reached, the returned Future is fulfilled with a set of information gleaned from the processed events, including whether the job started running (in case it was canceled before starting), any exception state, and the final exit code and wait(2) status.

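Parameters
  • flux_handle -- handle for Flux broker from flux.Flux()

  • jobid -- the job ID to wait for

  • flags -- optional flags (default is 0)

Returns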

A Future fulfilled with the job result.

Return type

JobResultFuture

flux.job.wait_async(flux_handle, jobid=_flux._core.lib.FLUX_JOBID_ANY)

Wait for a job to complete, asynchronously

Submit a request to wait for job completion. This method returns immediately with a Flux Future, which can be used to process the result later.

Only jobs submitted with waitable=True can be waited for.

Parameters
  • flux_handle (Flux) -- handle for Flux broker from flux.Flux()

  • jobid -- the job ID to wait for (default is any waitable job)

Returns

a Flux Future object for obtaining the job result

Return type

JobWaitFuture