flux.abc.journal module
Abstract classes for journal consumer interfaces
- class flux.abc.journal.JournalConsumerBase(flux_handle, topic, cancel_topic=None, full=True, since=0.0, include_sentinel=False)
Bases: ABC
Base class for consuming events from a journal-style RPC
This base class implements a wrapper around journal-style RPCs, which allow clients to obtain a stream of events, possibly including historical data.
Subclasses of JournalConsumerBase should be implemented to handle the specifics of a given interface more completely, but all such subclasses follow the interface documented here.
A journal consumer is created by passing the constructor an open Flux handle, the topic string of the journal service, and any other optional parameters described below. The start() method should then be called, which sends the initial RPC. To stop the stream of events, call the stop() method. After all events have been processed, poll() will return None, or the registered callback will be called with an event of None.
When the consumer is first started, historical data (events in the past) will be sent from the journal service unless full is set to False. These events are stored until all historical events are processed, then time ordered before being returned via poll() or passed to the callback registered by set_callback().
To avoid processing previously seen events with a new instance of this class, the timestamp of the newest processed event can be passed via the since parameter. Timestamps should be unique, so poll() or the callback will start with the newest event after the since timestamp.
When full is True, a compliant journal service sends a sentinel event in the journal stream to demarcate the transition between history and current events. If include_sentinel is set to True, then an empty event will be returned by poll() or passed to the registered callback to represent the sentinel. An empty event can be detected by comparing it to self.SENTINEL_EVENT or by using the JournalEventBase.is_empty() method. The default behavior is to suppress the sentinel.
Subclasses of JournalConsumerBase must implement the following abstract methods; see the method documentation for details:
flux.abc.JournalConsumerBase.process_response(), in which a single response from the journal service is processed and a dictionary of kwargs is returned.
flux.abc.JournalConsumerBase.create_event(), which is passed one event entry from the same response and the **kwargs obtained from above, and should return an object of a class derived from JournalEventBase.
Subclasses may optionally implement the following methods:
flux.abc.JournalConsumerBase.is_empty_response(), which is used to determine if a journal response message is to be considered the "empty" sentinel response.
Finally, subclasses may optionally override the following properties:
request_payload, which is the payload of a journal request (the default is an empty payload)
SENTINEL_EVENT, which is a class constant representing the sentinel event for this class.
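The division of labor between process_response() and create_event() described above can be sketched without a running Flux instance. The example below is a rough illustration only: SketchConsumerBase, SketchEvent, handle_response() and the "id" response field are hypothetical stand-ins invented for this sketch, not part of flux.abc.journal. Only the "events" array and the two abstract method names come from this documentation.

```python
from abc import ABC, abstractmethod


class SketchEvent(dict):
    """Hypothetical stand-in for a JournalEventBase subclass."""

    def __init__(self, entry, jobid=None):
        super().__init__(entry)
        self.jobid = jobid


class SketchConsumerBase(ABC):
    """Hypothetical stand-in that models only the per-response flow."""

    @abstractmethod
    def process_response(self, resp):
        """Return a dict of kwargs derived from one response."""

    @abstractmethod
    def create_event(self, entry, **kwargs):
        """Return an event object for one entry of the response."""

    def handle_response(self, resp):
        # Assumed flow: process the response once, then build one event
        # per entry of its "events" array using the returned kwargs.
        kwargs = self.process_response(resp) or {}
        return [self.create_event(entry, **kwargs) for entry in resp["events"]]


class JobJournalSketch(SketchConsumerBase):
    def process_response(self, resp):
        # Collect response-level data that applies to every entry.
        # The "id" key is an assumption made for this example.
        return {"jobid": resp.get("id")}

    def create_event(self, entry, **kwargs):
        return SketchEvent(entry, **kwargs)


response = {
    "id": 1234,
    "events": [
        {"name": "submit", "timestamp": 1.0, "context": {}},
        {"name": "start", "timestamp": 2.0, "context": {}},
    ],
}
events = JobJournalSketch().handle_response(response)
print([(e.jobid, e["name"]) for e in events])
```

Returning response-level values from process_response() keeps create_event() focused on a single entry, which is the reason the two steps are shown as separate methods here.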
- SENTINEL_EVENT = {'context': {}, 'timestamp': 1780507859.9357414}
- abstract create_event(entry, **kwargs)
Return a journal event object from a response entry
This method should return an event object given an individual eventlog entry and the **kwargs returned from process_response().
- Parameters
entry (dict) -- An individual event dictionary from the events array of a journal response.
**kwargs -- Additional keyword arguments returned from the process_response() method.
- Returns
JournalEventBase or a subclass
- is_empty_response(resp)
Return True if resp is "empty"
- poll(timeout=-1.0)
Synchronously get the next journal event
If full is True, this call will not return until all historical events have been processed. Historical events will be sorted in time order and returned one per poll() call.
start() must be called before this function.
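The buffering and ordering of historical events can be pictured with plain dictionaries. This is a minimal sketch under assumptions: order_history() is a hypothetical helper written for this example, and the real consumer may order and filter its backlog differently.

```python
def order_history(backlog, since=0.0):
    """Return buffered historical events in time order, newer than `since`."""
    # Keep only events strictly newer than the `since` timestamp.
    fresh = [event for event in backlog if event["timestamp"] > since]
    # Historical events may arrive out of order, so sort by timestamp.
    return sorted(fresh, key=lambda event: event["timestamp"])


backlog = [
    {"name": "start", "timestamp": 30.0},
    {"name": "submit", "timestamp": 10.0},
    {"name": "alloc", "timestamp": 20.0},
]

ordered = order_history(backlog, since=10.0)
print([event["name"] for event in ordered])  # ['alloc', 'start']
```

Each element of the ordered list corresponds to what a single poll() call would hand back once the history has been processed.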
- abstract process_response(resp)
Process a journal response.
This method may return a dictionary which is then passed as **kwargs to create_event().
- Parameters
resp (dict) -- The payload of a single journal response (converted from JSON to a dict)
- Returns
dict
- property request_payload
Appropriate request payload for this journal RPC
- set_callback(event_cb, *args, **kwargs)
Register callback event_cb to be called for each journal event
If provided, *args and **kwargs are passed along to event_cb, whose only required argument is an event, e.g.
>>> def event_cb(event):
...     # do something with event
...     pass
After a JournalConsumer is stopped and the final event is received, event_cb will be called with an event of None, which signals the end of the event stream.
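A callback that accepts extra arguments and handles the final None event might look like the sketch below. The deliver() function is a hypothetical stand-in for the consumer's dispatch loop so that the example runs without Flux; with a real consumer the registration would go through set_callback() instead.

```python
def event_cb(event, seen, label="journal"):
    # `seen` arrives via *args and `label` via **kwargs.
    if event is None:
        # An event of None signals the end of the event stream.
        seen.append(f"{label}: end of stream")
        return
    seen.append(f"{label}: {event['name']}")


def deliver(events, callback, *args, **kwargs):
    """Hypothetical stand-in for the consumer's dispatch loop."""
    for event in events:
        callback(event, *args, **kwargs)
    # After the final event, the callback is called once more with None.
    callback(None, *args, **kwargs)


seen = []
deliver([{"name": "submit"}, {"name": "start"}], event_cb, seen, label="jobs")
print(seen)  # ['jobs: submit', 'jobs: start', 'jobs: end of stream']
```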
- start()
Start the stream of events by sending a request
This function initiates the stream of events for a journal consumer by sending the initial request to the configured journal service endpoint.
Note
If start() is called more than once, the stream of events will be restarted using the original options passed to the constructor. This may cause duplicate events, or missed events if full is False, since no history will be included.
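One way a client could guard against the duplicates mentioned in this note is to remember the newest timestamp it has processed, which is also the value that would be passed as since to a new consumer instance. The sketch below is an assumption-laden illustration: NewestTimestamp is a hypothetical helper, and it relies on timestamps being unique and on events being delivered in time order.

```python
class NewestTimestamp:
    """Hypothetical helper that remembers the newest processed timestamp."""

    def __init__(self, since=0.0):
        self.since = since

    def accept(self, event):
        # Reject anything at or before the newest timestamp already seen.
        if event["timestamp"] <= self.since:
            return False
        self.since = event["timestamp"]
        return True


tracker = NewestTimestamp()
first_run = [
    {"name": "submit", "timestamp": 1.0},
    {"name": "start", "timestamp": 2.0},
]
# A restarted stream replays the history and then adds one new event.
replayed = first_run + [{"name": "finish", "timestamp": 3.0}]

processed = [e["name"] for e in first_run if tracker.accept(e)]
processed += [e["name"] for e in replayed if tracker.accept(e)]
print(processed, tracker.since)  # ['submit', 'start', 'finish'] 3.0
```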
- class flux.abc.journal.JournalEventBase(event)
Bases: EventLogEvent
A container for an event from a journal RPC interface
- name
event name (Possible event names will depend on the journal service being consumed)
- Type
str
- timestamp
event timestamp in seconds since the epoch with sub-millisecond precision.
- Type
float
- context
context dictionary
- Type
dict
- context_string
context dict converted to a comma-separated key=value string.
- Type
str
- is_empty()
Return True if this event is an empty journal event