flux.kvs module

class flux.kvs.KVSDir(flux_handle=None, path='.', handle=None, namespace=None, _kvstxn=None)

Bases: WrapperPimpl, MutableMapping

User friendly class for KVS operations

KVS values can be read or written through this class's item accessor. e.g.

    mydir = KVSDir(flux_handle)
    print(mydir["mykey"])

    mydir["newkey"] = "foo"
    mydir.commit()

Any KVS directories accessed through the item accessor will share the same internal KVS transaction, so that only a single call to commit() is necessary. e.g.

    mydir = KVSDir(flux_handle)
    subdir = mydir["subdir"]
    subdir["anotherkey"] = "bar"
    mydir.commit()

Parameters
  • flux_handle -- A Flux handle obtained from flux.Flux()

  • path -- Optional base path for all read/write to be relative to (default ".")

  • namespace -- Optional namespace to read/write from/to, defaults to None. If namespace is None, the namespace specified in the FLUX_KVS_NAMESPACE environment variable will be used. If FLUX_KVS_NAMESPACE is not set, the primary namespace will be used.
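The stage-then-commit pattern described above can be wrapped in a small helper. The following is a minimal sketch only: stage_updates is a hypothetical helper, and FakeKVSDir is a dict-backed stand-in (not part of flux) so that the snippet runs without a Flux instance. With a live broker, one would presumably pass KVSDir(flux.Flux()) instead.

```python
class FakeKVSDir(dict):
    """Dict-backed stand-in for KVSDir (hypothetical, for illustration only)."""

    def __init__(self):
        super().__init__()
        self.commits = 0

    def commit(self, flags=0):
        # A real KVSDir would ask the KVS service to persist staged changes;
        # the stand-in only counts how often commit() was called.
        self.commits += 1


def stage_updates(kvsdir, updates):
    """Stage every key in `updates` through the item accessor, then commit once."""
    for key, value in updates.items():
        kvsdir[key] = value
    kvsdir.commit()


mydir = FakeKVSDir()
stage_updates(mydir, {"newkey": "foo", "anotherkey": "bar"})
print(mydir["newkey"])
```

Batching the writes and committing once keeps the number of commit() calls independent of the number of keys written.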

class InnerWrapper(flux_handle=None, path='.', handle=None, namespace=None)

Bases: Wrapper

class KVSDirIterator(kvsdir)

Bases: Iterator

next()

commit(flags=0)

Commit changes to the KVS

When keys are added, removed, or updated in the KVSDir object, the changes are only cached in memory until the commit method asks the KVS service to make them permanent. The commit method only includes keys that have been explicitly updated in the KVSDir object, and the contents of the KVS directory may diverge from the contents of the KVSDir object if other changes are being made to the directory concurrently.

After the commit method returns, updated keys can be accessed by other clients on the same broker rank. Other broker ranks are eventually consistent.

commit_async(flags=0)

Commit changes to the KVS. Identical to commit(), but returns a Future that can be waited on until the RPC completes.

directories()

Get list of directories in basedir

exists(name)

Evaluate if key exists in the basedir

files()

Get list of files in basedir

fill(contents: Mapping[str, Any])

Populate this directory with keys specified by contents

Parameters

contents -- A dict of keys and values to be created in the directory, or None. Sub-directories can be created by using dir.file syntax. Sub-dicts will be stored as JSON values in a single key.
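To make the dir.file syntax concrete, the sketch below builds a contents mapping and lists the sub-directories its dotted keys imply. It is a rough pure-Python model of how such keys can be read, not the library's implementation. implied_directories is a hypothetical helper, and the key names are made up for illustration.

```python
import json


def implied_directories(contents):
    """Return the sub-directories implied by dotted keys (illustrative model)."""
    dirs = set()
    for key in contents:
        parts = key.split(".")
        # Every proper prefix of a dotted key names a directory.
        for i in range(1, len(parts)):
            dirs.add(".".join(parts[:i]))
    return sorted(dirs)


contents = {
    "a": 1,                 # plain key in the directory itself
    "sub.b": "x",           # key "b" inside sub-directory "sub"
    "sub.deep.c": [1, 2],   # key "c" inside "sub.deep"
    "cfg": {"k": "v"},      # sub-dict: one key holding a JSON value
}

print(implied_directories(contents))
print(json.dumps(contents["cfg"]))
```

A mapping like this is what would be passed as the contents argument. Note that "cfg" does not become a directory, since the sub-dict is kept as a single JSON value.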

key_at(key)

Get full path to KVS key

list_all()

Get tuple with list of files and directories in basedir

mkdir(key: str, contents: Mapping[str, Any] = None)

Create a new sub-directory, optionally pre-populated by contents, as would be done with fill(contents)

Parameters
  • key -- Key of the directory to be created

  • contents -- A dict of keys and values to be created in the directory, or None. Sub-directories can be created by using dir.file syntax. Sub-dicts will be stored as JSON values in a single key.

class flux.kvs.KVSTxn(flux_handle=None, path='.', namespace=None)

Bases: object

KVS Transaction Object

Stage changes to the KVS. When all changes have been placed within the transaction, use commit() to finalize the transaction. Can be used as a context manager and commits will be handled at exit. e.g.

    with KVSTxn(handle, "basedirectory") as kt:
        kt.put("a", 1)

Parameters
  • flux_handle -- A Flux handle obtained from flux.Flux()

  • path -- Optional base path for all writes to be relative to (default ".")

  • namespace -- Optional namespace to write to, defaults to None. If namespace is None, the namespace specified in the FLUX_KVS_NAMESPACE environment variable will be used. If FLUX_KVS_NAMESPACE is not set, the primary namespace will be used.
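The context-manager usage can be sketched as follows. FakeTxn is a hypothetical in-memory stand-in that only mirrors the documented behaviour (put() stages, commit happens at exit) so the snippet runs without a broker. write_all is likewise a made-up helper. With a real instance, a KVSTxn(handle, "basedirectory") object would presumably take the place of FakeTxn().

```python
class FakeTxn:
    """In-memory stand-in for KVSTxn (hypothetical, for illustration only)."""

    def __init__(self):
        self.staged = {}
        self.committed = {}

    def put(self, key, value):
        # Changes are only staged until commit() is called.
        self.staged[key] = value

    def commit(self, flags=0):
        self.committed.update(self.staged)
        self.staged.clear()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Mirrors "commits will be handled at exit".
        self.commit()
        return False


def write_all(txn, items):
    """Stage all items inside one transaction; the commit happens on exit."""
    with txn as kt:
        for key, value in items.items():
            kt.put(key, value)


txn = FakeTxn()
write_all(txn, {"a": 1, "b": 2})
print(txn.committed)
```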

clear()

commit(flags=0)

Commit changes to the KVS

When keys are added, removed, or updated in the KVSTxn object, the changes are only cached in memory until the commit method asks the KVS service to make them permanent.

After the commit method returns, updated keys can be accessed by other clients on the same broker rank. Other broker ranks are eventually consistent.

commit_async(flags=0)

Commit changes to the KVS. Identical to commit(), but returns a Future that can be waited on until the RPC completes.

mkdir(key)

Create a directory in the KVS

put(key, value)

Put key=value in the KVS

symlink(key, target)

Create a symlink in the KVS

unlink(key)

Unlink key in the KVS

class flux.kvs.KVSWatchFuture(future_handle)

Bases: WatchImplementation

A future returned from kvs_watch_async().

watch_cancel(future)

Implementation of watch_cancel() for KVSWatchFuture.

Will be called from WatchABC.cancel()

watch_get(future)

Implementation of watch_get() for KVSWatchFuture.

Will be called from WatchABC.get()

class flux.kvs.KVSWrapper(ffi, lib, handle=None, match=None, filter_match=True, prefixes=(), destructor=None)

Bases: Wrapper

flux_kvsitr_next(*args, **kwargs)

class flux.kvs.WatchImplementation(future_handle)

Bases: Future, ABC

Interface for KVS based watchers

Subclasses must implement the watch_get() and watch_cancel() functions.

cancel(stop=False)

Cancel a streaming future

If stop=True, then deactivate the multi-response future so no further callbacks are called.

get(autoreset=True)

Return the new value or None if the stream has terminated.

The future is auto-reset unless autoreset=False, so a subsequent call to get() will try to fetch the next value and thus may block.
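Because get() returns None once the stream has terminated, a consumer can loop until it sees None. The sketch below assumes only that contract. collect_values is a hypothetical helper and FakeWatch is a stand-in (not part of flux) that replays a fixed list of values so the snippet runs without a broker.

```python
class FakeWatch:
    """Stand-in for a watch future; replays values, then returns None."""

    def __init__(self, values):
        self._values = iter(values)

    def get(self, autoreset=True):
        # None signals that the stream has terminated.
        return next(self._values, None)


def collect_values(watch_future, limit=None):
    """Call get() repeatedly until the stream ends or `limit` values are read."""
    values = []
    while limit is None or len(values) < limit:
        value = watch_future.get()
        if value is None:
            break
        values.append(value)
    return values


print(collect_values(FakeWatch(["a", "b", "c"])))
print(collect_values(FakeWatch(["a", "b", "c"]), limit=2))
```

With a real future each get() may block while waiting for the next value, so a limit (or a cancel() call) is a reasonable way to bound the loop.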

abstract watch_cancel(future)

abstract watch_get(future)

flux.kvs.commit(flux_handle, flags: int = 0, namespace=None, _kvstxn=None)

Commit changes to the KVS

Must be called after put(), put_mkdir(), put_unlink(), or put_symlink() to write staged changes to the KVS.

Parameters
  • flux_handle -- A Flux handle obtained from flux.Flux()

  • flags -- defaults to 0. Possible flag options:
      flux.constants.FLUX_KVS_NO_MERGE - disallow merging of different commits
      flux.constants.FLUX_KVS_TXN_COMPACT - if possible, compact changes
      flux.constants.FLUX_KVS_SYNC - flush & checkpoint commit (only against primary KVS)

  • namespace -- namespace to write to, defaults to None. If namespace is None, the namespace specified in the FLUX_KVS_NAMESPACE environment variable will be used. If FLUX_KVS_NAMESPACE is not set, the primary namespace will be used.

flux.kvs.commit_async(flux_handle, flags: int = 0, namespace=None, _kvstxn=None)

Commit changes to the KVS. Identical to commit(), but returns a Future that can be waited on until the RPC completes.

Must be called after put(), put_mkdir(), put_unlink(), or put_symlink() to write staged changes to the KVS.

Parameters
  • flux_handle -- A Flux handle obtained from flux.Flux()

  • flags -- defaults to 0. Possible flag options:
      flux.constants.FLUX_KVS_NO_MERGE - disallow merging of different commits
      flux.constants.FLUX_KVS_TXN_COMPACT - if possible, compact changes
      flux.constants.FLUX_KVS_SYNC - flush & checkpoint commit (only against primary KVS)

  • namespace -- namespace to write to, defaults to None. If namespace is None, the namespace specified in the FLUX_KVS_NAMESPACE environment variable will be used. If FLUX_KVS_NAMESPACE is not set, the primary namespace will be used.

Returns

a future fulfilled when the commit RPC returns

Return type

Future

flux.kvs.dropcache(flux_handle)

Drop KVS cache entries

Inform KVS module to drop cache entries without a reference.

Parameters

flux_handle -- A Flux handle obtained from flux.Flux()

flux.kvs.exists(flux_handle, key, namespace=None)

Determine if key exists

Parameters
  • flux_handle -- A Flux handle obtained from flux.Flux()

  • key -- key to check for existence

  • namespace -- namespace to read from, defaults to None. If namespace is None, the namespace specified in the FLUX_KVS_NAMESPACE environment variable will be used. If FLUX_KVS_NAMESPACE is not set, the primary namespace will be used.

Returns

True if key exists, False if not

Return type

bool

flux.kvs.get(flux_handle, key, namespace=None, _kvstxn=None)

Get the value of a KVS key

Parameters
  • flux_handle -- A Flux handle obtained from flux.Flux()

  • key -- key to get

  • namespace -- namespace to read from, defaults to None. If namespace is None, the namespace specified in the FLUX_KVS_NAMESPACE environment variable will be used. If FLUX_KVS_NAMESPACE is not set, the primary namespace will be used.

Returns

If the value is decodable by json.loads(), the decoded result is returned. If the value is a valid UTF-8 string, it is returned as a string. Otherwise, the value is returned as a bytes array.
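The fallback order described under Returns can be modelled in plain Python. This is an illustrative sketch of the documented behaviour, not the library's code. decode_value is a hypothetical name, and the input is assumed to be the raw bytes stored under a key.

```python
import json


def decode_value(raw: bytes):
    """Model of the documented fallback: JSON, then UTF-8 string, then bytes."""
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        # Not valid UTF-8 (and therefore not JSON text): return the bytes.
        return raw
    try:
        return json.loads(text)
    except ValueError:
        # Valid UTF-8 but not JSON: return the string itself.
        return text


print(decode_value(b'{"a": 1}'))
print(decode_value(b"plain text"))
print(decode_value(b"\xff\xfe\x00"))
```

One consequence worth noting: a stored string that happens to be valid JSON, such as b"42", comes back as the decoded JSON value (the integer 42) rather than as the string "42".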

flux.kvs.get_dir(flux_handle, key='.', namespace=None, _kvstxn=None)

Get KVS directory

Parameters
  • flux_handle -- A Flux handle obtained from flux.Flux()

  • key -- directory name (default ".")

  • namespace -- namespace to read from, defaults to None. If namespace is None, the namespace specified in the FLUX_KVS_NAMESPACE environment variable will be used. If FLUX_KVS_NAMESPACE is not set, the primary namespace will be used.

Returns

object representing directory

Return type

KVSDir

flux.kvs.get_key_direct(flux_handle, key, namespace=None)

flux.kvs.isdir(flux_handle, key, namespace=None)

Determine if key is a directory

Parameters
  • flux_handle -- A Flux handle obtained from flux.Flux()

  • key -- key to check if it is a directory

  • namespace -- namespace to read from, defaults to None. If namespace is None, the namespace specified in the FLUX_KVS_NAMESPACE environment variable will be used. If FLUX_KVS_NAMESPACE is not set, the primary namespace will be used.

Returns

True if key is a directory, False if not

Return type

bool

flux.kvs.join(*args)

Convenience function for use with walk(), similar to os.path.join()

flux.kvs.kvs_watch_async(flux_handle, key, namespace=None, waitcreate=False, uniq=False, full=False)

Asynchronously get KVS updates for a key

Parameters
  • flux_handle -- A Flux handle obtained from flux.Flux()

  • key -- the key on which to watch

  • namespace -- namespace to read from, defaults to None. If namespace is None, the namespace specified in the FLUX_KVS_NAMESPACE environment variable will be used. If FLUX_KVS_NAMESPACE is not set, the primary namespace will be used.

  • waitcreate -- If True and the key does not yet exist, will wait for it to exist. Defaults to False.

  • uniq -- If True, only different values will be returned by watch. Defaults to False.

  • full -- If True, any change that can affect the key is monitored. This typically captures cases where a parent directory is removed or altered, which a normal KVS watch will not detect because the exact key has not changed. Defaults to False.

Returns

a KVSWatchFuture object. Call .get() from the then() callback to get the value currently returned by the Future object.

Return type

KVSWatchFuture

flux.kvs.namespace_create(flux_handle, namespace, owner=1005, flags: int = 0)

Create KVS Namespace

Parameters
  • flux_handle -- A Flux handle obtained from flux.Flux()

  • namespace -- namespace to create

  • owner -- uid of namespace owner, defaults to caller uid

  • flags -- currently unused, defaults to 0

flux.kvs.namespace_list(flux_handle)

Get list of KVS namespaces

Parameters

flux_handle -- A Flux handle obtained from flux.Flux()

Returns

list of strings with names of namespaces

Return type

list

flux.kvs.namespace_remove(flux_handle, namespace)

Remove KVS Namespace

Namespace is removed in background. Caller cannot be certain of its removal after this function returns.

Parameters
  • flux_handle -- A Flux handle obtained from flux.Flux()

  • namespace -- namespace to remove
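Since removal happens in the background, a caller that needs certainty can poll the namespace list. The sketch below is one possible approach under stated assumptions. wait_until_removed is a hypothetical helper and takes a zero-argument callable so it can be exercised without a broker. With a live handle, that callable would presumably be lambda: flux.kvs.namespace_list(handle).

```python
import time


def wait_until_removed(list_namespaces, name, timeout=5.0, interval=0.1):
    """Poll until `name` no longer appears in the namespace list.

    Returns True once the namespace is gone, False if `timeout` seconds pass.
    """
    deadline = time.monotonic() + timeout
    while True:
        if name not in list_namespaces():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Stand-in data: the namespace disappears on the third poll.
polls = iter([["primary", "tmp"], ["primary", "tmp"], ["primary"]])
removed = wait_until_removed(lambda: next(polls), "tmp", timeout=1.0, interval=0.01)
print(removed)
```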

flux.kvs.put(flux_handle, key, value, _kvstxn=None)

Put data into the KVS

Internally will stage changes until commit() is called.

Parameters
  • flux_handle -- A Flux handle obtained from flux.Flux()

  • key -- key to write to

  • value -- value of the key
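The staging behaviour means several put() calls can be followed by a single commit(). The sketch below shows that call order. put_many is a hypothetical helper, and fake_kvs is an in-memory stand-in for the flux.kvs module (built with types.SimpleNamespace) so the snippet runs without a broker. With a real instance one would presumably pass the flux.kvs module and a handle from flux.Flux().

```python
from types import SimpleNamespace

# In-memory stand-in: `staged` holds uncommitted writes, `store` the committed ones.
staged, store = {}, {}


def fake_put(flux_handle, key, value):
    staged[key] = value


def fake_commit(flux_handle, flags=0):
    store.update(staged)
    staged.clear()


fake_kvs = SimpleNamespace(put=fake_put, commit=fake_commit)


def put_many(kvs, flux_handle, items):
    """Stage every item with put(), then write them with one commit()."""
    for key, value in items.items():
        kvs.put(flux_handle, key, value)
    kvs.commit(flux_handle)


put_many(fake_kvs, None, {"a": 1, "b": "two"})
print(store)
```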

flux.kvs.put_mkdir(flux_handle, key, _kvstxn=None)

Create directory in the KVS

Internally will stage changes until commit() is called.

Parameters
  • flux_handle -- A Flux handle obtained from flux.Flux()

  • key -- directory to create

flux.kvs.put_symlink(flux_handle, key, target, _kvstxn=None)

Create symlink in the KVS

Internally will stage changes until commit() is called.

Parameters
  • flux_handle -- A Flux handle obtained from flux.Flux()

  • key -- symlink name

  • target -- target symlink points to

flux.kvs.put_unlink(flux_handle, key, _kvstxn=None)

Unlink key in the KVS

Internally will stage changes until commit() is called.

Parameters
  • flux_handle -- A Flux handle obtained from flux.Flux()

  • key -- key to delete

flux.kvs.walk(directory, topdown=False, flux_handle=None, namespace=None)

Walk a directory in the style of os.walk()

Parameters
  • directory -- A path or KVSDir object

  • topdown -- Specify True for current directory to be listed before subdirectories.

  • flux_handle -- Required if "directory" is a path.

  • namespace -- namespace to read from, defaults to None. If namespace is None, the namespace specified in the FLUX_KVS_NAMESPACE environment variable will be used. If FLUX_KVS_NAMESPACE is not set, the primary namespace will be used.
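A typical use of walk() together with join() is to build full key paths. The sketch below assumes, by analogy with os.walk(), that each step yields a (root, directories, files) tuple, and that join() combines path components with "." while skipping empty ones. Both are assumptions, not confirmed here. full_keys and dotted_join are hypothetical names, and the walk output is hand-written stand-in data.

```python
def dotted_join(*parts):
    """Stand-in for join(): combine non-empty components with '.' (assumed)."""
    return ".".join(p for p in parts if p)


def full_keys(walk_iter, join):
    """Return the full path of every file yielded by a walk-style iterator."""
    keys = []
    for root, _directories, files in walk_iter:
        for name in files:
            keys.append(join(root, name))
    return keys


# Hand-written stand-in for walk() output with topdown=False:
# the sub-directory is listed before its parent.
fake_walk = [
    ("sub", [], ["b"]),
    ("", ["sub"], ["a"]),
]

print(full_keys(fake_walk, dotted_join))
```

With a live handle, the call would presumably look like full_keys(flux.kvs.walk(".", flux_handle=handle), flux.kvs.join).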