flux.modprobe module

Flux modprobe: Task and module orchestration for broker startup/shutdown.

This module implements the Flux modprobe system, which manages broker module loading/unloading and task execution during rc1 (startup) and rc3 (shutdown). It provides declarative configuration through TOML files and Python rc scripts with dependency resolution and precedence ordering.

Architecture

The implementation is organized into several key components:

TaskDB: Priority-based task database supporting service alternatives.

Stores tasks in a dict-of-dicts structure {service: {name: TaskEntry}} where TaskEntry is a namedtuple(index, task). Priority comes from task.priority. When selecting a task for a service, the highest priority enabled task is chosen.

DependencySolver: Dependency resolution engine.

Handles recursive requirement resolution (requires), needs filtering, execution order precedence (before/after), and safe module removal.

ConfigLoader: Configuration and rc file loading.

Manages search paths and loads TOML module config and Python rc files from configurable directories.

Task: Base class for modprobe tasks.

Represents a task with configuration including ranks, dependencies, and enabled/disabled state. A task can be load/remove of a module (the Module class) or a Python function (CodeTask class: Python function decorated with @task).

Context: Execution context passed to all tasks.

Provides access to broker configuration, attributes, environment variables, shared data between tasks, and module argument management. Uses thread-local storage to provide an automatic per-thread Flux handle.

Modprobe: Main orchestrator class.

Coordinates TaskDB, DependencySolver, and ConfigLoader. Provides high-level operations: configure_modules(), load(), remove(), run(), read_rcfile(). Used by flux-modprobe(1) command.

Usage Example

Creating tasks in rc1.py:

from flux.modprobe import task

@task("setup-kvs", ranks="0", after=["kvs"], needs=["kvs"])
def setup_kvs(context):
    # Configure KVS after it loads
    context.print("Configuring KVS")
    # ... setup code ...

Using Modprobe programmatically:

from flux.modprobe import Modprobe

M = Modprobe(verbose=True)
M.configure_modules()
M.load(["kvs", "job-manager"])
M.run(M.get_deps(["kvs", "job-manager"]))

Configuration

Module configuration in modprobe.toml:

[[modules]]
name = "kvs"
ranks = "all"
provides = ["kvs-service"]
requires = []
after = []

[[modules]]
name = "job-manager"
ranks = "0"
requires = ["kvs"]
after = ["kvs"]

See flux-modprobe(1) for complete configuration documentation.

class flux.modprobe.CodeTask(name, func, *args, **kwargs)

Bases: Task

A modprobe task that runs as a Python function

dry_run(context)

Default task dry_run() method. This prints the task name. Override in a subclass with more specific information if necessary.

run(context)

A task's run() method. This should be overridden in the specific Task subclass.

class flux.modprobe.ConfigLoader(searchpath, print_func)

Bases: object

Configuration and rc file loading with search path management.

ConfigLoader handles all file I/O for modprobe configuration, separating I/O concerns from business logic. It manages search paths for both TOML module config files and Python rc scripts.

Search Path:

Default search path is built from:
  • Built-in: $datadir/modprobe or $libexecdir/modprobe

  • FLUX_MODPROBE_PATH (replaces default)

  • FLUX_MODPROBE_PATH_PREPEND (added before default)

  • FLUX_MODPROBE_PATH_APPEND (added after default)

File Discovery:

  • TOML: modprobe.toml and modprobe.d/*.toml

  • RC: rc1.py and rc1.d/*.py (or rc3.py and rc3.d/*.py)

searchpath

Paths for 'toml' and 'py' file types

Type

dict

print

Function for verbose output

Type

callable

static build_searchpath(builtindir='datadir') -> List[str]

Build searchpath list from environment variables and config.

Returns list of dirs in FLUX_MODPROBE_PATH if set, otherwise returns the default modprobe search path.

Parameters

builtindir -- base path for builtin/package path. Should be either "datadir" or "libexecdir".

Returns

List of directory paths (duplicates removed)
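
The rules above can be sketched in a few lines of Python. This is a minimal, hypothetical helper (build_searchpath_sketch is not part of flux.modprobe); it assumes the environment variables hold colon-separated directory lists and, following the description of build_searchpath(), that FLUX_MODPROBE_PATH_PREPEND and FLUX_MODPROBE_PATH_APPEND only apply when FLUX_MODPROBE_PATH is unset.

```python
from typing import List, Mapping


def build_searchpath_sketch(builtin: str, environ: Mapping[str, str]) -> List[str]:
    """Hypothetical re-creation of the search path rules described above."""

    def split(var: str) -> List[str]:
        # Colon-separated directory list; empty components are ignored
        return [d for d in environ.get(var, "").split(":") if d]

    if environ.get("FLUX_MODPROBE_PATH"):
        # FLUX_MODPROBE_PATH replaces the default search path entirely
        paths = split("FLUX_MODPROBE_PATH")
    else:
        # Default: optional prepend dirs, the builtin dir, optional append dirs
        paths = (
            split("FLUX_MODPROBE_PATH_PREPEND")
            + [builtin]
            + split("FLUX_MODPROBE_PATH_APPEND")
        )
    # Drop duplicates while keeping the first occurrence of each directory
    return list(dict.fromkeys(paths))
```

Passing os.environ as environ makes the sketch read the real process environment.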

get_rc_files(name='rc1') -> List[str]

Return all modprobe rc *.py files found in the following order:
  • Always read {fluxdatadir}/modprobe/{name}.py (e.g. rc1.py)

  • For dir in self.searchpath: read {dir}/{name}.d/*.py

Parameters

name -- RC file basename (e.g., "rc1", "rc3")

Returns

List of absolute paths to Python RC files

get_toml_files() -> List[str]

Return all modprobe config toml files found in the following order:
  • Always read {fluxdatadir}/modprobe/modprobe.toml

  • For dir in self.searchpath: read {dir}/modprobe.d/*.toml

Returns

List of absolute paths to TOML files
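
Both lookups follow the same pattern: one top-level file from the builtin modprobe directory, then a drop-in directory per search path entry. The hypothetical helper below models that ordering for either file type; the existence check on the top-level file and the alphabetical sort within each drop-in directory are assumptions of this sketch, not documented behavior.

```python
import glob
import os
from typing import List


def find_files_sketch(builtin: str, searchpath: List[str], base: str, ext: str) -> List[str]:
    """Hypothetical: mimic the get_rc_files()/get_toml_files() ordering."""
    found = []
    # The builtin top-level file (e.g. rc1.py or modprobe.toml) comes first
    top = os.path.join(builtin, f"{base}.{ext}")
    if os.path.exists(top):
        found.append(top)
    # Then every {dir}/{base}.d/*.{ext}, in search path order
    for d in searchpath:
        # Sorting within one directory is an assumption of this sketch
        found.extend(sorted(glob.glob(os.path.join(d, f"{base}.d", f"*.{ext}"))))
    return found
```

find_files_sketch(builtin, searchpath, "rc1", "py") corresponds to get_rc_files("rc1"), and find_files_sketch(builtin, searchpath, "modprobe", "toml") to get_toml_files().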

class flux.modprobe.Context(modprobe, verbose=False, dry_run=False)

Bases: object

Execution context passed to all modprobe tasks.

Context provides task functions with access to broker state, configuration, and shared data. It enables tasks to query and modify the environment, communicate with the broker, and share data with other tasks.

Key Features:
  • Broker access: Configuration, attributes, RPCs via Flux handle

  • Environment: Get/set variables in current process and broker

  • Shared data: Store/retrieve arbitrary data between tasks

  • Module arguments: Configure module load-time arguments

  • Thread safety: Per-thread Flux handles via thread-local storage

Common Methods:

  • handle: Flux handle for broker communication (thread-local)

  • rank: Current broker rank

  • conf_get(key): Get broker config value

  • attr_get(attr): Get broker attribute

  • getenv(var): Get environment variable (local or broker)

  • setenv(name, value): Set environment variable (local and broker)

  • set(key, value) / get(key): Store/retrieve shared data

  • setopt(module, options): Configure module arguments

  • rpc(topic, args): Send RPC to broker

verbose

Verbose output mode

Type

bool

dry_run

Dry-run mode (print without executing)

Type

bool

modprobe

Parent Modprobe instance

Type

Modprobe

attr_get(attr, default=None)

Get broker attribute with optional default

bash(command)

Execute shell command via bash -c.

Parameters

command -- Shell command string

Raises

RuntimeError -- If command exits non-zero or is killed by signal

conf_get(key, default=None)

Get config key with optional default

enable(name)

Force enable a module/service/task, overriding the ranks conditional, needs-config, and needs-attrs.

Note: This will not also enable dependencies of name.

get(key, default=None)

Get arbitrary data set by other tasks with optional default value

getenv(var, default=None)

Get environment variable value from local process or broker.

Checks the local process environment first, then falls back to querying the broker via RPC. This is useful because the broker may filter some variables from rc1/rc3 environments (e.g., those set by resource managers or launchers). Results from broker are cached.

Parameters
  • var -- Environment variable name

  • default -- Value to return if variable is not set

Returns

Variable value (str) or default if not set
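
The local-first lookup with a cached broker fallback can be modeled as below. EnvLookupSketch and its query_broker callable are hypothetical stand-ins: the callable replaces the broker RPC, and fetching the broker's whole environment once is an assumption about how the caching works.

```python
from typing import Callable, Dict, Mapping, Optional


class EnvLookupSketch:
    """Hypothetical model of Context.getenv(): local env first, then broker.

    query_broker stands in for the broker RPC and is assumed to return the
    broker's environment as a dict.
    """

    def __init__(self, local: Mapping[str, str], query_broker: Callable[[], Dict[str, str]]):
        self.local = local
        self.query_broker = query_broker
        self._cache: Optional[Dict[str, str]] = None

    def getenv(self, var: str, default=None):
        # 1. The local process environment wins if the variable is set there
        if var in self.local:
            return self.local[var]
        # 2. Otherwise ask the broker once and reuse the cached answer
        if self._cache is None:
            self._cache = self.query_broker()
        return self._cache.get(var, default)
```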

getopts(name, default=None, also=None)

Get module load-time arguments.

Retrieves the argument list for a module, optionally including arguments from related modules.

Parameters
  • name -- Module name

  • default -- Default arguments (used if no overwrite occurred)

  • also -- List of additional module names whose args to include

Returns

List of argument strings

property handle

Return a per-thread Flux handle created on demand

load_modules(modules)

Schedule modules to be loaded during task execution.

Adds modules to the active task list so they will be loaded when run() is called. Typically used in setup() functions.

Parameters

modules -- List of module names to load

print(*args)

Print message if modprobe is in verbose output mode

property rank

remove_modules(modules=None)

Set a list of modules to remove by name. Remove all if modules is None.

rpc(topic, *args, **kwargs)

Convenience function to call context.handle.rpc()

set(key, value)

Set arbitrary data at key for future use. (see get())

set_alternative(name, alternative)

Force 'alternative' to be selected as the provider for module/service 'name'

setenv(name_or_env, value=None)

Set or unset environment variables in the current process and broker.

Variables set via this method that are not in the broker's env blocklist will be inherited by rc2 and rc3. A value of None causes the named variable to be unset.

Note: concurrent calls from unrelated tasks running in parallel are safe in CPython since os.environ operations are serialized by the GIL.

Parameters
  • name_or_env -- a variable name string, or a dict mapping names to values. Values may be strings or None to unset.

  • value -- the value to set, when name_or_env is a string.

Raises
  • ValueError -- if any value is not a string or None.

  • OSError -- if the RPC fails.

setopt(module, options, overwrite=False)

Add module load-time arguments.

Appends options to a module's argument list (or overwrites if overwrite=True). Options can be a single string with whitespace-separated arguments.

Parameters
  • module -- Module name

  • options -- Options string (whitespace-separated)

  • overwrite -- Replace all existing options (default False)

Example

context.setopt("kvs", "checkpoint-period=10m")
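
One plausible model of the argument bookkeeping behind setopt() and getopts() is sketched below. ModuleArgsSketch is hypothetical, and in particular the rule that default arguments are dropped once setopt(..., overwrite=True) has been called is an assumption based on the getopts() parameter description.

```python
from typing import Dict, Iterable, List, Optional, Set


class ModuleArgsSketch:
    """Hypothetical model of Context.setopt()/getopts() bookkeeping."""

    def __init__(self) -> None:
        self._args: Dict[str, List[str]] = {}
        self._overwritten: Set[str] = set()

    def setopt(self, module: str, options: str, overwrite: bool = False) -> None:
        # Whitespace-separated string -> list of individual arguments
        new = options.split()
        if overwrite:
            self._args[module] = new
            self._overwritten.add(module)
        else:
            self._args.setdefault(module, []).extend(new)

    def getopts(self, name: str, default: Optional[List[str]] = None,
                also: Optional[Iterable[str]] = None) -> List[str]:
        # Assumption: defaults survive unless setopt(..., overwrite=True) ran
        base = [] if name in self._overwritten else list(default or [])
        result = base + self._args.get(name, [])
        # Append arguments recorded for related modules
        for other in also or []:
            result += self._args.get(other, [])
        return result
```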

tls = <_thread._local object>

class flux.modprobe.DependencySolver(taskdb, context)

Bases: object

Handles all dependency resolution for modprobe tasks.

This class encapsulates the complex logic for resolving task dependencies, including:
  • Finding all required dependencies (requires)

  • Filtering tasks based on needs constraints

  • Building execution order precedence graphs (before/after)

  • Finding safely removable modules

Separated from Modprobe class for clarity and testability.

get_requires(tasks) -> Dict[str, List[str]]

Get forward requires dependency map for tasks.

Parameters

tasks -- Iterable of task names

Returns

Dict mapping each task name to list of tasks it requires

get_reverse_requires(tasks) -> Dict[str, Set[str]]

Get reverse requires dependency map for tasks.

Parameters

tasks -- Iterable of task names

Returns

Dict mapping each task name to set of tasks that require it

resolve_service(service: str, ignore_needs: bool = False)

Resolve service name to actual task, considering needs constraints.

Returns the highest priority task providing service that:
  1. Is enabled (passes enabled() check with context)

  2. Has all its needs satisfied (unless ignore_needs=True)

Falls back to highest priority task if no viable tasks exist.

Parameters
  • service -- Service or task name to resolve

  • ignore_needs -- If True, skip needs checking (for explicit loads)

Returns

Task object

Raises

ValueError -- If service doesn't exist in taskdb

solve_execution_order(tasks) -> Dict[str, List[str]]

Build precedence graph for tasks based on before/after constraints.

Parameters

tasks -- List/set of task names

Returns

Dict mapping task names to list of predecessor task names

Note: If tasks is a set, a copy is made internally to avoid mutating the input. Lists are always converted to sets internally.
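
A hypothetical sketch of how a predecessor map can be derived from before/after lists, including the "*" wildcards. It assumes plain task names (no service resolution) and that a wildcard task is only ordered against non-wildcard tasks; precedence_sketch is not the module's API.

```python
from typing import Dict, Iterable, List, Mapping, Set


def precedence_sketch(tasks: Iterable[str],
                      before: Mapping[str, List[str]],
                      after: Mapping[str, List[str]]) -> Dict[str, List[str]]:
    """Hypothetical: map each task to the tasks that must finish first."""
    names: Set[str] = set(tasks)  # private copy; the caller's input is untouched
    preds: Dict[str, Set[str]] = {t: set() for t in names}
    first = {t for t in names if "*" in before.get(t, [])}
    last = {t for t in names if "*" in after.get(t, [])}
    for t in names:
        for a in after.get(t, []):
            if a in names:  # t runs after a
                preds[t].add(a)
        for b in before.get(t, []):
            if b in names:  # t runs before b
                preds[b].add(t)
    for t in names:
        # Assumption: "*" orders a task against all non-wildcard tasks only
        if t in last:
            preds[t] |= names - last
        if t in first:
            for other in names - first:
                preds[other].add(t)
    return {t: sorted(p) for t, p in preds.items()}
```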

solve_needs(tasks) -> List[str]

Filter out tasks where needs constraints are not met.

When a task is removed because a needed service is not available, all tasks that need it are also recursively removed.

Parameters

tasks -- Iterable of task names

Returns

New list of task names with unsatisfied needs removed. Does not modify input.
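
The cascading filter can be expressed as a fixed-point loop. This hypothetical sketch treats needs entries as plain task names; the real solver also has to consider services provided by alternatives.

```python
from typing import Iterable, List, Mapping


def solve_needs_sketch(tasks: Iterable[str], needs: Mapping[str, List[str]]) -> List[str]:
    """Hypothetical: drop tasks whose needs are absent, cascading until stable."""
    remaining = list(tasks)  # new list; the input is not modified
    changed = True
    while changed:
        changed = False
        present = set(remaining)
        for t in list(remaining):
            if any(n not in present for n in needs.get(t, [])):
                remaining.remove(t)
                changed = True  # another pass may remove tasks that needed t
    return remaining
```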

solve_removal(dependencies: Dict[str, List[str]], modules_to_remove) -> List[str]

Find modules that can be safely removed.

Given a set of modules to remove and their dependency lists, finds additional modules that can be removed because they no longer have any dependents.

Parameters
  • dependencies -- Dict of modules to their dependency list

  • modules_to_remove -- Iterable of modules to remove

Returns

New list of modules that can be safely removed (includes original modules plus cascaded removals). Does not modify inputs.

Raises

ValueError -- If any module to remove still has dependents
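
A minimal sketch of the removal logic, assuming dependencies maps each loaded module to the modules it requires. solve_removal_sketch is hypothetical; it checks the requested modules first, then walks their dependencies and adds any that no remaining module uses.

```python
from typing import Dict, Iterable, List


def solve_removal_sketch(dependencies: Dict[str, List[str]],
                         modules_to_remove: Iterable[str]) -> List[str]:
    """Hypothetical: expand a removal set with dependencies left unused."""
    removing = list(dict.fromkeys(modules_to_remove))  # ordered, de-duplicated copy

    def dependents(mod: str) -> List[str]:
        # Loaded modules, not already scheduled for removal, that require mod
        return [m for m, deps in dependencies.items()
                if mod in deps and m not in removing]

    for mod in removing:
        users = dependents(mod)
        if users:
            raise ValueError(f"{mod} is still in use by {', '.join(users)}")

    # Cascade: a loaded dependency of a removed module goes too once unused
    i = 0
    while i < len(removing):
        for dep in dependencies.get(removing[i], []):
            if dep not in removing and dep in dependencies and not dependents(dep):
                removing.append(dep)
        i += 1
    return removing
```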

solve_requirements(tasks, ignore_disabled=False) -> List[str]

Recursively find all requirements of tasks.

Parameters
  • tasks -- Iterable of task names to solve

  • ignore_disabled -- If True, include disabled tasks in result

Returns

List of task names including all required dependencies. Disabled tasks are skipped unless ignore_disabled=True. Does not modify input.
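
The recursive closure over requires can be sketched as a depth-first walk. This is a hypothetical helper: requires and disabled are a plain dict and set standing in for TaskDB lookups, and skipping the requirements of a disabled task is an assumption.

```python
from typing import Iterable, List, Mapping, Optional, Set


def solve_requirements_sketch(tasks: Iterable[str],
                              requires: Mapping[str, List[str]],
                              disabled: Optional[Set[str]] = None,
                              ignore_disabled: bool = False) -> List[str]:
    """Hypothetical: depth-first closure over `requires`, skipping disabled tasks."""
    disabled = disabled or set()
    result: List[str] = []
    seen: Set[str] = set()
    stack = list(tasks)  # copy; the caller's iterable is not modified
    while stack:
        name = stack.pop()
        if name in seen:
            continue
        seen.add(name)
        if name in disabled and not ignore_disabled:
            continue  # assumption: a disabled task's requirements are not pulled in
        result.append(name)
        stack.extend(requires.get(name, []))
    return result
```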

class flux.modprobe.Modprobe(timing=False, verbose=False, dry_run=False)

Bases: object

Main orchestrator for flux-modprobe task and module management.

Modprobe coordinates TaskDB, DependencySolver, and ConfigLoader to provide high-level operations for module loading/unloading and task execution. Used by flux-modprobe(1) command and available for programmatic use.

Key Operations:

  • configure_modules(): Load module config from TOML files

  • load(modules): Load modules and their dependencies

  • remove(modules): Unload modules and unused dependencies

  • read_rcfile(name): Load and execute rc Python scripts

  • run(deps): Execute tasks in dependency order

taskdb

Task database

Type

TaskDB

context

Execution context

Type

Context

solver

Dependency resolver

Type

DependencySolver

loader

Configuration loader

Type

ConfigLoader

handle

Flux handle (from context)

Type

Flux

rank

Broker rank

Type

int

exitcode

Exit code (0=success, 1=failure)

Type

int

timing

Optional timing data for performance analysis

Type

list

Parameters
  • timing (bool) -- Enable timing instrumentation (default False)

  • verbose (bool) -- Enable verbose output (default False)

  • dry_run (bool) -- Print actions without executing (default False)

activate_modules(modules)

property active_tasks

Return all active, enabled tasks

add_active_task(task)

Add a task to the task db and active tasks list

add_modules(file)

add_task(task)

Add a task to internal task db

add_timing(name, starttime, end=None)

configure_modules()

Load module configuration from TOML config files.

Reads all modprobe.toml and modprobe.d/*.toml files from the search path, adding module definitions to the task database. Also applies any module configuration overrides from broker config.

Returns

For method chaining

Return type

self

disable(name)

Disable module/task name

get_deps(tasks)

Build execution order precedence graph for tasks.

Constructs a dict mapping task names to lists of predecessors (tasks that must complete before each task can start). Respects before/after constraints and handles special before=["*"] and after=["*"] cases.

Parameters

tasks -- Iterable of task names

Returns

Dict of {task_name: [predecessor_list]} for topological sort

Return type

dict

get_requires(tasks, reverse=False)

Return dependencies for tasks as dicts of names to dependencies

get_task(name, default=None)

Return task by name from taskdb.

Note: This does NOT do service resolution. For service-aware lookup (e.g., "sched" -> actual scheduler module), use resolve_service().

has_task(name)

Return True if task exists in taskdb

load(modules)

Load modules and their dependencies (if not already loaded)

Parameters

modules (list) -- List of modules to load.

Raises

FileExistsError -- Target modules (and all their dependencies) are already loaded, so there is nothing to do.

Note

This method uses ignore_disabled=True to allow loading modules that are disabled by configuration. This enables explicit loading of non-default alternatives or disabled modules.

print(*args)

Wrapper for context.print()

read_rcfile(name)

Load and execute Python rc files.

Loads rc Python files from search path (e.g., rc1.py, rc1.d/*.py) and registers @task-decorated functions. Also executes setup() function if present in any loaded module.

Parameters

name -- RC file basename (e.g., "rc1", "rc3") or absolute path to a .py file

remove(modules)

Unload modules and any unused dependencies.

Finds all modules that can be safely removed (no remaining dependents) and unloads them in reverse load order.

Parameters

modules -- List of module names to remove, or ["all"] to remove all loaded modules

Raises

ValueError -- If a specified module is not loaded or still has active dependents

resolve_service(name, ignore_needs=False)

Resolve service name to actual task using needs-aware resolution.

This is the public API for service resolution. It considers:
  • Task enabled/disabled status

  • Needs constraints (unless ignore_needs=True)

  • Priority and alternatives

Parameters
  • name -- Service or task name to resolve

  • ignore_needs -- If True, skip needs checking

Returns

Task object that would be loaded for this service

run(deps)

Execute tasks in parallel respecting precedence constraints.

Uses ThreadPoolExecutor to run tasks concurrently when their dependencies are satisfied. Tasks are executed in topological order based on the deps precedence graph.

Parameters

deps -- Dict of {task_name: [predecessors]} from get_deps()
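
The scheduling idea (submit a task as soon as all of its predecessors have finished) can be sketched with concurrent.futures. run_sketch and its action callable are hypothetical; predecessors that are not keys of deps are treated as already satisfied, which is an assumption of this sketch.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Callable, Dict, List


def run_sketch(deps: Dict[str, List[str]],
               action: Callable[[str], None],
               max_workers: int = 4) -> None:
    """Hypothetical: run each task once all of its predecessors have finished."""
    # Predecessors outside the graph are ignored (assumed already satisfied)
    pending = {name: set(preds) & set(deps) for name, preds in deps.items()}
    done = set()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        running = {}
        while pending or running:
            # Submit every task whose predecessors have all completed
            for name in [n for n, p in pending.items() if p <= done]:
                running[pool.submit(action, name)] = name
                del pending[name]
            if not running:
                raise ValueError("cycle detected in precedence graph")
            finished, _ = wait(running, return_when=FIRST_COMPLETED)
            for fut in finished:
                fut.result()  # re-raise the task's exception, if any
                done.add(running.pop(fut))
```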

save_task_timing(tasks)

set_alternative(name, alternative)

Force 'alternative' to be selected as the provider for module/service 'name'

set_remove(modules=None)

Register a set of modules to remove or remove all modules

solve(tasks, timing=True, ignore_disabled=False)

Recursively resolve all requirements for the given tasks.

Parameters
  • tasks -- Iterable of task/module names

  • timing -- Record timing data (default True)

  • ignore_disabled -- Include disabled tasks (default False)

Returns

List of task names including all recursive requirements

property timestamp

update_module(name, entry, new_module=None)

Update attributes of an existing module/task.

Parameters
  • name -- Task name or service name to look up

  • entry -- Dict of attribute updates to apply

  • new_module -- Optional pre-constructed Module for attribute source

Note: 'name' may be a service name (e.g., "feasibility") rather than the actual task name (e.g., "sched-simple"), so we never overwrite task.name during updates.

class flux.modprobe.Module(conf)

Bases: Task

A modprobe task to load/remove a broker module. The default action is to load the module. Call the set_remove() method to convert the task to remove the module.

VALID_KEYS = ('name', 'module', 'args', 'ranks', 'provides', 'requires', 'needs', 'before', 'after', 'needs-attrs', 'needs-config', 'needs-env', 'priority', 'disabled', 'exec')

dry_run(context)

Default task dry_run() method. This prints the task name. Override in a subclass with more specific information if necessary.

set_remove()

Mark module to be removed instead of loaded (the default)

to_dict()

class flux.modprobe.ModuleList(handle)

Bases: object

Simple class for iteration and lookup of loaded modules

lookup(name)

class flux.modprobe.RankConditional(arg)

Bases: object

Conditional rank statement, e.g. >0

test(rank)

class flux.modprobe.RankIDset(arg)

Bases: object

Rank by IDset, e.g. all or 0-1

test(rank)
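
The rank expressions accepted by these classes can be illustrated with a small matcher. rank_matches and parse_idset are hypothetical helpers that handle only the forms shown in this documentation (all, >N, <N, and simple idsets such as 0-3 or 0,2,5-7); they are not a full RFC 22 idset parser.

```python
from typing import Set


def parse_idset(text: str) -> Set[int]:
    """Hypothetical minimal parser for idsets like "0", "0-3", "[0,2,5-7]"."""
    ranks: Set[int] = set()
    for part in text.strip().strip("[]").split(","):
        lo, sep, hi = part.partition("-")
        if sep:
            ranks.update(range(int(lo), int(hi) + 1))  # inclusive range
        else:
            ranks.add(int(lo))
    return ranks


def rank_matches(expr: str, rank: int) -> bool:
    """Return True if rank satisfies a ranks expression (sketch only)."""
    expr = expr.strip()
    if expr == "all":
        return True
    if expr.startswith(">"):  # conditional form, e.g. ">0"
        return rank > int(expr[1:])
    if expr.startswith("<"):  # conditional form, e.g. "<2"
        return rank < int(expr[1:])
    return rank in parse_idset(expr)
```
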
class flux.modprobe.Task(name, *args, **kwargs)

Bases: object

Base class representing a modprobe task and its configuration.

A Task represents a unit of work in the modprobe system with associated configuration controlling when and where it runs. Tasks can be broker modules (Module subclass) or Python functions (CodeTask subclass).

Configuration Attributes:
  • name (str): Unique task identifier

  • ranks (RankConditional|RankIDset): Ranks where task executes

  • provides (list): Service names this task provides (alternatives)

  • requires (list): Tasks/services that must be active when this runs

  • needs (list): Tasks/services that must be enabled for this task to be enabled

  • before (list): Tasks/services this must run before

  • after (list): Tasks/services this must run after

  • needs_attrs (list): Required broker attributes

  • needs_config (list): Required config keys

  • needs_env (list): Required environment variables

  • disabled (bool): Whether task is explicitly disabled

  • priority (int): Priority for service alternative selection (default 100)

Special Values:
  • before=["*"]: Run before all other tasks (cannot also use after)

  • after=["*"]: Run after all other tasks (cannot also use before)

  • Specs starting with "!" are inverted (must NOT be set)

Runtime Attributes:

  • starttime (float): Task start timestamp (set during execution)

  • endtime (float): Task end timestamp (set during execution)

  • force_enabled (bool): Override disabled state (internal use)

Enabling Logic:

A task is enabled if ALL of the following are true:
  1. Not explicitly disabled via disabled=True

  2. Runs on the current broker rank

  3. All needs_config keys are set (or not set if prefixed with !)

  4. All needs_attrs attributes are set (or not set if prefixed with !)

  5. All needs_env variables are set (or not set if prefixed with !)

  6. All tasks in needs list are also enabled
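
The six rules can be restated as a single boolean expression. enabled_sketch is hypothetical: the rank test is passed in as a precomputed boolean, config is assumed to be a flat mapping of dotted keys, and needs are checked against a precomputed set of enabled task names.

```python
from typing import Callable, Iterable, Mapping, Set


def spec_ok(specs: Iterable[str], is_set: Callable[[str], bool]) -> bool:
    """Each spec must be set, or must NOT be set when prefixed with "!"."""
    for spec in specs:
        if spec.startswith("!"):
            if is_set(spec[1:]):
                return False
        elif not is_set(spec):
            return False
    return True


def enabled_sketch(task: Mapping, rank_ok: bool, config: Mapping[str, object],
                   attrs: Mapping[str, str], env: Mapping[str, str],
                   enabled_names: Set[str]) -> bool:
    """Hypothetical restatement of the enabling rules (numbered as above)."""
    return (
        not task.get("disabled", False)                                     # 1
        and rank_ok                                                         # 2
        and spec_ok(task.get("needs_config", []), lambda k: k in config)   # 3
        and spec_ok(task.get("needs_attrs", []), lambda a: a in attrs)     # 4
        and spec_ok(task.get("needs_env", []), lambda v: v in env)         # 5
        and all(n in enabled_names for n in task.get("needs", []))         # 6
    )
```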

VALID_ARGS = {'after': [], 'before': [], 'disabled': False, 'needs': [], 'needs_attrs': [], 'needs_config': [], 'needs_env': [], 'priority': 100, 'provides': [], 'ranks': 'all', 'requires': []}

dry_run(context)

Default task dry_run() method. This prints the task name. Override in a subclass with more specific information if necessary.

enabled(context=None)

Return True if task is currently not disabled. A task may be disabled by configuration, because it only runs on a given set of ranks, if the task has been configured to require a configuration or broker attribute which is not set, or if a module this task needs is not enabled.

run(context)

A task's run() method. This should be overridden in the specific Task subclass.

runtask(context)

Run this task's run() method (or dry_run() if dry_run is True)

class flux.modprobe.TaskDB

Bases: object

Task database supporting service alternatives and priority-based selection.

Structure: {service: {task_name: TaskEntry(index, task)}}

Tasks are stored in a dict-of-dicts structure where each service maps to a dict of task names to entries. Each entry is a TaskEntry namedtuple with insertion index (int) and task object. Priority comes from task.priority.

Priority Model:
  • task.priority is the single source of truth

  • set_alternative() boosts task.priority to ensure the task will be the highest priority alternative. This affects all services the task provides.

  • Priority updates via config/TOML set task.priority globally

class TaskEntry(index, task)

Bases: tuple

index

Alias for field number 0

task

Alias for field number 1

add(task: Task, index: int = None) -> None

Add task to database for its name and all services it provides

disable(service: str) -> None

Disable all tasks providing this task/module/service

enable(service: str) -> None

Force a module/task/service to be enabled even if it would normally be disabled by rank, needs-config, or needs-attrs.

get(service: str) -> Task

Return the highest priority task providing service which is not disabled. If there are no non-disabled tasks providing service, then return the highest priority task regardless of disabled status. If no tasks provide service, raise ValueError.

Note: Only checks the disabled flag, not the full enabled() method which requires context for rank/config/attr checks.

get_all(service: str) -> List[Task]

Return all tasks providing service, sorted by (priority, index).

Parameters

service -- Service or task name to look up

Returns

List of Task objects sorted from lowest to highest priority. Empty list if service doesn't exist.

get_entry(service: str, task_name: str)

Get TaskEntry for a specific task providing a service.

Parameters
  • service -- Service name

  • task_name -- Task name

Returns

TaskEntry with index and task (the priority is read from task.priority)

Raises

ValueError -- If service or task not found

has_enabled_provider(tasks, service: str) -> bool

Check if any non-disabled task in tasks provides the given service.

Parameters
  • tasks -- Iterable of task names to check

  • service -- Service name to look for

Returns

True if at least one non-disabled task provides service

set_alternative(service: str, name: str) -> None

Select a specific alternative 'name' for service.

Boosts task.priority above all other tasks providing this service. Since priority is global to the task, this affects all services the task provides.
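
A compact, hypothetical model of the {service: {task_name: TaskEntry(index, task)}} layout and of the selection rules described for get(), get_all(), and set_alternative(). SketchTask and TaskDBSketch are illustrations only; breaking priority ties in favor of the most recently added task is an assumption.

```python
from collections import namedtuple
from typing import Dict, List

TaskEntry = namedtuple("TaskEntry", ["index", "task"])


class SketchTask:
    """Hypothetical stand-in for Task with only the fields used here."""

    def __init__(self, name: str, provides=(), priority: int = 100, disabled: bool = False):
        self.name = name
        self.provides = list(provides)
        self.priority = priority
        self.disabled = disabled


class TaskDBSketch:
    """Hypothetical re-creation of the TaskDB dict-of-dicts layout."""

    def __init__(self) -> None:
        self.db: Dict[str, Dict[str, TaskEntry]] = {}
        self.count = 0

    def add(self, task: SketchTask) -> None:
        # Register the task under its own name and every service it provides
        for service in [task.name, *task.provides]:
            self.db.setdefault(service, {})[task.name] = TaskEntry(self.count, task)
        self.count += 1

    def get_all(self, service: str) -> List[SketchTask]:
        entries = self.db.get(service, {}).values()
        ordered = sorted(entries, key=lambda e: (e.task.priority, e.index))
        return [e.task for e in ordered]  # lowest to highest priority

    def get(self, service: str) -> SketchTask:
        tasks = self.get_all(service)
        if not tasks:
            raise ValueError(f"no such task or service: {service}")
        enabled = [t for t in tasks if not t.disabled]
        # Highest-priority non-disabled task, else highest priority overall
        return (enabled or tasks)[-1]

    def set_alternative(self, service: str, name: str) -> None:
        # Boost the chosen task above every other provider; since the Task
        # object is shared, this affects every service it provides
        top = max(t.priority for t in self.get_all(service))
        self.db[service][name].task.priority = top + 1
```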

flux.modprobe.default_flux_confdir()

Return the builtin Flux confdir

flux.modprobe.rank_conditional(arg)

Rank conditional factory function

flux.modprobe.run_all_rc_scripts(runlevel)

Helper script for flux-modprobe(1) rc1 and rc3 scripts that replaces the following shell code from rc1/rc3: ``` core_dir=$(cd ${0%/*} && pwd -P) all_dirs=$core_dir${FLUX_RC_EXTRA:+":$FLUX_RC_EXTRA"} IFS=: for rcdir in $all_dirs; do

for rcfile in $rcdir/rc{runlevel}.d/*; do [ -e $rcfile ] || continue

echo running $rcfile $rcfile || exit_rc=1

done

done ```

Parameters

runlevel (int) -- runlevel (1 or 3) in which function is running

Raises

OSError -- one or more rc scripts failed

flux.modprobe.task(name, **kwargs)

Decorator for modprobe "rc" task functions.

This decorator is applied to functions in an rc1 or rc3 python source file to turn them into valid flux-modprobe(1) tasks.
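
Conceptually, the decorator records the function together with its options so that flux-modprobe(1) can turn it into a task later. The sketch below shows that registration pattern with a hypothetical task_sketch decorator and a module-level REGISTERED list; returning the original function unchanged is an assumption, not a statement about the real @task.

```python
from typing import Callable, Dict, List

# Hypothetical registry standing in for the Modprobe task database
REGISTERED: List[Dict] = []


def task_sketch(name: str, **kwargs) -> Callable:
    """Hypothetical @task-like decorator: record the function plus its options."""

    def decorator(func: Callable) -> Callable:
        REGISTERED.append({"name": name, "func": func, **kwargs})
        return func  # assumption: the function itself is left unchanged
    return decorator


@task_sketch("setup-kvs", ranks="0", after=["kvs"], needs=["kvs"])
def setup_kvs(context):
    context.print("Configuring KVS")
```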

Parameters
  • name (required, str) -- The name of this task.

  • ranks (optional, str) -- A rank expression that indicates on which ranks this task should be invoked. ranks may be a valid RFC 22 Idset string, a single integer prefixed with < or > to indicate matching ranks less than or greater than a given rank, or the string all (the default if ranks is not specified). Examples: 0, >0, 0-3.

  • requires (optional, list) -- An optional list of task or module names this task requires. This is used to ensure required tasks are active when activating another task. It does not indicate that this task will necessarily be run before or after the tasks it requires. (See before or after for those features.)

  • needs (optional, list) -- Disable this task if any task in needs is not active.

  • provides (optional, list) -- An optional list of service names that this task provides. This can be used to set up alternatives for a given service. (Mostly useful with modules.)

  • before (optional, list) -- A list of tasks or modules that this task must run before.

  • after (optional, list) -- A list of tasks or modules that this task must run after.

  • needs_attrs (optional, list) -- A list of broker attributes on which this task depends. If any of the attributes are not set then the task will not be run.

  • needs_config (optional, list) -- A list of config keys on which this task depends. If any of the specified config keys are not set, then this task will not be run.

  • needs_env (optional, list) -- A list of environment variables on which this task depends. If any of the specified environment variables are not set in the current environment, then this task will not be run.

Example:

from flux.modprobe import task

# Declare a task that will be run after the kvs module is loaded,
# only on rank 0
@task("test", ranks="0", needs=["kvs"], after=["kvs"])
def test_kvs_task(context):
    # do something with kvs
    context.print("kvs is loaded")