Functions
build_ws_scope
4
dict[str, Any]
▼
Build an ASGI WebSocket scope dict from the upgrade request.
build_ws_scope
4
dict[str, Any]
▼
def build_ws_scope(request: RequestReceived, config: ServerConfig, client: tuple[str, int], server: tuple[str, int]) -> dict[str, Any]
Parameters
| Name | Type | Description |
|---|---|---|
request |
RequestReceived |
The parsed HTTP upgrade request. |
config |
ServerConfig |
Server configuration. |
client |
tuple[str, int] |
Client (host, port) tuple. |
server |
tuple[str, int] |
Server (host, port) tuple. |
Returns
dict[str, Any]
create_ws_receive
1
Receive
▼
Create an ASGI receive callable for WebSocket.
The worker pushes WebSocket eve…
create_ws_receive
1
Receive
▼
def create_ws_receive(events: asyncio.Queue[dict[str, Any]]) -> Receive
Create an ASGI receive callable for WebSocket.
The worker pushes WebSocket events into the queue. The ASGI app
consumes them viareceive().
Parameters
| Name | Type | Description |
|---|---|---|
events |
asyncio.Queue[dict[str, Any]] |
Queue of ASGI WebSocket event dicts. |
Returns
Receive
create_ws_send
5
Send
▼
Create an ASGI send callable for WebSocket.
Handles ``websocket.accept``, ``we…
create_ws_send
5
Send
▼
def create_ws_send(writer: asyncio.StreamWriter, ws_protocol: WSProtocol, ws_key: bytes, *, accept_event: asyncio.Event, close_event: asyncio.Event) -> Send
Create an ASGI send callable for WebSocket.
Handleswebsocket.accept, websocket.send, and
websocket.closemessages from the ASGI app.
Parameters
| Name | Type | Description |
|---|---|---|
writer |
asyncio.StreamWriter |
Asyncio stream writer for the connection. |
ws_protocol |
WSProtocol |
The WSProtocol instance for WebSocket framing. |
ws_key |
bytes |
The Sec-WebSocket-Key from the client's upgrade request. |
accept_event |
asyncio.Event |
Set when the app sends |
close_event |
asyncio.Event |
Set when the app sends |
Returns
Send