Writing a Python Scheduler
Flux supports scheduler broker modules written in Python. The
Scheduler base class handles all of the RFC 27
protocol scaffolding — service registration, resource acquisition, the
hello/ready handshake with job-manager — so that a Python scheduler only
needs to implement scheduling policy.
This guide walks through writing a working scheduler from scratch, explains each override point, and covers more advanced topics such as job annotations and start-time forecasting.
Minimal example
The base class maintains a PendingJob heap queue
(_queue) and provides default
implementations of alloc(),
free(),
cancel(), and
prioritize(). A minimal scheduler only
needs to override schedule():
import heapq

from flux.resource import InsufficientResources, InfeasibleRequest
from flux.scheduler import Scheduler


class SimpleScheduler(Scheduler):
    def schedule(self):
        while self._queue:
            job = self._queue[0]
            try:
                alloc = self.resources.alloc(job.jobid, job.resource_request)
            except InsufficientResources:
                break  # not enough resources; wait for a free
            except InfeasibleRequest as exc:
                job.request.deny(str(exc))
            else:
                job.request.success(alloc)
            heapq.heappop(self._queue)


def mod_main(h, *args):
    SimpleScheduler(h, *args).run()
Save this as my-sched.py and load it:
$ flux module unload sched-simple
$ flux module load my-sched.py
The mod_main() entry point is the same convention used by all Python
broker modules (see Python Broker Modules).
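The heap-ordered queue behind the default prioritize() can be pictured with the standard heapq module. The sketch below is self-contained and does not use the real PendingJob class: the DemoJob fields, the sort key (higher priority first, then lower jobid) and the prioritize() helper are assumptions made for illustration only.

```python
import heapq
from dataclasses import dataclass, field


@dataclass(order=True)
class DemoJob:
    """Hypothetical stand-in for PendingJob; only sort_key takes part in ordering."""

    sort_key: tuple = field(init=False, repr=False)
    jobid: int = field(compare=False)
    priority: int = field(compare=False)

    def __post_init__(self):
        self.update(self.priority)

    def update(self, priority):
        self.priority = priority
        # Assumption: higher priority sorts first, ties broken by lower jobid.
        self.sort_key = (-priority, self.jobid)


def prioritize(queue, jobs):
    """Apply [jobid, priority] pairs to queued jobs, then restore heap order."""
    by_id = {job.jobid: job for job in queue}
    for jobid, priority in jobs:
        job = by_id.get(jobid)
        if job is not None:  # ignore jobs that are no longer pending
            job.update(priority)
    heapq.heapify(queue)


queue = [DemoJob(jobid=i, priority=16) for i in (1, 2, 3)]
heapq.heapify(queue)
head_before = queue[0].jobid

prioritize(queue, [[3, 31], [99, 5]])  # job 99 is not queued and is skipped
head_after = queue[0].jobid
pop_order = [heapq.heappop(queue).jobid for _ in range(3)]
```

Mutating a key in place breaks the heap invariant, which is why the helper calls heapq.heapify() once after all updates rather than after each one.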
The resource pool
resources is an instance of
Rv1Pool that represents the entire managed resource
set. It tracks which resources are up, down, and currently allocated.
The pool implementation is selected automatically based on the R version
field in the resource data (currently only version 1 is supported).
The resource request
Each PendingJob carries a pool-specific resource request object in
job.resource_request. It is parsed once from the jobspec by
parse_resource_request() when the alloc request arrives. The
type and fields of this object are defined by the pool; scheduler policy code
treats it as opaque and passes it directly to
alloc().
For the built-in Rv1Pool pools, node and slot counts
carry RFC 14 ranges (a min and optional max), and non-standard resource
hierarchies are traversed recursively. The raw jobspec dict is also accessible
as job.resource_request.jobspec for reading site-specific hints from
attributes.system.
Allocating resources
Pass the jobid and resource request to
alloc():
try:
    alloc = self.resources.alloc(job.jobid, job.resource_request)
except InsufficientResources:
    # Not enough resources now; queue the request and retry later
    ...
except OSError as exc:
    # Request can never be satisfied (InfeasibleRequest or other error)
    request.deny(str(exc))
    return
RFC 31 constraint expressions (constraint field on the resource request)
are carried through automatically; the resource pool evaluates them when
selecting candidate nodes.
On success, alloc() returns a resource
pool object containing only the allocated resources. Pass it directly to
success(), which converts it automatically:
request.success(alloc)
The pool also records the job's expected expiration (derived from
request.duration) internally; free()
and start-time simulation (via copy()
and job_end_times()) use this state.
Releasing resources
The base class free() calls free() on the resource pool automatically,
so no override is needed unless the scheduler has additional per-job
cleanup to do.
A job's resources may be released in multiple calls when housekeeping is
configured: housekeeping returns resources in batches of one or more ranks as
they become ready, with final set to False until the last batch.
Schedulers that override free() must handle this: R
contains only the subset being freed on each call, and per-job state should
not be discarded until final is True.
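The partial-release rule can be sketched as follows. This is a minimal, self-contained illustration rather than real Flux code: BaseScheduler is a stand-in for the Scheduler base class so that the snippet runs without Flux, and the _job_state dict and the rank-list form of R are assumptions made for the example.

```python
class BaseScheduler:
    """Stand-in for flux.scheduler.Scheduler (assumption: only free() is modeled)."""

    def __init__(self):
        self.freed = []  # records (jobid, R) subsets returned to the "pool"

    def free(self, jobid, R, final=False):
        self.freed.append((jobid, R))


class TrackingScheduler(BaseScheduler):
    def __init__(self):
        super().__init__()
        self._job_state = {}  # hypothetical per-job bookkeeping

    def free(self, jobid, R, final=False):
        # Always hand the released subset back to the pool first.
        super().free(jobid, R, final=final)
        if final:
            # Only the last batch may discard per-job state.
            self._job_state.pop(jobid, None)


sched = TrackingScheduler()
sched._job_state[42] = {"expiration": 1234.0}

sched.free(42, [0, 1], final=False)  # first housekeeping batch
state_after_partial = 42 in sched._job_state

sched.free(42, [2, 3], final=True)   # last batch
state_after_final = 42 in sched._job_state
```

Keeping the state until final is True means that later batches for the same job can still consult it.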
Resource state changes
Node drain/undrain and other resource state changes arrive automatically
via the resource.acquire streaming RPC. After each update, the base
class calls resource_update(), giving the
scheduler a chance to retry pending allocations:
def resource_update(self):
    self._retry_pending()
Alloc requests
When job-manager asks the scheduler to allocate resources for a job, two
distinct objects are created and stored together on the
PendingJob:
- job.resource_request: a pool-specific object describing what the job needs. Created once from jobspec by parse_resource_request() when the alloc arrives. For the built-in pools this carries node count, slot count, cores and GPUs per slot, duration, constraints, and exclusivity; node and slot counts carry RFC 14 ranges (min/max) and non-standard resource hierarchies are traversed recursively. The raw jobspec dict is accessible as job.resource_request.jobspec for reading site-specific attributes.system hints. Used by the scheduler during each schedule() pass to decide whether resources can be satisfied. Persists for the lifetime of the pending job.
- AllocRequest (job.request): represents the open RFC 27 protocol transaction with job-manager. Wraps the message handle and provides the response methods that finalize the exchange. Must be finalized exactly once; consumed when the scheduler responds.
The scheduler reads resource_request to
make scheduling decisions, then calls a method on
request to report the outcome.
AllocRequest must be finalized exactly once by
calling one of:
- request.success(R, annotations=None)
  Allocation succeeded. R is the pool object returned by alloc(); it is converted to a dict automatically. The base class commits R to the KVS asynchronously before sending the response to job-manager, ensuring R is safely stored before the job can run.
- request.deny(note=None)
  Permanent denial: the job will never run. note is a human-readable reason that appears in the flux-job(1) eventlog. Use this for structurally unsatisfiable requests (too many cores, unsupported resource type, etc.).
- request.cancel()
  The alloc request is being withdrawn. Call this from cancel().

  Note
  Cancelling an alloc request is not the same as cancelling the job. Job-manager may withdraw an alloc request (for example, to shrink the outstanding request count back within the configured queue-depth) while leaving the job itself in the pending queue. The job will receive a fresh alloc request when a slot opens up.

One further method does not finalize the request:
- request.annotate(annotations)
  Send an intermediate annotation update while the job is pending. May be called any number of times before finalizing. Annotations are visible in flux-jobs(1) output.

  RFC 27 defines two standard sched annotation keys:
  - resource_summary
    Free-form string describing the allocated resources (e.g., "rank[0-1]/core[0-3]"). Set this on a successful allocation.
  - t_estimate
    Estimated start time as a Unix epoch float, or null to clear. Set this on pending jobs that have a shadow reservation; clear it on allocation.

  Example (planning scheduler posting a reservation estimate):

      # RFC 27 sched annotation: t_estimate is wall clock seconds since epoch
      request.annotate({"sched": {"t_estimate": shadow_time}})

  When a pending job is cancelled the base class automatically clears any annotations that were previously sent for that job, so no explicit null annotation is needed in cancel() overrides.

  Note
  Scheduler annotations are not recorded in the job eventlog. flux-job(1) wait-event annotations cannot be used to wait for them in tests; instead poll flux-jobs(1) -no {annotations.sched.field}.
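As a small illustration of the two standard keys, the sketch below builds the annotation dicts that would be handed to request.annotate() or to the annotations argument of request.success(). The helper names estimate_annotation and summary_annotation are hypothetical and not part of the Flux API; only the dict layout follows the description above.

```python
import json
import time


def estimate_annotation(t_estimate):
    """Build a sched annotation carrying t_estimate (None clears it)."""
    return {"sched": {"t_estimate": t_estimate}}


def summary_annotation(summary):
    """Build a sched annotation for a successful allocation."""
    # Clearing t_estimate on allocation follows the guidance for that key.
    return {"sched": {"resource_summary": summary, "t_estimate": None}}


# Pending job with a reservation ten minutes from now (epoch seconds).
pending = estimate_annotation(time.time() + 600.0)

# Job that has just been allocated.
allocated = summary_annotation("rank[0-1]/core[0-3]")

# Python None is serialized as JSON null.
wire = json.dumps(allocated)
```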
Override points
Subclass Scheduler and override any of the
following methods:
- hello(self, jobid, priority, userid, t_submit, R)
  Called once per running job during startup, before the reactor starts. The default marks R as allocated in the resource pool. Override to also record per-job state needed by the scheduler subclass.
- alloc(self, request, jobid, priority, userid, t_submit, jobspec)
  Called when job-manager requests an allocation. The default calls parse_resource_request() on the pool to parse jobspec and pushes a PendingJob onto _queue. Override to reject jobs before they enter the queue.
- free(self, jobid, R, final=False)
  Called when resources are released for a completed or cancelled job. final is True on the last (or only) call for the job. When housekeeping is configured, resources may arrive in multiple partial batches before final is set; see Releasing resources for details. The default calls free() on the pool. Override (calling free() via super()) to also remove per-job state or log the event, taking care not to discard per-job state until final is True.
- cancel(self, jobid)
  Called when job-manager cancels a pending alloc request, for example because the user cancelled the job. The default removes jobid from _queue and calls cancel() on its alloc request.
- prioritize(self, jobs)
  Called with a list of [jobid, priority] pairs when job priorities change. The default updates the priority of each job in _queue and re-heapifies.
- schedule(self)
  Called to run the scheduling loop after one or more scheduling events (alloc, free, cancel, prioritize, expiration, or resource state change). Events are coalesced by an adaptive timer so that schedule() is not called more often than necessary; see Scheduling deferral for details. The default implementation is a no-op.

  If schedule() returns a generator, the base class advances it one yield per reactor iteration, allowing other events to be handled between yields. When a new scheduling event arrives while a generator pass is in progress, the base class closes it and starts a fresh pass after the settling delay, ensuring that a newly submitted job or freed resource is considered from the top of the queue without waiting for the current pass to finish. Non-generator implementations remain fully supported.

  Example:

      def schedule(self):
          while self._queue:
              job = self._queue[0]
              try:
                  alloc = self.resources.alloc(job.jobid, job.resource_request)
              except InsufficientResources:
                  break  # head blocked; stop (FIFO)
              except OSError as exc:
                  job.request.deny(str(exc))
              else:
                  job.request.success(alloc)
              heapq.heappop(self._queue)
              yield  # let the reactor handle other events between jobs

- start_schedule(self)
  Called by _request_schedule() after updating the interval EWMA. The default implementation aborts any in-progress generator and arms the one-shot scheduling timer. Override this method (not _request_schedule()) to replace the generator driver, for example to launch an RPC-based allocation request, while still preserving the _sched_pending guard that coalesces concurrent scheduling events into a single pass.
- resource_update(self)
  Called after each resource state update. For queue-based schedulers that rely on schedule(), this override is usually not needed because the base class calls _request_schedule() automatically after every resource update. Override only if you need to react to the updated resources value before the scheduling pass.
- feasibility_check(self, msg, jobspec)
  Called for feasibility.check RPCs from job-ingest. The default calls parse_resource_request() and check_feasibility() on the pool, responding with EINVAL if the job can never fit the total resource set. Override only if custom feasibility logic is needed.
- forecast(self)
  Called after each schedule() pass to annotate pending jobs with forward-looking estimates such as sched.t_estimate. The base class implementation is a no-op. Override to post start-time estimates (or other planning metadata) without impacting scheduling throughput.

  Supports the same generator protocol as schedule(): add yield at each desired reactor handoff point, typically after each annotated job, to return control to the reactor between annotations. Unlike the schedule generator, a running forecast generator is not aborted when a new scheduling event arrives; it runs to completion. A slightly stale t_estimate is more useful than none at all, and the simulation snapshot taken at pass start remains internally consistent regardless of real-pool changes mid-pass.

  Example (annotating the head-of-queue job with its estimated start time):

      def forecast(self):
          if not self._queue:
              return
          head = sorted(self._queue)[0]
          t = self._shadow_time(head)  # simulate pool forward to find earliest start
          if head._last_annotation != t:
              head._last_annotation = t
              head.request.annotate({"sched": {"t_estimate": t}})
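The generator protocol described for schedule() can be pictured without Flux using plain Python generators. The sketch below is not the base class driver: the drive() helper, the list-based queue and the log are assumptions made for the example. It shows one step per "reactor iteration", and how closing a stale pass runs its finally block before a fresh pass starts from the top of the queue.

```python
def schedule(queue, log):
    """Generator pass: handle one job per step, then yield to the caller."""
    try:
        while queue:
            log.append(f"start {queue.pop(0)}")
            yield
    finally:
        log.append("pass closed")


def drive(gen, steps):
    """Advance gen by at most `steps` yields; return True if it finished."""
    for _ in range(steps):
        try:
            next(gen)
        except StopIteration:
            return True
    return False


log = []
queue = ["job1", "job2", "job3"]

first = schedule(queue, log)
finished = drive(first, 1)     # one iteration: job1 is handled
queue.insert(0, "urgent")      # a new scheduling event arrives
first.close()                  # abort the stale pass (runs its finally block)

second = schedule(queue, log)  # fresh pass from the top of the queue
done = drive(second, 10)
```

Because the first pass is closed rather than resumed, the newly arrived entry is considered before job2 and job3.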
Queue depth
The queue-depth= module argument controls how many alloc requests
job-manager sends concurrently:
- queue-depth=unlimited (the default): all pending jobs get an alloc request at once. The scheduler sees the full queue on startup.
- queue-depth=N (a positive integer): job-manager sends at most N outstanding requests. Use this to bound the scheduler queue and annotation overhead.
- FIFOScheduler defaults to queue-depth=8 to match the behaviour of the built-in sched-simple.
Set the class attribute queue_depth on the
subclass to choose a default; use a plain integer or the string
"unlimited":
class MyScheduler(Scheduler):
    queue_depth = 8  # or "unlimited"
The base class translates the value to the wire format for the hello protocol automatically. Users can override the default at load time:
$ flux module load my-sched.py queue-depth=unlimited
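The translation from the queue_depth value to a wire string might look like the sketch below. Both helper names are hypothetical, and the "limited=N" form is taken from the description of the queue_depth attribute in the API reference; the exact validation performed by the base class is an assumption.

```python
def queue_depth_to_wire(queue_depth):
    """Translate a queue_depth value to an assumed wire-format string."""
    if queue_depth == "unlimited":
        return "unlimited"
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(queue_depth, bool) or not isinstance(queue_depth, int):
        raise ValueError(f"invalid queue depth: {queue_depth!r}")
    if queue_depth < 1:
        raise ValueError("queue depth must be a positive integer")
    return f"limited={queue_depth}"


def apply_queue_depth_arg(arg, default):
    """Let a queue-depth=... module argument override the class default."""
    key, _, value = arg.partition("=")
    if key != "queue-depth":
        return default
    return "unlimited" if value == "unlimited" else int(value)


class_default = 8
from_class = queue_depth_to_wire(class_default)
from_user = queue_depth_to_wire(
    apply_queue_depth_arg("queue-depth=unlimited", class_default)
)
```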
Scheduling deferral
Every scheduling event (alloc, free, cancel, prioritize, expiration, resource
update) calls _request_schedule() rather than
calling schedule() directly. The base class
defers the actual call via a one-shot timer, coalescing multiple events into a
single schedule() invocation.
The timer uses an adaptive delay tuned automatically at runtime:
- The delay starts at zero, so the first event after an idle period is processed on the very next reactor iteration, with no added latency.
- After each schedule() call, the base class compares two exponential moving averages (EWMAs, see footnote 1):
  - _sched_interval_ewma: average time between _request_schedule() calls (measures how fast events are arriving).
  - _sched_duration_ewma: average time schedule() takes to run (measures scheduling cost).
- If events arrive faster than schedule() can process them (interval < duration), the timer delay is raised to approximately duration, coalescing the burst into roughly one scheduling pass per cycle. A DEBUG log message is emitted when the delay changes.
- Once events slow down again the delay resets to zero immediately.
Note
When schedule() is a generator the EWMA duration is
not updated, so sched_delay remains 0 and the timer fires on
the next reactor iteration with no burst-coalescing window. Events
that arrive while the timer is already armed are still coalesced by
the _sched_pending guard, but there is no adaptive settling period.
Two class attributes control the behaviour and can be overridden on the subclass:
class MyScheduler(Scheduler):
    SCHED_DELAY_MAX = 1.0    # seconds; cap on adaptive delay (default 1s)
    SCHED_EWMA_ALPHA = 0.25  # EWMA smoothing factor (default 0.25 ≈ 4 samples)
SCHED_DELAY_MAX bounds worst-case scheduling
latency during a sustained burst.
SCHED_EWMA_ALPHA controls how quickly the
timer reacts: higher values respond faster but are noisier; lower values are
smoother but adapt more slowly.
For schedulers with a small queue-depth= (e.g. sched-fifo's default
of 8), events arrive at most 8-at-a-time so the interval rarely falls below
the schedule duration and the delay stays near zero. Schedulers using
queue-depth=unlimited benefit most from the adaptive timer during large
submission bursts.
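The adaptive rule can be sketched in a few lines. This is an illustration of the behaviour described in this section, not the base class code: the function names, the seeding of the two averages and the sample values are assumptions.

```python
SCHED_DELAY_MAX = 1.0    # seconds, mirrors the documented default
SCHED_EWMA_ALPHA = 0.25  # mirrors the documented default


def ewma(old, sample, alpha=SCHED_EWMA_ALPHA):
    """One EWMA update: new = alpha * sample + (1 - alpha) * old."""
    return alpha * sample + (1.0 - alpha) * old


def next_delay(interval_ewma, duration_ewma, delay_max=SCHED_DELAY_MAX):
    """Raise the delay to about one pass duration during a burst, else zero."""
    if interval_ewma < duration_ewma:
        return min(duration_ewma, delay_max)
    return 0.0


# Assumed starting point: events one second apart, passes taking 0.2 s.
interval_ewma = 1.0
duration_ewma = 0.2

delays = []
for _ in range(6):
    # A burst begins: events now arrive 10 ms apart.
    interval_ewma = ewma(interval_ewma, 0.01)
    delays.append(next_delay(interval_ewma, duration_ewma))

capped = next_delay(0.1, 5.0)  # a very slow pass is bounded by SCHED_DELAY_MAX
```

With these inputs the smoothed interval needs six updates to fall below the 0.2 s pass duration, so the delay stays at zero for the first five events and only then rises.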
Footnotes
- 1
An exponential moving average (EMA), also called an exponentially weighted moving average (EWMA), is a low-pass filter that gives exponentially decreasing weight to older samples: new = α × sample + (1 − α) × old. With α = 0.25 the influence of a sample halves roughly every two to three updates (0.75² ≈ 0.56, 0.75³ ≈ 0.42). See https://en.wikipedia.org/wiki/Exponential_smoothing for background.
Forecast deferral
forecast() is called immediately after each
schedule() pass completes. Because
forecast() supports the same generator protocol as
schedule(), annotation work may be spread across multiple
reactor iterations to avoid blocking the critical scheduling path.
If a new scheduling event arrives while a forecast generator is in progress,
the base class leaves it running to completion. Forecast estimates are
approximate by design, and a slightly stale t_estimate is more useful
than none at all. The simulation snapshot is taken once at pass start
(a deep copy of the pool), so it remains internally consistent regardless
of real-pool changes mid-pass. A fresh forecast pass is triggered after
the next schedule() pass completes.
Module arguments
Arguments passed after the module path at load time arrive as *args
in mod_main() and are forwarded to __init__:
$ flux module load my-sched.py queue-depth=8 log-level=debug
The base class automatically handles the following built-in arguments:
- queue-depth=N|unlimited
  Maximum number of concurrent outstanding alloc requests. The default is the queue_depth class attribute of the scheduler: "unlimited" in the base class, or whatever value the subclass sets (for example 8).
- log-level=LEVEL
  Minimum log severity to emit. LEVEL is one of emerg, alert, crit, err, warning, notice, info, or debug (default info).
- pool-class=URI
  Select a custom resource pool class at load time. URI is resolved by _pool_class_from_uri(): a plain module name imports the Python module and reads its pool_class attribute; module:ClassName loads a named class from the module. The module must be importable, so set PYTHONPATH as needed. Equivalent to setting pool_class on the subclass, but applied at load time without subclassing. See Pool class hook (pool_class).
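The two URI forms can be illustrated with importlib. The resolve_pool_class() function below is a hypothetical re-creation of the behaviour described for pool-class=URI, not the actual _pool_class_from_uri() implementation, and the in-memory rackpool_demo module exists only so that the example runs anywhere.

```python
import importlib
import sys
import types


def resolve_pool_class(uri):
    """Resolve 'module' or 'module:ClassName' to a class object."""
    module_name, sep, class_name = uri.partition(":")
    module = importlib.import_module(module_name)
    # A plain module name falls back to the module's pool_class attribute.
    return getattr(module, class_name if sep else "pool_class")


class RackPool:
    """Placeholder pool class for the demonstration."""


# Register a throwaway module instead of relying on PYTHONPATH.
demo = types.ModuleType("rackpool_demo")
demo.RackPool = RackPool
demo.pool_class = RackPool
sys.modules["rackpool_demo"] = demo

by_attribute = resolve_pool_class("rackpool_demo")
by_name = resolve_pool_class("rackpool_demo:RackPool")
```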
Any argument not consumed by the subclass or the base class is rejected
with an error at load time, so a typo like log_level=debug (underscore
instead of hyphen) is caught immediately. Subclasses parse their own
arguments before calling super().__init__, which consumes the built-in
ones and then rejects whatever remains:
def __init__(self, h, *args):
    # Parse custom arguments first; leave the rest for the base class.
    remaining = []
    for arg in args:
        if arg.startswith("my-option="):
            self._my_option = arg[10:]
        else:
            remaining.append(arg)
    super().__init__(h, *remaining)  # handles queue-depth=, log-level=
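The same consume-then-reject pattern can be tried outside Flux. The split_arg() and reject_unknown() helpers below are hypothetical and only mimic the behaviour described above; they are not part of the Scheduler API.

```python
def split_arg(args, name):
    """Return (value, remaining) for a single 'name=value' argument."""
    key = name + "="
    value = None
    remaining = []
    for arg in args:
        if arg.startswith(key):
            value = arg[len(key):]  # avoids hard-coding the prefix length
        else:
            remaining.append(arg)
    return value, remaining


def reject_unknown(args, known):
    """Raise ValueError for the first argument whose name is not known."""
    for arg in args:
        if arg.partition("=")[0] not in known:
            raise ValueError(f"unknown module argument: {arg!r}")


BUILTIN = {"queue-depth", "log-level", "pool-class"}

value, rest = split_arg(
    ["my-option=fast", "queue-depth=8", "log_level=debug"], "my-option"
)

try:
    reject_unknown(rest, BUILTIN)
    error = None
except ValueError as exc:
    error = str(exc)  # the underscore typo is caught here
```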
Logging
log is a
BrokerLogger instance available on every
BrokerModule, including Scheduler. Use
the convenience methods to emit log messages without importing syslog:
self.log.debug(f"alloc: {jobid}: {alloc.dumps()}")
self.log.info("scheduler ready")
self.log.warning(f"unknown pool option {key!r} ignored")
self.log.error(f"unexpected error: {exc}")
The full set of methods mirrors syslog severity levels: debug, info,
notice, warning, error, critical, alert, and emerg.
Messages are filtered by a severity threshold before being forwarded to
handle.log; only messages at or above the threshold are emitted. The
default threshold for Scheduler is info. Set it at load time
to enable more verbose output:
$ flux module load my-sched.py log-level=debug
Valid level names are emerg, alert, crit, err, warning,
notice, info, and debug.
Pool implementations receive the same log
object and should call it unconditionally.
Log messages appear in flux-dmesg(1) and the broker's stderr.
Statistics
The base class registers a <module-name>.stats-get RPC handler that
calls stats_get() and responds with the returned dict.
Use flux-module(1) to query it:
$ flux module stats my-sched
Standard fields reported by the base class:
- sched_passes
  Number of completed schedule() passes.
- sched_yields
  Total yields across all schedule() generator passes (always 0 for synchronous schedulers).
- forecast_passes, forecast_yields
  Equivalent counters for forecast() passes (forecast_yields always 0 for synchronous schedulers).
- sched_delay
  Current adaptive burst-coalescing delay in seconds. Always 0 for generator-based schedulers.
- sched_duration_ewma
  EWMA of schedule() wall-clock duration in seconds. Always 0 for generator-based schedulers.
- sched_interval_ewma
  EWMA of time between scheduling requests in seconds. Tracked for all schedulers but does not affect sched_delay for generator-based schedulers.
- pending_jobs
  Current number of pending alloc requests in the scheduler queue.
Subclasses can extend the response by overriding stats_get():
def stats_get(self):
    stats = super().stats_get()
    stats["my_counter"] = self._my_counter
    return stats
Testing
Use flux-module(1) to load and remove the scheduler in a running instance:
$ flux module unload sched-simple
$ flux module load ./my-sched.py
$ flux run -n2 hostname
$ flux module reload my-sched.py # reload without restarting instance
$ flux module remove my-sched.py
$ flux module load sched-simple
Write sharness shell tests using test_under_flux with the job
personality (which loads a mock resource set and disables job execution),
replacing the default scheduler with your own:
test_under_flux 4 job

test_expect_success 'load my scheduler' '
    flux module unload sched-simple &&
    flux module load ./my-sched.py
'
test_expect_success 'job runs' '
    jobid=$(flux submit hostname) &&
    flux job wait-event --timeout=5 $jobid alloc
'
See t/t2306-sched-fifo.t in the source tree for a comprehensive
example covering annotations, priority, cancel, drain/undrain, and the
hello/reload protocol.
Customizing the resource pool
By default the scheduler uses the built-in
ResourcePool version dispatch to
construct its resource pool. A custom pool class can be injected without
modifying the scheduler itself using the pool class hook described below.
Pool class hook (pool_class)
There are three ways to bind a custom pool class to a scheduler:
- Class attribute: set pool_class on the subclass:

      from flux.scheduler import Scheduler

      class RackScheduler(Scheduler):
          pool_class = RackPool
          # must also override schedule() to allocate jobs

- Module argument: pass pool-class=URI at load time (no subclassing required):

      $ PYTHONPATH=/path/to/plugins flux module load sched-simple pool-class=rackpool

- Writer auto-discovery: if the system R carries a scheduling.writer URI pointing to a pool implementation, the scheduler loads it automatically without any explicit argument. See Writer identification.
In all three cases pool_class must be a
ResourcePool subclass. The scheduler's
_make_pool() helper checks pool_class
first (highest priority), then scheduling.writer auto-discovery, and
finally falls back to the default ResourcePool.
In every case pool_kwargs are forwarded as keyword
arguments to the chosen pool constructor.
The pool subclass is responsible for its own version dispatch. The pattern is
to map version integers to version-specific implementation classes in an
_impl_map and construct the right one in __init__:
from collections.abc import Mapping


class RackPool(ResourcePool):
    _impl_map = {1: _RackPoolV1}  # extend here to add Rv2 support

    def __init__(self, R, log=None, **kwargs):
        version = R.get("version", 1) if isinstance(R, Mapping) else 1
        impl_class = self._impl_map.get(version)
        if impl_class is None:
            raise ValueError(f"R version {version} not supported by RackPool")
        super().__init__(impl_class(R, log=log))
The R.scheduling key
R may carry a scheduling key in its top-level JSON object containing
scheduler-specific topology metadata. Schedulers use this key to encode
information not captured in the standard R execution section — for example,
rack or chassis membership, network topology, or fabric locality.
Rv1Pool propagates the scheduling key to every
allocated R by default.
Writer identification
The scheduling key is defined by RFC 20 (Resource Set Specification Version 1). By convention it
includes a writer URI identifying the scheduler that created it; a missing
writer implies fluxion.
When a scheduler starts and finds a scheduling.writer URI in the system R,
_pool_class_from_writer() resolves the URI to a pool class
and _make_pool() instantiates it automatically. This enables
a sub-instance scheduler to load the same pool class as the parent without any
explicit configuration — the parent bakes the URI into every allocated R, and
the sub-instance picks it up on startup.
API reference
- class flux.scheduler.Scheduler(h, *args)
Base class for Python scheduler broker modules.
Handles the RFC 27 protocol scaffolding so that subclasses only need to implement scheduling policy.
The base class maintains a pending-job queue (_queue, a heapq of PendingJob ordered by priority then jobid) and provides default implementations of alloc() (parse jobspec and enqueue), free() (release resources), cancel() (dequeue and cancel), and prioritize() (re-sort the queue). Subclasses must override schedule() to implement their allocation policy, and may override any of the defaults.
Subclasses may also override:
  - hello(): called once per running job during startup (optional; default marks resources allocated)
  - resource_update(): called after resource state changes
  - feasibility_check(): called for feasibility.check RPCs
  - forecast(): called after schedule() to annotate pending jobs with forward-looking estimates such as sched.t_estimate
Subclasses may set the class attributes:
  - queue_depth: maximum pending alloc requests, either a positive integer (e.g. 8) for limited mode, or the string "unlimited" (default). The base class translates this to the wire format automatically. End users may override at load time with queue-depth=N|unlimited.
Alloc requests are represented by AllocRequest objects passed to alloc(). Call request.success(R), request.deny(note), request.cancel(), or request.annotate(d) to respond.
The current resource pool is available as resources. Resource state is updated automatically from resource.acquire and resource_update() is called after each update.
- SCHED_DELAY_MAX = 1.0
Maximum scheduling delay used when coalescing bursts of alloc requests. The adaptive timer will not exceed this value regardless of how long schedule() takes. 1 second is a reasonable upper bound for most HPC schedulers; lower it if worst-case latency matters more than throughput during large submission bursts.
- SCHED_EWMA_ALPHA = 0.25
Exponential weighted moving average (EWMA) smoothing factor for the adaptive scheduling timer. Higher values react faster to changes in burst rate and scheduling cost; lower values are more stable. 0.25 converges in roughly 4 samples.
- alloc(request, jobid, priority, userid, t_submit, jobspec)
Queue an allocation request for the next schedule() pass.
Parses jobspec and pushes a PendingJob onto _queue. Override to change how jobs are parsed or queued.
- Parameters
request (AllocRequest) -- The allocation request to respond to.
jobid (int) -- The job ID (also available as request.jobid).
priority (int) -- Job priority.
userid (int) -- Submitting user ID.
t_submit (float) -- Job submission time.
jobspec (dict) -- The raw jobspec dict.
- alloc_annotate(msg, annotations)
Send an alloc annotation update. Typically called via AllocRequest.annotate().
- Parameters
msg -- The alloc request Message.
annotations (dict) -- Annotation dict to attach to the job.
- alloc_cancel(msg)
Send an alloc cancel response. Typically called via AllocRequest.cancel().
- Parameters
msg -- The original alloc request Message.
- alloc_deny(msg, note=None)
Send an alloc denial response. Typically called via AllocRequest.deny().
- Parameters
msg -- The alloc request Message.
note (str, optional) -- Human-readable reason for the denial.
- alloc_success(msg, R, annotations=None)
Commit R to KVS and send an alloc success response.
Typically called via AllocRequest.success(). R must be a dict (parsed R JSON object). The alloc response to job-manager is sent only after R is safely stored in KVS.
- Parameters
msg -- The alloc request Message.
R (dict) -- The allocated resource set as a parsed JSON object.
annotations (dict, optional) -- Scheduler annotations to include.
- cancel(jobid)
Remove a pending job from the queue and respond with cancel.
- Parameters
jobid (int) -- The job ID to cancel.
- expiration(msg, jobid, expiration)
Called when job-manager requests a job expiration update.
The default implementation accepts all updates by responding success. Planning schedulers that track per-job expiration should override this to update their internal records before calling self.handle.respond(msg, None).
- Parameters
msg -- The request Message.
jobid (int) -- The job ID whose expiration is being updated.
expiration (float) -- New expiration timestamp (seconds since epoch).
- feasibility_check(msg, jobspec)
Called for feasibility.check RPC.
Default implementation responds success if the job could ever fit within the total resource set (ignoring current availability). Subclasses may override for custom feasibility logic.
- Parameters
msg -- The request Message.
jobspec (dict) -- The raw jobspec dict.
- forecast()
Called after schedule() to annotate pending jobs with forward-looking estimates.
Override in subclasses to populate sched.t_estimate (and other forward-looking annotations) on pending jobs. The base class implementation is a no-op.
Triggered automatically after each schedule() call completes. If a forecast pass is already in progress when a new scheduling event arrives, it is left running to completion; the next forecast pass starts after the current one finishes and the next schedule() pass completes. Forecast estimates are approximate by design, so a slightly stale t_estimate is preferable to having none at all.
Supports the same generator protocol as schedule(): add yield at each desired reactor handoff point, typically after each annotated job, to return control to the reactor between annotations.
- free(jobid, R, final=False)
Return released resources to the pool.
Calls free() on the resource pool. Override (calling super().free()) to perform additional bookkeeping such as removing per-job state.
- Parameters
jobid (int) -- The job ID.
R -- The released resources.
final (bool) -- True if this is the final free for the job.
- hello(jobid, priority, userid, t_submit, R)
Called for each running job during the hello protocol.
Invoked synchronously during run() before the reactor starts. The default implementation marks R as allocated in the resource pool, which is sufficient for most schedulers. Override to additionally track per-job state (e.g., store the job's expiration time).
- Parameters
jobid (int) -- The job ID.
priority (int) -- Job priority.
userid (int) -- Submitting user ID.
t_submit (float) -- Job submission time.
R -- The job's current allocation.
- hello_partial_ok = True
If True (the default), send partial-ok: True in the hello RPC so that job-manager may report partially-freed ranks for running jobs. Set to False (or pass test-hello-nopartial at load time on subclasses that support it) to disable partial-ok behaviour.
- pool_class: Optional[type] = None
Custom pool class. When set to a ResourcePool subclass, _make_pool() instantiates it directly instead of using the default ResourcePool version dispatch. The subclass is responsible for its own version dispatch (e.g. via an _impl_map). This is the preferred way to inject a custom pool:

    class MyScheduler(Scheduler):
        pool_class = MyPool

- pool_kwargs: dict = {}
Extra keyword arguments forwarded to the pool class at construction time. Passed to whichever pool class is selected by _make_pool(), whether that is pool_class, a writer-discovered class, or the default ResourcePool.
- prioritize(jobs)
Apply priority updates to queued jobs and re-sort the queue.
- Parameters
jobs (list) -- List of [jobid, priority] pairs.
- queue_depth = 'unlimited'
Maximum number of pending alloc requests the job-manager will queue for this scheduler. Set to a positive integer (e.g. 8) for limited mode or to the string "unlimited" for unlimited mode. The Scheduler base class translates this to the wire format sent in job-manager.sched-ready; subclasses and end users should never need to know about the internal "limited=N" representation. End users may override at load time with queue-depth=N|unlimited.
- resource_update()
Called after resource state changes from resource.acquire.
The updated state is available via
resources.
- property resources
The current resource pool.
- run()
Initialize the scheduler and start the reactor.
Performs the RFC 27 initialization sequence:
1. Register the sched dynamic service
2. Register the feasibility dynamic service
3. flux_module_set_running(): allow flux module load to return
4. resource.acquire: sync first response, async continuation
5. job-manager.sched-hello: sync hello protocol
6. job-manager.sched-ready: announce readiness
7. Start the reactor
- Raises
OSError -- If any initialization step fails.
- schedule()
Called to run the scheduling loop.
Override this in subclasses to implement allocation policy. Triggered automatically after every alloc(), free(), cancel(), prioritize(), expiration(), and resource_update() event via an adaptive one-shot timer. When events arrive in rapid bursts the timer delay grows to coalesce them into fewer schedule() calls; when events are infrequent the delay returns to zero so that each event is processed promptly.
Generator protocol: if this method returns a generator, the base class advances it one yield per reactor iteration, allowing other events (new jobs, free responses, RPCs) to be handled between yields. Yield at each point where reactor responsiveness is desired, typically after each dispatched or denied job, but also when blocking on resources:
def schedule(self):
    while self._queue:
        job = self._queue[0]
        try:
            alloc = self.resources.alloc(job.jobid, job.resource_request)
        except InsufficientResources:
            break
        except InfeasibleRequest as exc:
            job.request.deny(str(exc))
        else:
            job.request.success(alloc)
        heapq.heappop(self._queue)
        yield  # hand control back to the reactor
When a new scheduling event arrives while a generator pass is in progress, the base class closes the generator (triggering any finally blocks) and starts a fresh pass after the settling delay. This ensures a newly submitted high-priority job or a freed resource is considered from the top of the queue without waiting for the current pass to finish.
Non-generator schedule() implementations (no yield) behave exactly as before and remain fully supported.
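The generator protocol can be illustrated without a reactor. The sketch below is a toy driver, not the base class implementation: GeneratorDriver, start() and step() are hypothetical names, and each step() call stands in for one reactor iteration.

```python
import inspect


class GeneratorDriver:
    """Toy driver: advances a schedule() generator one yield at a time."""

    def __init__(self, schedule):
        self._schedule = schedule
        self._gen = None

    def start(self):
        # A new event aborts the pass in progress; close() runs finally blocks.
        if self._gen is not None:
            self._gen.close()
        result = self._schedule()
        # A plain (non-generator) schedule() has already run to completion.
        self._gen = result if inspect.isgenerator(result) else None

    def step(self):
        """Advance one yield; return False once the pass is finished."""
        if self._gen is None:
            return False
        try:
            next(self._gen)
            return True
        except StopIteration:
            self._gen = None
            return False


log = []
queue = ["job1", "job2", "job3"]


def schedule():
    try:
        for job in list(queue):
            log.append(f"dispatch {job}")
            yield
    finally:
        log.append("pass ended")


driver = GeneratorDriver(schedule)
driver.start()
driver.step()   # dispatches job1, then yields
driver.start()  # new event: the old pass is closed, a fresh one begins
while driver.step():
    pass
```

After this runs, log shows job1 being dispatched twice: once in the aborted pass and once in the fresh pass. A real schedule() avoids that by removing each job from the queue before yielding, as the example above does with heapq.heappop().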
- start_schedule()
Arm the scheduling timer, aborting any in-progress generator pass.
Called by _request_schedule() after the EWMA interval update. Override this method (not _request_schedule()) to replace the generator-based scheduling loop with an alternative driver, while still benefiting from the burst-coalescing delay computed in _request_schedule().
The default implementation:
1. Closes any in-progress schedule() generator and stops its yield watchers (queue or resource state is about to change, so in-progress allocations are stale). An in-progress forecast() generator is left running to completion: forecast estimates are approximate by nature and a slightly stale t_estimate is more useful than none at all.
2. Arms the one-shot scheduling timer with the current adaptive delay if it is not already pending. Subsequent calls while the timer is armed are no-ops, coalescing all events in the window into a single schedule() invocation.
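To make the coalescing behaviour concrete, here is a toy model driven by explicit timestamps instead of a reactor timer. The smoothing factor, the burst threshold and the delay policy are all assumptions chosen for illustration; the reference above does not give the formula the base class uses.

```python
class CoalescingTimer:
    """Toy model of burst coalescing; not the flux implementation."""

    def __init__(self, alpha=0.5, burst_threshold=0.1):
        self.alpha = alpha                      # EWMA smoothing factor (assumed)
        self.burst_threshold = burst_threshold  # seconds (assumed)
        self.interval_ewma = None
        self.last_event = None
        self.armed = False
        self.armed_count = 0

    def request_schedule(self, now):
        # Update the EWMA of the time between scheduling events.
        if self.last_event is not None:
            interval = now - self.last_event
            if self.interval_ewma is None:
                self.interval_ewma = interval
            else:
                self.interval_ewma = (
                    self.alpha * interval + (1 - self.alpha) * self.interval_ewma
                )
        self.last_event = now
        self.start_schedule()

    def delay(self):
        # Assumed policy: wait one average interval during bursts, else zero.
        if self.interval_ewma is not None and self.interval_ewma < self.burst_threshold:
            return self.interval_ewma
        return 0.0

    def start_schedule(self):
        # Calls while the timer is already armed are no-ops.
        if not self.armed:
            self.armed = True
            self.armed_count += 1

    def fire(self):
        # The one-shot timer expired: schedule() would run here.
        self.armed = False


timer = CoalescingTimer()
for t in (0.0, 0.01, 0.02, 0.03):  # burst of four events
    timer.request_schedule(t)
burst_delay = timer.delay()
timer.fire()
```

Four events arriving 10 ms apart arm the timer only once, so a single schedule() pass would serve the whole burst.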
- stats_get()
Return a dict of scheduler statistics for the stats-get RPC.
Called by the built-in <module-name>.stats-get handler. Subclasses may override to add extra fields; call super().stats_get() and update the returned dict:
def stats_get(self):
    stats = super().stats_get()
    stats["my_counter"] = self._my_counter
    return stats
Standard fields:
sched_passes -- Number of completed schedule() passes.
sched_yields -- Total yields across all schedule() generator passes (always 0 for synchronous schedulers).
forecast_passes -- Number of completed forecast() passes.
forecast_yields -- Total yields across all forecast() generator passes (always 0 for synchronous schedulers).
sched_delay -- Current adaptive burst-coalescing delay in seconds (0 = immediate). Always 0 for generator-based schedulers.
sched_duration_ewma -- EWMA of schedule() wall-clock duration in seconds. Always 0 for generator-based schedulers.
sched_interval_ewma -- EWMA of time between _request_schedule() calls in seconds. Tracked for all schedulers but does not affect sched_delay for generator-based schedulers.
pending_jobs -- Current number of pending alloc requests in the scheduler queue.
- class flux.scheduler.AllocRequest(scheduler, msg)
Represents a pending allocation request from the job-manager.
Passed to Scheduler.alloc() and typically stored in the scheduler's pending queue until resources become available. Call success(), deny(), or cancel() to finalize the request; call annotate() to update job annotations while it is pending.
- jobid
The job ID for this allocation request.
- Type
int
- annotate(annotations)
Send an annotation update for this pending request.
May be called any number of times before the request is finalized. Keys set here are tracked so that success() can automatically clear them. Calls after the request is finalized are silently ignored to prevent stale annotations from a still-running forecast generator reaching the job-manager after a cancel.
- Parameters
annotations (dict) -- Annotation dict to attach to the job.
- cancel()
Finalize the request indicating it was cancelled.
- deny(note=None)
Finalize the request with a permanent denial.
- Parameters
note (str, optional) -- Human-readable reason for the denial.
- success(R, annotations=None, clear=True)
Finalize the request with a successful allocation.
- Parameters
R -- The allocated resources, either as a resource pool object (anything with a to_dict() method) or a pre-converted dict. Pool objects are converted automatically.
annotations (dict, optional) -- Scheduler annotations to attach.
clear (bool) -- If True (the default), any sched annotation keys set via annotate() while the job was pending are automatically nulled in the success response, unless overridden by annotations. This ensures pending-state annotations (e.g. reason_pending, t_estimate) are cleaned up without each scheduler needing to track and clear them manually.
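The bookkeeping behind clear=True can be sketched with a stand-in request object that records responses instead of sending them. ToyRequest is hypothetical, and nesting the keys under a top-level "sched" entry is an assumption about the annotation dict layout.

```python
class ToyRequest:
    """Stand-in for an alloc request; responses are recorded, not sent."""

    def __init__(self):
        self.sent = []           # recorded responses
        self._keys = set()       # sched annotation keys set while pending
        self._finalized = False

    def annotate(self, annotations):
        if self._finalized:
            return  # late updates are dropped
        self._keys.update(annotations.get("sched", {}))
        self.sent.append(("annotate", annotations))

    def success(self, R, annotations=None, clear=True):
        sched = {}
        if clear:
            # Null every key that was set while the request was pending ...
            sched.update({key: None for key in self._keys})
        # ... unless the caller supplies a value for it.
        sched.update((annotations or {}).get("sched", {}))
        self._finalized = True
        self.sent.append(("success", R, {"sched": sched}))


req = ToyRequest()
req.annotate(
    {"sched": {"reason_pending": "insufficient resources", "t_estimate": 1700000000.0}}
)
req.success({"version": 1}, annotations={"sched": {"t_estimate": 0.0}})
req.annotate({"sched": {"reason_pending": "stale"}})  # ignored after finalize
final = req.sent[-1]
```

In this run reason_pending is nulled in the success response, while t_estimate keeps the value passed explicitly through annotations.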
- class flux.scheduler.PendingJob(jobid, priority, t_submit, request, resource_request)
Pending allocation request held in the scheduler queue.
Created by the default Scheduler.alloc() implementation and stored in Scheduler._queue. Subclasses may access the queue directly (it is a heapq min-heap ordered by __lt__()).
- jobid
The job ID.
- Type
int
- priority
Job priority (higher is more urgent).
- Type
int
- t_submit
Submission time (seconds since epoch).
- Type
float
- request
The open RFC 27 allocation request.
- Type
AllocRequest
- resource_request
Pool-specific parsed resource request returned by parse_resource_request(); passed to alloc() when scheduling. The raw jobspec dict is accessible as resource_request.jobspec.
- class flux.resource.ResourcePool.ResourcePool(arg=None, version=1, log=None, **kwargs)
Public wrapper for a resource pool implementation.
Accepts the same argument forms as ResourceSet:
* An R JSON string or parsed dict → dispatches to the correct ResourcePoolImplementation subclass by version.
* A ResourcePoolImplementation instance → wraps it directly.
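The two argument forms can be sketched as a small dispatch function. PoolV1, make_pool and the contents of _impl_map are hypothetical stand-ins for illustration; the real wrapper and its implementation classes may differ.

```python
import json


class PoolV1:
    """Hypothetical version 1 implementation."""

    def __init__(self, R):
        self.R = R


# Assumed shape of a version dispatch table.
_impl_map = {1: PoolV1}


def make_pool(arg):
    """Return a pool implementation for an R string, R dict, or instance."""
    if isinstance(arg, tuple(_impl_map.values())):
        return arg  # already an implementation: use it directly
    R = json.loads(arg) if isinstance(arg, str) else arg
    version = R.get("version")
    if version not in _impl_map:
        raise ValueError(f"unsupported R version: {version!r}")
    return _impl_map[version](R)


from_dict = make_pool({"version": 1, "execution": {}})
from_json = make_pool('{"version": 1, "execution": {}}')
same = make_pool(from_dict)
```

Keeping the version lookup in a single table means supporting a new R version only requires registering another class.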
- exception InfeasibleRequest
Request can never be satisfied by this pool.
- exception InsufficientResources
Not enough resources available right now — retry after a free event.
- alloc(jobid: int, request) -> ResourcePool
Allocate resources for jobid and return the allocated pool.
- check_feasibility(request) -> None
Check whether request is structurally satisfiable.
- copy() -> ResourcePool
Return a full independent copy preserving allocation state.
- copy_allocated()
Return a ResourceSet containing only the allocated resources.
- copy_down()
Return a ResourceSet containing only the down resources.
- dumps() -> str
Return a compact human-readable summary of the resource set.
- property expiration: float
Resource expiration timestamp (seconds since epoch, 0 = none).
- free(jobid: int, R=None, final: bool = False) -> None
Return a job's allocated resources to the pool.
- property generation: int
Monotonically increasing mutation counter.
- job_end_times()
Return a list of (jobid, end_time) pairs for all tracked jobs.
- mark_down(ids: str) -> None
Mark resources identified by idset string (or "all") as down.
- mark_up(ids: str) -> None
Mark resources identified by idset string (or "all") as up.
- parse_resource_request(jobspec: dict)
Parse a jobspec dict and return a pool-specific resource request.
- register_alloc(jobid: int, R: ResourcePool) -> None
Register an existing allocation during scheduler reconnect.
- remove_ranks(ranks) -> None
Remove ranks from the pool (called on shrink events).
- set_expiration(expiration: float) -> None
Set the resource set expiration (seconds since epoch, 0 = none).
- set_starttime(starttime: float) -> None
Set the resource set starttime (seconds since epoch).
- to_dict() -> dict
Return the resource set as a parsed R JSON dict.
- to_resource_set()
Return a topology+availability snapshot as a ResourceSet.
- update_expiration(jobid: int, expiration: float) -> None
Update the tracked end time for a running job.
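The alloc/free contract and the two exception types can be exercised with a toy pool that only counts interchangeable nodes. ToyPool and the locally defined exception classes are stand-ins for illustration; a real pool tracks ranks, cores, and up/down state rather than a bare count.

```python
class InsufficientResources(Exception):
    """Not enough free resources right now."""


class InfeasibleRequest(Exception):
    """The request can never be satisfied by this pool."""


class ToyPool:
    """Counts interchangeable nodes; a stand-in, not a real resource pool."""

    def __init__(self, nnodes):
        self.total = nnodes
        self.allocated = {}   # jobid -> node count
        self.generation = 0   # bumped on every mutation

    def check_feasibility(self, request):
        if request > self.total:
            raise InfeasibleRequest(f"{request} nodes requested, pool has {self.total}")

    def alloc(self, jobid, request):
        self.check_feasibility(request)
        free = self.total - sum(self.allocated.values())
        if request > free:
            raise InsufficientResources(f"{request} nodes requested, {free} free")
        self.allocated[jobid] = request
        self.generation += 1
        return request

    def free(self, jobid):
        del self.allocated[jobid]
        self.generation += 1


pool = ToyPool(4)
pool.alloc(1, 3)
outcomes = []
for jobid, nnodes in [(2, 2), (3, 8)]:
    try:
        pool.alloc(jobid, nnodes)
        outcomes.append("allocated")
    except InsufficientResources:
        outcomes.append("wait")   # retry after a free event
    except InfeasibleRequest:
        outcomes.append("deny")   # can never fit in this pool
pool.free(1)
pool.alloc(2, 2)  # succeeds after the free event
```

The distinction matters to scheduling policy: an insufficient-resources result leaves the job queued, whereas an infeasible request is a permanent denial.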