做网站需求需要沟通什么,餐饮网站建设方案书,口碑营销的策略技巧,顺义区快速建站WebSocket是一种网络通信协议#xff0c;它在单个TCP连接上提供全双工的通信信道。在本篇文章中#xff0c;我们将探讨如何在Python中使用WebSocket实现实时通信。
websockets是Python中最常用的网络库之一#xff0c;也是websocket协议的Python实现。它不仅作为基础组件在…WebSocket是一种网络通信协议它在单个TCP连接上提供全双工的通信信道。在本篇文章中我们将探讨如何在Python中使用WebSocket实现实时通信。
websockets是Python中最常用的网络库之一也是websocket协议的Python实现。它不仅作为基础组件在众多项目中发挥着重要作用其源码也值得广大“Python玩家”研究。 官网:https://github.com/python-websockets/websockets
1. 什么是WebSocket
WebSocket协议是在2008年由Web应用程序设计师和开发人员创建的目的是为了在Web浏览器和服务器之间提供更高效、更低延迟的双向通信。它允许客户端和服务器在任何时候发送消息无需重新建立TCP连接。WebSocket可以在Web浏览器和服务器之间传输文本和二进制数据使得构建实时Web应用程序变得更加简单。
2. 在Python中使用WebSocket
Python中有多个库可以帮助我们使用WebSocket如websockets、aiohttp等。在本文中我们将使用websockets库来演示WebSocket编程。
要安装websockets库你可以使用pip
pip install websockets3. 创建WebSocket服务器
使用websockets库我们可以轻松地创建一个WebSocket服务器。以下是一个简单的示例
import asyncio
import websocketsasync def echo(websocket, path):async for message in websocket:print(fReceived message: {message})await websocket.send(fEcho: {message})start_server websockets.serve(echo, localhost, 8765)asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()在这个示例中我们定义了一个名为echo的协程函数它接收两个参数websocket和path。该函数使用async for循环读取客户端发送的消息并将消息发送回客户端。
然后我们使用websockets.serve()函数创建一个WebSocket服务器监听本地主机的8765端口。最后我们使用asyncio的事件循环启动服务器。
4. 创建WebSocket客户端
要创建一个WebSocket客户端我们同样可以使用websockets库。以下是一个简单的客户端示例
import asyncio
import websocketsasync def main():async with websockets.connect(ws://localhost:8765) as websocket:message Hello, server!await websocket.send(message)print(fSent: {message})response await websocket.recv()print(fReceived: {response})asyncio.run(main())在这个示例中我们使用websockets.connect()函数建立与WebSocket服务器的连接。然后我们使用send()方法向服务器发送消息并使用recv()方法接收服务器的响应。
5. 总结
WebSocket协议为Web浏览器和服务器之间提供了实时双向通信的能力使得构建实时Web应用程序变得更加容易。在Python中我们可以使用websockets库轻松地实现WebSocket编程。
6. 通过websockets这个项目从大型开源项目中学习asyncio库。
一、asyncio.Transport 在官方文档中Transport被描述成对socket的抽象它控制着如何传输数据。除了websocketsuvicorn、daphne等ASGI实现都会用到Transport。
Transport继承于ReadTransport和WriteTransport两者都继承于BaseTransport。顾名思义Transport兼备读和写的功能可以类比为读写socket对象。
Transport对象提供以下常用函数——
is_reading判断该Transport是否在读。
set_write_buffer_limits设置写入Transport的高和低水位。考虑到网络状况有时不希望写入过多的数据。
write、write_eof、write_line为当前Transport写入数据分别表示写入二进制数据、eof和二进制行数据。其中eof写入后不会关闭Transport但会flush数据。
abort立刻关闭Transport不接受新的数据。留在缓冲的数据也会丢失后续调用Protocol的connection_lost函数。
在websockets中Transport使用场景不多一般都是通过Protocol对象的回调参数使用的。在websocket的初始化过程中会设置Transport的最高水位。同样在这种场景下该对象也是作为回调参数使用的。
二、asyncio.Protocol 如果Transport是对socket的抽象那么Protocol就是对协议的抽象。它提供了如何使用Transport的方式。
用户使用的Protocol直接继承自BaseProtocol并提供了六个Unimplemented函数需要用户去实现——
connection_made当连接建立时会执行该函数该函数包含一个Transport类型的参数。
connection_lost当连接丢失或者关闭时会执行该函数该函数包含一个Exception类型的参数。
pause_writing当Transport对象写入的数据高于之前设置的高水位时被调用一般会暂停数据的写入。
resume_writing当Transport对象写入的数据低于之前设置的低水位时被调用一般用于恢复数据写入。
data_received当有数据被接受时回调该函数包含一个二进制对象data用来表示接受的数据。
eof_received当被Transport对象被调用write_eof时被调用。
在websockets中server端的connection_made实现截图如图所示。在该函数中websockets将用户实现的handler封装成task对象并和websocket的server绑定。
而在client端中实现如第一节截图所示只是在reader中注册该Transport对象。
websockets的connection_lost函数实现方式如下。主要操作即更新状态、关闭pings、更新对应的waiter状态以及维护reader对象。
在其他函数的实现中websockets也主要用到了reader对象完成数据流的暂停和恢复以及数据的写入。
从上面代码实现可以看出websockets通过reader代理完成数据流的操作。这个reader是一个asyncio.StreamReader对象。这个对象具体如何使用将在下一篇介绍。
附录进阶版本
python使用websockets库 serve:在server端使用等待客户端的连接。如果连接成功返回一个websocket。
connect: 在client端使用用于建立连接。
send:发送数据
recv:接收数据
close:关闭连接
服务端
#!/usr/bin/python3
# 主要功能创建1个基本的websocket server, 符合asyncio 开发要求
import asyncio
import websockets
from datetime import datetimeasync def handler(websocket):data await websocket.recv()reply fData received as \{data}\. time: {datetime.now()}print(reply)await websocket.send(reply)print(Send reply)async def main():async with websockets.serve(handler, localhost, 9999):await asyncio.Future() # run foreverif __name__ __main__:asyncio.run(main())客户端
import asyncio
import websockets
import timeasync def ws_client(url):for i in range(1, 40):async with websockets.connect(url) as websocket:await websocket.send(Hello, I am PyPy.)response await websocket.recv()print(response)time.sleep(1)asyncio.run(ws_client(ws://localhost:9999))服务端
import asyncio
import websocketsIP_ADDR 127.0.0.1
IP_PORT 9090# 握手通过接收Hi发送success来进行双方的握手。
async def serverHands(websocket):while True:recv_text await websocket.recv()print(recv_text recv_text)if recv_text Hi:print(connected success)await websocket.send(success)return Trueelse:await websocket.send(connected fail)# 接收从客户端发来的消息并处理再返给客户端success
async def serverRecv(websocket):while True:recv_text await websocket.recv()print(recv:, recv_text)await websocket.send(success,get mess: recv_text)# 握手并且接收数据
async def serverRun(websocket, path):print(path)await serverHands(websocket)await serverRecv(websocket)# main function
if __name__ __main__:print(server)server websockets.serve(serverRun, IP_ADDR, IP_PORT)asyncio.get_event_loop().run_until_complete(server)asyncio.get_event_loop().run_forever()客户端
import asyncio
import websocketsIP_ADDR 127.0.0.1
IP_PORT 9090async def clientHands(websocket):while True:# 通过发送hello握手await websocket.send(Hi)response_str await websocket.recv()# 接收success来进行双方的握手if success in response_str:print(握手成功)return True# 向服务器端发送消息
async def clientSend(websocket):while True:input_text input(input text: )if input_text exit:print(fexit, bye!)await websocket.close(reasonexit)return Falseawait websocket.send(input_text)recv_text await websocket.recv()print(f{recv_text})# 进行websocket连接
async def clientRun():ipaddress IP_ADDR : IP_PORTasync with websockets.connect(ws:// ipaddress) as websocket:await clientHands(websocket)await clientSend(websocket)# main function
if __name__ __main__:print(client)asyncio.get_event_loop().run_until_complete(clientRun())服务端
# -*- coding:utf8 -*-import json
import socket
import asyncio
import logging
import websockets
import multiprocessingIP 127.0.0.1
PORT_CHAT 9090USERS {}#提供聊天的后台
async def ServerWs(websocket,path):logging.basicConfig(format%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s,filenamechat.log,levellogging.INFO)# 握手await websocket.send(json.dumps({type: handshake}))async for message in websocket:data json.loads(message)message # 用户发信息if data[type] send:name 404for k, v in USERS.items():if v websocket:name kdata[from] nameif len(USERS) ! 0: # asyncio.wait doesnt accept an empty listmessage json.dumps({type: user, content: data[content], from: name})# 用户注册elif data[type] register:try:USERS[data[uuid]] websocketif len(USERS) ! 0: # asyncio.wait doesnt accept an empty listmessage json.dumps({type: login, content: data[content], user_list: list(USERS.keys())})except Exception as exp:print(exp)# 用户注销elif data[type] unregister:del USERS[data[uuid]]if len(USERS) ! 0: # asyncio.wait doesnt accept an empty listmessage json.dumps({type: logout, content: data[content], user_list: list(USERS.keys())})#打印日志logging.info(data)# 群发await asyncio.wait([user.send(message) for user in USERS.values()])def server_run():print(server)start_server websockets.serve(ServerWs, 0.0.0.0, PORT_CHAT)asyncio.get_event_loop().run_until_complete(start_server)asyncio.get_event_loop().run_forever()if __name__ __main__:from multiprocessing import Processmultiprocessing.freeze_support()server Process(targetserver_run, daemonFalse)server.start()服务端
import asyncio
import websockets
import time
import json
import threading
# 功能模块
class OutputHandler():async def run(self,message,send_ms,websocket):# 用户发信息await send_ms(message, websocket)# 单发消息# await send_ms(message, websocket)# 群发消息#await s(hi起来)# 存储所有的客户端
Clients {}# 服务端
class WS_Server():def __init__(self):self.ip 127.0.0.1self.port 9090# 回调函数(发消息给客户端)async def callback_send(self, msg, websocketNone):await self.sendMsg(msg, websocket)# 发送消息async def sendMsg(self, msg, websocket):print(sendMsg:, msg)# websocket不为空单发为空群发消息if websocket ! None:await websocket.send(msg)else:# 群发消息await self.broadcastMsg(msg)# 避免被卡线程await asyncio.sleep(0.2)# 群发消息async def broadcastMsg(self, msg):for user in Clients:await user.send(msg)# 针对不同的信息进行请求可以考虑json文本async def runCaseX(self,jsonMsg,websocket):print(runCase)op OutputHandler()# 参数消息、方法、socketawait op.run(jsonMsg,self.callback_send,websocket)# 连接一个客户端起一个循环监听async def echo(self,websocket, path):# 添加到客户端列表# Clients.append(websocket)# 握手await websocket.send(json.dumps({type: handshake}))# 循环监听while True:# 接受信息try:# 接受文本recv_text await websocket.recv()message Get message: {}.format(recv_text)# 返回客户端信息await websocket.send(message)# 转jsondata json.loads(recv_text)# 用户发信息if data[type] send:name 404for k, v in Clients.items():if v websocket:name kdata[from] nameif len(Clients) ! 0: # asyncio.wait doesnt accept an empty listmessage json.dumps({type: send, content: data[content], from: name})await self.runCaseX(jsonMsgmessage, websocketwebsocket)# 用户注册elif data[type] register:try:Clients[data[uuid]] websocketif len(Clients) ! 0: # asyncio.wait doesnt accept an empty listmessage json.dumps({type: register, content: data[content], user_list: list(Clients.keys())})await self.runCaseX(jsonMsgmessage, websocketwebsocket)except Exception as exp:print(exp)# 用户注销elif data[type] unregister:del Clients[data[uuid]]# 对message进行解析跳进不同功能区# await self.runCaseX(jsonMsgdata,websocketwebsocket)# 链接断开except websockets.ConnectionClosed:print(ConnectionClosed..., path)# del Clientsbreak# 无效状态except websockets.InvalidState:print(InvalidState...)# del Clientsbreak# 报错except Exception as e:print(ws连接报错,e)# del Clientsbreak# 启动服务器async def runServer(self):async with websockets.serve(self.echo, self.ip, self.port):await asyncio.Future() # run forever# 多协程模式防止阻塞主线程无法做其他事情def WebSocketServer(self):asyncio.run(self.runServer())# 多线程启动def startServer(self):# 多线程启动否则会堵塞thread threading.Thread(targetself.WebSocketServer)thread.start()# thread.join()if __name____main__:print(server)s WS_Server()s.startServer()