flux.abc package

class flux.abc.JournalConsumerBase(flux_handle, topic, cancel_topic=None, full=True, since=0.0, include_sentinel=False)

Bases: ABC

Base class for consuming events from a journal-style RPC

This base class implements a wrapper around journal style RPCs which allow clients to obtain a stream of events, possibly including historical data.

Subclasses of :obj`JournalConsumerBase` should be implemented to more completely handle the specifics of a given interface, but all such subclasses will follow the interface documented here.

A journal consumer is created by passing the constructor an open Flux handle, the topic string of the journal service along with any other optional parameters described below. The start() method should then be called, which sends the initial RPC. To stop the stream of events, call the stop() method. After all events have been processed poll() or the registered callback will return None.

When the consumer is first started, historical data (events in the past) will be sent from the journal service unless full is set to False. These events are stored until all historical events are processed, then are time ordered before returning them via poll() or to the callback registered by set_callback().

To avoid processing previously seen events with a new instance of this class, the timestamp of the newest processed event can be passed via the since parameter. Timestamps should be unique so poll() or the callback will start with the newest event after the since timestamp.

When full is True, a compliant journal service sends a sentinel event in the journal stream to demarcate the transition between history and current events. If include_sentinel is set to True, then an empty event will be returned by poll() or the registered callback to represent the sentinel. An empty event can be compared to self.SENTINEL_EVENT or by using the JournalEventBase.is_empty() method. The default behavior is to suppress the sentinel.

Subclasses of JournalConsumerBase must implement the following abstract methods, see method documentation for details:

Subclasses may optionally implement the following methods:

Finally, subclasses may optionally override the following properties:

  • request_payload, which is the payload of a journal request (the default is an empty payload)

  • SENTINEL_EVENT, which is a class constant representing the sentinel event for this class.

SENTINEL_EVENT = {'context': {}, 'timestamp': 1780507981.9051635}
abstract create_event(entry, **kwargs)

Return a journal event object from a response entry

This method should return an event object given an individual eventlog entry and **kwargs returned from process_response().

Parameters
  • entry (dict) -- An individual event dictionary from the events array of a journal response.

  • **kwargs -- Additional keyword arguments returned from the process_response() method.

Returns

JournalEventBase or a subclass

is_empty_response(resp)

Return True if resp is "empty"

poll(timeout=-1.0)

Synchronously get the next journal event

if full is True, then this call will not return until all historical events have been processed. Historical events will sorted in time order and returned once per poll() call.

start() must be called before this function.

Parameters

timeout (float) -- Only wait timeout seconds for the next event. If the timeout expires then a TimeoutError is raised. A timeout of -1.0 disables any timeout.

Raises

RuntimeError -- poll() was called before start().

abstract process_response(resp)

Process a journal response.

This method may return a dictionary which is then passed as *kwargs to create_event() below.

Parameters

resp (dict) -- The payload of a single journal response (converted from json to a dict)

Returns

dict

property request_payload

Appropriate request payload for this journal RPC

set_callback(event_cb, *args, **kwargs)

Register callback event_cb to be called for each journal event

If provided, *args, and **kwargs are passed along to event_cb, whose only required argument is an event, e.g.

>>> def event_cb(event)
>>>     # do something with event

After a JournalConsumer is stopped and the final event is received, event_cb will be called with an event of None, which signals the end of the event stream.

start()

Start the stream of events by sending a request

This function initiates the stream of events for a journal consumer by sending the initial request to the configured journal service endpoint.

Note

If start() is called more than once the stream of events will be restarted using the original options passed to the constructor. This may cause duplicate events, or missed events if full is False since no history will be included.

stop()

Cancel the journal RPC

Cancel the RPC. This will eventually stop the stream of events to either poll() or the defined callback. After all events have been processed an event of None will be returned by poll() or the defined callback.

class flux.abc.JournalEventBase(event)

Bases: EventLogEvent

A container for an event from a journal RPC interface

name

event name (Possible event names will depend on the journal service being consumed)

Type

str

timestamp

event timestamp in seconds since the epoch with sub-millisecond precision.

Type

float

context

context dictionary

Type

dict

context_string

context dict converted to comma separated key=value string.

Type

str

is_empty()

Return True if this event is an empty journal event

Submodules