flux-jobtap-plugins(7)

DESCRIPTION

The jobtap interface loads builtin and external plugins into the job manager broker module. Plugins assign job priorities, manage dependencies, debug job state flow, or extend job manager functionality.

Jobtap plugins use the Flux standard plugin format. Each plugin exports flux_plugin_init(), which calls flux_plugin_add_handler(3) to register functions for callback topic strings described in JOB CALLBACK TOPICS.

Each callback function uses the Flux standard plugin callback form, e.g.:

int callback (flux_plugin_t *p,
              const char *topic,
              flux_plugin_arg_t *args,
              void *arg);

where p is the handle for the current plugin, topic is the topic string for the invoked callback, args contains plugin arguments unpacked with flux_plugin_arg_unpack(3), and arg is any opaque argument passed when registering the handler.

Multiple plugins may be loaded simultaneously. All matching handlers are called in load order. See flux-conf-job-manager(7) or flux-jobtap(1) for loading plugins.

JOBTAP PLUGIN NAMES

Plugins are referenced in flux jobtap list output by file name. Plugins loaded by fully qualified path are shortened to basename, yielding names like plugin-name.so.

Builtin plugins have names that begin with ".". Like hidden files in a filesystem, they are omitted from flux jobtap list output and are not matched by the glob(7) pattern "*" or by the "all" keyword. Use flux jobtap list -a, --all to list builtin plugins. To remove a builtin plugin, give its explicit name or a pattern that includes the leading "." (for example, ".priority-*").
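The matching rule can be modeled in plain C with POSIX fnmatch(3). This is a sketch of the documented behavior, not the job manager's code. The function name jobtap_name_matches() is hypothetical, and treating "all" as "every visible plugin" is an assumption based on the text above. The FNM_PERIOD flag requires a leading "." in a name to be matched by a literal "." in the pattern, which is why "*" skips builtin plugins.

```c
#include <assert.h>
#include <fnmatch.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical model: does PATTERN select the plugin named NAME?
 * Builtin plugins start with '.', which hides them from "*" and "all". */
static bool jobtap_name_matches (const char *pattern, const char *name)
{
    bool hidden = (name[0] == '.');

    assert (pattern != NULL && name != NULL);

    /* "all" selects every visible (non-builtin) plugin */
    if (strcmp (pattern, "all") == 0)
        return !hidden;

    /* FNM_PERIOD: "*" cannot match a leading '.', but ".priority-*" can */
    return fnmatch (pattern, name, FNM_PERIOD) == 0;
}
```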

A plugin may assign a name with flux_plugin_set_name(3), but this name is not displayed in flux jobtap list or used in matching. The internal name is used only in service names generated by flux_jobtap_service_register(): job-manager.<name>.<method>. If no name is set, the basename minus trailing .so is used.
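The naming rule above can be sketched as a small string helper. This is an illustration only: jobtap_service_name(), the example plugin path, and the method names are hypothetical, and the real job manager builds these names internally when flux_jobtap_service_register() is called.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: build "job-manager.<name>.<method>".
 * If set_name is NULL (flux_plugin_set_name() was not called), use the
 * basename of path with a trailing ".so" removed. */
static int jobtap_service_name (char *buf, size_t size,
                                const char *path,
                                const char *set_name,
                                const char *method)
{
    char name[256];

    assert (buf && path && method);
    if (set_name) {
        if (snprintf (name, sizeof (name), "%s", set_name) >= (int) sizeof (name))
            return -1;
    }
    else {
        const char *base = strrchr (path, '/');
        size_t len;

        base = base ? base + 1 : path;
        len = strlen (base);
        if (len > 3 && strcmp (base + len - 3, ".so") == 0)
            len -= 3;                     /* strip trailing ".so" */
        if (len >= sizeof (name))
            return -1;
        memcpy (name, base, len);
        name[len] = '\0';
    }
    if (snprintf (buf, size, "job-manager.%s.%s", name, method) >= (int) size)
        return -1;
    return 0;
}
```

For example, a plugin loaded from /usr/lib/flux/myplugin.so that registers a "query" method without calling flux_plugin_set_name() would be reachable as job-manager.myplugin.query under this rule.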

JOBTAP PLUGIN ARGUMENTS

For job-specific callbacks, all job data is passed to the plugin via flux_plugin_arg_t *args, and return data is sent back to the job manager via the same args. Unpack incoming arguments using flux_plugin_arg_unpack(3), e.g.:

json_t *resources;
flux_jobid_t id;

rc = flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN,
                             "{s:{s:o} s:I}",
                             "jobspec", "resources", &resources,
                             "id", &id);

unpacks the resources section of jobspec and the jobid into resources and id.

The full list of available args includes the following:

name        type  description
----------  ----  --------------------------------------------------------------
jobspec     o     jobspec with environment redacted
R           o     R with scheduling key redacted (RUN state or later)
id          I     jobid
state       i     current job state
prev_state  i     previous state (job.state.* callbacks)
userid      i     userid
urgency     i     current urgency
priority    I     current priority
t_submit    f     submit timestamp in floating point seconds
entry       o     posted eventlog entry, including context
end_event   o     copy of event that caused transition to CLEANUP, if available

Pack return arguments using FLUX_PLUGIN_ARG_OUT and optionally FLUX_PLUGIN_ARG_REPLACE flags. To return a priority:

rc = flux_plugin_arg_pack (args, FLUX_PLUGIN_ARG_OUT,
                           "{s:I}",
                           "priority", (int64_t) priority);

While a job is pending, plugin callbacks may add job annotations by returning a value for the annotations key:

flux_plugin_arg_pack (args, FLUX_PLUGIN_ARG_OUT,
                      "{s:{s:s}}",
                      "annotations", "test", value);

UTILITY FUNCTIONS

Getting the Flux Handle

flux_t *flux_jobtap_get_flux (flux_plugin_t *p);

Returns the flux_t handle for the broker instance. Use for RPCs, logging, or other Flux APIs. Returns NULL on failure.

The handle is owned by the plugin infrastructure and must not be destroyed.

Example:

static int callback (flux_plugin_t *p,
                    const char *topic,
                    flux_plugin_arg_t *args,
                    void *arg)
{
    flux_t *h = flux_jobtap_get_flux (p);

    flux_log (h, LOG_INFO, "callback invoked for topic %s", topic);

    return 0;
}

JOB CALLBACK TOPICS

The following job callback "topic strings" are currently provided by the jobtap interface:

job.create

Notifies plugins about a newly introduced job in three situations:

  1. on job submission

  2. when the job manager restarts and reloads a job from KVS

  3. when a new jobtap plugin is loaded

In case 1, job state is always FLUX_JOB_STATE_NEW; in cases 2 and 3, jobs can be in any state except FLUX_JOB_STATE_INACTIVE.

In case 1, the job has not yet been validated, so a job.create handler may reject it, as job.validate would, by calling flux_jobtap_reject_job(3) and returning a negative value.

In cases 2 and 3, handle fatal errors by raising a fatal job exception.

Posting events from a job.create handler is safe in all cases.

Note

In case 3, job.create is called for active jobs in an unspecified order. If a plugin needs ordering guarantees, it may call flux_jobtap_set_load_sort_order(3) from flux_plugin_init(). The sort order argument is either "state" (sort by state, then jobid) or "-state" (sort by state in reverse order, then jobid). For example:

flux_jobtap_set_load_sort_order (p, "state");

ensures job.create and job.new are called on DEPEND jobs first, then PRIORITY, then SCHED, and so on.
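The ordering can be modeled with qsort(3). This is a sketch, not the job manager's code: the enum numbers states in lifecycle order (only their relative order matters here), and breaking ties by ascending jobid under "-state" is an assumption.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Model job states, numbered in lifecycle order */
enum state { DEPEND = 1, PRIORITY, SCHED, RUN, CLEANUP };

struct job {
    uint64_t id;
    enum state state;
};

static int reverse_state; /* nonzero models the "-state" sort order */

static int cmp_jobs (const void *a, const void *b)
{
    const struct job *j1 = a;
    const struct job *j2 = b;

    if (j1->state != j2->state) {
        int c = j1->state < j2->state ? -1 : 1;
        return reverse_state ? -c : c;
    }
    /* Ties broken by ascending jobid (assumed for both orders) */
    return (j1->id > j2->id) - (j1->id < j2->id);
}

/* Sort jobs into the order job.create/job.new would be invoked */
static void sort_for_load (struct job *jobs, size_t n, const char *order)
{
    reverse_state = (strcmp (order, "-state") == 0);
    qsort (jobs, n, sizeof (jobs[0]), cmp_jobs);
}
```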

job.destroy

Called after a job is rejected or becomes inactive.

job.validate

Allows plugins to reject jobs before they are introduced to the job manager, by calling flux_jobtap_reject_job(3) and returning a negative value. A rejected job causes a submission error in the client, and its job data in the KVS is purged. No further callbacks except job.destroy occur for rejected jobs. If a job is not rejected, job.new is invoked immediately after job.validate. Implement limits and checks in job.validate, but confine accounting to job.new, since job.new is also called during a job-manager restart or plugin reload.
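The split between checking and accounting can be illustrated with a toy per-user limit. The functions below only mirror the callback topics and are not jobtap callbacks; small integer userids, the limit value, and calling job_inactive() from something like a job.state.inactive handler are all hypothetical. The point of the model is that a reload rebuilds the counts by replaying job.new, without re-running job.validate.

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_USERS 16
#define MAX_ACTIVE_PER_USER 2   /* hypothetical site limit */

/* Plugin state: active job count per (small integer) userid */
static int active[MAX_USERS];

/* job.validate: check the limit, but never change counts here */
static bool job_validate (int userid)
{
    assert (userid >= 0 && userid < MAX_USERS);
    return active[userid] < MAX_ACTIVE_PER_USER;
}

/* job.new: account for the job; also replayed for existing jobs
 * after a job-manager restart or plugin reload */
static void job_new (int userid)
{
    assert (userid >= 0 && userid < MAX_USERS);
    active[userid]++;
}

/* Job becomes inactive: release its accounting */
static void job_inactive (int userid)
{
    assert (userid >= 0 && userid < MAX_USERS && active[userid] > 0);
    active[userid]--;
}

/* Model a plugin reload: in-memory counts start from zero */
static void plugin_reload (void)
{
    for (int i = 0; i < MAX_USERS; i++)
        active[i] = 0;
}
```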

job.dependency.*

Allows dependency plugins to notify the job manager that they handle a given dependency scheme. The job manager scans the attributes.system.dependencies array and issues a job.dependency.SCHEME callback for each dependency. If no plugin has registered a handler for SCHEME, the job is rejected. The plugin calls flux_jobtap_dependency_add(3) to add a named dependency, if necessary. The job remains in DEPEND state until all dependencies are removed via flux_jobtap_dependency_remove(3); see job.state.depend for more information. To reject a job with an invalid dependency specification, call flux_jobtap_reject_job(3) and return a negative value.

job.new

Announces a new valid job. Called in the same three situations as job.create.

job.state.*

Called after a job state transition. The callback executes after the state is published to the eventlog but before action is taken on the state (since the action may immediately trigger another transition).

job.event.*

The job.event.* callbacks are made only for plugins subscribed to a job with flux_jobtap_job_subscribe(). All job events trigger this callback for subscribed plugins, including events that do not result in state transitions (e.g., start or non-fatal exception).

Subscribing to Job Events:

int flux_jobtap_job_subscribe (flux_plugin_t *p, flux_jobid_t id);
void flux_jobtap_job_unsubscribe (flux_plugin_t *p, flux_jobid_t id);

flux_jobtap_job_subscribe() subscribes the plugin to job.event.* callbacks for job id. Returns 0 on success, -1 on failure.

flux_jobtap_job_unsubscribe() unsubscribes from job id events.

Subscriptions are cleaned up automatically when the job becomes inactive or the plugin is unloaded.

job.state.depend

The final place to add dependencies to a job. flux_jobtap_dependency_add() attaches a named dependency to a job, and the job remains in DEPEND state until all dependencies are removed via flux_jobtap_dependency_remove(). Each dependency description may be used only once: a second flux_jobtap_dependency_add() call with the same description fails with errno set to EEXIST, even if the dependency was already removed. This makes adding dependencies idempotent when callbacks are replayed after a job-manager or plugin restart.
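The add-once rule can be modeled with a small per-job table that remembers every description ever added. This is a toy model, not the job manager's data structure. The names struct job_deps, dep_add(), dep_remove(), and dep_satisfied() are hypothetical, and the ENOENT error for removing an unknown dependency is an assumption.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <string.h>

#define MAX_DEPS 8

/* Toy model of one job's named dependencies */
struct job_deps {
    const char *seen[MAX_DEPS]; /* every description ever added */
    bool active[MAX_DEPS];      /* still outstanding? */
    int count;
};

static int dep_find (struct job_deps *d, const char *desc)
{
    for (int i = 0; i < d->count; i++)
        if (strcmp (d->seen[i], desc) == 0)
            return i;
    return -1;
}

/* A description may be used only once, even after removal */
static int dep_add (struct job_deps *d, const char *desc)
{
    if (dep_find (d, desc) >= 0) {
        errno = EEXIST;
        return -1;
    }
    assert (d->count < MAX_DEPS);
    d->seen[d->count] = desc;
    d->active[d->count++] = true;
    return 0;
}

static int dep_remove (struct job_deps *d, const char *desc)
{
    int i = dep_find (d, desc);

    if (i < 0 || !d->active[i]) {
        errno = ENOENT; /* error code is an assumption */
        return -1;
    }
    d->active[i] = false;
    return 0;
}

/* The job may leave DEPEND state once nothing is outstanding */
static bool dep_satisfied (const struct job_deps *d)
{
    for (int i = 0; i < d->count; i++)
        if (d->active[i])
            return false;
    return true;
}
```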

Note

See EVENT PROCESSING MODEL for details on when removing a dependency triggers synchronous state transitions.

job.state.priority

A priority-managing plugin must return a priority in the output arguments of this callback. If the priority is not yet available, call flux_jobtap_priority_unavail() to indicate that it cannot be set. A job without a priority (because it is unavailable or no priority plugin is loaded) remains in PRIORITY state until one is assigned. The priority may be set later, asynchronously, with flux_jobtap_reprioritize_job(). See PRIORITY for details on plugin priority management.

Note

See EVENT PROCESSING MODEL for important information about when the job.state.sched callback is invoked relative to this callback.

job.state.sched

A plugin may set R in its output arguments. If the job does not already have R assigned, this assigns R to the job directly and bypasses the scheduler.

job.priority.get

Called when the job manager updates a single job priority. The plugin returns a priority immediately, but may use flux_jobtap_priority_unavail() if unavailable when a job is in PRIORITY state. Returning unavailable priority in SCHED state is an error (logged but ignored). Request job.priority.get for all jobs via flux_jobtap_reprioritize_all(). See PRIORITY for details.

job.inactive-add

Job transitioned to INACTIVE state and was added to the inactive hash.

job.inactive-remove

Job was purged from the inactive hash.

job.update

Job was updated with an RFC 21 jobspec-update event.

CONFIGURATION CALLBACK TOPIC

Plugins may register a conf.update callback. The current/proposed configuration object is in input arguments under the conf key. The callback is invoked when:

  • The plugin is first loaded. Failure causes plugin load to fail.

  • The configuration changes. Failure causes flux config reload to fail.

Return 0 on success, -1 on failure. On failure, optionally set a human readable error in the errstr output argument. Use flux_jobtap_error() for convenience.

JOB UPDATE CALLBACKS

The job manager allows updates of select job attributes through plugins. Register a callback matching job.update.KEY, where KEY is a period-delimited jobspec attribute (e.g., job.update.attributes.system.duration). Requested updates are passed in the updates key.

job.update.* callbacks allow or deny updates of specific attributes. Updates are denied by default unless a callback exists for the attribute and the plugin returns 0. Deny updates by returning -1; optionally set an error message with flux_jobtap_error(3).

After all updates are allowed, the updated jobspec is passed through the job.validate plugin stack. A plugin may indicate that an update is already validated by setting a validated flag in its output arguments (FLUX_PLUGIN_ARG_OUT). If every updated attribute carries this flag, validation is skipped. This allows, for example, an instance owner to update an attribute beyond the limits that job.validate would otherwise enforce.

Some updates benefit from a feasibility check before they are applied, which prevents users from turning a feasible job into an infeasible one. The update plugin decides whether a check is needed: a feasibility check occurs only if a plugin sets a feasibility flag in its output arguments. If any plugin requires a check, the updated jobspec is checked as a whole. If it is infeasible, the update is aborted and an error is returned.
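The allow, validate, and feasibility rules above can be condensed into a decision function. This is a model for illustration, not the job manager's code. Representing each job.update.KEY callback as a C function returning a struct, and the policy in example_handler(), are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* What one job.update.KEY callback reported (modeled as a struct) */
struct update_cb_result {
    int rc;            /* 0 = allow, -1 = deny */
    bool validated;    /* plugin set "validated" in its output args */
    bool feasibility;  /* plugin requested a feasibility check */
};

/* Returns false if no plugin registered job.update.KEY */
typedef bool (*update_handler_f) (const char *key, struct update_cb_result *res);

struct update_plan {
    bool allowed;
    bool run_validate;    /* pass jobspec through the job.validate stack */
    bool check_feasible;  /* check the whole updated jobspec */
};

static struct update_plan plan_update (const char **keys, size_t nkeys,
                                       update_handler_f handler)
{
    struct update_plan plan = { true, false, false };

    assert (keys != NULL && handler != NULL);
    for (size_t i = 0; i < nkeys; i++) {
        struct update_cb_result res = { 0, false, false };
        /* Denied by default: no handler, or the handler returned -1 */
        if (!handler (keys[i], &res) || res.rc < 0) {
            plan.allowed = false;
            return plan;
        }
        if (!res.validated)      /* skipped only if all are validated */
            plan.run_validate = true;
        if (res.feasibility)     /* any request checks the whole jobspec */
            plan.check_feasible = true;
    }
    return plan;
}

/* Hypothetical policy standing in for loaded update plugins */
static bool example_handler (const char *key, struct update_cb_result *res)
{
    if (strcmp (key, "attributes.system.duration") == 0) {
        res->validated = true;   /* e.g. already checked against limits */
        return true;
    }
    if (strcmp (key, "attributes.system.queue") == 0) {
        res->feasibility = true; /* a queue change may be infeasible */
        return true;
    }
    if (strcmp (key, "attributes.system.constraints") == 0) {
        res->rc = -1;            /* handler exists but denies */
        return true;
    }
    return false;                /* no handler registered */
}
```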

Updating one attribute may require modifying others. For example, updating attributes.system.queue may require modifying attributes.system.constraints to apply the new queue's constraints. A plugin may set an updates object in its output arguments, with the same form as the RFC 21 jobspec-update event context. For example, to update attributes.system.foo to 1, set:

{"updates": {"attributes.system.foo": 1}}

in the output arguments before returning. Plugin-supplied updates are merged into the requested updates, so they may overwrite values the user requested. Use with caution.
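The overwrite hazard can be seen in a minimal merge model, where each key maps to a single value and plugin entries win. This is a simplification: real updates carry arbitrary JSON values, while this sketch stores only doubles. The names struct updates, updates_set(), updates_merge(), and updates_get() are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_UPDATES 8

struct update { const char *key; double value; };
struct updates { struct update items[MAX_UPDATES]; size_t n; };

/* Set key to value, overwriting any existing entry for key */
static void updates_set (struct updates *u, const char *key, double value)
{
    for (size_t i = 0; i < u->n; i++) {
        if (strcmp (u->items[i].key, key) == 0) {
            u->items[i].value = value;
            return;
        }
    }
    assert (u->n < MAX_UPDATES);
    u->items[u->n].key = key;
    u->items[u->n].value = value;
    u->n++;
}

/* Merge plugin-supplied updates into the requested set (plugin wins) */
static void updates_merge (struct updates *requested,
                           const struct updates *plugin)
{
    for (size_t i = 0; i < plugin->n; i++)
        updates_set (requested, plugin->items[i].key, plugin->items[i].value);
}

static int updates_get (const struct updates *u, const char *key, double *value)
{
    for (size_t i = 0; i < u->n; i++) {
        if (strcmp (u->items[i].key, key) == 0) {
            *value = u->items[i].value;
            return 0;
        }
    }
    return -1;
}
```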

Example: Complete Update Handler

A plugin that allows updating job duration while enforcing limits:

#include <flux/core.h>
#include <flux/jobtap.h>

#define MAX_DURATION 86400  /* 24 hours */
#define MIN_DURATION 60     /* 1 minute */

static int update_duration_cb (flux_plugin_t *p,
                               const char *topic,
                               flux_plugin_arg_t *args,
                               void *arg)
{
    double duration;

    /* Unpack the requested value; "F" accepts a JSON integer or real */
    if (flux_plugin_arg_unpack (args,
                                FLUX_PLUGIN_ARG_IN,
                                "{s:{s:F}}",
                                "updates",
                                  "attributes.system.duration", &duration) < 0) {
        return flux_jobtap_error (p, args, "failed to unpack duration");
    }

    /* Enforce maximum duration limit */
    if (duration > MAX_DURATION) {
        return flux_jobtap_error (p, args,
                                  "duration %.0f exceeds maximum of %d",
                                  duration, MAX_DURATION);
    }

    /* Enforce minimum duration */
    if (duration < MIN_DURATION) {
        return flux_jobtap_error (p, args,
                                  "duration %.0f is below minimum of %d",
                                  duration, MIN_DURATION);
    }

    /* Accept the update and mark it validated, so the job.validate stack
     * is skipped unless another updated attribute requires it */
    if (flux_plugin_arg_pack (args,
                              FLUX_PLUGIN_ARG_OUT,
                              "{s:i}",
                              "validated", 1) < 0)
        return flux_jobtap_error (p, args, "failed to set validated flag");

    return 0;
}

int flux_plugin_init (flux_plugin_t *p)
{
    return flux_plugin_add_handler (p,
                                    "job.update.attributes.system.duration",
                                    update_duration_cb,
                                    NULL);
}

Validates duration updates against limits and sets validated=1 to skip additional validation.

UPDATING JOBS PROGRAMMATICALLY

int flux_jobtap_jobspec_update_pack (flux_plugin_t *p,
                                     const char *fmt,
                                     ...);

int flux_jobtap_jobspec_update_id_pack (flux_plugin_t *p,
                                        flux_jobid_t id,
                                        const char *fmt,
                                        ...);

flux_jobtap_jobspec_update_pack() updates the jobspec of the current job (the job for which the callback was invoked). Arguments follow Jansson pack format with period-delimited keys per RFC 21 jobspec-update format.

Call only from a jobtap callback for the job being updated, not for other jobs or outside callback context. Returns 0 on success, -1 on failure.

flux_jobtap_jobspec_update_id_pack() updates job id asynchronously, outside a callback for the target job. The job must not be the current job (use flux_jobtap_jobspec_update_pack() for that). Returns 0 on success, -1 on failure.

Restrictions:

  • Jobs in RUN, CLEANUP, or INACTIVE state cannot be updated

  • Jobs with readonly eventlogs cannot be updated

  • Updates are subject to job.validate callbacks

  • Use job.update.* callbacks to allow/deny specific attribute updates

Example updating job duration:

static int some_callback (flux_plugin_t *p,
                         const char *topic,
                         flux_plugin_arg_t *args,
                         void *arg)
{
    /* Update current job's duration to 1 hour */
    if (flux_jobtap_jobspec_update_pack (p,
                                        "{s:f}",
                                        "attributes.system.duration",
                                        3600.0) < 0)
        return -1;

    return 0;
}

Example updating another job asynchronously:

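/* Called from a timer or other asynchronous context, not a jobtap callback */
static void timer_cb (flux_reactor_t *r,
                      flux_watcher_t *w,
                      int revents,
                      void *arg)
{
    flux_plugin_t *p = arg;
    /* get_target_job() is a hypothetical plugin helper */
    flux_jobid_t target_job = get_target_job ();

    /* Set a different job's duration to 30 minutes */
    if (flux_jobtap_jobspec_update_id_pack (p,
                                            target_job,
                                            "{s:f}",
                                            "attributes.system.duration",
                                            1800.0) < 0)
        flux_log_error (flux_jobtap_get_flux (p),
                        "failed to update job duration");
}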

PLUGIN CALLBACK TOPICS

plugin.query

The job manager calls the plugin.query callback to allow plugins to provide data in response to a jobtap-query request (used by flux jobtap query PLUGIN). Plugins export internal state for inspection by placing data in callback output arguments, e.g.:

flux_plugin_arg_pack (args, FLUX_PLUGIN_ARG_OUT,
                      "{s:O}",
                      "data", internal_data);

EVENT PROCESSING MODEL

Understanding synchronous versus asynchronous event processing is important when posting events that affect other jobs or trigger multiple state transitions.

Event Queue Mechanism

The job manager maintains a per-job event queue to ensure in-order processing and prevent recursive callback invocations. When a plugin posts an event (via flux_jobtap_dependency_remove(), flux_jobtap_event_post_pack(), or by returning values like priority), the event is added to the target job's queue.

Queue processing depends on whether the target job is executing a callback:

Target job is idle (not processing any event):

The event is processed synchronously before the posting function returns. This triggers a cascade of state transitions and callbacks within the same call stack.

Target job is busy (currently in a callback):

The event is queued and processed after the current callback completes but before control returns to the original caller. This prevents recursive callbacks that see inconsistent job state.
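The queue-and-drain behavior can be sketched as a single-threaded model using integer events and a fixed ring buffer. The struct and function names are hypothetical, and this is not the job manager's implementation. In the model, processing event 1 posts events 2 and 3 to the same job while it is busy. Those events are queued and drained by the outer loop instead of being processed recursively.

```c
#include <assert.h>
#include <stdbool.h>

#define QSIZE 16

struct job {
    int queue[QSIZE];      /* pending events (FIFO ring buffer) */
    int head, tail;
    bool busy;             /* currently running callbacks for an event */
    int log[QSIZE];        /* order in which events were processed */
    int nlog;
    int depth, max_depth;  /* callback nesting, to show it stays bounded */
};

static void post_event (struct job *job, int ev);

/* Model callbacks: handling event 1 posts events 2 and 3 to this job */
static void run_callbacks (struct job *job, int ev)
{
    if (++job->depth > job->max_depth)
        job->max_depth = job->depth;
    assert (job->nlog < QSIZE);
    job->log[job->nlog++] = ev;
    if (ev == 1) {
        post_event (job, 2);   /* job is busy, so these are only queued */
        post_event (job, 3);
    }
    job->depth--;
}

/* Enqueue ev; if the job is idle, drain the queue before returning */
static void post_event (struct job *job, int ev)
{
    assert ((job->tail + 1) % QSIZE != job->head);
    job->queue[job->tail] = ev;
    job->tail = (job->tail + 1) % QSIZE;
    if (job->busy)
        return;                /* the outer drain loop will process it */
    job->busy = true;
    while (job->head != job->tail) {
        int next = job->queue[job->head];
        job->head = (job->head + 1) % QSIZE;
        run_callbacks (job, next);
    }
    job->busy = false;
}
```

Because only the outermost post_event() call drains the queue, callbacks in this model never nest more than one level deep, however many events they post.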

Synchronous State Transition Chains

Jobtap operations can trigger chains of synchronous state transitions.

Example 1: Dependency Removal

When a plugin removes the last dependency from a job:

flux_jobtap_dependency_remove (p, target_jobid, "my-dependency");
// When this returns, target job may have reached SCHED state

If the target job is not currently in a callback, the following occurs synchronously before the function returns:

  1. A dependency-remove event is posted and processed immediately

  2. The job transitions from DEPEND to PRIORITY state

  3. All plugins' job.state.priority callbacks are invoked

  4. If a plugin returns a valid priority (or uses default priority):

    1. A priority event is posted (queued due to recursion prevention)

    2. After the job.state.priority callback returns, the queued priority event is processed

    3. The job transitions from PRIORITY to SCHED state

    4. All plugins' job.state.sched callbacks are invoked

    5. The job is enqueued with the scheduler

  5. Only then does flux_jobtap_dependency_remove() return

If the target job is currently in a callback, the event is queued, the function returns immediately, and state transitions occur after the target job's callback completes.
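The idle-target cascade in Example 1 can be reduced to a small state model. This sketch is a simplification: it models a single plugin, ignores the busy-target queuing path, and its names are hypothetical. It shows that the job reaches SCHED before dependency_remove() returns when a priority is available, and stays in PRIORITY otherwise.

```c
#include <assert.h>
#include <stdbool.h>

enum state { DEPEND, PRIORITY, SCHED };

struct job {
    enum state state;
    int ndeps;               /* outstanding dependencies */
    bool priority_available; /* would job.state.priority return one? */
    int sched_callbacks;     /* times job.state.sched ran */
};

/* job.state.sched: the job would be enqueued with the scheduler here */
static void state_sched (struct job *job)
{
    job->sched_callbacks++;
}

/* Enter PRIORITY; if a priority is returned, continue straight to SCHED */
static void state_priority (struct job *job)
{
    job->state = PRIORITY;
    if (!job->priority_available)
        return;              /* like calling flux_jobtap_priority_unavail() */
    job->state = SCHED;
    state_sched (job);
}

/* Remove one dependency from an idle job: every resulting transition
 * completes before this function returns */
static int dependency_remove (struct job *job)
{
    if (job->state != DEPEND || job->ndeps == 0)
        return -1;
    if (--job->ndeps == 0)
        state_priority (job);
    return 0;
}
```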

Example 2: Priority Assignment

When a plugin returns a priority in its job.state.priority callback:

int priority_cb (flux_plugin_t *p,
                 const char *topic,
                 flux_plugin_arg_t *args,
                 void *arg)
{
    // Calculate priority (calculate_priority() is a hypothetical helper)
    int64_t priority = calculate_priority (args);

    if (flux_plugin_arg_pack (args, FLUX_PLUGIN_ARG_OUT,
                              "{s:I}",
                              "priority", priority) < 0)
        return -1;

    // After this and the other job.state.priority callbacks return,
    // the job may move to SCHED and job.state.sched may be called
    return 0;
}

When priority is available immediately:

  1. The plugin returns with a priority in output args

  2. A priority event is posted (queued to prevent recursion)

  3. After all job.state.priority callbacks complete, the queued event is processed

  4. The job transitions from PRIORITY to SCHED state

  5. All plugins' job.state.sched callbacks are invoked

  6. Control returns through the callback stack

Recursion Prevention

The event queue prevents infinite recursion:

Events posted to the current job:

Events posted to the job for which a callback was invoked are queued and processed after the callback returns, not during. This keeps job state stable during callback execution.

Cascading events during event processing:

Events triggered during processing (like priority assignment triggering a state transition) are queued and processed in order before returning, without deep recursion.

This design ensures:

  • Job state remains consistent during callbacks

  • Callbacks see a stable, well-defined view of the job

  • Events are processed in strict FIFO order

  • Stack depth remains bounded regardless of event chains

  • Jobs cannot change state "underneath" a running callback

Job Hold Mechanism

The job manager may temporarily "hold" events for a job using the job->hold_events flag. While set, all events for that job are queued, not processed immediately. This occurs during:

  • Batch KVS commit operations

  • Certain bulk job operations

When the hold is released, all queued events are processed synchronously and in order before control returns. This is transparent to plugins, but explains why several state transitions may appear to occur at once.

Implications for Plugin Authors

Synchronous cascades:

Callbacks for other jobs, or for later states of the same job, may execute before a posting function returns. Do not assume an event is processed after the posting function returns: when the target job is idle, the event and all callbacks it triggers complete within the posting function.

State consistency:

Job state remains constant during callback execution. A job in job.state.priority remains in PRIORITY state until the callback returns, even if the callback posts events.

Dependency removal timing:

If the target job is not in a callback:

  • job.state.priority callback executes before flux_jobtap_dependency_remove() returns

  • If priority is available, job.state.sched also executes before return

  • If priority is unavailable, target job remains in PRIORITY state

If the target job is in a callback, the dependency removal is queued and flux_jobtap_dependency_remove() returns immediately.

Priority assignment timing:

When returning a priority in job.state.priority:

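  • job.state.sched callbacks execute after all job.state.priority callbacks return, but within the same call stack that triggered the PRIORITY transition

  • All plugins' job.state.sched callbacks execute synchronously

  • The job may be enqueued with the scheduler before control returns to the code that triggered the PRIORITY transition (for example, flux_jobtap_dependency_remove())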

Asynchronous operations:

For asynchronous work (I/O, RPCs, timers):

  • For priority: Use flux_jobtap_priority_unavail(), then call flux_jobtap_reprioritize_job() when ready

  • For dependencies: Add in job.state.depend, remove when condition met

  • Do not attempt to defer event processing; use the provided mechanisms instead

Querying job state:

Querying a job after posting an event returns the state after all synchronous transitions complete:

flux_plugin_arg_t *job_info;
int state;

flux_jobtap_dependency_remove (p, other_id, "my-dependency");

// Query the job - it may now be in SCHED state
if ((job_info = flux_jobtap_job_lookup (p, other_id))) {
    if (flux_plugin_arg_unpack (job_info, FLUX_PLUGIN_ARG_IN,
                                "{s:i}", "state", &state) == 0) {
        // state may be FLUX_JOB_STATE_SCHED, not DEPEND or PRIORITY
    }
    flux_plugin_arg_destroy (job_info);
}

Callback ordering:

Events are processed in order per job. Across jobs, processing order depends on the order events were posted.

Example: Dependency Chain

Consider a plugin managing a chain of dependent jobs. When the first job completes, it should release the second job:

int finish_cb (flux_plugin_t *p,
               const char *topic,
               flux_plugin_arg_t *args,
               void *arg)
{
    flux_jobid_t completed_id, dependent_id;

    if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN,
                                "{s:I}", "id", &completed_id) < 0)
        return -1;

    // get_dependent_job() is a hypothetical plugin helper
    dependent_id = get_dependent_job (completed_id);
    if (dependent_id != FLUX_JOBID_ANY) {
        // If dependent_id is idle: DEPEND -> PRIORITY -> SCHED synchronously
        // If dependent_id is busy: queued for later processing
        if (flux_jobtap_dependency_remove (p, dependent_id, "predecessor") < 0)
            flux_log_error (flux_jobtap_get_flux (p),
                            "failed to remove dependency");
    }
    return 0;
}

When the target job is idle, synchronous processing releases dependent jobs immediately, minimizing startup latency.

JOB AUXILIARY DATA

Plugins can attach arbitrary data to jobs. This data persists across callbacks and state transitions, enabling stateful plugin operation.

API Functions

int flux_jobtap_job_aux_set (flux_plugin_t *p,
                             flux_jobid_t id,
                             const char *name,
                             void *val,
                             flux_free_f free_fn);

void *flux_jobtap_job_aux_get (flux_plugin_t *p,
                               flux_jobid_t id,
                               const char *name);

int flux_jobtap_job_aux_delete_value (flux_plugin_t *p,
                                      flux_jobid_t id,
                                      void *val);

flux_jobtap_job_aux_set() attaches data val with key name to job id. If free_fn is non-NULL, it destroys val when the job is destroyed or the plugin unloads. Returns 0 on success, -1 on failure.

flux_jobtap_job_aux_get() retrieves data for key name from job id. Returns the data pointer, or NULL if not found.

flux_jobtap_job_aux_delete_value() removes aux data by value from job id. Use when the key name is unknown but the value pointer is available. Returns 0 on success, -1 on failure.

The id parameter accepts FLUX_JOBTAP_CURRENT_JOB to operate on the current callback's job.

Lifecycle

Auxiliary data is cleaned up automatically when:

  • The job transitions to INACTIVE state

  • The plugin is unloaded

  • The data is explicitly deleted

Data persists across all state transitions before INACTIVE.

Isolation

Each plugin's auxiliary data is isolated from other plugins. Two plugins can use the same key without conflict.
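Isolation follows if entries are keyed by the pair (plugin, name) rather than by name alone. The store below is a hypothetical model of that idea, not Flux's aux implementation. It does not model free_fn or cleanup on job destruction, and replacing a value here does not release the old one.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_AUX 8

/* Toy per-job aux store keyed by (plugin, name) */
struct aux_item { const void *plugin; const char *name; void *val; };
struct job_aux { struct aux_item items[MAX_AUX]; int n; };

static int aux_set (struct job_aux *a, const void *plugin,
                    const char *name, void *val)
{
    for (int i = 0; i < a->n; i++) {
        if (a->items[i].plugin == plugin
            && strcmp (a->items[i].name, name) == 0) {
            a->items[i].val = val;   /* replace (no destructor modeled) */
            return 0;
        }
    }
    if (a->n == MAX_AUX)
        return -1;
    a->items[a->n++] = (struct aux_item){ plugin, name, val };
    return 0;
}

static void *aux_get (struct job_aux *a, const void *plugin, const char *name)
{
    assert (a != NULL && name != NULL);
    for (int i = 0; i < a->n; i++)
        if (a->items[i].plugin == plugin
            && strcmp (a->items[i].name, name) == 0)
            return a->items[i].val;
    return NULL;
}
```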

Example

Track the number of times a job has been through PRIORITY state:

struct priority_count {
    int count;
};

static int priority_cb (flux_plugin_t *p,
                       const char *topic,
                       flux_plugin_arg_t *args,
                       void *arg)
{
    struct priority_count *pc;

    pc = flux_jobtap_job_aux_get (p, FLUX_JOBTAP_CURRENT_JOB, "pricount");
    if (!pc) {
        if (!(pc = calloc (1, sizeof (*pc))))
            return -1;
        if (flux_jobtap_job_aux_set (p,
                                     FLUX_JOBTAP_CURRENT_JOB,
                                     "pricount",
                                     pc,
                                     free) < 0) {
            free (pc);
            return -1;
        }
    }
    pc->count++;

    flux_log (flux_jobtap_get_flux (p),
             LOG_INFO,
             "job priority calculated %d times",
             pc->count);

    return 0;
}

RAISING EXCEPTIONS

Plugins can raise job exceptions to signal errors or anomalous conditions. Exceptions are recorded in the job eventlog and can terminate the job.

API Function

int flux_jobtap_raise_exception (flux_plugin_t *p,
                                 flux_jobid_t id,
                                 const char *type,
                                 int severity,
                                 const char *fmt,
                                 ...);

Raises an exception on job id with the specified type, severity, and formatted message. The id parameter accepts FLUX_JOBTAP_CURRENT_JOB. Returns 0 on success, -1 on failure.

Severity Levels

severity = 0 (fatal):

Terminates the job: it transitions to CLEANUP state, and a job that has not yet started will never run. Use for unrecoverable errors.

severity = 1-6 (non-fatal):

Logs the exception but allows the job to continue. Use for warning conditions that should be recorded but do not prevent execution.

severity = 7 (debug):

Debug-level exception, not shown to users by default.

Severity levels correspond to syslog levels, with 0 most severe (LOG_EMERG) and 7 least severe (LOG_DEBUG).

Exception Types

The type parameter identifies the exception category. Common types:

  • "dependency" - dependency-related errors

  • "alloc" - resource allocation failures

  • "timeout" - job timeout

  • "cancel" - job cancellation

  • "exec" - execution system errors

The type is recorded in the exception event for monitoring tools.

When to Use

Use flux_jobtap_raise_exception() instead of returning -1 when:

  • The error should be visible in job eventlogs

  • A detailed error message is needed

  • The error should terminate the job (severity 0)

  • The job should continue but the condition logged (severity 1-6)

Return -1 for plugin-internal errors that should not generate user-visible exceptions.

Example

Raise a fatal exception if a job violates policy:

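/* Registered for job.state.depend. In job.validate, reject the job with
 * flux_jobtap_reject_job() instead of raising an exception. */
static int depend_cb (flux_plugin_t *p,
                      const char *topic,
                      flux_plugin_arg_t *args,
                      void *arg)
{
    double duration = 0.;

    /* attributes.system.duration is optional; "F" accepts int or real */
    if (flux_plugin_arg_unpack (args,
                                FLUX_PLUGIN_ARG_IN,
                                "{s:{s?{s?{s?F}}}}",
                                "jobspec",
                                  "attributes",
                                    "system",
                                      "duration", &duration) < 0)
        return -1;

    if (duration > 86400.) {
        flux_jobtap_raise_exception (p,
                                     FLUX_JOBTAP_CURRENT_JOB,
                                     "policy",
                                     0,
                                     "requested duration %.0fs exceeds limit of 86400s",
                                     duration);
    }

    /* The exception reports the error to the user, so return success */
    return 0;
}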

Raise a non-fatal warning:

static int alloc_cb (flux_plugin_t *p,
                     const char *topic,
                     flux_plugin_arg_t *args,
                     void *arg)
{
    flux_t *h = flux_jobtap_get_flux (p);
    double now = flux_reactor_now (flux_get_reactor (h));
    double t_submit;

    if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN,
                                "{s:f}", "t_submit", &t_submit) < 0)
        return -1;

    /* Warn if job has been waiting more than 1 hour */
    if (now - t_submit > 3600.0) {
        flux_jobtap_raise_exception (p,
                                     FLUX_JOBTAP_CURRENT_JOB,
                                     "long-wait",
                                     4,
                                     "job waited %.0f seconds for allocation",
                                     now - t_submit);
    }

    return 0;
}

QUERYING JOB STATE

Getting Job Information

flux_plugin_arg_t *flux_jobtap_job_lookup (flux_plugin_t *p,
                                           flux_jobid_t id);

Returns job information for job id as a flux_plugin_arg_t containing the same fields as callback input args (see JOBTAP PLUGIN ARGUMENTS). Returns NULL on failure.

Call flux_plugin_arg_destroy() on the returned object.

Getting Job Result

int flux_jobtap_get_job_result (flux_plugin_t *p,
                                flux_jobid_t id,
                                flux_job_result_t *resultp);

Gets the result of job id and stores it in *resultp. Valid only for jobs in CLEANUP or INACTIVE state. Returns 0 on success, -1 on failure.

Result values:

FLUX_JOB_RESULT_COMPLETED

Job completed successfully (exit code 0)

FLUX_JOB_RESULT_FAILED

Job failed (non-zero exit code)

FLUX_JOB_RESULT_CANCELED

Job canceled by exception type "cancel"

FLUX_JOB_RESULT_TIMEOUT

Job canceled by exception type "timeout"

Use in job.state.cleanup or job.state.inactive callbacks to determine why a job ended.
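The mapping from how a job ended to its result can be sketched as a pure function. This is a hypothetical model: the enum stands in for flux_job_result_t, and both the rule that other fatal exception types map to FAILED and the precedence of a fatal exception over exit status are assumptions, not taken from this page.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Local stand-ins for the FLUX_JOB_RESULT_* values listed above */
enum result { COMPLETED, FAILED, CANCELED, TIMEOUT };

/* fatal_type is the type of the fatal exception that ended the job, or
 * NULL if none; status is the job's exit status */
static enum result job_result (const char *fatal_type, int status)
{
    if (fatal_type) {
        if (strcmp (fatal_type, "cancel") == 0)
            return CANCELED;
        if (strcmp (fatal_type, "timeout") == 0)
            return TIMEOUT;
        return FAILED;   /* any other fatal exception (assumption) */
    }
    return status == 0 ? COMPLETED : FAILED;
}
```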

Checking Posted Events

int flux_jobtap_job_event_posted (flux_plugin_t *p,
                                  flux_jobid_t id,
                                  const char *name);

Returns 1 if event name was posted to job id, 0 if not, -1 on failure. Use for idempotent operations and checking job history.

Setting Job Flags

int flux_jobtap_job_set_flag (flux_plugin_t *p,
                              flux_jobid_t id,
                              const char *flag);

Sets a flag on job id. The id parameter accepts FLUX_JOBTAP_CURRENT_JOB. Returns 0 on success, -1 on failure.

Supported flags:

waitable

Makes the job waitable via flux_job_wait(). By default, only jobs submitted with FLUX_JOB_WAITABLE are waitable. This allows plugins to make jobs waitable after submission.

Posts a set-flags event to the job eventlog.

Example

Query job information from another context:

flux_plugin_arg_t *args = flux_jobtap_job_lookup (p, other_jobid);
if (args) {
    int state;
    flux_jobid_t id;

    /* Same fields as job callback input args; skip logging on failure */
    if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN,
                                "{s:I s:i}",
                                "id", &id,
                                "state", &state) == 0) {
        flux_log (flux_jobtap_get_flux (p),
                  LOG_DEBUG,
                  "job %s is in state %d",
                  idf58 (id),
                  state);
    }

    flux_plugin_arg_destroy (args);
}

Get job result in cleanup:

static int cleanup_cb (flux_plugin_t *p,
                      const char *topic,
                      flux_plugin_arg_t *args,
                      void *arg)
{
    flux_job_result_t result;

    if (flux_jobtap_get_job_result (p,
                                    FLUX_JOBTAP_CURRENT_JOB,
                                    &result) == 0) {
        switch (result) {
            case FLUX_JOB_RESULT_COMPLETED:
                /* Job succeeded */
                break;
            case FLUX_JOB_RESULT_FAILED:
                /* Job failed */
                break;
            case FLUX_JOB_RESULT_CANCELED:
                /* Job was canceled */
                break;
            case FLUX_JOB_RESULT_TIMEOUT:
                /* Job timed out */
                break;
        }
    }

    return 0;
}

Check if dependency event was posted:

if (flux_jobtap_job_event_posted (p, id, "dependency-add") == 1) {
    /* Dependency was already added, skip */
    return 0;
}

PRIORITY

Custom job priority assignment is a core jobtap feature. The builtin .priority-default plugin is loaded by default so that jobs move past PRIORITY state even when no other priority plugin is loaded. It assigns each job a priority equal to its urgency.

When a custom priority plugin is loaded, .priority-default may still be loaded, in which case it initializes the priority in the callback return arguments to the job urgency. Because job.state.priority and job.priority.get callbacks run in load order, a subsequently loaded plugin overwrites that default, so the last loaded priority plugin takes effect.

To ensure the default priority is overridden, a priority plugin must, in every callback that returns a priority (job.state.priority and job.priority.get), either set a priority or call flux_jobtap_priority_unavail() when no priority is available.
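The last-writer-wins rule can be pictured with a small standalone C model. It is not job manager code and needs no Flux headers: struct priority_out, the three callbacks, and run_priority_chain() are hypothetical names, and the loop only mimics the documented behavior that handlers for the same topic run in load order and write to a shared output priority.

```c
#include <stdint.h>

/* Hypothetical model of the output args shared by all priority
 * callbacks for one job; this is not a Flux type. */
struct priority_out {
    int64_t priority;
};

typedef void (*priority_cb_f) (int urgency, struct priority_out *out);

/* Models .priority-default: priority = urgency */
static void default_cb (int urgency, struct priority_out *out)
{
    out->priority = urgency;
}

/* Hypothetical plugin that always sets a priority */
static void custom_cb (int urgency, struct priority_out *out)
{
    out->priority = (int64_t) urgency * 1000;
}

/* Hypothetical plugin that returns without setting a priority,
 * leaving the value written by .priority-default in place */
static void careless_cb (int urgency, struct priority_out *out)
{
    (void) urgency;
    (void) out;
}

/* Run callbacks in load order; the last one to write wins */
static int64_t run_priority_chain (const priority_cb_f *cbs,
                                   int n,
                                   int urgency)
{
    struct priority_out out = { 0 };

    for (int i = 0; i < n; i++)
        cbs[i] (urgency, &out);
    return out.priority;
}
```

With { default_cb, custom_cb }, a job with urgency 16 ends up with priority 16000. With { default_cb, careless_cb }, it silently keeps priority 16, which is why a real plugin should always set a priority or call flux_jobtap_priority_unavail().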

To fully prevent conflicts, explicitly remove the builtin priority plugin:

flux jobtap remove .priority-default

or in the job manager configuration (see flux-conf-job-manager(7)):

[job-manager]
plugins = [
  { remove = ".priority-default",
    load = "complex-priority.so"
  },
]

PROLOG AND EPILOG ACTIONS

Plugins can run asynchronous tasks after resources are allocated but before the job starts (prolog), or after the job finishes but before its resources are freed (epilog), using job manager prolog and epilog actions.

Begin and end each action with these functions:

int flux_jobtap_prolog_start (flux_plugin_t *p,
                              const char *description);

int flux_jobtap_prolog_finish (flux_plugin_t *p,
                               flux_jobid_t id,
                               const char *description,
                               int status);

int flux_jobtap_epilog_start (flux_plugin_t *p,
                              const char *description);

int flux_jobtap_epilog_finish (flux_plugin_t *p,
                               flux_jobid_t id,
                               const char *description,
                               int status);

Call flux_jobtap_prolog_start() to initiate a prolog action for the current job. The job does not start, even after resources are assigned, until the action is ended with flux_jobtap_prolog_finish(). The status passed to flux_jobtap_prolog_finish() is recorded in the eventlog, but a non-zero status triggers no automatic job manager behavior: on failure, the plugin itself must raise a job exception or take other action. The description is informational only; it distinguishes multiple actions in the eventlog.

Call flux_jobtap_epilog_start() to initiate an epilog action for the current job; the job's resources are not released until flux_jobtap_epilog_finish() is called. The same caveats about description and completion status apply.

Call flux_jobtap_prolog_start() any time before the start request is sent to the execution system, typically from the job.state.run or job.event.alloc callback (the latter runs when resources are allocated). Note that plugins receive job.event.* callbacks only for jobs they have subscribed to with flux_jobtap_job_subscribe(). A prolog cannot be started once the job has reached CLEANUP state.

Call flux_jobtap_epilog_start() only after a job enters CLEANUP state but before the free request to the scheduler, typically from job.state.cleanup or job.event.finish callbacks.

If called for a job in an invalid state, these functions return -1 with errno set to EINVAL.

Multiple prolog or epilog actions can be active simultaneously.
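The rules above (concurrent actions, start and free gated on every action finishing, EINVAL outside the valid window) can be summarized as a small standalone state model. This is a hedged sketch, not the job manager's implementation: the model_* names, the reduced state set (MODEL_RUN stands in for "resources allocated"), and the start_sent/free_sent flags are hypothetical, and a real plugin would call flux_jobtap_prolog_start(), flux_jobtap_prolog_finish(), and their epilog counterparts instead.

```c
#include <errno.h>
#include <stdbool.h>

/* Hypothetical subset of job states, in the order a job moves through them */
enum model_state { MODEL_SCHED, MODEL_RUN, MODEL_CLEANUP, MODEL_INACTIVE };

struct model_job {
    enum model_state state;
    int prologs;      /* prolog actions started but not yet finished */
    int epilogs;      /* epilog actions started but not yet finished */
    bool start_sent;  /* start request already sent to the execution system */
    bool free_sent;   /* free request already sent to the scheduler */
};

/* A prolog may begin before the start request, but not in CLEANUP or later */
static int model_prolog_start (struct model_job *job)
{
    if (job->start_sent || job->state >= MODEL_CLEANUP) {
        errno = EINVAL;
        return -1;
    }
    job->prologs++;
    return 0;
}

/* An epilog may begin only in CLEANUP, before the free request */
static int model_epilog_start (struct model_job *job)
{
    if (job->state != MODEL_CLEANUP || job->free_sent) {
        errno = EINVAL;
        return -1;
    }
    job->epilogs++;
    return 0;
}

/* Finishing an action that was never started is rejected (a model choice) */
static int model_finish (int *active)
{
    if (*active == 0) {
        errno = EINVAL;
        return -1;
    }
    (*active)--;
    return 0;
}

/* The job starts only once resources are allocated and all prologs finished */
static bool model_can_start (const struct model_job *job)
{
    return job->state == MODEL_RUN && !job->start_sent && job->prologs == 0;
}

/* Resources are released only once every epilog has finished */
static bool model_can_free (const struct model_job *job)
{
    return job->state == MODEL_CLEANUP && !job->free_sent && job->epilogs == 0;
}
```

Two overlapping prologs keep model_can_start() false until both have been finished, mirroring how each description in the eventlog is matched by its own finish call.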

CALLING OTHER PLUGINS

Invoke custom callbacks in other plugins using flux_jobtap_call(). Topic strings beginning with job. are reserved for the job manager; passing one causes flux_jobtap_call() to fail immediately with errno set to EINVAL:

int flux_jobtap_call (flux_plugin_t *p,
                      flux_jobid_t id,
                      const char *topic,
                      flux_plugin_arg_t *args);

Because the jobtap API operates on a current job, id is required. args is passed unmodified to the callbacks registered for topic, so newly created args will not contain the job data described in JOBTAP PLUGIN ARGUMENTS unless the caller adds it. From within another jobtap callback, pass the existing args together with FLUX_JOBTAP_CURRENT_JOB to preserve the expected job arguments. For example, the following callback invokes every plugin handler registered for custom.topic:

int callback (flux_plugin_t *p,
              const char *topic,
              flux_plugin_arg_t *args,
              void *arg)
{
    return flux_jobtap_call (p,
                             FLUX_JOBTAP_CURRENT_JOB,
                             "custom.topic",
                             args);
}

REGISTERING SERVICES

int flux_jobtap_service_register (flux_plugin_t *p,
                                  const char *method,
                                  flux_msg_handler_f cb,
                                  void *arg);

int flux_jobtap_service_register_ex (flux_plugin_t *p,
                                     const char *method,
                                     uint32_t rolemask,
                                     flux_msg_handler_f cb,
                                     void *arg);

Registers a service endpoint invoked via Flux RPC. The service name is job-manager.<plugin-name>.<method>, where <plugin-name> is the plugin's internal name (see JOBTAP PLUGIN NAMES section).
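To work out the topic a client should call, the naming rule can be expressed as a small standalone helper. It only illustrates the rule stated here and in JOBTAP PLUGIN NAMES and is not a Flux API: jobtap_service_topic() is a hypothetical name, and its name_override argument stands in for a name set with flux_plugin_set_name().

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: compose the RPC topic job-manager.<name>.<method>.
 * If name_override is NULL, <name> is the basename of path minus a
 * trailing ".so"; otherwise name_override (as if set with
 * flux_plugin_set_name()) is used as-is.
 * Returns 0 on success, -1 if buf is too small. */
static int jobtap_service_topic (const char *path,
                                 const char *name_override,
                                 const char *method,
                                 char *buf,
                                 size_t size)
{
    const char *name = name_override;
    size_t len;
    int n;

    if (!name) {
        const char *slash = strrchr (path, '/');

        name = slash ? slash + 1 : path;  /* basename */
    }
    len = strlen (name);
    if (!name_override && len >= 3 && strcmp (name + len - 3, ".so") == 0)
        len -= 3;                         /* drop trailing .so */

    n = snprintf (buf, size, "job-manager.%.*s.%s", (int) len, name, method);
    if (n < 0 || (size_t) n >= size)
        return -1;
    return 0;
}
```

For a plugin loaded as /opt/flux/plugins/myplugin.so with no internal name set, the "query" method maps to job-manager.myplugin.query.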

flux_jobtap_service_register() registers a service with the default message handler access policy, which admits only the instance owner.

flux_jobtap_service_register_ex() additionally admits users whose roles match rolemask, for example FLUX_ROLE_USER to allow any user. Both functions return 0 on success, -1 on failure.

The callback receives each RPC request and responds with flux_respond(), flux_respond_pack(), or flux_respond_error().

Example:

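#include <stdlib.h>
#include <flux/core.h>
#include <flux/jobtap.h>

struct plugin_data {
    int count;
};

static void query_cb (flux_t *h,
                      flux_msg_handler_t *mh,
                      const flux_msg_t *msg,
                      void *arg)
{
    struct plugin_data *data = arg;

    /* Return plugin state */
    if (flux_respond_pack (h, msg,
                           "{s:i}",
                           "count", data->count) < 0)
        flux_log_error (h, "query response failed");
}

int flux_plugin_init (flux_plugin_t *p)
{
    struct plugin_data *data;

    /* Set the internal name used in the service topic */
    if (flux_plugin_set_name (p, "myplugin") < 0)
        return -1;
    if (!(data = calloc (1, sizeof (*data))))
        return -1;
    /* Free data automatically when the plugin is unloaded */
    if (flux_plugin_aux_set (p, NULL, data, free) < 0) {
        free (data);
        return -1;
    }
    /* Register job-manager.myplugin.query service */
    if (flux_jobtap_service_register (p, "query", query_cb, data) < 0)
        return -1;

    return 0;
}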

The instance owner can then query the service, for example from an interactive flux python session:

$ flux python
>>> import flux
>>> h = flux.Flux()
>>> h.rpc("job-manager.myplugin.query").get()

RESOURCES

Flux: http://flux-framework.org

Flux RFC: https://flux-framework.readthedocs.io/projects/flux-rfc

Issue Tracker: https://github.com/flux-framework/flux-core/issues

SEE ALSO

flux-jobtap(1), flux-conf-job-manager(7)