flux.kvs module
- class flux.kvs.KVSDir(flux_handle=None, path='.', handle=None, namespace=None, _kvstxn=None)
Bases:
WrapperPimpl
,MutableMapping
User friendly class for KVS operations
KVS values can be read or written through this class's item accessor. e.g.
mydir = KVSDir(flux_handle) print(mydir["mykey"])
mydir["newkey"] = "foo" mydir.commit()
Any KVS directories accessed through the item accessor will share the same internal KVS transaction, so that only a single call to commit() is necessary. e.g.
mydir = KVSDir(flux_handle) subdir = mydir["subdir"] subdir["anotherkey"] = "bar" mydir.commit()
- Parameters
flux_handle -- A Flux handle obtained from flux.Flux()
path -- Optional base path for all read/write to be relative to (default ".")
namespace -- Optional namespace to read/write from/to, defaults to None. If namespace is None, the namespace specified in the FLUX_KVS_NAMESPACE environment variable will be used. If FLUX_KVS_NAMESPACE is not set, the primary namespace will be used.
- commit(flags=0)
Commit changes to the KVS
When keys are added, removed, or updated in the KVSDir object, the changes are only cached in memory until the commit method asks the KVS service to make them permanent. The commit method only includes keys that have been explicitly updated in the KVSDir object, and the contents of the KVS directory may diverge from the contents of the KVSDir object if other changes are being made to the directory concurrently.
After the commit method returns, updated keys can be accessed by other clients on the same broker rank. Other broker ranks are eventually consistent.
- commit_async(flags=0)
Commit changes to the KVS. Identical to commit(), but returns a Future to wait on for RPC to complete.
- directories()
Get list of directories in basedir
- exists(name)
Evaluate if key exists in the basedir
- files()
Get list of files in basedir
- fill(contents: Mapping[str, Any])
Populate this directory with keys specified by contents
- Parameters
contents -- A dict of keys and values to be created in the directory or None, sub-directories can be created by using
dir.file
syntax, sub-dicts will be stored as json values in a single key
- key_at(key)
Get full path to KVS key
- list_all()
Get tuple with list of files and directories in basedir
- mkdir(key: str, contents: Mapping[str, Any] = None)
Create a new sub-directory, optionally pre-populated by contents, as would be done with
fill(contents)
- Parameters
key -- Key of the directory to be created
contents -- A dict of keys and values to be created in the directory or None, sub-directories can be created by using dir.file syntax, sub-dicts will be stored as json values in a single key
- class flux.kvs.KVSTxn(flux_handle=None, path='.', namespace=None)
Bases:
object
KVS Transaction Object
Stage changes to the KVS. When all changes have been placed within the transaction, use commit() to finalize the transaction. Can be used as a context manager and commits will be handled at exit. e.g.
- with KVSTxn(handle, "basedirectory") as kt:
kt.put("a", 1)
- Parameters
flux_handle -- A Flux handle obtained from flux.Flux()
path -- Optional base path for all writes to be relative to (default ".")
namespace -- Optional namespace to write to, defaults to None. If namespace is None, the namespace specified in the FLUX_KVS_NAMESPACE environment variable will be used. If FLUX_KVS_NAMESPACE is not set, the primary namespace will be used.
- clear()
- commit(flags=0)
Commit changes to the KVS
When keys are added, removed, or updated in the KVSTxn object, the changes are only cached in memory until the commit method asks the KVS service to make them permanent.
After the commit method returns, updated keys can be accessed by other clients on the same broker rank. Other broker ranks are eventually consistent.
- commit_async(flags=0)
Commit changes to the KVS. Identical to commit(), but returns a Future to wait on for RPC to complete.
- mkdir(key)
Create a directory in the KVS
- put(key, value)
Put key=value in the KVS
- symlink(key, target)
Create a symlink in the KVS
- unlink(key)
Unlink key in the KVS
- class flux.kvs.KVSWatchFuture(future_handle)
Bases:
WatchImplementation
A future returned from kvs_watch_async().
- watch_cancel(future)
Implementation of watch_cancel() for KVSWatchFuture.
Will be called from WatchABC.cancel()
- watch_get(future)
Implementation of watch_get() for KVSWatchFuture.
Will be called from WatchABC.get()
- class flux.kvs.KVSWrapper(ffi, lib, handle=None, match=None, filter_match=True, prefixes=(), destructor=None)
Bases:
Wrapper
- flux_kvsitr_next(*args, **kwargs)
- class flux.kvs.WatchImplementation(future_handle)
Bases:
Future
,ABC
Interface for KVS based watchers
Users to implement watch_get() and watch_cancel() functions.
- cancel(stop=False)
Cancel a streaming future
If stop=True, then deactivate the multi-response future so no further callbacks are called.
- get(autoreset=True)
Return the new value or None if the stream has terminated.
The future is auto-reset unless autoreset=False, so a subsequent call to get() will try to fetch the next value and thus may block.
- abstract watch_cancel(future)
- abstract watch_get(future)
- flux.kvs.commit(flux_handle, flags: int = 0, namespace=None, _kvstxn=None)
Commit changes to the KVS
Must be called after put(), put_mkdir(), put_unlink(), or put_symlink() to write staged changes to the KVS.
- Parameters
flux_handle -- A Flux handle obtained from flux.Flux()
flags -- defaults to 0, possible flag options: flux.constants.FLUX_KVS_NO_MERGE - disallow merging of different commits flux.constants.FLUX_KVS_TXN_COMPACT - if possible compact changes flux.constants.FLUX_KVS_SYNC - flush & checkpoint commit (only against primary KVS)
namespace -- namespace to write to, defaults to None. If namespace is None, the namespace specified in the FLUX_KVS_NAMESPACE environment variable will be used. If FLUX_KVS_NAMESPACE is not set, the primary namespace will be used.
- flux.kvs.commit_async(flux_handle, flags: int = 0, namespace=None, _kvstxn=None)
Commit changes to the KVS. Identical to commit(), but returns a Future to wait on for RPC to complete.
Must be called after put(), put_mkdir(), put_unlink(), or put_symlink() to write staged changes to the KVS.
- Parameters
flux_handle -- A Flux handle obtained from flux.Flux()
flags -- defaults to 0, possible flag options: flux.constants.FLUX_KVS_NO_MERGE - disallow merging of different commits flux.constants.FLUX_KVS_TXN_COMPACT - if possible compact changes flux.constants.FLUX_KVS_SYNC - flush & checkpoint commit (only against primary KVS)
namespace -- namespace to write to, defaults to None. If namespace is None, the namespace specified in the FLUX_KVS_NAMESPACE environment variable will be used. If FLUX_KVS_NAMESPACE is not set, the primary namespace will be used.
- Returns
a future fulfilled when the commit RPC returns
- Return type
- flux.kvs.dropcache(flux_handle)
Drop KVS cache entries
Inform KVS module to drop cache entries without a reference.
- Parameters
flux_handle -- A Flux handle obtained from flux.Flux()
- flux.kvs.exists(flux_handle, key, namespace=None)
Determine if key exists
- Parameters
flux_handle -- A Flux handle obtained from flux.Flux()
key -- key to check for existence
- Returns
True if key exists, False if not namespace: namespace to read from, defaults to None. If namespace
is None, the namespace specified in the FLUX_KVS_NAMESPACE environment variable will be used. If FLUX_KVS_NAMESPACE is not set, the primary namespace will be used.
- Return type
bool
- flux.kvs.get(flux_handle, key, namespace=None, _kvstxn=None)
Get KVS directory
- Parameters
flux_handle -- A Flux handle obtained from flux.Flux()
key -- key to get
namespace -- namespace to read from, defaults to None. If namespace is None, the namespace specified in the FLUX_KVS_NAMESPACE environment variable will be used. If FLUX_KVS_NAMESPACE is not set, the primary namespace will be used.
- Returns
If value is decodeable by json.loads(), the decoded result is returned. If the value is a legal utf-8 decodable string, it is returned as a string. Otherwise, the value is returned as a bytes array.
- flux.kvs.get_dir(flux_handle, key='.', namespace=None, _kvstxn=None)
Get KVS directory
- Parameters
flux_handle -- A Flux handle obtained from flux.Flux()
key -- directory name (default ".")
namespace -- namespace to read from, defaults to None. If namespace is None, the namespace specified in the FLUX_KVS_NAMESPACE environment variable will be used. If FLUX_KVS_NAMESPACE is not set, the primary namespace will be used.
- Returns
object representing directory
- Return type
- flux.kvs.get_key_direct(flux_handle, key, namespace=None)
- flux.kvs.isdir(flux_handle, key, namespace=None)
Determine if key is a directory
- Parameters
flux_handle -- A Flux handle obtained from flux.Flux()
key -- key to check if it is a directory
- Returns
True if key is a directory, False if not namespace: namespace to read from, defaults to None. If namespace
is None, the namespace specified in the FLUX_KVS_NAMESPACE environment variable will be used. If FLUX_KVS_NAMESPACE is not set, the primary namespace will be used.
- Return type
bool
- flux.kvs.join(*args)
Convenience function for use with walk(), similar to os.path.join()
- flux.kvs.kvs_watch_async(flux_handle, key, namespace=None, waitcreate=False, uniq=False, full=False)
Asynchronously get KVS updates for a key
- Parameters
flux_handle -- A Flux handle obtained from flux.Flux()
key -- the key on which to watch
namespace -- namespace to read from, defaults to None. If namespace is None, the namespace specified in the FLUX_KVS_NAMESPACE environment variable will be used. If FLUX_KVS_NAMESPACE is not set, the primary namespace will be used.
waitcreate -- If True and a key does not yet exist, will wait for it to exit. Defaults to False.
uniq -- If True, only different values will be returned by watch. Defaults to False.
full -- If True, any change that can affect the key is monitored. Typically, this is to capture when a parent directory is removed or altered in some way. Typically kvs watch will not detect this as the exact key has not been changed. Defaults to False.
- Returns
- a KVSWatchFuture object. Call .get() from the then
callback to get the currently returned value from the Future object.
- Return type
- flux.kvs.namespace_create(flux_handle, namespace, owner=1005, flags: int = 0)
Create KVS Namespace
- Parameters
flux_handle -- A Flux handle obtained from flux.Flux()
flags -- defaults to 0, possible flag options: flux.constants.FLUX_KVS_NO_MERGE - disallow merging of different commits flux.constants.FLUX_KVS_TXN_COMPACT - if possible compact changes flux.constants.FLUX_KVS_SYNC - flush & checkpoint commit (only against primary KVS)
namespace -- namespace to create
owner -- uid of namespace owner, defaults to caller uid
flags -- currently unused, defaults to 0
- flux.kvs.namespace_list(flux_handle)
Get list of KVS Namespace
- Parameters
flux_handle -- A Flux handle obtained from flux.Flux()
- Returns
list of strings with names of namespaces
- Return type
list
- flux.kvs.namespace_remove(flux_handle, namespace)
Remove KVS Namespace
Namespace is removed in background. Caller cannot be certain of its removal after this function returns.
- Parameters
flux_handle -- A Flux handle obtained from flux.Flux()
namespace -- namespace to remove
- flux.kvs.put(flux_handle, key, value, _kvstxn=None)
Put data into the KVS
Internally will stage changes until commit() is called.
- Parameters
flux_handle -- A Flux handle obtained from flux.Flux()
key -- key to write to
value -- value of the key
- flux.kvs.put_mkdir(flux_handle, key, _kvstxn=None)
Create directory in the KVS
Internally will stage changes until commit() is called.
- Parameters
flux_handle -- A Flux handle obtained from flux.Flux()
key -- directory to create
- flux.kvs.put_symlink(flux_handle, key, target, _kvstxn=None)
Create symlink in the KVS
Internally will stage changes until commit() is called.
- Parameters
flux_handle -- A Flux handle obtained from flux.Flux()
key -- symlink name
target -- target symlink points to
- flux.kvs.put_unlink(flux_handle, key, _kvstxn=None)
Unlink key in the KVS
Internally will stage changes until commit() is called.
- Parameters
flux_handle -- A Flux handle obtained from flux.Flux()
key -- key to delete
- flux.kvs.walk(directory, topdown=False, flux_handle=None, namespace=None)
Walk a directory in the style of os.walk()
- Parameters
directory -- A path or KVSDir object
topdown -- Specify True for current directory to be listed before subdirectories.
flux_handle -- Required if "directory" is a path.
namespace -- namespace to read from, defaults to None. If namespace is None, the namespace specified in the FLUX_KVS_NAMESPACE environment variable will be used. If FLUX_KVS_NAMESPACE is not set, the primary namespace will be used.