協程¶
協程是在 Tornado 中撰寫非同步程式碼的建議方式。協程使用 Python 的 await
關鍵字來暫停和恢復執行,而不是使用一連串的回呼函式(像 gevent 等框架中看到的協作輕量級執行緒有時也被稱為協程,但在 Tornado 中,所有協程都使用明確的上下文切換,並被稱為非同步函式)。
協程幾乎和同步程式碼一樣簡單,但沒有執行緒的開銷。它們還透過減少上下文切換可能發生的位置,使並發更容易理解。
範例
async def fetch_coroutine(url):
http_client = AsyncHTTPClient()
response = await http_client.fetch(url)
return response.body
原生與裝飾協程¶
Python 3.5 引入了 async
和 await
關鍵字(使用這些關鍵字的函式也稱為「原生協程」)。為了與舊版本的 Python 相容,您可以使用使用 tornado.gen.coroutine
裝飾器的「裝飾」或「基於 yield」的協程。
建議盡可能使用原生協程。僅在需要與舊版本的 Python 相容時才使用裝飾協程。Tornado 文件中的範例通常會使用原生形式。
兩種形式之間的轉換通常很簡單
# Decorated: # Native:
# Normal function declaration
# with decorator # "async def" keywords
@gen.coroutine
def a(): async def a():
# "yield" all async funcs # "await" all async funcs
b = yield c() b = await c()
# "return" and "yield"
# cannot be mixed in
# Python 2, so raise a
# special exception. # Return normally
raise gen.Return(b) return b
兩種形式的協程之間的其他差異概述如下。
原生協程
通常速度更快。
可以使用
async for
和async with
語句,這使得某些模式更簡單。除非您
await
或yield
它們,否則根本不會執行。裝飾協程可以在被呼叫後立即開始在「背景」中執行。請注意,對於這兩種協程,都必須使用await
或yield
,以便任何例外狀況都有地方可以去。
裝飾協程
與
concurrent.futures
套件有額外的整合,允許直接產生executor.submit
的結果。對於原生協程,請改用IOLoop.run_in_executor
。支援透過產生清單或字典來等待多個物件的一些簡寫。使用
tornado.gen.multi
在原生協程中執行此操作。可以支援與包括 Twisted 在內的其他套件整合,透過轉換函式的註冊表。若要在原生協程中存取此功能,請使用
tornado.gen.convert_yielded
。總是返回一個
Future
物件。原生協程返回一個不是Future
的*可等待*物件。在 Tornado 中,這兩者在很大程度上是可以互換的。
運作方式¶
本節說明裝飾協程的操作。原生協程在概念上類似,但由於與 Python 執行時間的額外整合,因此稍微複雜一些。
包含 yield
的函式是一個 產生器。所有產生器都是非同步的;在被呼叫時,它們會傳回一個產生器物件,而不是執行到完成。@gen.coroutine
裝飾器透過 yield
運算式與產生器進行通訊,並透過傳回 Future
與協程的呼叫者進行通訊。
以下是協程裝飾器內部迴圈的簡化版本
# Simplified inner loop of tornado.gen.Runner
def run(self):
# send(x) makes the current yield return x.
# It returns when the next yield is reached
future = self.gen.send(self.next)
def callback(f):
self.next = f.result()
self.run()
future.add_done_callback(callback)
裝飾器從產生器接收一個 Future
,等待(不阻塞)該 Future
完成,然後「解包」Future
,並將結果傳回產生器作為 yield
運算式的結果。大多數非同步程式碼永遠不會直接接觸 Future
類別,除非立即將非同步函式傳回的 Future
傳遞到 yield
運算式。
如何呼叫協程¶
協程不會以正常方式引發例外狀況:它們引發的任何例外狀況都會被捕獲在可等待物件中,直到它被產生。這表示以正確的方式呼叫協程很重要,否則您可能會發生未被注意到的錯誤
async def divide(x, y):
return x / y
def bad_call():
# This should raise a ZeroDivisionError, but it won't because
# the coroutine is called incorrectly.
divide(1, 0)
在幾乎所有情況下,任何呼叫協程的函式都必須是協程本身,並在呼叫中使用 await
或 yield
關鍵字。當您覆寫超類別中定義的方法時,請查閱文件,以查看是否允許協程(文件應說明該方法「可能是協程」或「可能傳回 Future
」)
async def good_call():
# await will unwrap the object returned by divide() and raise
# the exception.
await divide(1, 0)
有時您可能想要「啟動並忘記」一個協程,而不等待其結果。在這種情況下,建議使用 IOLoop.spawn_callback
,這會使 IOLoop
負責呼叫。如果失敗,IOLoop
將記錄堆疊追蹤
# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)
對於使用 @gen.coroutine
的函式,建議以這種方式使用 IOLoop.spawn_callback
,但對於使用 async def
的函式,則*必須*使用(否則協程執行器不會啟動)。
最後,在程式的頂層,*如果 IOLoop 尚未執行,*您可以使用 IOLoop.run_sync
方法啟動 IOLoop
、執行協程,然後停止 IOLoop
。這通常用於啟動批次導向程式的 main
函式
# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))
協程模式¶
呼叫阻塞函式¶
從協程呼叫阻塞函式的最簡單方法是使用 IOLoop.run_in_executor
,它會傳回與協程相容的 Futures
async def call_blocking():
await IOLoop.current().run_in_executor(None, blocking_func, args)
並行性¶
multi
函式接受值為 Futures
的清單和字典,並等待所有這些 Futures
並行執行
from tornado.gen import multi
async def parallel_fetch(url1, url2):
resp1, resp2 = await multi([http_client.fetch(url1),
http_client.fetch(url2)])
async def parallel_fetch_many(urls):
responses = await multi ([http_client.fetch(url) for url in urls])
# responses is a list of HTTPResponses in the same order
async def parallel_fetch_dict(urls):
responses = await multi({url: http_client.fetch(url)
for url in urls})
# responses is a dict {url: HTTPResponse}
在裝飾協程中,可以直接 yield
清單或字典
@gen.coroutine
def parallel_fetch_decorated(url1, url2):
resp1, resp2 = yield [http_client.fetch(url1),
http_client.fetch(url2)]
交錯¶
有時,儲存 Future
而不是立即產生它很有用,這樣您就可以在等待之前開始另一個操作。
from tornado.gen import convert_yielded
async def get(self):
# convert_yielded() starts the native coroutine in the background.
# This is equivalent to asyncio.ensure_future() (both work in Tornado).
fetch_future = convert_yielded(self.fetch_next_chunk())
while True:
chunk = await fetch_future
if chunk is None: break
self.write(chunk)
fetch_future = convert_yielded(self.fetch_next_chunk())
await self.flush()
使用裝飾協程執行此操作稍微容易一些,因為它們會在被呼叫時立即啟動
@gen.coroutine
def get(self):
fetch_future = self.fetch_next_chunk()
while True:
chunk = yield fetch_future
if chunk is None: break
self.write(chunk)
fetch_future = self.fetch_next_chunk()
yield self.flush()
迴圈¶
在原生協程中,可以使用 async for
。在舊版本的 Python 中,使用協程進行迴圈很棘手,因為沒有辦法在 for
或 while
迴圈的每次迭代中 yield
並捕獲 yield 的結果。相反,您需要將迴圈條件與存取結果分開,如以下來自 Motor 的範例所示
import motor
db = motor.MotorClient().test
@gen.coroutine
def loop_example(collection):
cursor = db.collection.find()
while (yield cursor.fetch_next):
doc = cursor.next_object()
在背景中執行¶
作為 PeriodicCallback
的替代方案,一個協程可以包含一個 while True:
迴圈,並使用 tornado.gen.sleep
async def minute_loop():
while True:
await do_something()
await gen.sleep(60)
# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)
有時可能需要更複雜的迴圈。例如,先前的迴圈每 60+N
秒執行一次,其中 N
是 do_something()
的執行時間。若要精確地每 60 秒執行一次,請使用上述的交錯模式
async def minute_loop2():
while True:
nxt = gen.sleep(60) # Start the clock.
await do_something() # Run while the clock is ticking.
await nxt # Wait for the timer to run out.