WebSockets
When defining WebSockets, you normally declare a parameter of type WebSocket and use it to read data from the client and send data to it.
It is provided directly by Starlette, but you can import it from fastapi:
from fastapi import WebSocket
Tip
If you want to define a dependency that should be compatible with both HTTP and WebSockets, you can declare a parameter that takes an HTTPConnection instead of a Request or a WebSocket.
fastapi.WebSocket
WebSocket(scope, receive, send)
Bases: HTTPConnection
Source code in starlette/websockets.py
    def __init__(self, scope: Scope, receive: Receive, send: Send) -> None:
        super().__init__(scope)
        assert scope["type"] == "websocket"
        self._receive = receive
        self._send = send
        self.client_state = WebSocketState.CONNECTING
        self.application_state = WebSocketState.CONNECTING
client_state instance-attribute
application_state instance-attribute
url_for
url_for(name, /, **path_params)
Source code in starlette/requests.py
    def url_for(self, name: str, /, **path_params: typing.Any) -> URL:
        url_path_provider: Router | Starlette | None = self.scope.get("router") or self.scope.get("app")
        if url_path_provider is None:
            raise RuntimeError("The `url_for` method can only be used inside a Starlette application or with a router.")
        url_path = url_path_provider.url_path_for(name, **path_params)
        return url_path.make_absolute_url(base_url=self.base_url)
receive async
Receive ASGI websocket messages, ensuring valid state transitions.
Source code in starlette/websockets.py
    async def receive(self) -> Message:
        """
        Receive ASGI websocket messages, ensuring valid state transitions.
        """
        if self.client_state == WebSocketState.CONNECTING:
            message = await self._receive()
            message_type = message["type"]
            if message_type != "websocket.connect":
                raise RuntimeError(f'Expected ASGI message "websocket.connect", but got {message_type!r}')
            self.client_state = WebSocketState.CONNECTED
            return message
        elif self.client_state == WebSocketState.CONNECTED:
            message = await self._receive()
            message_type = message["type"]
            if message_type not in {"websocket.receive", "websocket.disconnect"}:
                raise RuntimeError(
                    f'Expected ASGI message "websocket.receive" or "websocket.disconnect", but got {message_type!r}'
                )
            if message_type == "websocket.disconnect":
                self.client_state = WebSocketState.DISCONNECTED
            return message
        else:
            raise RuntimeError('Cannot call "receive" once a disconnect message has been received.')
send async
Send ASGI websocket messages, ensuring valid state transitions.
Source code in starlette/websockets.py
    async def send(self, message: Message) -> None:
        """
        Send ASGI websocket messages, ensuring valid state transitions.
        """
        if self.application_state == WebSocketState.CONNECTING:
            message_type = message["type"]
            if message_type not in {"websocket.accept", "websocket.close", "websocket.http.response.start"}:
                raise RuntimeError(
                    'Expected ASGI message "websocket.accept", "websocket.close" or "websocket.http.response.start", '
                    f"but got {message_type!r}"
                )
            if message_type == "websocket.close":
                self.application_state = WebSocketState.DISCONNECTED
            elif message_type == "websocket.http.response.start":
                self.application_state = WebSocketState.RESPONSE
            else:
                self.application_state = WebSocketState.CONNECTED
            await self._send(message)
        elif self.application_state == WebSocketState.CONNECTED:
            message_type = message["type"]
            if message_type not in {"websocket.send", "websocket.close"}:
                raise RuntimeError(
                    f'Expected ASGI message "websocket.send" or "websocket.close", but got {message_type!r}'
                )
            if message_type == "websocket.close":
                self.application_state = WebSocketState.DISCONNECTED
            try:
                await self._send(message)
            except OSError:
                self.application_state = WebSocketState.DISCONNECTED
                raise WebSocketDisconnect(code=1006)
        elif self.application_state == WebSocketState.RESPONSE:
            message_type = message["type"]
            if message_type != "websocket.http.response.body":
                raise RuntimeError(f'Expected ASGI message "websocket.http.response.body", but got {message_type!r}')
            if not message.get("more_body", False):
                self.application_state = WebSocketState.DISCONNECTED
            await self._send(message)
        else:
            raise RuntimeError('Cannot call "send" once a close message has been sent.')
