flux-jobtap-plugins(7)
DESCRIPTION
The jobtap interface loads builtin and external plugins into the job manager broker module. Plugins assign job priorities, manage dependencies, debug job state flow, or extend job manager functionality.
Jobtap plugins use the Flux standard plugin format. Each plugin exports
flux_plugin_init(), which calls flux_plugin_add_handler(3) to
register functions for callback topic strings described in
JOB CALLBACK TOPICS.
Each callback function uses the Flux standard plugin callback form, e.g.:
int callback (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *arg);
where p is the handle for the current plugin, topic is the topic
string for the invoked callback, args contains plugin arguments unpacked
with flux_plugin_arg_unpack(3), and arg is any opaque argument passed
when registering the handler.
Multiple plugins may be loaded simultaneously. All matching handlers are called in load order. See flux-conf-job-manager(7) or flux-jobtap(1) for loading plugins.
JOBTAP PLUGIN NAMES
Plugins are referenced in flux jobtap list output by file name. Plugins
loaded by fully qualified path are shortened to basename, yielding names like
plugin-name.so.
Builtin plugins are named with a leading ".", are hidden in flux jobtap list
output, and do not match the glob(7) pattern * or the "all" keyword (similar
to hidden files in a filesystem). List builtin plugins with -a, --all; remove
them by explicit name or by a pattern that includes the leading ".".
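The hidden-file analogy can be illustrated with POSIX fnmatch(3) and its FNM_PERIOD flag, which requires a leading period in the string to be matched by an explicit period in the pattern. This is only a sketch of the matching rule described above: whether the job manager itself uses fnmatch() this way is an assumption, and the helper name pattern_matches() is hypothetical.

```c
#include <fnmatch.h>
#include <string.h>

/* Hypothetical helper: return 1 if 'pattern' selects plugin 'name'.
 * Assumptions: "all" behaves like the glob "*", and FNM_PERIOD models
 * the rule that a leading "." must be matched explicitly.
 */
int pattern_matches (const char *pattern, const char *name)
{
    if (strcmp (pattern, "all") == 0)
        pattern = "*";
    return fnmatch (pattern, name, FNM_PERIOD) == 0;
}
```

Under these assumptions, "*" and "all" select plugin-name.so but skip .priority-default, while ".priority-*" selects the builtin.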
A plugin may assign a name with flux_plugin_set_name(3), but this name
is not displayed in flux jobtap list or used in matching. The internal
name is used only in service names generated by
flux_jobtap_service_register(): job-manager.<name>.<method>. If no
name is set, the basename minus trailing .so is used.
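The naming rule above can be sketched in plain C. This is an illustration of the rule as stated, not the job manager's code: the helper service_name() and the example path are hypothetical.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: build "job-manager.<name>.<method>" into buf.
 * If 'name' is NULL (the plugin never set a name), fall back to the
 * basename of 'path' with a trailing ".so" removed.
 * Returns 0 on success, -1 if the result does not fit in buf.
 */
int service_name (char *buf, size_t size,
                  const char *name,
                  const char *path,
                  const char *method)
{
    const char *base = strrchr (path, '/');
    size_t len;
    int n;

    base = base ? base + 1 : path;
    len = strlen (base);
    if (len > 3 && strcmp (base + len - 3, ".so") == 0)
        len -= 3;                       /* drop trailing ".so" */
    if (name)
        n = snprintf (buf, size, "job-manager.%s.%s", name, method);
    else
        n = snprintf (buf, size, "job-manager.%.*s.%s",
                      (int) len, base, method);
    return (n < 0 || (size_t) n >= size) ? -1 : 0;
}
```

For a plugin loaded from a path ending in myplugin.so with no explicit name, the method "query" maps to job-manager.myplugin.query.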
JOBTAP PLUGIN ARGUMENTS
For job-specific callbacks, all job data is passed to the plugin via
flux_plugin_arg_t *args, and return data is sent back to the job
manager via the same args. Unpack incoming arguments using
flux_plugin_arg_unpack(3), e.g.:
rc = flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN,
                             "{s:{s:o}, s:I}",
                             "jobspec", "resources", &resources,
                             "id", &id);
unpacks the resources section of jobspec and the jobid into resources
and id.
The full list of available args includes the following:
| name | type | description |
|---|---|---|
| jobspec | o | jobspec with environment redacted |
| R | o | R with scheduling key redacted (RUN state or later) |
| id | I | jobid |
| state | i | current job state |
| prev_state | i | previous state ( |
| userid | i | userid |
| urgency | i | current urgency |
| priority | I | current priority |
| t_submit | f | submit timestamp in floating point seconds |
| entry | o | posted eventlog entry, including context |
| end_event | o | copy of event that caused transition to CLEANUP, if available |
Pack return arguments using FLUX_PLUGIN_ARG_OUT and optionally
FLUX_PLUGIN_ARG_REPLACE flags. To return a priority:
rc = flux_plugin_arg_pack (args, FLUX_PLUGIN_ARG_OUT,
"{s:I}",
"priority", (int64_t) priority);
While a job is pending, plugin callbacks may add job annotations by
returning a value for the annotations key:
flux_plugin_arg_pack (args, FLUX_PLUGIN_ARG_OUT,
"{s:{s:s}}",
"annotations", "test", value);
UTILITY FUNCTIONS
Getting the Flux Handle
flux_t *flux_jobtap_get_flux (flux_plugin_t *p);
Returns the flux_t handle for the broker instance. Use for RPCs, logging,
or other Flux APIs. Returns NULL on failure.
The handle is owned by the plugin infrastructure and must not be destroyed.
Example:
static int callback (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *arg)
{
flux_t *h = flux_jobtap_get_flux (p);
flux_log (h, LOG_INFO, "callback invoked for topic %s", topic);
return 0;
}
JOB CALLBACK TOPICS
The following job callback "topic strings" are currently provided by the jobtap interface:
- job.create
Notifies plugins about a newly introduced job in three situations:
1. on job submission
2. when the job manager restarts and reloads a job from the KVS
3. when a new jobtap plugin is loaded
In case 1, the job state is always FLUX_JOB_STATE_NEW; in cases 2 and 3, jobs can be in any state except FLUX_JOB_STATE_INACTIVE.
In case 1, the job is not yet validated. A job.create handler may reject it, as job.validate does, by calling flux_jobtap_reject_job(3) and returning a negative return code.
In cases 2 and 3, handle fatal errors by raising a fatal job exception.
Posting events from a job.create handler is safe in all cases.
Note
In case 3, job.create is called for active jobs in unspecified order. For ordering guarantees, call flux_jobtap_set_load_sort_order(3) from flux_plugin_init(). The mode parameter is state (sort by state, then jobid) or -state (reverse state, then jobid). For example:
flux_jobtap_set_load_sort_order (p, "state");
ensures job.create and job.new are called on DEPEND jobs first, then PRIORITY, then SCHED, and so on.
- job.destroy
Called after a job is rejected or becomes inactive.
- job.validate
Allows plugins to reject jobs before they are introduced to the job manager. A rejected job causes a submission error in the client, and its job data in the KVS is purged. No further callbacks except job.destroy occur for rejected jobs. If a job is not rejected, job.new is invoked immediately after job.validate. Implement limits or checks in job.validate, but confine accounting to job.new, since job.new is also called during job-manager restart or plugin reload.
- job.dependency.*
Allows dependency plugins to notify the job manager that they handle a given dependency scheme. The job manager scans the attributes.system.dependencies array and issues a job.dependency.SCHEME callback for each dependency. If no plugin has registered for SCHEME, the job is rejected. The plugin calls flux_jobtap_dependency_add(3) to add a named dependency (if necessary). Jobs remain in DEPEND state until all dependencies are removed via flux_jobtap_dependency_remove(3). See job.state.depend for more information. Reject jobs with invalid specifications using flux_jobtap_reject_job(3) and a negative return code.
- job.new
Announces a new valid job. Called in the same three situations as job.create.
- job.state.*
Called after a job state transition. The callback executes after the state is published to the eventlog but before action is taken on the state (since the action may immediately trigger another transition).
- job.event.*
The job.event.* callbacks are made only for plugins subscribed to a job with flux_jobtap_job_subscribe(). All job events trigger this callback for subscribed plugins, including events that do not result in state transitions (e.g., start or a non-fatal exception).
Subscribing to Job Events:
int flux_jobtap_job_subscribe (flux_plugin_t *p, flux_jobid_t id);
void flux_jobtap_job_unsubscribe (flux_plugin_t *p, flux_jobid_t id);
flux_jobtap_job_subscribe() subscribes the plugin to job.event.* callbacks for job id. Returns 0 on success, -1 on failure.
flux_jobtap_job_unsubscribe() unsubscribes from events for job id.
Subscriptions are cleaned up automatically when the job becomes inactive or the plugin is unloaded.
- job.state.depend
The final place to add dependencies to a job. Add dependencies via flux_jobtap_dependency_add(), which attaches a named dependency to a job. Jobs remain in DEPEND state until all dependencies are removed via flux_jobtap_dependency_remove(). A dependency may be used only once. A second flux_jobtap_dependency_add() call with the same description returns EEXIST, even if the dependency was removed. This enables idempotent operation during job-manager or plugin restart.
Note
See EVENT PROCESSING MODEL for important information about when state transitions occur synchronously when removing dependencies.
- job.state.priority
Priority-managing plugins must return a priority at the end of this callback. If the priority is not yet available, call flux_jobtap_priority_unavail() to indicate that it cannot be set. Jobs without a priority (unavailable, or no priority plugin loaded) remain in PRIORITY state until one is assigned. Set the priority asynchronously using flux_jobtap_reprioritize_job(). See PRIORITY for details on plugin priority management.
Note
See EVENT PROCESSING MODEL for important information about when the job.state.sched callback is invoked relative to this callback.
- job.state.sched
Plugins may set R in output args. If R is not already assigned, this forces R for the current job and bypasses the scheduler.
- job.priority.get
Called when the job manager updates a single job priority. The plugin returns a priority immediately, but may use flux_jobtap_priority_unavail() if the priority is unavailable while the job is in PRIORITY state. Returning an unavailable priority in SCHED state is an error (logged but ignored). Request job.priority.get for all jobs via flux_jobtap_reprioritize_all(). See PRIORITY for details.
- job.inactive-add
Job transitioned to INACTIVE state and was added to the inactive hash.
- job.inactive-remove
Job was purged from the inactive hash.
- job.update
Job was updated with an RFC 21 jobspec-update event.
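The once-only rule for dependency descriptions, described under job.state.depend, can be pictured with a small standalone model. This is not the job manager's implementation: struct job_deps, its helpers, the fixed-size table, and the ENOSPC and ENOENT error codes are all assumptions made to keep the sketch short.

```c
#include <errno.h>
#include <string.h>

#define MAX_DEPS 8

/* Hypothetical model of one job's dependency bookkeeping. */
struct job_deps {
    const char *seen[MAX_DEPS];   /* every description ever added */
    int active[MAX_DEPS];         /* 1 while the dependency is outstanding */
    int count;
};

/* Add a dependency; a description may be used only once per job. */
int deps_add (struct job_deps *d, const char *description)
{
    for (int i = 0; i < d->count; i++) {
        if (strcmp (d->seen[i], description) == 0) {
            errno = EEXIST;       /* still true after removal */
            return -1;
        }
    }
    if (d->count == MAX_DEPS) {
        errno = ENOSPC;
        return -1;
    }
    d->seen[d->count] = description;
    d->active[d->count++] = 1;
    return 0;
}

/* Remove a dependency; its description stays in the 'seen' list. */
int deps_remove (struct job_deps *d, const char *description)
{
    for (int i = 0; i < d->count; i++) {
        if (d->active[i] && strcmp (d->seen[i], description) == 0) {
            d->active[i] = 0;
            return 0;
        }
    }
    errno = ENOENT;
    return -1;
}

/* In this model the job may leave DEPEND state when this returns 0. */
int deps_outstanding (const struct job_deps *d)
{
    int n = 0;
    for (int i = 0; i < d->count; i++)
        n += d->active[i];
    return n;
}
```

A plugin that replays job.state.depend after a restart can therefore treat EEXIST from flux_jobtap_dependency_add() as "already handled" rather than as a fatal error.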
CONFIGURATION CALLBACK TOPIC
Plugins may register a conf.update callback. The current/proposed
configuration object is in input arguments under the conf key. The
callback is invoked when:
The plugin is first loaded. Failure causes plugin load to fail.
The configuration changes. Failure causes flux config reload to fail.
Return 0 on success, -1 on failure. On failure, optionally set a human
readable error in the errstr output argument. Use
flux_jobtap_error() for convenience.
JOB UPDATE CALLBACKS
The job manager allows updates of select job attributes through plugins.
Register a callback matching job.update.KEY, where KEY is a
period-delimited jobspec attribute (e.g.,
job.update.attributes.system.duration). Requested updates are passed
in the updates key.
job.update.* callbacks allow or deny updates of specific attributes.
Updates are denied by default unless a callback exists for the attribute and
the plugin returns 0. Deny updates by returning -1; optionally set an error
message with flux_jobtap_error(3).
After all updates are allowed, the updated jobspec passes through the
job.validate plugin stack for validation. A plugin may indicate that an update
is already validated by setting a validated flag in FLUX_PLUGIN_OUT_ARGS. If
all updated attributes have this flag, validation is skipped. This allows
instance owners to update attributes beyond limits.
Some updates benefit from a feasibility check before application. This
prevents users from making feasible jobs infeasible. The update plugin
determines if feasibility checking is needed. Feasibility checks occur only
if a feasibility flag in FLUX_PLUGIN_OUT_ARGS is set. If any plugin
requires a check, the updated jobspec as a whole is checked. If infeasible,
the update is aborted and an error returned.
Updating one attribute may require modifying others. For example, updating
attributes.system.queue may require modifying
attributes.system.constraints to apply the new queue's constraints.
Plugins may push an updates object onto FLUX_PLUGIN_OUT_ARGS with the
same form as RFC 21 jobspec-update context. To update
attributes.system.foo to 1, set:
{"updates": {"attributes.system.foo": 1}}
in FLUX_PLUGIN_OUT_ARGS before returning. Plugin-supplied updates are merged
into the requested updates, so they may overwrite values the user requested.
Use with caution.
Example: Complete Update Handler
A plugin that allows updating job duration while enforcing limits:
#define MAX_DURATION 86400 /* 24 hours */
static int update_duration_cb (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *arg)
{
double duration;
/* Unpack the requested update value */
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:{s:f}}",
"updates",
"attributes.system.duration", &duration) < 0) {
return flux_jobtap_error (p, args, "failed to unpack duration");
}
/* Enforce maximum duration limit */
if (duration > MAX_DURATION) {
return flux_jobtap_error (p, args,
"duration %.0f exceeds maximum of %d",
duration, MAX_DURATION);
}
/* Enforce minimum duration */
if (duration < 60.0) {
return flux_jobtap_error (p, args,
"duration %.0f is below minimum of 60",
duration);
}
/* Accept the update - no additional validation needed */
flux_plugin_arg_pack (args,
FLUX_PLUGIN_ARG_OUT,
"{s:i}",
"validated", 1);
return 0;
}
int flux_plugin_init (flux_plugin_t *p)
{
return flux_plugin_add_handler (p,
"job.update.attributes.system.duration",
update_duration_cb,
NULL);
}
Validates duration updates against limits and sets validated=1 to skip
additional validation.
UPDATING JOBS PROGRAMMATICALLY
int flux_jobtap_jobspec_update_pack (flux_plugin_t *p,
const char *fmt,
...);
int flux_jobtap_jobspec_update_id_pack (flux_plugin_t *p,
flux_jobid_t id,
const char *fmt,
...);
flux_jobtap_jobspec_update_pack() updates the jobspec of the current
job (the job for which the callback was invoked). Arguments follow Jansson
pack format with period-delimited keys per RFC 21 jobspec-update format.
Call only from a jobtap callback for the job being updated, not for other jobs or outside callback context. Returns 0 on success, -1 on failure.
flux_jobtap_jobspec_update_id_pack() updates job id asynchronously,
outside a callback for the target job. The job must not be the current job
(use flux_jobtap_jobspec_update_pack() for that). Returns 0 on success,
-1 on failure.
Restrictions:
Jobs in RUN, CLEANUP, or INACTIVE state cannot be updated
Jobs with readonly eventlogs cannot be updated
Updates are subject to job.validate callbacks
Use job.update.* callbacks to allow/deny specific attribute updates
Example updating job duration:
static int some_callback (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *arg)
{
/* Update current job's duration to 1 hour */
if (flux_jobtap_jobspec_update_pack (p,
"{s:f}",
"attributes.system.duration",
3600.0) < 0)
return -1;
return 0;
}
Example updating another job asynchronously:
/* Called from timer or other async context */
static void timer_cb (flux_reactor_t *r,
                      flux_watcher_t *w,
                      int revents,
                      void *arg)
{
    flux_plugin_t *p = arg;
    /* get_target_job() is a placeholder for plugin-specific lookup */
    flux_jobid_t target_job = get_target_job ();

    /* Update a different job's urgency attribute */
    if (flux_jobtap_jobspec_update_id_pack (p,
                                            target_job,
                                            "{s:i}",
                                            "attributes.system.urgency",
                                            16) < 0)
        flux_log_error (flux_jobtap_get_flux (p), "jobspec update failed");
}
PLUGIN CALLBACK TOPICS
- plugin.query
The job manager calls the plugin.query callback to allow plugins to provide data in response to a jobtap-query request (used by flux jobtap query PLUGIN). Plugins export internal state for inspection by placing data in callback output arguments, e.g.:
flux_plugin_arg_pack (args,
                      FLUX_PLUGIN_ARG_OUT,
                      "{s:O}",
                      "data", internal_data);
EVENT PROCESSING MODEL
Understanding synchronous versus asynchronous event processing is important when posting events that affect other jobs or trigger multiple state transitions.
Event Queue Mechanism
The job manager maintains a per-job event queue to ensure in-order processing
and prevent recursive callback invocations. When a plugin posts an event (via
flux_jobtap_dependency_remove(), flux_jobtap_event_post_pack(), or by
returning values like priority), the event is added to the target job's
queue.
Queue processing depends on whether the target job is executing a callback:
- Target job is idle (not processing any event):
The event is processed synchronously before the posting function returns. This triggers a cascade of state transitions and callbacks within the same call stack.
- Target job is busy (currently in a callback):
The event is queued and processed after the current callback completes but before control returns to the original caller. This prevents recursive callbacks that see inconsistent job state.
Synchronous State Transition Chains
Jobtap operations can trigger chains of synchronous state transitions.
Example 1: Dependency Removal
When a plugin removes the last dependency from a job:
flux_jobtap_dependency_remove (p, target_jobid, "my-dependency");
// When this returns, target job may have reached SCHED state
If the target job is not currently in a callback, the following occurs synchronously before the function returns:
1. A dependency-remove event is posted and processed immediately
2. The job transitions from DEPEND to PRIORITY state
3. All plugins' job.state.priority callbacks are invoked
4. If a plugin returns a valid priority (or uses default priority):
   - A priority event is posted (queued due to recursion prevention)
   - After the job.state.priority callback returns, the queued priority event is processed
   - The job transitions from PRIORITY to SCHED state
   - All plugins' job.state.sched callbacks are invoked
   - The job is enqueued with the scheduler
5. Only then does flux_jobtap_dependency_remove() return
If the target job is currently in a callback, the event is queued, the function returns immediately, and state transitions occur after the target job's callback completes.
Example 2: Priority Assignment
When a plugin returns a priority in its job.state.priority callback:
int priority_cb (flux_plugin_t *p,
                 const char *topic,
                 flux_plugin_arg_t *args,
                 void *arg)
{
    /* calculate_priority() is a placeholder for plugin-specific logic */
    int64_t priority = calculate_priority (args);

    if (flux_plugin_arg_pack (args, FLUX_PLUGIN_ARG_OUT,
                              "{s:I}",
                              "priority", priority) < 0)
        return -1;
    /* After this callback returns, job.state.sched may be called */
    return 0;
}
When priority is available immediately:
1. The plugin returns with a priority in output args
2. A priority event is posted (queued to prevent recursion)
3. After all job.state.priority callbacks complete, the queued event is processed
4. The job transitions from PRIORITY to SCHED state
5. All plugins' job.state.sched callbacks are invoked
6. Control returns through the callback stack
Recursion Prevention
The event queue prevents infinite recursion:
- Events posted to the current job:
Events posted to the job for which a callback was invoked are queued and processed after the callback returns, not during. This keeps job state stable during callback execution.
- Cascading events during event processing:
Events triggered during processing (like priority assignment triggering a state transition) are queued and processed in order before returning, without deep recursion.
This design ensures:
Job state remains consistent during callbacks
Callbacks see a stable, well-defined view of the job
Events are processed in strict FIFO order
Stack depth remains bounded regardless of event chains
Jobs cannot change state "underneath" a running callback
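The queue-then-drain behavior described in this section can be modeled in a few lines of standalone C. This is a simplified sketch, not the job manager's code: struct job, post_event(), handle_event(), and the event constants are hypothetical, and the "callback" is reduced to one rule (handling a dependency-remove event posts a priority event).

```c
#define QUEUE_MAX 16

enum { EV_DEPENDENCY_REMOVE, EV_PRIORITY };

/* Hypothetical per-job state: a FIFO queue plus a busy flag. */
struct job {
    int queue[QUEUE_MAX];
    int head, tail;
    int processing;          /* 1 while the drain loop is running */
    int depth, max_depth;    /* handler nesting, for demonstration */
    int log[QUEUE_MAX];      /* order in which events were handled */
    int nlog;
};

void post_event (struct job *job, int ev);

/* Stand-in for plugin callbacks on this job. */
void handle_event (struct job *job, int ev)
{
    if (++job->depth > job->max_depth)
        job->max_depth = job->depth;
    job->log[job->nlog++] = ev;
    if (ev == EV_DEPENDENCY_REMOVE)
        post_event (job, EV_PRIORITY);  /* queued, not handled here */
    job->depth--;
}

void post_event (struct job *job, int ev)
{
    if (job->tail == QUEUE_MAX)
        return;                         /* sketch only: drop on overflow */
    job->queue[job->tail++] = ev;
    if (job->processing)
        return;                         /* busy: the drain loop picks it up */
    job->processing = 1;
    while (job->head < job->tail)       /* idle: drain before returning */
        handle_event (job, job->queue[job->head++]);
    job->processing = 0;
}
```

In this model a single post_event() call on an idle job handles both events in FIFO order before returning, yet handle_event() never nests more than one level deep.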
Job Hold Mechanism
The job manager may temporarily "hold" events for a job using the
job->hold_events flag. While set, all events for that job are queued,
not processed immediately. This occurs during:
Batch KVS commit operations
Certain bulk job operations
When released, all queued events process synchronously in order before control returns. This is transparent to plugins but explains why multiple state transitions may occur atomically.
Example: Dependency Chain
Consider a plugin managing a chain of dependent jobs. When the first job completes, it should release the second job:
int finish_cb (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *arg)
{
flux_jobid_t completed_id, dependent_id;
flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN,
"{s:I}", "id", &completed_id);
dependent_id = get_dependent_job (completed_id);
if (dependent_id != FLUX_JOBID_ANY) {
// If dependent_id is idle: DEPEND -> PRIORITY -> SCHED synchronously
// If dependent_id is busy: queued for later processing
flux_jobtap_dependency_remove (p, dependent_id, "predecessor");
}
return 0;
}
When the target job is idle, synchronous processing releases dependent jobs immediately, minimizing startup latency.
JOB AUXILIARY DATA
Plugins can attach arbitrary data to jobs. This data persists across callbacks and state transitions, enabling stateful plugin operation.
API Functions
int flux_jobtap_job_aux_set (flux_plugin_t *p,
flux_jobid_t id,
const char *name,
void *val,
flux_free_f free_fn);
void *flux_jobtap_job_aux_get (flux_plugin_t *p,
flux_jobid_t id,
const char *name);
int flux_jobtap_job_aux_delete_value (flux_plugin_t *p,
flux_jobid_t id,
void *val);
flux_jobtap_job_aux_set() attaches data val with key name to
job id. If free_fn is non-NULL, it destroys val when the job
is destroyed or the plugin unloads. Returns 0 on success, -1 on failure.
flux_jobtap_job_aux_get() retrieves data for key name from job
id. Returns the data pointer, or NULL if not found.
flux_jobtap_job_aux_delete_value() removes aux data by value from job
id. Use when the key name is unknown but the value pointer is available.
Returns 0 on success, -1 on failure.
The id parameter accepts FLUX_JOBTAP_CURRENT_JOB to operate on the
current callback's job.
Lifecycle
Auxiliary data is released when:
The job transitions to INACTIVE state
The plugin is unloaded
The data is explicitly deleted
Data persists across all state transitions before INACTIVE.
Isolation
Each plugin's auxiliary data is isolated from other plugins. Two plugins can use the same key without conflict.
Example
Track the number of times a job has been through PRIORITY state:
struct priority_count {
int count;
};
static int priority_cb (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *arg)
{
struct priority_count *pc;
pc = flux_jobtap_job_aux_get (p, FLUX_JOBTAP_CURRENT_JOB, "pricount");
if (!pc) {
pc = calloc (1, sizeof (*pc));
if (flux_jobtap_job_aux_set (p,
FLUX_JOBTAP_CURRENT_JOB,
"pricount",
pc,
free) < 0) {
free (pc);
return -1;
}
}
pc->count++;
flux_log (flux_jobtap_get_flux (p),
LOG_INFO,
"job priority calculated %d times",
pc->count);
return 0;
}
RAISING EXCEPTIONS
Plugins can raise job exceptions to signal errors or anomalous conditions. Exceptions are recorded in the job eventlog and can terminate the job.
API Function
int flux_jobtap_raise_exception (flux_plugin_t *p,
flux_jobid_t id,
const char *type,
int severity,
const char *fmt,
...);
Raises an exception on job id with the specified type, severity,
and formatted message. The id parameter accepts FLUX_JOBTAP_CURRENT_JOB.
Returns 0 on success, -1 on failure.
Severity Levels
- severity = 0 (fatal):
Terminates the job immediately. Job transitions to CLEANUP state and will not run. Use for unrecoverable errors.
- severity = 1-6 (non-fatal):
Logs the exception but allows the job to continue. Use for warning conditions that should be recorded but do not prevent execution.
- severity = 7 (debug):
Debug-level exception, not shown to users by default.
Severity levels correspond to syslog levels, with 0 most severe (LOG_EMERG) and 7 least severe (LOG_DEBUG).
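Because the levels line up with syslog(3) priorities, the constants from syslog.h can be used to classify a severity value. The helper below is a hypothetical illustration of the three bands listed above; treating values outside 0-7 as invalid is an assumption.

```c
#include <syslog.h>

/* Hypothetical helper: describe the effect of an exception severity. */
const char *severity_effect (int severity)
{
    if (severity < LOG_EMERG || severity > LOG_DEBUG)
        return "invalid";           /* assumption: only 0-7 are accepted */
    if (severity == LOG_EMERG)      /* 0 */
        return "fatal";
    if (severity == LOG_DEBUG)      /* 7 */
        return "debug";
    return "non-fatal";             /* 1-6 */
}
```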
Exception Types
The type parameter identifies the exception category. Common types:
"dependency" - dependency-related errors
"alloc" - resource allocation failures
"timeout" - job timeout
"cancel" - job cancellation
"exec" - execution system errors
The type is recorded in the exception event for monitoring tools.
When to Use
Use flux_jobtap_raise_exception() instead of returning -1 when:
The error should be visible in job eventlogs
A detailed error message is needed
The error should terminate the job (severity 0)
The job should continue but the condition logged (severity 1-6)
Return -1 for plugin-internal errors that should not generate user-visible exceptions.
Example
Raise a fatal exception if a job violates policy:
static int validate_cb (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *arg)
{
int nnodes;
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:{s:{s:i}}}",
"jobspec",
"resources",
"nnodes", &nnodes) < 0)
return -1;
if (nnodes > 1024) {
flux_jobtap_raise_exception (p,
FLUX_JOBTAP_CURRENT_JOB,
"policy",
0,
"requested %d nodes exceeds limit of 1024",
nnodes);
return -1;
}
return 0;
}
Raise a non-fatal warning:
static int alloc_cb (flux_plugin_t *p,
                     const char *topic,
                     flux_plugin_arg_t *args,
                     void *arg)
{
    flux_t *h = flux_jobtap_get_flux (p);
    double t_submit;
    double now;

    if (!h)
        return -1;
    now = flux_reactor_now (flux_get_reactor (h));
    if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN,
                                "{s:f}", "t_submit", &t_submit) < 0)
        return -1;
    /* Warn if job has been waiting more than 1 hour */
    if (now - t_submit > 3600.0) {
        flux_jobtap_raise_exception (p,
                                     FLUX_JOBTAP_CURRENT_JOB,
                                     "long-wait",
                                     4,
                                     "job waited %.0f seconds for allocation",
                                     now - t_submit);
    }
    return 0;
}
QUERYING JOB STATE
Getting Job Information
flux_plugin_arg_t *flux_jobtap_job_lookup (flux_plugin_t *p,
flux_jobid_t id);
Returns job information for job id as a flux_plugin_arg_t containing
the same fields as callback input args (see JOBTAP PLUGIN ARGUMENTS). Returns NULL
on failure.
Call flux_plugin_arg_destroy() on the returned object.
Getting Job Result
int flux_jobtap_get_job_result (flux_plugin_t *p,
flux_jobid_t id,
flux_job_result_t *resultp);
Gets the result of job id and stores it in *resultp. Valid only for
jobs in CLEANUP or INACTIVE state. Returns 0 on success, -1 on failure.
Result values:
- FLUX_JOB_RESULT_COMPLETED
Job completed successfully (exit code 0)
- FLUX_JOB_RESULT_FAILED
Job failed (non-zero exit code)
- FLUX_JOB_RESULT_CANCELED
Job canceled by exception type "cancel"
- FLUX_JOB_RESULT_TIMEOUT
Job ended by exception type "timeout"
Use in job.state.cleanup or job.state.inactive callbacks to
determine why a job ended.
Checking Posted Events
int flux_jobtap_job_event_posted (flux_plugin_t *p,
flux_jobid_t id,
const char *name);
Returns 1 if event name was posted to job id, 0 if not, -1 on
failure. Use for idempotent operations and checking job history.
Setting Job Flags
int flux_jobtap_job_set_flag (flux_plugin_t *p,
flux_jobid_t id,
const char *flag);
Sets a flag on job id. The id parameter accepts
FLUX_JOBTAP_CURRENT_JOB. Returns 0 on success, -1 on failure.
Supported flags:
- waitable
Makes the job waitable via flux_job_wait(). By default, only jobs submitted with FLUX_JOB_WAITABLE are waitable. This flag allows plugins to make jobs waitable after submission.
Posts a set-flags event to the job eventlog.
Example
Query job information from another context:
flux_plugin_arg_t *args = flux_jobtap_job_lookup (p, other_jobid);
if (args) {
    int state;
    flux_jobid_t id;

    if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN,
                                "{s:I s:i}",
                                "id", &id,
                                "state", &state) == 0)
        flux_log (flux_jobtap_get_flux (p),
                  LOG_DEBUG,
                  "job %ju is in state %d",
                  (uintmax_t) id,
                  state);
    flux_plugin_arg_destroy (args);
}
Get job result in cleanup:
static int cleanup_cb (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *arg)
{
flux_job_result_t result;
if (flux_jobtap_get_job_result (p,
FLUX_JOBTAP_CURRENT_JOB,
&result) == 0) {
switch (result) {
case FLUX_JOB_RESULT_COMPLETED:
/* Job succeeded */
break;
case FLUX_JOB_RESULT_FAILED:
/* Job failed */
break;
case FLUX_JOB_RESULT_CANCELED:
/* Job was canceled */
break;
case FLUX_JOB_RESULT_TIMEOUT:
/* Job timed out */
break;
}
}
return 0;
}
Check if dependency event was posted:
/* Compare against 1: the function returns -1 on failure */
if (flux_jobtap_job_event_posted (p, id, "dependency-add") == 1) {
    /* Dependency was already added, skip */
    return 0;
}
PRIORITY
Custom job priority assignment is a core jobtap feature. A builtin
.priority-default plugin is always loaded to ensure jobs move past
PRIORITY state when no other priority plugin is loaded. The default plugin
assigns priority equal to job urgency.
When loading a priority plugin, note that .priority-default may still
be loaded. This initializes priority in return arguments to job urgency.
Since job.state.priority and job.priority.get callbacks run in order,
subsequently loaded plugins overwrite the default priority, making the
last loaded priority plugin active.
To ensure the default priority is overridden, priority plugins must always
set a priority or use flux_jobtap_priority_unavail() if unavailable,
in callbacks that return priority: job.state.priority and
job.priority.get.
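The effect of load order on the returned priority can be modeled with a short standalone sketch. It is an illustration only: the priority_f type, the three stand-in handlers, and the priority formula are hypothetical, and the shared output argument is reduced to a single int64_t.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for a job.state.priority handler: it may
 * overwrite *priority (the shared output argument) or leave it alone.
 */
typedef void (*priority_f) (int urgency, int64_t *priority);

/* Models the builtin .priority-default plugin: priority = urgency. */
void default_priority (int urgency, int64_t *priority)
{
    *priority = urgency;
}

/* Models a custom plugin loaded later (the formula is made up). */
void custom_priority (int urgency, int64_t *priority)
{
    *priority = (int64_t) urgency * 1000;
}

/* Models a plugin that forgets to set a priority. */
void silent_plugin (int urgency, int64_t *priority)
{
    (void) urgency;
    (void) priority;
}

/* Call handlers in load order; the last value written wins. */
int64_t run_priority_stack (priority_f *stack, size_t n, int urgency)
{
    int64_t priority = -1;

    for (size_t i = 0; i < n; i++)
        stack[i] (urgency, &priority);
    return priority;
}
```

With the default plugin followed by custom_priority(), the custom value replaces the urgency-based default. With silent_plugin() in its place, the default value survives, which is why a priority plugin should always set a priority or report it as unavailable.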
To fully prevent conflicts, explicitly remove the builtin priority plugin:
flux jobtap remove .priority-default
or via configuration (see flux-conf-job-manager(7)):
[job-manager]
plugins = [
{ remove = ".priority-default",
load = "complex-priority.so"
},
]
PROLOG AND EPILOG ACTIONS
Plugins can perform asynchronous tasks after alloc but before the job runs,
or after finish but before resources are freed, using job manager prolog and
epilog actions.
Delineate actions with these functions:
int flux_jobtap_prolog_start (flux_plugin_t *p,
const char *description);
int flux_jobtap_prolog_finish (flux_plugin_t *p,
flux_jobid_t id,
const char *description,
int status);
int flux_jobtap_epilog_start (flux_plugin_t *p,
const char *description);
int flux_jobtap_epilog_finish (flux_plugin_t *p,
flux_jobid_t id,
const char *description,
int status);
Call flux_jobtap_prolog_start() to initiate a prolog action. This blocks
the job from starting until flux_jobtap_prolog_finish() is called, even
after resources are assigned. The prolog status passed to
flux_jobtap_prolog_finish() is captured in the eventlog, but the action
must raise a job exception or take other action on failure. Non-zero prolog
finish status does not trigger automated job manager behavior. The prolog
description is informational only, distinguishing multiple actions in
the eventlog.
Call flux_jobtap_epilog_start() to initiate an epilog action, preventing
resource release until flux_jobtap_epilog_finish() is called. The same
caveats regarding description and completion status apply.
Call flux_jobtap_prolog_start() anytime before the start request is made
to the execution system, typically from job.state.run or
job.event.alloc callbacks (when resources are allocated). Note: plugins
receive job.event.* callbacks only for subscribed jobs (via
flux_jobtap_job_subscribe()). A prolog cannot be started after CLEANUP state.
Call flux_jobtap_epilog_start() only after a job enters CLEANUP state
but before the free request to the scheduler, typically from
job.state.cleanup or job.event.finish callbacks.
If called for a job in an invalid state, these functions return -1 with
errno set to EINVAL.
Multiple prolog or epilog actions can be active simultaneously.
CALLING OTHER PLUGINS
Invoke custom callbacks in other plugins using flux_jobtap_call(). Topic
strings starting with job. are reserved for the job manager and cause
immediate failure with errno set to EINVAL:
int flux_jobtap_call (flux_plugin_t *p,
                      flux_jobid_t id,
                      const char *topic,
                      flux_plugin_arg_t *args);
The jobtap API assumes a current job, so id is required. args is
passed unmodified when invoking callbacks for topic, so expected data
from JOBTAP PLUGIN ARGUMENTS for job id may not be present in newly created
args unless manually added. When invoked from another jobtap callback,
use the existing args with FLUX_JOBTAP_CURRENT_JOB, which preserves
expected job arguments. For example, this calls all plugins registered for
custom.topic when callback is called:
int callback (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *arg)
{
return flux_jobtap_call (p,
FLUX_JOBTAP_CURRENT_JOB,
"custom.topic",
args);
}
REGISTERING SERVICES
int flux_jobtap_service_register (flux_plugin_t *p,
const char *method,
flux_msg_handler_f cb,
void *arg);
int flux_jobtap_service_register_ex (flux_plugin_t *p,
const char *method,
uint32_t rolemask,
flux_msg_handler_f cb,
void *arg);
Registers a service endpoint invoked via Flux RPC. The service name is
job-manager.<plugin-name>.<method>, where <plugin-name> is the
plugin's internal name (see JOBTAP PLUGIN NAMES section).
flux_jobtap_service_register() registers a service accessible to all
users.
flux_jobtap_service_register_ex() restricts access to users matching
rolemask. Use FLUX_ROLE_OWNER for instance owner, FLUX_ROLE_USER
for any user. Both return 0 on success, -1 on failure.
The callback receives RPC requests and responds using flux_respond() or
flux_respond_error().
Example:
static void query_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
struct plugin_data *data = arg;
/* Return plugin state */
if (flux_respond_pack (h, msg,
"{s:i}",
"count", data->count) < 0)
flux_log_error (h, "query response failed");
}
int flux_plugin_init (flux_plugin_t *p)
{
    struct plugin_data *data = calloc (1, sizeof (*data));

    if (!data)
        return -1;
    /* Register job-manager.myplugin.query service */
    if (flux_jobtap_service_register (p, "query", query_cb, data) < 0) {
        free (data);
        return -1;
    }
    return 0;
}
Users can then query via:
$ flux exec -r 0 flux python
>>> import flux
>>> h = flux.Flux()
>>> h.rpc("job-manager.myplugin.query").get()
RESOURCES
Flux: http://flux-framework.org
Flux RFC: https://flux-framework.readthedocs.io/projects/flux-rfc
Issue Tracker: https://github.com/flux-framework/flux-core/issues