協程

協程是在 Tornado 中撰寫非同步程式碼的建議方式。協程使用 Python 的 await 關鍵字來暫停和恢復執行,而不是使用一連串的回呼函式(像 gevent 等框架中看到的協作輕量級執行緒有時也被稱為協程,但在 Tornado 中,所有協程都使用明確的上下文切換,並被稱為非同步函式)。

協程幾乎和同步程式碼一樣簡單,但沒有執行緒的開銷。它們還透過減少上下文切換可能發生的位置,使並發更容易理解

範例

async def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = await http_client.fetch(url)
    return response.body

原生與裝飾協程

Python 3.5 引入了 asyncawait 關鍵字(使用這些關鍵字的函式也稱為「原生協程」)。為了與舊版本的 Python 相容,您可以使用使用 tornado.gen.coroutine 裝飾器的「裝飾」或「基於 yield」的協程。

建議盡可能使用原生協程。僅在需要與舊版本的 Python 相容時才使用裝飾協程。Tornado 文件中的範例通常會使用原生形式。

兩種形式之間的轉換通常很簡單

# Decorated:                    # Native:

# Normal function declaration
# with decorator                # "async def" keywords
@gen.coroutine
def a():                        async def a():
    # "yield" all async funcs       # "await" all async funcs
    b = yield c()                   b = await c()
    # "return" and "yield"
    # cannot be mixed in
    # Python 2, so raise a
    # special exception.            # Return normally
    raise gen.Return(b)             return b

兩種形式的協程之間的其他差異概述如下。

  • 原生協程

    • 通常速度更快。

    • 可以使用 async forasync with 語句,這使得某些模式更簡單。

    • 除非您 awaityield 它們,否則根本不會執行。裝飾協程可以在被呼叫後立即開始在「背景」中執行。請注意,對於這兩種協程,都必須使用 awaityield,以便任何例外狀況都有地方可以去。

  • 裝飾協程

    • concurrent.futures 套件有額外的整合,允許直接產生 executor.submit 的結果。對於原生協程,請改用 IOLoop.run_in_executor

    • 支援透過產生清單或字典來等待多個物件的一些簡寫。使用 tornado.gen.multi 在原生協程中執行此操作。

    • 可以支援與包括 Twisted 在內的其他套件整合,透過轉換函式的註冊表。若要在原生協程中存取此功能,請使用 tornado.gen.convert_yielded

    • 總是返回一個 Future 物件。原生協程返回一個不是 Future 的*可等待*物件。在 Tornado 中,這兩者在很大程度上是可以互換的。

運作方式

本節說明裝飾協程的操作。原生協程在概念上類似,但由於與 Python 執行時間的額外整合,因此稍微複雜一些。

包含 yield 的函式是一個 產生器。所有產生器都是非同步的;在被呼叫時,它們會傳回一個產生器物件,而不是執行到完成。@gen.coroutine 裝飾器透過 yield 運算式與產生器進行通訊,並透過傳回 Future 與協程的呼叫者進行通訊。

以下是協程裝飾器內部迴圈的簡化版本

# Simplified inner loop of tornado.gen.Runner
def run(self):
    # send(x) makes the current yield return x.
    # It returns when the next yield is reached
    future = self.gen.send(self.next)
    def callback(f):
        self.next = f.result()
        self.run()
    future.add_done_callback(callback)

裝飾器從產生器接收一個 Future,等待(不阻塞)該 Future 完成,然後「解包」Future,並將結果傳回產生器作為 yield 運算式的結果。大多數非同步程式碼永遠不會直接接觸 Future 類別,除非立即將非同步函式傳回的 Future 傳遞到 yield 運算式。

如何呼叫協程

協程不會以正常方式引發例外狀況:它們引發的任何例外狀況都會被捕獲在可等待物件中,直到它被產生。這表示以正確的方式呼叫協程很重要,否則您可能會發生未被注意到的錯誤

async def divide(x, y):
    return x / y

def bad_call():
    # This should raise a ZeroDivisionError, but it won't because
    # the coroutine is called incorrectly.
    divide(1, 0)

