在 FastAPI 中使用 WebSocket 非常简单。首先需要导入 WebSocket 类和 WebSocketDisconnect 异常类,然后在路由函数中添加一个 WebSocket 参数来处理 WebSocket 连接。
以下是一个简单的例子:
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message text was: {data}")
在这个例子中,我们创建了一个 WebSocket 端点 /ws
,当有客户端连接时,会调用 websocket_endpoint
函数处理连接。在函数中我们首先调用 await websocket.accept()
来接受连接,并开始一个无限循环来接收和发送消息。
当客户端发送消息时,我们通过 await websocket.receive_text()
方法来接收消息,并通过 await websocket.send_text()
方法来发送消息给客户端。
在 FastAPI 中使用 WebSocket 很容易,你可以根据自己的需求来处理 WebSocket 连接和消息。