flux.job.journal module
- class flux.job.journal.JournalConsumer(flux_handle, full=True, since=0.0, include_sentinel=False)
Bases: JournalConsumerBase
Class for consuming the job manager journal
This class is a wrapper around the job-manager.events-journal RPC, which allows clients to subscribe to primary job events for all jobs (see RFC 21) in a single interface.
A JournalConsumer returns individual job events as JournalEvent objects either synchronously, via the poll() method, or asynchronously via a callback registered with the set_callback() method. In the case of asynchronous mode, the Flux reactor must be run via reactor_run().
A JournalConsumer is created by passing the constructor an open Flux handle, along with any other optional parameters described below. The start() method should then be called, which sends the initial RPC. To stop the stream of events, call the stop() method. After all events have been processed, poll() or the registered callback will return None.
When the consumer is first started, historical data (events in the past) will be sent from the job manager unless full is set to False. These events are stored until all historical events are processed, then are time ordered before returning them via poll() or to the callback registered by set_callback().
To avoid processing previously seen events with a new instance of this class, the timestamp of the newest processed event can be passed via the since parameter. Timestamps should be unique, so poll() or the callback will start with the newest event after the since timestamp.
When full is True, the job manager sends a sentinel event in the journal stream to demarcate the transition between history and current events. If include_sentinel is set to True, then an empty event will be returned by poll() or the registered callback to represent the sentinel. An empty event can be compared to flux.job.journal.SENTINEL_EVENT or by using the JournalEvent.is_empty() method. The default behavior is to suppress the sentinel.
- SENTINEL_EVENT = {'context': {}, 'timestamp': 1780507982.2437155}
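The synchronous lifecycle described for JournalConsumer (start(), poll(), stop(), then drain until None) can be sketched as below. This is a minimal illustration, not the library's own code. The helper name consume and the FakeConsumer stand-in are hypothetical; the stand-in exists only so the sketch runs without a Flux instance, and unlike a real consumer its poll() never blocks.

```python
from collections import deque
from types import SimpleNamespace


def consume(consumer, limit):
    """Poll up to `limit` events, then stop the stream and drain the rest.

    `consumer` only needs start(), poll() and stop(), as JournalConsumer
    provides. Returns (events, newest_timestamp); the timestamp could be
    passed as `since` to a later consumer to skip events already seen.
    """
    events = []
    finished = False
    consumer.start()  # sends the initial RPC
    while not finished and len(events) < limit:
        event = consumer.poll()
        if event is None:  # stream ended before the limit was reached
            finished = True
        else:
            events.append(event)
    if not finished:
        consumer.stop()  # request the end of the event stream
        # Events may still be pending after stop(); poll until None.
        while (event := consumer.poll()) is not None:
            events.append(event)
    newest = max((e.timestamp for e in events), default=0.0)
    return events, newest


class FakeConsumer:
    """Hypothetical stand-in for JournalConsumer (assumption, not Flux code)."""

    def __init__(self, events):
        self._events = deque(events)
        self.stopped = False

    def start(self):
        return self

    def stop(self):
        self.stopped = True

    def poll(self):
        # A real consumer would block here while waiting for events.
        return self._events.popleft() if self._events else None


fake = FakeConsumer(
    SimpleNamespace(jobid=jobid, timestamp=ts)
    for jobid, ts in [(1, 10.0), (1, 20.0), (2, 30.0)]
)
events, newest = consume(fake, limit=2)
```

With a real Flux instance, the same helper would presumably be called as consume(JournalConsumer(flux.Flux()), limit=10), and the returned timestamp passed as since when a new consumer is created later.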
- create_event(entry, jobid=-1, jobspec=None, R=None)
Create a single JournalEvent from one event entry
- process_response(resp)
Process a single job manager journal response
The response will contain the jobid and possibly jobspec and R, depending on the specific events in the payload. Return these in a dict so they are passed as keyword arguments to create_event.
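The hand-off between process_response and create_event can be modelled as follows. This is an illustrative sketch only: the function names response_to_kwargs and make_event are hypothetical, and the payload keys "id" and "events" are assumptions about the journal response layout rather than something stated above.

```python
def response_to_kwargs(resp):
    """Model of process_response: collect the per-job data in one dict."""
    kwargs = {"jobid": resp["id"]}  # "id" key is an assumption
    for key in ("jobspec", "R"):
        if key in resp:  # only present for some events
            kwargs[key] = resp[key]
    return kwargs


def make_event(entry, jobid=-1, jobspec=None, R=None):
    """Model of create_event: one event entry plus the shared job data."""
    return {"jobid": jobid, "entry": entry, "jobspec": jobspec, "R": R}


# Hypothetical response carrying two event entries for a single job.
resp = {
    "id": 1234,
    "jobspec": {"version": 1},
    "events": [
        {"timestamp": 1.0, "name": "submit", "context": {"userid": 1000}},
        {"timestamp": 2.0, "name": "validate"},
    ],
}

kwargs = response_to_kwargs(resp)
# The dict is expanded into keyword arguments once per event entry.
events = [make_event(entry, **kwargs) for entry in resp["events"]]
```

Returning a dict keeps the per-entry call uniform: a subclass that needs extra per-job data could add keys to the dict and accept them as keyword arguments when creating events.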
- property request_payload
Appropriate request payload for this journal RPC
- class flux.job.journal.JournalEvent(jobid, event, jobspec=None, R=None)
Bases: JournalEventBase
A container for an event from the job manager journal
- jobid
The job id to which the event applies
- Type
- timestamp
event timestamp in seconds since the epoch with sub-millisecond precision.
- Type
float
- context
context dictionary (See RFC 21: Event Descriptions.)
- Type
dict
- context_string
context dict converted to comma separated key=value string.
- Type
str
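The relationship between context and context_string can be illustrated with a small sketch. The helper context_to_string is hypothetical, and the exact separator and value formatting used by the library are assumptions here.

```python
def context_to_string(context):
    """Render a context dict as a comma separated key=value string."""
    return ", ".join(f"{key}={value}" for key, value in context.items())


# Example context, loosely modelled on a job submit event.
text = context_to_string({"userid": 1000, "urgency": 16})
print(text)  # userid=1000, urgency=16
```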
- is_empty()
Return True if this event is an empty journal event
- flux.job.journal.SENTINEL_EVENT = {'context': {}, 'timestamp': 1780507982.2437155}
A constant JournalEvent demarcating the transition from historical events to current events when a JournalConsumer is created with include_sentinel set to True.
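One possible use of the sentinel is to separate historical events from current ones. The sketch below relies only on the is_empty() method; split_at_sentinel and fake_event are hypothetical names, and the fake events stand in for JournalEvent objects so that the example runs without a Flux instance.

```python
from types import SimpleNamespace


def split_at_sentinel(events):
    """Split events into (historical, current) at the first empty event."""
    historical, current = [], []
    seen_sentinel = False
    for event in events:
        if not seen_sentinel and event.is_empty():
            seen_sentinel = True  # the sentinel itself is dropped
            continue
        (current if seen_sentinel else historical).append(event)
    return historical, current


def fake_event(jobid, empty=False):
    """Hypothetical stand-in for a JournalEvent (assumption, not Flux code)."""
    return SimpleNamespace(jobid=jobid, is_empty=lambda: empty)


stream = [fake_event(1), fake_event(2), fake_event(-1, empty=True), fake_event(3)]
historical, current = split_at_sentinel(stream)
```

If no sentinel appears in the input, for example because include_sentinel was left at its default, every event ends up in the first list.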