Module

_priority

HTTP Priority Signals (RFC 9218).

Parses the Priority header (u=N, i) from HTTP/2 requests and provides a priority-based scheduler for DATA frame writes.

RFC 9218 defines:

  • u=N (urgency): integer 0-7, default 3. Lower is more urgent.
  • i (incremental): boolean, default false. If present, the response can be interleaved with others.

Scheduling policy (RFC 9218 §10, Server Scheduling):

  • Streams at lower urgency numbers are served before higher ones.
  • At the same urgency, non-incremental streams are served one at a time (sticky): the first-ready stream holds the slot until it is unscheduled or removed.
  • At the same urgency, incremental streams round-robin via mark_wrote() after each chunk.
  • Non-incremental streams at a given urgency preempt incremental streams at that urgency, since non-incremental responses expect serial delivery.
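The four rules above can be read as a single selection function. The following is a minimal sketch under stated assumptions, not the module's code: `Ready` and `pick_next` are hypothetical names, and the input list is assumed to be kept in ready order, with the caller rotating incremental streams after each chunk.

```python
from __future__ import annotations

from typing import NamedTuple


class Ready(NamedTuple):
    """Hypothetical record for a ready stream (not the module's own type)."""
    stream_id: int
    urgency: int       # 0 (most urgent) to 7 (least urgent)
    incremental: bool


def pick_next(ready: list[Ready]) -> int | None:
    """Pick the stream to serve next from a list kept in ready order."""
    if not ready:
        return None
    # Rule 1: the lowest urgency number wins.
    best = min(r.urgency for r in ready)
    bucket = [r for r in ready if r.urgency == best]
    # Rules 2 and 4: a non-incremental stream in the bucket goes first,
    # and the earliest-ready one keeps the slot (sticky).
    for r in bucket:
        if not r.incremental:
            return r.stream_id
    # Rule 3: only incremental streams remain. The caller is assumed to
    # rotate the list after each chunk, so the head is next in line.
    return bucket[0].stream_id


ready = [Ready(1, 3, True), Ready(3, 3, False), Ready(5, 1, True), Ready(7, 3, False)]
first = pick_next(ready)                                     # urgency 1 beats urgency 3
second = pick_next([r for r in ready if r.stream_id != 5])   # non-incremental at urgency 3
```

Here `first` is stream 5 and `second` is stream 3: once the urgency-1 stream is gone, the earliest-ready non-incremental stream at urgency 3 is chosen ahead of the incremental stream 1.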

Classes

StreamPriority 2
Parsed priority for a single HTTP/2 stream.

Attributes

Name Type Description
urgency int 0 (highest) to 7 (lowest). Default: 3.
incremental bool If True, the response can be interleaved with others.
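As an illustration of the two attributes, a record with this shape could be modelled as a frozen dataclass. This is a sketch only: `StreamPrioritySketch` is a hypothetical name, and both the immutability and the range check are assumptions that this page does not confirm for the real class. The default of non-incremental matches what get_priority() documents.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StreamPrioritySketch:
    """Illustrative stand-in for StreamPriority; the real class may differ."""
    urgency: int = 3           # 0 (highest) to 7 (lowest)
    incremental: bool = False  # True: response may be interleaved with others

    def __post_init__(self) -> None:
        # Assumption: reject out-of-range urgency rather than clamp it.
        if not 0 <= self.urgency <= 7:
            raise ValueError(f"urgency must be in 0..7, got {self.urgency}")


default = StreamPrioritySketch()
urgent = StreamPrioritySketch(urgency=0, incremental=True)
# Sorting by urgency puts the most urgent stream first.
ordered = sorted([default, urgent], key=lambda p: p.urgency)
```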

PriorityScheduler 12
RFC 9218-compliant priority scheduler for HTTP/2 streams. Maintains per-stream priority and a read…

RFC 9218-compliant priority scheduler for HTTP/2 streams.

Maintains per-stream priority and a ready-set of streams that have data to send. next_stream() returns the stream that should write next, per the policy described in the module docstring.

Usage::

    scheduler.set_priority(sid, parse_priority(header))
    scheduler.schedule(sid)
    while (ready := scheduler.next_stream()) is not None:
        write_one_chunk(ready)
        if has_more_data:
            scheduler.mark_wrote(ready)  # rotate incremental
        else:
            scheduler.unschedule(ready)
    scheduler.remove_stream(sid)  # on stream close

Thread-safe: a lock protects all mutable state for correctness under free-threading.
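To make the usage loop concrete, here is a small self-contained stand-in that can be run end to end. It is a sketch under assumptions, not the module's implementation: `SketchScheduler` is a hypothetical class, it takes urgency and incremental as plain arguments instead of a StreamPriority, and it keeps one ready queue and picks the winner at selection time, which may differ from the real internals. The chunk counts in `remaining` are made-up data.

```python
from __future__ import annotations

import threading
from collections import deque


class SketchScheduler:
    """Hypothetical stand-in for PriorityScheduler; real internals may differ."""

    def __init__(self) -> None:
        self._lock = threading.Lock()                  # guards all mutable state
        self._prio: dict[int, tuple[int, bool]] = {}   # sid -> (urgency, incremental)
        self._ready: deque[int] = deque()              # ready sids, oldest first

    def set_priority(self, sid: int, urgency: int = 3, incremental: bool = False) -> None:
        with self._lock:
            self._prio[sid] = (urgency, incremental)

    def schedule(self, sid: int) -> None:
        with self._lock:
            if sid not in self._ready:  # idempotent
                self._ready.append(sid)

    def unschedule(self, sid: int) -> None:
        with self._lock:
            if sid in self._ready:
                self._ready.remove(sid)

    def remove_stream(self, sid: int) -> None:
        with self._lock:
            self._prio.pop(sid, None)
            if sid in self._ready:
                self._ready.remove(sid)

    def mark_wrote(self, sid: int) -> None:
        with self._lock:
            _, incremental = self._prio.get(sid, (3, False))
            if incremental and sid in self._ready:
                self._ready.remove(sid)  # move to the back so peers go next
                self._ready.append(sid)

    def next_stream(self) -> int | None:
        with self._lock:
            if not self._ready:
                return None
            # Key: urgency first, then non-incremental (False) before incremental.
            # min() returns the first minimal item, so ready order breaks ties.
            return min(self._ready, key=lambda s: self._prio.get(s, (3, False)))


sched = SketchScheduler()
remaining = {1: 2, 3: 2, 5: 1}            # chunks left per stream (made-up data)
sched.set_priority(1, urgency=3, incremental=True)
sched.set_priority(3, urgency=3, incremental=True)
sched.set_priority(5, urgency=0)          # urgent, non-incremental
for sid in remaining:
    sched.schedule(sid)

order = []
while (sid := sched.next_stream()) is not None:
    order.append(sid)                     # stands in for writing one chunk
    remaining[sid] -= 1
    if remaining[sid]:
        sched.mark_wrote(sid)             # rotate incremental streams
    else:
        sched.unschedule(sid)
```

With this data the write order is 5, 1, 3, 1, 3: the urgency-0 stream drains first, then the two incremental streams at urgency 3 alternate chunk by chunk.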

Methods

has_pending 0 bool
True if any stream is currently ready to send.
property
def has_pending(self) -> bool
Returns
bool
stream_count 0 int
Number of streams with registered priorities.
property
def stream_count(self) -> int
Returns
int
set_priority 2
Set or update the priority for a stream. If the stream is already scheduled, i…
def set_priority(self, stream_id: int, priority: StreamPriority) -> None

Set or update the priority for a stream.

If the stream is already scheduled, it is re-bucketed to match the new urgency/incremental classification.

Parameters
Name Type Description
stream_id int
priority StreamPriority
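One way to picture the re-bucketing step is a dictionary of ready queues keyed by (urgency, incremental). The sketch below is illustrative only: `Bucket`, `rebucket`, and the key layout are hypothetical, and placing the moved stream at the tail of its new bucket is an assumption that this page does not specify.

```python
from __future__ import annotations

from collections import deque

Bucket = tuple[int, bool]  # (urgency, incremental); hypothetical key layout


def rebucket(
    buckets: dict[Bucket, deque[int]],
    stream_id: int,
    old: Bucket,
    new: Bucket,
) -> None:
    """Move a ready stream from its old bucket to the tail of its new one."""
    if old == new:
        return  # nothing to do; keep the stream's place in line
    queue = buckets.get(old)
    if queue is None or stream_id not in queue:
        return  # stream is not scheduled, so only its stored priority changes
    queue.remove(stream_id)
    if not queue:
        del buckets[old]  # drop empty buckets so they are not scanned later
    # Assumption: the stream joins the back of the new bucket.
    buckets.setdefault(new, deque()).append(stream_id)


buckets = {(3, False): deque([1, 3]), (3, True): deque([5])}
rebucket(buckets, 3, old=(3, False), new=(1, True))
```

After the call, stream 3 sits alone in the (1, True) bucket and stream 1 remains in (3, False).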
get_priority 1 StreamPriority
Return the priority for a stream (default: urgency=3, non-incremental).
def get_priority(self, stream_id: int) -> StreamPriority
Parameters
Name Type Description
stream_id int
Returns
StreamPriority
schedule 1
Mark a stream as ready to send data. Idempotent: repeated calls for the same s…
def schedule(self, stream_id: int) -> None

Mark a stream as ready to send data.

Idempotent: repeated calls for the same stream do not duplicate entries in the ready set.

Parameters
Name Type Description
stream_id int
unschedule 1
Remove a stream from the ready set without forgetting its priority.
def unschedule(self, stream_id: int) -> None
Parameters
Name Type Description
stream_id int
mark_wrote 1
Signal that a stream just wrote a chunk. For incremental streams, rotates the …
def mark_wrote(self, stream_id: int) -> None

Signal that a stream just wrote a chunk.

For incremental streams, rotates the round-robin cursor so the next same-urgency incremental stream gets the next slot. No-op for non-incremental streams (they stay at the head until unschedule() or remove_stream()).

Parameters
Name Type Description
stream_id int
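The round-robin rotation can be sketched with a deque per urgency level. This is a minimal illustration, assuming the incremental streams at one urgency live in a single queue; `rotate_after_write` is a hypothetical helper, not part of the module.

```python
from collections import deque


def rotate_after_write(queue: deque, stream_id: int) -> None:
    """Move the stream that just wrote to the back of its incremental bucket."""
    if queue and queue[0] == stream_id:
        queue.rotate(-1)  # head goes to the tail; the next peer becomes head


incremental_u3 = deque([1, 3, 5])  # ready incremental streams at urgency 3
served = []
for _ in range(5):
    head = incremental_u3[0]
    served.append(head)            # stands in for writing one chunk
    rotate_after_write(incremental_u3, head)
```

Each stream gets one chunk per pass, so `served` cycles through 1, 3, 5 and then starts over with 1, 3.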
next_stream 0 int | None
Return the next stream that should write data, per RFC 9218. Does not mutate s…
def next_stream(self) -> int | None

Return the next stream that should write data, per RFC 9218.

Does not mutate state; call mark_wrote() to rotate incremental streams after a write.

Returns
int | None Stream ID with highest effective priority, or None if no streams are ready.
remove_stream 1
Remove a stream entirely — priority and ready state.
def remove_stream(self, stream_id: int) -> None
Parameters
Name Type Description
stream_id int
await_turn 1
Block until this stream is the one next_stream() picks. Called by the H2/H3 …
async
async def await_turn(self, stream_id: int) -> None

Block until this stream is the one next_stream() picks.

Called by the H2/H3 send path before writing a DATA chunk so that priority ordering actually constrains when streams emit bytes. The stream must already be scheduled — typically the caller does::

    scheduler.schedule(stream_id)
    await scheduler.await_turn(stream_id)
    write_chunk()
    scheduler.mark_wrote(stream_id)  # for incremental rotation
Parameters
Name Type Description
stream_id int
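The turn-taking idea can be sketched with one asyncio.Event per waiting stream, woken whenever the head of the ready set changes. This is an assumption-laden illustration rather than the module's implementation: `TurnGate` and `send` are hypothetical, the sketch orders by urgency only (ties broken by stream ID) and ignores the incremental flag, and it is single-threaded, so it needs no lock.

```python
from __future__ import annotations

import asyncio


class TurnGate:
    """Hypothetical sketch of turn-taking; not the module's implementation."""

    def __init__(self) -> None:
        self._ready: list[tuple[int, int]] = []       # (urgency, stream_id)
        self._waiters: dict[int, asyncio.Event] = {}  # stream_id -> wake-up event

    def _head(self) -> int | None:
        # Lowest urgency number first; ties fall back to the lower stream ID.
        return min(self._ready)[1] if self._ready else None

    def _wake_current(self) -> None:
        waiter = self._waiters.get(self._head())
        if waiter is not None:
            waiter.set()

    def schedule(self, stream_id: int, urgency: int = 3) -> None:
        self._ready.append((urgency, stream_id))
        self._wake_current()

    def unschedule(self, stream_id: int) -> None:
        self._ready = [r for r in self._ready if r[1] != stream_id]
        self._wake_current()  # the head may have changed

    async def await_turn(self, stream_id: int) -> None:
        # Re-check after every wake-up in case the head moved again.
        while self._head() != stream_id:
            event = self._waiters.setdefault(stream_id, asyncio.Event())
            event.clear()
            await event.wait()
        self._waiters.pop(stream_id, None)


async def send(gate: TurnGate, stream_id: int, log: list[int]) -> None:
    await gate.await_turn(stream_id)
    log.append(stream_id)        # stands in for writing the DATA chunk
    gate.unschedule(stream_id)   # single-chunk response: done after one write


async def main() -> list[int]:
    gate = TurnGate()
    log: list[int] = []
    # Schedule everything first so ordering depends only on urgency.
    for sid, urgency in [(1, 5), (3, 0), (5, 3)]:
        gate.schedule(sid, urgency)
    await asyncio.gather(*(send(gate, sid, log) for sid in (1, 3, 5)))
    return log


order = asyncio.run(main())
```

Although the three tasks start in stream-ID order, the gate lets them write in urgency order: stream 3 (urgency 0), then stream 5 (urgency 3), then stream 1 (urgency 5).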
Internal Methods 2
__init__ 0
def __init__(self) -> None
_wake_current 0
Wake the waiter for whichever stream is now at the head.
def _wake_current(self) -> None

Functions

parse_priority 1 StreamPriority
Parse an RFC 9218 Priority header value.
def parse_priority(value: bytes | str) -> StreamPriority
Parameters
Name Type Description
value bytes | str

The Priority header value (e.g., u=1, i).

Returns
StreamPriority
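For orientation, a simplified parser for the common header forms might look like the sketch below. It is not the module's parse_priority and not a full RFC 8941 Structured Fields parser: `ParsedPriority` and `parse_priority_sketch` are hypothetical names, and falling back to the defaults for unknown keys and out-of-range or malformed values is this sketch's choice.

```python
from __future__ import annotations

from typing import NamedTuple


class ParsedPriority(NamedTuple):
    """Hypothetical result type standing in for StreamPriority."""
    urgency: int = 3
    incremental: bool = False


def parse_priority_sketch(value: bytes | str) -> ParsedPriority:
    """Parse the common forms of a Priority value, e.g. "u=1, i"."""
    if isinstance(value, bytes):
        value = value.decode("ascii", errors="replace")
    urgency, incremental = 3, False
    for member in value.split(","):
        # Drop any parameters after ";" and surrounding whitespace.
        key, _, raw = member.split(";", 1)[0].strip().partition("=")
        if key == "u" and raw.isascii() and raw.isdigit() and 0 <= int(raw) <= 7:
            urgency = int(raw)
        elif key == "i":
            # A bare "i" or "i=?1" means true; "i=?0" means false.
            if raw in ("", "?1"):
                incremental = True
            elif raw == "?0":
                incremental = False
        # Unknown keys and malformed values are skipped, keeping the defaults.
    return ParsedPriority(urgency, incremental)


both = parse_priority_sketch("u=1, i")
from_bytes = parse_priority_sketch(b"i=?0, u=7")
out_of_range = parse_priority_sketch("u=9")
```

In this sketch `both` has urgency 1 with incremental set, `from_bytes` has urgency 7 without it, and `out_of_range` falls back to the default urgency of 3.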