
WebSockets

When defining WebSockets, you normally declare a parameter of type WebSocket and use it to read data from the client and send data to it.

It is provided directly by Starlette, but you can import it from fastapi:

from fastapi import WebSocket

Tip

When you want to define dependencies that should be compatible with both HTTP and WebSockets, you can define a parameter that takes an HTTPConnection instead of a Request or a WebSocket.

fastapi.WebSocket

WebSocket(scope, receive, send)

Bases: HTTPConnection

Source code in starlette/websockets.py
def __init__(self, scope: Scope, receive: Receive, send: Send) -> None:
    super().__init__(scope)
    assert scope["type"] == "websocket"
    self._receive = receive
    self._send = send
    self.client_state = WebSocketState.CONNECTING
    self.application_state = WebSocketState.CONNECTING

scope instance-attribute

scope = scope

app property

app

url property

url

base_url property

base_url

headers property

headers

query_params property

query_params

path_params property

path_params

cookies property

cookies

client property

client

state property

state

client_state instance-attribute

client_state = CONNECTING

application_state instance-attribute

application_state = CONNECTING

url_for

url_for(name, /, **path_params)
Source code in starlette/requests.py
def url_for(self, name: str, /, **path_params: typing.Any) -> URL:
    url_path_provider: Router | Starlette | None = self.scope.get("router") or self.scope.get("app")
    if url_path_provider is None:
        raise RuntimeError("The `url_for` method can only be used inside a Starlette application or with a router.")
    url_path = url_path_provider.url_path_for(name, **path_params)
    return url_path.make_absolute_url(base_url=self.base_url)

receive async

receive()

Receive ASGI websocket messages, ensuring valid state transitions.

Source code in starlette/websockets.py
async def receive(self) -> Message:
    """
    Receive ASGI websocket messages, ensuring valid state transitions.
    """
    if self.client_state == WebSocketState.CONNECTING:
        message = await self._receive()
        message_type = message["type"]
        if message_type != "websocket.connect":
            raise RuntimeError(f'Expected ASGI message "websocket.connect", but got {message_type!r}')
        self.client_state = WebSocketState.CONNECTED
        return message
    elif self.client_state == WebSocketState.CONNECTED:
        message = await self._receive()
        message_type = message["type"]
        if message_type not in {"websocket.receive", "websocket.disconnect"}:
            raise RuntimeError(
                f'Expected ASGI message "websocket.receive" or "websocket.disconnect", but got {message_type!r}'
            )
        if message_type == "websocket.disconnect":
            self.client_state = WebSocketState.DISCONNECTED
        return message
    else:
        raise RuntimeError('Cannot call "receive" once a disconnect message has been received.')
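
The client-side checks in receive can be read as a small transition table. The sketch below restates them that way; it is not Starlette's implementation. The enum is a local copy of the values listed on this page so the block runs standalone, and CLIENT_TRANSITIONS and next_client_state are hypothetical names introduced for illustration.

```python
from enum import Enum


class WebSocketState(Enum):
    # Local copy of the enum values listed in this reference.
    # In an application, import it from fastapi.websockets instead.
    CONNECTING = 0
    CONNECTED = 1
    DISCONNECTED = 2
    RESPONSE = 3


# (current client state, incoming ASGI message type) -> next client state
CLIENT_TRANSITIONS = {
    (WebSocketState.CONNECTING, "websocket.connect"): WebSocketState.CONNECTED,
    (WebSocketState.CONNECTED, "websocket.receive"): WebSocketState.CONNECTED,
    (WebSocketState.CONNECTED, "websocket.disconnect"): WebSocketState.DISCONNECTED,
}


def next_client_state(state: WebSocketState, message_type: str) -> WebSocketState:
    """Return the state after a message, or raise for an invalid transition."""
    try:
        return CLIENT_TRANSITIONS[(state, message_type)]
    except KeyError:
        raise RuntimeError(f"Unexpected {message_type!r} in state {state.name}") from None


# Walk through a normal connection lifetime.
state = WebSocketState.CONNECTING
for message_type in ["websocket.connect", "websocket.receive", "websocket.disconnect"]:
    state = next_client_state(state, message_type)
print(state.name)  # DISCONNECTED
```

Any pair missing from the table corresponds to one of the RuntimeError branches in the listing above, including every message that arrives after a disconnect.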

send async

send(message)

Send ASGI websocket messages, ensuring valid state transitions.

Source code in starlette/websockets.py
async def send(self, message: Message) -> None:
    """
    Send ASGI websocket messages, ensuring valid state transitions.
    """
    if self.application_state == WebSocketState.CONNECTING:
        message_type = message["type"]
        if message_type not in {"websocket.accept", "websocket.close", "websocket.http.response.start"}:
            raise RuntimeError(
                'Expected ASGI message "websocket.accept", "websocket.close" or "websocket.http.response.start", '
                f"but got {message_type!r}"
            )
        if message_type == "websocket.close":
            self.application_state = WebSocketState.DISCONNECTED
        elif message_type == "websocket.http.response.start":
            self.application_state = WebSocketState.RESPONSE
        else:
            self.application_state = WebSocketState.CONNECTED
        await self._send(message)
    elif self.application_state == WebSocketState.CONNECTED:
        message_type = message["type"]
        if message_type not in {"websocket.send", "websocket.close"}:
            raise RuntimeError(
                f'Expected ASGI message "websocket.send" or "websocket.close", but got {message_type!r}'
            )
        if message_type == "websocket.close":
            self.application_state = WebSocketState.DISCONNECTED
        try:
            await self._send(message)
        except OSError:
            self.application_state = WebSocketState.DISCONNECTED
            raise WebSocketDisconnect(code=1006)
    elif self.application_state == WebSocketState.RESPONSE:
        message_type = message["type"]
        if message_type != "websocket.http.response.body":
            raise RuntimeError(f'Expected ASGI message "websocket.http.response.body", but got {message_type!r}')
        if not message.get("more_body", False):
            self.application_state = WebSocketState.DISCONNECTED
        await self._send(message)
    else:
        raise RuntimeError('Cannot call "send" once a close message has been sent.')
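
To see which messages receive and send are validating, it can help to look at a raw ASGI application with no WebSocket wrapper at all. The following is a minimal sketch using only the standard library. echo_app and run_once are hypothetical names, and the in-memory receive and send callables stand in for what an ASGI server would normally provide.

```python
import asyncio


async def echo_app(scope, receive, send):
    """Raw ASGI WebSocket app: accept, answer one text message, then close."""
    assert scope["type"] == "websocket"

    connect = await receive()
    assert connect["type"] == "websocket.connect"

    # Application state: CONNECTING -> CONNECTED
    await send({"type": "websocket.accept"})

    message = await receive()
    if message["type"] == "websocket.receive":
        await send({"type": "websocket.send", "text": message["text"].upper()})

    # Application state: CONNECTED -> DISCONNECTED
    await send({"type": "websocket.close", "code": 1000, "reason": ""})


async def run_once(text):
    """Drive the app with in-memory stand-ins for the server's callables."""
    incoming = [
        {"type": "websocket.connect"},
        {"type": "websocket.receive", "text": text},
    ]
    sent = []

    async def receive():
        return incoming.pop(0)

    async def send(message):
        sent.append(message)

    await echo_app({"type": "websocket"}, receive, send)
    return sent


sent_messages = asyncio.run(run_once("hello"))
print([m["type"] for m in sent_messages])
# ['websocket.accept', 'websocket.send', 'websocket.close']
```

The order of message types printed here is the same order the application_state checks in send enforce: accept first, then any number of sends, then close.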

accept async

accept(subprotocol=None, headers=None)
Source code in starlette/websockets.py
async def accept(
    self,
    subprotocol: str | None = None,
    headers: typing.Iterable[tuple[bytes, bytes]] | None = None,
) -> None:
    headers = headers or []

    if self.client_state == WebSocketState.CONNECTING:  # pragma: no branch
        # If we haven't yet seen the 'connect' message, then wait for it first.
        await self.receive()
    await self.send({"type": "websocket.accept", "subprotocol": subprotocol, "headers": headers})

receive_text async

receive_text()
Source code in starlette/websockets.py
async def receive_text(self) -> str:
    if self.application_state != WebSocketState.CONNECTED:
        raise RuntimeError('WebSocket is not connected. Need to call "accept" first.')
    message = await self.receive()
    self._raise_on_disconnect(message)
    return typing.cast(str, message["text"])

receive_bytes async

receive_bytes()
Source code in starlette/websockets.py
async def receive_bytes(self) -> bytes:
    if self.application_state != WebSocketState.CONNECTED:
        raise RuntimeError('WebSocket is not connected. Need to call "accept" first.')
    message = await self.receive()
    self._raise_on_disconnect(message)
    return typing.cast(bytes, message["bytes"])

receive_json async

receive_json(mode='text')
Source code in starlette/websockets.py
async def receive_json(self, mode: str = "text") -> typing.Any:
    if mode not in {"text", "binary"}:
        raise RuntimeError('The "mode" argument should be "text" or "binary".')
    if self.application_state != WebSocketState.CONNECTED:
        raise RuntimeError('WebSocket is not connected. Need to call "accept" first.')
    message = await self.receive()
    self._raise_on_disconnect(message)

    if mode == "text":
        text = message["text"]
    else:
        text = message["bytes"].decode("utf-8")
    return json.loads(text)

iter_text async

iter_text()
Source code in starlette/websockets.py
async def iter_text(self) -> typing.AsyncIterator[str]:
    try:
        while True:
            yield await self.receive_text()
    except WebSocketDisconnect:
        pass
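
The iterator methods turn a disconnect into the normal end of an async for loop, so the caller needs no try/except. Here is a small sketch of that pattern in isolation. Disconnect, FakeSocket and collect are hypothetical stand-ins so the block runs without a server; they are not part of FastAPI or Starlette.

```python
import asyncio


class Disconnect(Exception):
    """Hypothetical stand-in for WebSocketDisconnect."""


class FakeSocket:
    """Hypothetical in-memory socket: yields queued texts, then disconnects."""

    def __init__(self, texts):
        self._texts = list(texts)

    async def receive_text(self):
        if not self._texts:
            raise Disconnect()
        return self._texts.pop(0)

    async def iter_text(self):
        # Same shape as the listing above: the disconnect ends the iteration.
        try:
            while True:
                yield await self.receive_text()
        except Disconnect:
            pass


async def collect(socket):
    received = []
    async for text in socket.iter_text():
        received.append(text)
    # Reaching this line means the peer disconnected; no exception escaped.
    return received


result = asyncio.run(collect(FakeSocket(["a", "b"])))
print(result)  # ['a', 'b']
```

The trade-off is that the close code is not visible to the loop body; code that needs it would call receive_text directly and catch the exception itself.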

iter_bytes async

iter_bytes()
Source code in starlette/websockets.py
async def iter_bytes(self) -> typing.AsyncIterator[bytes]:
    try:
        while True:
            yield await self.receive_bytes()
    except WebSocketDisconnect:
        pass

iter_json async

iter_json()
Source code in starlette/websockets.py
async def iter_json(self) -> typing.AsyncIterator[typing.Any]:
    try:
        while True:
            yield await self.receive_json()
    except WebSocketDisconnect:
        pass

send_text async

send_text(data)
Source code in starlette/websockets.py
async def send_text(self, data: str) -> None:
    await self.send({"type": "websocket.send", "text": data})

send_bytes async

send_bytes(data)
Source code in starlette/websockets.py
async def send_bytes(self, data: bytes) -> None:
    await self.send({"type": "websocket.send", "bytes": data})

send_json async

send_json(data, mode='text')
Source code in starlette/websockets.py
async def send_json(self, data: typing.Any, mode: str = "text") -> None:
    if mode not in {"text", "binary"}:
        raise RuntimeError('The "mode" argument should be "text" or "binary".')
    text = json.dumps(data, separators=(",", ":"), ensure_ascii=False)
    if mode == "text":
        await self.send({"type": "websocket.send", "text": text})
    else:
        await self.send({"type": "websocket.send", "bytes": text.encode("utf-8")})
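
The json.dumps arguments in send_json affect what goes over the wire. The sketch below uses only the standard library to show the effect of the same arguments; the payload is made up for illustration.

```python
import json

payload = {"user": "Zoë", "count": 1}

# Same encoding call as send_json: compact separators, non-ASCII kept as-is.
text = json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
print(text)  # {"user":"Zoë","count":1}

# mode="binary" sends the same text as UTF-8 bytes.
binary = text.encode("utf-8")

# receive_json reverses this: decode (binary mode only), then json.loads.
assert json.loads(text) == payload
assert json.loads(binary.decode("utf-8")) == payload

# For comparison, the json.dumps defaults add spaces and escape non-ASCII.
default_text = json.dumps(payload)
print(len(text) < len(default_text))  # True
```

Both peers need to agree on the mode: a frame sent with mode="binary" has no "text" value for receive_json(mode="text") to parse.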

close async

close(code=1000, reason=None)
Source code in starlette/websockets.py
async def close(self, code: int = 1000, reason: str | None = None) -> None:
    await self.send({"type": "websocket.close", "code": code, "reason": reason or ""})

When a client disconnects, a WebSocketDisconnect exception is raised, which you can catch.

You can import it directly from fastapi:

from fastapi import WebSocketDisconnect
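
A typical handler wraps its receive loop in try/except and reads the close code from the exception. The following is a minimal sketch of that pattern. echo_until_disconnect and FakeWebSocket are hypothetical names, and the fallback class only mirrors the constructor documented on this page so that the block also runs where FastAPI is not installed.

```python
import asyncio

try:
    from fastapi import WebSocketDisconnect
except ImportError:
    # Stand-in with the constructor documented on this page (assumption:
    # only needed when FastAPI is not installed).
    class WebSocketDisconnect(Exception):
        def __init__(self, code=1000, reason=None):
            self.code = code
            self.reason = reason or ""


async def echo_until_disconnect(websocket):
    """Echo text messages; return the close code once the client goes away."""
    await websocket.accept()
    try:
        while True:
            text = await websocket.receive_text()
            await websocket.send_text(f"echo: {text}")
    except WebSocketDisconnect as exc:
        return exc.code


class FakeWebSocket:
    """Hypothetical in-memory stand-in for fastapi.WebSocket, for this demo only."""

    def __init__(self, incoming, close_code):
        self.incoming = list(incoming)
        self.close_code = close_code
        self.sent = []

    async def accept(self):
        pass

    async def receive_text(self):
        if not self.incoming:
            raise WebSocketDisconnect(code=self.close_code)
        return self.incoming.pop(0)

    async def send_text(self, data):
        self.sent.append(data)


fake = FakeWebSocket(["hi"], close_code=1001)
code = asyncio.run(echo_until_disconnect(fake))
print(code, fake.sent)  # 1001 ['echo: hi']
```

In a FastAPI application the same function body would sit under an @app.websocket(...) decorator with the parameter annotated as WebSocket, and the framework would pass in the real connection.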

fastapi.WebSocketDisconnect

WebSocketDisconnect(code=1000, reason=None)

Bases: Exception

Source code in starlette/websockets.py
def __init__(self, code: int = 1000, reason: str | None = None) -> None:
    self.code = code
    self.reason = reason or ""

code instance-attribute

code = code

reason instance-attribute

reason = reason or ''

WebSockets - additional classes

Additional classes for handling WebSockets.

They are provided directly by Starlette, but you can import them from fastapi:

from fastapi.websockets import WebSocketDisconnect, WebSocketState

fastapi.websockets.WebSocketDisconnect

WebSocketDisconnect(code=1000, reason=None)

Bases: Exception

Source code in starlette/websockets.py
def __init__(self, code: int = 1000, reason: str | None = None) -> None:
    self.code = code
    self.reason = reason or ""

code instance-attribute

code = code

reason instance-attribute

reason = reason or ''

fastapi.websockets.WebSocketState

Bases: Enum

CONNECTING class-attribute instance-attribute

CONNECTING = 0

CONNECTED class-attribute instance-attribute

CONNECTED = 1

DISCONNECTED class-attribute instance-attribute

DISCONNECTED = 2

RESPONSE class-attribute instance-attribute

RESPONSE = 3