在写 Nanachat-core 的时候打算写事件循环,还要使用到 websocket,于是想参考下相关的设计模式。

异步 I/O

异步IO模型需要一个消息循环,在消息循环中,主线程不断地重复“读取消息-处理消息”这一过程,我们称其为消息模型

loop = get_event_loop()
while True:
    event = loop.get_event()
    process_event(event)

消息模型是如何解决同步IO必须等待IO操作这一问题的呢?

  • 当遇到IO操作时,代码只负责发出IO请求,不等待IO结果,然后直接结束本轮消息处理,进入下一轮消息处理过程。

  • 当IO操作完成后,将收到一条“IO完成”的消息,处理该消息时就可以直接获取IO操作结果。

Coroutine 协程

协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。

注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如子程序A、B:

def A():
    print('1')
    print('2')
    print('3')

def B():
    print('x')
    print('y')
    print('z')

And we may get…

1
2
x
y
3
z

协程的执行效率比线程切换高,减少了线程切换的开销;同时,我们也不需要考虑锁机制的问题。

Python generator

Python对协程的支持是通过generator实现的。

在generator中,我们不但可以通过for循环来迭代,还可以不断调用next()函数获取由yield语句返回的下一个值。

但是Python的yield不但可以返回一个值,它还可以接收调用者发出的参数。

我们写一个简单的程序来输出 yield 关键字的执行顺序.

def consumer():
    print("[CONSUMER] Entering function...")
    r = ''
    while True:
        print("[CONSUMER] Entering while loop...")
        n = yield r
        print("[CONSUMER] Fetched n...")
        if not n:
            print("[CONSUMER] Before returning...")
            return
        print(f"[CONSUMER] Consuming {n}")
        r = f'Consumed {n}!'
        
print("[MAIN] Entering main...")
c = consumer() # Get the generator
print("[MAIN] Generating generator object...")
print("[MAIN] Starting first send...")
r = c.send(None)
print("[MAIN] Ending first send...")
print(f"[MAIN] First send end with result '{r}'...")
print("[MAIN] Starting second send...")
r = c.send(2)
print("[MAIN] Ending second send...")
print(f"[MAIN] Second send end with result '{r}'...")
print("[MAIN] Starting last send...")
r = c.send(None)
print("[MAIN] Ending last send...")
print(f"[MAIN] Last send end with result '{r}'...")

输出结果为:

[MAIN] Entering main...
[MAIN] Generating generator object...       
[MAIN] Starting first send...
[CONSUMER] Entering function...
[CONSUMER] Entering while loop...
[MAIN] Ending first send...
[MAIN] First send end with result ''...     
[MAIN] Starting second send...
[CONSUMER] Fetched n...
[CONSUMER] Consuming 2
[CONSUMER] Entering while loop...
[MAIN] Ending second send...
[MAIN] Second send end with result 'Consumed 2!'...
[MAIN] Starting last send...
[CONSUMER] Fetched n...
[CONSUMER] Before returning...
Traceback (most recent call last):
  File "coroutine.py", line 26, in <module> 
    r = c.send(None)
StopIteration

可以看到,在第一次执行 generator.send() 的时候,generator 会从头开始执行.

之后每次便从 yield 处开始执行,先完成赋值操作,再继续执行到下一次 yield 或者 return.

yield r 会把 r 作为 send() 函数的返回值返回给 MAIN 函数.

asyncio

An event loop runs in a thread (typically the main thread) and executes all callbacks and Tasks in its thread.

While a Task is running in the event loop, no other Tasks can run in the same thread.

When a Task executes an await expression, the running Task gets suspended, and the event loop executes the next Task.

asyncio的编程模型就是一个消息循环。

我们从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO。

import datetime
import asyncio


@asyncio.coroutine
def hello():
    print(f"Hello world! {datetime.datetime.now()}")
    # 异步调用asyncio.sleep(1):
    r = yield from asyncio.sleep(1)
    print(f"Hello again! {datetime.datetime.now()}")


# 获取EventLoop:
loop = asyncio.get_event_loop()
# 执行coroutine
print(hello())
loop.run_until_complete(hello())
loop.close()
coroutine.py:6: DeprecationWarning: "@coroutine" decorator is deprecated since Python 3.8, use "async def" instead
  def hello():
<generator object hello at 0x7f05bea44cf0>
Hello world! 2021-09-06 11:20:07.045310   
Hello again! 2021-09-06 11:20:08.047006

yield from语法可以让我们方便地调用另一个generator.

async/await

为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法asyncawait,可以让coroutine的代码更简洁易读。

请注意,asyncawait是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换:

  1. @asyncio.coroutine替换为async
  2. yield from替换为await
import datetime
import asyncio


async def hello():
    print(f"Hello world! {datetime.datetime.now()}")
    # 异步调用asyncio.sleep(1):
    r = await asyncio.sleep(1)
    print(f"Hello again! {datetime.datetime.now()}")


# 获取EventLoop:
loop = asyncio.get_event_loop()
# 执行coroutine
loop.run_until_complete(hello())
loop.close()
Hello world! 2021-09-06 11:23:14.967038
Hello again! 2021-09-06 11:23:15.968643

Websocket

Server

# WS server example

import asyncio
import websockets

async def hello(websocket, path):
    name = await websocket.recv()
    print(f"< {name}")

    greeting = f"Hello {name}!"

    await websocket.send(greeting)
    print(f"> {greeting}")

start_server = websockets.serve(hello, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

Client

# WS client example

import asyncio
import websockets

async def hello():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        name = input("What's your name? ")

        await websocket.send(name)
        print(f"> {name}")

        greeting = await websocket.recv()
        print(f"< {greeting}")

asyncio.get_event_loop().run_until_complete(hello())

Example

我们要实现具有以下功能的 websocket 结构.

  • Server
    • 每 3s 向 Client 发一条消息.
    • 收到消息立刻 Prompt.
  • Client
    • 每 1s 向 Server 发一条消息.
    • 收到消息立刻 Prompt.
# Server.py
#!/usr/bin/env python

# WS server example
import datetime
import asyncio
import websockets


async def recv(websocket):
    while True:
        name = await websocket.recv()
        print(f"< {name}")


async def send(websocket):
    while True:
        # Send
        msg = f"!Server Send! {datetime.datetime.now()}"
        await websocket.send(msg)
        print(f"> {msg}")
        await asyncio.sleep(3)


async def socket(websocket, path):
    await asyncio.gather(recv(websocket), send(websocket))
        

start_server = websockets.serve(socket, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
# Client.py
import asyncio
import datetime
import websockets

async def recv(websocket):
    while True:
        name = await websocket.recv()
        print(f"< {name}")

async def send(websocket):
    while True:
        # Send
        msg = f"!Client Send! {datetime.datetime.now()}"
        await websocket.send(msg)
        print(f"> {msg}")
        await asyncio.sleep(1)

async def socket():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        await asyncio.gather(recv(websocket), send(websocket))


async def sayhello():
    while True:
        print("INTER BREAK")
        await asyncio.sleep(0.2)

asyncio.get_event_loop().run_until_complete(asyncio.gather(socket(), sayhello()))

Reference