flux_rpc(3)¶
SYNOPSIS¶
#include <flux/core.h>
flux_future_t *flux_rpc (flux_t *h, const char *topic,
const char *s,
uint32_t nodeid, int flags);
flux_future_t *flux_rpc_pack (flux_t *h, const char *topic,
uint32_t nodeid, int flags,
const char *fmt, ...);
flux_future_t *flux_rpc_raw (flux_t *h, const char *topic,
const void *data, int len,
uint32_t nodeid, int flags);
flux_future_t *flux_rpc_message (flux_t *h,
const flux_msg_t *msg,
uint32_t nodeid, int flags);
int flux_rpc_get (flux_future_t *f, const char **s);
int flux_rpc_get_unpack (flux_future_t *f, const char *fmt, ...);
int flux_rpc_get_raw (flux_future_t *f,
const void **data, int *len);
uint32_t flux_rpc_get_matchtag (flux_future_t *f);
uint32_t flux_rpc_get_nodeid (flux_future_t *f);
DESCRIPTION¶
A remote procedure call (RPC) consists of a matched request and
response message exchanged with a Flux service. flux_rpc()
,
flux_rpc_pack()
, and flux_rpc_raw()
encode and send a request message
via Flux broker handle h to a Flux service identified by topic
and nodeid. A flux_future_t
object is returned which acts as a handle
for synchronization and a container for the response message which in
turn contains the RPC result.
A lower-level variant of flux_rpc()
, flux_rpc_message()
accepts a
pre-created request message, assigning nodeid and matchtag according
to flags.
flux_future_then(3) may be used to register a reactor callback (continuation) to be called once the response has been received. flux_future_wait_for(3) may be used to block until the response has been received. Both accept an optional timeout.
flux_rpc_get()
, flux_rpc_get_unpack()
, and flux_rpc_get_raw()
decode the RPC result. Internally, they call flux_future_get()
to access the response message stored in the future. If the response
message has not yet been received, these functions block until it is,
or an error occurs.
flux_rpc_get_matchtag()
and flux_rpc_get_nodeid()
are accessors
which allow access to the RPC matchtag and target nodeid from the
future returned from flux_rpc(3)
.
REQUEST OPTIONS¶
The request message is encoded and sent with or without a payload
using one of the three flux_rpc()
variants.
flux_rpc()
attaches s, a NULL terminated string, as request
payload. If NULL, the request is encoded without a payload.
flux_rpc_pack()
attaches a JSON payload encoded as a NULL terminated
string using Jansson json_pack()
style arguments (see below).
flux_rpc_raw()
attaches a raw payload data of length len, in bytes.
If data is NULL, the request is encoded without a payload.
nodeid affects request routing, and must be set to one of the following values:
- FLUX_NODEID_ANY
The request is routed to the first matching service instance.
- FLUX_NODEID_UPSTREAM
The request is routed to the first matching service instance, skipping over the sending rank.
- integer
The request is routed to a specific rank.
flags may be zero or:
- FLUX_RPC_NORESPONSE
No response is expected. The request will not be assigned a matchtag, and the returned flux_future_t is immediately fulfilled, and may simply be destroyed.
- FLUX_RPC_STREAMING
The RPC is for a service that may send zero or more non-error responses, and a final error response. ENODATA should be interpreted as a non-error end-of-stream sentinel.
RESPONSE OPTIONS¶
The response message is stored in the future when the future is fulfilled.
At that time it is decoded with flux_response_decode(3). If it cannot
be decoded, or if the service returned an error, the future is fulfilled
with an error. Otherwise it is fulfilled with the response message.
If there was an error, flux_future_get()
or the flux_rpc_get()
variants
return an error.
flux_rpc_get()
sets s (if non-NULL) to the NULL-terminated string
payload contained in the RPC response. If there was no payload, s
is set to NULL.
flux_rpc_get_unpack()
decodes the NULL-terminated string payload as JSON
using Jansson json_unpack()
style arguments (see below). It is an error
if there is no payload, or if the payload is not JSON.
flux_rpc_get_raw()
assigns the raw payload of the RPC response message
to data and its length to len. If there is no payload, this function
will fail.
PREMATURE DESTRUCTION¶
If a regular RPC future is destroyed before its response is received, the matchtag allocated to it is not immediately returned to the pool for reuse. If an unclaimed response subsequently arrives with that matchtag, it is returned to the pool then.
If a streaming RPC future is destroyed before its terminating response is received, its matchtag is only returned to the pool when an unclaimed error response is received. Non-error responses are ignored.
It is essential that services which return multiple responses verify that requests were made with the FLUX_RPC_STREAMING flag and return an immediate EPROTO error if they were not. See flux_respond(3).
CANCELLATION¶
Flux RFC 6 does not currently specify a cancellation protocol for an individual RPC, but does stipulate that an RPC may be canceled if a disconnect message is received, as is automatically generated by the local connector upon client disconnection.
ENCODING JSON PAYLOADS¶
Flux API functions that are based on Jansson's json_pack() accept the following tokens in their format string. The type in parenthesis denotes the resulting JSON type, and the type in brackets (if any) denotes the C type that is expected as the corresponding argument or arguments.
- s (string)['const char *']
Convert a null terminated UTF-8 string to a JSON string.
- s? (string)['const char *']
Like s, but if the argument is NULL, outputs a JSON null value.
- s# (string)['const char *', 'int']
Convert a UTF-8 buffer of a given length to a JSON string.
- s% (string)['const char *', 'size_t']
Like s# but the length argument is of type size_t.
- + ['const char *']
Like s, but concatenate to the previous string. Only valid after a string.
- +# ['const char *', 'int']
Like s#, but concatenate to the previous string. Only valid after a string.
- +% ['const char *', 'size_t']
Like +#, but the length argument is of type size_t.
- n (null)
Output a JSON null value. No argument is consumed.
- b (boolean)['int']
Convert a C int to JSON boolean value. Zero is converted to false and non-zero to true.
- i (integer)['int']
Convert a C int to JSON integer.
- I (integer)['int64_t']
Convert a C int64_t to JSON integer. Note: Jansson expects a json_int_t here without committing to a size, but Flux guarantees that this is a 64-bit integer.
- f (real)['double']
Convert a C double to JSON real.
- o (any value)['json_t *']
Output any given JSON value as-is. If the value is added to an array or object, the reference to the value passed to o is stolen by the container.
- O (any value)['json_t *']
Like o, but the argument's reference count is incremented. This is useful if you pack into an array or object and want to keep the reference for the JSON value consumed by O to yourself.
- o?, O? (any value)['json_t *']
Like o and O, respectively, but if the argument is NULL, output a JSON null value.
- [fmt] (array)
Build an array with contents from the inner format string. fmt may contain objects and arrays, i.e. recursive value building is supported.
- {fmt} (object)
Build an object with contents from the inner format string fmt. The first, third, etc. format specifier represent a key, and must be a string as object keys are always strings. The second, fourth, etc. format specifier represent a value. Any value may be an object or array, i.e. recursive value building is supported.
Whitespace, : (colon) and , (comma) are ignored.
These descriptions came from the Jansson 2.10 manual.
See also: Jansson API: Building Values: http://jansson.readthedocs.io/en/2.10/apiref.html#building-values
DECODING JSON PAYLOADS¶
Flux API functions that are based on Jansson's json_unpack() accept the following tokens in their format string. The type in parenthesis denotes the resulting JSON type, and the type in brackets (if any) denotes the C type that is expected as the corresponding argument or arguments.
- s (string)['const char *']
Convert a JSON string to a pointer to a null terminated UTF-8 string. The resulting string is extracted by using 'json_string_value()' internally, so it exists as long as there are still references to the corresponding JSON string.
- s% (string)['const char *', 'size_t']
Convert a JSON string to a pointer to a null terminated UTF-8 string and its length.
- n (null)
Expect a JSON null value. Nothing is extracted.
- b (boolean)['int']
Convert a JSON boolean value to a C int, so that true is converted to 1 and false to 0.
- i (integer)['int']
Convert a JSON integer to a C int.
- I (integer)['int64_t']
Convert a JSON integer to a C int64_t. Note: Jansson expects a json_int_t here without committing to a size, but Flux guarantees that this is a 64-bit integer.
- f (real)['double']
Convert JSON real to a C double.
- F (real)['double']
Convert JSON number (integer or real) to a C double.
- o (any value)['json_t *']
Store a JSON value, with no conversion, to a json_t pointer.
- O (any value)['json_t *']
Like o, but the JSON value's reference count is incremented.
- [fmt] (array)
Convert each item in the JSON array according to the inner format string. fmt may contain objects and arrays, i.e. recursive value extraction is supported.
- {fmt} (object)
Convert each item in the JSON object according to the inner format string fmt. The first, third, etc. format specifier represent a key, and must by s. The corresponding argument to unpack functions is read as the object key. The second, fourth, etc. format specifier represent a value and is written to the address given as the corresponding argument. Note that every other argument is read from and every other is written to. fmt may contain objects and arrays as values, i.e. recursive value extraction is supported. Any s representing a key may be suffixed with ? to make the key optional. If the key is not found, nothing is extracted.
- !
This special format specifier is used to enable the check that all object and array items are accessed, on a per-value basis. It must appear inside an array or object as the last format specifier before the closing bracket or brace.
Whitespace, : (colon) and , (comma) are ignored.
These descriptions came from the Jansson 2.10 manual.
See also: Jansson API: Parsing and Validating Values: http://jansson.readthedocs.io/en/2.10/apiref.html#parsing-and-validating-values
RETURN VALUE¶
flux_rpc()
, flux_rpc_pack()
, and flux_rpc_raw()
return a flux_future_t
object on success. On error, NULL is returned, and errno is set appropriately.
flux_rpc_get()
, flux_rpc_get_unpack()
, and flux_rpc_get_raw()
return
zero on success. On error, -1 is returned, and errno is set appropriately.
flux_rpc_get_matchtag()
returns the matchtag allocated to the particular
RPC request, or FLUX_MATCHTAG_NONE
if no matchtag was allocated (e.g. no
response is expected), or the future argument does not correspond to an RPC.
flux_rpc_get_nodeid()
returns the original nodeid
target of the
flux_rpc()
request, including if the RPC was targeted to
FLUX_NODEID_ANY
or FLUX_NODEID_UPSTREAM
.
ERRORS¶
- ENOSYS
Service is not available (misspelled topic string, module not loaded, etc), or flux_t handle has no send operation.
- EINVAL
Some arguments were invalid.
- EPROTO
A request was malformed, the FLUX_RPC_STREAMING flag was omitted on a request to a service that may send multiple responses, or other protocol error occurred.
EXAMPLES¶
This example performs a synchronous RPC with the broker's "attr.get" service to obtain the broker's rank.
#include <flux/core.h>
#include "src/common/libutil/log.h"
int main (int argc, char **argv)
{
flux_t *h;
flux_future_t *f;
const char *rankstr;
if (!(h = flux_open (NULL, 0)))
log_err_exit ("flux_open");
if (!(f = flux_rpc_pack (h, "attr.get", FLUX_NODEID_ANY, 0,
"{s:s}", "name", "rank")))
log_err_exit ("flux_rpc_pack");
if (flux_rpc_get_unpack (f, "{s:s}", "value", &rankstr) < 0)
log_err_exit ("flux_rpc_get_unpack");
printf ("rank is %s\n", rankstr);
flux_future_destroy (f);
flux_close (h);
return (0);
}
This example registers a continuation to do the same thing asynchronously.
#include <flux/core.h>
#include "src/common/libutil/log.h"
void continuation (flux_future_t *f, void *arg)
{
const char *rankstr;
if (flux_rpc_get_unpack (f, "{s:s}", "value", &rankstr) < 0)
log_err_exit ("flux_rpc_get_unpack");
printf ("rank is %s\n", rankstr);
flux_future_destroy (f);
}
int main (int argc, char **argv)
{
flux_t *h;
flux_future_t *f;
if (!(h = flux_open (NULL, 0)))
log_err_exit ("flux_open");
if (!(f = flux_rpc_pack (h, "attr.get", FLUX_NODEID_ANY, 0,
"{s:s}", "name", "rank")))
log_err_exit ("flux_rpc_pack");
if (flux_future_then (f, -1., continuation, NULL) < 0)
log_err_exit ("flux_future_then");
if (flux_reactor_run (flux_get_reactor (h), 0) < 0)
log_err_exit ("flux_reactor_run");
flux_close (h);
return (0);
}
RESOURCES¶
Flux: http://flux-framework.org
RFC 6: Flux Remote Procedure Call Protocol: https://flux-framework.readthedocs.io/projects/flux-rfc/en/latest/spec_6.html