accept async
accept(subprotocol=None, headers=None)
Source code in starlette/websockets.py
    async def accept(
        self,
        subprotocol: str | None = None,
        headers: typing.Iterable[tuple[bytes, bytes]] | None = None,
    ) -> None:
        headers = headers or []
        if self.client_state == WebSocketState.CONNECTING:  # pragma: no branch
            # If we haven't yet seen the 'connect' message, then wait for it first.
            await self.receive()
        await self.send({"type": "websocket.accept", "subprotocol": subprotocol, "headers": headers})
receive_text async
Source code in starlette/websockets.py
    async def receive_text(self) -> str:
        if self.application_state != WebSocketState.CONNECTED:
            raise RuntimeError('WebSocket is not connected. Need to call "accept" first.')
        message = await self.receive()
        self._raise_on_disconnect(message)
        return typing.cast(str, message["text"])
receive_bytes async
Source code in starlette/websockets.py
    async def receive_bytes(self) -> bytes:
        if self.application_state != WebSocketState.CONNECTED:
            raise RuntimeError('WebSocket is not connected. Need to call "accept" first.')
        message = await self.receive()
        self._raise_on_disconnect(message)
        return typing.cast(bytes, message["bytes"])
receive_json async
receive_json(mode='text')
Source code in starlette/websockets.py
    async def receive_json(self, mode: str = "text") -> typing.Any:
        if mode not in {"text", "binary"}:
            raise RuntimeError('The "mode" argument should be "text" or "binary".')
        if self.application_state != WebSocketState.CONNECTED:
            raise RuntimeError('WebSocket is not connected. Need to call "accept" first.')
        message = await self.receive()
        self._raise_on_disconnect(message)
        if mode == "text":
            text = message["text"]
        else:
            text = message["bytes"].decode("utf-8")
        return json.loads(text)
iter_text async
Source code in starlette/websockets.py
    async def iter_text(self) -> typing.AsyncIterator[str]:
        try:
            while True:
                yield await self.receive_text()
        except WebSocketDisconnect:
            pass
iter_bytes async
Source code in starlette/websockets.py
    async def iter_bytes(self) -> typing.AsyncIterator[bytes]:
        try:
            while True:
                yield await self.receive_bytes()
        except WebSocketDisconnect:
            pass
iter_json async
Source code in starlette/websockets.py
    async def iter_json(self) -> typing.AsyncIterator[typing.Any]:
        try:
            while True:
                yield await self.receive_json()
        except WebSocketDisconnect:
            pass
send_text async
Source code in starlette/websockets.py
    async def send_text(self, data: str) -> None:
        await self.send({"type": "websocket.send", "text": data})
send_bytes async
Source code in starlette/websockets.py
    async def send_bytes(self, data: bytes) -> None:
        await self.send({"type": "websocket.send", "bytes": data})
send_json async
send_json(data, mode='text')
Source code in starlette/websockets.py
    async def send_json(self, data: typing.Any, mode: str = "text") -> None:
        if mode not in {"text", "binary"}:
            raise RuntimeError('The "mode" argument should be "text" or "binary".')
        text = json.dumps(data, separators=(",", ":"), ensure_ascii=False)
        if mode == "text":
            await self.send({"type": "websocket.send", "text": text})
        else:
            await self.send({"type": "websocket.send", "bytes": text.encode("utf-8")})
close async
close(code=1000, reason=None)
Source code in starlette/websockets.py
    async def close(self, code: int = 1000, reason: str | None = None) -> None:
        await self.send({"type": "websocket.close", "code": code, "reason": reason or ""})
When a client disconnects, a WebSocketDisconnect exception is raised, and you can catch it.
You can import it directly from fastapi:
from fastapi import WebSocketDisconnect
fastapi.WebSocketDisconnect
WebSocketDisconnect(code=1000, reason=None)
Bases: Exception
Source code in starlette/websockets.py
    def __init__(self, code: int = 1000, reason: str | None = None) -> None:
        self.code = code
        self.reason = reason or ""
reason instance-attribute
WebSockets - additional classes
Additional classes for handling WebSockets.
They are provided directly by Starlette, but you can import them from fastapi:
from fastapi.websockets import WebSocketDisconnect, WebSocketState
fastapi.websockets.WebSocketDisconnect
WebSocketDisconnect(code=1000, reason=None)
Bases: Exception
Source code in starlette/websockets.py
    def __init__(self, code: int = 1000, reason: str | None = None) -> None:
        self.code = code
        self.reason = reason or ""
reason instance-attribute
fastapi.websockets.WebSocketState
Bases: Enum
CONNECTING class-attribute instance-attribute
CONNECTED class-attribute instance-attribute
DISCONNECTED class-attribute instance-attribute
RESPONSE class-attribute instance-attribute