flux.modprobe module
Flux modprobe: Task and module orchestration for broker startup/shutdown.
This module implements the Flux modprobe system, which manages broker module loading/unloading and task execution during rc1 (startup) and rc3 (shutdown). It provides declarative configuration through TOML files and Python rc scripts with dependency resolution and precedence ordering.
Architecture
The implementation is organized into several key components:
- TaskDB: Priority-based task database supporting service alternatives.
Stores tasks in a dict-of-dicts structure {service: {name: TaskEntry}} where TaskEntry is a namedtuple(index, task). Priority comes from task.priority. When selecting a task for a service, the highest priority enabled task is chosen.
- DependencySolver: Dependency resolution engine.
Handles recursive requirement resolution (requires), needs filtering, execution order precedence (before/after), and safe module removal.
- ConfigLoader: Configuration and rc file loading.
Manages search paths and loads TOML module config and Python rc files from configurable directories.
- Task: Base class for modprobe tasks.
Represents a task with configuration including ranks, dependencies, and enabled/disabled state. A task either loads or removes a module (the Module class) or runs a Python function (the CodeTask class, a Python function decorated with @task).
- Context: Execution context passed to all tasks.
Provides access to broker configuration, attributes, environment variables, shared data between tasks, and module argument management. Uses thread-local storage to provide an automatic per-thread Flux handle.
- Modprobe: Main orchestrator class.
Coordinates TaskDB, DependencySolver, and ConfigLoader. Provides high-level operations: configure_modules(), load(), remove(), run(), read_rcfile(). Used by flux-modprobe(1) command.
Usage Example
Creating tasks in rc1.py:
from flux.modprobe import task

@task("setup-kvs", ranks="0", after=["kvs"], needs=["kvs"])
def setup_kvs(context):
    # Configure KVS after it loads
    context.print("Configuring KVS")
    # ... setup code ...
Using Modprobe programmatically:
from flux.modprobe import Modprobe
M = Modprobe(verbose=True)
M.configure_modules()
M.load(["kvs", "job-manager"])
M.run(M.get_deps(["kvs", "job-manager"]))
Configuration
Module configuration in modprobe.toml:
[[modules]]
name = "kvs"
ranks = "all"
provides = ["kvs-service"]
requires = []
after = []
[[modules]]
name = "job-manager"
ranks = "0"
requires = ["kvs"]
after = ["kvs"]
See flux-modprobe(1) for complete configuration documentation.
- class flux.modprobe.CodeTask(name, func, *args, **kwargs)
Bases: Task
A modprobe task that runs as a Python function.
- dry_run(context)
Default task dry_run() method. This prints the task name. Override in a subclass with more specific information if necessary.
- run(context)
A task's run() method. This should be overridden in the specific Task subclass.
- class flux.modprobe.ConfigLoader(searchpath, print_func)
Bases: object
Configuration and rc file loading with search path management.
ConfigLoader handles all file I/O for modprobe configuration, separating I/O concerns from business logic. It manages search paths for both TOML module config files and Python rc scripts.
- Search Path:
The default search path is built from:
  - Built-in: $datadir/modprobe or $libexecdir/modprobe
  - FLUX_MODPROBE_PATH (replaces default)
  - FLUX_MODPROBE_PATH_PREPEND (added before default)
  - FLUX_MODPROBE_PATH_APPEND (added after default)
- File Discovery:
TOML: modprobe.toml and modprobe.d/*.toml
RC: rc1.py, rc1.d/*.py (or rc3.py, rc3.d/*.py)
- searchpath
Paths for 'toml' and 'py' file types
- Type
dict
- print
Function for verbose output
- Type
callable
- static build_searchpath(builtindir='datadir') List[str]
Build searchpath list from environment variables and config.
Returns list of dirs in FLUX_MODPROBE_PATH if set, otherwise returns the default modprobe search path.
- Parameters
builtindir -- base path for builtin/package path. Should be either "datadir" or "libexecdir".
- Returns
List of directory paths (duplicates removed)
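The precedence among the three environment variables can be illustrated with a standalone sketch. This is not the library's implementation: the function name `sketch_searchpath`, the explicit `env` argument, and the colon-separated entry format are assumptions made for illustration, and the example paths are hypothetical. It follows one reading of the text above, in which FLUX_MODPROBE_PATH, when set, is used alone.

```python
def sketch_searchpath(env, builtin_dir):
    """Illustrative only: one reading of the documented precedence."""

    def split(value):
        # Assumption: entries are colon-separated, like PATH
        return [d for d in (value or "").split(":") if d]

    if env.get("FLUX_MODPROBE_PATH"):
        # An explicit path replaces the default entirely
        dirs = split(env["FLUX_MODPROBE_PATH"])
    else:
        dirs = (
            split(env.get("FLUX_MODPROBE_PATH_PREPEND"))
            + [builtin_dir]
            + split(env.get("FLUX_MODPROBE_PATH_APPEND"))
        )
    # Remove duplicates while preserving first-seen order
    return list(dict.fromkeys(dirs))


# Hypothetical directories, used only to show the ordering
default = sketch_searchpath(
    {
        "FLUX_MODPROBE_PATH_PREPEND": "/site/a:/site/b",
        "FLUX_MODPROBE_PATH_APPEND": "/site/a:/site/z",
    },
    "/usr/share/flux/modprobe",
)
override = sketch_searchpath({"FLUX_MODPROBE_PATH": "/x:/y:/x"}, "/usr/share/flux/modprobe")
```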
- get_rc_files(name='rc1') List[str]
Return all modprobe rc *.py files found in the following order:
  1. Always read {fluxdatadir}/modprobe/{name}.py (e.g. rc1.py)
  2. for dir in self.searchpath: read {dir}/{name}.d/*.py
- Parameters
name -- RC file basename (e.g., "rc1", "rc3")
- Returns
List of absolute paths to Python RC files
- get_toml_files() List[str]
Return all modprobe config toml files found in the following order:
  1. Always read {fluxdatadir}/modprobe/modprobe.toml
  2. for dir in self.searchpath: read {dir}/modprobe.d/*.toml
- Returns
List of absolute paths to TOML files
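The discovery order described for get_toml_files() can be sketched without Flux installed. The helper name `sketch_toml_files` is hypothetical, and sorting the drop-in files within each directory is an assumption; the documentation only states the order across directories.

```python
import glob
import os
import tempfile


def sketch_toml_files(datadir, searchpath):
    """Illustrative only: base file first, then drop-ins per search dir."""
    files = [os.path.join(datadir, "modprobe", "modprobe.toml")]
    for directory in searchpath:
        pattern = os.path.join(directory, "modprobe.d", "*.toml")
        # Assumption: drop-ins within one directory are read in sorted order
        files.extend(sorted(glob.glob(pattern)))
    return files


# Build a throwaway directory tree to show the resulting order
with tempfile.TemporaryDirectory() as root:
    site = os.path.join(root, "site")
    os.makedirs(os.path.join(site, "modprobe.d"))
    for name in ("b.toml", "a.toml", "notes.txt"):
        open(os.path.join(site, "modprobe.d", name), "w").close()
    found = [os.path.relpath(p, root) for p in sketch_toml_files(root, [site])]
```

Files that do not match *.toml (here notes.txt) are ignored by the glob pattern.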
- class flux.modprobe.Context(modprobe, verbose=False, dry_run=False)
Bases: object
Execution context passed to all modprobe tasks.
Context provides task functions with access to broker state, configuration, and shared data. It enables tasks to query and modify the environment, communicate with the broker, and share data with other tasks.
- Key Features:
Broker access: Configuration, attributes, RPCs via Flux handle
Environment: Get/set variables in current process and broker
Shared data: Store/retrieve arbitrary data between tasks
Module arguments: Configure module load-time arguments
Thread safety: Per-thread Flux handles via thread-local storage
- Common Methods:
handle: Flux handle for broker communication (thread-local)
rank: Current broker rank
conf_get(key): Get broker config value
attr_get(attr): Get broker attribute
getenv(var): Get environment variable (local or broker)
setenv(name, value): Set environment variable (local and broker)
set(key, value) / get(key): Store/retrieve shared data
setopt(module, options): Configure module arguments
rpc(topic, args): Send RPC to broker
- verbose
Verbose output mode
- Type
bool
- dry_run
Dry-run mode (print without executing)
- Type
bool
- attr_get(attr, default=None)
Get broker attribute with optional default
- bash(command)
Execute shell command via bash -c.
- Parameters
command -- Shell command string
- Raises
RuntimeError -- If command exits non-zero or is killed by signal
- conf_get(key, default=None)
Get config key with optional default
- enable(name)
Force enable a module/service/task, overriding ranks conditional, needs-config, and needs-attrs.
Note: This will not also enable dependencies of name.
- get(key, default=None)
Get arbitrary data set by other tasks with optional default value
- getenv(var, default=None)
Get environment variable value from local process or broker.
Checks the local process environment first, then falls back to querying the broker via RPC. This is useful because the broker may filter some variables from rc1/rc3 environments (e.g., those set by resource managers or launchers). Results from broker are cached.
- Parameters
var -- Environment variable name
default -- Value to return if variable is not set
- Returns
Variable value (str) or default if not set
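The local-first lookup with a cached fallback can be sketched as follows. This is an illustration of the described behavior, not the Context implementation: `EnvLookup`, `query_broker`, and `fake_broker` are hypothetical stand-ins, and the real method queries the broker over RPC.

```python
import os


class EnvLookup:
    """Illustrative only: local-first lookup with a cached remote fallback."""

    def __init__(self, query_broker):
        # query_broker is a hypothetical callable: name -> str or None
        self._query_broker = query_broker
        self._cache = {}

    def getenv(self, var, default=None):
        # 1. Prefer the local process environment
        if var in os.environ:
            return os.environ[var]
        # 2. Fall back to the broker, remembering the answer
        if var not in self._cache:
            self._cache[var] = self._query_broker(var)
        value = self._cache[var]
        return default if value is None else value


calls = []


def fake_broker(name):
    # Stand-in for the broker query; records each lookup it receives
    calls.append(name)
    return {"SKETCH_BROKER_ONLY": "from-broker"}.get(name)


# Make sure the demo variables are not set locally
os.environ.pop("SKETCH_BROKER_ONLY", None)
os.environ.pop("SKETCH_UNSET", None)

env = EnvLookup(fake_broker)
first = env.getenv("SKETCH_BROKER_ONLY")
second = env.getenv("SKETCH_BROKER_ONLY")  # served from the cache
missing = env.getenv("SKETCH_UNSET", "fallback")
```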
- getopts(name, default=None, also=None)
Get module load-time arguments.
Retrieves the argument list for a module, optionally including arguments from related modules.
- Parameters
name -- Module name
default -- Default arguments (used if no overwrite occurred)
also -- List of additional module names whose args to include
- Returns
List of argument strings
- property handle
Return a per-thread Flux handle created on demand
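A per-thread, on-demand object of this kind is commonly built on `threading.local`. The sketch below shows that general pattern only; `HandleHolder` and its `factory` argument are hypothetical, and `object` stands in for opening a real Flux handle.

```python
import threading


class HandleHolder:
    """Illustrative only: one lazily created object per thread."""

    # Class-level thread-local storage, shared by all instances
    tls = threading.local()

    def __init__(self, factory):
        # factory is a hypothetical stand-in for opening a Flux handle
        self._factory = factory

    @property
    def handle(self):
        # Create the handle the first time this thread asks for it
        if not hasattr(self.tls, "handle"):
            self.tls.handle = self._factory()
        return self.tls.handle


holder = HandleHolder(object)
main_handle = holder.handle

seen = []
worker = threading.Thread(target=lambda: seen.append(holder.handle))
worker.start()
worker.join()
```

Repeated access from the same thread returns the same object, while the worker thread receives its own.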
- load_modules(modules)
Schedule modules to be loaded during task execution.
Adds modules to the active task list so they will be loaded when run() is called. Typically used in setup() functions.
- Parameters
modules -- List of module names to load
- print(*args)
Print message if modprobe is in verbose output mode
- property rank
- remove_modules(modules=None)
Set a list of modules to remove by name. Remove all if modules is None.
- rpc(topic, *args, **kwargs)
Convenience function to call context.handle.rpc()
- set(key, value)
Set arbitrary data at key for future use. (see get())
- set_alternative(name, alternative)
Force an alternative for module name to alternative.
- setenv(name_or_env, value=None)
Set or unset environment variables in the current process and broker.
Variables set via this method that are not in the broker's env blocklist will be inherited by rc2 and rc3. A value of None causes the named variable to be unset.
Note: concurrent calls from unrelated tasks running in parallel are safe in CPython since os.environ operations are serialized by the GIL.
- Parameters
name_or_env -- a variable name string, or a dict mapping names to values. Values may be strings or None to unset.
value -- the value to set, when name_or_env is a string.
- Raises
ValueError -- if any value is not a string or None.
OSError -- if the RPC fails.
- setopt(module, options, overwrite=False)
Add module load-time arguments.
Appends options to a module's argument list (or overwrites if overwrite=True). Options can be a single string with whitespace-separated arguments.
- Parameters
module -- Module name
options -- Options string (whitespace-separated)
overwrite -- Replace all existing options (default False)
Example
context.setopt("kvs", "checkpoint-period=10m")
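The append-versus-overwrite behavior can be sketched with a small stand-in class. `OptionStore` is hypothetical and models only what the text above states (whitespace splitting, appending by default, replacing when overwrite=True); the option strings other than checkpoint-period=10m are made up for the demonstration.

```python
class OptionStore:
    """Illustrative only: per-module argument lists."""

    def __init__(self):
        self._opts = {}

    def setopt(self, module, options, overwrite=False):
        # A single string may carry several whitespace-separated arguments
        args = options.split()
        if overwrite:
            self._opts[module] = args
        else:
            self._opts.setdefault(module, []).extend(args)

    def getopts(self, name):
        # Return a copy so callers cannot mutate the stored list
        return list(self._opts.get(name, []))


store = OptionStore()
store.setopt("kvs", "checkpoint-period=10m")
store.setopt("kvs", "a=1 b=2")  # hypothetical options, appended
appended = store.getopts("kvs")
store.setopt("kvs", "c=3", overwrite=True)  # replaces everything so far
replaced = store.getopts("kvs")
```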
- tls = <_thread._local object>
- class flux.modprobe.DependencySolver(taskdb, context)
Bases: object
Handles all dependency resolution for modprobe tasks.
This class encapsulates the complex logic for resolving task dependencies, including:
  - Finding all required dependencies (requires)
  - Filtering tasks based on needs constraints
  - Building execution order precedence graphs (before/after)
  - Finding safely removable modules
Separated from Modprobe class for clarity and testability.
- get_requires(tasks) Dict[str, List[str]]
Get forward requires dependency map for tasks.
- Parameters
tasks -- Iterable of task names
- Returns
Dict mapping each task name to list of tasks it requires
- get_reverse_requires(tasks) Dict[str, Set[str]]
Get reverse requires dependency map for tasks.
- Parameters
tasks -- Iterable of task names
- Returns
Dict mapping each task name to set of tasks that require it
- resolve_service(service: str, ignore_needs: bool = False)
Resolve service name to actual task, considering needs constraints.
Returns the highest priority task providing service that:
  1. Is enabled (passes enabled() check with context)
  2. Has all its needs satisfied (unless ignore_needs=True)
Falls back to highest priority task if no viable tasks exist.
- Parameters
service -- Service or task name to resolve
ignore_needs -- If True, skip needs checking (for explicit loads)
- Returns
Task object
- Raises
ValueError -- If service doesn't exist in taskdb
- solve_execution_order(tasks) Dict[str, List[str]]
Build precedence graph for tasks based on before/after constraints.
- Parameters
tasks -- List/set of task names
- Returns
Dict mapping task names to list of predecessor task names
Note: If tasks is a set, a copy is made internally to avoid mutating the input. Lists are always converted to sets internally.
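How before/after constraints translate into a predecessor map can be sketched as follows. This is a simplified illustration, not the solver's code: `sketch_execution_order` and the task name early-setup are hypothetical, the special "*" value is not handled, and constraints naming tasks outside the given set are simply ignored (an assumption).

```python
def sketch_execution_order(constraints):
    """Illustrative only. constraints: {name: {"before": [...], "after": [...]}}"""
    preds = {name: [] for name in constraints}
    for name, spec in constraints.items():
        for other in spec.get("after", []):
            # name runs after other, so other is a predecessor of name
            if other in preds and other not in preds[name]:
                preds[name].append(other)
        for other in spec.get("before", []):
            # name runs before other, so name is a predecessor of other
            if other in preds and name not in preds[other]:
                preds[other].append(name)
    return preds


order = sketch_execution_order(
    {
        "kvs": {},
        "job-manager": {"after": ["kvs"]},
        "early-setup": {"before": ["kvs"]},
    }
)
```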
- solve_needs(tasks) List[str]
Filter out tasks where needs constraints are not met.
When a task is removed because a needed service is not available, all tasks that need it are also recursively removed.
- Parameters
tasks -- Iterable of task names
- Returns
New list of task names with unsatisfied needs removed. Does not modify input.
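The cascading effect of an unmet need can be sketched with plain names. `sketch_solve_needs` is a hypothetical simplification: it treats needs as task names only and does not model services, providers, or the enabled() check. The task names are made up for the demonstration.

```python
def sketch_solve_needs(tasks, needs):
    """Illustrative only. needs: {task: [names it needs]}"""
    remaining = list(tasks)  # work on a copy; the input is left untouched
    changed = True
    while changed:
        changed = False
        for name in list(remaining):
            # Drop a task as soon as one of its needs is unavailable
            if any(n not in remaining for n in needs.get(name, [])):
                remaining.remove(name)
                changed = True
    return remaining


original = ["kvs", "setup-kvs", "report"]
result = sketch_solve_needs(
    original,
    {"setup-kvs": ["kvs", "content"], "report": ["setup-kvs"]},
)
```

Here setup-kvs is dropped because content is absent, and report is dropped in turn because it needs setup-kvs.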
- solve_removal(dependencies: Dict[str, List[str]], modules_to_remove) List[str]
Find modules that can be safely removed.
Given a set of modules to remove and their dependency lists, finds additional modules that can be removed because they no longer have any dependents.
- Parameters
dependencies -- Dict of modules to their dependency list
modules_to_remove -- Iterable of modules to remove
- Returns
New list of modules that can be safely removed (includes original modules plus cascaded removals). Does not modify inputs.
- Raises
ValueError -- If any module to remove still has dependents
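A minimal sketch of cascaded removal, assuming the dependencies dict maps each module to the modules it depends on. `sketch_solve_removal` is hypothetical and may differ from the real solver in details such as result ordering; the module graph is made up for the demonstration.

```python
def sketch_solve_removal(dependencies, modules_to_remove):
    """Illustrative only: cascade removal to dependencies left unused."""
    removing = list(modules_to_remove)

    def dependents(name):
        # Modules that depend on name and are not themselves being removed
        return [m for m, deps in dependencies.items() if name in deps and m not in removing]

    for name in removing:
        users = dependents(name)
        if users:
            raise ValueError(f"{name} still in use by {users}")

    changed = True
    while changed:
        changed = False
        for name in list(removing):
            for dep in dependencies.get(name, []):
                # A dependency with no remaining users can go too
                if dep not in removing and not dependents(dep):
                    removing.append(dep)
                    changed = True
    return removing


graph = {
    "job-manager": ["kvs"],
    "job-list": ["kvs"],
    "kvs": ["content"],
    "content": [],
}
partial = sketch_solve_removal(graph, ["job-manager"])
cascade = sketch_solve_removal(graph, ["job-manager", "job-list"])
```

Removing only job-manager leaves kvs in place because job-list still depends on it; removing both frees kvs and then content.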
- solve_requirements(tasks, ignore_disabled=False) List[str]
Recursively find all requirements of tasks.
- Parameters
tasks -- Iterable of task names to solve
ignore_disabled -- If True, include disabled tasks in result
- Returns
List of task names including all required dependencies. Disabled tasks are skipped unless ignore_disabled=True. Does not modify input.
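Recursive requirement resolution amounts to a transitive closure over requires. The sketch below is a hypothetical simplification (`sketch_solve_requirements`, with disabled tasks passed as a plain set); whether the real solver also skips the requirements of a skipped task is an assumption here.

```python
def sketch_solve_requirements(tasks, requires, disabled=(), ignore_disabled=False):
    """Illustrative only. requires: {task: [tasks it requires]}"""
    result = []

    def visit(name):
        if name in result:
            return  # already handled; also guards against cycles
        if name in disabled and not ignore_disabled:
            return  # assumption: a skipped task's requirements are skipped too
        result.append(name)
        for dep in requires.get(name, []):
            visit(dep)

    for name in tasks:
        visit(name)
    return result


requires = {"job-list": ["job-manager"], "job-manager": ["kvs"], "kvs": ["content"]}
full = sketch_solve_requirements(["job-list"], requires)
skipped = sketch_solve_requirements(["job-list"], requires, disabled={"kvs"})
forced = sketch_solve_requirements(
    ["job-list"], requires, disabled={"kvs"}, ignore_disabled=True
)
```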
- class flux.modprobe.Modprobe(timing=False, verbose=False, dry_run=False)
Bases: object
Main orchestrator for flux-modprobe task and module management.
Modprobe coordinates TaskDB, DependencySolver, and ConfigLoader to provide high-level operations for module loading/unloading and task execution. Used by flux-modprobe(1) command and available for programmatic use.
- Key Operations:
configure_modules(): Load module config from TOML files
load(modules): Load modules and their dependencies
remove(modules): Unload modules and unused dependencies
read_rcfile(name): Load and execute rc Python scripts
run(deps): Execute tasks in dependency order
- solver
Dependency resolver
- Type
DependencySolver
- loader
Configuration loader
- Type
ConfigLoader
- rank
Broker rank
- Type
int
- exitcode
Exit code (0=success, 1=failure)
- Type
int
- timing
Optional timing data for performance analysis
- Type
list
- Parameters
timing (bool) -- Enable timing instrumentation (default False)
verbose (bool) -- Enable verbose output (default False)
dry_run (bool) -- Print actions without executing (default False)
- activate_modules(modules)
- property active_tasks
Return all active, enabled tasks
- add_active_task(task)
Add a task to the task db and active tasks list
- add_modules(file)
- add_task(task)
Add a task to internal task db
- add_timing(name, starttime, end=None)
- configure_modules()
Load module configuration from TOML config files.
Reads all modprobe.toml and modprobe.d/*.toml files from the search path, adding module definitions to the task database. Also applies any module configuration overrides from broker config.
- Returns
self, for method chaining
- disable(name)
Disable module/task name.
- get_deps(tasks)
Build execution order precedence graph for tasks.
Constructs a dict mapping task names to lists of predecessors (tasks that must complete before each task can start). Respects before/after constraints and handles special before=["*"] and after=["*"] cases.
- Parameters
tasks -- Iterable of task names
- Returns
Dict of {task_name: [predecessor_list]} for topological sort
- get_requires(tasks, reverse=False)
Return dependencies for tasks as dicts of names to dependencies
- get_task(name, default=None)
Return task by name from taskdb.
Note: This does NOT do service resolution. For service-aware lookup (e.g., "sched" -> actual scheduler module), use resolve_service().
- has_task(name)
Return True if task exists in taskdb
- load(modules)
Load modules and their dependencies (if not already loaded)
- Parameters
modules (list) -- List of modules to load.
- Raises
FileExistsError -- Target modules (and all their dependencies) are already loaded, so there is nothing to do.
Note
This method uses ignore_disabled=True to allow loading modules that are disabled by configuration. This enables explicit loading of non-default alternatives or disabled modules.
- print(*args)
Wrapper for context.print()
- read_rcfile(name)
Load and execute Python rc files.
Loads rc Python files from search path (e.g., rc1.py, rc1.d/*.py) and registers @task-decorated functions. Also executes setup() function if present in any loaded module.
- Parameters
name -- RC file basename (e.g., "rc1", "rc3") or absolute path to a .py file
- remove(modules)
Unload modules and any unused dependencies.
Finds all modules that can be safely removed (no remaining dependents) and unloads them in reverse load order.
- Parameters
modules -- List of module names to remove, or ["all"] to remove all loaded modules
- Raises
ValueError -- If a specified module is not loaded or still has active dependents
- resolve_service(name, ignore_needs=False)
Resolve service name to actual task using needs-aware resolution.
This is the public API for service resolution. It considers:
  - Task enabled/disabled status
  - Needs constraints (unless ignore_needs=True)
  - Priority and alternatives
- Parameters
name -- Service or task name to resolve
ignore_needs -- If True, skip needs checking
- Returns
Task object that would be loaded for this service
- run(deps)
Execute tasks in parallel respecting precedence constraints.
Uses ThreadPoolExecutor to run tasks concurrently when their dependencies are satisfied. Tasks are executed in topological order based on the deps precedence graph.
- Parameters
deps -- Dict of {task_name: [predecessors]} from get_deps()
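The general technique of running a precedence graph on a thread pool can be sketched with the standard library. This is not the Modprobe.run() implementation: `sketch_run`, `action`, and `max_workers` are hypothetical, and the sketch uses `graphlib.TopologicalSorter` (Python 3.9 or later) to track readiness, which the real code may not do.

```python
import threading
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter


def sketch_run(deps, action, max_workers=4):
    """Illustrative only. deps: {task_name: [predecessors]}"""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError on circular constraints
    pending = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            # Submit every task whose predecessors have all finished
            for name in sorter.get_ready():
                pending[pool.submit(action, name)] = name
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                future.result()  # re-raise any exception from the task
                sorter.done(pending.pop(future))


finished = []
lock = threading.Lock()


def record(name):
    # Stand-in for a task's run() method
    with lock:
        finished.append(name)


sketch_run({"kvs": [], "job-manager": ["kvs"], "job-list": ["kvs"]}, record)
```

In this demonstration job-manager and job-list become ready only after kvs completes, and may then run concurrently.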
- save_task_timing(tasks)
- set_alternative(name, alternative)
Force an alternative for module name to alternative.
- set_remove(modules=None)
Register a set of modules to remove or remove all modules
- solve(tasks, timing=True, ignore_disabled=False)
Recursively resolve all requirements for the given tasks.
- Parameters
tasks -- Iterable of task/module names
timing -- Record timing data (default True)
ignore_disabled -- Include disabled tasks (default False)
- Returns
List of task names including all recursive requirements
- property timestamp
- update_module(name, entry, new_module=None)
Update attributes of an existing module/task.
- Parameters
name -- Task name or service name to look up
entry -- Dict of attribute updates to apply
new_module -- Optional pre-constructed Module for attribute source
Note: 'name' may be a service name (e.g., "feasibility") rather than the actual task name (e.g., "sched-simple"), so we never overwrite task.name during updates.
- class flux.modprobe.Module(conf)
Bases: Task
A modprobe task to load/remove a broker module. The default action is to load the module. Call the set_remove() method to convert the task to remove the module.
- VALID_KEYS = ('name', 'module', 'args', 'ranks', 'provides', 'requires', 'needs', 'before', 'after', 'needs-attrs', 'needs-config', 'needs-env', 'priority', 'disabled', 'exec')
- dry_run(context)
Default task dry_run() method. This prints the task name. Override in a subclass with more specific information if necessary.
- set_remove()
Mark module to be removed instead of loaded (the default)
- to_dict()
- class flux.modprobe.ModuleList(handle)
Bases: object
Simple class for iteration and lookup of loaded modules
- lookup(name)
- class flux.modprobe.RankConditional(arg)
Bases: object
Conditional rank statement, e.g. >0
- test(rank)
- class flux.modprobe.Task(name, *args, **kwargs)
Bases: object
Base class representing a modprobe task and its configuration.
A Task represents a unit of work in the modprobe system with associated configuration controlling when and where it runs. Tasks can be broker modules (Module subclass) or Python functions (CodeTask subclass).
- Configuration Attributes:
name (str): Unique task identifier
ranks (RankConditional|RankIDset): Ranks where task executes
provides (list): Service names this task provides (alternatives)
requires (list): Tasks/services that must be active when this runs
needs (list): Tasks/services that must be enabled for this to enable
before (list): Tasks/services this must run before
after (list): Tasks/services this must run after
needs_attrs (list): Required broker attributes
needs_config (list): Required config keys
needs_env (list): Required environment variables
disabled (bool): Whether task is explicitly disabled
priority (int): Priority for service alternative selection (default 100)
- Special Values:
before=["*"]: Run before all other tasks (cannot also use after)
after=["*"]: Run after all other tasks (cannot also use before)
Specs starting with "!" are inverted (must NOT be set)
- Runtime Attributes:
starttime (float): Task start timestamp (set during execution)
endtime (float): Task end timestamp (set during execution)
force_enabled (bool): Override disabled state (internal use)
- Enabling Logic:
A task is enabled if ALL of the following are true:
  1. Not explicitly disabled via disabled=True
  2. Runs on the current broker rank
  3. All needs_config keys are set (or not set if prefixed with !)
  4. All needs_attrs attributes are set (or not set if !)
  5. All needs_env variables are set (or not set if !)
  6. All tasks in needs list are also enabled
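The "!" inversion used by needs_config, needs_attrs, and needs_env can be sketched for the environment case. The helper names and the SKETCH_* variables are hypothetical, and treating "set" as "present in the mapping" is an assumption.

```python
def spec_satisfied(spec, is_set):
    """Illustrative only: 'name' requires set, '!name' requires unset."""
    if spec.startswith("!"):
        return not is_set(spec[1:])
    return is_set(spec)


def sketch_needs_env_ok(needs_env, environ):
    # Every spec must hold for the task to remain enabled
    return all(spec_satisfied(s, lambda name: name in environ) for s in needs_env)


specs = ["SKETCH_A", "!SKETCH_B"]
ok = sketch_needs_env_ok(specs, {"SKETCH_A": "1"})
blocked = sketch_needs_env_ok(specs, {"SKETCH_A": "1", "SKETCH_B": "x"})
```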
- VALID_ARGS = {'after': [], 'before': [], 'disabled': False, 'needs': [], 'needs_attrs': [], 'needs_config': [], 'needs_env': [], 'priority': 100, 'provides': [], 'ranks': 'all', 'requires': []}
- dry_run(context)
Default task dry_run() method. This prints the task name. Override in a subclass with more specific information if necessary.
- enabled(context=None)
Return True if the task is currently enabled. A task may be disabled by configuration, because it only runs on a given set of ranks, because it requires a configuration key or broker attribute that is not set, or because a module it needs is not enabled.
- run(context)
A task's run() method. This should be overridden in the specific Task subclass.
- runtask(context)
Run this task's run() method (or dry_run() if dry_run is True)
- class flux.modprobe.TaskDB
Bases: object
Task database supporting service alternatives and priority-based selection.
Structure: {service: {task_name: TaskEntry(index, task)}}
Tasks are stored in a dict-of-dicts structure where each service maps to a dict of task names to entries. Each entry is a TaskEntry namedtuple with insertion index (int) and task object. Priority comes from task.priority.
Priority Model:
  - task.priority is the single source of truth
  - set_alternative() boosts task.priority to ensure the task will be the highest priority alternative. This affects all services the task provides
  - Priority updates via config/TOML set task.priority globally
- class TaskEntry(index, task)
Bases: tuple
- index
Alias for field number 0
- task
Alias for field number 1
- add(task: Task, index: int = None) None
Add task to database for its name and all services it provides
- disable(service: str) None
Disable all tasks providing this task/module/service
- enable(service: str) None
Force a module/task/service to be enabled even if it would normally be disabled by rank, needs-config, or needs-attrs.
- get(service: str) Task
Return the highest priority task providing service which is not disabled. If there are no non-disabled tasks providing service, then return the highest priority task regardless of disabled status. If no tasks provide service, raise ValueError.
Note: Only checks the disabled flag, not the full enabled() method which requires context for rank/config/attr checks.
- get_all(service: str) List[Task]
Return all tasks providing service, sorted by (priority, index).
- Parameters
service -- Service or task name to look up
- Returns
List of Task objects sorted from lowest to highest priority. Empty list if service doesn't exist.
- get_entry(service: str, task_name: str)
Get TaskEntry for a specific task providing a service.
- Parameters
service -- Service name
task_name -- Task name
- Returns
TaskEntry with index and task (priority is available as task.priority)
- Raises
ValueError -- If service or task not found
- has_enabled_provider(tasks, service: str) bool
Check if any non-disabled task in tasks provides the given service.
- Parameters
tasks -- Iterable of task names to check
service -- Service name to look for
- Returns
True if at least one non-disabled task provides service
- set_alternative(service: str, name: str) None
Select a specific alternative 'name' for service.
Boosts task.priority above all other tasks providing this service. Since priority is global to the task, this affects all services the task provides.
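The dict-of-dicts layout and priority-based selection can be sketched in a few lines. `SketchTaskDB` is a hypothetical reduction of the documented behavior: tasks are `SimpleNamespace` stand-ins, sched-fancy is a made-up alternative, and boosting to "highest priority plus one" is an assumption about how set_alternative() achieves its effect.

```python
from collections import namedtuple
from types import SimpleNamespace

TaskEntry = namedtuple("TaskEntry", "index, task")


class SketchTaskDB:
    """Illustrative only: {service: {task_name: TaskEntry(index, task)}}"""

    def __init__(self):
        self.services = {}
        self.count = 0

    def add(self, task):
        # Register the task under its own name and every service it provides
        for service in [task.name, *task.provides]:
            self.services.setdefault(service, {})[task.name] = TaskEntry(self.count, task)
        self.count += 1

    def get_all(self, service):
        entries = self.services.get(service, {}).values()
        ordered = sorted(entries, key=lambda e: (e.task.priority, e.index))
        return [e.task for e in ordered]  # lowest to highest priority

    def get(self, service):
        tasks = self.get_all(service)
        if not tasks:
            raise ValueError(f"no such task or service: {service}")
        enabled = [t for t in tasks if not t.disabled]
        # Fall back to all tasks if every provider is disabled
        return (enabled or tasks)[-1]

    def set_alternative(self, service, name):
        chosen = self.services[service][name].task
        # Assumption: boost to one above the current maximum
        chosen.priority = max(t.priority for t in self.get_all(service)) + 1


def make(name, priority=100, disabled=False):
    return SimpleNamespace(name=name, provides=["sched"], priority=priority, disabled=disabled)


db = SketchTaskDB()
db.add(make("sched-simple"))
db.add(make("sched-fancy", priority=200))  # hypothetical alternative
selected_before = db.get("sched").name
db.set_alternative("sched", "sched-simple")
selected_after = db.get("sched").name
```

Because priority lives on the task object, the boost applies to every service the task is registered under, as the priority model above describes.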
- flux.modprobe.default_flux_confdir()
Return the builtin Flux confdir
- flux.modprobe.rank_conditional(arg)
Rank conditional factory function
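A rank conditional such as >0 can be sketched as an operator plus an integer. `SketchRankConditional` is hypothetical and covers only the < and > prefixes mentioned in this module's documentation; it is not the RankConditional class itself.

```python
import operator


class SketchRankConditional:
    """Illustrative only: parse '<N' or '>N' and test a rank against it."""

    OPS = {"<": operator.lt, ">": operator.gt}

    def __init__(self, arg):
        if not arg or arg[0] not in self.OPS:
            raise ValueError(f"invalid rank conditional: {arg!r}")
        self.compare = self.OPS[arg[0]]
        self.rank = int(arg[1:])

    def test(self, rank):
        # True if the given rank satisfies the conditional
        return self.compare(rank, self.rank)


followers = SketchRankConditional(">0")
on_leader = followers.test(0)
on_rank_3 = followers.test(3)
```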
- flux.modprobe.run_all_rc_scripts(runlevel)
Helper function for flux-modprobe(1) rc1 and rc3 scripts that replaces the following shell code from rc1/rc3:
```
core_dir=$(cd ${0%/*} && pwd -P)
all_dirs=$core_dir${FLUX_RC_EXTRA:+":$FLUX_RC_EXTRA"}
IFS=:
for rcdir in $all_dirs; do
    for rcfile in $rcdir/rc{runlevel}.d/*; do
        [ -e $rcfile ] || continue
        echo running $rcfile
        $rcfile || exit_rc=1
    done
done
```
- Parameters
runlevel (int) -- runlevel (1 or 3) in which function is running
- Raises
OSError -- one or more rc scripts failed
- flux.modprobe.task(name, **kwargs)
Decorator for modprobe "rc" task functions.
This decorator is applied to functions in an rc1 or rc3 python source file to turn them into valid flux-modprobe(1) tasks.
- Parameters
name (required, str) -- The name of this task.
ranks (optional, str) -- A rank expression that indicates on which ranks this task should be invoked. ranks may be a valid RFC 22 Idset string, a single integer prefixed with < or > to indicate matching ranks less than or greater than a given rank, or the string all (the default if ranks is not specified). Examples: 0, >0, 0-3.
requires (optional, list) -- An optional list of task or module names this task requires. This is used to ensure required tasks are active when activating another task. It does not indicate that this task will necessarily be run before or after the tasks it requires. (See before or after for those features.)
needs (optional, list) -- Disable this task if any task in needs is not active.
provides (optional, list) -- An optional list of service names that this task provides. This can be used to set up alternatives for a given service. (Mostly useful with modules.)
before (optional, list) -- A list of tasks or modules before which this task must be run.
after (optional, list) -- A list of tasks or modules after which this task must be run.
needs_attrs (optional, list) -- A list of broker attributes on which this task depends. If any of the attributes are not set then the task will not be run.
needs_config (optional, list) -- A list of config keys on which this task depends. If any of the specified config keys are not set, then this task will not be run.
needs_env (optional, list) -- A list of environment variables on which this task depends. If any of the specified environment variables are not set in the current environment, then this task will not be run.
Example:
# Declare a task that will be run after the kvs module is loaded
# only on rank 0
@task("test", ranks="0", needs=["kvs"], after=["kvs"])
def test_kvs_task(context):
    # do something with kvs
    pass