Python异步WebSocket客户端
【Python 异步 WebSocket 客户端】websockets 库提供了完整的异步 WebSocket 客户端实现支持连接、收发消息、心跳检测和自动重连等特性。──────────────────────────────────────────────示例 1基础 WebSocket 客户端──────────────────────────────────────────────import asyncioimport websockets # 需要 pip install websocketsasync def 基本客户端():连接 WebSocket 服务器并收发消息uri ws://echo.websocket.org # 测试用 echo 服务async with websockets.connect(uri) as websocket:# 发送消息await websocket.send(Hello, WebSocket!)print(已发送: Hello, WebSocket!)# 接收消息响应 await websocket.recv()print(f收到响应: {响应})# 发送多条消息for i in range(3):await websocket.send(f消息-{i})回显 await websocket.recv()print(f回显: {回显})# asyncio.run(基本客户端())──────────────────────────────────────────────示例 2持续收发与 ping/pong──────────────────────────────────────────────import asyncioimport websocketsasync def 持续通信():持续收发消息并检测连接健康uri ws://echo.websocket.orgasync with websockets.connect(uri) as ws:# 发送初始消息await ws.send(开始通信)async def 发送心跳():定期发送 ping 保持连接while True:await asyncio.sleep(5)try:pong_waiter await ws.ping()await asyncio.wait_for(pong_waiter, timeout2)print(心跳正常)except asyncio.TimeoutError:print(心跳超时!)breakasync def 接收消息():持续接收消息try:async for 消息 in ws:print(f收到: {消息})except websockets.ConnectionClosed:print(连接已关闭)# 并发执行心跳和接收await asyncio.gather(发送心跳(),接收消息(),)# asyncio.run(持续通信())──────────────────────────────────────────────示例 3自动重连机制──────────────────────────────────────────────import asyncioimport websocketsasync def 自动重连客户端(uri):带自动重连逻辑的 WebSocket 客户端重连延迟 1 # 初始重连延迟秒最大延迟 30最大重试 5for 尝试次数 in range(最大重试):try:print(f连接尝试 {尝试次数 1}...)async with websockets.connect(uri) as ws:print(连接成功!)重连延迟 1 # 成功后重置延迟try:async for 消息 in ws:print(f收到: {消息})# 处理业务消息...await ws.send(f确认: {消息})except websockets.ConnectionClosed:print(连接断开准备重连...)except (ConnectionRefusedError, OSError) as e:print(f连接失败: {e})# 指数退避重连print(f等待 {重连延迟} 秒后重试...)await asyncio.sleep(重连延迟)重连延迟 min(重连延迟 * 2, 最大延迟)print(达到最大重试次数放弃连接)# asyncio.run(自动重连客户端(ws://localhost:8765))──────────────────────────────────────────────示例 4WebSocket 心跳与健康检测──────────────────────────────────────────────import asyncioimport websocketsimport timeclass 健康WebSocket客户端:带完整心跳和健康检测的 WebSocket 客户端def __init__(self, uri, 心跳间隔10):self.uri uriself.心跳间隔 心跳间隔self.ws Noneself.上次收到 0async def 连接(self):建立连接并启动心跳self.ws await websockets.connect(self.uri)self.上次收到 time.time()asyncio.create_task(self._心跳循环())return selfasync def _心跳循环(self):定期 ping 检测连接健康while True:await asyncio.sleep(self.心跳间隔)try:await self.ws.ping()# 检查上次收到消息的时间空闲时间 time.time() - self.上次收到if 空闲时间 self.心跳间隔 * 3:print(f连接空闲超时: {空闲时间:.1f}s)breakprint(f心跳正常 (空闲:{空闲时间:.1f}s))except websockets.ConnectionClosed:print(心跳检测: 连接已关闭)breakasync def 发送(self, 消息):发送消息if self.ws is None:raise RuntimeError(未连接)await self.ws.send(消息)async def 接收(self):接收消息并更新时间戳消息 await self.ws.recv()self.上次收到 time.time()return 消息async def 关闭(self):安全关闭连接if self.ws:await self.ws.close()async def 健康客户端示例():client 健康WebSocket客户端(ws://echo.websocket.org)await client.连接()await client.发送(测试健康检测)响应 await client.接收()print(f响应: {响应})await client.关闭()# asyncio.run(健康客户端示例())──────────────────────────────────────────────核心要点──────────────────────────────────────────────• websockets.connect - 异步连接 WebSocket 服务器• ws.send(msg) - 发送文本/二进制消息• ws.recv() - 接收单条消息• async for msg in ws - 持续迭代接收消息• ws.ping/pong - 心跳检测• ConnectionClosed - 连接断开异常• 自动重连 指数退避 - 可靠的客户端策略