Writing a Python Scheduler

Flux supports scheduler broker modules written in Python. The Scheduler base class handles all of the RFC 27 protocol scaffolding — service registration, resource acquisition, the hello/ready handshake with job-manager — so that a Python scheduler only needs to implement scheduling policy.

This guide walks through writing a working scheduler from scratch, explains each override point, and covers more advanced topics such as job annotations and start-time forecasting.

Minimal example

The base class maintains a PendingJob heap queue (_queue) and provides default implementations of alloc(), free(), cancel(), and prioritize(). A minimal scheduler only needs to override schedule():

import heapq
from flux.resource import InsufficientResources, InfeasibleRequest
from flux.scheduler import Scheduler

class SimpleScheduler(Scheduler):

    def schedule(self):
        while self._queue:
            job = self._queue[0]
            try:
                alloc = self.resources.alloc(job.jobid, job.resource_request)
            except InsufficientResources:
                break   # not enough resources — wait for free
            except InfeasibleRequest as exc:
                job.request.deny(str(exc))
            else:
                job.request.success(alloc)
            heapq.heappop(self._queue)

def mod_main(h, *args):
    SimpleScheduler(h, *args).run()

Save this as my-sched.py and load it:

$ flux module unload sched-simple
$ flux module load my-sched.py

The mod_main() entry point is the same convention used by all Python broker modules (see Python Broker Modules).
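To see the head-of-queue behaviour of this loop without a running Flux instance, the sketch below replays the same control flow against a toy pool. Everything in it is a hypothetical stand-in: ToyPool, fifo_pass(), the plain core-count request, and the two locally defined exception classes are not the flux.resource API.

```python
import heapq


class InsufficientResources(Exception):
    """Toy stand-in: the request could fit once resources are freed."""


class InfeasibleRequest(Exception):
    """Toy stand-in: the request can never fit in this pool."""


class ToyPool:
    """Hypothetical pool that tracks nothing but a core count."""

    def __init__(self, total):
        self.total = total
        self.free_cores = total

    def alloc(self, jobid, ncores):
        if ncores > self.total:
            raise InfeasibleRequest(f"job {jobid}: {ncores} cores exceeds pool size")
        if ncores > self.free_cores:
            raise InsufficientResources(f"job {jobid}: only {self.free_cores} free")
        self.free_cores -= ncores
        return ncores


def fifo_pass(queue, pool):
    """Run one FIFO pass over a heap of (jobid, ncores) tuples."""
    started, denied = [], []
    while queue:
        jobid, ncores = queue[0]
        try:
            pool.alloc(jobid, ncores)
        except InsufficientResources:
            break                      # head is blocked: stop the pass
        except InfeasibleRequest:
            denied.append(jobid)       # permanent failure: drop from queue
        else:
            started.append(jobid)
        heapq.heappop(queue)
    return started, denied
```

With a 4-core pool and jobs requesting 2, 8, 3, and 1 cores, the pass starts the first job, denies the second, and stops at the third. The 1-core job behind it is not considered even though it would fit, which is the strict FIFO policy the minimal example implements.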

The resource pool

resources is an instance of Rv1Pool that represents the entire managed resource set. It tracks which resources are up, down, and currently allocated. The pool implementation is selected automatically based on the R version field in the resource data (currently only version 1 is supported).

The resource request

Each PendingJob carries a pool-specific resource request object in job.resource_request, parsed once from the jobspec by parse_resource_request() when the alloc request arrives. The type and fields of this object are defined by the pool; scheduler policy code treats it as opaque and passes it directly to alloc().

For the built-in Rv1Pool pools, node and slot counts carry RFC 14 ranges (a min and optional max), and non-standard resource hierarchies are traversed recursively. The raw jobspec dict is also accessible as job.resource_request.jobspec for reading site-specific hints from attributes.system.

Allocating resources

Pass the jobid and resource request to alloc():

try:
    alloc = self.resources.alloc(job.jobid, job.resource_request)
except InsufficientResources:
    # Not enough resources now — queue the request and retry later
    ...
except OSError as exc:
    # Request can never be satisfied (InfeasibleRequest or other error)
    request.deny(str(exc))
    return

RFC 31 constraint expressions (constraint field on the resource request) are carried through automatically; the resource pool evaluates them when selecting candidate nodes.

On success, alloc() returns a resource pool object containing only the allocated resources. Pass it directly to success(), which converts it automatically:

request.success(alloc)

The pool also records the job's expected expiration (derived from request.duration) internally; free() and start-time simulation (via copy() and job_end_times()) use this state.

Releasing resources

The base class free() calls free() on the resource pool automatically, so no override is needed unless the scheduler has additional per-job cleanup to do.

A job's resources may be released in multiple calls when housekeeping is configured: housekeeping returns resources in batches of one or more ranks as they become ready, with final set to False until the last batch. Schedulers that override free() must handle this: R contains only the subset being freed on each call, and per-job state should not be discarded until final is True.
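The partial-release rule can be sketched as follows. To keep the example runnable without Flux, _BaseStub is a hypothetical stand-in for the Scheduler base class, and _job_state is an assumed per-job dictionary kept by the subclass. Only the free(jobid, R, final=False) signature comes from this guide.

```python
class _BaseStub:
    """Hypothetical stand-in for Scheduler; records what was freed."""

    def __init__(self):
        self.freed = []

    def free(self, jobid, R, final=False):
        self.freed.append((jobid, R, final))


class TrackingScheduler(_BaseStub):
    def __init__(self):
        super().__init__()
        self._job_state = {}   # assumed per-job bookkeeping

    def free(self, jobid, R, final=False):
        # Always return the released subset to the pool first.
        super().free(jobid, R, final=final)
        # Keep per-job state until the last batch has arrived.
        if final:
            self._job_state.pop(jobid, None)
```

Calling free() twice for the same job, first with final=False and then with final=True, leaves the job's state in place after the first call and removes it only after the second.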

Resource state changes

Node drain/undrain and other resource state changes arrive automatically via the resource.acquire streaming RPC. After each update, the base class calls resource_update(), giving the scheduler a chance to retry pending allocations:

def resource_update(self):
    self._retry_pending()

Alloc requests

When job-manager asks the scheduler to allocate resources for a job, two distinct objects are created and stored together on the PendingJob:

  • job.resource_request — a pool-specific object describing what the job needs. Created once from jobspec by parse_resource_request() when the alloc arrives. For the built-in pools this carries node count, slot count, cores and GPUs per slot, duration, constraints, and exclusivity; node and slot counts carry RFC 14 ranges (min/max) and non-standard resource hierarchies are traversed recursively. The raw jobspec dict is accessible as job.resource_request.jobspec for reading site-specific attributes.system hints. Used by the scheduler during each schedule() pass to decide whether resources can be satisfied. Persists for the lifetime of the pending job.

  • AllocRequest (job.request) — represents the open RFC 27 protocol transaction with job-manager. Wraps the message handle and provides the response methods that finalize the exchange. Must be finalized exactly once; consumed when the scheduler responds.

The scheduler reads resource_request to make scheduling decisions, then calls a method on request to report the outcome.

AllocRequest must be finalized exactly once by calling one of:

request.success(R, annotations=None)

Allocation succeeded. R is the pool object returned by alloc(); it is converted to a dict automatically. The base class commits R to the KVS asynchronously before sending the response to job-manager, ensuring R is safely stored before the job can run.

request.deny(note=None)

Permanent denial — the job will never run. note is a human-readable reason that appears in flux-job(1) eventlog. Use this for structurally unsatisfiable requests (too many cores, unsupported resource type, etc.).

request.cancel()

The alloc request is being withdrawn. Call this from cancel().

Note

Cancelling an alloc request is not the same as cancelling the job. Job-manager may withdraw an alloc request — for example, to shrink the outstanding request count back within the configured queue-depth — while leaving the job itself in the pending queue. The job will receive a fresh alloc request when a slot opens up.

request.annotate(annotations)

Send an intermediate annotation update while the job is pending. May be called any number of times before finalizing. Annotations are visible in flux-jobs(1) output.

RFC 27 defines two standard sched annotation keys:

resource_summary

Free-form string describing the allocated resources (e.g., "rank[0-1]/core[0-3]"). Set this on a successful allocation.

t_estimate

Estimated start time as a Unix epoch float, or null to clear. Set this on pending jobs that have a shadow reservation; clear it on allocation.

Example (planning scheduler posting a reservation estimate):

# RFC 27 sched annotation: t_estimate is wall clock seconds since epoch
request.annotate({"sched": {"t_estimate": shadow_time}})

When a pending job is cancelled the base class automatically clears any annotations that were previously sent for that job — no explicit null annotation is needed in cancel() overrides.

Note

Scheduler annotations are not recorded in the job eventlog. flux-job(1) wait-event annotations cannot be used to wait for them in tests; instead poll flux-jobs(1) -no {annotations.sched.field}.
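The exactly-once rule for AllocRequest can be modelled with a small toy class. This is only an illustration of the contract described in this section: ToyAllocRequest is hypothetical, and raising RuntimeError on misuse is an assumption of the sketch, not documented behaviour of the real class.

```python
class ToyAllocRequest:
    """Hypothetical model of an alloc transaction that is finalized once."""

    def __init__(self, jobid):
        self.jobid = jobid
        self.finalized = False
        self.sent = []           # responses in the order they were sent

    def _finalize(self, kind, payload=None):
        if self.finalized:
            raise RuntimeError(f"job {self.jobid}: request already finalized")
        self.finalized = True
        self.sent.append((kind, payload))

    def annotate(self, annotations):
        # Intermediate update: allowed any number of times before finalizing.
        if self.finalized:
            raise RuntimeError(f"job {self.jobid}: request already finalized")
        self.sent.append(("annotate", annotations))

    def success(self, R, annotations=None):
        self._finalize("success", R)

    def deny(self, note=None):
        self._finalize("deny", note)

    def cancel(self):
        self._finalize("cancel")
```

A request in this model may be annotated repeatedly while pending, but a second success(), deny(), or cancel() call is rejected.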

Override points

Subclass Scheduler and override any of the following methods:

hello(self, jobid, priority, userid, t_submit, R)

Called once per running job during startup, before the reactor starts. The default marks R as allocated in the resource pool. Override to also record per-job state needed by the scheduler subclass.

alloc(self, request, jobid, priority, userid, t_submit, jobspec)

Called when job-manager requests an allocation. The default calls parse_resource_request() on the pool to parse jobspec and pushes a PendingJob onto _queue. Override to reject jobs before they enter the queue.

free(self, jobid, R, final=False)

Called when resources are released for a completed or cancelled job. final is True on the last (or only) call for the job. When housekeeping is configured, resources may arrive in multiple partial batches before final is set; see Releasing resources for details. The default calls free() on the pool. Override (calling free() via super()) to also remove per-job state or log the event, taking care not to discard per-job state until final is True.

cancel(self, jobid)

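Called when a pending alloc request is cancelled, either because the user cancelled the job or because job-manager withdrew the request. The default removes jobid from _queue and calls cancel() on the job's AllocRequest.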

prioritize(self, jobs)

Called with a list of [jobid, priority] pairs when job priorities change. The default updates the priority of each job in _queue and re-heapifies.
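The update-then-re-heapify step can be illustrated on a plain heapq. The entry layout below, a [-priority, jobid] list so that higher priorities sort first and ties fall back to jobid, is an assumption made for the sketch; the real PendingJob type and its comparison logic are not shown in this guide.

```python
import heapq


def apply_priorities(queue, updates):
    """Apply [jobid, priority] updates to a heap of [-priority, jobid] entries."""
    new_priority = {jobid: priority for jobid, priority in updates}
    for entry in queue:
        jobid = entry[1]
        if jobid in new_priority:
            entry[0] = -new_priority[jobid]
    # In-place edits can break the heap invariant, so rebuild it once.
    heapq.heapify(queue)
```

Rebuilding the heap once after all updates costs O(n), which is cheaper than removing and re-inserting each updated job individually when many priorities change together.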

schedule(self)

Called to run the scheduling loop after one or more scheduling events (alloc, free, cancel, prioritize, expiration, or resource state change). Events are coalesced by an adaptive timer so that schedule() is not called more often than necessary; see Scheduling deferral for details. The default implementation is a no-op.

If schedule() returns a generator the base class advances it one yield per reactor iteration, allowing other events to be handled between yields. When a new scheduling event arrives while a generator pass is in progress the base class closes it and starts a fresh pass after the settling delay, ensuring that a newly submitted job or freed resource is considered from the top of the queue without waiting for the current pass to finish. Non-generator implementations remain fully supported.
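The generator protocol described above can be pictured with a small driver. GeneratorDriver is a hypothetical model, not the base class implementation: start() plays the role of a new scheduling event, and step() plays the role of one reactor iteration.

```python
import inspect


class GeneratorDriver:
    """Toy model of driving schedule() one yield at a time."""

    def __init__(self, schedule):
        self._schedule = schedule
        self._gen = None
        self.passes = 0
        self.yields = 0

    def start(self):
        """Begin a fresh pass, closing any pass still in progress."""
        if self._gen is not None:
            self._gen.close()
            self._gen = None
        result = self._schedule()
        if inspect.isgenerator(result):
            self._gen = result
        else:
            self.passes += 1       # synchronous pass already finished

    def step(self):
        """Advance one yield; return True while the pass is unfinished."""
        if self._gen is None:
            return False
        try:
            next(self._gen)
        except StopIteration:
            self._gen = None
            self.passes += 1
            return False
        self.yields += 1
        return True


queue = [1, 2, 3]
dispatched = []


def schedule():
    while queue:
        dispatched.append(queue.pop(0))
        yield


driver = GeneratorDriver(schedule)
driver.start()
driver.step()          # dispatches job 1, then yields
queue.append(4)        # a new event arrives mid-pass
driver.start()         # abandon the old pass, restart from the queue head
while driver.step():
    pass
```

In this run the first pass is closed after one job, and the restarted pass sees the newly added job along with the remaining ones. The settling delay that the base class applies before restarting is omitted here.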

Example:

def schedule(self):
    while self._queue:
        job = self._queue[0]
        try:
            alloc = self.resources.alloc(job.jobid, job.resource_request)
        except InsufficientResources:
            break   # head blocked; stop (FIFO)
        except OSError as exc:
            job.request.deny(str(exc))
        else:
            job.request.success(alloc)
        heapq.heappop(self._queue)
        yield   # let the reactor handle other events between jobs

start_schedule(self)

Called by _request_schedule() after updating the interval EWMA. The default implementation aborts any in-progress generator and arms the one-shot scheduling timer. Override this method (not _request_schedule()) to replace the generator driver — for example, to launch an RPC-based allocation request — while still preserving the _sched_pending guard that coalesces concurrent scheduling events into a single pass.

resource_update(self)

Called after each resource state update. For queue-based schedulers that rely on schedule(), this override is usually not needed because the base class calls _request_schedule() automatically after every resource update. Override only if you need to react to the updated resources value before the scheduling pass.

feasibility_check(self, msg, jobspec)

Called for feasibility.check RPCs from job-ingest. The default calls parse_resource_request() and check_feasibility() on the pool, responding with EINVAL if the job can never fit the total resource set. Override only if custom feasibility logic is needed.

forecast(self)

Called after each schedule() pass to annotate pending jobs with forward-looking estimates such as sched.t_estimate. The base class implementation is a no-op. Override to post start-time estimates (or other planning metadata) without impacting scheduling throughput.

Supports the same generator protocol as schedule(): add yield at each desired reactor handoff point — typically after each annotated job — to return control to the reactor between annotations. Unlike the schedule generator, a running forecast generator is not aborted when a new scheduling event arrives — it runs to completion. A slightly stale t_estimate is more useful than none at all, and the simulation snapshot taken at pass start remains internally consistent regardless of real-pool changes mid-pass.

Example (annotating the head-of-queue job with its estimated start time):

def forecast(self):
    if not self._queue:
        return
    head = self._queue[0]   # the heap root is the head of the queue
    t = self._shadow_time(head)   # simulate pool forward to find earliest start
    if head._last_annotation != t:
        head._last_annotation = t
        head.request.annotate({"sched": {"t_estimate": t}})
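The _shadow_time() helper in the example above is left to the subclass. As a rough illustration of what such an estimate involves, the sketch below walks forward through the end times of running jobs until enough cores are free. It is a toy under stated assumptions: shadow_time(), its arguments, and the core-count model are hypothetical and do not use the pool's own simulation interface.

```python
def shadow_time(free_now, running, needed, now):
    """Estimate the earliest start time for a job needing `needed` cores.

    free_now: cores free at time `now`
    running:  iterable of (end_time, cores) for running jobs
    Returns a timestamp, or None if the job can never fit.
    """
    if needed <= free_now:
        return now
    free = free_now
    # Release running jobs in order of their expected end time.
    for end_time, cores in sorted(running):
        free += cores
        if free >= needed:
            return max(end_time, now)
    return None
```

For example, with 2 cores free and running jobs that release 1 core at t=50 and 2 cores at t=100, a 4-core job is estimated to start at t=100, because only 3 cores are free after the first job ends.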

Queue depth

The queue-depth= module argument controls how many alloc requests job-manager sends concurrently:

  • queue-depth=unlimited (the default) — all pending jobs get an alloc request at once. The scheduler sees the full queue on startup.

  • queue-depth=N (a positive integer) — job-manager sends at most N outstanding requests. Use this to bound the scheduler queue and annotation overhead. FIFOScheduler defaults to queue-depth=8 to match the behaviour of the built-in sched-simple.

Set the class attribute queue_depth on the subclass to choose a default; use a plain integer or the string "unlimited":

class MyScheduler(Scheduler):
    queue_depth = 8   # or "unlimited"

The base class translates the value to the wire format for the hello protocol automatically. Users can override the default at load time:

$ flux module load my-sched.py queue-depth=unlimited
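The translation from queue_depth to the wire format might look like the sketch below. The "limited=N" and "unlimited" strings follow the representation mentioned in the API reference; the function name and its validation rules are assumptions of this example, not the base class code.

```python
def queue_depth_to_wire(queue_depth):
    """Translate a queue_depth setting to a hello-protocol mode string."""
    if queue_depth == "unlimited":
        return "unlimited"
    # bool is a subclass of int in Python, so exclude it explicitly.
    if (
        isinstance(queue_depth, int)
        and not isinstance(queue_depth, bool)
        and queue_depth > 0
    ):
        return f"limited={queue_depth}"
    raise ValueError(f"invalid queue depth: {queue_depth!r}")
```

Rejecting zero, negative numbers, and unknown strings up front means a bad value fails at load time rather than producing a mode string that job-manager would not understand.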

Scheduling deferral

Every scheduling event (alloc, free, cancel, prioritize, expiration, resource update) calls _request_schedule() rather than calling schedule() directly. The base class defers the actual call via a one-shot timer, coalescing multiple events into a single schedule() invocation.

The timer uses an adaptive delay tuned automatically at runtime:

  • The delay starts at zero, so the first event after an idle period is processed on the very next reactor iteration — no added latency.

  • After each schedule() call, the base class compares two exponential moving averages (EWMAs; see footnote 1):

    • _sched_interval_ewma — average time between _request_schedule() calls (measures how fast events are arriving).

    • _sched_duration_ewma — average time schedule() takes to run (measures scheduling cost).

  • If events arrive faster than schedule() can process them (interval < duration), the timer delay is raised to approximately duration, coalescing the burst into roughly one scheduling pass per cycle. A DEBUG log message is emitted when the delay changes.

  • Once events slow down again the delay resets to zero immediately.
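The rules in the list above can be condensed into two small functions. This is a sketch of the decision logic as described, not the base class source: the function names are hypothetical, and the real implementation may differ in details such as how the first sample seeds the average.

```python
def ewma(old, sample, alpha=0.25):
    """Blend a new sample into an exponentially weighted moving average."""
    return alpha * sample + (1 - alpha) * old


def next_delay(interval_ewma, duration_ewma, delay_max=1.0):
    """Choose the timer delay from the two averages.

    Events arriving faster than a pass completes raise the delay to about
    one pass duration (capped at delay_max); otherwise it resets to zero.
    """
    if interval_ewma < duration_ewma:
        return min(duration_ewma, delay_max)
    return 0.0
```

With events arriving every 10 ms and passes taking 200 ms, the delay becomes 0.2 s. If passes took 5 s, the delay would be capped at the 1 s maximum. Once the interval average exceeds the duration average, the delay drops straight back to zero.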

Note

When schedule() is a generator the EWMA duration is not updated, so sched_delay remains 0 and the timer fires on the next reactor iteration with no burst-coalescing window. Events that arrive while the timer is already armed are still coalesced by the _sched_pending guard, but there is no adaptive settling period.

Two class attributes control the behaviour and can be overridden on the subclass:

class MyScheduler(Scheduler):
    SCHED_DELAY_MAX  = 1.0   # seconds; cap on adaptive delay (default 1s)
    SCHED_EWMA_ALPHA = 0.25  # EWMA smoothing factor (default 0.25 ≈ 4 samples)

SCHED_DELAY_MAX bounds worst-case scheduling latency during a sustained burst. SCHED_EWMA_ALPHA controls how quickly the timer reacts: higher values respond faster but are noisier; lower values are smoother but adapt more slowly.

For schedulers with a small queue-depth= (e.g. sched-fifo's default of 8), events arrive at most 8-at-a-time so the interval rarely falls below the schedule duration and the delay stays near zero. Schedulers using queue-depth=unlimited benefit most from the adaptive timer during large submission bursts.

Footnotes

1

An exponential moving average (EMA), also called an exponentially weighted moving average (EWMA), is a low-pass filter that gives exponentially decreasing weight to older samples: new = α × sample + (1 - α) × old. With α = 0.25 the weight of a sample halves roughly every 2.4 updates, since 0.75^2.4 ≈ 0.5. See https://en.wikipedia.org/wiki/Exponential_smoothing for background.

Forecast deferral

forecast() is called immediately after each schedule() pass completes. Because forecast() supports the same generator protocol as schedule(), annotation work may be spread across multiple reactor iterations to avoid blocking the critical scheduling path.

If a new scheduling event arrives while a forecast generator is in progress, the base class leaves it running to completion. Forecast estimates are approximate by design, and a slightly stale t_estimate is more useful than none at all. The simulation snapshot is taken once at pass start (a deep copy of the pool), so it remains internally consistent regardless of real-pool changes mid-pass. A fresh forecast pass is triggered after the next schedule() pass completes.

Module arguments

Arguments passed after the module path at load time arrive as *args in mod_main() and are forwarded to __init__:

$ flux module load my-sched.py queue-depth=8 log-level=debug

The base class automatically handles the following built-in arguments:

queue-depth=N|unlimited

Maximum number of concurrent outstanding alloc requests. The default is the queue_depth class attribute of the scheduler: "unlimited" for the Scheduler base class, 8 for FIFOScheduler.

log-level=LEVEL

Minimum log severity to emit. LEVEL is one of emerg, alert, crit, err, warning, notice, info, or debug (default info).

pool-class=URI

Select a custom resource pool class at load time. URI is resolved by _pool_class_from_uri(): a plain module name imports the Python module and reads its pool_class attribute; module:ClassName loads a named class from the module. The module must be importable, so set PYTHONPATH as needed. Equivalent to setting pool_class on the subclass, but applied at load time without subclassing. See Pool class hook (pool_class).
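The two URI forms described above could be resolved along these lines. This is a minimal sketch using importlib; pool_class_from_uri() is a hypothetical name, and the real _pool_class_from_uri() may accept other forms or perform extra validation.

```python
import importlib


def pool_class_from_uri(uri):
    """Resolve 'module' or 'module:ClassName' to a class object."""
    module_name, sep, class_name = uri.partition(":")
    module = importlib.import_module(module_name)
    # Plain module name: read the module-level pool_class attribute.
    # module:ClassName form: look up the named class instead.
    return getattr(module, class_name if sep else "pool_class")
```

An unimportable module raises ImportError and a missing attribute raises AttributeError, so both kinds of mistake surface at load time.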

Any argument not consumed by the subclass or the base class is rejected with an error at load time, so a typo like log_level=debug (underscore instead of hyphen) is caught immediately. Subclasses parse their own arguments before calling super().__init__, which consumes the built-in ones and then rejects whatever remains:

def __init__(self, h, *args):
    # Parse custom arguments first; leave the rest for the base class.
    remaining = []
    for arg in args:
        if arg.startswith("my-option="):
            self._my_option = arg[10:]
        else:
            remaining.append(arg)
    super().__init__(h, *remaining)   # handles queue-depth=, log-level=
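The consume-then-reject pattern generalizes to any number of options. The sketch below shows it in isolation, with two hypothetical helpers (consume_args() and reject_unknown()) standing in for the subclass and base class steps; neither is part of the Flux API.

```python
def consume_args(args, known_keys):
    """Split key=value args into (consumed dict, remaining list)."""
    consumed, remaining = {}, []
    for arg in args:
        key, sep, value = arg.partition("=")
        if sep and key in known_keys:
            consumed[key] = value
        else:
            remaining.append(arg)
    return consumed, remaining


def reject_unknown(remaining):
    """Fail loudly if any argument was not consumed by anyone."""
    if remaining:
        raise ValueError(f"unknown module argument(s): {', '.join(remaining)}")
```

Running the subclass step first and the built-in step second leaves a misspelled argument such as log_level=debug in the remainder, where the final check rejects it.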

Logging

log is a BrokerLogger instance available on every BrokerModule, including Scheduler. Use the convenience methods to emit log messages without importing syslog:

self.log.debug(f"alloc: {jobid}: {alloc.dumps()}")
self.log.info("scheduler ready")
self.log.warning(f"unknown pool option {key!r} ignored")
self.log.error(f"unexpected error: {exc}")

The full set of methods mirrors syslog severity levels: debug, info, notice, warning, error, critical, alert, and emerg.

Messages are filtered by a severity threshold before being forwarded to handle.log; only messages at or above the threshold are emitted. The default threshold for Scheduler is info. Set it at load time to enable more verbose output:

$ flux module load my-sched.py log-level=debug

Valid level names are emerg, alert, crit, err, warning, notice, info, and debug.
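The threshold test can be stated precisely using standard syslog numbering, where emerg is 0 and debug is 7, so "at or above the threshold" means a numerically smaller or equal level. The names LEVELS and should_emit() below are hypothetical and only illustrate the comparison; they are not part of BrokerLogger.

```python
# Standard syslog severities, most severe first (emerg=0 ... debug=7).
LEVELS = ["emerg", "alert", "crit", "err", "warning", "notice", "info", "debug"]


def should_emit(level, threshold="info"):
    """Return True if a message at `level` passes the severity threshold."""
    return LEVELS.index(level) <= LEVELS.index(threshold)
```

With the default info threshold, debug messages are dropped and everything from info up to emerg is forwarded. Loading with log-level=debug lets all messages through.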

Pool implementations receive the same log object and should call it unconditionally.

Log messages appear in flux-dmesg(1) and the broker's stderr.

Statistics

The base class registers a <module-name>.stats-get RPC handler that calls stats_get() and responds with the returned dict. Use flux-module(1) to query it:

$ flux module stats my-sched

Standard fields reported by the base class:

sched_passes

Number of completed schedule() passes.

sched_yields

Total yields across all schedule() generator passes (always 0 for synchronous schedulers).

forecast_passes, forecast_yields

Equivalent counters for forecast() passes (forecast_yields always 0 for synchronous schedulers).

sched_delay

Current adaptive burst-coalescing delay in seconds. Always 0 for generator-based schedulers.

sched_duration_ewma

EWMA of schedule() wall-clock duration in seconds. Always 0 for generator-based schedulers.

sched_interval_ewma

EWMA of time between scheduling requests in seconds. Tracked for all schedulers but does not affect sched_delay for generator-based schedulers.

pending_jobs

Current number of pending alloc requests in the scheduler queue.

Subclasses can extend the response by overriding stats_get():

def stats_get(self):
    stats = super().stats_get()
    stats["my_counter"] = self._my_counter
    return stats

Testing

Use flux-module(1) to load and remove the scheduler in a running instance:

$ flux module unload sched-simple
$ flux module load ./my-sched.py
$ flux run -n2 hostname
$ flux module reload my-sched.py          # reload without restarting instance
$ flux module remove my-sched.py
$ flux module load sched-simple

Write sharness shell tests using test_under_flux with the job personality (which loads a mock resource set and disables job execution), replacing the default scheduler with your own:

test_under_flux 4 job

test_expect_success 'load my scheduler' '
    flux module unload sched-simple &&
    flux module load ./my-sched.py
'
test_expect_success 'job runs' '
    jobid=$(flux submit hostname) &&
    flux job wait-event --timeout=5 $jobid alloc
'

See t/t2306-sched-fifo.t in the source tree for a comprehensive example covering annotations, priority, cancel, drain/undrain, and the hello/reload protocol.

Customizing the resource pool

By default the scheduler uses the built-in ResourcePool version dispatch to construct its resource pool. A custom pool class can be injected without modifying the scheduler itself using the pool class hook described below.

Pool class hook (pool_class)

There are three ways to bind a custom pool class to a scheduler:

  1. Class attribute — set pool_class on the subclass:

    from flux.scheduler import Scheduler
    
    class RackScheduler(Scheduler):
        pool_class = RackPool
        # must also override schedule() to allocate jobs
    
  2. Module argument — pass pool-class=URI at load time (no subclassing required):

    $ PYTHONPATH=/path/to/plugins flux module load sched-simple pool-class=rackpool
    
  3. Writer auto-discovery — if the system R carries a scheduling.writer URI pointing to a pool implementation, the scheduler loads it automatically without any explicit argument. See Writer identification.

In all three cases pool_class must be a ResourcePool subclass. The scheduler's _make_pool() helper checks pool_class first (highest priority), then scheduling.writer auto-discovery, and finally falls back to the default ResourcePool. In every case pool_kwargs are forwarded as keyword arguments to the chosen pool constructor.

The pool subclass is responsible for its own version dispatch. The pattern is to map version integers to version-specific implementation classes in an _impl_map and construct the right one in __init__:

from collections.abc import Mapping

class RackPool(ResourcePool):
    # _RackPoolV1 is the version-1 implementation class, defined elsewhere
    _impl_map = {1: _RackPoolV1}   # extend here to add Rv2 support

    def __init__(self, R, log=None, **kwargs):
        # Treat anything that is not a mapping as version 1
        version = R.get("version", 1) if isinstance(R, Mapping) else 1
        impl_class = self._impl_map.get(version)
        if impl_class is None:
            raise ValueError(f"R version {version} not supported by RackPool")
        super().__init__(impl_class(R, log=log))

The R.scheduling key

R may carry a scheduling key in its top-level JSON object containing scheduler-specific topology metadata. Schedulers use this key to encode information not captured in the standard R execution section — for example, rack or chassis membership, network topology, or fabric locality. Rv1Pool propagates the scheduling key to every allocated R by default.

Writer identification

The scheduling key is defined by RFC 20 (Resource Set Specification Version 1). By convention it includes a writer URI identifying the scheduler that created it; a missing writer implies fluxion.

When a scheduler starts and finds a scheduling.writer URI in the system R, _pool_class_from_writer() resolves the URI to a pool class and _make_pool() instantiates it automatically. This enables a sub-instance scheduler to load the same pool class as the parent without any explicit configuration — the parent bakes the URI into every allocated R, and the sub-instance picks it up on startup.

API reference

class flux.scheduler.Scheduler(h, *args)

Base class for Python scheduler broker modules.

Handles the RFC 27 protocol scaffolding so that subclasses only need to implement scheduling policy.

The base class maintains a pending-job queue (_queue, a heapq of PendingJob ordered by priority then jobid) and provides default implementations of alloc() (parse jobspec and enqueue), free() (release resources), cancel() (dequeue and cancel), and prioritize() (re-sort the queue). Subclasses must override schedule() to implement their allocation policy, and may override any of the defaults.

Subclasses may also override:

  • hello() — called once per running job during startup (optional; default marks resources allocated)

  • resource_update() — called after resource state changes

  • feasibility_check() — called for feasibility.check RPCs

  • forecast() — called after schedule() to annotate pending jobs with forward-looking estimates such as sched.t_estimate

Subclasses may set the class attributes:

  • queue_depth — maximum pending alloc requests: a positive integer (e.g. 8) for limited mode, or the string "unlimited" (default). The base class translates this to the wire format automatically. End users may override at load time with queue-depth=N|unlimited.

Alloc requests are represented by AllocRequest objects passed to alloc(). Call request.success(R), request.deny(note), request.cancel(), or request.annotate(d) to respond.

The current resource pool is available as resources. Resource state is updated automatically from resource.acquire and resource_update() is called after each update.

SCHED_DELAY_MAX = 1.0

Maximum scheduling delay used when coalescing bursts of alloc requests. The adaptive timer will not exceed this value regardless of how long schedule() takes. 1 second is a reasonable upper bound for most HPC schedulers; lower it if worst-case latency matters more than throughput during large submission bursts.

SCHED_EWMA_ALPHA = 0.25

Exponential weighted moving average (EWMA) smoothing factor for the adaptive scheduling timer. Higher values react faster to changes in burst rate and scheduling cost; lower values are more stable. 0.25 converges in roughly 4 samples.

alloc(request, jobid, priority, userid, t_submit, jobspec)

Queue an allocation request for the next schedule() pass.

Parses jobspec and pushes a PendingJob onto _queue. Override to change how jobs are parsed or queued.

Parameters
  • request (AllocRequest) -- The allocation request to respond to.

  • jobid (int) -- The job ID (also available as request.jobid).

  • priority (int) -- Job priority.

  • userid (int) -- Submitting user ID.

  • t_submit (float) -- Job submission time.

  • jobspec (dict) -- The raw jobspec dict.

alloc_annotate(msg, annotations)

Send an alloc annotation update. Typically called via AllocRequest.annotate().

Parameters
  • msg -- The alloc request Message.

  • annotations (dict) -- Annotation dict to attach to the job.

alloc_cancel(msg)

Send an alloc cancel response. Typically called via AllocRequest.cancel().

Parameters

msg -- The original alloc request Message.

alloc_deny(msg, note=None)

Send an alloc denial response. Typically called via AllocRequest.deny().

Parameters
  • msg -- The alloc request Message.

  • note (str, optional) -- Human-readable reason for the denial.

alloc_success(msg, R, annotations=None)

Commit R to KVS and send an alloc success response.

Typically called via AllocRequest.success(). R must be a dict (parsed R JSON object). The alloc response to job-manager is sent only after R is safely stored in KVS.

Parameters
  • msg -- The alloc request Message.

  • R (dict) -- The allocated resource set as a parsed JSON object.

  • annotations (dict, optional) -- Scheduler annotations to include.

cancel(jobid)

Remove a pending job from the queue and respond with cancel.

Parameters

jobid (int) -- The job ID to cancel.

expiration(msg, jobid, expiration)

Called when job-manager requests a job expiration update.

The default implementation accepts all updates by responding success. Planning schedulers that track per-job expiration should override this to update their internal records before calling self.handle.respond(msg, None).

Parameters
  • msg -- The request Message.

  • jobid (int) -- The job ID whose expiration is being updated.

  • expiration (float) -- New expiration timestamp (seconds since epoch).

feasibility_check(msg, jobspec)

Called for feasibility.check RPC.

Default implementation responds success if the job could ever fit within the total resource set (ignoring current availability). Subclasses may override for custom feasibility logic.

Parameters
  • msg -- The request Message.

  • jobspec (dict) -- The raw jobspec dict.

forecast()

Called after schedule() to annotate pending jobs with forward-looking estimates.

Override in subclasses to populate sched.t_estimate (and other forward-looking annotations) on pending jobs. The base class implementation is a no-op.

Triggered automatically after each schedule() call completes. If a forecast pass is already in progress when a new scheduling event arrives, it is left running to completion; the next forecast pass starts after the current one finishes and the next schedule() pass completes. Forecast estimates are approximate by design, so a slightly stale t_estimate is preferable to having none at all.

Supports the same generator protocol as schedule(): add yield at each desired reactor handoff point — typically after each annotated job — to return control to the reactor between annotations.

free(jobid, R, final=False)

Return released resources to the pool.

Calls free() on the resource pool. Override (calling super().free()) to perform additional bookkeeping such as removing per-job state.

Parameters
  • jobid (int) -- The job ID.

  • R -- The released resources.

  • final (bool) -- True if this is the final free for the job.

hello(jobid, priority, userid, t_submit, R)

Called for each running job during the hello protocol.

Invoked synchronously during run() before the reactor starts. The default implementation marks R as allocated in the resource pool, which is sufficient for most schedulers. Override to additionally track per-job state (e.g., store the job's expiration time).

Parameters
  • jobid (int) -- The job ID.

  • priority (int) -- Job priority.

  • userid (int) -- Submitting user ID.

  • t_submit (float) -- Job submission time.

  • R -- The job's current allocation.

hello_partial_ok = True

If True (the default), send partial-ok: True in the hello RPC so that job-manager may report partially-freed ranks for running jobs. Set to False (or pass test-hello-nopartial at load time on subclasses that support it) to disable partial-ok behaviour.

pool_class: Optional[type] = None

Custom pool class. When set to a ResourcePool subclass, _make_pool() instantiates it directly instead of using the default ResourcePool version dispatch. The subclass is responsible for its own version dispatch (e.g. via an _impl_map). This is the preferred way to inject a custom pool:

class MyScheduler(Scheduler):
    pool_class = MyPool

pool_kwargs: dict = {}

Extra keyword arguments forwarded to the pool class at construction time. Passed to whichever pool class is selected by _make_pool() — whether that is pool_class, a writer-discovered class, or the default ResourcePool.

prioritize(jobs)

Apply priority updates to queued jobs and re-sort the queue.

Parameters

jobs (list) -- List of [jobid, priority] pairs.
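
The update-then-re-sort step can be sketched on a plain heapq list. `QueuedJob` is a hypothetical stand-in for PendingJob, and its ordering (higher priority first, then earlier submission) is an assumption made for this example; the actual PendingJob ordering is whatever its __lt__() defines.

```python
import heapq
from dataclasses import dataclass


@dataclass
class QueuedJob:
    """Hypothetical stand-in for PendingJob."""
    jobid: int
    priority: int
    t_submit: float

    def __lt__(self, other):
        # Assumed ordering: higher priority first, ties broken by submit time.
        return (-self.priority, self.t_submit) < (-other.priority, other.t_submit)


def apply_priorities(queue, updates):
    """Apply [jobid, priority] pairs to queue, then restore the heap invariant."""
    by_id = {job.jobid: job for job in queue}
    for jobid, priority in updates:
        job = by_id.get(jobid)
        if job is not None:  # ignore jobs that are no longer queued
            job.priority = priority
    # Changing priorities in place can break the heap property, so rebuild it.
    heapq.heapify(queue)
```

Rebuilding with heapify() is linear in the queue length, which is usually cheaper than removing and re-pushing each updated entry.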

queue_depth = 'unlimited'

Maximum number of pending alloc requests the job-manager will queue for this scheduler. Set to a positive integer (e.g. 8) for limited mode or to the string "unlimited" for unlimited mode. The Scheduler base class translates this to the wire format sent in job-manager.sched-ready; subclasses and end users should never need to know about the internal "limited=N" representation. End users may override at load time with queue-depth=N|unlimited.
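
The translation described above might look roughly like the following. Both function names are hypothetical, and the validation rules are assumptions; only the "unlimited" and "limited=N" strings and the queue-depth=N|unlimited argument form come from the text.

```python
def sched_ready_mode(queue_depth):
    """Translate a queue_depth value into a mode string (illustrative only)."""
    if queue_depth == "unlimited":
        return "unlimited"
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(queue_depth, bool) or not isinstance(queue_depth, int):
        raise ValueError(f"invalid queue_depth: {queue_depth!r}")
    if queue_depth <= 0:
        raise ValueError("queue_depth must be a positive integer")
    return f"limited={queue_depth}"


def parse_queue_depth_arg(arg):
    """Parse a 'queue-depth=N|unlimited' load-time argument (illustrative only)."""
    key, _, value = arg.partition("=")
    if key != "queue-depth" or not value:
        raise ValueError(f"unrecognized argument: {arg!r}")
    return value if value == "unlimited" else int(value)
```

For example, sched_ready_mode(parse_queue_depth_arg("queue-depth=8")) yields "limited=8".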

resource_update()

Called after resource state changes from resource.acquire.

The updated state is available via resources.

property resources

The current resource pool.

run()

Initialize the scheduler and start the reactor.

Performs the RFC 27 initialization sequence:

  1. Register the sched dynamic service

  2. Register the feasibility dynamic service

  3. flux_module_set_running() — allow flux module load to return

  4. resource.acquire — sync first response, async continuation

  5. job-manager.sched-hello — sync hello protocol

  6. job-manager.sched-ready — announce readiness

  7. Start the reactor

Raises

OSError -- If any initialization step fails.

schedule()

Called to run the scheduling loop.

Override this in subclasses to implement allocation policy. Triggered automatically after every alloc(), free(), cancel(), prioritize(), expiration(), and resource_update() event via an adaptive one-shot timer. When events arrive in rapid bursts the timer delay grows to coalesce them into fewer schedule() calls; when events are infrequent the delay returns to zero so that each event is processed promptly.
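
To make the burst-coalescing idea concrete, here is one possible way to derive a delay from an exponentially weighted moving average (EWMA) of the time between events. This is not the base class's actual formula: the class name, `alpha`, `max_delay`, and the delay rule are all assumptions chosen only to show the behaviour described above.

```python
class BurstCoalescer:
    """Illustrative adaptive delay: grows during bursts, zero when events are sparse."""

    def __init__(self, alpha=0.25, max_delay=0.1):
        self.alpha = alpha          # EWMA smoothing factor (assumed value)
        self.max_delay = max_delay  # upper bound on the delay in seconds (assumed)
        self.interval_ewma = None   # smoothed time between events
        self.last_event = None

    def event(self, now):
        """Record an event at time `now` and return the delay to apply."""
        if self.last_event is not None:
            interval = now - self.last_event
            if self.interval_ewma is None:
                self.interval_ewma = interval
            else:
                self.interval_ewma += self.alpha * (interval - self.interval_ewma)
        self.last_event = now
        return self.delay()

    def delay(self):
        # Sparse events (or no history yet): schedule immediately.
        if self.interval_ewma is None or self.interval_ewma >= self.max_delay:
            return 0.0
        # Dense events: wait longer the closer together they arrive.
        return self.max_delay - self.interval_ewma
```

With these assumed parameters, a run of events one millisecond apart produces a delay close to max_delay, and a single long gap afterwards pulls the average back above the threshold so the delay returns to zero.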

Generator protocol — if this method returns a generator the base class advances it one yield per reactor iteration, allowing other events (new jobs, free responses, RPCs) to be handled between yields. Yield at each point where reactor responsiveness is desired — typically after each dispatched or denied job, but also when blocking on resources:

def schedule(self):
    while self._queue:
        job = self._queue[0]
        try:
            alloc = self.resources.alloc(job.jobid, job.resource_request)
        except InsufficientResources:
            break
        except InfeasibleRequest as exc:
            job.request.deny(str(exc))
        else:
            job.request.success(alloc)
        heapq.heappop(self._queue)
        yield  # hand control back to the reactor

When a new scheduling event arrives while a generator pass is in progress the base class closes the generator (triggering any finally blocks) and starts a fresh pass after the settling delay. This ensures a newly submitted high-priority job or a freed resource is considered from the top of the queue without waiting for the current pass to finish.

Non-generator schedule() implementations (no yield) run to completion in a single call and remain fully supported.
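
The generator handling described here relies on standard Python generator semantics, which can be demonstrated without Flux. In the sketch below, `drive()` is a hypothetical stand-in for the base class driver: it uses a plain loop where the real driver would advance one yield per reactor iteration, and `interrupt_after` simulates a new scheduling event arriving mid-pass.

```python
import inspect


def drive(schedule, interrupt_after=None):
    """Run one scheduling pass and return the number of yields observed."""
    result = schedule()
    if not inspect.isgenerator(result):
        return 0  # plain function: the pass already ran to completion
    yields = 0
    try:
        for _ in result:
            yields += 1
            if interrupt_after is not None and yields >= interrupt_after:
                break  # simulated new event: abandon this pass
    finally:
        # close() raises GeneratorExit at the paused yield, so any
        # finally blocks inside the generator run before it is discarded.
        result.close()
    return yields


def make_pass(queue, log):
    """Build a toy generator-style schedule() over a list of job IDs."""
    def schedule():
        try:
            while queue:
                log.append(queue.pop(0))
                yield  # hand control back to the driver
        finally:
            log.append("cleanup")
    return schedule
```

Because an abandoned pass leaves unprocessed jobs in the queue, the next pass picks them up from the top, which mirrors the restart behaviour described above.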

start_schedule()

Arm the scheduling timer, aborting any in-progress generator pass.

Called by _request_schedule() after the EWMA interval update. Override this method (not _request_schedule()) to replace the generator-based scheduling loop with an alternative driver, while still benefiting from the burst-coalescing delay computed in _request_schedule().

The default implementation:

  1. Closes any in-progress schedule() generator and stops its yield watchers (queue or resource state is about to change so in-progress allocations are stale). An in-progress forecast() generator is left running to completion: forecast estimates are approximate by nature and a slightly stale t_estimate is more useful than none at all.

  2. Arms the one-shot scheduling timer with the current adaptive delay if it is not already pending. Subsequent calls while the timer is armed are no-ops, coalescing all events in the window into a single schedule() invocation.
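
The two steps above can be modelled with a flag and a generator reference. `ScheduleTrigger` and `timer_fired()` are hypothetical names; the real implementation arms a reactor timer and uses watchers rather than the direct calls shown here.

```python
import inspect


class ScheduleTrigger:
    """Toy model of start_schedule(): abort the current pass, arm once."""

    def __init__(self, schedule):
        self._schedule = schedule
        self._gen = None            # in-progress generator pass, if any
        self.armed = False          # True while the one-shot timer is pending
        self.passes_started = 0

    def start_schedule(self):
        # Step 1: an in-progress pass is stale, so close it.
        if self._gen is not None:
            self._gen.close()
            self._gen = None
        # Step 2: arm the timer unless it is already pending.
        if not self.armed:
            self.armed = True

    def timer_fired(self):
        """Simulate the one-shot timer expiring and a new pass beginning."""
        self.armed = False
        self.passes_started += 1
        result = self._schedule()
        if inspect.isgenerator(result):
            self._gen = result
            next(self._gen, None)  # advance to the first yield
```

Any number of start_schedule() calls between two timer expirations leave `armed` set exactly once, so they collapse into a single pass.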

stats_get()

Return a dict of scheduler statistics for the stats-get RPC.

Called by the built-in <module-name>.stats-get handler. Subclasses may override to add extra fields; call super().stats_get() and update the returned dict:

def stats_get(self):
    stats = super().stats_get()
    stats["my_counter"] = self._my_counter
    return stats

Standard fields:

sched_passes

Number of completed schedule() passes.

sched_yields

Total yields across all schedule() generator passes (always 0 for synchronous schedulers).

forecast_passes

Number of completed forecast() passes.

forecast_yields

Total yields across all forecast() generator passes (always 0 for synchronous schedulers).

sched_delay

Current adaptive burst-coalescing delay in seconds (0 = immediate). Always 0 for generator-based schedulers.

sched_duration_ewma

EWMA of schedule() wall-clock duration in seconds. Always 0 for generator-based schedulers.

sched_interval_ewma

EWMA of time between _request_schedule() calls in seconds. Tracked for all schedulers but does not affect sched_delay for generator-based schedulers.

pending_jobs

Current number of pending alloc requests in the scheduler queue.

class flux.scheduler.AllocRequest(scheduler, msg)

Represents a pending allocation request from the job-manager.

Passed to Scheduler.alloc() and typically stored in the scheduler's pending queue until resources become available. Call success(), deny(), or cancel() to finalize the request; call annotate() to update job annotations while it is pending.

jobid

The job ID for this allocation request.

Type

int

annotate(annotations)

Send an annotation update for this pending request.

May be called any number of times before the request is finalized. Keys set here are tracked so that success() can automatically clear them. Calls made after the request is finalized are silently ignored, so that stale annotations from a still-running forecast generator cannot reach the job-manager after a cancel.

Parameters

annotations (dict) -- Annotation dict to attach to the job.

cancel()

Finalize the request indicating it was cancelled.

deny(note=None)

Finalize the request with a permanent denial.

Parameters

note (str, optional) -- Human-readable reason for the denial.

success(R, annotations=None, clear=True)

Finalize the request with a successful allocation.

Parameters
  • R -- The allocated resources, either as a resource pool object (anything with a to_dict() method) or a pre-converted dict. Pool objects are converted automatically.

  • annotations (dict, optional) -- Scheduler annotations to attach.

  • clear (bool) -- If True (the default), any sched annotation keys set via annotate() while the job was pending are automatically nulled in the success response, unless overridden by annotations. This ensures pending-state annotations (e.g. reason_pending, t_estimate) are cleaned up without each scheduler needing to track and clear them manually.
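
The interaction between annotate() and the clear flag can be modelled in a few lines. `AnnotationTracker` is a toy stand-in, not the real AllocRequest, and it assumes annotations are dicts with a top-level "sched" key; it returns the annotation payload instead of sending a response.

```python
class AnnotationTracker:
    """Toy model of the annotation bookkeeping described for AllocRequest."""

    def __init__(self):
        self._sched_keys = set()  # sched keys set while the job was pending
        self.finalized = False
        self.sent = []            # updates that would have been sent

    def annotate(self, annotations):
        if self.finalized:
            return  # late call from a stale pass: silently dropped
        self._sched_keys.update(annotations.get("sched", {}))
        self.sent.append(annotations)

    def success(self, annotations=None, clear=True):
        # Start from a null for every key set while pending, if requested.
        sched = {key: None for key in self._sched_keys} if clear else {}
        # Explicit annotations take precedence over the automatic nulls.
        sched.update((annotations or {}).get("sched", {}))
        self.finalized = True
        return {"sched": sched} if sched else {}
```

A scheduler that annotated t_estimate and reason_pending while the job was pending would therefore see both nulled on success unless it supplies replacement values.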

class flux.scheduler.PendingJob(jobid, priority, t_submit, request, resource_request)

Pending allocation request held in the scheduler queue.

Created by the default Scheduler.alloc() implementation and stored in Scheduler._queue. Subclasses may access the queue directly (it is a heapq min-heap ordered by __lt__()).

jobid

The job ID.

Type

int

priority

Job priority (higher is more urgent).

Type

int

t_submit

Submission time (seconds since epoch).

Type

float

request

The open RFC 27 allocation request.

Type

AllocRequest

resource_request

Pool-specific parsed resource request returned by parse_resource_request(); passed to alloc() when scheduling. The raw jobspec dict is accessible as resource_request.jobspec.

class flux.resource.ResourcePool.ResourcePool(arg=None, version=1, log=None, **kwargs)

Public wrapper for a resource pool implementation.

Accepts the same argument forms as ResourceSet:

  • An R JSON string or parsed dict → dispatches to the correct ResourcePoolImplementation subclass by version.

  • A ResourcePoolImplementation instance → wraps it directly.

exception InfeasibleRequest

Request can never be satisfied by this pool.

exception InsufficientResources

Not enough resources available right now — retry after a free event.
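
The difference between the two exceptions is easiest to see on a toy pool that only counts cores. Everything below is a stand-in written for this illustration: the exception classes mirror the names above but are defined locally, and `ToyPool` takes a plain core count rather than a parsed resource request.

```python
class InsufficientResources(Exception):
    """Local stand-in: the request could fit later, once resources are freed."""


class InfeasibleRequest(Exception):
    """Local stand-in: the request can never fit in this pool."""


class ToyPool:
    """Core-counting pool used only to illustrate the two failure modes."""

    def __init__(self, total):
        self.total = total
        self.allocated = {}  # jobid -> number of cores held

    @property
    def available(self):
        return self.total - sum(self.allocated.values())

    def alloc(self, jobid, ncores):
        if ncores > self.total:
            # Larger than the whole pool: waiting will not help.
            raise InfeasibleRequest(f"{ncores} cores requested, pool has {self.total}")
        if ncores > self.available:
            # Fits in principle, but not until something is freed.
            raise InsufficientResources(f"only {self.available} cores free")
        self.allocated[jobid] = ncores
        return ncores

    def free(self, jobid):
        self.allocated.pop(jobid, None)
```

This is why a scheduling loop typically stops and waits on InsufficientResources but denies the job outright on InfeasibleRequest.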

alloc(jobid: int, request) → ResourcePool

Allocate resources for jobid and return the allocated pool.

check_feasibility(request) → None

Check whether request is structurally satisfiable.

copy() → ResourcePool

Return a full independent copy preserving allocation state.

copy_allocated()

Return a ResourceSet containing only the allocated resources.

copy_down()

Return a ResourceSet containing only the down resources.

dumps() → str

Return a compact human-readable summary of the resource set.

property expiration: float

Resource expiration timestamp (seconds since epoch, 0 = none).

free(jobid: int, R=None, final: bool = False) → None

Return a job's allocated resources to the pool.

property generation: int

Monotonically increasing mutation counter.

job_end_times()

Return a list of (jobid, end_time) pairs for all tracked jobs.

mark_down(ids: str) → None

Mark resources identified by idset string (or "all") as down.

mark_up(ids: str) → None

Mark resources identified by idset string (or "all") as up.

parse_resource_request(jobspec: dict)

Parse a jobspec dict and return a pool-specific resource request.

register_alloc(jobid: int, R: ResourcePool) → None

Register an existing allocation during scheduler reconnect.

remove_ranks(ranks) → None

Remove ranks from the pool (called on shrink events).

set_expiration(expiration: float) → None

Set the resource set expiration (seconds since epoch, 0 = none).

set_starttime(starttime: float) → None

Set the resource set starttime (seconds since epoch).

to_dict() → dict

Return the resource set as a parsed R JSON dict.

to_resource_set()

Return a topology+availability snapshot as a ResourceSet.

update_expiration(jobid: int, expiration: float) → None

Update the tracked end time for a running job.