======================
flux-jobtap-plugins(7)
======================


DESCRIPTION
===========

The *jobtap* interface loads builtin and external plugins into the job
manager broker module. Plugins assign job priorities, manage dependencies,
debug job state flow, or extend job manager functionality.

Jobtap plugins use the Flux standard plugin format. Each plugin exports
``flux_plugin_init()``, which calls ``flux_plugin_add_handler(3)`` to
register functions for the callback topic strings described in
:ref:`callback_topics`.

Each callback function uses the Flux standard plugin callback form, e.g.::

  int callback (flux_plugin_t *p,
                const char *topic,
                flux_plugin_arg_t *args,
                void *arg);

where ``p`` is the handle for the current plugin, ``topic`` is the topic
string for the invoked callback, ``args`` contains plugin arguments that
can be unpacked with ``flux_plugin_arg_unpack(3)``, and ``arg`` is any
opaque argument passed when the handler was registered.

Multiple plugins may be loaded simultaneously. All matching handlers are
called in load order.

See :man5:`flux-conf-job-manager` or :man1:`flux-jobtap` for loading plugins.


JOBTAP PLUGIN NAMES
===================

Plugins are referenced in ``flux jobtap list`` output by file name. Plugins
loaded by fully qualified path are shortened to their basename, yielding
names like ``plugin-name.so``.

Builtin plugins are named with a leading ``.``. They are hidden in
``flux jobtap list`` and do not match the :linux:man7:`glob` ``*`` or the
``all`` keyword (similar to hidden files in a filesystem). List builtin
plugins with ``-a, --all``; remove them by explicit name or by a pattern
that includes the leading ``.``.

A plugin may assign itself a name with ``flux_plugin_set_name(3)``, but this
name is not displayed in ``flux jobtap list`` or used in matching. The
internal name is used only in the service names generated by
``flux_jobtap_service_register()`` (see REGISTERING SERVICES below). If no
name is set, the basename minus the trailing ``.so`` is used.

.. _arguments:

JOBTAP PLUGIN ARGUMENTS
=======================

For job-specific callbacks, all job data is passed to the plugin via
``flux_plugin_arg_t *args``, and return data is sent back to the job
manager via the same ``args``. Unpack incoming arguments with
``flux_plugin_arg_unpack(3)``, e.g.::

  rc = flux_plugin_arg_unpack (args,
                               FLUX_PLUGIN_ARG_IN,
                               "{s:{s:o}, s:I}",
                               "jobspec", "resources", &resources,
                               "id", &id);

unpacks the ``resources`` section of jobspec and the jobid into
``resources`` and ``id``.

The full list of available args is:

========== ==== ===============================================================
name       type description
========== ==== ===============================================================
jobspec    o    jobspec with environment redacted
R          o    R with scheduling key redacted (RUN state or later)
id         I    jobid
state      i    current job state
prev_state i    previous state (``job.state.*`` callbacks)
userid     i    userid
urgency    i    current urgency
priority   I    current priority
t_submit   f    submit timestamp in floating point seconds
entry      o    posted eventlog entry, including context
end_event  o    copy of event that caused transition to CLEANUP, if available
========== ==== ===============================================================

Pack return arguments using the ``FLUX_PLUGIN_ARG_OUT`` and, optionally,
``FLUX_PLUGIN_ARG_REPLACE`` flags. To return a priority::

  rc = flux_plugin_arg_pack (args,
                             FLUX_PLUGIN_ARG_OUT,
                             "{s:I}",
                             "priority", (int64_t) priority);

While a job is pending, plugin callbacks may add job annotations by
returning a value for the ``annotations`` key::

  rc = flux_plugin_arg_pack (args,
                             FLUX_PLUGIN_ARG_OUT,
                             "{s:{s:s}}",
                             "annotations", "test", value);


UTILITY FUNCTIONS
=================

Getting the Flux Handle
-----------------------

::

  flux_t *flux_jobtap_get_flux (flux_plugin_t *p);

Returns the ``flux_t`` handle for the broker instance, for use with RPCs,
logging, or other Flux APIs. Returns NULL on failure. The handle is owned
by the plugin infrastructure and must not be destroyed.

Example::

  static int callback (flux_plugin_t *p,
                       const char *topic,
                       flux_plugin_arg_t *args,
                       void *arg)
  {
      flux_t *h = flux_jobtap_get_flux (p);

      flux_log (h, LOG_INFO, "callback invoked for topic %s", topic);
      return 0;
  }

.. _callback_topics:

JOB CALLBACK TOPICS
===================

The following job callback topic strings are currently provided by the
*jobtap* interface:

job.create
  Notifies plugins about a newly introduced job in three situations:

  1. on job submission
  2. when the job manager restarts and reloads a job from the KVS
  3. when a new jobtap plugin is loaded

  In case 1, the job state is always ``FLUX_JOB_STATE_NEW``; in cases 2
  and 3, jobs can be in any state except ``FLUX_JOB_STATE_INACTIVE``.

  In case 1, the job has not yet been validated, so ``job.create`` may
  reject it, as ``job.validate`` does, using
  :man3:`flux_jobtap_reject_job` and a negative return code. In cases 2
  and 3, handle fatal errors by raising a fatal job exception. Posting
  events from a ``job.create`` handler is safe in all cases.

  .. note::
     In case 3, ``job.create`` is called for active jobs in an unspecified
     order. For ordering guarantees, call
     ``flux_jobtap_set_load_sort_order(3)`` from ``flux_plugin_init()``.
     The ``mode`` argument is ``state`` (sort by state, then jobid) or
     ``-state`` (sort by state in reverse order, then jobid). For example::

       flux_jobtap_set_load_sort_order (p, "state");

     ensures ``job.create`` and ``job.new`` are called on DEPEND jobs
     first, then PRIORITY, then SCHED, and so on.

job.destroy
  Called after a job is rejected or becomes inactive.

job.validate
  Allows plugins to reject jobs before they are introduced to the job
  manager. Rejected jobs cause a submission error in the client, and job
  data in the KVS is purged. No further callbacks except ``job.destroy``
  occur for rejected jobs. If a job is not rejected, ``job.new`` is
  invoked immediately after ``job.validate``.

  Implement limits or checks in ``job.validate``, but confine accounting
  to ``job.new``, since ``job.new`` is also called during job manager
  restart and plugin reload.

job.dependency.*
  Allows dependency plugins to notify the job manager that they handle a
  given dependency *scheme*. The job manager scans the
  ``attributes.system.dependencies`` array and issues a
  ``job.dependency.SCHEME`` callback for each dependency. If no plugin has
  registered for ``SCHEME``, the job is rejected.

  The plugin calls ``flux_jobtap_dependency_add(3)`` to add a named
  dependency, if necessary. Jobs remain in ``DEPEND`` state until all
  dependencies are removed via ``flux_jobtap_dependency_remove(3)``. See
  ``job.state.depend`` for more information.

  Reject jobs with invalid dependency specifications using
  :man3:`flux_jobtap_reject_job` and a negative return code.

job.new
  Announces a new valid job. Called in the same three situations as
  ``job.create``.

job.state.*
  Called after a job state transition. The callback executes after the
  state is posted to the eventlog but before the job manager acts on the
  new state (since that action may immediately trigger another
  transition).

job.event.*
  ``job.event.*`` callbacks are made only for plugins subscribed to a job
  with ``flux_jobtap_job_subscribe()``. Every job event triggers this
  callback for subscribed plugins, including events that do not result in
  a state transition (e.g., ``start`` or a non-fatal ``exception``).

  Subscribing to job events::

    int flux_jobtap_job_subscribe (flux_plugin_t *p, flux_jobid_t id);

    void flux_jobtap_job_unsubscribe (flux_plugin_t *p, flux_jobid_t id);

  ``flux_jobtap_job_subscribe()`` subscribes the plugin to ``job.event.*``
  callbacks for job ``id``. Returns 0 on success, -1 on failure.

  ``flux_jobtap_job_unsubscribe()`` unsubscribes the plugin from job
  ``id`` events.

  Subscriptions are cleaned up automatically when the job becomes inactive
  or the plugin is unloaded.

job.state.depend
  The final place to add dependencies to a job.
  Add dependencies via
  ``flux_jobtap_dependency_add()``, which attaches a named dependency to a
  job. Jobs remain in ``DEPEND`` state until all dependencies are removed
  via ``flux_jobtap_dependency_remove()``.

  Each dependency description may be used only once. A second
  ``flux_jobtap_dependency_add()`` call with the same description fails
  with ``errno`` set to ``EEXIST``, even if the dependency was already
  removed. This makes the operation idempotent across job manager and
  plugin restarts.

  .. note::
     See :ref:`event_processing` for important information about state
     transitions that occur synchronously when dependencies are removed.

job.state.priority
  Priority-managing plugins must return a priority in the output
  arguments before the callback returns. If the priority is not yet
  available, call ``flux_jobtap_priority_unavail()`` to indicate that it
  cannot be set. Jobs without a priority (because it is unavailable or no
  priority plugin is loaded) remain in PRIORITY state until one is
  assigned. The priority may be set later, asynchronously, with
  ``flux_jobtap_reprioritize_job()``. See :ref:`priority` for details on
  plugin priority management.

  .. note::
     See :ref:`event_processing` for important information about when the
     ``job.state.sched`` callback is invoked relative to this callback.

job.state.sched
  Plugins may set ``R`` in the output arguments. If ``R`` is not already
  assigned, this forces ``R`` for the current job and bypasses the
  scheduler.

job.priority.get
  Called when the job manager updates the priority of a single job. The
  plugin returns a priority immediately, but may call
  ``flux_jobtap_priority_unavail()`` if the priority is unavailable while
  the job is in PRIORITY state. Returning an unavailable priority in SCHED
  state is an error (logged but otherwise ignored). Request
  ``job.priority.get`` for all jobs via ``flux_jobtap_reprioritize_all()``.
  See :ref:`priority` for details.

job.inactive-add
  The job transitioned to INACTIVE state and was added to the inactive
  hash.

job.inactive-remove
  The job was purged from the inactive hash.

job.update
  The job was updated with an RFC 21 ``jobspec-update`` event.
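
The dependency rules above (a description is single-use, and a job leaves
DEPEND only when no dependency remains) can be sketched as a small
standalone C model. This is an illustration only, not Flux code:
``struct dep_model``, ``model_dependency_add()`` and
``model_dependency_remove()`` are hypothetical names, and the fixed table
size is an assumption made to keep the sketch short::

  #include <errno.h>
  #include <stdbool.h>
  #include <string.h>

  /* Hypothetical model of one job's dependencies, not Flux code */
  #define MAX_DEPS 8

  struct dep_entry {
      const char *description;
      bool active;                      /* added and not yet removed */
  };

  struct dep_model {
      struct dep_entry deps[MAX_DEPS];  /* every description ever added */
      int ndeps;
      int nactive;                      /* job stays in DEPEND while > 0 */
  };

  static struct dep_entry *dep_find (struct dep_model *m, const char *desc)
  {
      for (int i = 0; i < m->ndeps; i++)
          if (strcmp (m->deps[i].description, desc) == 0)
              return &m->deps[i];
      return NULL;
  }

  int model_dependency_add (struct dep_model *m, const char *desc)
  {
      /* A description may be used only once, even after removal */
      if (dep_find (m, desc)) {
          errno = EEXIST;
          return -1;
      }
      if (m->ndeps == MAX_DEPS) {
          errno = ENOSPC;
          return -1;
      }
      m->deps[m->ndeps++] = (struct dep_entry){ desc, true };
      m->nactive++;
      return 0;
  }

  int model_dependency_remove (struct dep_model *m, const char *desc)
  {
      struct dep_entry *e = dep_find (m, desc);

      if (!e || !e->active) {
          errno = ENOENT;
          return -1;
      }
      e->active = false;
      m->nactive--;
      return 0;
  }

Because the description is remembered after removal, replaying the same
add (for example after a plugin reload) fails with ``EEXIST`` instead of
putting the job back into DEPEND.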


CONFIGURATION CALLBACK TOPIC
============================

Plugins may register a ``conf.update`` callback. The current or proposed
configuration object is passed in the input arguments under the ``conf``
key. The callback is invoked when:

- The plugin is first loaded. Failure causes the plugin load to fail.
- The configuration changes. Failure causes ``flux config reload`` to fail.

Return 0 on success, -1 on failure. On failure, optionally set a human
readable error in the ``errstr`` output argument; ``flux_jobtap_error()``
is a convenient way to do this.


JOB UPDATE CALLBACKS
====================

The job manager allows updates of select job attributes through plugins.
To permit updates of an attribute, register a callback matching
``job.update.KEY``, where ``KEY`` is a period-delimited jobspec attribute
path (e.g., ``job.update.attributes.system.duration``). Requested updates
are passed in the ``updates`` key of the input arguments.

``job.update.*`` callbacks allow or deny updates of specific attributes.
An update is denied by default unless a callback exists for the attribute
and returns 0. To deny an update, return -1, optionally setting an error
message with ``flux_jobtap_error(3)``.

After all updates are allowed, the updated jobspec is passed through the
``job.validate`` plugin stack for validation. A plugin may indicate that
an update is already validated by setting a ``validated`` flag in its
output arguments (``FLUX_PLUGIN_ARG_OUT``). If all updated attributes have
this flag set, validation is skipped. This allows, for example, instance
owners to update attributes beyond configured limits.

Some updates benefit from a feasibility check before they are applied, to
prevent users from making a feasible job infeasible. The update plugin
determines whether a check is needed: feasibility is checked only if a
plugin sets a ``feasibility`` flag in its output arguments. If any plugin
requires a check, the updated jobspec as a whole is checked. If it is
infeasible, the update is aborted and an error is returned.

Updating one attribute may require modifying others.
For example, updating ``attributes.system.queue`` may require modifying
``attributes.system.constraints`` to apply the new queue's constraints.
Plugins may push an ``updates`` object onto the output arguments, with the
same form as the context of an RFC 21 ``jobspec-update`` event. To update
``attributes.system.foo`` to 1, set::

  {"updates": {"attributes.system.foo": 1}}

in the output arguments before returning. Plugin-supplied updates are
merged into the requested updates, so they may overwrite updates requested
by the user. Use with caution.

Example: Complete Update Handler
--------------------------------

A plugin that allows updating the job duration while enforcing limits::

  #include <flux/core.h>
  #include <flux/jobtap.h>

  #define MIN_DURATION 60.     /* 1 minute */
  #define MAX_DURATION 86400.  /* 24 hours */

  static int update_duration_cb (flux_plugin_t *p,
                                 const char *topic,
                                 flux_plugin_arg_t *args,
                                 void *arg)
  {
      double duration;

      /* Unpack the requested value ("F" accepts integer or real JSON) */
      if (flux_plugin_arg_unpack (args,
                                  FLUX_PLUGIN_ARG_IN,
                                  "{s:{s:F}}",
                                  "updates",
                                    "attributes.system.duration", &duration) < 0)
          return flux_jobtap_error (p, args, "failed to unpack duration");

      /* Enforce minimum and maximum duration limits */
      if (duration < MIN_DURATION)
          return flux_jobtap_error (p, args,
                                    "duration %.0f is below minimum of %.0f",
                                    duration, MIN_DURATION);
      if (duration > MAX_DURATION)
          return flux_jobtap_error (p, args,
                                    "duration %.0f exceeds maximum of %.0f",
                                    duration, MAX_DURATION);

      /* Accept the update and mark it validated, skipping job.validate */
      if (flux_plugin_arg_pack (args,
                                FLUX_PLUGIN_ARG_OUT,
                                "{s:i}",
                                "validated", 1) < 0)
          return flux_jobtap_error (p, args, "failed to set validated flag");
      return 0;
  }

  int flux_plugin_init (flux_plugin_t *p)
  {
      return flux_plugin_add_handler (p,
                                      "job.update.attributes.system.duration",
                                      update_duration_cb,
                                      NULL);
  }

This handler validates duration updates against limits and sets
``validated`` to 1 so that additional validation is skipped.
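
The allow/deny and validation-skip rules from JOB UPDATE CALLBACKS can be
sketched as a standalone C model. It is not Flux code: it assumes a single
handler per attribute key (in Flux, several plugins may register for the
same key), and ``update_handler_f``, ``model_check_updates()`` and the two
sample handlers are hypothetical names::

  #include <stdbool.h>
  #include <stddef.h>
  #include <string.h>

  /* Hypothetical handler: return 0 to allow, -1 to deny; set *validated
   * to mark this key as already validated. */
  typedef int (*update_handler_f) (const char *key, bool *validated);

  struct update_handler {
      const char *key;               /* e.g. "attributes.system.duration" */
      update_handler_f fn;
  };

  /* Returns 0 if every requested key is allowed, storing in
   * *need_validate whether job.validate must still run; else -1. */
  int model_check_updates (const struct update_handler *handlers,
                           size_t nhandlers,
                           const char **keys,
                           size_t nkeys,
                           bool *need_validate)
  {
      bool all_validated = true;

      for (size_t i = 0; i < nkeys; i++) {
          const struct update_handler *h = NULL;
          bool validated = false;

          for (size_t j = 0; j < nhandlers && !h; j++)
              if (strcmp (handlers[j].key, keys[i]) == 0)
                  h = &handlers[j];
          /* Default deny: no handler, or the handler refused */
          if (!h || h->fn (keys[i], &validated) < 0)
              return -1;
          if (!validated)
              all_validated = false;
      }
      *need_validate = !all_validated;
      return 0;
  }

  /* Sample handlers for illustration */
  static int allow_validated (const char *key, bool *validated)
  {
      (void) key;
      *validated = true;
      return 0;
  }

  static int allow (const char *key, bool *validated)
  {
      (void) key;
      (void) validated;
      return 0;
  }

Validation is skipped only when every key in the request was marked
validated; a single unmarked key sends the whole updated jobspec through
``job.validate``.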


UPDATING JOBS PROGRAMMATICALLY
==============================

::

  int flux_jobtap_jobspec_update_pack (flux_plugin_t *p,
                                       const char *fmt,
                                       ...);

  int flux_jobtap_jobspec_update_id_pack (flux_plugin_t *p,
                                          flux_jobid_t id,
                                          const char *fmt,
                                          ...);

``flux_jobtap_jobspec_update_pack()`` updates the jobspec of the current
job (the job for which the callback was invoked). Arguments follow Jansson
pack format, with period-delimited keys as in the RFC 21
``jobspec-update`` event. Call it only from a jobtap callback for the job
being updated, not for other jobs or outside a callback. Returns 0 on
success, -1 on failure.

``flux_jobtap_jobspec_update_id_pack()`` updates job ``id``
asynchronously, outside a callback for the target job. The job must not be
the current job (use ``flux_jobtap_jobspec_update_pack()`` for that).
Returns 0 on success, -1 on failure.

Restrictions:

- Jobs in RUN, CLEANUP, or INACTIVE state cannot be updated
- Jobs with read-only eventlogs cannot be updated
- Updates are subject to ``job.validate`` callbacks
- Use ``job.update.*`` callbacks to allow or deny updates of specific
  attributes

Example updating the current job's duration::

  static int some_callback (flux_plugin_t *p,
                            const char *topic,
                            flux_plugin_arg_t *args,
                            void *arg)
  {
      /* Update current job's duration to 1 hour */
      if (flux_jobtap_jobspec_update_pack (p,
                                           "{s:f}",
                                           "attributes.system.duration",
                                           3600.0) < 0)
          return -1;
      return 0;
  }

Example updating another job asynchronously, where ``get_target_job()`` is
a placeholder for plugin-specific logic::

  /* Called from a timer or other asynchronous context */
  static void timer_cb (flux_reactor_t *r,
                        flux_watcher_t *w,
                        int revents,
                        void *arg)
  {
      flux_plugin_t *p = arg;
      flux_jobid_t target_job = get_target_job (); /* hypothetical helper */

      /* Set the other job's duration to 30 minutes */
      if (flux_jobtap_jobspec_update_id_pack (p,
                                              target_job,
                                              "{s:f}",
                                              "attributes.system.duration",
                                              1800.0) < 0)
          flux_log_error (flux_jobtap_get_flux (p), "jobspec update failed");
  }


PLUGIN CALLBACK TOPICS
======================

plugin.query
  The job manager calls the ``plugin.query`` callback to allow plugins to
  provide data in response to a ``jobtap-query`` request (used by
  ``flux jobtap query PLUGIN``). Plugins export internal state for
  inspection by placing data in the callback output arguments, e.g.::

    flux_plugin_arg_pack (args,
                          FLUX_PLUGIN_ARG_OUT,
                          "{s:O}",
                          "data", internal_data);

.. _event_processing:

EVENT PROCESSING MODEL
======================

Understanding synchronous versus asynchronous event processing is
important when posting events that affect other jobs or that trigger
multiple state transitions.

Event Queue Mechanism
---------------------

The job manager maintains a per-job event queue to ensure in-order
processing and to prevent recursive callback invocations. When a plugin
posts an event (via ``flux_jobtap_dependency_remove()``,
``flux_jobtap_event_post_pack()``, or by returning values such as
``priority``), the event is added to the target job's queue.

Queue processing depends on whether the target job is executing a
callback:

**Target job is idle** (not processing any event): the event is processed
synchronously before the posting function returns. This may trigger a
cascade of state transitions and callbacks within the same call stack.

**Target job is busy** (currently in a callback): the event is queued and
processed after the current callback completes, but before control
returns to the original caller. This prevents recursive callbacks that
would see inconsistent job state.

Synchronous State Transition Chains
-----------------------------------

Jobtap operations can trigger chains of synchronous state transitions.

**Example 1: Dependency Removal**

When a plugin removes the last dependency from a job::

  flux_jobtap_dependency_remove (p, target_jobid, "my-dependency");
  /* When this returns, the target job may have reached SCHED state */

**If the target job is not currently in a callback**, the following
occurs synchronously before the function returns:

1. A ``dependency-remove`` event is posted and processed immediately.
2. The job transitions from ``DEPEND`` to ``PRIORITY`` state.
3. All plugins' ``job.state.priority`` callbacks are invoked.
4. If a plugin returns a valid priority (or the default priority is used):

   a. A ``priority`` event is posted (queued, due to recursion prevention).
   b. After the ``job.state.priority`` callbacks return, the queued
      ``priority`` event is processed.
   c. The job transitions from ``PRIORITY`` to ``SCHED`` state.
   d. All plugins' ``job.state.sched`` callbacks are invoked.
   e. The job is enqueued with the scheduler.

5. Only then does ``flux_jobtap_dependency_remove()`` return.

**If the target job is currently in a callback**, the event is queued,
the function returns immediately, and the state transitions occur after
the target job's callback completes.

**Example 2: Priority Assignment**

When a plugin returns a priority in its ``job.state.priority`` callback::

  static int priority_cb (flux_plugin_t *p,
                          const char *topic,
                          flux_plugin_arg_t *args,
                          void *arg)
  {
      /* calculate_priority() is a placeholder for plugin policy */
      int64_t priority = calculate_priority (args);

      if (flux_plugin_arg_pack (args,
                                FLUX_PLUGIN_ARG_OUT,
                                "{s:I}",
                                "priority", priority) < 0)
          return -1;
      /* Once this returns, job.state.sched may run before control
       * returns to the code that triggered the PRIORITY transition */
      return 0;
  }

When the priority is available immediately:

1. The plugin returns with a priority in its output arguments.
2. A ``priority`` event is posted (queued to prevent recursion).
3. After all ``job.state.priority`` callbacks complete, the queued event
   is processed.
4. The job transitions from ``PRIORITY`` to ``SCHED`` state.
5. All plugins' ``job.state.sched`` callbacks are invoked.
6. Control returns through the callback stack.

Recursion Prevention
--------------------

The event queue prevents infinite recursion:

**Events posted to the current job**: events posted to the job for which
a callback was invoked are queued and processed after the callback
returns, not during it. This keeps job state stable during callback
execution.

**Cascading events during event processing**: events triggered during
processing (such as a priority assignment triggering a state transition)
are queued and processed in order before returning, without deep
recursion.
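
The queueing behavior described in this section can be illustrated with a
standalone C model. It is a simplification, not the job manager's code:
events are plain integers, ``struct job_model``, ``post_event()`` and
``run_callbacks()`` are hypothetical names, and the single "plugin" in
``run_callbacks()`` reacts to a dependency removal by posting a priority
event to the same job. The nested post is queued rather than recursed
into, so both events are handled in FIFO order at a callback nesting depth
of 1::

  #include <stdbool.h>

  /* Hypothetical event codes for the model */
  enum { EV_DEPENDENCY_REMOVE = 1, EV_PRIORITY = 2 };

  #define QSIZE 16

  struct job_model {
      int queue[QSIZE];
      int head, tail;           /* pending events are queue[head..tail) */
      bool busy;                /* true while this job's events are handled */
      int processed[QSIZE];     /* events in the order they were handled */
      int nprocessed;
      int depth, max_depth;     /* callback nesting depth */
  };

  static void run_callbacks (struct job_model *job, int event);

  /* Post an event: handled before return if the job is idle, otherwise
   * left for the loop that is already draining this job's queue. */
  void post_event (struct job_model *job, int event)
  {
      job->queue[job->tail % QSIZE] = event;
      job->tail++;
      if (job->busy)
          return;
      job->busy = true;
      while (job->head != job->tail) {
          int ev = job->queue[job->head % QSIZE];
          job->head++;
          run_callbacks (job, ev);
      }
      job->busy = false;
  }

  /* Stand-in for invoking plugin callbacks on an event */
  static void run_callbacks (struct job_model *job, int event)
  {
      if (++job->depth > job->max_depth)
          job->max_depth = job->depth;
      if (job->nprocessed < QSIZE)
          job->processed[job->nprocessed++] = event;
      /* A plugin reacting to the dependency removal assigns a priority,
       * which posts another event to the same (busy) job. */
      if (event == EV_DEPENDENCY_REMOVE)
          post_event (job, EV_PRIORITY);
      job->depth--;
  }

Without the ``busy`` check, ``post_event()`` would re-enter
``run_callbacks()`` and the priority event would be handled while the
dependency-removal callback was still running.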

The event queue design ensures that:

- Job state remains consistent during callbacks
- Callbacks see a stable, well-defined view of the job
- Events for a job are processed in strict FIFO order
- Stack depth remains bounded regardless of event chains
- Jobs cannot change state "underneath" a running callback

Job Hold Mechanism
------------------

The job manager may temporarily "hold" events for a job using the
``job->hold_events`` flag. While the flag is set, all events for that job
are queued rather than processed immediately. This occurs during:

- Batch KVS commit operations
- Certain bulk job operations

When the hold is released, all queued events are processed synchronously,
in order, before control returns. This is transparent to plugins, but
explains why multiple state transitions may appear to occur atomically.

Implications for Plugin Authors
-------------------------------

**Synchronous cascades**: callbacks for other jobs, or for later states of
the same job, may execute before a posting function returns. Do not
assume that these callbacks run later: if the target job is idle, the
event and all the callbacks it triggers complete within the posting
function.

**State consistency**: job state remains constant during callback
execution. A job in ``job.state.priority`` remains in PRIORITY state until
the callback returns, even if the callback posts events.

**Dependency removal timing**: if the target job is not in a callback:

- Its ``job.state.priority`` callbacks execute before
  ``flux_jobtap_dependency_remove()`` returns
- If a priority is available, its ``job.state.sched`` callbacks also
  execute before the function returns
- If the priority is unavailable, the target job remains in PRIORITY state

If the target job is in a callback, the dependency removal is queued and
``flux_jobtap_dependency_remove()`` returns immediately.
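
The dependency removal rules above reduce to a small decision table. This
standalone C sketch is illustrative only (``enum model_state`` and
``state_after_last_remove()`` are hypothetical names, not Flux API); it
returns the state the target job is in when
``flux_jobtap_dependency_remove()`` returns after removing the job's last
dependency::

  #include <stdbool.h>

  /* Hypothetical names for this sketch */
  enum model_state { MODEL_DEPEND, MODEL_PRIORITY, MODEL_SCHED };

  enum model_state state_after_last_remove (bool target_in_callback,
                                            bool priority_available)
  {
      /* Busy target: the event is only queued; nothing has happened yet */
      if (target_in_callback)
          return MODEL_DEPEND;
      /* Idle target: job.state.priority ran; without a priority the job
       * waits for flux_jobtap_reprioritize_job() */
      if (!priority_available)
          return MODEL_PRIORITY;
      /* Idle target with a priority: job.state.sched also ran */
      return MODEL_SCHED;
  }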

**Priority assignment timing**: when a plugin returns a priority in
``job.state.priority``:

- The queued ``priority`` event is processed as soon as all
  ``job.state.priority`` callbacks return, so ``job.state.sched``
  callbacks run before control returns to the code that triggered the
  PRIORITY transition (for example, ``flux_jobtap_dependency_remove()``
  called on an idle job)
- All plugins' ``job.state.sched`` callbacks execute synchronously
- The job may be enqueued with the scheduler before that control returns

**Asynchronous operations**: for asynchronous work (I/O, RPCs, timers):

- For priority: call ``flux_jobtap_priority_unavail()``, then call
  ``flux_jobtap_reprioritize_job()`` when the priority is ready
- For dependencies: add them in ``job.state.depend``, and remove them
  when the condition is met
- Do not attempt to defer event processing; use the provided mechanisms

**Querying job state**: querying a job after posting an event to it
returns the state after all synchronous transitions have completed::

  flux_plugin_arg_t *job_info;
  int state;

  flux_jobtap_dependency_remove (p, other_id, "my-dependency");

  /* Query the job: it may now be in SCHED state */
  if ((job_info = flux_jobtap_job_lookup (p, other_id))) {
      if (flux_plugin_arg_unpack (job_info,
                                  FLUX_PLUGIN_ARG_IN,
                                  "{s:i}",
                                  "state", &state) == 0) {
          /* state may be FLUX_JOB_STATE_SCHED, not DEPEND or PRIORITY */
      }
      flux_plugin_arg_destroy (job_info);
  }

**Callback ordering**: events are processed in order for each job. Across
jobs, processing order depends on the order in which events were posted.

Example: Dependency Chain
-------------------------

Consider a plugin managing a chain of dependent jobs.
When the first job completes, it should release the second::

  static int finish_cb (flux_plugin_t *p,
                        const char *topic,
                        flux_plugin_arg_t *args,
                        void *arg)
  {
      flux_jobid_t completed_id, dependent_id;

      if (flux_plugin_arg_unpack (args,
                                  FLUX_PLUGIN_ARG_IN,
                                  "{s:I}",
                                  "id", &completed_id) < 0)
          return -1;

      /* get_dependent_job() is a placeholder for plugin bookkeeping */
      dependent_id = get_dependent_job (completed_id);
      if (dependent_id != FLUX_JOBID_ANY) {
          /* If dependent_id is idle: DEPEND -> PRIORITY -> SCHED
           * synchronously. If it is busy: queued for later processing. */
          flux_jobtap_dependency_remove (p, dependent_id, "predecessor");
      }
      return 0;
  }

When the target job is idle, synchronous processing releases the
dependent job immediately, minimizing startup latency.

.. _job_auxiliary_data:

JOB AUXILIARY DATA
==================

Plugins can attach arbitrary data to jobs. This data persists across
callbacks and state transitions, enabling stateful plugin operation.

API Functions
-------------

::

  int flux_jobtap_job_aux_set (flux_plugin_t *p,
                               flux_jobid_t id,
                               const char *name,
                               void *val,
                               flux_free_f free_fn);

  void *flux_jobtap_job_aux_get (flux_plugin_t *p,
                                 flux_jobid_t id,
                                 const char *name);

  int flux_jobtap_job_aux_delete_value (flux_plugin_t *p,
                                        flux_jobid_t id,
                                        void *val);

``flux_jobtap_job_aux_set()`` attaches data ``val`` under key ``name`` to
job ``id``. If ``free_fn`` is non-NULL, it is called to destroy ``val``
when the job is destroyed or the plugin is unloaded. Returns 0 on success,
-1 on failure.

``flux_jobtap_job_aux_get()`` retrieves the data stored under key
``name`` for job ``id``. Returns the data pointer, or NULL if not found.

``flux_jobtap_job_aux_delete_value()`` removes aux data from job ``id`` by
value. Use it when the key name is unknown but the value pointer is
available. Returns 0 on success, -1 on failure.

The ``id`` parameter accepts ``FLUX_JOBTAP_CURRENT_JOB`` to operate on the
job for the current callback.
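
The lookup, per-plugin isolation, and cleanup rules for auxiliary data
(detailed in the subsections that follow) can be modeled in standalone C.
This is a sketch, not the Flux implementation: entries are keyed by the
pair (plugin, name), which is what keeps plugins isolated, setting an
existing key replaces its value (an assumption of the sketch), and each
``free_fn`` runs when the job is destroyed. All names here
(``aux_model_set()``, ``aux_model_get()``, ``aux_model_destroy()``,
``count_free()``) are hypothetical::

  #include <stdlib.h>
  #include <string.h>

  typedef void (*aux_free_f) (void *);

  struct aux_entry {
      const void *plugin;        /* owning plugin: part of the key */
      const char *name;
      void *val;
      aux_free_f free_fn;
      struct aux_entry *next;
  };

  struct aux_model {
      struct aux_entry *head;
  };

  static struct aux_entry *aux_find (struct aux_model *aux,
                                     const void *plugin,
                                     const char *name)
  {
      for (struct aux_entry *e = aux->head; e; e = e->next)
          if (e->plugin == plugin && strcmp (e->name, name) == 0)
              return e;
      return NULL;
  }

  int aux_model_set (struct aux_model *aux, const void *plugin,
                     const char *name, void *val, aux_free_f free_fn)
  {
      struct aux_entry *e = aux_find (aux, plugin, name);

      if (e) {                   /* replace, destroying the old value */
          if (e->free_fn)
              e->free_fn (e->val);
      }
      else {
          if (!(e = calloc (1, sizeof (*e))))
              return -1;
          e->plugin = plugin;
          e->name = name;
          e->next = aux->head;
          aux->head = e;
      }
      e->val = val;
      e->free_fn = free_fn;
      return 0;
  }

  void *aux_model_get (struct aux_model *aux, const void *plugin,
                       const char *name)
  {
      struct aux_entry *e = aux_find (aux, plugin, name);
      return e ? e->val : NULL;
  }

  /* Called when the job is destroyed: run every free_fn */
  void aux_model_destroy (struct aux_model *aux)
  {
      struct aux_entry *e = aux->head;

      while (e) {
          struct aux_entry *next = e->next;
          if (e->free_fn)
              e->free_fn (e->val);
          free (e);
          e = next;
      }
      aux->head = NULL;
  }

  /* Example destructor that counts its calls */
  static int nfreed;

  static void count_free (void *p)
  {
      nfreed++;
      free (p);
  }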

Lifecycle
---------

Auxiliary data is cleaned up automatically when:

- The job transitions to INACTIVE state
- The plugin is unloaded
- The data is explicitly deleted

Data persists across all state transitions before INACTIVE.

Isolation
---------

Each plugin's auxiliary data is isolated from that of other plugins, so
two plugins can use the same key without conflict.

Example
-------

Track the number of times a job has been through PRIORITY state::

  #include <stdlib.h>

  struct priority_count {
      int count;
  };

  static int priority_cb (flux_plugin_t *p,
                          const char *topic,
                          flux_plugin_arg_t *args,
                          void *arg)
  {
      struct priority_count *pc;

      pc = flux_jobtap_job_aux_get (p, FLUX_JOBTAP_CURRENT_JOB, "pricount");
      if (!pc) {
          if (!(pc = calloc (1, sizeof (*pc))))
              return -1;
          if (flux_jobtap_job_aux_set (p,
                                       FLUX_JOBTAP_CURRENT_JOB,
                                       "pricount",
                                       pc,
                                       free) < 0) {
              free (pc);
              return -1;
          }
      }
      pc->count++;
      flux_log (flux_jobtap_get_flux (p),
                LOG_INFO,
                "job priority calculated %d times",
                pc->count);
      return 0;
  }

.. _raising_exceptions:

RAISING EXCEPTIONS
==================

Plugins can raise job exceptions to signal errors or anomalous
conditions. Exceptions are recorded in the job eventlog and can terminate
the job.

API Function
------------

::

  int flux_jobtap_raise_exception (flux_plugin_t *p,
                                   flux_jobid_t id,
                                   const char *type,
                                   int severity,
                                   const char *fmt,
                                   ...);

Raises an exception on job ``id`` with the specified ``type``,
``severity``, and formatted message. The ``id`` parameter accepts
``FLUX_JOBTAP_CURRENT_JOB``. Returns 0 on success, -1 on failure.

Severity Levels
---------------

**severity = 0** (fatal): terminates the job immediately. The job
transitions to CLEANUP state and will not run. Use for unrecoverable
errors.

**severity = 1-6** (non-fatal): the exception is recorded, but the job
continues. Use for warning conditions that should be recorded but do not
prevent execution.

**severity = 7** (debug): a debug-level exception, not shown to users by
default.
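
The three cases above can be summarized by a small classifier. This
standalone C sketch is illustrative only (``classify_severity()`` and
``enum exc_class`` are hypothetical names); it relies on the syslog
numbering from ``<syslog.h>``, in which ``LOG_EMERG`` is 0 and
``LOG_DEBUG`` is 7::

  #include <syslog.h>

  enum exc_class { EXC_INVALID = -1, EXC_FATAL, EXC_NONFATAL, EXC_DEBUG };

  enum exc_class classify_severity (int severity)
  {
      if (severity < LOG_EMERG || severity > LOG_DEBUG)
          return EXC_INVALID;
      if (severity == LOG_EMERG)
          return EXC_FATAL;      /* job transitions to CLEANUP */
      if (severity == LOG_DEBUG)
          return EXC_DEBUG;      /* not shown to users by default */
      return EXC_NONFATAL;       /* recorded; the job continues */
  }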

Severity levels correspond to syslog levels, with 0 the most severe
(``LOG_EMERG``) and 7 the least severe (``LOG_DEBUG``).

Exception Types
---------------

The ``type`` parameter identifies the exception category. Common types:

- ``"dependency"`` - dependency-related errors
- ``"alloc"`` - resource allocation failures
- ``"timeout"`` - job timeout
- ``"cancel"`` - job cancellation
- ``"exec"`` - execution system errors

The type is recorded in the exception event for monitoring tools.

When to Use
-----------

Use ``flux_jobtap_raise_exception()`` instead of returning -1 when:

- The error should be visible in the job eventlog
- A detailed error message is needed
- The error should terminate the job (severity 0)
- The job should continue but the condition should be logged
  (severity 1-6)

Return -1 for plugin-internal errors that should not generate
user-visible exceptions.

Example
-------

Raise a fatal exception if a job violates a policy. This example checks an
illustrative 24 hour duration limit from a ``job.state.depend`` callback;
in ``job.validate``, where the job has not yet been accepted, reject it
with :man3:`flux_jobtap_reject_job` instead::

  static int depend_cb (flux_plugin_t *p,
                        const char *topic,
                        flux_plugin_arg_t *args,
                        void *arg)
  {
      double duration;

      if (flux_plugin_arg_unpack (args,
                                  FLUX_PLUGIN_ARG_IN,
                                  "{s:{s:{s:{s:F}}}}",
                                  "jobspec",
                                    "attributes",
                                      "system",
                                        "duration", &duration) < 0)
          return -1;

      if (duration > 86400.) {
          /* Fatal: the job moves to CLEANUP and will not run */
          if (flux_jobtap_raise_exception (p,
                                           FLUX_JOBTAP_CURRENT_JOB,
                                           "policy",
                                           0,
                                           "duration %.0fs exceeds limit of 86400s",
                                           duration) < 0)
              return -1;
      }
      return 0;
  }

Raise a non-fatal warning::

  static int alloc_cb (flux_plugin_t *p,
                       const char *topic,
                       flux_plugin_arg_t *args,
                       void *arg)
  {
      flux_t *h = flux_jobtap_get_flux (p);
      double now = flux_reactor_now (flux_get_reactor (h));
      double t_submit;

      if (flux_plugin_arg_unpack (args,
                                  FLUX_PLUGIN_ARG_IN,
                                  "{s:f}",
                                  "t_submit", &t_submit) < 0)
          return -1;

      /* Warn if the job waited more than 1 hour for resources */
      if (now - t_submit > 3600.0)
          flux_jobtap_raise_exception (p,
                                       FLUX_JOBTAP_CURRENT_JOB,
                                       "long-wait",
                                       4,
                                       "job waited %.0f seconds for allocation",
                                       now - t_submit);
      return 0;
  }

.. _querying_job_state:

QUERYING JOB STATE
==================

Getting Job Information
-----------------------

::

  flux_plugin_arg_t *flux_jobtap_job_lookup (flux_plugin_t *p,
                                             flux_jobid_t id);

Returns information for job ``id`` as a ``flux_plugin_arg_t`` containing
the same fields as callback input arguments (see :ref:`arguments`).
Returns NULL on failure. The caller must destroy the returned object with
``flux_plugin_arg_destroy()``.

Getting Job Result
------------------

::

  int flux_jobtap_get_job_result (flux_plugin_t *p,
                                  flux_jobid_t id,
                                  flux_job_result_t *resultp);

Gets the result of job ``id`` and stores it in ``*resultp``. Valid only
for jobs in CLEANUP or INACTIVE state. Returns 0 on success, -1 on
failure.

Result values:

``FLUX_JOB_RESULT_COMPLETED``
  Job completed successfully (exit code 0).

``FLUX_JOB_RESULT_FAILED``
  Job failed (non-zero exit code).

``FLUX_JOB_RESULT_CANCELED``
  Job was canceled by an exception of type ``cancel``.

``FLUX_JOB_RESULT_TIMEOUT``
  Job was canceled by an exception of type ``timeout``.

Use in ``job.state.cleanup`` or ``job.state.inactive`` callbacks to
determine why a job ended.

Checking Posted Events
----------------------

::

  int flux_jobtap_job_event_posted (flux_plugin_t *p,
                                    flux_jobid_t id,
                                    const char *name);

Returns 1 if event ``name`` was posted to job ``id``, 0 if not, or -1 on
failure. Useful for idempotent operations and for checking job history.

Setting Job Flags
-----------------

::

  int flux_jobtap_job_set_flag (flux_plugin_t *p,
                                flux_jobid_t id,
                                const char *flag);

Sets a flag on job ``id``. The ``id`` parameter accepts
``FLUX_JOBTAP_CURRENT_JOB``. Returns 0 on success, -1 on failure.

Supported flags:

``waitable``
  Makes the job waitable via ``flux_job_wait()``. By default, only jobs
  submitted with ``FLUX_JOB_WAITABLE`` are waitable; this flag lets
  plugins make a job waitable after submission. Posts a ``set-flags``
  event to the job eventlog.
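
The result values listed under Getting Job Result can be expressed as a
function of the job's fatal exception (if any) and its exit code. This
standalone C sketch mirrors that table; ``enum model_result`` and
``model_job_result()`` are hypothetical names, and giving a fatal
exception precedence over the exit code, with exception types other than
``cancel`` and ``timeout`` yielding a failed result, is an assumption of
the sketch::

  #include <stddef.h>
  #include <string.h>

  enum model_result {
      MODEL_RESULT_COMPLETED,
      MODEL_RESULT_FAILED,
      MODEL_RESULT_CANCELED,
      MODEL_RESULT_TIMEOUT,
  };

  /* fatal_type: type of the job's fatal (severity 0) exception, or NULL */
  enum model_result model_job_result (const char *fatal_type, int exit_code)
  {
      if (fatal_type) {
          if (strcmp (fatal_type, "cancel") == 0)
              return MODEL_RESULT_CANCELED;
          if (strcmp (fatal_type, "timeout") == 0)
              return MODEL_RESULT_TIMEOUT;
          return MODEL_RESULT_FAILED;
      }
      return exit_code == 0 ? MODEL_RESULT_COMPLETED : MODEL_RESULT_FAILED;
  }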

Example
-------

Query job information from another context::

  flux_plugin_arg_t *info = flux_jobtap_job_lookup (p, other_jobid);

  if (info) {
      int state;
      flux_jobid_t id;

      if (flux_plugin_arg_unpack (info,
                                  FLUX_PLUGIN_ARG_IN,
                                  "{s:I s:i}",
                                  "id", &id,
                                  "state", &state) == 0)
          flux_log (flux_jobtap_get_flux (p),
                    LOG_DEBUG,
                    "job %s is in state %d",
                    idf58 (id),
                    state);
      flux_plugin_arg_destroy (info);
  }

Get the job result in cleanup::

  static int cleanup_cb (flux_plugin_t *p,
                         const char *topic,
                         flux_plugin_arg_t *args,
                         void *arg)
  {
      flux_job_result_t result;

      if (flux_jobtap_get_job_result (p,
                                      FLUX_JOBTAP_CURRENT_JOB,
                                      &result) == 0) {
          switch (result) {
              case FLUX_JOB_RESULT_COMPLETED:
                  /* Job succeeded */
                  break;
              case FLUX_JOB_RESULT_FAILED:
                  /* Job failed */
                  break;
              case FLUX_JOB_RESULT_CANCELED:
                  /* Job was canceled */
                  break;
              case FLUX_JOB_RESULT_TIMEOUT:
                  /* Job timed out */
                  break;
          }
      }
      return 0;
  }

Check whether a dependency event was already posted::

  /* The function returns -1 on error, so compare against 1 explicitly */
  if (flux_jobtap_job_event_posted (p, id, "dependency-add") == 1) {
      /* Dependency was already added, skip */
      return 0;
  }

.. _priority:

PRIORITY
========

Custom job priority assignment is a core jobtap feature. A builtin
``.priority-default`` plugin is always loaded, so that jobs move past
PRIORITY state when no other priority plugin is loaded. The default
plugin assigns a priority equal to the job urgency.

When loading a priority plugin, note that ``.priority-default`` may still
be loaded, in which case it initializes ``priority`` in the return
arguments to the job urgency. Since ``job.state.priority`` and
``job.priority.get`` callbacks run in load order, subsequently loaded
plugins overwrite the default ``priority``, making the last loaded
priority plugin the active one.

To ensure the default priority is overridden, priority plugins must always
set a priority, or call ``flux_jobtap_priority_unavail()`` if it is
unavailable, in every callback that returns a priority:
``job.state.priority`` and ``job.priority.get``.
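
The load-order rule can be illustrated with a standalone C model (not
Flux code; ``model_final_priority()``, the two sentinel values, and the
sample callbacks are hypothetical). The output priority starts at the
urgency when ``.priority-default`` is loaded, and each later callback
overwrites it, marks it unavailable, or leaves it untouched. The
``leave_unset`` callback shows why a priority plugin that sometimes sets
nothing silently falls back to the urgency::

  #include <stdbool.h>
  #include <stddef.h>
  #include <stdint.h>

  #define MODEL_UNAVAIL (-1)  /* no priority (cf. flux_jobtap_priority_unavail) */
  #define MODEL_NOT_SET (-2)  /* callback left "priority" untouched */

  typedef int64_t (*priority_cb_f) (int urgency);

  /* Run priority callbacks in load order; the last value written wins */
  int64_t model_final_priority (int urgency,
                                bool default_loaded,
                                const priority_cb_f *plugins,
                                size_t nplugins)
  {
      int64_t priority = default_loaded ? urgency : MODEL_UNAVAIL;

      for (size_t i = 0; i < nplugins; i++) {
          int64_t value = plugins[i] (urgency);
          if (value != MODEL_NOT_SET)
              priority = value;
      }
      return priority;
  }

  /* Sample callbacks for illustration */
  static int64_t fixed_100 (int urgency)   { (void) urgency; return 100; }
  static int64_t unavailable (int urgency) { (void) urgency; return MODEL_UNAVAIL; }
  static int64_t leave_unset (int urgency) { (void) urgency; return MODEL_NOT_SET; }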

To fully prevent conflicts, explicitly remove the builtin priority
plugin::

  flux jobtap remove .priority-default

or do so via configuration (see :man5:`flux-conf-job-manager`)::

  [job-manager]
  plugins = [
    { remove = ".priority-default", load = "complex-priority.so" },
  ]

.. _perilogs:

PROLOG AND EPILOG ACTIONS
=========================

Plugins can perform asynchronous tasks after ``alloc`` but before the job
runs, or after ``finish`` but before resources are freed, using job
manager prolog and epilog actions. Delineate these actions with the
following functions::

  int flux_jobtap_prolog_start (flux_plugin_t *p, const char *description);

  int flux_jobtap_prolog_finish (flux_plugin_t *p,
                                 flux_jobid_t id,
                                 const char *description,
                                 int status);

  int flux_jobtap_epilog_start (flux_plugin_t *p, const char *description);

  int flux_jobtap_epilog_finish (flux_plugin_t *p,
                                 flux_jobid_t id,
                                 const char *description,
                                 int status);

Call ``flux_jobtap_prolog_start()`` to initiate a prolog action. This
prevents the job from starting, even after resources are assigned, until
``flux_jobtap_prolog_finish()`` is called. The status passed to
``flux_jobtap_prolog_finish()`` is recorded in the eventlog, but a
non-zero status does not trigger any automated job manager behavior: on
failure, the plugin must raise a job exception or take other action
itself. The ``description`` is informational only; it distinguishes
multiple actions in the eventlog.

Call ``flux_jobtap_epilog_start()`` to initiate an epilog action, which
prevents resources from being released until
``flux_jobtap_epilog_finish()`` is called. The same caveats regarding the
description and completion status apply.

``flux_jobtap_prolog_start()`` may be called any time before the ``start``
request is sent to the execution system, typically from ``job.state.run``
or ``job.event.alloc`` callbacks (when resources have been allocated).
Note that plugins receive ``job.event.*`` callbacks only for subscribed
jobs (see ``flux_jobtap_job_subscribe()``). A prolog cannot be started
once the job has reached CLEANUP state.
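
The prolog rules can be sketched as a per-job counter. This standalone C
model is illustrative only: the names are hypothetical, ``EINVAL`` is the
errno these functions use for jobs in an invalid state, and treating an
unmatched finish as an error is an assumption of the sketch, not
documented Flux behavior::

  #include <errno.h>
  #include <stdbool.h>

  struct prolog_model {
      int active;          /* prologs started but not yet finished */
      bool in_cleanup;     /* job has reached CLEANUP state */
  };

  int model_prolog_start (struct prolog_model *m)
  {
      if (m->in_cleanup) {
          errno = EINVAL;  /* too late to start a prolog */
          return -1;
      }
      m->active++;
      return 0;
  }

  int model_prolog_finish (struct prolog_model *m, int status)
  {
      (void) status;       /* only recorded; no automatic job action */
      if (m->active == 0) {
          errno = EINVAL;  /* assumption: unmatched finish is an error */
          return -1;
      }
      m->active--;
      return 0;
  }

  /* The start request may be sent only when no prolog is active */
  bool model_can_start (const struct prolog_model *m)
  {
      return m->active == 0;
  }

A failed prolog (non-zero ``status``) still releases its hold on the job
in this model, which is why the plugin itself must raise an exception.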
Call ``flux_jobtap_epilog_start()`` only after a job has entered the CLEANUP
state and before the ``free`` request is sent to the scheduler, typically
from the ``job.state.cleanup`` or ``job.event.finish`` callback.

If called for a job in an invalid state, these functions return -1 with
``errno`` set to ``EINVAL``.

Multiple prolog or epilog actions may be active simultaneously.

CALLING OTHER PLUGINS
=====================

Invoke custom callbacks in other plugins with ``flux_jobtap_call()``. Topic
strings starting with ``job.`` are reserved for the job manager; passing one
causes ``flux_jobtap_call()`` to fail immediately with ``errno`` set to
``EINVAL``::

  int flux_jobtap_call (flux_plugin_t *p,
                        flux_jobid_t id,
                        const char *topic,
                        flux_plugin_arg_t *args);

The jobtap API assumes a current job, so ``id`` is required. However,
``args`` is passed unmodified to the callbacks for ``topic``, so the job data
described in :ref:`arguments` for job ``id`` is not present in a newly
created ``args`` unless the caller adds it. When invoking from another
jobtap callback, pass ``FLUX_JOBTAP_CURRENT_JOB`` together with the existing
``args``, which already contain the expected job arguments.

For example, the following calls all plugins registered for ``custom.topic``
whenever ``callback`` is called::

  int callback (flux_plugin_t *p,
                const char *topic,
                flux_plugin_arg_t *args,
                void *arg)
  {
      return flux_jobtap_call (p,
                               FLUX_JOBTAP_CURRENT_JOB,
                               "custom.topic",
                               args);
  }

REGISTERING SERVICES
====================

::

  int flux_jobtap_service_register (flux_plugin_t *p,
                                    const char *method,
                                    flux_msg_handler_f cb,
                                    void *arg);

  int flux_jobtap_service_register_ex (flux_plugin_t *p,
                                       const char *method,
                                       uint32_t rolemask,
                                       flux_msg_handler_f cb,
                                       void *arg);

Register a service endpoint that can be invoked via Flux RPC. The service
name is formed from ``job-manager``, the plugin's internal name (see JOBTAP
PLUGIN NAMES), and ``method``, joined by dots; for example, method ``query``
in a plugin named ``myplugin`` is served as ``job-manager.myplugin.query``.

``flux_jobtap_service_register()`` registers a service accessible to all
users. ``flux_jobtap_service_register_ex()`` restricts access to users
matching ``rolemask``.
Use ``FLUX_ROLE_OWNER`` for the instance owner or ``FLUX_ROLE_USER`` for any
user. Both functions return 0 on success and -1 on failure.

The callback receives RPC requests and responds with ``flux_respond()``,
``flux_respond_pack()``, or ``flux_respond_error()``.

Example::

  #include <stdlib.h>
  #include <flux/core.h>
  #include <flux/jobtap.h>

  struct plugin_data {
      int count;
  };

  static void query_cb (flux_t *h,
                        flux_msg_handler_t *mh,
                        const flux_msg_t *msg,
                        void *arg)
  {
      struct plugin_data *data = arg;

      /* Return plugin state */
      if (flux_respond_pack (h, msg, "{s:i}", "count", data->count) < 0)
          flux_log_error (h, "query response failed");
  }

  int flux_plugin_init (flux_plugin_t *p)
  {
      struct plugin_data *data;

      /* Tie the lifetime of data to the plugin so it is freed on unload */
      if (!(data = calloc (1, sizeof (*data)))
          || flux_plugin_aux_set (p, NULL, data, free) < 0) {
          free (data);
          return -1;
      }
      /* Registers job-manager.myplugin.query for a plugin named myplugin */
      if (flux_jobtap_service_register (p, "query", query_cb, data) < 0)
          return -1;
      return 0;
  }

Users can then query the service, e.g. from Python::

  $ flux exec -r 0 flux python
  >>> import flux
  >>> h = flux.Flux()
  >>> h.rpc("job-manager.myplugin.query").get()

RESOURCES
=========

.. include:: common/resources.rst

SEE ALSO
========

:man1:`flux-jobtap`, :man5:`flux-conf-job-manager`