在幾乎所有情況下,任何呼叫協程的函式都必須是協程本身,並在呼叫中使用 awaityield 關鍵字。當您覆寫超類別中定義的方法時,請查閱文件,以查看是否允許協程(文件應說明該方法「可能是協程」或「可能傳回 Future」)

async def good_call():
    # await will unwrap the object returned by divide() and raise
    # the exception.
    await divide(1, 0)

有時您可能想要「啟動並忘記」一個協程,而不等待其結果。在這種情況下,建議使用 IOLoop.spawn_callback,這會使 IOLoop 負責呼叫。如果失敗,IOLoop 將記錄堆疊追蹤

# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)

對於使用 @gen.coroutine 的函式,建議以這種方式使用 IOLoop.spawn_callback,但對於使用 async def 的函式,則*必須*使用(否則協程執行器不會啟動)。

最後,在程式的頂層,*如果 IOLoop 尚未執行,*您可以使用 IOLoop.run_sync 方法啟動 IOLoop、執行協程,然後停止 IOLoop。這通常用於啟動批次導向程式的 main 函式

# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))

協程模式

呼叫阻塞函式

從協程呼叫阻塞函式的最簡單方法是使用 IOLoop.run_in_executor,它會傳回與協程相容的 Futures

async def call_blocking():
    await IOLoop.current().run_in_executor(None, blocking_func, args)

並行性

multi 函式接受值為 Futures 的清單和字典,並等待所有這些 Futures 並行執行

from tornado.gen import multi

async def parallel_fetch(url1, url2):
    resp1, resp2 = await multi([http_client.fetch(url1),
                                http_client.fetch(url2)])

async def parallel_fetch_many(urls):
    responses = await multi ([http_client.fetch(url) for url in urls])
    # responses is a list of HTTPResponses in the same order

async def parallel_fetch_dict(urls):
    responses = await multi({url: http_client.fetch(url)
                             for url in urls})
    # responses is a dict {url: HTTPResponse}

在裝飾協程中,可以直接 yield 清單或字典

@gen.coroutine
def parallel_fetch_decorated(url1, url2):
    resp1, resp2 = yield [http_client.fetch(url1),
                          http_client.fetch(url2)]

交錯

有時,儲存 Future 而不是立即產生它很有用,這樣您就可以在等待之前開始另一個操作。

from tornado.gen import convert_yielded

async def get(self):
    # convert_yielded() starts the native coroutine in the background.
    # This is equivalent to asyncio.ensure_future() (both work in Tornado).
    fetch_future = convert_yielded(self.fetch_next_chunk())
    while True:
        chunk = await fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = convert_yielded(self.fetch_next_chunk())
        await self.flush()

使用裝飾協程執行此操作稍微容易一些,因為它們會在被呼叫時立即啟動

@gen.coroutine
def get(self):
    fetch_future = self.fetch_next_chunk()
    while True:
        chunk = yield fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = self.fetch_next_chunk()
        yield self.flush()

迴圈

在原生協程中,可以使用 async for。在舊版本的 Python 中,使用協程進行迴圈很棘手,因為沒有辦法在 forwhile 迴圈的每次迭代中 yield 並捕獲 yield 的結果。相反,您需要將迴圈條件與存取結果分開,如以下來自 Motor 的範例所示

import motor
db = motor.MotorClient().test

@gen.coroutine
def loop_example(collection):
    cursor = db.collection.find()
    while (yield cursor.fetch_next):
        doc = cursor.next_object()

在背景中執行

作為 PeriodicCallback 的替代方案,一個協程可以包含一個 while True: 迴圈,並使用 tornado.gen.sleep

async def minute_loop():
    while True:
        await do_something()
        await gen.sleep(60)

# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)

有時可能需要更複雜的迴圈。例如,先前的迴圈每 60+N 秒執行一次,其中 Ndo_something() 的執行時間。若要精確地每 60 秒執行一次,請使用上述的交錯模式

async def minute_loop2():
    while True:
        nxt = gen.sleep(60)   # Start the clock.
        await do_something()  # Run while the clock is ticking.
        await nxt             # Wait for the timer to run out.