flux.scheduler module
Base class for Python Flux scheduler broker modules.
Provides the RFC 27 scheduler protocol scaffolding so that subclasses only need to implement scheduling policy:
- Registers the sched and feasibility dynamic services
- Manages the resource.acquire streaming RPC for resource state
- Performs the hello/ready handshake with job-manager on startup
- Dispatches alloc, free, cancel, prioritize, expiration, and resource-status messages to overridable methods
The base class maintains a PendingJob heap queue (_queue)
and provides default implementations of alloc(), free(),
cancel(), and prioritize(). A minimal scheduler only needs
to override schedule():
    import heapq

    from flux.resource import InsufficientResources, InfeasibleRequest
    from flux.scheduler import Scheduler


    class MyScheduler(Scheduler):
        def schedule(self):
            while self._queue:
                job = self._queue[0]
                try:
                    alloc = self.resources.alloc(job.jobid, job.resource_request)
                except InsufficientResources:
                    break  # not enough resources; wait for free
                except InfeasibleRequest as exc:
                    job.request.deny(str(exc))
                else:
                    job.request.success(alloc)
                heapq.heappop(self._queue)
                yield  # hand control to the reactor


    def mod_main(h, *args):
        MyScheduler(h, *args).run()
- class flux.scheduler.AllocRequest(scheduler, msg)
Bases: object
Represents a pending allocation request from the job-manager.
Passed to Scheduler.alloc() and typically stored in the scheduler's pending queue until resources become available. Call success(), deny(), or cancel() to finalize the request; call annotate() to update job annotations while it is pending.
- jobid
The job ID for this allocation request.
- Type
int
- annotate(annotations)
Send an annotation update for this pending request.
May be called any number of times before the request is finalized. Keys set here are tracked so that success() can automatically clear them. Calls after the request is finalized are silently ignored to prevent stale annotations from a still-running forecast generator reaching the job-manager after a cancel.
- Parameters
annotations (dict) -- Annotation dict to attach to the job.
- cancel()
Finalize the request indicating it was cancelled.
- deny(note=None)
Finalize the request with a permanent denial.
- Parameters
note (str, optional) -- Human-readable reason for the denial.
- success(R, annotations=None, clear=True)
Finalize the request with a successful allocation.
- Parameters
R -- The allocated resources, either as a resource pool object (anything with a to_dict() method) or a pre-converted dict. Pool objects are converted automatically.
annotations (dict, optional) -- Scheduler annotations to attach.
clear (bool) -- If True (the default), any sched annotation keys set via annotate() while the job was pending are automatically nulled in the success response, unless overridden by annotations. This ensures pending-state annotations (e.g. reason_pending, t_estimate) are cleaned up without each scheduler needing to track and clear them manually.
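The clearing behaviour of success() can be pictured with a small stand-alone sketch. It does not use the flux API: PendingAnnotations and build_success_annotations are hypothetical names, and the layout with tracked keys under a top-level "sched" table is an assumption made for illustration only.

```python
class PendingAnnotations:
    """Hypothetical stand-in that tracks sched keys set while a job is pending."""

    def __init__(self):
        self.finalized = False
        self._sched_keys = set()

    def annotate(self, annotations):
        # Calls after finalization are ignored, mirroring the documented behaviour.
        if self.finalized:
            return False
        self._sched_keys.update(annotations.get("sched", {}))
        return True

    def build_success_annotations(self, annotations=None, clear=True):
        # Null every tracked key first, then let explicit annotations override.
        sched = {key: None for key in self._sched_keys} if clear else {}
        sched.update((annotations or {}).get("sched", {}))
        self.finalized = True
        return {"sched": sched} if sched else None


pending = PendingAnnotations()
pending.annotate({"sched": {"reason_pending": "insufficient resources"}})
pending.annotate({"sched": {"t_estimate": 1700000000.0}})
result = pending.build_success_annotations({"sched": {"t_estimate": 1700000100.0}})
late = pending.annotate({"sched": {"reason_pending": "stale"}})
print(result, late)
```

Here reason_pending is nulled because nothing overrides it, while t_estimate keeps the value passed at success time; the late annotate() call is dropped.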
- class flux.scheduler.PendingJob(jobid, priority, t_submit, request, resource_request)
Bases: object
Pending allocation request held in the scheduler queue.
Created by the default Scheduler.alloc() implementation and stored in Scheduler._queue. Subclasses may access the queue directly (it is a heapq min-heap ordered by __lt__()).
- jobid
The job ID.
- Type
int
- priority
Job priority (higher is more urgent).
- Type
int
- t_submit
Submission time (seconds since epoch).
- Type
float
- request
The open RFC 27 allocation request.
- Type
AllocRequest
- resource_request
Pool-specific parsed resource request returned by parse_resource_request(); passed to alloc() when scheduling. The raw jobspec dict is accessible as resource_request.jobspec.
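Because heapq is a min-heap, "higher priority is more urgent" has to be encoded in __lt__(). The sketch below shows one way that could look. ToyPendingJob is a hypothetical stand-in, and the exact comparison (negated priority, then lower jobid first) is an assumption rather than the documented implementation.

```python
import heapq
from dataclasses import dataclass


@dataclass
class ToyPendingJob:
    """Hypothetical stand-in for PendingJob with only the ordering fields."""

    jobid: int
    priority: int

    def __lt__(self, other):
        # Assumption: higher priority sorts first, lower jobid breaks ties.
        return (-self.priority, self.jobid) < (-other.priority, other.jobid)


queue = []
for jobid, priority in [(101, 16), (102, 31), (103, 16)]:
    heapq.heappush(queue, ToyPendingJob(jobid, priority))

order = [heapq.heappop(queue).jobid for _ in range(3)]
print(order)  # [102, 101, 103]
```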
- class flux.scheduler.ResourcePool(arg=None, version=1, log=None, **kwargs)
Bases: object
Public wrapper for a resource pool implementation.
Accepts the same argument forms as ResourceSet:
- An R JSON string or parsed dict → dispatches to the correct ResourcePoolImplementation subclass by version.
- A ResourcePoolImplementation instance → wraps it directly.
- exception InfeasibleRequest
Bases: OSError
Request can never be satisfied by this pool.
- exception InsufficientResources
Bases: OSError
Not enough resources available right now; retry after a free event.
- alloc(jobid: int, request) -> ResourcePool
Allocate resources for jobid and return the allocated pool.
- check_feasibility(request) -> None
Check whether request is structurally satisfiable.
- copy() -> ResourcePool
Return a full independent copy preserving allocation state.
- copy_allocated()
Return a ResourceSet containing only the allocated resources.
- copy_down()
Return a ResourceSet containing only the down resources.
- dumps() -> str
Return a compact human-readable summary of the resource set.
- property expiration: float
Resource expiration timestamp (seconds since epoch, 0 = none).
- free(jobid: int, R=None, final: bool = False) -> None
Return a job's allocated resources to the pool.
- property generation: int
Monotonically increasing mutation counter.
- job_end_times()
Return a list of (jobid, end_time) pairs for all tracked jobs.
- mark_down(ids: str) -> None
Mark resources identified by idset string (or "all") as down.
- mark_up(ids: str) -> None
Mark resources identified by idset string (or "all") as up.
- parse_resource_request(jobspec: dict)
Parse a jobspec dict and return a pool-specific resource request.
- register_alloc(jobid: int, R: ResourcePool) -> None
Register an existing allocation during scheduler reconnect.
- remove_ranks(ranks) -> None
Remove ranks from the pool (called on shrink events).
- set_expiration(expiration: float) -> None
Set the resource set expiration (seconds since epoch, 0 = none).
- set_starttime(starttime: float) -> None
Set the resource set starttime (seconds since epoch).
- to_dict() -> dict
Return the resource set as a parsed R JSON dict.
- to_resource_set()
Return a topology+availability snapshot as a ResourceSet.
- update_expiration(jobid: int, expiration: float) -> None
Update the tracked end time for a running job.
- class flux.scheduler.Scheduler(h, *args)
Bases: BrokerModule
Base class for Python scheduler broker modules.
Handles the RFC 27 protocol scaffolding so that subclasses only need to implement scheduling policy.
The base class maintains a pending-job queue (_queue, a heapq of PendingJob ordered by priority then jobid) and provides default implementations of alloc() (parse jobspec and enqueue), free() (release resources), cancel() (dequeue and cancel), and prioritize() (re-sort the queue). Subclasses must override schedule() to implement their allocation policy, and may override any of the defaults.
Subclasses may also override:
- hello(): called once per running job during startup (optional; default marks resources allocated)
- resource_update(): called after resource state changes
- feasibility_check(): called for feasibility.check RPCs
- forecast(): called after schedule() to annotate pending jobs with forward-looking estimates such as sched.t_estimate
Subclasses may set the class attributes:
- queue_depth: maximum number of pending alloc requests, given either as a positive integer (e.g. 8) for limited mode or as the string "unlimited" (default). The base class translates this to the wire format automatically. End users may override at load time with queue-depth=N|unlimited.
Alloc requests are represented by AllocRequest objects passed to alloc(). Call request.success(R), request.deny(note), request.cancel(), or request.annotate(d) to respond.
The current resource pool is available as resources. Resource state is updated automatically from resource.acquire and resource_update() is called after each update.
- SCHED_DELAY_MAX = 1.0
Maximum scheduling delay used when coalescing bursts of alloc requests. The adaptive timer will not exceed this value regardless of how long schedule() takes. 1 second is a reasonable upper bound for most HPC schedulers; lower it if worst-case latency matters more than throughput during large submission bursts.
- SCHED_EWMA_ALPHA = 0.25
Exponential weighted moving average (EWMA) smoothing factor for the adaptive scheduling timer. Higher values react faster to changes in burst rate and scheduling cost; lower values are more stable. 0.25 converges in roughly 4 samples.
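The smoothing factor and the delay cap can be illustrated with the standard EWMA update. This is only a sketch: ewma_update and clamp_delay are hypothetical helpers, and how the base class actually derives the delay from its averages is not stated here, so the clamp shows nothing more than the SCHED_DELAY_MAX bound.

```python
SCHED_EWMA_ALPHA = 0.25
SCHED_DELAY_MAX = 1.0


def ewma_update(previous, sample, alpha=SCHED_EWMA_ALPHA):
    """Blend a new sample into the running average."""
    return alpha * sample + (1.0 - alpha) * previous


def clamp_delay(candidate, maximum=SCHED_DELAY_MAX):
    """Keep a candidate delay within [0, maximum]."""
    return max(0.0, min(candidate, maximum))


# Feed a constant 2.0 s sample into an average that starts at zero.
average = 0.0
history = []
for _ in range(4):
    average = ewma_update(average, 2.0)
    history.append(average)

print(history)
print(clamp_delay(average))
```

After four samples the average has covered about 68% of the distance to the new value (1 - 0.75**4), which is the sense in which 0.25 "converges in roughly 4 samples".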
- alloc(request, jobid, priority, userid, t_submit, jobspec)
Queue an allocation request for the next schedule() pass.
Parses jobspec and pushes a PendingJob onto _queue. Override to change how jobs are parsed or queued.
- Parameters
request (AllocRequest) -- The allocation request to respond to.
jobid (int) -- The job ID (also available as request.jobid).
priority (int) -- Job priority.
userid (int) -- Submitting user ID.
t_submit (float) -- Job submission time.
jobspec (dict) -- The raw jobspec dict.
- alloc_annotate(msg, annotations)
Send an alloc annotation update. Typically called via AllocRequest.annotate().
- Parameters
msg -- The alloc request Message.
annotations (dict) -- Annotation dict to attach to the job.
- alloc_cancel(msg)
Send an alloc cancel response. Typically called via AllocRequest.cancel().
- Parameters
msg -- The original alloc request Message.
- alloc_deny(msg, note=None)
Send an alloc denial response. Typically called via AllocRequest.deny().
- Parameters
msg -- The alloc request Message.
note (str, optional) -- Human-readable reason for the denial.
- alloc_success(msg, R, annotations=None)
Commit R to KVS and send an alloc success response.
Typically called via AllocRequest.success(). R must be a dict (parsed R JSON object). The alloc response to job-manager is sent only after R is safely stored in KVS.
- Parameters
msg -- The alloc request Message.
R (dict) -- The allocated resource set as a parsed JSON object.
annotations (dict, optional) -- Scheduler annotations to include.
- cancel(jobid)
Remove a pending job from the queue and respond with cancel.
- Parameters
jobid (int) -- The job ID to cancel.
- expiration(msg, jobid, expiration)
Called when job-manager requests a job expiration update.
The default implementation accepts all updates by responding success. Planning schedulers that track per-job expiration should override this to update their internal records before calling self.handle.respond(msg, None).
- Parameters
msg -- The request Message.
jobid (int) -- The job ID whose expiration is being updated.
expiration (float) -- New expiration timestamp (seconds since epoch).
- feasibility_check(msg, jobspec)
Called for feasibility.check RPC.
Default implementation responds success if the job could ever fit within the total resource set (ignoring current availability). Subclasses may override for custom feasibility logic.
- Parameters
msg -- The request Message.
jobspec (dict) -- The raw jobspec dict.
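The difference between "can never fit" and "cannot fit right now" is easiest to see with a toy pool. The sketch below is self-contained and does not use flux: ToyCorePool is hypothetical and counts cores only, and the two exception classes are local stand-ins that merely share names with the pool exceptions described earlier.

```python
class InfeasibleRequest(OSError):
    """Toy counterpart: the request can never be satisfied."""


class InsufficientResources(OSError):
    """Toy counterpart: the request cannot be satisfied right now."""


class ToyCorePool:
    """Hypothetical pool that counts cores only."""

    def __init__(self, total_cores):
        self.total = total_cores
        self.allocated = {}

    def check_feasibility(self, ncores):
        # Compare against the total, ignoring what is currently allocated.
        if ncores > self.total:
            raise InfeasibleRequest(f"{ncores} cores requested, pool has {self.total}")

    def alloc(self, jobid, ncores):
        self.check_feasibility(ncores)
        free = self.total - sum(self.allocated.values())
        if ncores > free:
            raise InsufficientResources(f"{ncores} cores requested, {free} free")
        self.allocated[jobid] = ncores

    def free(self, jobid):
        self.allocated.pop(jobid, None)


pool = ToyCorePool(total_cores=8)
pool.alloc(1, 6)
outcomes = []
for jobid, ncores in [(2, 4), (3, 16)]:
    try:
        pool.alloc(jobid, ncores)
        outcomes.append("allocated")
    except InsufficientResources:
        outcomes.append("retry after free")
    except InfeasibleRequest:
        outcomes.append("deny")

print(outcomes)  # ['retry after free', 'deny']
```

Job 2 fits the pool but not the two cores currently free, so it stays queued; job 3 exceeds the pool's total and would be denied.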
- forecast()
Called after schedule() to annotate pending jobs with forward-looking estimates.
Override in subclasses to populate sched.t_estimate (and other forward-looking annotations) on pending jobs. The base class implementation is a no-op.
Triggered automatically after each schedule() call completes. If a forecast pass is already in progress when a new scheduling event arrives, it is left running to completion; the next forecast pass starts after the current one finishes and the next schedule() pass completes. Forecast estimates are approximate by design, so a slightly stale t_estimate is preferable to having none at all.
Supports the same generator protocol as schedule(): add yield at each desired reactor handoff point (typically after each annotated job) to return control to the reactor between annotations.
- free(jobid, R, final=False)
Return released resources to the pool.
Calls free() on the resource pool. Override (calling super().free()) to perform additional bookkeeping such as removing per-job state.
- Parameters
jobid (int) -- The job ID.
R -- The released resources.
final (bool) -- True if this is the final free for the job.
- hello(jobid, priority, userid, t_submit, R)
Called for each running job during the hello protocol.
Invoked synchronously during run() before the reactor starts. The default implementation marks R as allocated in the resource pool, which is sufficient for most schedulers. Override to additionally track per-job state (e.g., store the job's expiration time).
- Parameters
jobid (int) -- The job ID.
priority (int) -- Job priority.
userid (int) -- Submitting user ID.
t_submit (float) -- Job submission time.
R -- The job's current allocation.
- hello_partial_ok = True
If True (the default), send partial-ok: True in the hello RPC so that job-manager may report partially-freed ranks for running jobs. Set to False (or pass test-hello-nopartial at load time on subclasses that support it) to disable partial-ok behaviour.
- pool_class: Optional[type] = None
Custom pool class. When set to a ResourcePool subclass, _make_pool() instantiates it directly instead of using the default ResourcePool version dispatch. The subclass is responsible for its own version dispatch (e.g. via an _impl_map). This is the preferred way to inject a custom pool:
    class MyScheduler(Scheduler):
        pool_class = MyPool
- pool_kwargs: dict = {}
Extra keyword arguments forwarded to the pool class at construction time. Passed to whichever pool class is selected by _make_pool(), whether that is pool_class, a writer-discovered class, or the default ResourcePool.
- prioritize(jobs)
Apply priority updates to queued jobs and re-sort the queue.
- Parameters
jobs (list) -- List of [jobid, priority] pairs.
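A minimal sketch of what "apply priority updates and re-sort" can mean for a heapq-based queue follows. ToyJob and apply_priorities are hypothetical, the ordering rule is an assumption, and pairs that name jobs not in the queue are simply skipped here.

```python
import heapq


class ToyJob:
    """Hypothetical queue entry; only jobid and priority matter here."""

    def __init__(self, jobid, priority):
        self.jobid = jobid
        self.priority = priority

    def __lt__(self, other):
        # Assumption: higher priority first, then lower jobid.
        return (-self.priority, self.jobid) < (-other.priority, other.jobid)


def apply_priorities(queue, jobs):
    """Apply [jobid, priority] pairs in place, then restore the heap invariant."""
    by_id = {job.jobid: job for job in queue}
    for jobid, priority in jobs:
        if jobid in by_id:  # ignore jobs that are not queued
            by_id[jobid].priority = priority
    heapq.heapify(queue)


queue = [ToyJob(1, 16), ToyJob(2, 16), ToyJob(3, 16)]
heapq.heapify(queue)
apply_priorities(queue, [[3, 31], [1, 0], [99, 5]])
print(queue[0].jobid)  # 3
```

Changing priorities in place invalidates the heap invariant, which is why the sketch calls heapq.heapify() once after all updates rather than after each one.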
- queue_depth = 'unlimited'
Maximum number of pending alloc requests the job-manager will queue for this scheduler. Set to a positive integer (e.g. 8) for limited mode or to the string "unlimited" for unlimited mode. The Scheduler base class translates this to the wire format sent in job-manager.sched-ready; subclasses and end users should never need to know about the internal "limited=N" representation. End users may override at load time with queue-depth=N|unlimited.
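The translation described above might look roughly like the sketch below. Both helpers are hypothetical and are not part of flux.scheduler. The "limited=N" form comes from the text above; passing "unlimited" through unchanged is an assumption.

```python
def queue_depth_to_wire(queue_depth):
    """Translate a queue_depth setting into a mode string (hypothetical helper)."""
    if queue_depth == "unlimited":
        return "unlimited"
    if isinstance(queue_depth, bool) or not isinstance(queue_depth, int):
        raise ValueError(f"invalid queue_depth: {queue_depth!r}")
    if queue_depth <= 0:
        raise ValueError("queue_depth must be a positive integer")
    return f"limited={queue_depth}"


def parse_queue_depth_arg(arg):
    """Parse a load-time 'queue-depth=N|unlimited' argument (hypothetical helper)."""
    key, _, value = arg.partition("=")
    if key != "queue-depth" or not value:
        raise ValueError(f"not a queue-depth argument: {arg!r}")
    return "unlimited" if value == "unlimited" else int(value)


print(queue_depth_to_wire(8))  # limited=8
print(queue_depth_to_wire(parse_queue_depth_arg("queue-depth=unlimited")))  # unlimited
```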
- resource_update()
Called after resource state changes from resource.acquire.
The updated state is available via resources.
- property resources
The current resource pool.
- run()
Initialize the scheduler and start the reactor.
Performs the RFC 27 initialization sequence:
1. Register the sched dynamic service
2. Register the feasibility dynamic service
3. flux_module_set_running(): allow flux module load to return
4. resource.acquire: sync first response, async continuation
5. job-manager.sched-hello: sync hello protocol
6. job-manager.sched-ready: announce readiness
7. Start the reactor
- Raises
OSError -- If any initialization step fails.
- schedule()
Called to run the scheduling loop.
Override this in subclasses to implement allocation policy. Triggered automatically after every alloc(), free(), cancel(), prioritize(), expiration(), and resource_update() event via an adaptive one-shot timer. When events arrive in rapid bursts the timer delay grows to coalesce them into fewer schedule() calls; when events are infrequent the delay returns to zero so that each event is processed promptly.
Generator protocol: if this method returns a generator the base class advances it one yield per reactor iteration, allowing other events (new jobs, free responses, RPCs) to be handled between yields. Yield at each point where reactor responsiveness is desired, typically after each dispatched or denied job, but also when blocking on resources:
    def schedule(self):
        while self._queue:
            job = self._queue[0]
            try:
                alloc = self.resources.alloc(job.jobid, job.resource_request)
            except InsufficientResources:
                break
            except InfeasibleRequest as exc:
                job.request.deny(str(exc))
            else:
                job.request.success(alloc)
            heapq.heappop(self._queue)
            yield  # hand control back to the reactor
When a new scheduling event arrives while a generator pass is in progress the base class closes the generator (triggering any finally blocks) and starts a fresh pass after the settling delay. This ensures a newly submitted high-priority job or a freed resource is considered from the top of the queue without waiting for the current pass to finish.
Non-generator schedule() implementations (no yield) behave exactly as before and remain fully supported.
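The close-and-restart behaviour relies on ordinary Python generator semantics, which can be demonstrated without flux. In the sketch below, schedule_pass is a hypothetical toy pass and the driver code stands in for the reactor; it is not how the base class is implemented.

```python
def schedule_pass(queue, log):
    """Toy generator pass: handle one job per step, then yield."""
    try:
        while queue:
            log.append(f"dispatch {queue.pop(0)}")
            yield  # hand control back to the driver
    finally:
        # Runs on normal completion and when the driver calls close().
        log.append("cleanup")


log = []
queue = ["job-a", "job-b", "job-c"]

current = schedule_pass(queue, log)
next(current)                        # first step dispatches job-a
queue.insert(0, "job-urgent")        # a new event arrives mid-pass
current.close()                      # abort the stale pass; finally runs

current = schedule_pass(queue, log)  # fresh pass from the top of the queue
for _ in current:
    pass

print(log)
```

Calling close() raises GeneratorExit at the suspended yield, so any per-pass state set up before the loop can be released in a finally block before the next pass begins.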
- start_schedule()
Arm the scheduling timer, aborting any in-progress generator pass.
Called by _request_schedule() after the EWMA interval update. Override this method (not _request_schedule()) to replace the generator-based scheduling loop with an alternative driver, while still benefiting from the burst-coalescing delay computed in _request_schedule().
The default implementation:
- Closes any in-progress schedule() generator and stops its yield watchers (queue or resource state is about to change so in-progress allocations are stale). An in-progress forecast() generator is left running to completion: forecast estimates are approximate by nature and a slightly stale t_estimate is more useful than none at all.
- Arms the one-shot scheduling timer with the current adaptive delay if it is not already pending. Subsequent calls while the timer is armed are no-ops, coalescing all events in the window into a single schedule() invocation.
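The coalescing effect of a one-shot timer can be sketched with a plain flag. CoalescingTrigger is hypothetical, and fire() stands in for the timer expiring after the adaptive delay; no reactor or real timer is involved.

```python
class CoalescingTrigger:
    """Hypothetical one-shot trigger: many requests, one callback per window."""

    def __init__(self, callback):
        self.callback = callback
        self.armed = False
        self.requests = 0

    def request(self):
        # Every event counts as a request, but only the first arms the timer.
        self.requests += 1
        if not self.armed:
            self.armed = True

    def fire(self):
        # Stand-in for the timer expiring after the adaptive delay.
        if self.armed:
            self.armed = False
            self.callback()


passes = []
trigger = CoalescingTrigger(lambda: passes.append("schedule"))

for _ in range(5):   # a burst of five events inside one window
    trigger.request()
trigger.fire()       # the timer expires once
trigger.fire()       # nothing is pending, so this does nothing

print(trigger.requests, len(passes))  # 5 1
```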
- stats_get()
Return a dict of scheduler statistics for the stats-get RPC.
Called by the built-in <module-name>.stats-get handler. Subclasses may override to add extra fields; call super().stats_get() and update the returned dict:
    def stats_get(self):
        stats = super().stats_get()
        stats["my_counter"] = self._my_counter
        return stats
Standard fields:
sched_passes -- Number of completed schedule() passes.
sched_yields -- Total yields across all schedule() generator passes (always 0 for synchronous schedulers).
forecast_passes -- Number of completed forecast() passes.
forecast_yields -- Total yields across all forecast() generator passes (always 0 for synchronous schedulers).
sched_delay -- Current adaptive burst-coalescing delay in seconds (0 = immediate). Always 0 for generator-based schedulers.
sched_duration_ewma -- EWMA of schedule() wall-clock duration in seconds. Always 0 for generator-based schedulers.
sched_interval_ewma -- EWMA of time between _request_schedule() calls in seconds. Tracked for all schedulers but does not affect sched_delay for generator-based schedulers.
pending_jobs -- Current number of pending alloc requests in the scheduler queue.