Python asyncio, thread 与 websocket 库初探
在写 Nanachat-core 的时候打算写事件循环,还要使用到 websocket,于是想参考下相关的设计模式。
异步 I/O
异步IO模型需要一个消息循环,在消息循环中,主线程不断地重复“读取消息-处理消息”这一过程,我们称其为消息模型:
loop = get_event_loop()
while True:
event = loop.get_event()
process_event(event)
消息模型是如何解决同步IO必须等待IO操作这一问题的呢?
当遇到IO操作时,代码只负责发出IO请求,不等待IO结果,然后直接结束本轮消息处理,进入下一轮消息处理过程。
当IO操作完成后,将收到一条“IO完成”的消息,处理该消息时就可以直接获取IO操作结果。
Coroutine 协程
协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。
注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如子程序A、B:
def A():
print('1')
print('2')
print('3')
def B():
print('x')
print('y')
print('z')
And we may get…
1
2
x
y
3
z
协程的执行效率比线程切换高,减少了线程切换的开销;同时,我们也不需要考虑锁机制的问题。
Python generator
Python对协程的支持是通过generator实现的。
在generator中,我们不但可以通过for
循环来迭代,还可以不断调用next()
函数获取由yield
语句返回的下一个值。
但是Python的yield
不但可以返回一个值,它还可以接收调用者发出的参数。
我们写一个简单的程序来输出 yield 关键字的执行顺序.
def consumer(): print("[CONSUMER] Entering function...") r = '' while True: print("[CONSUMER] Entering while loop...") n = yield r print("[CONSUMER] Fetched n...") if not n: print("[CONSUMER] Before returning...") return print(f"[CONSUMER] Consuming {n}") r = f'Consumed {n}!' print("[MAIN] Entering main...") c = consumer() # Get the generator print("[MAIN] Generating generator object...") print("[MAIN] Starting first send...") r = c.send(None) print("[MAIN] Ending first send...") print(f"[MAIN] First send end with result '{r}'...") print("[MAIN] Starting second send...") r = c.send(2) print("[MAIN] Ending second send...") print(f"[MAIN] Second send end with result '{r}'...") print("[MAIN] Starting last send...") r = c.send(None) print("[MAIN] Ending last send...") print(f"[MAIN] Last send end with result '{r}'...")
输出结果为:
[MAIN] Entering main... [MAIN] Generating generator object... [MAIN] Starting first send... [CONSUMER] Entering function... [CONSUMER] Entering while loop... [MAIN] Ending first send... [MAIN] First send end with result ''... [MAIN] Starting second send... [CONSUMER] Fetched n... [CONSUMER] Consuming 2 [CONSUMER] Entering while loop... [MAIN] Ending second send... [MAIN] Second send end with result 'Consumed 2!'... [MAIN] Starting last send... [CONSUMER] Fetched n... [CONSUMER] Before returning... Traceback (most recent call last): File "coroutine.py", line 26, in <module> r = c.send(None) StopIteration
可以看到,在第一次执行 generator.send()
的时候,generator 会从头开始执行.
之后每次便从 yield 处开始执行,先完成赋值操作,再继续执行到下一次 yield 或者 return.
yield r
会把 r 作为 send()
函数的返回值返回给 MAIN 函数.
asyncio
An event loop runs in a thread (typically the main thread) and executes all callbacks and Tasks in its thread.
While a Task is running in the event loop, no other Tasks can run in the same thread.
When a Task executes an await
expression, the running Task gets suspended, and the event loop executes the next Task.
asyncio
的编程模型就是一个消息循环。
我们从asyncio
模块中直接获取一个EventLoop
的引用,然后把需要执行的协程扔到EventLoop
中执行,就实现了异步IO。
import datetime
import asyncio
@asyncio.coroutine
def hello():
print(f"Hello world! {datetime.datetime.now()}")
# 异步调用asyncio.sleep(1):
r = yield from asyncio.sleep(1)
print(f"Hello again! {datetime.datetime.now()}")
# 获取EventLoop:
loop = asyncio.get_event_loop()
# 执行coroutine
print(hello())
loop.run_until_complete(hello())
loop.close()
coroutine.py:6: DeprecationWarning: "@coroutine" decorator is deprecated since Python 3.8, use "async def" instead
def hello():
<generator object hello at 0x7f05bea44cf0>
Hello world! 2021-09-06 11:20:07.045310
Hello again! 2021-09-06 11:20:08.047006
yield from
语法可以让我们方便地调用另一个generator
.
async/await
为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法async
和await
,可以让coroutine的代码更简洁易读。
请注意,async
和await
是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换:
- 把
@asyncio.coroutine
替换为async
; - 把
yield from
替换为await
。
import datetime
import asyncio
async def hello():
print(f"Hello world! {datetime.datetime.now()}")
# 异步调用asyncio.sleep(1):
r = await asyncio.sleep(1)
print(f"Hello again! {datetime.datetime.now()}")
# 获取EventLoop:
loop = asyncio.get_event_loop()
# 执行coroutine
loop.run_until_complete(hello())
loop.close()
Hello world! 2021-09-06 11:23:14.967038
Hello again! 2021-09-06 11:23:15.968643
Websocket
Server
# WS server example
import asyncio
import websockets
async def hello(websocket, path):
name = await websocket.recv()
print(f"< {name}")
greeting = f"Hello {name}!"
await websocket.send(greeting)
print(f"> {greeting}")
start_server = websockets.serve(hello, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
Client
# WS client example
import asyncio
import websockets
async def hello():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as websocket:
name = input("What's your name? ")
await websocket.send(name)
print(f"> {name}")
greeting = await websocket.recv()
print(f"< {greeting}")
asyncio.get_event_loop().run_until_complete(hello())
Example
我们要实现具有以下功能的 websocket 结构.
- Server
- 每 3s 向 Client 发一条消息.
- 收到消息立刻 Prompt.
- Client
- 每 1s 向 Server 发一条消息.
- 收到消息立刻 Prompt.
# Server.py
#!/usr/bin/env python
# WS server example
import datetime
import asyncio
import websockets
async def recv(websocket):
while True:
name = await websocket.recv()
print(f"< {name}")
async def send(websocket):
while True:
# Send
msg = f"!Server Send! {datetime.datetime.now()}"
await websocket.send(msg)
print(f"> {msg}")
await asyncio.sleep(3)
async def socket(websocket, path):
await asyncio.gather(recv(websocket), send(websocket))
start_server = websockets.serve(socket, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
# Client.py
import asyncio
import datetime
import websockets
async def recv(websocket):
while True:
name = await websocket.recv()
print(f"< {name}")
async def send(websocket):
while True:
# Send
msg = f"!Client Send! {datetime.datetime.now()}"
await websocket.send(msg)
print(f"> {msg}")
await asyncio.sleep(1)
async def socket():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as websocket:
await asyncio.gather(recv(websocket), send(websocket))
async def sayhello():
while True:
print("INTER BREAK")
await asyncio.sleep(0.2)
asyncio.get_event_loop().run_until_complete(asyncio.gather(socket(), sayhello()))