flux.job.executor module

This module defines the FluxExecutor and FluxExecutorFuture classes.

class flux.job.executor.FluxExecutor(threads=1, thread_name_prefix='', poll_interval=0.1, handle_args=(), handle_kwargs={})

Bases: object

Provides a method to submit and monitor Flux jobs asynchronously.

Starts background threads that complete futures and fetch event updates.

Inspired by the concurrent.futures.Executor class, with the following interface differences:

  • the submit method takes a flux.job.Jobspec instead of a callable and its arguments, and returns a FluxExecutorFuture representing that job.
  • the map method is not supported, given that the executor consumes Jobspecs rather than callables.

Otherwise, the FluxExecutor is faithful to its inspiration. In addition to methods and behavior defined by concurrent.futures, FluxExecutor provides its futures with event updates and the jobid of the underlying job.

Futures returned by submit have their jobid set as soon as it is available, which is always before the future completes.

The executor can also monitor existing jobs through the attach method, which takes a job ID and returns a future representing the job.

Futures may receive event updates even after they complete. The names of valid events are contained in the EVENTS class attribute.

The result of a future depends on how the underlying job ended. If the job's processes exited normally, the result is the highest process exit status, an integer greater than or equal to 0. If a process was terminated by a signal, the result is -signum, where signum is the number of that signal, so the result is an integer less than 0.
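
As an illustration of the sign convention described above, the following sketch turns a future's integer result into a readable string. The name describe_result is hypothetical and not part of flux.job.executor; the helper relies only on the stated rule that non-negative results are exit statuses and negative results are -signum.

```python
import signal


def describe_result(result: int) -> str:
    """Describe a FluxExecutorFuture result (hypothetical helper, not part of Flux)."""
    if result >= 0:
        # Non-negative: the highest process exit status of the job.
        return f"exited with status {result}"
    signum = -result
    try:
        name = signal.Signals(signum).name
    except ValueError:
        # The number does not map to a named signal on this platform.
        name = f"signal {signum}"
    return f"terminated by {name}"
```

A caller might pass the value returned by future.result() directly, for example describe_result(future.result()).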

A future is marked as "running" (and can no longer be cancelled with the .cancel() method) once it reaches a certain point in the executor. That point is completely unrelated to the status of the underlying Flux job. The underlying Flux job can still be cancelled at any point before it terminates, however, using the flux.job.cancel and flux.job.kill functions, in which case a JobException is set on the future.

If the jobspec is invalid, an OSError is set on the future.

Parameters:
  • threads -- the number of worker threads to start.
  • thread_name_prefix -- used to control the names of threading.Thread objects created by the executor, for easier debugging.
  • poll_interval -- the interval (in seconds) at which to break out of the Flux event loop to check for new job submissions.
  • handle_args -- positional arguments passed to the flux.Flux instances used by the executor.
  • handle_kwargs -- keyword arguments passed to the flux.Flux instances used by the executor.

EVENTS = frozenset({'urgency', 'start', 'depend', 'flux-restart', 'clean', 'submit', 'priority', 'exception', 'finish', 'free', 'alloc', 'debug', 'release'})

A set containing valid event names for attaching to futures.

attach(jobid)

Attach a FluxExecutorFuture to an existing job ID and return it.

Returned futures behave identically to futures returned by the FluxExecutor.submit method. If Flux does not accept the job ID, an exception is set on the future.

This method is primarily useful for monitoring jobs that have been submitted through other mechanisms.

Parameters: jobid (int) -- jobid to attach to.
Raises: RuntimeError -- if shutdown has been called or if an error has occurred and new jobs cannot be submitted (e.g. a remote Flux instance can no longer be communicated with).
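
A minimal sketch of monitoring jobs that were submitted elsewhere. The helper names attach_all and summarize are hypothetical; the code assumes only that executor.attach(jobid) returns a future with the standard concurrent.futures.Future interface, as described above.

```python
def attach_all(executor, jobids):
    """Attach to each job ID and return a {jobid: future} mapping (hypothetical helper)."""
    return {jobid: executor.attach(jobid) for jobid in jobids}


def summarize(futures, timeout=None):
    """Wait for each future and report ("ok", result) or ("error", exception)."""
    summary = {}
    for jobid, future in futures.items():
        # exception() blocks until the future is done (or the timeout expires)
        # and returns None when no exception was set on the future.
        error = future.exception(timeout=timeout)
        if error is None:
            summary[jobid] = ("ok", future.result())
        else:
            summary[jobid] = ("error", error)
    return summary
```

Because a rejected job ID is reported through the future rather than raised by attach, the "error" branch is where such a rejection would show up.
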
shutdown(wait=True, *, cancel_futures=False)

Clean up the resources associated with the Executor.

It is safe to call this method several times. No other methods of the executor can be called after it.

Parameters:
  • wait -- If True, then this method will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.
  • cancel_futures -- If True, this method will cancel all pending futures that the executor has not started running. Any futures that are completed or running won't be cancelled, regardless of the value of cancel_futures.
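
One way to make sure shutdown always runs is to wrap the work in a small helper. This is a sketch under the assumption that the executor is used by a single caller; run_then_shutdown is a hypothetical name, and the choice to cancel pending futures on failure is a design decision of this example, not a requirement of the API.

```python
def run_then_shutdown(executor, work):
    """Call work(executor) and always shut the executor down afterwards.

    Hypothetical helper. If work raises, pending futures are cancelled so
    that shutdown does not wait for jobs that have not started running.
    """
    try:
        result = work(executor)
    except BaseException:
        executor.shutdown(wait=True, cancel_futures=True)
        raise
    # Normal path: wait for running futures to finish, cancel nothing.
    executor.shutdown(wait=True)
    return result
```
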
submit(*args, **kwargs)

Submit a jobspec to Flux and return a FluxExecutorFuture.

Accepts the same positional and keyword arguments as flux.job.submit, except for the flux.job.submit function's first argument, flux_handle.

Parameters:
  • jobspec (Jobspec or its string encoding) -- jobspec defining the job request
  • urgency (int) -- job urgency 0 (lowest) through 31 (highest) (default is 16). Urgency values 17 through 31 are restricted to the instance owner.
  • waitable (bool) -- allow result to be fetched with flux.job.wait() (default is False). waitable=True is restricted to the instance owner.
  • debug (bool) -- enable job manager debugging events to job eventlog (default is False)
  • pre_signed (bool) -- jobspec argument is already signed (default is False)
Raises:

RuntimeError -- if shutdown has been called or if an error has occurred and new jobs cannot be submitted (e.g. a remote Flux instance can no longer be communicated with).
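
The sketch below shows one way to submit several jobs and collect their futures. submit_commands and main are hypothetical names. The driver assumes that the Flux Python bindings provide flux.job.JobspecV1.from_command for building a jobspec from a command list, which is not described in this section, and that a Flux instance is running.

```python
def submit_commands(executor, make_jobspec, commands, **submit_kwargs):
    """Submit one job per command and return the futures in submission order.

    make_jobspec is any callable that turns a command into a jobspec;
    submit_kwargs (e.g. urgency=16) are forwarded to executor.submit.
    """
    futures = []
    for command in commands:
        jobspec = make_jobspec(command)
        futures.append(executor.submit(jobspec, **submit_kwargs))
    return futures


def main():
    """Example driver; needs the Flux Python bindings and a running Flux instance."""
    # Imported here so the helper above can be used and tested without Flux.
    from flux.job import JobspecV1  # assumed to be available from the bindings
    from flux.job.executor import FluxExecutor

    executor = FluxExecutor()
    try:
        futures = submit_commands(
            executor, JobspecV1.from_command, [["/bin/true"], ["/bin/false"]]
        )
        for future in futures:
            print(future.jobid(), future.result())
    finally:
        executor.shutdown()
```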

class flux.job.executor.FluxExecutorFuture(owning_thread_id, *args, **kwargs)

Bases: concurrent.futures._base.Future

A concurrent.futures.Future subclass that represents a single Flux job.

In addition to all of the concurrent.futures.Future functionality, FluxExecutorFuture instances offer:

  • The jobid and add_jobid_callback methods for retrieving the Flux jobid of the underlying job.
  • The add_event_callback method to invoke callbacks when particular job-state events occur.

Valid events are contained in the EVENTS class attribute.

EVENTS = frozenset({'urgency', 'start', 'depend', 'flux-restart', 'clean', 'submit', 'priority', 'exception', 'finish', 'free', 'alloc', 'debug', 'release'})

A set containing the names of valid events.

add_done_callback(*args, **kwargs)

Attaches a callable that will be called when the future finishes.

Parameters: fn -- A callable that will be called with this future as its only argument when the future completes or is cancelled. The callable will always be called by a thread in the same process in which it was added. If the future has already completed or been cancelled then the callable will be called immediately. These callables are called in the order that they were added.
Returns: self
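
As a sketch of how a done-callback might be used, the class below tallies how futures end. CompletionTracker is a hypothetical name; the code uses only add_done_callback, cancelled and exception, which are part of the concurrent.futures.Future interface. A lock guards the counters because callbacks may run in a thread other than the one that registered them.

```python
import threading


class CompletionTracker:
    """Counts how watched futures finish (hypothetical helper, not part of Flux)."""

    def __init__(self):
        self._lock = threading.Lock()
        self.succeeded = 0
        self.failed = 0
        self.cancelled = 0

    def watch(self, future):
        """Register the tally callback and return the future for chaining."""
        future.add_done_callback(self._on_done)
        return future

    def _on_done(self, future):
        with self._lock:
            # Check cancelled() first: exception() raises on a cancelled future.
            if future.cancelled():
                self.cancelled += 1
            elif future.exception() is not None:
                self.failed += 1
            else:
                self.succeeded += 1
```
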
add_event_callback(event, callback)

Add a callback to be invoked when an event occurs.

The callback will be invoked, with the future as the first argument and the flux.job.EventLogEvent as the second, whenever the event occurs. If the event occurs multiple times, the callback will be invoked with each different EventLogEvent instance. If the event never occurs, the callback will never be invoked.

Added callables are called in the order that they were added and may be called in another thread. If the callable raises an Exception subclass, it will be logged and ignored. If the callable raises a BaseException subclass, the behavior is undefined.

If the event has already occurred, the callback will be called immediately.

Parameters:
  • event -- the name of the event to add the callback to.
  • callback -- a callable taking the future and the event as arguments.
Returns:

self
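
A minimal sketch of recording events as they arrive. record_events is a hypothetical helper; it assumes only the add_event_callback(event, callback) signature documented above and makes no assumption about the attributes of the event object. The lock is there because callbacks may be invoked from another thread.

```python
import threading


def record_events(future, event_names):
    """Register callbacks that append (name, event) pairs to a shared list.

    Returns the list, which fills in as events occur.
    """
    seen = []
    lock = threading.Lock()

    def make_callback(name):
        # Bind the event name now so each callback knows which event fired.
        def callback(fut, event):
            with lock:
                seen.append((name, event))
        return callback

    for name in event_names:
        future.add_event_callback(name, make_callback(name))
    return seen
```

For example, record_events(future, ["start", "finish"]) would collect the start and finish events of the job behind future, using names taken from the EVENTS attribute.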

add_jobid_callback(callback)

Attaches a callable that will be called when the jobid is ready.

Added callables are called in the order that they were added and may be called in another thread. If the callable raises an Exception subclass, it will be logged and ignored. If the callable raises a BaseException subclass, the behavior is undefined.

Parameters: callback -- a callable taking the future as its only argument.
Returns: self
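
The following sketch gathers jobids as they become available. collect_jobids is a hypothetical helper. It assumes that calling future.jobid() inside the callback either returns the jobid or raises (for example when the job could not be submitted), and it simply skips futures for which no jobid can be obtained.

```python
import threading


def collect_jobids(futures):
    """Return a list that fills with jobids as each future's jobid is ready."""
    jobids = []
    lock = threading.Lock()

    def on_jobid(future):
        try:
            jobid = future.jobid()
        except Exception:
            # No jobid could be obtained for this future; nothing to record.
            return
        with lock:
            jobids.append(jobid)

    for future in futures:
        future.add_jobid_callback(on_jobid)
    return jobids
```
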
cancel(*args, **kwargs)

Cancel the future if possible.

Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed.

exception(*args, **kwargs)

Return the exception raised by the call that the future represents.

Parameters:

timeout -- The number of seconds to wait for the exception if the future isn't done. If None, then there is no limit on the wait time.

Returns:

The exception raised by the call that the future represents or None if the call completed without raising.

Raises:
  • CancelledError -- If the future was cancelled.
  • TimeoutError -- If the future didn't finish executing before the given timeout.

jobid(timeout=None)

Return the jobid of the Flux job that the future represents.

Parameters:

timeout -- The number of seconds to wait for the jobid. If None, then there is no limit on the wait time.

Returns:

a positive integer jobid.

Raises:
  • concurrent.futures.TimeoutError -- If the jobid is not available before the given timeout.
  • concurrent.futures.CancelledError -- If the future was cancelled.
  • RuntimeError -- If the job could not be submitted (e.g. if the jobspec was invalid).
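
A short sketch of polling for a jobid without blocking indefinitely. jobid_or_none is a hypothetical helper; it handles only the timeout case and deliberately lets CancelledError and RuntimeError propagate to the caller.

```python
import concurrent.futures


def jobid_or_none(future, timeout=1.0):
    """Return the jobid, or None if it is not available within timeout seconds."""
    try:
        return future.jobid(timeout=timeout)
    except concurrent.futures.TimeoutError:
        # The jobid is not ready yet; the caller can try again later.
        return None
```
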
result(*args, **kwargs)

Return the result of the call that the future represents.

Parameters:

timeout -- The number of seconds to wait for the result if the future isn't done. If None, then there is no limit on the wait time.

Returns:

The result of the call that the future represents.

Raises:
  • CancelledError -- If the future was cancelled.
  • TimeoutError -- If the future didn't finish executing before the given timeout.
  • Exception -- If the call raised then that exception will be raised.
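
The three failure modes listed above can be told apart with ordinary exception handling. The sketch below uses a hypothetical helper, wait_for_result, and relies only on the concurrent.futures.Future interface. The specific exceptions are caught before the generic one because both are subclasses of Exception.

```python
import concurrent.futures


def wait_for_result(future, timeout=None):
    """Return a (status, value) pair describing how the future ended."""
    try:
        return ("result", future.result(timeout=timeout))
    except concurrent.futures.CancelledError:
        return ("cancelled", None)
    except concurrent.futures.TimeoutError:
        # The future is still pending or running; it may finish later.
        return ("timeout", None)
    except Exception as exc:
        # Any exception that was set on the future.
        return ("error", exc)
```
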
set_exception(exception)

Sets the result of the future as being the given exception.

Should only be used by Executor implementations and unit tests.