Module

streaming

Streaming support for long-running CLI commands.

Classes

Progress
Progress notification from a streaming command.

Attributes

Name Type Description
status str
step int
total int
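
As an illustration of how the three attributes fit together, here is a minimal sketch. It assumes Progress behaves like a plain dataclass, which this page does not confirm; the class below is a stand-in, and `copy_files` is a hypothetical streaming command invented for the example.

```python
from dataclasses import dataclass
from typing import Any, Generator


@dataclass
class Progress:
    """Stand-in for the documented class; the real definition may differ."""

    status: str
    step: int
    total: int


def copy_files(names: list[str]) -> Generator[Progress, None, dict[str, Any]]:
    """Hypothetical streaming command: yield one Progress per file, then return a summary."""
    total = len(names)
    for step, name in enumerate(names, start=1):
        # A real command would do the work here before reporting it.
        yield Progress(status=f"copied {name}", step=step, total=total)
    return {"copied": total}
```

In this sketch `step` counts from 1 up to `total`, so a caller could render something like `step/total` next to `status`. Whether the real module counts from 0 or 1 is not stated here.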

Functions

consume_generator
def consume_generator(gen: Any) -> tuple[list[Progress], Any]

Consume a generator, collecting Progress yields and capturing the final value.

Returns (progress_list, final_value). If the generator has no StopIteration.value, final_value is None.

Parameters
Name Type Description
gen Any
Returns
tuple[list[Progress], Any]
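
The behavior described above can be approximated with the sketch below. It is not the module's implementation: `consume_generator_sketch` and the stand-in Progress class are hypothetical, and silently skipping yields that are not Progress instances is an assumption, since this page does not say how such values are handled.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Progress:
    """Stand-in for the documented class; the real definition may differ."""

    status: str
    step: int
    total: int


def consume_generator_sketch(gen: Any) -> tuple[list[Progress], Any]:
    """Drain gen, collecting Progress yields and the generator's return value."""
    progress: list[Progress] = []
    while True:
        try:
            item = next(gen)
        except StopIteration as stop:
            # stop.value carries the generator's return value,
            # and is None when the generator has no return statement.
            return progress, stop.value
        if isinstance(item, Progress):
            progress.append(item)
        # Assumption: anything that is not a Progress is ignored.


def job():
    """Hypothetical streaming command with an explicit return value."""
    yield Progress("start", 1, 2)
    yield Progress("done", 2, 2)
    return "ok"


def job_without_return():
    """Hypothetical streaming command that never returns a value."""
    yield Progress("only", 1, 1)
```

Calling `consume_generator_sketch(job())` gives two Progress items and the final value `"ok"`. With `job_without_return()` the final value is None, matching the rule stated above.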
is_generator_result
Check if a call result is a generator that should be consumed for streaming.
def is_generator_result(result: Any) -> bool
Parameters
Name Type Description
result Any
Returns
bool
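
One plausible way to perform this check is with the standard library's `inspect.isgenerator`, sketched below. This is an assumption about the approach, not the module's confirmed implementation; `is_generator_result_sketch` and `countdown` are hypothetical names.

```python
import inspect
from typing import Any


def is_generator_result_sketch(result: Any) -> bool:
    """Return True only for generator objects, not for other iterables."""
    return inspect.isgenerator(result)


def countdown(n: int):
    """Hypothetical command that streams values by yielding them."""
    while n > 0:
        yield n
        n -= 1


print(is_generator_result_sketch(countdown(3)))  # True: calling it produces a generator object
print(is_generator_result_sketch([3, 2, 1]))     # False: a list is iterable but not a generator
print(is_generator_result_sketch(countdown))     # False: the function itself is not a generator
```

A caller would typically run this check on a command's return value and hand the result to consume_generator only when it is True.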