flux.job.output module

class flux.job.output.JobExceptionEvent(entry)

Bases: EventLogEvent

Object representing a JobException event

timestamp

timestamp for this event

Type

float

name

name of this event: 'exception'

Type

str

severity

exception severity

Type

int

exc_type

exception type

Type

str

note

exception note

Type

str

render()

class flux.job.output.JobOutput(stdout: str, stderr: str, log: str)

Bases: NamedTuple

Tuple containing job output result

stdout

stdout from all tasks

Type

str

stderr

stderr from all tasks

Type

str

log

log messages

Type

str

log: str

Alias for field number 2

stderr: str

Alias for field number 1

stdout: str

Alias for field number 0
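Because JobOutput is a NamedTuple, its fields can be read by name, by index, or by unpacking. The sketch below uses a stand-in class with the same three fields so that it runs without a Flux instance. The class name JobOutputSketch and the sample strings are illustrative assumptions, not part of flux.job.output.

```python
from typing import NamedTuple


class JobOutputSketch(NamedTuple):
    """Hypothetical stand-in mirroring the documented JobOutput fields."""

    stdout: str
    stderr: str
    log: str


result = JobOutputSketch(stdout="hello\n", stderr="", log="")

# The same value is reachable by field name, by position, or by unpacking.
by_name = result.stdout
by_index = result[0]
out, err, log = result
```

A real JobOutput returned by job_output() should support the same three access styles, since they come from NamedTuple itself.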

class flux.job.output.JobOutputEventWatch(flux_handle, jobid, labelio=False, nowait=False)

Bases: FutureExt

A class for watching job output events.

See output_event_watch_async() for full documentation.

get_event(autoreset=True)

class flux.job.output.JobOutputWatch(flux_handle, jobid, labelio=False, log_stderr_level=7, nowait=False)

Bases: FutureExt

A class for watching job output.

See output_watch_async() for full documentation.

get_output()

Return a tuple of (stream, data) containing the next chunk of output from a JobOutputWatch Future. Possible values for stream are "stdout", "stderr", and "log".

When no more output is available, (None, None) will be returned.
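The (None, None) sentinel described above lends itself to a simple drain loop. The following is a minimal sketch, assuming that repeated get_output() calls each return the next chunk. FakeWatch is a hypothetical stand-in used only so the example runs without a Flux instance.

```python
from collections import defaultdict


def drain_output(watch):
    """Collect every (stream, data) chunk until (None, None) is returned."""
    chunks = defaultdict(list)
    while True:
        stream, data = watch.get_output()
        if stream is None:
            break
        chunks[stream].append(data)
    # Join the chunks of each stream in arrival order.
    return {stream: "".join(parts) for stream, parts in chunks.items()}


class FakeWatch:
    """Hypothetical stand-in for a JobOutputWatch, used only for this demo."""

    def __init__(self, chunks):
        self._chunks = iter(chunks)

    def get_output(self):
        # Mimic the documented sentinel once the chunks are exhausted.
        return next(self._chunks, (None, None))


demo = drain_output(
    FakeWatch([("stdout", "a"), ("stderr", "oops\n"), ("stdout", "b\n")])
)
```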

class flux.job.output.JobOutputWatchLines(flux_handle, jobid, labelio=False, log_stderr_level=7, nowait=False, keepends=False)

Bases: FutureExt

A class for watching lines of job output.

See output_watch_lines_async() for full documentation.

getline()

Return a tuple of (stream, line) for the next line of available output from a JobOutputWatchLines Future. Possible values for stream are "stdout", "stderr", and "log".

When no more output is available, (None, None) will be returned.

class flux.job.output.LogEvent(entry)

Bases: EventLogEvent

Object representing an RFC 24 Job "log" event

timestamp

timestamp for this log event

Type

float

name

name of this event: 'log'

Type

str

rank

shell rank that produced the log message

Type

Taskset

level

log level

Type

int

levelstr

log level string

Type

str

message

log message

Type

str

component

log component if available

Type

str

file

source file if available

Type

str

line

source line if available

Type

int

dict

original event as dict

Type

dict

property levelstr
log_level_string = ['FATAL', 'FATAL', 'FATAL', 'ERROR', ' WARN', '', 'DEBUG', 'TRACE']
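The log_level_string list reads as a lookup table indexed by numeric log level. The sketch below copies the list from the attribute above and shows the lookup. The helper name level_name and its out-of-range behaviour are assumptions for illustration; the levelstr property may be implemented differently.

```python
# Copied from the log_level_string attribute documented above.
LOG_LEVEL_STRING = ["FATAL", "FATAL", "FATAL", "ERROR", " WARN", "", "DEBUG", "TRACE"]


def level_name(level):
    """Return the label for a numeric log level, or "" if out of range."""
    if 0 <= level < len(LOG_LEVEL_STRING):
        return LOG_LEVEL_STRING[level]
    return ""


labels = [level_name(level) for level in (3, 4, 5, 7)]
```

Note that level 5 maps to an empty string and that " WARN" carries a leading space, presumably so that labels line up at five characters.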
render(include_file_and_line=False)

class flux.job.output.OutputEvent(entry, labelio=False)

Bases: EventLogEvent

Object representing RFC 24 Job Standard I/O data events

timestamp

timestamp for this event

Type

float

name

Name of this event: 'data'

Type

str

rank

Set of ranks to which this event applies

Type

Taskset

stream

name of output stream ("stdout", "stderr")

Type

str

eof

True if this event marks EOF for stream

Type

bool

data

output data

Type

str

dict

original event as dict

Type

dict

render()

class flux.job.output.OutputHeaderEvent(entry)

Bases: EventLogEvent

Object representing an RFC 24 header event

timestamp

timestamp for this event

Type

float

name

Name of this event: 'header'

Type

str

version

RFC 24 version

Type

int

stdout_count

expected number of stdout streams

Type

int

stderr_count

expected number of stderr streams

Type

int

stdout_encoding

default encoding for stdout

Type

str

stderr_encoding

default encoding for stderr

Type

str

event

original event as dict

Type

dict

class flux.job.output.RedirectEvent(entry)

Bases: EventLogEvent

Object representing an RFC 24 redirect event

timestamp

timestamp for this event

Type

float

name

Name of this event: 'redirect'

Type

str

stream

the name of the stream being redirected

Type

str

rank

the set of ranks being redirected

Type

Taskset

path

redirect path

Type

str

dict

original event as dict

Type

dict

render()

class flux.job.output.Taskset(tasks)

Bases: object

A Taskset represents a specific set of task ranks or "all" tasks. Implements a minimal interface of the IDset class, mainly for testing if one Taskset intersects with another.

The underlying IDset is only initialized for Taskset objects that do not represent all tasks.

all

True if this Taskset represents all tasks

Type

bool

ids

If self.all is False, the underlying idset

Type

IDset

intersect(tasks)

flux.job.output.job_output(flux_handle, jobid, tasks='*', labelio=False, nowait=False, log_stderr_level=7)

Synchronously fetch output for a job.

If labelio is True, then each line of output will be labeled with the source task rank.

If nowait is True, then currently available output is returned without waiting for the output eventlog to be complete (i.e. for the job to finish). In this case, a FileNotFoundError exception is raised if the job output eventlog does not exist (the job has not yet started writing output) or if the jobid is not valid.

If nowait is False (the default), then this function will block until the job output is complete, i.e. the output eventlog has received EOF on all streams. In this case, a FileNotFoundError exception is raised only if the jobid is not valid.

Log messages at or below log_stderr_level will be sent to stderr. All other messages will appear in the log stream. By default, log_stderr_level=LOG_TRACE, so all messages are sent to stderr. To direct all log messages to the log stream instead, set log_stderr_level=-1 (LOG_QUIET).

Parameters
  • flux_handle (Flux) -- Flux handle

  • jobid (int, JobID, str) -- target jobid

  • tasks (str) -- idset of task ranks to include in output (default=all)

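  • labelio (bool) -- prefix lines of output with source task rank

  • nowait (bool) -- if True, return currently available output without waiting for the output eventlog to be complete (default: False)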

  • log_stderr_level (int) -- combine log messages at or below level with stderr (default=LOG_TRACE)

Returns

JobOutput tuple containing output for stdout, stderr, and log streams.

Return type

JobOutput

Raises
  • FileNotFoundError -- jobid does not exist or no output found

  • flux.job.JobException -- Job received an exception
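The log_stderr_level rule described above can be stated as a single comparison. The sketch below is a stand-alone illustration of that rule, not the library's code. The function name log_destination is hypothetical, while the constant values are taken from the text (LOG_QUIET is -1, and the default level 7 is LOG_TRACE).

```python
LOG_QUIET = -1  # value given in the description above
LOG_TRACE = 7   # default log_stderr_level in the signature


def log_destination(level, log_stderr_level=LOG_TRACE):
    """Return the stream on which a log message of `level` is delivered."""
    # Messages at or below the threshold are combined with stderr.
    return "stderr" if level <= log_stderr_level else "log"


default_route = log_destination(6)
quiet_route = log_destination(0, log_stderr_level=LOG_QUIET)
```

With the default threshold every level from 0 to 7 lands on stderr, and with LOG_QUIET no level does, which matches the two cases called out in the description.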

flux.job.output.output_event_watch(flux_handle, jobid, labelio=False, nowait=False)

Synchronously watch job output events via a generator.

This function will block until the first output event is available.

Example

>>> for event in output_event_watch(flux_handle, jobid):
...     if event.name == "data" and event.data is not None:
...         print(f"{event.stream}: {event.data}")

Parameters
  • flux_handle (Flux) -- Flux handle

  • jobid (int, JobID, str) -- jobid to watch

  • labelio (bool) -- label lines of output with source tasks (default: False)

  • nowait (bool) -- If True, assume output eventlog already exists and skip watching precursor eventlogs.
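Since the generator yields several event types, callers usually branch on the name attribute. The sketch below is one way to do that, assuming only the name, stream, and data attributes documented for the event classes. It runs on SimpleNamespace stand-ins (hypothetical sample data) so that no Flux instance is needed.

```python
from collections import Counter
from types import SimpleNamespace


def summarize_events(events):
    """Count events by name and join the text of 'data' events per stream."""
    counts = Counter()
    text = {}
    for event in events:
        counts[event.name] += 1
        # Only 'data' events carry stream output; data may be None.
        if event.name == "data" and event.data is not None:
            text[event.stream] = text.get(event.stream, "") + event.data
    return counts, text


# Hypothetical stand-ins carrying only the attributes used above.
sample = [
    SimpleNamespace(name="header"),
    SimpleNamespace(name="data", stream="stdout", data="hi\n"),
    SimpleNamespace(name="data", stream="stdout", data=None),
    SimpleNamespace(name="log"),
]
counts, text = summarize_events(sample)
```

With a live job, the same function could presumably be called as summarize_events(output_event_watch(flux_handle, jobid)).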

flux.job.output.output_event_watch_async(flux_handle, jobid, labelio=False, nowait=False)

Asynchronously get output event updates for a job.

Returns a JobOutputEventWatch Future. Call .get_event() from the callback to get the next available output event from the Future. The event will be of type OutputEvent, LogEvent, OutputHeaderEvent, RedirectEvent, or JobExceptionEvent (check the 'name' attribute or the object's type to determine the kind of event).

If the job has a fatal exception before the 'start' event, then the future will be fulfilled with an error with errno=EIO and message 'job {jobid} never started'.

Parameters
  • flux_handle (Flux) -- Flux handle

  • jobid (int, JobID, str) -- jobid to watch

  • labelio (bool) -- label lines of output with source tasks (default: False)

  • nowait (bool) -- Assume output eventlog already exists and skip watching precursor eventlogs.

Returns

JobOutputEventWatch Future

Return type

JobOutputEventWatch

flux.job.output.output_watch(flux_handle, jobid, labelio=False, log_stderr_level=7, nowait=False)

Synchronously fetch job output via a generator.

This function will block until job output is available.

Example

>>> for stream, data in output_watch(flux_handle, jobid):
...     print(f"{stream}: {data}")

Parameters
  • flux_handle (flux.Flux) -- Flux handle

  • jobid (int, JobID, str) -- jobid to watch

  • labelio (bool) -- label lines of output with source tasks (default: False)

  • log_stderr_level (int) -- return log messages at or below this level to stderr instead of the "log" stream (default=LOG_TRACE, i.e. all log messages are sent to stderr)

  • nowait (bool) -- If True, assume output eventlog already exists and skip watching precursor eventlogs.
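A common use of the (stream, data) pairs is to forward them to the caller's own stdout and stderr. The sketch below shows one way to do that. The helper name forward_output and the choice to send the "log" stream to stderr are assumptions for illustration, and the sample pairs stand in for what output_watch() would yield.

```python
import io
import sys


def forward_output(pairs, out=sys.stdout, err=sys.stderr):
    """Write "stdout" chunks to `out` and all other streams to `err`."""
    for stream, data in pairs:
        target = out if stream == "stdout" else err
        target.write(data)


# In-memory buffers make the routing visible without touching the terminal.
out_buf, err_buf = io.StringIO(), io.StringIO()
forward_output(
    [("stdout", "result\n"), ("log", "debug detail\n"), ("stderr", "warning\n")],
    out=out_buf,
    err=err_buf,
)
```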

flux.job.output.output_watch_async(flux_handle, jobid, labelio=False, log_stderr_level=7, nowait=False)

Asynchronously get output data for a job.

This function returns a JobOutputWatch Future. Use future.get_output() to get the available output, which returns a (stream, data) tuple, where stream is one of 'stdout', 'stderr', or 'log'.

If the job receives an exception while watching output, an appropriate error message is emitted to the stderr stream. Similarly, redirect events generate a "redirected to" message on stderr.

If the job has a fatal exception before the 'start' event, then the future will be fulfilled with an error with errno=EIO and message 'job {jobid} never started'.

Parameters
  • flux_handle (Flux) -- Flux handle

  • jobid (JobID) -- jobid to watch

  • labelio (bool) -- label lines of output with source tasks (default=False)

  • log_stderr_level (int) -- emit log messages at this level or below to stderr instead of the "log" stream. (default=LOG_TRACE, i.e. all log messages are copied to stderr)

  • nowait (bool) -- Assume output eventlog already exists and skip watching precursor eventlogs.

Returns

JobOutputWatch Future

Return type

JobOutputWatch
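The callback pattern and the EIO case described above can be sketched as follows. This is a hedged illustration: it assumes the error surfaces as an OSError raised from get_output(), which is not stated in this reference. The helper make_output_callback and the two fake future classes are hypothetical and exist only so the example runs without a Flux instance.

```python
import errno


def make_output_callback(sink):
    """Build a callback that appends each output chunk to `sink`."""

    def on_output(future):
        try:
            stream, data = future.get_output()
        except OSError as exc:
            if exc.errno != errno.EIO:
                raise
            # Documented case: fatal exception before the 'start' event.
            sink.append(("error", exc.strerror))
            return
        # (None, None) signals that no more output is available.
        if stream is not None:
            sink.append((stream, data))

    return on_output


class OneChunk:
    """Hypothetical future holding a single chunk of output."""

    def get_output(self):
        return ("stdout", "hello\n")


class NeverStarted:
    """Hypothetical future whose job failed before starting."""

    def get_output(self):
        raise OSError(errno.EIO, "job 1234 never started")


received = []
callback = make_output_callback(received)
callback(OneChunk())
callback(NeverStarted())
```

With a real handle, such a callback would presumably be registered on the Future returned by output_watch_async() and driven by the handle's reactor; the exact registration call is left out here because it is not covered in this section.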

flux.job.output.output_watch_lines(flux_handle, jobid, labelio=False, log_stderr_level=7, nowait=False, keepends=False)

Synchronously fetch job output lines via a generator.

This function will block until the first line of output is available.

Example

>>> for stream, line in output_watch_lines(flux_handle, jobid):
...     print(f"{stream}: {line}")

Note

The current implementation may return a partial line as a full line if a partial line is written into a job output eventlog entry. That is, this implementation does not currently line buffer and wait for full lines before returning the next (stream, line) pair. (This may be fixed in a future release, at which time this note will be removed.)

Parameters
  • flux_handle (flux.Flux) -- Flux handle

  • jobid (int, JobID, str) -- jobid to watch

  • labelio (bool) -- label lines of output with source tasks (default: False)

  • log_stderr_level (int) -- return log messages at or below this level to stderr instead of the "log" stream (default=LOG_TRACE, i.e. all log messages are sent to stderr)

  • nowait (bool) -- If True, assume output eventlog already exists and skip watching precursor eventlogs.

  • keepends (bool) -- If True, keep line breaks in the result.
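Callers that need whole lines despite the partial-line caveat in the note above can buffer fragments themselves. The sketch below is one possible workaround, not part of the library. It assumes keepends=True, so that a complete line ends in "\n" and a partial one does not. The helper name complete_lines and the sample fragments are hypothetical.

```python
def complete_lines(pairs):
    """Yield (stream, line) pairs, joining fragments until a newline is seen.

    Expects lines that still carry their line endings (keepends=True).
    """
    pending = {}
    for stream, line in pairs:
        buffered = pending.pop(stream, "") + line
        if buffered.endswith("\n"):
            yield stream, buffered
        else:
            # Hold the fragment until the rest of the line arrives.
            pending[stream] = buffered
    # Flush whatever is left when the input ends without a final newline.
    for stream, rest in pending.items():
        if rest:
            yield stream, rest


fragments = [
    ("stdout", "par"),
    ("stderr", "err\n"),
    ("stdout", "tial\n"),
    ("stdout", "tail"),
]
joined = list(complete_lines(fragments))
```

Buffering is kept per stream so that an interleaved stderr line does not get spliced into a pending stdout fragment.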

flux.job.output.output_watch_lines_async(flux_handle, jobid, labelio=False, log_stderr_level=7, nowait=False, keepends=False)

Asynchronously get lines of output for a job.

This function returns a JobOutputWatchLines Future. Use future.getline() to get the next line of output, which returns a (stream, line) tuple, where stream is one of 'stdout', 'stderr', or 'log'.

If the job receives an exception while watching output, an appropriate error message is emitted to the stderr stream. Similarly, redirect events generate a "redirected to" message to stderr.

If the job has a fatal exception before the 'start' event, then the future will be fulfilled with an error with errno=EIO and message 'job {jobid} never started'.

Note

The current implementation may return a partial line as a full line from getline() if a partial line is written into a job output eventlog entry. That is, this implementation does not currently line buffer and wait for full lines to fulfill the JobOutputWatchLines future. (This may be fixed in a future release, at which time this note will be removed.)

Parameters
  • flux_handle (Flux) -- Flux handle

  • jobid (int, JobID, str) -- jobid to watch

  • labelio (bool) -- label lines of output with source task (default=False)

  • log_stderr_level (int) -- emit log messages at this level or below to stderr instead of the "log" stream. (default=LOG_TRACE, i.e. all log messages are copied to stderr)

  • nowait (bool) -- Assume output eventlog already exists and skip watching precursor eventlogs.

  • keepends (bool) -- If True, keep line breaks in the result.

Returns

JobOutputWatchLines Future

Return type

JobOutputWatchLines