tornado.locks – 同步原語

4.2 版本新增。

使用與標準程式庫提供給執行緒的同步原語相似的同步原語來協調協程。這些類別與標準程式庫的 asyncio 套件中提供的類別非常相似。

警告

請注意,這些原語實際上並非執行緒安全,且不能取代標準程式庫 threading 模組中的原語 – 它們旨在協調單執行緒應用程式中的 Tornado 協程,而非保護多執行緒應用程式中的共享物件。

條件 (Condition)

class tornado.locks.Condition[原始碼]

條件允許一個或多個協程等待直到收到通知。

類似於標準 threading.Condition,但不需要取得和釋放的基礎鎖定。

使用 Condition,協程可以等待其他協程的通知

import asyncio
from tornado import gen
from tornado.locks import Condition

condition = Condition()

async def waiter():
    print("I'll wait right here")
    await condition.wait()
    print("I'm done waiting")

async def notifier():
    print("About to notify")
    condition.notify()
    print("Done notifying")

async def runner():
    # Wait for waiter() and notifier() in parallel
    await gen.multi([waiter(), notifier()])

asyncio.run(runner())
I'll wait right here
About to notify
Done notifying
I'm done waiting

wait 接受一個可選的 timeout 引數,它是一個絕對時間戳記

io_loop = IOLoop.current()

# Wait up to 1 second for a notification.
await condition.wait(timeout=io_loop.time() + 1)

…或是一個相對於目前時間的逾時時間的 datetime.timedelta

# Wait up to 1 second.
await condition.wait(timeout=datetime.timedelta(seconds=1))

如果在截止時間前沒有收到通知,該方法將返回 False。

在 5.0 版本變更: 先前,等待者可以在 notify 中同步收到通知。現在,通知將始終在 IOLoop 的下一次迭代中收到。

wait(timeout: Optional[Union[float, timedelta]] = None) Awaitable[bool][原始碼]

等待 notify

返回一個 Future,如果條件收到通知,則解析為 True,如果逾時,則解析為 False

notify(n: int = 1) None[原始碼]

喚醒 n 個等待者。

notify_all() None[原始碼]

喚醒所有等待者。

事件 (Event)

class tornado.locks.Event[原始碼]

事件會封鎖協程,直到其內部旗標設為 True。

類似於 threading.Event

協程可以等待事件被設定。一旦設定,除非事件已被清除,否則呼叫 yield event.wait() 將不會封鎖

import asyncio
from tornado import gen
from tornado.locks import Event

event = Event()

async def waiter():
    print("Waiting for event")
    await event.wait()
    print("Not waiting this time")
    await event.wait()
    print("Done")

async def setter():
    print("About to set the event")
    event.set()

async def runner():
    await gen.multi([waiter(), setter()])

asyncio.run(runner())
Waiting for event
About to set the event
Not waiting this time
Done
is_set() bool[原始碼]

如果內部旗標為 true,則返回 True

set() None[原始碼]

將內部旗標設定為 True。所有等待者都會被喚醒。

一旦旗標設定,呼叫 wait 將不會封鎖。

clear() None[原始碼]

將內部旗標重設為 False

呼叫 wait 將會封鎖,直到呼叫 set

wait(timeout: Optional[Union[float, timedelta]] = None) Awaitable[None][原始碼]

封鎖直到內部旗標為 true。

返回一個可等待物件,它會在逾時後引發 tornado.util.TimeoutError

號誌 (Semaphore)

class tornado.locks.Semaphore(value: int = 1)[原始碼]

一個鎖定,在封鎖前可以取得固定次數。

一個 Semaphore 管理一個計數器,該計數器代表 release 呼叫次數減去 acquire 呼叫次數,再加上一個初始值。如果計數器在沒有變成負數的情況下,acquire 方法會阻塞,直到可以返回為止。

Semaphore 限制對共享資源的存取。為了允許兩個工作人員同時存取:

import asyncio
from tornado import gen
from tornado.locks import Semaphore

sem = Semaphore(2)

async def worker(worker_id):
    await sem.acquire()
    try:
        print("Worker %d is working" % worker_id)
        await use_some_resource()
    finally:
        print("Worker %d is done" % worker_id)
        sem.release()

async def runner():
    # Join all workers.
    await gen.multi([worker(i) for i in range(3)])

asyncio.run(runner())
Worker 0 is working
Worker 1 is working
Worker 0 is done
Worker 2 is working
Worker 1 is done
Worker 2 is done

工作人員 0 和 1 可以同時執行,但工作人員 2 會等待,直到 semaphore 被工作人員 0 釋放一次。

semaphore 可以用作非同步上下文管理器。

async def worker(worker_id):
    async with sem:
        print("Worker %d is working" % worker_id)
        await use_some_resource()

    # Now the semaphore has been released.
    print("Worker %d is done" % worker_id)

為了與舊版本的 Python 相容,acquire 是一個上下文管理器,因此 worker 也可以寫成:

@gen.coroutine
def worker(worker_id):
    with (yield sem.acquire()):
        print("Worker %d is working" % worker_id)
        yield use_some_resource()

    # Now the semaphore has been released.
    print("Worker %d is done" % worker_id)

在 4.3 版本中變更:在 Python 3.5 中新增了 async with 支援。

release() None[原始碼]

遞增計數器並喚醒一個等待者。

acquire(timeout: Optional[Union[float, timedelta]] = None) Awaitable[_ReleasingContextManager][原始碼]

遞減計數器。返回一個可等待的物件。

如果計數器為零,則會阻塞並等待 release。 在截止時間過後,此可等待的物件會引發 TimeoutError

BoundedSemaphore

class tornado.locks.BoundedSemaphore(value: int = 1)[原始碼]

一個防止 release() 被呼叫太多次的 semaphore。

如果 release 會使 semaphore 的值超過初始值,則會引發 ValueError。Semaphore 主要用於保護容量有限的資源,因此 semaphore 被釋放太多次是錯誤的跡象。

release() None[原始碼]

遞增計數器並喚醒一個等待者。

acquire(timeout: Optional[Union[float, timedelta]] = None) Awaitable[_ReleasingContextManager]

遞減計數器。返回一個可等待的物件。

如果計數器為零,則會阻塞並等待 release。在截止時間過後,此可等待的物件會引發 TimeoutError

Lock

class tornado.locks.Lock[原始碼]

用於協程的鎖。

Lock 一開始是未鎖定的,並且 acquire 會立即將其鎖定。當它被鎖定時,一個產生 acquire 的協程會等待,直到另一個協程呼叫 release

釋放一個未鎖定的鎖會引發 RuntimeError

Lock 可以用作帶有 async with 陳述式的非同步上下文管理器。

>>> from tornado import locks
>>> lock = locks.Lock()
>>>
>>> async def f():
...    async with lock:
...        # Do something holding the lock.
...        pass
...
...    # Now the lock is released.

為了與舊版本的 Python 相容,acquire 方法會非同步返回一個常規的上下文管理器。

>>> async def f2():
...    with (yield lock.acquire()):
...        # Do something holding the lock.
...        pass
...
...    # Now the lock is released.

在 4.3 版本中變更:在 Python 3.5 中新增了 async with 支援。

acquire(timeout: Optional[Union[float, timedelta]] = None) Awaitable[_ReleasingContextManager][原始碼]

嘗試鎖定。返回一個可等待的物件。

返回一個可等待物件,它會在逾時後引發 tornado.util.TimeoutError

release() None[原始碼]

解除鎖定。

排隊等待 acquire 的第一個協程會取得鎖。

如果未鎖定,則會引發 RuntimeError