flux.scheduler module

Base class for Python Flux scheduler broker modules.

Provides the RFC 27 scheduler protocol scaffolding so that subclasses only need to implement scheduling policy:

  • Registers the sched and feasibility dynamic services

  • Manages the resource.acquire streaming RPC for resource state

  • Performs the hello/ready handshake with job-manager on startup

  • Dispatches alloc, free, cancel, prioritize, expiration, and resource-status messages to overridable methods

The base class maintains a PendingJob heap queue (_queue) and provides default implementations of alloc(), free(), cancel(), and prioritize(). A minimal scheduler only needs to override schedule():

import heapq
from flux.resource import InsufficientResources, InfeasibleRequest
from flux.scheduler import Scheduler

class MyScheduler(Scheduler):

    def schedule(self):
        while self._queue:
            job = self._queue[0]
            try:
                alloc = self.resources.alloc(job.jobid, job.resource_request)
            except InsufficientResources:
                break   # not enough resources — wait for free
            except InfeasibleRequest as exc:
                job.request.deny(str(exc))
            else:
                job.request.success(alloc)
            heapq.heappop(self._queue)
            yield   # hand control to the reactor

def mod_main(h, *args):
    MyScheduler(h, *args).run()

class flux.scheduler.AllocRequest(scheduler, msg)

Bases: object

Represents a pending allocation request from the job-manager.

Passed to Scheduler.alloc() and typically stored in the scheduler's pending queue until resources become available. Call success(), deny(), or cancel() to finalize the request; call annotate() to update job annotations while it is pending.

jobid

The job ID for this allocation request.

Type

int

annotate(annotations)

Send an annotation update for this pending request.

May be called any number of times before the request is finalized. Keys set here are tracked so that success() can clear them automatically. Calls made after the request is finalized are silently ignored; this prevents a still-running forecast generator from sending stale annotations to the job-manager after a cancel.

Parameters

annotations (dict) -- Annotation dict to attach to the job.
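
The "ignored after finalization" rule can be illustrated with a small stand-alone sketch. SketchRequest is a hypothetical stand-in written for this example, not the real AllocRequest; its attribute names (finalized, pending_keys, sent) are assumptions chosen only to make the behaviour visible.

```python
class SketchRequest:
    """Hypothetical stand-in for AllocRequest; not the Flux class."""

    def __init__(self):
        self.finalized = False
        self.pending_keys = set()   # keys that success() could later clear
        self.sent = []              # annotation updates that were "sent"

    def annotate(self, annotations):
        if self.finalized:
            return                  # silently ignore late updates
        self.pending_keys.update(annotations)
        self.sent.append(dict(annotations))

    def cancel(self):
        self.finalized = True


req = SketchRequest()
req.annotate({"reason_pending": "insufficient resources"})
req.cancel()
req.annotate({"t_estimate": 1700000000.0})  # arrives after cancel: dropped
```

Only the first update is recorded; the one issued after cancel() never reaches the (simulated) job-manager.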

cancel()

Finalize the request indicating it was cancelled.

deny(note=None)

Finalize the request with a permanent denial.

Parameters

note (str, optional) -- Human-readable reason for the denial.

success(R, annotations=None, clear=True)

Finalize the request with a successful allocation.

Parameters
  • R -- The allocated resources, either as a resource pool object (anything with a to_dict() method) or a pre-converted dict. Pool objects are converted automatically.

  • annotations (dict, optional) -- Scheduler annotations to attach.

  • clear (bool) -- If True (the default), any sched annotation keys set via annotate() while the job was pending are automatically nulled in the success response, unless overridden by annotations. This ensures pending-state annotations (e.g. reason_pending, t_estimate) are cleaned up without each scheduler needing to track and clear them manually.
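
The interaction between clear and annotations might be pictured as the merge below. This is a sketch under assumptions: merge_success_annotations is a hypothetical helper, and the flat dict it returns is a simplification; the actual layout of the success response is not described here.

```python
def merge_success_annotations(pending_keys, annotations=None, clear=True):
    """Illustrative only: combine automatic nulls with explicit annotations."""
    result = {}
    if clear:
        # Null every key that was set via annotate() while the job was pending.
        for key in sorted(pending_keys):
            result[key] = None
    if annotations:
        # Explicit annotations override the automatic nulls.
        result.update(annotations)
    return result


pending = {"reason_pending", "t_estimate"}
cleared = merge_success_annotations(pending)
overridden = merge_success_annotations(pending, {"t_estimate": 1700000000.0})
untouched = merge_success_annotations(pending, clear=False)
```

With clear=True both pending keys are nulled unless the caller supplies a replacement value; with clear=False nothing is nulled automatically.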

class flux.scheduler.PendingJob(jobid, priority, t_submit, request, resource_request)

Bases: object

Pending allocation request held in the scheduler queue.

Created by the default Scheduler.alloc() implementation and stored in Scheduler._queue. Subclasses may access the queue directly (it is a heapq min-heap ordered by __lt__()).

jobid

The job ID.

Type

int

priority

Job priority (higher is more urgent).

Type

int

t_submit

Submission time (seconds since epoch).

Type

float

request

The open RFC 27 allocation request.

Type

AllocRequest

resource_request

Pool-specific parsed resource request returned by ResourcePool.parse_resource_request(); passed to ResourcePool.alloc() when scheduling. The raw jobspec dict is accessible as resource_request.jobspec.
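
As a rough illustration of a min-heap ordered by __lt__(), the sketch below uses a hypothetical SketchJob class with only two fields. It assumes that higher priority sorts first and that ties are broken by the lower jobid; the real PendingJob comparison may differ in detail.

```python
import heapq
from dataclasses import dataclass


@dataclass
class SketchJob:
    """Hypothetical stand-in for PendingJob with only the ordering fields."""
    jobid: int
    priority: int

    def __lt__(self, other):
        # Assumed ordering: higher priority first, then lower jobid.
        return (-self.priority, self.jobid) < (-other.priority, other.jobid)


queue = []
for job in (SketchJob(3, 16), SketchJob(1, 16), SketchJob(2, 31)):
    heapq.heappush(queue, job)

order = [heapq.heappop(queue).jobid for _ in range(3)]
print(order)  # [2, 1, 3]
```

Because heapq only relies on __lt__(), queue[0] is always the most urgent job, which is why the schedule() examples peek at self._queue[0].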

class flux.scheduler.ResourcePool(arg=None, version=1, log=None, **kwargs)

Bases: object

Public wrapper for a resource pool implementation.

Accepts the same argument forms as ResourceSet:

  • An R JSON string or parsed dict → dispatches to the correct ResourcePoolImplementation subclass by version.

  • A ResourcePoolImplementation instance → wraps it directly.
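
The two argument forms could be handled along the lines of the sketch below. SketchPool, PoolImplV1 and the exact lookup logic are hypothetical; only the idea of dispatching on the R version (and the _impl_map name mentioned under pool_class) comes from this page.

```python
import json


class PoolImplV1:
    """Hypothetical implementation class for version 1 R objects."""

    def __init__(self, R):
        self.R = R


class SketchPool:
    """Hypothetical wrapper; not the real ResourcePool."""

    _impl_map = {1: PoolImplV1}   # version -> implementation class

    def __init__(self, arg):
        if isinstance(arg, str):
            arg = json.loads(arg)             # R given as a JSON string
        if isinstance(arg, dict):
            version = arg.get("version", 1)
            if version not in self._impl_map:
                raise ValueError(f"unsupported R version {version}")
            self.impl = self._impl_map[version](arg)
        else:
            self.impl = arg                   # already an implementation


from_json = SketchPool('{"version": 1, "execution": {}}')
wrapped = SketchPool(from_json.impl)
```

Both construction paths end up holding an implementation object, so the wrapper's public methods can delegate to it uniformly.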

exception InfeasibleRequest

Bases: OSError

Request can never be satisfied by this pool.

exception InsufficientResources

Bases: OSError

Not enough resources available right now — retry after a free event.

alloc(jobid: int, request) -> ResourcePool

Allocate resources for jobid and return the allocated pool.

check_feasibility(request) -> None

Check whether request is structurally satisfiable.

copy() -> ResourcePool

Return a full independent copy preserving allocation state.

copy_allocated()

Return a ResourceSet containing only the allocated resources.

copy_down()

Return a ResourceSet containing only the down resources.

dumps() -> str

Return a compact human-readable summary of the resource set.

property expiration: float

Resource expiration timestamp (seconds since epoch, 0 = none).

free(jobid: int, R=None, final: bool = False) -> None

Return a job's allocated resources to the pool.

property generation: int

Monotonically increasing mutation counter.

job_end_times()

Return a list of (jobid, end_time) pairs for all tracked jobs.

mark_down(ids: str) -> None

Mark resources identified by idset string (or "all") as down.

mark_up(ids: str) -> None

Mark resources identified by idset string (or "all") as up.

parse_resource_request(jobspec: dict)

Parse a jobspec dict and return a pool-specific resource request.

register_alloc(jobid: int, R: ResourcePool) -> None

Register an existing allocation during scheduler reconnect.

remove_ranks(ranks) -> None

Remove ranks from the pool (called on shrink events).

set_expiration(expiration: float) -> None

Set the resource set expiration (seconds since epoch, 0 = none).

set_starttime(starttime: float) -> None

Set the resource set starttime (seconds since epoch).

to_dict() -> dict

Return the resource set as a parsed R JSON dict.

to_resource_set()

Return a topology+availability snapshot as a ResourceSet.

update_expiration(jobid: int, expiration: float) -> None

Update the tracked end time for a running job.

class flux.scheduler.Scheduler(h, *args)

Bases: BrokerModule

Base class for Python scheduler broker modules.

Handles the RFC 27 protocol scaffolding so that subclasses only need to implement scheduling policy.

The base class maintains a pending-job queue (_queue, a heapq of PendingJob ordered by priority then jobid) and provides default implementations of alloc() (parse jobspec and enqueue), free() (release resources), cancel() (dequeue and cancel), and prioritize() (re-sort the queue). Subclasses must override schedule() to implement their allocation policy, and may override any of the defaults.

Subclasses may also override:

  • hello() — called once per running job during startup (optional; default marks resources allocated)

  • resource_update() — called after resource state changes

  • feasibility_check() — called for feasibility.check RPCs

  • forecast() — called after schedule() to annotate pending jobs with forward-looking estimates such as sched.t_estimate

Subclasses may set the class attributes:

  • queue_depth — maximum pending alloc requests: a positive integer (e.g. 8) for limited mode, or the string "unlimited" (default). The base class translates this to the wire format automatically. End users may override at load time with queue-depth=N|unlimited.

Alloc requests are represented by AllocRequest objects passed to alloc(). Call request.success(R), request.deny(note), request.cancel(), or request.annotate(d) to respond.

The current resource pool is available as resources. Resource state is updated automatically from resource.acquire, and resource_update() is called after each update.

SCHED_DELAY_MAX = 1.0

Maximum scheduling delay used when coalescing bursts of alloc requests. The adaptive timer will not exceed this value regardless of how long schedule() takes. 1 second is a reasonable upper bound for most HPC schedulers; lower it if worst-case latency matters more than throughput during large submission bursts.

SCHED_EWMA_ALPHA = 0.25

Exponential weighted moving average (EWMA) smoothing factor for the adaptive scheduling timer. Higher values react faster to changes in burst rate and scheduling cost; lower values are more stable. 0.25 converges in roughly 4 samples.
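
To make the smoothing factor concrete, the sketch below applies the standard EWMA recurrence with alpha = 0.25 and a clamp at SCHED_DELAY_MAX. The helper names ewma_update and clamp_delay are hypothetical, and how the base class actually derives its delay from the averages is not shown here; this only illustrates the arithmetic.

```python
SCHED_DELAY_MAX = 1.0
SCHED_EWMA_ALPHA = 0.25


def ewma_update(current, sample, alpha=SCHED_EWMA_ALPHA):
    """Standard EWMA step: blend the new sample into the running average."""
    return alpha * sample + (1.0 - alpha) * current


def clamp_delay(estimate, maximum=SCHED_DELAY_MAX):
    """Keep a delay estimate within [0, maximum]."""
    return max(0.0, min(estimate, maximum))


value = 0.0
history = []
for _ in range(4):
    value = ewma_update(value, 1.0)   # four identical samples of 1.0
    history.append(value)

print(history)            # [0.25, 0.4375, 0.578125, 0.68359375]
print(clamp_delay(2.5))   # 1.0
```

After four samples the average has covered about 68% of the gap to the new level (1 - 0.75**4), which is the sense in which 0.25 "converges in roughly 4 samples".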

alloc(request, jobid, priority, userid, t_submit, jobspec)

Queue an allocation request for the next schedule() pass.

Parses jobspec and pushes a PendingJob onto _queue. Override to change how jobs are parsed or queued.

Parameters
  • request (AllocRequest) -- The allocation request to respond to.

  • jobid (int) -- The job ID (also available as request.jobid).

  • priority (int) -- Job priority.

  • userid (int) -- Submitting user ID.

  • t_submit (float) -- Job submission time.

  • jobspec (dict) -- The raw jobspec dict.

alloc_annotate(msg, annotations)

Send an alloc annotation update. Typically called via AllocRequest.annotate().

Parameters
  • msg -- The alloc request Message.

  • annotations (dict) -- Annotation dict to attach to the job.

alloc_cancel(msg)

Send an alloc cancel response. Typically called via AllocRequest.cancel().

Parameters

msg -- The original alloc request Message.

alloc_deny(msg, note=None)

Send an alloc denial response. Typically called via AllocRequest.deny().

Parameters
  • msg -- The alloc request Message.

  • note (str, optional) -- Human-readable reason for the denial.

alloc_success(msg, R, annotations=None)

Commit R to KVS and send an alloc success response.

Typically called via AllocRequest.success(). R must be a dict (parsed R JSON object). The alloc response to job-manager is sent only after R is safely stored in KVS.

Parameters
  • msg -- The alloc request Message.

  • R (dict) -- The allocated resource set as a parsed JSON object.

  • annotations (dict, optional) -- Scheduler annotations to include.

cancel(jobid)

Remove a pending job from the queue and respond with cancel.

Parameters

jobid (int) -- The job ID to cancel.

expiration(msg, jobid, expiration)

Called when job-manager requests a job expiration update.

The default implementation accepts all updates by responding success. Planning schedulers that track per-job expiration should override this to update their internal records before calling self.handle.respond(msg, None).

Parameters
  • msg -- The request Message.

  • jobid (int) -- The job ID whose expiration is being updated.

  • expiration (float) -- New expiration timestamp (seconds since epoch).
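
An override for a planning scheduler might look roughly like the sketch below. FakeHandle and PlanningSketch are hypothetical stand-ins so the example runs without Flux, and end_times is an assumed bookkeeping attribute; a real subclass would derive from Scheduler and use its own handle.

```python
class FakeHandle:
    """Hypothetical stand-in for the Flux handle; records responses."""

    def __init__(self):
        self.responses = []

    def respond(self, msg, payload):
        self.responses.append((msg, payload))


class PlanningSketch:
    """Hypothetical scheduler fragment showing only expiration()."""

    def __init__(self, handle):
        self.handle = handle
        self.end_times = {}          # jobid -> expiration timestamp

    def expiration(self, msg, jobid, expiration):
        # Update internal records first, then accept the update.
        self.end_times[jobid] = expiration
        self.handle.respond(msg, None)


sched = PlanningSketch(FakeHandle())
sched.expiration("msg-1", 42, 1700003600.0)
```

The ordering matters: the record is updated before the success response is sent, so later planning decisions see the new expiration.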

feasibility_check(msg, jobspec)

Called for feasibility.check RPC.

Default implementation responds success if the job could ever fit within the total resource set (ignoring current availability). Subclasses may override for custom feasibility logic.

Parameters
  • msg -- The request Message.

  • jobspec (dict) -- The raw jobspec dict.

forecast()

Called after schedule() to annotate pending jobs with forward-looking estimates.

Override in subclasses to populate sched.t_estimate (and other forward-looking annotations) on pending jobs. The base class implementation is a no-op.

Triggered automatically after each schedule() call completes. If a forecast pass is already in progress when a new scheduling event arrives, it is left running to completion; the next forecast pass starts after the current one finishes and the next schedule() pass completes. Forecast estimates are approximate by design, so a slightly stale t_estimate is preferable to having none at all.

Supports the same generator protocol as schedule(): add yield at each desired reactor handoff point — typically after each annotated job — to return control to the reactor between annotations.

free(jobid, R, final=False)

Return released resources to the pool.

Calls free() on the resource pool. Override (calling super().free()) to perform additional bookkeeping such as removing per-job state.

Parameters
  • jobid (int) -- The job ID.

  • R -- The released resources.

  • final (bool) -- True if this is the final free for the job.

hello(jobid, priority, userid, t_submit, R)

Called for each running job during the hello protocol.

Invoked synchronously during run() before the reactor starts. The default implementation marks R as allocated in the resource pool, which is sufficient for most schedulers. Override to additionally track per-job state (e.g., store the job's expiration time).

Parameters
  • jobid (int) -- The job ID.

  • priority (int) -- Job priority.

  • userid (int) -- Submitting user ID.

  • t_submit (float) -- Job submission time.

  • R -- The job's current allocation.

hello_partial_ok = True

If True (the default), send partial-ok: True in the hello RPC so that job-manager may report partially-freed ranks for running jobs. Set to False (or pass test-hello-nopartial at load time on subclasses that support it) to disable partial-ok behaviour.

pool_class: Optional[type] = None

Custom pool class. When set to a ResourcePool subclass, _make_pool() instantiates it directly instead of using the default ResourcePool version dispatch. The subclass is responsible for its own version dispatch (e.g. via an _impl_map). This is the preferred way to inject a custom pool:

class MyScheduler(Scheduler):
    pool_class = MyPool

pool_kwargs: dict = {}

Extra keyword arguments forwarded to the pool class at construction time. Passed to whichever pool class is selected by _make_pool() — whether that is pool_class, a writer-discovered class, or the default ResourcePool.

prioritize(jobs)

Apply priority updates to queued jobs and re-sort the queue.

Parameters

jobs (list) -- List of [jobid, priority] pairs.
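
One way to picture "apply updates and re-sort" on a heap is shown below. This is a simplified sketch: the queue holds plain (-priority, jobid) tuples instead of PendingJob objects, and reprioritize is a hypothetical helper, not the base class method.

```python
import heapq


def reprioritize(queue, updates):
    """Apply [jobid, priority] pairs in place, then restore the heap."""
    new_priority = {jobid: priority for jobid, priority in updates}
    for i, (_, jobid) in enumerate(queue):
        if jobid in new_priority:
            queue[i] = (-new_priority[jobid], jobid)
    heapq.heapify(queue)   # editing entries in place breaks the heap invariant


queue = [(-16, 1), (-16, 2), (-16, 3)]
heapq.heapify(queue)
reprioritize(queue, [[3, 31], [1, 0]])

order = [heapq.heappop(queue)[1] for _ in range(3)]
print(order)  # [3, 2, 1]
```

Re-heapifying once after all updates is linear in the queue length, which is cheaper than removing and re-pushing each updated job.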

queue_depth = 'unlimited'

Maximum number of pending alloc requests the job-manager will queue for this scheduler. Set to a positive integer (e.g. 8) for limited mode or to the string "unlimited" for unlimited mode. The Scheduler base class translates this to the wire format sent in job-manager.sched-ready; subclasses and end users should never need to know about the internal "limited=N" representation. End users may override at load time with queue-depth=N|unlimited.
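
The translation described above might be sketched as follows. Both helpers are hypothetical: queue_depth_to_wire assumes the unlimited case is sent as the literal string "unlimited", and parse_queue_depth_arg assumes the load-time option arrives as a single "queue-depth=..." string.

```python
def queue_depth_to_wire(queue_depth):
    """Map the class attribute value to an assumed wire string."""
    if queue_depth == "unlimited":
        return "unlimited"
    if isinstance(queue_depth, bool) or not isinstance(queue_depth, int):
        raise ValueError(f"invalid queue_depth: {queue_depth!r}")
    if queue_depth <= 0:
        raise ValueError("queue_depth must be a positive integer")
    return f"limited={queue_depth}"


def parse_queue_depth_arg(arg):
    """Parse a load-time 'queue-depth=N|unlimited' option (assumed form)."""
    key, sep, value = arg.partition("=")
    if key != "queue-depth" or not sep:
        raise ValueError(f"not a queue-depth option: {arg!r}")
    return "unlimited" if value == "unlimited" else int(value)


print(queue_depth_to_wire(8))                                   # limited=8
print(queue_depth_to_wire(parse_queue_depth_arg("queue-depth=unlimited")))
```

Keeping the conversion in one place is what lets subclasses and users work only with N or "unlimited".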

resource_update()

Called after resource state changes from resource.acquire.

The updated state is available via resources.

property resources

The current resource pool.

run()

Initialize the scheduler and start the reactor.

Performs the RFC 27 initialization sequence:

  1. Register the sched dynamic service

  2. Register the feasibility dynamic service

  3. flux_module_set_running() — allow flux module load to return

  4. resource.acquire — sync first response, async continuation

  5. job-manager.sched-hello — sync hello protocol

  6. job-manager.sched-ready — announce readiness

  7. Start the reactor

Raises

OSError -- If any initialization step fails.

schedule()

Called to run the scheduling loop.

Override this in subclasses to implement allocation policy. Triggered automatically after every alloc(), free(), cancel(), prioritize(), expiration(), and resource_update() event via an adaptive one-shot timer. When events arrive in rapid bursts the timer delay grows to coalesce them into fewer schedule() calls; when events are infrequent the delay returns to zero so that each event is processed promptly.

Generator protocol — if this method returns a generator the base class advances it one yield per reactor iteration, allowing other events (new jobs, free responses, RPCs) to be handled between yields. Yield at each point where reactor responsiveness is desired — typically after each dispatched or denied job, but also when blocking on resources:

def schedule(self):
    while self._queue:
        job = self._queue[0]
        try:
            alloc = self.resources.alloc(job.jobid, job.resource_request)
        except InsufficientResources:
            break
        except InfeasibleRequest as exc:
            job.request.deny(str(exc))
        else:
            job.request.success(alloc)
        heapq.heappop(self._queue)
        yield  # hand control back to the reactor

When a new scheduling event arrives while a generator pass is in progress the base class closes the generator (triggering any finally blocks) and starts a fresh pass after the settling delay. This ensures a newly submitted high-priority job or a freed resource is considered from the top of the queue without waiting for the current pass to finish.

Non-generator schedule() implementations (no yield) run to completion in a single call and remain fully supported.
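
The advance-and-close behaviour can be demonstrated without a reactor. GeneratorDriver below is a hypothetical, simplified driver written for this illustration; the real base class advances the generator from reactor watchers rather than explicit step() calls.

```python
import inspect


class GeneratorDriver:
    """Hypothetical driver: one yield per step, close on restart."""

    def __init__(self, schedule):
        self._schedule = schedule
        self._gen = None

    def start(self):
        self.abort()                       # drop any in-progress pass
        result = self._schedule()
        if inspect.isgenerator(result):
            self._gen = result             # non-generators already finished

    def step(self):
        """Advance to the next yield; return False once the pass is done."""
        if self._gen is None:
            return False
        try:
            next(self._gen)
            return True
        except StopIteration:
            self._gen = None
            return False

    def abort(self):
        if self._gen is not None:
            self._gen.close()              # raises GeneratorExit at the yield
            self._gen = None


log = []


def schedule():
    try:
        for jobid in (1, 2, 3):
            log.append(f"dispatch {jobid}")
            yield
    finally:
        log.append("cleanup")


driver = GeneratorDriver(schedule)
driver.start()
driver.step()    # runs up to the first yield
driver.step()    # runs up to the second yield
driver.start()   # a new event arrives: close the old pass, begin a fresh one
print(log)       # ['dispatch 1', 'dispatch 2', 'cleanup']
```

Closing the generator runs its finally block, so per-pass cleanup placed there executes even when a pass is abandoned partway through.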

start_schedule()

Arm the scheduling timer, aborting any in-progress generator pass.

Called by _request_schedule() after the EWMA interval update. Override this method (not _request_schedule()) to replace the generator-based scheduling loop with an alternative driver, while still benefiting from the burst-coalescing delay computed in _request_schedule().

The default implementation:

  1. Closes any in-progress schedule() generator and stops its yield watchers (queue or resource state is about to change so in-progress allocations are stale). An in-progress forecast() generator is left running to completion: forecast estimates are approximate by nature and a slightly stale t_estimate is more useful than none at all.

  2. Arms the one-shot scheduling timer with the current adaptive delay if it is not already pending. Subsequent calls while the timer is armed are no-ops, coalescing all events in the window into a single schedule() invocation.
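
The coalescing effect of step 2 can be shown with a toy timer. OneShotTimer is a hypothetical stand-in with no real clock: arm() models arming the timer and fire() models its expiry.

```python
class OneShotTimer:
    """Hypothetical one-shot timer; counts how often the callback runs."""

    def __init__(self):
        self.armed = False
        self.fired = 0

    def arm(self):
        if self.armed:
            return False      # already pending: this request is coalesced
        self.armed = True
        return True

    def fire(self):
        if self.armed:
            self.armed = False
            self.fired += 1   # stands in for one schedule() invocation


timer = OneShotTimer()
results = [timer.arm() for _ in range(5)]   # burst of five events
timer.fire()
print(results, timer.fired)  # [True, False, False, False, False] 1
```

Five events in the window produce a single schedule() invocation; the next event after the timer fires arms it again.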

stats_get()

Return a dict of scheduler statistics for the stats-get RPC.

Called by the built-in <module-name>.stats-get handler. Subclasses may override to add extra fields; call super().stats_get() and update the returned dict:

def stats_get(self):
    stats = super().stats_get()
    stats["my_counter"] = self._my_counter
    return stats

Standard fields:

sched_passes

Number of completed schedule() passes.

sched_yields

Total yields across all schedule() generator passes (always 0 for synchronous schedulers).

forecast_passes

Number of completed forecast() passes.

forecast_yields

Total yields across all forecast() generator passes (always 0 for synchronous schedulers).

sched_delay

Current adaptive burst-coalescing delay in seconds (0 = immediate). Always 0 for generator-based schedulers.

sched_duration_ewma

EWMA of schedule() wall-clock duration in seconds. Always 0 for generator-based schedulers.

sched_interval_ewma

EWMA of time between _request_schedule() calls in seconds. Tracked for all schedulers but does not affect sched_delay for generator-based schedulers.

pending_jobs

Current number of pending alloc requests in the scheduler queue.