flux_rpc(3)

SYNOPSIS

#include <flux/core.h>

flux_future_t *flux_rpc (flux_t *h,
                         const char *topic,
                         const char *s,
                         uint32_t nodeid,
                         int flags);

flux_future_t *flux_rpc_pack (flux_t *h,
                              const char *topic,
                              uint32_t nodeid,
                              int flags,
                              const char *fmt,
                              ...);

flux_future_t *flux_rpc_raw (flux_t *h,
                             const char *topic,
                             const void *data,
                             int len,
                             uint32_t nodeid,
                             int flags);

flux_future_t *flux_rpc_message (flux_t *h,
                                 const flux_msg_t *msg,
                                 uint32_t nodeid,
                                 int flags);

int flux_rpc_get (flux_future_t *f, const char **s);

int flux_rpc_get_unpack (flux_future_t *f, const char *fmt, ...);

int flux_rpc_get_raw (flux_future_t *f,
                      const void **data,
                      int *len);

uint32_t flux_rpc_get_matchtag (flux_future_t *f);

uint32_t flux_rpc_get_nodeid (flux_future_t *f);

Link with -lflux-core.

DESCRIPTION

A remote procedure call (RPC) consists of a matched request and response message exchanged with a Flux service. flux_rpc(), flux_rpc_pack(), and flux_rpc_raw() encode and send a request message via Flux broker handle h to a Flux service identified by topic and nodeid. The returned flux_future_t object acts as a handle for synchronization and as a container for the response message, which in turn contains the RPC result.

A lower-level variant of flux_rpc(), flux_rpc_message() accepts a pre-created request message, assigning nodeid and matchtag according to flags.

flux_future_then(3) may be used to register a reactor callback (continuation) to be called once the response has been received. flux_future_wait_for(3) may be used to block until the response has been received. Both accept an optional timeout.

flux_rpc_get(), flux_rpc_get_unpack(), and flux_rpc_get_raw() decode the RPC result. Internally, they call flux_future_get(3) to access the response message stored in the future. If the response message has not yet been received, these functions block until it is, or an error occurs.

flux_rpc_get_matchtag() and flux_rpc_get_nodeid() are accessors that return the RPC matchtag and target nodeid from the future returned by flux_rpc().

REQUEST OPTIONS

The request message is encoded and sent with or without a payload using one of the three flux_rpc() variants.

flux_rpc() attaches s, a NUL-terminated string, as the request payload. If s is NULL, the request is encoded without a payload.

flux_rpc_pack() attaches a JSON payload encoded as a NUL-terminated string using Jansson json_pack() style arguments (see below).

flux_rpc_raw() attaches the raw payload data, which is len bytes long. If data is NULL, the request is encoded without a payload.

nodeid affects request routing, and must be set to one of the following values:

FLUX_NODEID_ANY

The request is routed to the first matching service instance.

FLUX_NODEID_UPSTREAM

The request is routed to the first matching service instance, skipping over the sending rank.

integer

The request is routed to a specific rank.

flags may be zero or:

FLUX_RPC_NORESPONSE

No response is expected. The request is not assigned a matchtag, and the returned flux_future_t is fulfilled immediately, so it may simply be destroyed.

FLUX_RPC_STREAMING

The RPC is for a service that may send zero or more non-error responses, and a final error response. ENODATA should be interpreted as a non-error end-of-stream sentinel.
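The streaming control flow described above can be sketched without a broker. This is a minimal model, not Flux code: struct stream_source, stream_consume(), and the canned_* helpers are hypothetical names introduced for illustration. In a real program the get step would presumably be flux_rpc_get() on the RPC future and the reset step flux_future_reset(3); they are callbacks here only so the sketch builds without libflux.

```c
#include <errno.h>
#include <stddef.h>

/* Hypothetical stand-in for a streaming RPC future (not a Flux type).
 * get:   returns 0 and sets *s on a non-error response,
 *        or returns -1 with errno set on an error response.
 * reset: re-arms the source so the next response can be fetched.
 */
struct stream_source {
    int (*get) (void *arg, const char **s);
    void (*reset) (void *arg);
    void *arg;
};

/* Consume responses until an error response ends the stream.
 * Returns the number of non-error responses if the final error was
 * ENODATA (clean end of stream), or -1 with errno set otherwise.
 */
int stream_consume (struct stream_source *src,
                    void (*handle) (const char *s, void *harg),
                    void *harg)
{
    const char *s;
    int count = 0;

    while (src->get (src->arg, &s) == 0) {
        if (handle)
            handle (s, harg);
        count++;
        src->reset (src->arg);
    }
    if (errno != ENODATA)
        return -1;
    return count;
}

/* Canned source used to exercise the loop: delivers 'count' payloads,
 * then fails with 'final_errno'.
 */
struct canned {
    const char **items;
    int count;
    int pos;
    int final_errno;
};

int canned_get (void *arg, const char **s)
{
    struct canned *c = arg;

    if (c->pos < c->count) {
        *s = c->items[c->pos];
        return 0;
    }
    errno = c->final_errno;
    return -1;
}

void canned_reset (void *arg)
{
    struct canned *c = arg;

    c->pos++;
}
```

The point of the sketch is the final check: any errno other than ENODATA after the loop is a real failure, while ENODATA only marks the end of the stream.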

RESPONSE OPTIONS

The response message is stored in the future when the future is fulfilled. At that time it is decoded with flux_response_decode(3). If it cannot be decoded, or if the service returned an error, the future is fulfilled with an error. Otherwise it is fulfilled with the response message. If there was an error, flux_future_get(3) or the flux_rpc_get() variants return an error.

flux_rpc_get() sets s (if non-NULL) to the NUL-terminated string payload contained in the RPC response. If there was no payload, s is set to NULL.

flux_rpc_get_unpack() decodes the NUL-terminated string payload as JSON using Jansson json_unpack() style arguments (see below). It is an error if there is no payload, or if the payload is not JSON.

flux_rpc_get_raw() assigns the raw payload of the RPC response message to data and its length to len. If there is no payload, this function will fail.

PREMATURE DESTRUCTION

If a regular RPC future is destroyed before its response is received, the matchtag allocated to it is not immediately returned to the pool for reuse. If an unclaimed response subsequently arrives with that matchtag, it is returned to the pool then.

If a streaming RPC future is destroyed before its terminating response is received, its matchtag is only returned to the pool when an unclaimed error response is received. Non-error responses are ignored.

It is essential that services which return multiple responses verify that requests were made with the FLUX_RPC_STREAMING flag and return an immediate EPROTO error if they were not. See flux_respond(3).
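The matchtag reclamation rules in this section can be restated as a small state machine. The following is a toy model only, not how Flux implements its matchtag pool: enum tag_state, struct tag, and the tag_* functions are hypothetical names introduced for illustration.

```c
#include <stdbool.h>

/* Toy model of a single matchtag's lifecycle (not Flux internals). */
enum tag_state {
    TAG_FREE,       /* in the pool, available for reuse */
    TAG_IN_USE,     /* owned by a live RPC future */
    TAG_ORPHANED    /* future destroyed before the RPC completed */
};

struct tag {
    enum tag_state state;
    bool streaming; /* allocated for a FLUX_RPC_STREAMING request */
};

/* A new RPC takes the tag out of the pool. */
void tag_alloc (struct tag *t, bool streaming)
{
    t->state = TAG_IN_USE;
    t->streaming = streaming;
}

/* The RPC future is destroyed.  'complete' is true if the response
 * (or, for a streaming RPC, the terminating response) was already
 * received; otherwise the tag cannot be reused yet.
 */
void tag_future_destroy (struct tag *t, bool complete)
{
    t->state = complete ? TAG_FREE : TAG_ORPHANED;
}

/* An unclaimed response carrying this matchtag arrives.
 * Regular RPC: any response returns the tag to the pool.
 * Streaming RPC: only an error response does; others are ignored.
 */
void tag_unclaimed_response (struct tag *t, bool is_error)
{
    if (t->state != TAG_ORPHANED)
        return;
    if (!t->streaming || is_error)
        t->state = TAG_FREE;
}
```

In this model, a service that sends several non-error responses to a request made without FLUX_RPC_STREAMING would free the tag on the first one while more responses are still in flight, which is why such services are expected to reject those requests with EPROTO.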

CANCELLATION

Flux RFC 6 does not currently specify a cancellation protocol for an individual RPC, but does stipulate that an RPC may be canceled if a disconnect message is received, as is automatically generated by the local connector upon client disconnection.

ENCODING JSON PAYLOADS

Flux API functions that are based on Jansson's json_pack() accept the following tokens in their format string. The type in parentheses denotes the resulting JSON type, and the type in brackets (if any) denotes the C type that is expected as the corresponding argument or arguments.

s (string)['const char *']

Convert a null terminated UTF-8 string to a JSON string.

s? (string)['const char *']

Like s, but if the argument is NULL, outputs a JSON null value.

s# (string)['const char *', 'int']

Convert a UTF-8 buffer of a given length to a JSON string.

s% (string)['const char *', 'size_t']

Like s# but the length argument is of type size_t.

+ ['const char *']

Like s, but concatenate to the previous string. Only valid after a string.

+# ['const char *', 'int']

Like s#, but concatenate to the previous string. Only valid after a string.

+% ['const char *', 'size_t']

Like +#, but the length argument is of type size_t.

n (null)

Output a JSON null value. No argument is consumed.

b (boolean)['int']

Convert a C int to JSON boolean value. Zero is converted to false and non-zero to true.

i (integer)['int']

Convert a C int to JSON integer.

I (integer)['int64_t']

Convert a C int64_t to JSON integer. Note: Jansson expects a json_int_t here without committing to a size, but Flux guarantees that this is a 64-bit integer.

f (real)['double']

Convert a C double to JSON real.

o (any value)['json_t *']

Output any given JSON value as-is. If the value is added to an array or object, the reference to the value passed to o is stolen by the container.

O (any value)['json_t *']

Like o, but the argument's reference count is incremented. This is useful if you pack into an array or object and want to keep the reference for the JSON value consumed by O to yourself.

o?, O? (any value)['json_t *']

Like o and O, respectively, but if the argument is NULL, output a JSON null value.

[fmt] (array)

Build an array with contents from the inner format string. fmt may contain objects and arrays, i.e. recursive value building is supported.

{fmt} (object)

Build an object with contents from the inner format string fmt. The first, third, etc. format specifier represents a key, and must be a string, as object keys are always strings. The second, fourth, etc. format specifier represents a value. Any value may be an object or array, i.e. recursive value building is supported.

Whitespace, : (colon) and , (comma) are ignored.

These descriptions came from the Jansson 2.9 manual.

See also: Jansson API: Building Values: http://jansson.readthedocs.io/en/2.9/apiref.html#building-values

DECODING JSON PAYLOADS

Flux API functions that are based on Jansson's json_unpack() accept the following tokens in their format string. The type in parentheses denotes the JSON type being decoded, and the type in brackets (if any) denotes the C type whose address is expected as the corresponding argument or arguments.

s (string)['const char *']

Convert a JSON string to a pointer to a null terminated UTF-8 string. The resulting string is extracted by using 'json_string_value()' internally, so it exists as long as there are still references to the corresponding JSON string.

s% (string)['const char *', 'size_t']

Convert a JSON string to a pointer to a null terminated UTF-8 string and its length.

n (null)

Expect a JSON null value. Nothing is extracted.

b (boolean)['int']

Convert a JSON boolean value to a C int, so that true is converted to 1 and false to 0.

i (integer)['int']

Convert a JSON integer to a C int.

I (integer)['int64_t']

Convert a JSON integer to a C int64_t. Note: Jansson expects a json_int_t here without committing to a size, but Flux guarantees that this is a 64-bit integer.

f (real)['double']

Convert JSON real to a C double.

F (integer or real)['double']

Convert JSON number (integer or real) to a C double.

o (any value)['json_t *']

Store a JSON value, with no conversion, to a json_t pointer.

O (any value)['json_t *']

Like o, but the JSON value's reference count is incremented.

[fmt] (array)

Convert each item in the JSON array according to the inner format string. fmt may contain objects and arrays, i.e. recursive value extraction is supported.

{fmt} (object)

Convert each item in the JSON object according to the inner format string fmt. The first, third, etc. format specifier represents a key, and must be s. The corresponding argument to unpack functions is read as the object key. The second, fourth, etc. format specifier represents a value and is written to the address given as the corresponding argument. Note that every other argument is read from and every other is written to. fmt may contain objects and arrays as values, i.e. recursive value extraction is supported. Any s representing a key may be suffixed with ? to make the key optional. If the key is not found, nothing is extracted.

!

This special format specifier is used to enable the check that all object and array items are accessed, on a per-value basis. It must appear inside an array or object as the last format specifier before the closing bracket or brace.

Whitespace, : (colon) and , (comma) are ignored.

These descriptions came from the Jansson 2.9 manual.

See also: Jansson API: Parsing and Validating Values: http://jansson.readthedocs.io/en/2.9/apiref.html#parsing-and-validating-values

RETURN VALUE

flux_rpc(), flux_rpc_pack(), flux_rpc_raw(), and flux_rpc_message() return a flux_future_t object on success. On error, NULL is returned, and errno is set appropriately.

flux_rpc_get(), flux_rpc_get_unpack(), and flux_rpc_get_raw() return zero on success. On error, -1 is returned, and errno is set appropriately.

flux_rpc_get_matchtag() returns the matchtag allocated to the particular RPC request, or FLUX_MATCHTAG_NONE if no matchtag was allocated (e.g. no response is expected), or the future argument does not correspond to an RPC.

flux_rpc_get_nodeid() returns the original nodeid target of the flux_rpc() request, including if the RPC was targeted to FLUX_NODEID_ANY or FLUX_NODEID_UPSTREAM.

ERRORS

ENOSYS

Service is not available (misspelled topic string, module not loaded, etc.), or the flux_t handle has no send operation.

EINVAL

Some arguments were invalid.

EPROTO

A request was malformed, the FLUX_RPC_STREAMING flag was omitted on a request to a service that may send multiple responses, or another protocol error occurred.
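As an illustration of how a caller might report these errors, the sketch below maps the errno values listed in this page to short hints. rpc_errno_hint() is a hypothetical helper, not part of the Flux API, and the hint strings are paraphrases of the descriptions above. ENODATA is included because, for streaming RPCs, it marks the end of the stream rather than a failure.

```c
#include <errno.h>
#include <string.h>

/* Hypothetical helper (not part of libflux): translate an errno value
 * left by a failed flux_rpc() or flux_rpc_get() variant into a short
 * hint.  Values not described in this page fall back to strerror(3).
 */
const char *rpc_errno_hint (int errnum)
{
    switch (errnum) {
        case ENOSYS:
            return "service not available or handle cannot send";
        case EINVAL:
            return "invalid arguments";
        case EPROTO:
            return "protocol error or missing FLUX_RPC_STREAMING flag";
        case ENODATA:
            return "end of stream (streaming RPC)";
        default:
            return strerror (errnum);
    }
}
```

A caller could print the hint after a failed call, for example with fprintf (stderr, "rpc: %s\n", rpc_errno_hint (errno)), taking care to read errno before making any other library call.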

EXAMPLES

This example performs a synchronous RPC with the broker's "attr.get" service to obtain the broker's rank.

#include <stdio.h>
#include <flux/core.h>
#include "die.h"

int main (int argc, char **argv)
{
    flux_t *h;
    flux_future_t *f;
    const char *rankstr;

    if (!(h = flux_open (NULL, 0)))
        die ("could not connect to broker");

    /* Send an attr.get request with the JSON payload {"name":"rank"}. */
    if (!(f = flux_rpc_pack (h,
                             "attr.get",
                             FLUX_NODEID_ANY,
                             0,
                             "{s:s}",
                             "name", "rank")))
        die ("error sending attr.get request");

    /* Block until the response arrives, then extract "value". */
    if (flux_rpc_get_unpack (f,
                             "{s:s}",
                             "value", &rankstr) < 0)
        die ("error fetching rank");

    /* Use rankstr before destroying the future that holds the response. */
    printf ("rank is %s\n", rankstr);

    flux_future_destroy (f);
    flux_close (h);
    return (0);
}

This example registers a continuation to do the same thing asynchronously.

#include <stdio.h>
#include <flux/core.h>
#include "die.h"

/* Called by the reactor once the attr.get response has been received. */
void continuation (flux_future_t *f, void *arg)
{
    const char *rankstr;

    if (flux_rpc_get_unpack (f, "{s:s}", "value", &rankstr) < 0)
        die ("error getting rank");

    printf ("rank is %s\n", rankstr);
    flux_future_destroy (f);
}

int main (int argc, char **argv)
{
    flux_t *h;
    flux_future_t *f;

    if (!(h = flux_open (NULL, 0)))
        die ("could not connect to broker");

    /* Send the request and register the continuation (no timeout). */
    if (!(f = flux_rpc_pack (h,
                             "attr.get",
                             FLUX_NODEID_ANY,
                             0,
                             "{s:s}",
                             "name", "rank"))
        || flux_future_then (f, -1., continuation, NULL) < 0)
        die ("error sending attr.get request");

    /* Run the reactor so the response can be received and dispatched. */
    if (flux_reactor_run (flux_get_reactor (h), 0) < 0)
        die ("reactor meltdown");

    flux_close (h);
    return (0);
}

RESOURCES

Flux: http://flux-framework.org

Flux RFC: https://flux-framework.readthedocs.io/projects/flux-rfc

FLUX RFC

6/Flux Remote Procedure Call Protocol

SEE ALSO

flux_future_get(3), flux_respond(3)