Flux Job Submission and Monitoring
The Flux Python bindings provide synchronous and asynchronous functions for:
Submitting jobs with arbitrary attributes and resources
Waiting for jobs to complete
Cancelling jobs
Viewing job info and events
Job submission
Job submission is performed by creating a flux.job.Jobspec object,
populating it with attributes, and then passing it to one of the submission
functions, e.g. flux.job.submit. Jobspec objects define everything
about a job, including the job's resources, executable, working directory,
environment, and stdio streams.
Basic Jobspec creation is generally done with the JobspecV1.from_command
class method and its variants from_batch_command and from_nest_command,
which are helper methods replicating the jobspecs created by the
job submission command-line utilities.
- class flux.job.Jobspec(resources, tasks, **kwargs)
- add_file(path, data, perms=None, encoding=None)
Add a file to the RFC 14 "files" dictionary in Jobspec. If data contains newlines or an encoding is explicitly provided, then it is presumed to be the file content. Otherwise, data is a local filesystem path, the contents of which are to be loaded into jobspec.
- Parameters
path (str) -- path or file name to encode data as in Jobspec
data (dict, str) -- content of file or a local path to load
perms (int) -- file permissions, default 0o0600 (octal). If data is a file system path, then permissions of the local file system object will be used.
encoding (str) -- RFC 37 compatible encoding for data. None if data is a dict or to determine encoding from a file when data specifies a filesystem path. Otherwise, if encoding is set, data is a string encoded in the specified encoding.
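The rule for interpreting data can be sketched in a few lines. This is only an illustration of the description above, not the library's code; the helper name classify_file_data is hypothetical.

```python
def classify_file_data(data, encoding=None):
    """Return "content" or "path" following the add_file description.

    Hypothetical helper: it only mirrors the documented rule and does not
    touch the filesystem or build a jobspec.
    """
    if isinstance(data, dict):
        # A dict is always treated as file content
        return "content"
    if encoding is not None or "\n" in data:
        # Explicit encoding or embedded newlines: data is the content itself
        return "content"
    # Otherwise data names a local file whose contents would be loaded
    return "path"


print(classify_file_data("#!/bin/sh\necho hello\n"))
print(classify_file_data("/etc/hosts"))
```
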
- property attributes
Jobspec attributes section
- property cwd
Working directory of job.
- property duration
Job's time limit.
The duration may be:
an int or float in seconds
a string in Flux Standard Duration (see RFC 23)
a Python datetime.timedelta
A duration of zero is interpreted as "not set".
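As a rough illustration of the three accepted forms, the sketch below normalizes each of them to seconds. The suffix table (ms, s, m, h, d) is an assumption based on a simplified reading of RFC 23, and duration_to_seconds is a hypothetical helper, not part of the bindings.

```python
from datetime import timedelta

# Suffix multipliers assumed from a simplified reading of RFC 23
_FSD_UNITS = {"ms": 0.001, "s": 1.0, "m": 60.0, "h": 3600.0, "d": 86400.0}


def duration_to_seconds(value):
    """Normalize an int, float, FSD-like string, or timedelta to seconds."""
    if isinstance(value, timedelta):
        return value.total_seconds()
    if isinstance(value, (int, float)):
        return float(value)
    text = value.strip()
    # "ms" is checked first so it is not mistaken for the "s" suffix
    for suffix in ("ms", "s", "m", "h", "d"):
        if text.endswith(suffix):
            return float(text[: -len(suffix)]) * _FSD_UNITS[suffix]
    # No suffix: the number is already in seconds
    return float(text)


print(duration_to_seconds("2h"))
print(duration_to_seconds(timedelta(minutes=5)))
```
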
- property environment
Environment of job. Defaults to None.
- classmethod from_yaml_file(filename)
Create a jobspec from a path to a YAML file.
- classmethod from_yaml_stream(yaml_stream)
Create a jobspec from a YAML file-like object.
- getattr(key)
Get an attribute from the jobspec using dotted key notation, e.g. system.duration or optionally attributes.system.duration.
Raises KeyError if a component of key does not exist.
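The dotted key lookup can be pictured with a plain dictionary. The following is a minimal sketch of the idea under the assumption that the jobspec is a nested dict with an "attributes" section; get_dotted is a hypothetical stand-in, not the method itself.

```python
def get_dotted(jobspec_dict, key):
    """Look up a dotted key inside the "attributes" section of a dict."""
    parts = key.split(".")
    # The leading "attributes." component is optional
    if parts[0] == "attributes":
        parts = parts[1:]
    node = jobspec_dict["attributes"]
    for part in parts:
        if not isinstance(node, dict) or part not in node:
            # Mirror the documented behaviour: a missing component is a KeyError
            raise KeyError(key)
        node = node[part]
    return node


spec = {"attributes": {"system": {"duration": 3600}}}
print(get_dotted(spec, "system.duration"))
print(get_dotted(spec, "attributes.system.duration"))
```
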
- property queue
Target queue of job submission
- resource_counts()
Compute the counts of each resource type in the jobspec.
The following jobspec would return { "slot": 12, "core": 18, "memory": 242 }:
- type: slot
  count: 2
  with:
    - type: core
      count: 4
    - type: memory
      count: 1
      unit: GB
- type: slot
  count: 10
  with:
    - type: core
      count: 1
    - type: memory
      count: 24
      unit: GB
Note
The current implementation ignores the unit label and assumes units are consistent across resources.
- resource_walk()
Traverse the resources in the resources section of the jobspec.
Performs a depth-first, pre-order traversal. Yields a tuple containing (parent, resource, count). parent is None when resource is a top-level resource. count is the number of that resource including the multiplicative effects of the with clause in ancestor resources. For example, the following resource section will yield a count of 2 for the slot and a count of 8 for the core resource:
- type: slot
  count: 2
  with:
    - type: core
      count: 4
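The traversal and the per-type totals can be sketched over plain dictionaries. This is an illustrative reimplementation, not the library's code, and it assumes every count is a plain integer; the standalone function names simply mirror the methods described above.

```python
def resource_walk(resources, parent=None, multiplier=1):
    """Depth-first, pre-order walk yielding (parent, resource, count)."""
    for resource in resources:
        # Multiply by the counts of all ancestors
        count = multiplier * resource["count"]
        yield parent, resource, count
        yield from resource_walk(resource.get("with", []), resource, count)


def resource_counts(resources):
    """Sum the walked counts per resource type, ignoring units."""
    totals = {}
    for _parent, resource, count in resource_walk(resources):
        totals[resource["type"]] = totals.get(resource["type"], 0) + count
    return totals


resources = [
    {"type": "slot", "count": 2, "with": [
        {"type": "core", "count": 4},
        {"type": "memory", "count": 1, "unit": "GB"},
    ]},
    {"type": "slot", "count": 10, "with": [
        {"type": "core", "count": 1},
        {"type": "memory", "count": 24, "unit": "GB"},
    ]},
]
print(resource_counts(resources))  # {'slot': 12, 'core': 18, 'memory': 242}
```
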
- property resources
Jobspec resources section
- setattr(key, val)
set job attribute
- setattr_shell_option(key, val)
set job attribute: shell option
- property stderr
Path to use for stderr.
- property stdin
Path to use for stdin.
- property stdout
Path to use for stdout.
- property tasks
Jobspec tasks section
- property version
Jobspec version section
- class flux.job.JobspecV1(resources, tasks, **kwargs)
Bases: Jobspec
- classmethod from_batch_command(script, jobname, args=None, num_slots=1, cores_per_slot=1, gpus_per_slot=None, num_nodes=None, broker_opts=None, exclusive=False, conf=None)
Create a Jobspec describing a nested Flux instance controlled by a script.
The nested Flux instance will execute the script with the given command-line arguments after copying it and setting the executable bit. Conceptually, this differs from the from_nest_command method, which also creates a nested Flux instance, in that it a) requires the initial program of the new instance to be an executable text file and b) creates the initial program from a string rather than using an executable existing somewhere on the filesystem.
Use setters to assign additional properties.
- Parameters
script (str) -- contents of the script to execute, as a string. The script should have a shebang (e.g. #!/bin/sh) at the top.
jobname (str) -- name to use for system.job.name attribute. This will be the default job name reported by Flux.
args (iterable of str) -- arguments to pass to script
num_slots (int) -- number of resource slots to create. Slots are an abstraction, and are only used (along with cores_per_slot and gpus_per_slot) to determine the nested instance's allocation size and layout.
cores_per_slot (int) -- number of cores to allocate per slot
gpus_per_slot (int) -- number of GPUs to allocate per slot
num_nodes (int) -- distribute allocated resource slots across N individual nodes
broker_opts (iterable of str) -- options to pass to the new Flux broker
conf (dict) -- optional broker configuration to pass to the child instance brokers. If set, conf will be set in the jobspec 'files' (RFC 37 File Archive) attribute as conf.json, and broker_opts will be extended to add -c{{tmpdir}}/conf.json
- classmethod from_command(command, num_tasks=1, cores_per_task=1, gpus_per_task=None, num_nodes=None, exclusive=False)
Factory function that builds the minimum legal v1 jobspec.
Use setters to assign additional properties.
- Parameters
command (iterable of str) -- command to execute
num_tasks -- number of MPI tasks to create
cores_per_task -- number of cores to allocate per task
gpus_per_task -- number of GPUs to allocate per task
num_nodes -- distribute allocated tasks across N individual nodes
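"Use setters to assign additional properties" might look like the sketch below, which relies only on the cwd, duration, environment, stdout and stderr properties documented for Jobspec. The helper name configure_jobspec and the output file names are hypothetical.

```python
def configure_jobspec(jobspec, workdir, time_limit, env=None):
    """Assign common properties on a jobspec built by a factory method."""
    jobspec.cwd = workdir
    jobspec.duration = time_limit  # int/float seconds, FSD string, or timedelta
    jobspec.environment = dict(env) if env is not None else None
    jobspec.stdout = "job.out"  # hypothetical output paths
    jobspec.stderr = "job.err"
    return jobspec


# With the Flux bindings installed, usage might look like:
#   import flux.job
#   jobspec = flux.job.JobspecV1.from_command(["hostname"], num_tasks=4)
#   configure_jobspec(jobspec, "/tmp", "10m", env={"PATH": "/usr/bin"})
```
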
- classmethod from_nest_command(command, num_slots=1, cores_per_slot=1, gpus_per_slot=None, num_nodes=None, broker_opts=None, exclusive=False, conf=None)
Create a Jobspec describing a nested Flux instance controlled by command.
Conceptually, this differs from the from_batch_command method in that a) the initial program of the nested Flux instance can be any executable on the file system, not just a text file and b) the executable is not copied at submission time.
Use setters to assign additional properties.
- Parameters
command (iterable of str) -- initial program for the nested Flux instance
num_slots (int) -- number of resource slots to create. Slots are an abstraction, and are only used (along with cores_per_slot and gpus_per_slot) to determine the nested instance's allocation size and layout.
cores_per_slot (int) -- number of cores to allocate per slot
gpus_per_slot (int) -- number of GPUs to allocate per slot
num_nodes (int) -- distribute allocated resource slots across N individual nodes
broker_opts (iterable of str) -- options to pass to the new Flux broker
conf (dict) -- optional broker configuration to pass to the child instance brokers. If set, conf will be set in the jobspec 'files' (RFC 37 File Archive) attribute as conf.json, and broker_opts will be extended to add -c{{tmpdir}}/conf.json
- classmethod per_resource(command, ncores=None, nnodes=None, per_resource_type=None, per_resource_count=None, gpus_per_node=None, exclusive=False)
Factory function that builds a v1 jobspec from an explicit count of nodes or cores and a number of tasks per one of these resources.
Use setters to assign additional properties.
- Parameters
ncores -- Total number of cores to allocate
nnodes -- Total number of nodes to allocate
per_resource_type -- (optional) Type of resource over which to schedule a count of tasks. Only "node" or "core" are currently supported.
per_resource_count -- (optional) Count of tasks per per_resource_type
gpus_per_node -- With nnodes, request a number of gpus per node
exclusive -- with nnodes, request whole nodes exclusively
Job manipulation
After a job has been submitted, it will be assigned an ID. That ID can then be used for getting information about the job or for manipulating it; see the synchronous and asynchronous sections below.
To translate job ID representations, use the flux.job.JobID class:
- class flux.job.JobID(value)
Class used to represent a Flux JOBID
JobID is a subclass of int, so it may be used in place of an integer. However, a JobID may be created from any valid RFC 19 FLUID encoding, including:
decimal integer (no prefix)
hexadecimal integer (prefix 0x)
dotted hex (dothex) (xxxx.xxxx.xxxx.xxxx)
kvs dir (dotted hex with job. prefix)
RFC19 F58: (Base58 encoding with prefix ƒ or f)
basemoji (emoji encoding)
A JobID object also has properties for encoding a JOBID into each of the above representations, e.g. jobid.f58, jobid.words, jobid.dothex...
- property dec
Return decimal integer representation of a JobID
- property dothex
Return dotted hexadecimal representation of a JobID
- property emoji
Return emoji representation of a JobID
- encode(encoding='dec')
Encode a JobID to alternate supported format
- property f58
Return RFC19 F58 representation of a JobID
- property f58plain
Return RFC19 F58 representation of a JobID with ASCII prefix
- property hex
Return 0x-prefixed hexadecimal representation of a JobID
- property kvs
Return KVS directory path of a JobID
- property orig
Return the original string used to create the JobID
- property words
Return words (mnemonic) representation of a JobID
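To make a few of these representations concrete, here is a small pure-Python sketch of the hex, dothex and kvs forms. It assumes the ID is a 64-bit integer rendered as four zero-padded groups of four hex digits, as the xxxx.xxxx.xxxx.xxxx pattern above suggests; the helper names are hypothetical and the F58, words and emoji encodings are not covered.

```python
def to_hex(jobid):
    """0x-prefixed hexadecimal form."""
    return f"0x{jobid:x}"


def to_dothex(jobid):
    """Four dot-separated groups of four hex digits (assumed 64-bit ID)."""
    digits = f"{jobid:016x}"
    return ".".join(digits[i:i + 4] for i in range(0, 16, 4))


def to_kvs(jobid):
    """Dotted hex with the "job." prefix."""
    return "job." + to_dothex(jobid)


def from_dothex(text):
    """Parse a dothex or kvs string back into an integer."""
    if text.startswith("job."):
        text = text[len("job."):]
    return int(text.replace(".", ""), 16)


jobid = 676292747853824
print(to_hex(jobid), to_dothex(jobid), to_kvs(jobid))
```
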
Synchronous interface
The simplest way to interact with Flux is with the synchronous functions listed
below.
However, these functions introduce a lot of overhead (e.g. any function that
waits for a job to reach a certain state, such as result, may block for an
indeterminate amount of time) and may not be suitable for interacting with
large numbers of jobs in time-sensitive applications.
To spend less time blocking in the Flux reactor, consider using one of the asynchronous interfaces.
- flux.job.submit(flux_handle, jobspec, urgency=_flux._core.lib.FLUX_JOB_URGENCY_DEFAULT, waitable=False, debug=False, pre_signed=False)
Submit a job to Flux
Ask Flux to run a job, blocking until a job ID is assigned.
- Parameters
flux_handle (Flux) -- handle for Flux broker from flux.Flux()
jobspec (Jobspec or its string encoding) -- jobspec defining the job request
urgency (int) -- job urgency 0 (lowest) through 31 (highest) (default is 16). Priorities 0 through 15 are restricted to the instance owner.
waitable (bool) -- allow result to be fetched with job.wait() (default is False). Waitable=True is restricted to the instance owner.
debug (bool) -- enable job manager debugging events to job eventlog (default is False)
pre_signed (bool) -- jobspec argument is already signed (default is False)
- Returns
job ID
- Return type
int
- flux.job.event_watch(flux_handle, jobid, eventlog='eventlog')
Python generator to watch all events for a job
Synchronously watch events in a job eventlog via a simple generator.
Example
>>> for event in job.event_watch(flux_handle, jobid):
...     # do something with event
See also
- 21/Job States and Events Version 1
Documentation for the events in the main eventlog
- Parameters
flux_handle (Flux) -- handle for Flux broker from flux.Flux()
jobid -- the job ID on which to watch events
eventlog -- eventlog path in job kvs directory (default: eventlog)
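Because event_watch is a generator, it can be consumed by any ordinary loop. The sketch below gathers event names until a chosen event is seen; it assumes each yielded event exposes a name attribute, and collect_event_names is a hypothetical helper that accepts any iterable of such objects.

```python
def collect_event_names(events, stop_at="clean"):
    """Collect event names from an iterable, stopping after `stop_at`."""
    names = []
    for event in events:
        names.append(event.name)  # assumes events expose a .name attribute
        if event.name == stop_at:
            break
    return names


# With the Flux bindings installed, usage might look like:
#   import flux, flux.job
#   handle = flux.Flux()
#   names = collect_event_names(flux.job.event_watch(handle, jobid), "start")
```
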
- flux.job.event_wait(flux_handle, jobid, name, eventlog='eventlog', raiseJobException=True)
Wait for a job eventlog entry 'name'
Wait synchronously for an eventlog entry named "name" and return the entry to the caller. Raises OSError with ENODATA if the event never occurred.
See also
- 21/Job States and Events Version 1
Documentation for the events in the main eventlog
- Parameters
flux_handle (Flux) -- handle for Flux broker from flux.Flux()
jobid -- the job ID on which to wait for eventlog events
name -- The event name for which to wait
eventlog -- eventlog path in job kvs directory (default: eventlog)
raiseJobException -- if True, watch for job exception events and raise a JobException if one is seen before event 'name' (default=True)
- Returns
an EventLogEvent object, or raises OSError if eventlog ended before matching event was found
- Return type
- flux.job.kill(flux_handle: Flux, jobid: Union[JobID, int], signum: Optional[int] = None)
Send a signal to a running job.
- Parameters
flux_handle (Flux) -- handle for Flux broker from flux.Flux()
jobid -- the job ID of the job to kill
signum -- signal to send (default SIGTERM)
- flux.job.cancel(flux_handle: Flux, jobid: Union[JobID, int], reason: Optional[str] = None)
Cancel a pending or running job
- Parameters
flux_handle -- handle for Flux broker from flux.Flux()
jobid -- the job ID of the job to cancel
reason -- the textual reason associated with the cancelation
- flux.job.result(flux_handle, jobid, flags=0)
Wait for a job to reach its terminal state and return job result
This function waits for job completion by watching the eventlog. Because this function must process the eventlog, it is a little more heavyweight than flux.job.wait.wait(). However, it may be used for non-waitable jobs, jobs that have already completed, and works multiple times on the same jobid.
This function will wait until the job result is available and returns a flux.job.info.JobInfo object filled with the available information.
Note: The JobInfo object returned from this method is only capable of computing a small subset of job information, including, but possibly not limited to:
id
t_submit, t_run, t_cleanup
returncode
waitstatus
runtime
result
result_id
- Parameters
flux_handle (flux.Flux) -- handle for Flux broker
jobid (flux.job.JobID) -- the jobid for which to fetch result
- Returns
A limited JobInfo object which can be used to fetch the final job result, returncode, etc.
- Return type
- flux.job.wait(flux_handle, jobid=_flux._core.lib.FLUX_JOBID_ANY)
Wait for a job to complete
Submit a request to wait for job completion, blocking until a response is received, then return the job status.
Only jobs submitted with waitable=True can be waited for.
- Parameters
flux_handle (Flux) -- handle for Flux broker from flux.Flux()
jobid -- the job ID to wait for (default is any waitable job)
- Returns
job status, a tuple of: Job ID (int), success (bool), and an error (string) if success=False
- Return type
- flux.job.get_job(flux_handle, jobid)
Get job information dictionary based on a jobid
This is a courtesy, blocking function for users looking for details about a job after submission. The dictionary includes the job identifier, the userid that submitted it, urgency, priority, t_submit, t_depend (and others when finished), state, name, ntasks, ncores, duration, nnodes, result, runtime, returncode, waitstatus, nodelist, and exception type, severity, and note.
get_job
After submitting a job, if you quickly want to see information for it,
you can use flux.job.get_job. Here is an example:
import flux
import flux.job
# It's encouraged to create a handle to use across commands
handle = flux.Flux()
jobspec = flux.job.JobspecV1.from_command(["sleep", "60"])
jobid = flux.job.submit(handle, jobspec)
job_meta = flux.job.get_job(handle, jobid)
{
    "job": {
        "id": 676292747853824,
        "userid": 0,
        "urgency": 16,
        "priority": 16,
        "t_submit": 1667760398.4034982,
        "t_depend": 1667760398.4034982,
        "state": "SCHED",
        "name": "sleep",
        "ntasks": 1,
        "ncores": 1,
        "duration": 0.0
    }
}
If the jobid you are asking for does not exist, None will be returned.
For the interested user, this is a courtesy function that wraps using the identifier
to create an RPC object, serializing that to string, and loading as JSON.
Since it is likely you, as the user, will be interacting with flux.job, it
is also logical you would look for this function to retrieve the job on the same
module.
result vs wait
Both flux.job.result and flux.job.wait return when a job has completed.
However, wait only works on jobs which have been submitted
with the waitable flag, and the ability to set that flag is
restricted to instance owners.
Asynchronous interfaces
There are two primary asynchronous interfaces to job manipulations. The first is
an event-loop interface, which is closer to the native C interface, and consists of
functions like flux.job.submit_async and flux.job.result_async (note the
functions are the same as in the synchronous interface, only with an "_async"
suffix). The second is an interface which is almost identical to the
concurrent.futures interface in Python's standard library, and it consists of the
flux.job.FluxExecutor class. Both interfaces deal in callbacks and
futures, the difference being that the FluxExecutor is designed so that
all futures fulfill in the background, and there is no need for user code
to enter the Flux event loop, while the event-loop-based interface
requires the user to call into the Flux event loop in order for futures
to fulfill and for callbacks to trigger.
Our general recommendation is that you use the FluxExecutor interface
unless you are familiar with event-loop based programming.
The FluxExecutor interface
Basic FluxExecutor usage consists of creating an executor,
submitting jobspecs to it, and then attaching callbacks to the returned
futures or waiting for them to complete. Executors must have
.shutdown() called when they are no longer to be used. However,
they also support the context-manager protocol (i.e. with executor ...:)
which will call shutdown upon leaving the with block.
Example usage:
import concurrent.futures
import flux.job
jobspec = flux.job.JobspecV1.from_command(["/bin/true"])
with flux.job.FluxExecutor() as executor:
    futs = [executor.submit(jobspec) for _ in range(5)]
    for f in concurrent.futures.as_completed(futs):
        print(f.result())
- class flux.job.FluxExecutor(threads=1, thread_name_prefix='', poll_interval=0.1, handle_args=(), handle_kwargs={})
Provides a method to submit and monitor Flux jobs asynchronously.
Forks threads to complete futures and fetch event updates in the background.
Inspired by the concurrent.futures.Executor class, with the following interface differences:
the submit method takes a flux.job.Jobspec instead of a callable and its arguments, and returns a FluxExecutorFuture representing that job.
the map method is not supported, given that the executor consumes Jobspecs rather than callables.
Otherwise, the FluxExecutor is faithful to its inspiration. In addition to methods and behavior defined by concurrent.futures, FluxExecutor provides its futures with event updates and the jobid of the underlying job.
Futures returned by submit have their jobid set as soon as it is available, which is always before the future completes.
The executor can also monitor existing jobs through the attach method, which takes a job ID and returns a future representing the job.
Futures may receive event updates even after they complete. The names of valid events are contained in the EVENTS class attribute.
The result of a future is the highest process exit status of the underlying job (in which case the result is an integer greater than or equal to 0), or -signum where signum is the number of the signal that caused the process to terminate (in which case the result is an integer less than 0).
A future is marked as "running" (and can no longer be canceled using the .cancel() method) once it reaches a certain point in the Executor, a point which is completely unrelated to the status of the underlying Flux job. The underlying Flux job may still be canceled at any point before it terminates, however, using the flux.job.cancel and flux.job.kill functions, in which case a JobException will be set.
If the jobspec is invalid, an OSError is set.
- Parameters
threads -- the number of worker threads to fork.
thread_name_prefix -- used to control the names of threading.Thread objects created by the executor, for easier debugging.
poll_interval -- the interval (in seconds) in which to break out of the flux event loop to check for new job submissions.
handle_args -- positional arguments to the flux.Flux instances used by the executor.
handle_kwargs -- keyword arguments to the flux.Flux instances used by the executor.
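Since a future's result is either a non-negative exit status or a negated signal number, a caller usually has to branch on its sign. The following sketch shows one way to do that with the standard signal module; describe_result is a hypothetical helper, not part of the bindings.

```python
import signal


def describe_result(result):
    """Turn a FluxExecutor-style integer result into a readable string."""
    if result >= 0:
        return f"exited with status {result}"
    signum = -result
    try:
        # Map the number to a symbolic name where the platform knows it
        name = signal.Signals(signum).name
    except ValueError:
        name = f"signal {signum}"
    return f"terminated by {name}"


print(describe_result(0))
print(describe_result(-int(signal.SIGTERM)))
```
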
- EVENTS = frozenset({'alloc', 'clean', 'debug', 'depend', 'exception', 'finish', 'flux-restart', 'free', 'priority', 'release', 'start', 'submit', 'urgency'})
A set containing valid event names for attaching to futures.
- attach(jobid)
Attach a FluxExecutorFuture to an existing job ID and return it.
Returned futures will behave identically to futures returned by the FluxExecutor.submit method. If the job ID is not accepted by Flux, an exception will be set on the future.
This method is primarily useful for monitoring jobs that have been submitted through other mechanisms.
- Parameters
jobid (int) -- jobid to attach to.
- Raises
RuntimeError -- if shutdown has been called or if an error has occurred and new jobs cannot be submitted (e.g. a remote Flux instance can no longer be communicated with).
- shutdown(wait=True, *, cancel_futures=False)
Clean up the resources associated with the Executor.
It is safe to call this method several times. Otherwise, no other methods can be called after this one.
- Parameters
wait -- If True, then this method will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.
cancel_futures -- If True, this method will cancel all pending futures that the executor has not started running. Any futures that are completed or running won't be cancelled, regardless of the value of cancel_futures.
- submit(*args, **kwargs)
Submit a jobspec to Flux and return a FluxExecutorFuture.
Accepts the same positional and keyword arguments as flux.job.submit, except for the flux.job.submit function's first argument, flux_handle.
- Parameters
jobspec (Jobspec or its string encoding) -- jobspec defining the job request
urgency (int) -- job urgency 0 (lowest) through 31 (highest) (default is 16). Priorities 0 through 15 are restricted to the instance owner.
waitable (bool) -- allow result to be fetched with flux.job.wait() (default is False). Waitable=True is restricted to the instance owner.
debug (bool) -- enable job manager debugging events to job eventlog (default is False)
pre_signed (bool) -- jobspec argument is already signed (default is False)
- Raises
RuntimeError -- if shutdown has been called or if an error has occurred and new jobs cannot be submitted (e.g. a remote Flux instance can no longer be communicated with).
Futures should not be created directly by user code. Since
the futures are subclasses of concurrent.futures.Future,
you can invoke concurrent.futures functions like wait or
as_completed on them.
- class flux.job.FluxExecutorFuture(owning_thread_id, *args, **kwargs)
A concurrent.futures.Future subclass that represents a single Flux job.
In addition to all of the concurrent.futures.Future functionality, FluxExecutorFuture instances offer:
The jobid and add_jobid_callback methods for retrieving the Flux jobid of the underlying job.
The add_event_callback method to invoke callbacks when particular job-state events occur.
Valid events are contained in the EVENTS class attribute.
- EVENTS = frozenset({'alloc', 'clean', 'debug', 'depend', 'exception', 'finish', 'flux-restart', 'free', 'priority', 'release', 'start', 'submit', 'urgency'})
A set containing the names of valid events.
- add_done_callback(*args, **kwargs)
Attaches a callable that will be called when the future finishes.
- Parameters
fn -- A callable that will be called with this future as its only argument when the future completes or is cancelled. The callable will always be called by a thread in the same process in which it was added. If the future has already completed or been cancelled then the callable will be called immediately. These callables are called in the order that they were added.
- Returns
self
- add_event_callback(event, callback)
Add a callback to be invoked when an event occurs.
The callback will be invoked, with the future as the first argument and the flux.job.EventLogEvent as the second, whenever the event occurs. If the event occurs multiple times, the callback will be invoked with each different EventLogEvent instance. If the event never occurs, the callback will never be invoked.
Added callables are called in the order that they were added and may be called in another thread. If the callable raises an Exception subclass, it will be logged and ignored. If the callable raises a BaseException subclass, the behavior is undefined.
If the event has already occurred, the callback will be called immediately.
- Parameters
event -- the name of the event to add the callback to.
callback -- a callable taking the future and the event as arguments.
- Returns
self
- add_jobid_callback(callback)
Attaches a callable that will be called when the jobid is ready.
Added callables are called in the order that they were added and may be called in another thread. If the callable raises an Exception subclass, it will be logged and ignored. If the callable raises a BaseException subclass, the behavior is undefined.
- Parameters
callback -- a callable taking the future as its only argument.
- Returns
self
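The jobid and event callbacks can be combined to keep a simple record of a job's progress. The sketch below uses only add_jobid_callback, add_event_callback and jobid as documented above, and guards the shared list with a lock because callbacks may run in another thread. It assumes the event object exposes a name attribute and that jobid() returns immediately once the jobid callback has fired; record_lifecycle is a hypothetical helper.

```python
import threading


def record_lifecycle(future, events=("start", "finish")):
    """Attach callbacks to a future and return the list they append to."""
    log = []
    lock = threading.Lock()

    def on_jobid(fut):
        # Fires once the jobid is ready
        with lock:
            log.append(("jobid", fut.jobid()))

    def on_event(fut, event):
        # Fires each time one of the requested events occurs
        with lock:
            log.append((event.name, fut.jobid()))

    future.add_jobid_callback(on_jobid)
    for name in events:
        future.add_event_callback(name, on_event)
    return log


# With the Flux bindings installed, usage might look like:
#   with flux.job.FluxExecutor() as executor:
#       log = record_lifecycle(executor.submit(jobspec))
```
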
- cancel(*args, **kwargs)
Cancel the future if possible.
Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed.
- exception(*args, **kwargs)
Return the exception raised by the call that the future represents.
- Parameters
timeout -- The number of seconds to wait for the exception if the future isn't done. If None, then there is no limit on the wait time.
- Returns
The exception raised by the call that the future represents or None if the call completed without raising.
- Raises
CancelledError -- If the future was cancelled.
TimeoutError -- If the future didn't finish executing before the given timeout.
- jobid(timeout=None)
Return the jobid of the Flux job that the future represents.
- Parameters
timeout -- The number of seconds to wait for the jobid. If None, then there is no limit on the wait time.
- Returns
a positive integer jobid.
- Raises
concurrent.futures.TimeoutError -- If the jobid is not available before the given timeout.
concurrent.futures.CancelledError -- If the future was cancelled.
RuntimeError -- If the job could not be submitted (e.g. if the jobspec was invalid).
- result(*args, **kwargs)
Return the result of the call that the future represents.
- Parameters
timeout -- The number of seconds to wait for the result if the future isn't done. If None, then there is no limit on the wait time.
- Returns
The result of the call that the future represents.
- Raises
CancelledError -- If the future was cancelled.
TimeoutError -- If the future didn't finish executing before the given timeout.
Exception -- If the call raised then that exception will be raised.
- set_exception(exception)
Sets the result of the future as being the given exception.
Should only be used by Executor implementations and unit tests.
Asynchronous event-loop interface
General usage consists of initiating some events, then entering the Flux reactor and writing all code thereafter as callbacks.
For instance, the example below submits five jobs, then enters the reactor and fires off callbacks as futures complete.
import flux
import flux.job
def submit_cb(fut, flux_handle):
    # when this callback fires, the jobid will be ready
    jobid = fut.get_id()
    # Create a future representing the result of the job
    result_fut = flux.job.result_async(flux_handle, jobid)
    # attach a callback to fire when the job finishes
    result_fut.then(result_cb)
def result_cb(fut):
    job = fut.get_info()
    result = job.result.lower()
    print(f"{job.id}: {result} with returncode {job.returncode}")
flux_handle = flux.Flux()
jobspec = flux.job.JobspecV1.from_command(["/bin/true"])
for _ in range(5):
    # submit 5 futures and attach callbacks to each one
    submit_future = flux.job.submit_async(flux_handle, jobspec)
    submit_future.then(submit_cb, flux_handle)
# enter the flux event loop (the 'reactor') to trigger the callbacks
# once the futures complete
flux_handle.reactor_run()
- flux.job.submit_async(flux_handle, jobspec, urgency=_flux._core.lib.FLUX_JOB_URGENCY_DEFAULT, waitable=False, debug=False, pre_signed=False, novalidate=False)
Ask Flux to run a job, without waiting for a response
Submit a job to Flux. This method returns immediately with a Flux Future, which can be used to obtain the job ID later.
- Parameters
flux_handle (Flux) -- handle for Flux broker from flux.Flux()
jobspec (Jobspec or its string encoding) -- jobspec defining the job request
urgency (int) -- job urgency 0 (lowest) through 31 (highest) (default is 16). Priorities 0 through 15 are restricted to the instance owner.
waitable (bool) -- allow result to be fetched with job.wait() (default is False). Waitable=True is restricted to the instance owner.
debug (bool) -- enable job manager debugging events to job eventlog (default is False)
pre_signed (bool) -- jobspec argument is already signed (default is False)
novalidate (bool) -- jobspec does not need to be validated. (default is False) novalidate=True is restricted to the instance owner.
- Returns
a Flux Future object for obtaining the assigned jobid
- Return type
- flux.job.event_watch_async(flux_handle, jobid, eventlog='eventlog')
Asynchronously get eventlog updates for a job
Asynchronously watch the events of a job eventlog.
Returns a JobEventWatchFuture. Call .get_event() from the then callback to get the currently returned event from the Future object.
See also
- 21/Job States and Events Version 1
Documentation for the events in the main eventlog
- Parameters
flux_handle (Flux) -- handle for Flux broker from flux.Flux()
jobid -- the job ID on which to watch events
eventlog -- eventlog path in job kvs directory (default: eventlog)
- Returns
a JobEventWatchFuture object
- Return type
- flux.job.cancel_async(flux_handle: Flux, jobid: Union[JobID, int], reason: Optional[str] = None)
Cancel a pending or running job asynchronously
- Parameters
flux_handle -- handle for Flux broker from flux.Flux()
jobid -- the job ID of the job to cancel
reason -- the textual reason associated with the cancelation
- Returns
a future fulfilled when the cancelation completes
- Return type
- flux.job.kill_async(flux_handle: Flux, jobid: Union[JobID, int], signum: Optional[int] = None)
Send a signal to a running job asynchronously
- flux.job.result_async(flux_handle, jobid, flags=0)
Wait for a job to reach its terminal state and return job result
This function waits for job completion by watching the eventlog. Because this function must process the eventlog, it is a little more heavyweight than flux.job.wait.wait_async(). However, it may be used for non-waitable jobs, jobs that have already completed, and works multiple times on the same jobid.
Once the eventlog terminal state is reached, the returned Future is fulfilled with a set of information gleaned from the processed events, including whether the job started running (in case it was canceled before starting), any exception state, and the final exit code and wait(2) status.
- Parameters
flux_handle (flux.Flux) -- handle for Flux broker
jobid (flux.job.JobID) -- the jobid for which to fetch result
- Returns
A Future fulfilled with the job result.
- Return type
- flux.job.wait_async(flux_handle, jobid=_flux._core.lib.FLUX_JOBID_ANY)
Wait for a job to complete, asynchronously
Submit a request to wait for job completion. This method returns immediately with a Flux Future, which can be used to process the result later.
Only jobs submitted with waitable=True can be waited for.
- Parameters
flux_handle (Flux) -- handle for Flux broker from flux.Flux()
jobid -- the job ID to wait for (default is any waitable job)
- Returns
a Flux Future object for obtaining the job result
- Return type