tornado.queues – 協程佇列

4.2 版本新增。

用於協程的非同步佇列。這些類別與標準函式庫中的 asyncio 套件中提供的非常相似。

警告

與標準函式庫的 queue 模組不同,此處定義的類別並「不是」執行緒安全的。若要從另一個執行緒使用這些佇列,請在使用任何佇列方法前,使用 IOLoop.add_callback 將控制權轉移至 IOLoop 執行緒。

類別

Queue

class tornado.queues.Queue(maxsize: int = 0)[原始碼]

協調生產者和消費者協程。

如果 maxsize 為 0(預設值),則佇列大小沒有上限。

import asyncio
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue(maxsize=2)

async def consumer():
    async for item in q:
        try:
            print('Doing work on %s' % item)
            await asyncio.sleep(0.01)
        finally:
            q.task_done()

async def producer():
    for item in range(5):
        await q.put(item)
        print('Put %s' % item)

async def main():
    # Start consumer without waiting (since it never finishes).
    IOLoop.current().spawn_callback(consumer)
    await producer()     # Wait for producer to put all tasks.
    await q.join()       # Wait for consumer to finish all tasks.
    print('Done')

asyncio.run(main())
Put 0
Put 1
Doing work on 0
Put 2
Doing work on 1
Put 3
Doing work on 2
Put 4
Doing work on 3
Doing work on 4
Done

在沒有原生協程的 Python 版本(3.5 之前),consumer() 可以寫成

@gen.coroutine
def consumer():
    while True:
        item = yield q.get()
        try:
            print('Doing work on %s' % item)
            yield gen.sleep(0.01)
        finally:
            q.task_done()

在 4.3 版本變更: 在 Python 3.5 中新增 async for 支援。

property maxsize: int

佇列中允許的項目數量。

qsize() int[原始碼]

佇列中的項目數量。

put(item: _T, timeout: Optional[Union[float, timedelta]] = None) Future[None][原始碼]

將項目放入佇列,可能會等待直到有空間。

返回一個 Future,該 Future 在逾時後會引發 tornado.util.TimeoutError

timeout 可以是一個表示時間的數字(與 tornado.ioloop.IOLoop.time 的尺度相同,通常是 time.time),或是一個相對於目前時間的截止時間的 datetime.timedelta 物件。

put_nowait(item: _T) None[原始碼]

將項目放入佇列而不阻塞。

如果沒有可立即使用的空位,則引發 QueueFull

get(timeout: Optional[Union[float, timedelta]] = None) Awaitable[_T][原始碼]

從佇列中移除並返回一個項目。

返回一個可等待的物件,一旦項目可用,則該物件會解析,或是在逾時後引發 tornado.util.TimeoutError

timeout 可以是一個表示時間的數字(與 tornado.ioloop.IOLoop.time 的尺度相同,通常是 time.time),或是一個相對於目前時間的截止時間的 datetime.timedelta 物件。

注意

此方法的 timeout 引數與標準函式庫的 queue.Queue.get 不同。該方法將數值解譯為相對逾時;此方法將其解譯為絕對截止時間,並要求使用 timedelta 物件進行相對逾時(與 Tornado 中的其他逾時一致)。

get_nowait() _T[原始碼]

從佇列中移除並返回一個項目,而不阻塞。

如果立即有項目可用,則返回該項目,否則引發 QueueEmpty

task_done() None[原始碼]

表示先前已加入佇列的任務已完成。

由佇列消費者使用。對於每個用於提取任務的 get,後續呼叫 task_done 會告知佇列該任務的處理已完成。

如果 join 正在阻塞,則當所有項目都已處理時(也就是說,當每個 put 都與一個 task_done 配對時)它會恢復。

如果呼叫次數超過 put 的次數,則會引發 ValueError 錯誤。

join(timeout: Optional[Union[float, timedelta]] = None) Awaitable[None][原始碼]

阻塞直到佇列中的所有項目都被處理完畢。

返回一個可等待的對象,該對象在超時後會引發 tornado.util.TimeoutError

PriorityQueue

class tornado.queues.PriorityQueue(maxsize: int = 0)[原始碼]

一個 Queue,它會按照優先順序提取條目,優先順序最低的先提取。

條目通常是像 (優先順序 數字, 資料) 這樣的元組。

import asyncio
from tornado.queues import PriorityQueue

async def main():
    q = PriorityQueue()
    q.put((1, 'medium-priority item'))
    q.put((0, 'high-priority item'))
    q.put((10, 'low-priority item'))

    print(await q.get())
    print(await q.get())
    print(await q.get())

asyncio.run(main())
(0, 'high-priority item')
(1, 'medium-priority item')
(10, 'low-priority item')

LifoQueue

class tornado.queues.LifoQueue(maxsize: int = 0)[原始碼]

一個 Queue,它會先提取最近放入的項目。

import asyncio
from tornado.queues import LifoQueue

async def main():
    q = LifoQueue()
    q.put(3)
    q.put(2)
    q.put(1)

    print(await q.get())
    print(await q.get())
    print(await q.get())

asyncio.run(main())
1
2
3

例外

QueueEmpty

exception tornado.queues.QueueEmpty[原始碼]

當佇列沒有項目時,由 Queue.get_nowait 引發。

QueueFull

exception tornado.queues.QueueFull[原始碼]

當佇列達到其最大大小時,由 Queue.put_nowait 引發。