flux.job.journal module

class flux.job.journal.JournalConsumer(flux_handle, full=True, since=0.0, include_sentinel=False)

Bases: JournalConsumerBase

Class for consuming the job manager journal

This class is a wrapper around the job-manager.events-journal RPC, which allows clients to subscribe to primary job events for all jobs (See RFC 21) in a single interface.

A JournalConsumer returns individual job events as JournalEvent objects either synchronously, via the poll() method, or asynchronously via a callback registered with the set_callback() method. In the case of asynchronous mode, the Flux reactor must be run via reactor_run().

A JournalConsumer is created by passing the constructor an open Flux handle, along with any other optional parameters described below. The start() method should then be called, which sends the initial RPC. To stop the stream of events, call the stop() method. After all events have been processed poll() or the registered callback will return None.

When the consumer is first started, historical data (events in the past) will be sent from the job manager unless full is set to False. These events are stored until all historical events are processed, then are time ordered before returning them via poll() or to the callback registered by set_callback().

To avoid processing previously seen events with a new instance of this class, the timestamp of the newest processed event can be passed via the since parameter. Timestamps should be unique so poll() or the callback will start with the newest event after the since timestamp.

When full is True, the job manager sends a sentinel event in the journal stream to demarcate the transition between history and current events. If include_sentinel is set to True, then an empty event will be returned by poll() or the registered callback to represent the sentinel. An empty event can be compared to flux.job.journal.SENTINEL_EVENT or by using the JournalEvent.is_empty() method. The default behavior is to suppress the sentinel.

SENTINEL_EVENT = {'context': {}, 'timestamp': 1780507982.2437155}
create_event(entry, jobid=-1, jobspec=None, R=None)

Create a single JournalEvent from one event entry

process_response(resp)

Process a single job manager journal response

The response will contain the jobid and possibly jobspec and R, depending on the specific events in the payload. Return these in a dict so they are passed as keyword arguments to create_event.

property request_payload

Appropriate request payload for this journal RPC

class flux.job.journal.JournalEvent(jobid, event, jobspec=None, R=None)

Bases: JournalEventBase

A container for an event from the job manager journal

jobid

The job id for which the event applies

Type

flux.job.JobID

name

event name (See RFC 21 for possible event names)

Type

str

timestamp

event timestamp in seconds since the epoch with sub-millisecond precision.

Type

float

context

context dictionary (See RFC 21: Event Descriptions.)

Type

dict

context_string

context dict converted to comma separated key=value string.

Type

str

jobspec

For submit events, the job's redacted jobspec (See RFC 25).

Type

dict

R

For alloc events, the job's assigned R (See RFC 20)

Type

dict

is_empty()

Return True if this event is an empty journal event

flux.job.journal.SENTINEL_EVENT = {'context': {}, 'timestamp': 1780507982.2437155}

A constant JournalEvent demarcating the transition from historical events to current events when a JournalConsumer is created with include_sentinel set to True.