flux_rpc(3)
SYNOPSIS
#include <flux/core.h>
flux_future_t *flux_rpc (flux_t *h,
                         const char *topic,
                         const char *s,
                         uint32_t nodeid,
                         int flags);
flux_future_t *flux_rpc_pack (flux_t *h,
                              const char *topic,
                              uint32_t nodeid,
                              int flags,
                              const char *fmt,
                              ...);
flux_future_t *flux_rpc_raw (flux_t *h,
                             const char *topic,
                             const void *data,
                             size_t len,
                             uint32_t nodeid,
                             int flags);
flux_future_t *flux_rpc_message (flux_t *h,
                                 const flux_msg_t *msg,
                                 uint32_t nodeid,
                                 int flags);
int flux_rpc_get (flux_future_t *f, const char **s);
int flux_rpc_get_unpack (flux_future_t *f, const char *fmt, ...);
int flux_rpc_get_raw (flux_future_t *f,
                      const void **data,
                      size_t *len);
uint32_t flux_rpc_get_matchtag (flux_future_t *f);
uint32_t flux_rpc_get_nodeid (flux_future_t *f);
Link with -lflux-core.
DESCRIPTION
A remote procedure call (RPC) consists of a matched request and response message exchanged with a Flux service. flux_rpc(), flux_rpc_pack(), and flux_rpc_raw() encode and send a request message via Flux broker handle h to a Flux service identified by topic and nodeid. A flux_future_t object is returned which acts as a handle for synchronization and a container for the response message, which in turn contains the RPC result.
A lower-level variant of flux_rpc(), flux_rpc_message(), accepts a pre-created request message, assigning nodeid and matchtag according to flags.
flux_future_then(3) may be used to register a reactor callback (continuation) to be called once the response has been received. flux_future_wait_for(3) may be used to block until the response has been received. Both accept an optional timeout.
flux_rpc_get(), flux_rpc_get_unpack(), and flux_rpc_get_raw() decode the RPC result. Internally, they call flux_future_get(3) to access the response message stored in the future. If the response message has not yet been received, these functions block until it is, or an error occurs.
flux_rpc_get_matchtag() and flux_rpc_get_nodeid() are accessors which allow access to the RPC matchtag and target nodeid from the future returned from flux_rpc().
REQUEST OPTIONS
The request message is encoded and sent with or without a payload using one of the three flux_rpc() variants.
flux_rpc() attaches s, a NUL-terminated string, as request payload. If s is NULL, the request is encoded without a payload.
flux_rpc_pack() attaches a JSON payload encoded as a NUL-terminated string using Jansson json_pack() style arguments (see below).
flux_rpc_raw() attaches a raw payload data of length len, in bytes. If data is NULL, the request is encoded without a payload.
nodeid affects request routing, and must be set to one of the following values:
- FLUX_NODEID_ANY
The request is routed to the first matching service instance.
- FLUX_NODEID_UPSTREAM
The request is routed to the first matching service instance, skipping over the sending rank.
- integer
The request is routed to a specific rank.
flags may be zero or:
- FLUX_RPC_NORESPONSE
No response is expected. The request will not be assigned a matchtag, and the returned flux_future_t is immediately fulfilled, and may simply be destroyed.
- FLUX_RPC_STREAMING
The RPC is for a service that may send zero or more non-error responses, and a final error response. ENODATA should be interpreted as a non-error end-of-stream sentinel.
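The ENODATA convention for FLUX_RPC_STREAMING can be sketched as a small classifier applied to the result of each flux_rpc_get() call. This is a minimal sketch using only the C standard headers; the enum and the function name classify_stream_result() are hypothetical and are not part of libflux-core.

```c
#include <errno.h>

/* Outcome of one flux_rpc_get()-style call on a streaming RPC.
 * Hypothetical type for illustration; not part of libflux-core. */
enum stream_status {
    STREAM_MORE,    /* non-error response: more responses may follow */
    STREAM_END,     /* ENODATA: normal end of stream, not a failure */
    STREAM_ERROR    /* any other error */
};

/* rc is the return value of the get call, errnum is the errno value
 * it left behind when rc is -1. */
enum stream_status classify_stream_result (int rc, int errnum)
{
    if (rc == 0)
        return STREAM_MORE;
    if (errnum == ENODATA)
        return STREAM_END;
    return STREAM_ERROR;
}
```

In a continuation, a STREAM_MORE result would typically be followed by re-arming the future for the next response (see flux_future_reset() in flux_future_get(3)), while STREAM_END and STREAM_ERROR would be followed by flux_future_destroy().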
RESPONSE OPTIONS
The response message is stored in the future when the future is fulfilled.
At that time it is decoded with flux_response_decode(3). If it cannot
be decoded, or if the service returned an error, the future is fulfilled
with an error. Otherwise it is fulfilled with the response message.
If there was an error, flux_future_get(3) or the flux_rpc_get() variants return an error.
flux_rpc_get() sets s (if non-NULL) to the NUL-terminated string payload contained in the RPC response. If there was no payload, s is set to NULL.
flux_rpc_get_unpack() decodes the NUL-terminated string payload as JSON using Jansson json_unpack() style arguments (see below). It is an error if there is no payload, or if the payload is not JSON.
flux_rpc_get_raw() assigns the raw payload of the RPC response message to data and its length to len. If there is no payload, this function will fail.
PREMATURE DESTRUCTION
If a regular RPC future is destroyed before its response is received, the matchtag allocated to it is not immediately returned to the pool for reuse. If an unclaimed response subsequently arrives with that matchtag, the matchtag is returned to the pool at that point.
If a streaming RPC future is destroyed before its terminating response is received, its matchtag is only returned to the pool when an unclaimed error response is received. Non-error responses are ignored.
It is essential that services which return multiple responses verify that requests were made with the FLUX_RPC_STREAMING flag and return an immediate EPROTO error if they were not. See flux_respond(3).
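The matchtag rules for destroyed futures described at the start of this section can be modeled in a few lines. This is a toy model only, assuming a single orphaned matchtag; struct orphan_tag and orphan_tag_response() are hypothetical names and do not reflect how flux-core is implemented internally.

```c
#include <stdbool.h>

/* Toy model of one matchtag whose future was destroyed early.
 * Illustrative only; not the flux-core implementation. */
struct orphan_tag {
    bool streaming;   /* RPC was sent with FLUX_RPC_STREAMING */
    bool in_pool;     /* matchtag has been returned to the pool */
};

/* Feed one unclaimed response to the model.
 * is_error is true if the response carries an error.
 * Returns true if this response caused the matchtag to be reclaimed. */
bool orphan_tag_response (struct orphan_tag *t, bool is_error)
{
    if (t->in_pool)
        return false;               /* already reclaimed */
    if (t->streaming && !is_error)
        return false;               /* non-error streaming responses are ignored */
    t->in_pool = true;              /* regular response, or streaming error */
    return true;
}
```

The model also shows why the EPROTO check matters: if a service sends several responses to a request that was not flagged as streaming, the matchtag is reclaimed on the first one, and the later responses arrive with a matchtag that may already have been reused.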
CANCELLATION
Flux RFC 6 does not currently specify a cancellation protocol for an individual RPC, but does stipulate that an RPC may be canceled if a disconnect message is received, as is automatically generated by the local connector upon client disconnection.
ENCODING JSON PAYLOADS
Flux API functions that are based on Jansson's json_pack() accept the following tokens in their format string. The type in parentheses denotes the resulting JSON type, and the type in brackets (if any) denotes the C type that is expected as the corresponding argument or arguments.
- s (string)['const char *']
Convert a null terminated UTF-8 string to a JSON string.
- s? (string)['const char *']
Like s, but if the argument is NULL, outputs a JSON null value.
- s# (string)['const char *', 'int']
Convert a UTF-8 buffer of a given length to a JSON string.
- s% (string)['const char *', 'size_t']
Like s# but the length argument is of type size_t.
- + ['const char *']
Like s, but concatenate to the previous string. Only valid after a string.
- +# ['const char *', 'int']
Like s#, but concatenate to the previous string. Only valid after a string.
- +% ['const char *', 'size_t']
Like +#, but the length argument is of type size_t.
- n (null)
Output a JSON null value. No argument is consumed.
- b (boolean)['int']
Convert a C int to JSON boolean value. Zero is converted to false and non-zero to true.
- i (integer)['int']
Convert a C int to JSON integer.
- I (integer)['int64_t']
Convert a C int64_t to JSON integer. Note: Jansson expects a json_int_t here without committing to a size, but Flux guarantees that this is a 64-bit integer.
- f (real)['double']
Convert a C double to JSON real.
- o (any value)['json_t *']
Output any given JSON value as-is. If the value is added to an array or object, the reference to the value passed to o is stolen by the container.
- O (any value)['json_t *']
Like o, but the argument's reference count is incremented. This is useful if you pack into an array or object and want to keep the reference for the JSON value consumed by O to yourself.
- o?, O? (any value)['json_t *']
Like o and O, respectively, but if the argument is NULL, output a JSON null value.
- [fmt] (array)
Build an array with contents from the inner format string. fmt may contain objects and arrays, i.e. recursive value building is supported.
- {fmt} (object)
Build an object with contents from the inner format string fmt. The first, third, etc. format specifier represent a key, and must be a string as object keys are always strings. The second, fourth, etc. format specifier represent a value. Any value may be an object or array, i.e. recursive value building is supported.
Whitespace, : (colon) and , (comma) are ignored.
These descriptions came from the Jansson 2.9 manual.
See also: Jansson API: Building Values: http://jansson.readthedocs.io/en/2.9/apiref.html#building-values
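A common mistake with json_pack() style functions is passing the wrong number of variadic arguments for a format string. The token table above can be turned into a small counter that reports how many arguments a format consumes. This is a sketch under stated assumptions: pack_format_argc() is a hypothetical helper, not part of Jansson or Flux, and it only counts arguments; it does not check that brackets balance or that object keys are strings.

```c
/* Count the variadic arguments consumed by a json_pack()-style format,
 * following the token table above. Returns -1 on an unknown token.
 * Hypothetical helper for illustration; not part of Jansson or Flux. */
int pack_format_argc (const char *fmt)
{
    int n = 0;

    for (const char *p = fmt; *p != '\0'; p++) {
        switch (*p) {
            case 's': case '+':
                n += 1;                     /* the string pointer */
                if (p[1] == '#' || p[1] == '%') {
                    n += 1;                 /* explicit length argument */
                    p++;
                }
                else if (*p == 's' && p[1] == '?')
                    p++;                    /* s? still takes one argument */
                break;
            case 'b': case 'i': case 'I': case 'f':
                n += 1;
                break;
            case 'o': case 'O':
                n += 1;
                if (p[1] == '?')
                    p++;                    /* o? and O? take one argument */
                break;
            case 'n':                       /* consumes no argument */
            case '[': case ']': case '{': case '}':
            case ' ': case '\t': case '\n': case ':': case ',':
                break;
            default:
                return -1;
        }
    }
    return n;
}
```

For the format "{s:s}", the counter returns 2: one argument for the key and one for the string value.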
DECODING JSON PAYLOADS
Flux API functions that are based on Jansson's json_unpack() accept the following tokens in their format string. The type in parentheses denotes the resulting JSON type, and the type in brackets (if any) denotes the C type that is expected as the corresponding argument or arguments.
- s (string)['const char *']
Convert a JSON string to a pointer to a null terminated UTF-8 string. The resulting string is extracted by using 'json_string_value()' internally, so it exists as long as there are still references to the corresponding JSON string.
- s% (string)['const char *', 'size_t']
Convert a JSON string to a pointer to a null terminated UTF-8 string and its length.
- n (null)
Expect a JSON null value. Nothing is extracted.
- b (boolean)['int']
Convert a JSON boolean value to a C int, so that true is converted to 1 and false to 0.
- i (integer)['int']
Convert a JSON integer to a C int.
- I (integer)['int64_t']
Convert a JSON integer to a C int64_t. Note: Jansson expects a json_int_t here without committing to a size, but Flux guarantees that this is a 64-bit integer.
- f (real)['double']
Convert JSON real to a C double.
- F (real)['double']
Convert JSON number (integer or real) to a C double.
- o (any value)['json_t *']
Store a JSON value, with no conversion, to a json_t pointer.
- O (any value)['json_t *']
Like o, but the JSON value's reference count is incremented.
- [fmt] (array)
Convert each item in the JSON array according to the inner format string. fmt may contain objects and arrays, i.e. recursive value extraction is supported.
- {fmt} (object)
Convert each item in the JSON object according to the inner format string fmt. The first, third, etc. format specifier represent a key, and must be s. The corresponding argument to unpack functions is read as the object key. The second, fourth, etc. format specifier represent a value and is written to the address given as the corresponding argument. Note that every other argument is read from and every other is written to. fmt may contain objects and arrays as values, i.e. recursive value extraction is supported. Any s representing a key may be suffixed with ? to make the key optional. If the key is not found, nothing is extracted.
- !
This special format specifier is used to enable the check that all object and array items are accessed, on a per-value basis. It must appear inside an array or object as the last format specifier before the closing bracket or brace.
Whitespace, : (colon) and , (comma) are ignored.
These descriptions came from the Jansson 2.9 manual.
See also: Jansson API: Parsing and Validating Values: http://jansson.readthedocs.io/en/2.9/apiref.html#parsing-and-validating-values
RETURN VALUE
flux_rpc(), flux_rpc_pack(), and flux_rpc_raw() return a flux_future_t object on success. On error, NULL is returned, and errno is set appropriately.
flux_rpc_get(), flux_rpc_get_unpack(), and flux_rpc_get_raw() return zero on success. On error, -1 is returned, and errno is set appropriately.
flux_rpc_get_matchtag() returns the matchtag allocated to the particular RPC request, or FLUX_MATCHTAG_NONE if no matchtag was allocated (e.g. no response is expected), or the future argument does not correspond to an RPC.
flux_rpc_get_nodeid() returns the original nodeid target of the flux_rpc() request, including if the RPC was targeted to FLUX_NODEID_ANY or FLUX_NODEID_UPSTREAM.
ERRORS
- ENOSYS
Service is not available (misspelled topic string, module not loaded, etc.), or flux_t handle has no send operation.
- EINVAL
Some arguments were invalid.
- EPROTO
A request was malformed, the FLUX_RPC_STREAMING flag was omitted on a request to a service that may send multiple responses, or other protocol error occurred.
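When reporting a failed RPC, it can help to translate the errno values listed above into a hint for the user. The following is a minimal sketch; rpc_errno_hint() is a hypothetical helper, not part of libflux-core, and its wording simply restates the list above.

```c
#include <errno.h>

/* Map the errno values documented above to a short hint.
 * Hypothetical helper for illustration; not part of libflux-core. */
const char *rpc_errno_hint (int errnum)
{
    switch (errnum) {
        case ENOSYS:
            return "service not available, or handle has no send operation";
        case EINVAL:
            return "invalid argument";
        case EPROTO:
            return "protocol error (malformed request or missing FLUX_RPC_STREAMING)";
        default:
            return "other error";
    }
}
```

A caller might print this hint next to the topic string after flux_rpc_get() returns -1, which makes a misspelled topic (ENOSYS) easy to tell apart from a malformed payload (EPROTO).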
EXAMPLES
This example performs a synchronous RPC with the broker's "attr.get" service to obtain the broker's rank.
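#include <stdio.h>
#include <flux/core.h>
#include "die.h"

int main (int argc, char **argv)
{
    flux_t *h;
    flux_future_t *f;
    const char *rankstr;

    /* Connect to the default broker. */
    if (!(h = flux_open (NULL, 0)))
        die ("could not connect to broker");

    /* Send the request {"name":"rank"} to attr.get. */
    if (!(f = flux_rpc_pack (h,
                             "attr.get",
                             FLUX_NODEID_ANY,
                             0,
                             "{s:s}",
                             "name", "rank")))
        die ("error sending attr.get request");

    /* Block until the response arrives, then extract "value". */
    if (flux_rpc_get_unpack (f,
                             "{s:s}",
                             "value", &rankstr) < 0)
        die ("error fetching rank");

    /* rankstr points into the response held by f, so use it before
     * destroying the future. */
    printf ("rank is %s\n", rankstr);
    flux_future_destroy (f);
    flux_close (h);
    return (0);
}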
This example registers a continuation to do the same thing asynchronously.
#include <stdio.h>
#include <flux/core.h>
#include "die.h"

/* Called by the reactor once the response has been received. */
void continuation (flux_future_t *f, void *arg)
{
    const char *rankstr;

    if (flux_rpc_get_unpack (f, "{s:s}", "value", &rankstr) < 0)
        die ("error getting rank");
    printf ("rank is %s\n", rankstr);
    flux_future_destroy (f);
}

int main (int argc, char **argv)
{
    flux_t *h;
    flux_future_t *f;

    if (!(h = flux_open (NULL, 0)))
        die ("could not connect to broker");

    /* Send the request and register the continuation with no timeout. */
    if (!(f = flux_rpc_pack (h,
                             "attr.get",
                             FLUX_NODEID_ANY,
                             0,
                             "{s:s}",
                             "name", "rank"))
        || flux_future_then (f, -1., continuation, NULL) < 0)
        die ("error sending attr.get request");

    /* Run the reactor until there is nothing left to do. */
    if (flux_reactor_run (flux_get_reactor (h), 0) < 0)
        die ("reactor meltdown");
    flux_close (h);
    return (0);
}
RESOURCES
Flux: http://flux-framework.org
Flux RFC: https://flux-framework.readthedocs.io/projects/flux-rfc
Issue Tracker: https://github.com/flux-framework/flux-core/issues