Background

我們繼續對之前提到Python一系列的教學做重點整理。文章連結在此

Setting Up Your Environment

$ mdkir async
$ virtualenv .venv
$ source .venv/bin/activate
$ pip install --upgrade pip aiohttp aiofiles

The 10,000-Foot View of Async IO

Async IO 相對於 multiprocessing 和 threading 大家討論得比較少,在這個段落中將為您提供更全面的了解異步IO。

Where Does Async IO Fit In?

Concurrency 和 parallelism 是不容易涉足的擴展主題。管本文重點介紹 Async IO 及其在Python中的實現,但值得花一點時間將 Async IO 與同類產品進行比較,以了解 Async IO 應用在大型應用中是件多痛苦的事情。

Parallelism 包括同時執行多個操作。multiprocessing 是一種實現並行性的方法,它需要將任務分散到計算機的中央處理單元(CPU或內核)上。multiprocessing 非常適合CPU bound 的任務。

Concurrency 是一個比 Parallelism 更寬泛的術語。這表明多個任務具有以重疊方式運行的能力。(Concurrency 並不是 Parallelism。)

Threading 是一個 Concurrency 執行模型,多個 thread 輪流執行任務。一個 process 可以包含多個thread。由於其GIL,Python 與 thread 之間的關係十分複雜,但這超出了本文的範圍。重要的是要知道 Threading 適合處理 IO bound 的任務。

綜上所述,Concurrency 包括 multiprocessing(理想的是CPU bound 任務)和 threading(適用於IO bound 任務)。 multiprocessing 是 Parallelism 的一種形式,Parallelism 是 Concurrency 的特定類型(子集)。 Python標準庫通過其 multiprocessing,Threading 和 parallel.futures 包為這兩者提供了長期支持。

Python文檔將 asyncio 軟件包記為一個用於編寫 Concurrency 代碼的庫。但是,Async IO 不是 threading,也不是 multiprocessing。它不是建立在這兩個之上的。

實際上,異步IO是一種 single thread,single process 設計:它使用 cooperative multitasking,這個術語在稍後我們會提到。換句話說,儘管在 single process 中使用 single thread ,asyncio 仍具有 Concurrency 的感覺。Coroutines(asyncio 的主要功能)可以同時進行調度,但是它們並不是天生並發的。

重申一下,asyncio 是 Concurrency 編程的一種樣式,但它不是 Parallelism。與 multiprocessing 相比,它與 threading 的關係更加緊密,但兩者卻截然不同。

asynchronous 是什麼意思?這不是一個嚴格的定義,但是出於我們此處的目的,我可以想到兩個屬性:

  • Asynchronous routines 可以在等待其最終結果的同時"暫停"並讓其他 routines 同時運行。

  • 通過上述機制,Asynchronous code 有助於並發執行。換句話說,異步代碼給與了類似 concurrency 的感覺。

將所有內容集合成一個圖片會類似下圖一樣,白色術語代表概念,綠色術語代表實現或實現它們的方式:

Screen-Shot-2019-09-25-at-5.40.39-PM

Async IO Explained

Async IO 乍看起來似乎違反直覺和自相矛盾。什麼東西可以透過使用單個線程和單個CPU內核來實現concurrent 呢? 我從來都不擅長製作示例,因此我想解釋一下 Miguel Grinberg 在2017年PyCon 演講中的一個段落,它很好地解釋了所有內容:

Chess master Judit Polgár hosts a chess exhibition in which she plays multiple amateur players. She has two ways of conducting the exhibition: synchronously and asynchronously.
Assumptions:

  • 24 opponents
  • Judit makes each chess move in 5 seconds
  • Opponents each take 55 seconds to make a move
  • Games average 30 pair-moves (60 moves total)

Synchronous version: Judit plays one game at a time, never two at the same time, until the game is complete. Each game takes (55 + 5) * 30 == 1800 seconds, or 30 minutes. The entire exhibition takes 24 * 30 == 720 minutes, or 12 hours.
Asynchronous version: Judit moves from table to table, making one move at each table. She leaves the table and lets the opponent make their next move during the wait time. One move on all 24 games takes Judit 24 * 5 == 120 seconds, or 2 minutes. The entire exhibition is now cut down to 120 * 30 == 3600 seconds, or just 1 hour.

JuditPolgár 一個人,只有兩隻手,一次只能執行一個動做。但是 playing asynchronously 可以將展覽時間從12小時減少到1小時。因此,cooperative multitasking 是一種奇特的方式,可以說程序的事件循環(稍後會詳細介紹)與多個任務進行通信,以使每個任務在最佳時間輪流運行。

Async IO 需要較長的等待時間,否則功能將被阻塞,並允許其他功能在停機期間運行。 (一個有效阻止的功能從開始到返回為止一直禁止其他人運行。)

Async IO Is Not Easy

我曾經聽說:"盡可能使用 Async IO,沒得選的時候再用 threading",事實是,構建持久的多線程代碼可能很困難且容易出錯。Async IO避免了線程設計可能會遇到的某些潛在問題。

但這並不是說 Python 中的 Async IO很簡單。請注意,異步編程也可能會很困難! Python的異步模型是基於諸如 callbacks,events,transports,protocols 和 futures 之類的概念構建的。術語可能令人生畏,其 API 不斷變化使其變得不那麼容易。

幸運的是,asyncio 已經發展到某種程度,它的大多數功能不再是臨時的,而其相關文件已得到了巨大的改進,與此相關的一些優質資源也開始出現。

The asyncio Package and async/await

現在,您已經對 asyncio 有了一定的了解。Python 的 asyncio 程序包(在Python 3.4中引入)及其兩個關鍵字 async 和 await 具有不同的用途,但可以一起幫助您聲明,構建,執行和管理asynchronous 代碼。

The async/await Syntax and Native Coroutines

A Word of Caution: Be careful what you read out there on the Internet. Python’s async IO API has evolved rapidly from Python 3.4 to Python 3.7. Some old patterns are no longer used, and some things that were at first disallowed are now allowed through new introductions. For all I know, this tutorial will join the club of the outdated soon too.

async IO的核心是 coroutines。coroutine 是 Python 生成器函數的專用版本。讓我們從基本定義開始,然後再逐步完善: coroutines 是一種可以在到達返回值之前暫停其執行的函數,同時可以將控制權間接傳遞給另一個 coroutines 一段時間。

稍後,您將深入研究如何將傳統的生成器準確地用於 coroutine。目前,了解 coroutine 工作方式的最簡單方法是開始製作 coroutine。

讓我們開始編寫一些 async IO的代碼:

countasync.py

import asyncio

async def count():
    print("One")
    await asyncio.sleep(1)
    print("Two")

async def main():
    await asyncio.gather(count(), count(), count())

if __name__ == "__main__":
    import time
    s = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")
$ python3 countasync.py
One
One
One
Two
Two
Two
countasync.py executed in 1.01 seconds.

此輸出的順序是 async IO 的核心。對 count() 的每個調用進行交談都是一個單一事件循環或協調器(coordinator)。當每個任務到達等待 asyncio.sleep(1)時,該函數都會對事件循環大喊: "我要睡眠1秒鐘。繼續,讓其他有意義的事情同時進行。",並把控制權交回。

將此與同步版本進行對比:

#!/usr/bin/env python3
# countsync.py

import time

def count():
    print("One")
    time.sleep(1)
    print("Two")

def main():
    for _ in range(3):
        count()

if __name__ == "__main__":
    s = time.perf_counter()
    main()
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

當執行時,順序和執行時間會有微小但關鍵的變化:

$ python3 countsync.py
One
Two
One
Two
One
Two
countsync.py executed in 3.01 seconds.

雖然使用 time.sleep()和 asyncio.sleep()似乎是平庸的,但我們還是常用它們來當作耗時的進程的替身。也就是說 time.sleep()可以表示呼叫耗時的阻塞函數, asyncio.sleep() 則可以用來代替進行非阻塞的呼叫 (但也需要一些時間才能完成)。

正如您將在下一節中看到的那樣,等待某些內容(包括asyncio.sleep())的好處是可以暫時將控制權讓給另一個更容易立即執行的某項功能。相反,time.sleep()或任何其他阻塞調用與異步 Python 代碼不兼容,它會在睡眠時間內停止所有內容。

The Rules of Async IO

此時,如果對 async 以及 await 有更多的形式定義,會幫助我們在建立 coroutine functions 時更有幫助。

The syntax async def introduces either a native coroutine or an asynchronous generator. The expressions async with and async for are also valid, and you’ll see them later on.

  • async def 的語法 引入了原生協程(native coroutine)或異步生成器(asynchronous generator)。async withasync for 的表達式也有效,稍後您將看到它們。

  • 關鍵字 await 將功能控制權回傳給事件循環(event loop)。(它將暫停周圍 coroutine 的執行) 如果 Python 在 g() 函式中遇到了 await f() 這行程式,這就是 await 如何告知事件循環(event loop),"暫停執行 g() 直到 f() 執行完並返回,在這段期間內去執行其他該執行的程序"。

上面講的程序看起來會像這樣:

async def g():
    # Pause here and come back to g() when f() is ready
    r = await f()
    return r

關於何時以及如何以及不能使用 async/await 有一套嚴格的規則。無論你是否有使用過 async/await,這套方法都適合謹記在心:

  • 您使用 async def 宣告的 function 是 coroutine。它可以使用 await,return 或yield,但是所有這些都是可選的。

    • 使用 await and/or return將創建協程函數(coroutine functions)。要使用協程函數(coroutine functions),必須使用 await 來獲得結果。

    • 在 aysnc def 區塊中使用 yield 的情況不太普遍。這將創建一個異步生成器(async generator),你可以使用 async for 來對異步生成器(async generator)進行迭代。暫時不要使用異步生成器(async generator),而將重點放在獲取協程函數(coroutine functions)的語法上,協程函數則透過await and/or return

    • 用 async def 定義的任何內容都可能不使用yield from,這將引發SyntaxError。

  • 就像在def函數之外使用 yield 時會有 SyntaxError 一樣,在 async def協程之外使用 await也是 SyntaxError。您只能在協程函數(coroutine functions)內使用 await。

以下是一些總結上面幾條規則的範例:

async def f(x):
    y = await z(x)  # OK - `await` and `return` allowed in coroutines
    return y

async def g(x):
    yield x  # OK - this is an async generator

async def m(x):
    yield from gen(x)  # No - SyntaxError

def m(x):
    y = await z(x)  # Still no - SyntaxError (no `async def` here)
    return y 

最後,當您使用 await f()時,需要 f()是可等待的對象。現在,只知道一個可等待的對像不是(1)另一個 coroutine 就是(2)an object defining an .__await__() dunder method that returns an iterator.

好吧,那不是很有幫助,是嗎?現在,只知道一個可等待的對像是(1)另一個協程或(2)定義返回迭代器的.__ await __()dunder方法的對象。如果您正在編寫程序,通常您只需要擔心案例1。

將函數標記為 coroutine 的一種較舊的方法是用 @asyncio.coroutin e裝飾一個普通的函數。結果是 generator-based coroutine。自從Python 3.5中引入 async / await 語法以來,這種構造已經過時了。

這兩個 coroutine 在本質上是等效的(兩者都是 awaitable),但是第一個協程是 generator-based 的,而第二個是 native coroutine:

import asyncio

@asyncio.coroutine
def py34_coro():
    """Generator-based coroutine, older syntax"""
    yield from stuff()

async def py35_coro():
    """Native coroutine, modern syntax"""
    await stuff()

如果您要自己編寫任何代碼,則最好使用 native coroutines ,Generator-based coroutines 將在Python 3.10中移除。

事不宜遲,讓我們舉一些更多的例子。

這是一個異步IO如何減少等待時間的示例:給定一個協程makerandom(),該協程不斷產生範圍為[0,10]的隨機整數,直到其中一個超過閾值為止,您要讓該協程多次調用不需要等待彼此相繼完成。您可以在很大程度上遵循上述兩個腳本的模式,並稍作更改:

rand.py

import asyncio
import random

# ANSI colors
c = (
    "\033[0m",   # End of color
    "\033[36m",  # Cyan
    "\033[91m",  # Red
    "\033[35m",  # Magenta
)

async def makerandom(idx: int, threshold: int = 6) -> int:
    print(c[idx + 1] + f"Initiated makerandom({idx}).")
    i = random.randint(0, 10)
    while i <= threshold:
        print(c[idx + 1] + f"makerandom({idx}) == {i} too low; retrying.")
        await asyncio.sleep(idx + 1)
        i = random.randint(0, 10)
    print(c[idx + 1] + f"---> Finished: makerandom({idx}) == {i}" + c[0])
    return i

async def main():
    res = await asyncio.gather(*(makerandom(i, 10 - i - 1) for i in range(3)))
    return res

if __name__ == "__main__":
    random.seed(444)
    r1, r2, r3 = asyncio.run(main())
    print()
    print(f"r1: {r1}, r2: {r2}, r3: {r3}")

輸出會像下圖這個gif檔一樣:

asyncio-rand.dffdd83b4256

該程序使用一個主協程(coroutine)-> makerandom(),並在3個不同的輸入上同時運行它。大多數程序將包含小型模塊化 coroutines 和一個 wrapper function,用於將每個較小的 coroutines 鏈接在一起。 main() is then used to gather tasks (futures) by mapping the central coroutine across some iterable or pool.

在此微型示例中,pool的大小為 range(3)。在稍後提供的更完整的示例中,它是一組URL的集合,這些URL之後會用來同時發出請求,解析和處理,而 main()為每個URL封裝了整個例程。

儘管 "makerandom"可能不是 asyncio 的最佳選擇,但在範例中正是 asyncio.sleep()的存在來模仿 IO Bound 的過程等待時間不確定的地方。例如在呼叫 asyncio.sleep()可能表示在消息應用程序的兩個客戶端之間發送和接收不太隨機的整數。

Async IO Design Patterns

Async IO 附帶了自己的腳本設計,本節將介紹它們。

Chaining Coroutines

協程(coroutines)的一個關鍵特徵是它們可以鏈接在一起。 (請記住,一個協程對像是可以等待的,因此另一個協程可以等待它。)這使您可以將程序分解為較小的,可管理的,可回收的協程:

chained.py

import asyncio
import random
import time

async def part1(n: int) -> str:
    i = random.randint(0, 10)
    print(f"part1({n}) sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = f"result{n}-1"
    print(f"Returning part1({n}) == {result}.")
    return result

async def part2(n: int, arg: str) -> str:
    i = random.randint(0, 10)
    print(f"part2{n, arg} sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = f"result{n}-2 derived from {arg}"
    print(f"Returning part2{n, arg} == {result}.")
    return result

async def chain(n: int) -> None:
    start = time.perf_counter()
    p1 = await part1(n)
    p2 = await part2(n, p1)
    end = time.perf_counter() - start
    print(f"-->Chained result{n} => {p2} (took {end:0.2f} seconds).")

async def main(*args):
    await asyncio.gather(*(chain(n) for n in args))

if __name__ == "__main__":
    import sys
    random.seed(444)
    args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:])
    start = time.perf_counter()
    asyncio.run(main(*args))
    end = time.perf_counter() - start
    print(f"Program finished in {end:0.2f} seconds.")

注意輸出,part1()睡眠一段隨機的時間,part2()在它們可用時來用 part1 的結果進行後續的動作:

$ python3 chained.py 9 6 3
part1(9) sleeping for 4 seconds.
part1(6) sleeping for 4 seconds.
part1(3) sleeping for 0 seconds.
Returning part1(3) == result3-1.
part2(3, 'result3-1') sleeping for 4 seconds.
Returning part1(9) == result9-1.
part2(9, 'result9-1') sleeping for 7 seconds.
Returning part1(6) == result6-1.
part2(6, 'result6-1') sleeping for 4 seconds.
Returning part2(3, 'result3-1') == result3-2 derived from result3-1.
-->Chained result3 => result3-2 derived from result3-1 (took 4.00 seconds).
Returning part2(6, 'result6-1') == result6-2 derived from result6-1.
-->Chained result6 => result6-2 derived from result6-1 (took 8.01 seconds).
Returning part2(9, 'result9-1') == result9-2 derived from result9-1.
-->Chained result9 => result9-2 derived from result9-1 (took 11.01 seconds).
Program finished in 11.01 seconds.

Using a Queue

asyncio 軟件包提供的 queue classes 與 classes of the queue module 相似。到目前為止,我們實際上並不需要 queue module。在 chained.py中,每個任務(未來)都由一組 coroutines 組成,這些 coroutines 明確地在鏈中相互等待並且傳遞資訊。

還有一種可以與 async IO一起使用的替代結構:許多彼此不相關的生產者(producers)將項目添加到 Queue 中。每個生產者(producers)可以在交錯,隨機,未通知的時間將多個項目添加到 Queue 中。一群消費者(consumers)在將它們從 Queue 中拉出,而不必等待任何其他信號。

在這種設計中,沒有任何個人消費者(consumers)鏈接到生產者(producers)。消費者(consumers)不知道生產者(producers)的數量,甚至不知道將要添加到 Queue 中的項目的累計數量。

每個生產者(producers)或消費者(consumers)可隨時分別從 Queue 中放入和提取項目。Queue 充當可以與生產者(producers)和消費者(consumers)進行通信的中繼站,而無需他們直接相互交談。

Note: While queues are often used in threaded programs because of the thread-safety of queue.Queue(), you shouldn’t need to concern yourself with thread safety when it comes to async IO. (The exception is when you’re combining the two, but that isn’t done in this tutorial.)

One use-case for queues (as is the case here) is for the queue to act as a transmitter for producers and consumers that aren’t otherwise directly chained or associated with each other.

該程序的同步版本看起來非常令人沮喪:一組阻塞的生產者(producers)將項目串行添加到 Queue 中,一次一個。只有在所有生產者(producers)都完成之後,才能由一個消費者(consumers)逐項處理。此設計存在大量延遲。項目可能閒置地排在 Queue 中,而不是立即處理。

下面的 asyncq.py。該工作流程中具有挑戰性的部分是,需要向消費者(consumers)發出生產已經完成的信號。否則,await q.get()會無限期掛起,因為 Queue 已被完全處理,但是消費者(consumers)不會知道生產已經完成。

asyncq.py

import asyncio
import itertools as it
import os
import random
import time

async def makeitem(size: int = 5) -> str:
    return os.urandom(size).hex()

async def randsleep(a: int = 1, b: int = 5, caller=None) -> None:
    i = random.randint(0, 10)
    if caller:
        print(f"{caller} sleeping for {i} seconds.")
    await asyncio.sleep(i)

async def produce(name: int, q: asyncio.Queue) -> None:
    n = random.randint(0, 10)
    for _ in it.repeat(None, n):  # Synchronous loop for each single producer
        await randsleep(caller=f"Producer {name}")
        i = await makeitem()
        t = time.perf_counter()
        await q.put((i, t))
        print(f"Producer {name} added <{i}> to queue.")

async def consume(name: int, q: asyncio.Queue) -> None:
    while True:
        await randsleep(caller=f"Consumer {name}")
        i, t = await q.get()
        now = time.perf_counter()
        print(f"Consumer {name} got element <{i}>"
              f" in {now-t:0.5f} seconds.")
        q.task_done()

async def main(nprod: int, ncon: int):
    q = asyncio.Queue()
    producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
    consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too
    for c in consumers:
        c.cancel()

if __name__ == "__main__":
    import argparse
    random.seed(444)
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--nprod", type=int, default=5)
    parser.add_argument("-c", "--ncon", type=int, default=10)
    ns = parser.parse_args()
    start = time.perf_counter()
    asyncio.run(main(**ns.__dict__))
    elapsed = time.perf_counter() - start
    print(f"Program completed in {elapsed:0.5f} seconds.")

前幾個協程是輔助函數,它們返回一個隨機字串,一個計數器和一個隨機整數。生產者將1到5個項目放入 Queue 中。每個項目都是(i,t)的元組,其中i是隨機字符串,t是生產者(producers)嘗試將元組放入隊列的時間。

消費者(consumers)將項目取出時,它僅使用放入商品的時間戳來計算商品在隊列中的處理時間。

請記住,asyncio.sleep()用於模仿其他一些更複雜的協程(coroutine),如果這是常規的阻止函數(blocking function),則會消耗時間並阻止所有其他執行。

這是由兩個生產者和五個消費者進行的測試:

$ python3 asyncq.py -p 2 -c 5
Producer 0 sleeping for 3 seconds.
Producer 1 sleeping for 3 seconds.
Consumer 0 sleeping for 4 seconds.
Consumer 1 sleeping for 3 seconds.
Consumer 2 sleeping for 3 seconds.
Consumer 3 sleeping for 5 seconds.
Consumer 4 sleeping for 4 seconds.
Producer 0 added <377b1e8f82> to queue.
Producer 0 sleeping for 5 seconds.
Producer 1 added <413b8802f8> to queue.
Consumer 1 got element <377b1e8f82> in 0.00013 seconds.
Consumer 1 sleeping for 3 seconds.
Consumer 2 got element <413b8802f8> in 0.00009 seconds.
Consumer 2 sleeping for 4 seconds.
Producer 0 added <06c055b3ab> to queue.
Producer 0 sleeping for 1 seconds.
Consumer 0 got element <06c055b3ab> in 0.00021 seconds.
Consumer 0 sleeping for 4 seconds.
Producer 0 added <17a8613276> to queue.
Consumer 4 got element <17a8613276> in 0.00022 seconds.
Consumer 4 sleeping for 5 seconds.
Program completed in 9.00954 seconds.

在這種情況下,項目將在幾分之一秒內完成處理。延遲可能有兩個原因:

  • 不可避免的開銷
  • 當項目出現在 Queue 中時所有消費者都在睡覺的情況

關於第二個原因,幸運的是,擴展到成百上千的消費者是完全正常的。 python3 asyncq.py -p 5 -c 100 應該沒有問題。從理論上講,您可以在不同的系統上使用不同的用戶來控制生產者和消費者的管理,而 Queue 則作為中心。

到目前為止,您已經陷入困境,並看到了三個利用 asyncio 呼叫協程用的範例。如果您不完全遵循或只是想更深入地了解現代協程(coroutines)在Python中的使用方式,那麼您將從第一節開始下一節。

Async IO’s Roots in Generators

先前,您看到了一個基於生成器(generator-based)的老式協程(coroutines)的範例,這些協程(coroutines)已經過時了,但該範例值得稍作調整以重新展示:

import asyncio

@asyncio.coroutine
def py34_coro():
    """Generator-based coroutine"""
    # No need to build these yourself, but be aware of what they are
    s = yield from stuff()
    return s

async def py35_coro():
    """Native coroutine, modern syntax"""
    s = await stuff()
    return s

async def stuff():
    return 0x10, 0x20, 0x30

作為實驗,如果不經過 await 或不呼叫任何 asyncio.run() 或是其他 asyncio “porcelain” 函數時會發生什麼事呢?孤立地呼叫協程(coroutine)將返回協程對象:

>>> py35_coro()
<coroutine object py35_coro at 0x10126dcc8>

表面上這不是很有趣。單獨調用協程的結果是一個等待的協程對象。

Time for a quiz: what other feature of Python looks like this? (What feature of Python doesn’t actually “do much” when it’s called on its own?)

答案是生成器(generator),因為協程(coroutine)是增強型生成器(generator)。在這方面,行為是相似的:

>>> def gen():
...     yield 0x10, 0x20, 0x30
...
>>> g = gen()
>>> g  # Nothing much happens - need to iterate with `.__next__()`
<generator object gen at 0x1012705e8>
>>> next(g)
(16, 32, 48)

生成器函數(Generator functions)是 async IO 的基礎(無論您是否使用async def聲明協程,而不是使用舊的 @asyncio.coroutine 包裝器聲明)。從技術上講,await 比 yield 更類似於 yield from。

與 async IO 相關的生成器的一項關鍵功能是可以有效地隨意停止和重新啟動它們。例如,您可以中斷對生成器對象的迭代,然後之後可以恢復迭代。當生成器函數運行到 yield 時,它會生成該值,但隨後會處於空閒狀態,直到被告知要生成其後續值。

可以通過下一個例子充實一下:

>>> from itertools import cycle
>>> def endless():
...     """Yields 9, 8, 7, 6, 9, 8, 7, 6, ... forever"""
...     yield from cycle((9, 8, 7, 6))

>>> e = endless()
>>> total = 0
>>> for i in e:
...     if total < 30:
...         print(i, end=" ")
...         total += i
...     else:
...         print()
...         # Pause execution. We can resume later.
...         break
9 8 7 6 9 8 7 6 9 8 7 6 9 8

>>> # Resume
>>> next(e), next(e), next(e)
(6, 9, 8)

關鍵字 await 的行為類似,在協程(coroutine)中有個中斷點讓其可順利暫停並讓其他協程(coroutine)能工作。"暫停"是指已暫時放棄控制權但尚未完全退出或結束的協程(coroutines)。
請記住,yield,yield from and await,是生成器執行過程中的一個斷點。

這是函數和生成器之間的根本區別。函數就是全有還是全無。一旦函數啟,並且不再前進。動,它不會停止直到它遇到 return。另一方面,生成器每次遇到 yield 時都會暫停,並且不再前進。它不僅可以將回傳值推送到 Stack,而且還可以通過 next()來保留其局部變量。

生成器的第二個鮮為人知的功能也很重要。您也可以通過其 .send()方法將值發送到生成器中。這允許生成器(和協程)去呼叫(await)而不會阻塞。我不會再贅述此功能了,因為它主要對幕後協程的實現很重要,但是您根本不需要自己直接使用它。

為了將事情聯繫在一起,以下是協程(coroutine)作為生成器的一些關鍵點:

  • 協程是經過重新利用(repurposed generators)的生成器,可以利用生成器方法的特殊性。
  • 舊的基於生成器的協程使用 yield from 等待協程結果。現代Python語法的原生協程中是利用await 來取代 yield from。
  • 使用 await 是標記斷點的信號。它允許協程(coroutine)暫時中止執行,並允許程序稍後返回。

Other Features: async for and Async Generators + Comprehensions

除了 async / await 之外,Python 還提供了 async for 讓我們可以進行 asynchronous 的迭代(asynchronous iterator)。

與普通 的async / await 一起,Python 還使 async 用於在異步迭代器上進行迭代。異步迭代器的目的是使它能夠在迭代時在每個階段調用異步代碼。

這個概念的自然擴展就是異步生成器(asynchronous generator)。回想一下,您可以在本地協程中使用 await,return 或yield。在 Python 3.6(PEP 525)中,可以在協程中使用 yield,它引入了異步生成器)(asynchronous generator),目的是允許在同一個協程函數中使用 await 和 yield:

>>> async def mygen(u: int = 10):
...     """Yield powers of 2."""
...     i = 0
...     while i < u:
...         yield 2 ** i
...         i += 1
...         await asyncio.sleep(0.1)

最後但並非最不重要的一點是,Python使用async for啟用了asynchronous comprehension

>>> async def main():
...     # This does *not* introduce concurrent execution
...     # It is meant to show syntax only
...     g = [i async for i in mygen()]
...     f = [j async for j in mygen() if not (j // 3 % 5)]
...     return g, f
...
>>> g, f = asyncio.run(main())
>>> g
[1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
>>> f
[1, 2, 16, 32, 256, 512]

這是一個至關重要的區別:neither asynchronous generators nor comprehensions make the iteration concurrent.。他們所做的只是提供同步對象的皮,但使得在迴圈中可以放棄對事件循環的控制權以便其他協程運行的能力。

換句話說,異步迭代器(asynchronous iterators)和異步生成器(asynchronous generators )並不是被設計成使得某個函數可以變成序列化或是變成可迭代的形式。它們只是為了讓封閉的協程允許其他任務輪流使用。僅在使用 plain for 或 with 會"破壞"協程(coroutine)中 await的性質的情況下,才需要async for 和a sync with 語句。

The Event Loop and asyncio.run()

您可以將事件循環視為類似於無限循環,該循環監視協程,持續尋找當進入到空閒狀態時可以接著執行的事情。這個循環同時也有能力喚醒一個空閒的協程,當這個協程從等待的狀態變成可用的狀態。

到目前為止,整個事件循環的管理是由一個函數來處理:

$ asyncio.run(main())

Python 3.7中引入的 asyncio.run()負責獲取事件循環,運行任務直到將其標記為完成,然後關閉事件循環。

使用get_event_loop(),可以更輕鬆地管理異步事件循環。典型的模式如下所示:

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

在較早的示例中,您可能會看到 loop.get_event_loop()隨處可見,但是除非您特別需要對事件循環管理的控制做微調,否則 asyncio.run()對於大多數程序而言就足夠了。

如果您確實需要與 Python 程序中的事件循環進行互動,則循環是一種老式的Python對象(但足夠好了),它支持使用loop.is_running()和loop.is_closed()進行 introspection。(像通過將循環作為參數傳遞來安排回調(scheduling a callback)之類的事情)

以下是有關事件循環的一些要點:

1.協程在依賴於事件循環之前不會做很多事情。

之前在有關生產者(generator)的說明中已經看到了這一點,但是值得重看一遍。如果您有一個等待他人的主要協程(main coroutine),則簡單地單獨調用它幾乎沒有效果:

>>> import asyncio

>>> async def main():
...     print("Hello ...")
...     await asyncio.sleep(1)
...     print("World!")

>>> routine = main()
>>> routine
<coroutine object main at 0x1027a6150>

請記住使用asyncio.run()強制通過調度來執行 main()協程(未來對象)在事件循環上的任務:

>>> asyncio.run(routine)
Hello ...
World!

(其他協程可以使用 await 執行。通常將main()包裝在asyncio.run()中,從那裡呼叫帶有await的鍊式協程。)

2.默認情況下,異步IO事件循環在單個線程和單個CPU內核上運行。通常,在一個CPU內核中運行一個單線程事件循環綽綽有餘。還可以跨多個內核運行事件循環。

3.事件循環是可插入的。也就是說,如果您確實需要,可以編寫自己的事件循環實現,並使它運行相同的任務。這在 uvloop 軟件包中得到了很好的演示,該軟件包是透過 Cython 來實現。

這就是術語"pluggable event loop"的含義:您可以使用事件循環來完成任何可行的實現,而與協程本身的結構無關。 asyncio程序包本身帶有兩種不同的事件循環實現,默認實現基於選擇器模塊。 (第二種實現僅適用於Windows。)

A Full Program: Asynchronous Requests

在本部分中,您將使用aiohttp(一種非常快的異步HTTP客戶端/服務器框架)來構建可抓取網址的URL收集器 areq.py。這樣的工具可用於映射站點集群之間的連接,鏈接形成有向圖。

Note: You may be wondering why Python’s requests package isn’t compatible with async IO. requests is built on top of urllib3, which in turn uses Python’s http and socket modules.

By default, socket operations are blocking. This means that Python won’t like await requests.get(url) because .get() is not awaitable. In contrast, almost everything in aiohttp is an awaitable coroutine, such as session.request() and response.text(). It’s a great package otherwise, but you’re doing yourself a disservice by using requests in asynchronous code.

程式結構如下所示:

  1. 從本地文件 urls.txt 中讀取URL序列。

  2. 發送對URL的GET請求並解碼結果內容。如果失敗,請在此處停止輸入URL。

  3. 在響應的HTML中的 href tags 內搜索URL。

  4. 將結果寫入 foundurls.txt

  5. 盡可能利用異步(asynchronously)和並發(concurrently)執行上述所有操作。(將aiohttp用於請求,aiofiles用於文件處理。這是兩個非常適合異步IO模型的主要範例。)

這是urls.txt的內容:

$ cat urls.txt
https://regex101.com/
https://docs.python.org/3/this-url-will-404.html
https://www.nytimes.com/guides/
https://www.mediamatters.org/
https://1.1.1.1/
https://www.politico.com/tipsheets/morning-money
https://www.bloomberg.com/markets/economics
https://www.ietf.org/rfc/rfc2616.txt

列表中的第二個URL應該返回404響應,您需要對其進行適當處理。如果您正在運行此程序的擴展版本,則可能需要處理比這更棘手的問題,例如服務器斷開連接和無休止的重新導向。

請求本身應使用單個會話(session)發出,以充分利用會話(session)的內部連接池。

讓我們看一下完整的程序。我們將逐步介紹以下內容:

areq.py

"""Asynchronously get links embedded in multiple pages' HMTL."""

import asyncio
import logging
import re
import sys
from typing import IO
import urllib.error
import urllib.parse

import aiofiles
import aiohttp
from aiohttp import ClientSession

logging.basicConfig(
    format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
    level=logging.DEBUG,
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)
logger = logging.getLogger("areq")
logging.getLogger("chardet.charsetprober").disabled = True

HREF_RE = re.compile(r'href="(.*?)"')

async def fetch_html(url: str, session: ClientSession, **kwargs) -> str:
    """GET request wrapper to fetch page HTML.

    kwargs are passed to `session.request()`.
    """

    resp = await session.request(method="GET", url=url, **kwargs)
    resp.raise_for_status()
    logger.info("Got response [%s] for URL: %s", resp.status, url)
    html = await resp.text()
    return html

async def parse(url: str, session: ClientSession, **kwargs) -> set:
    """Find HREFs in the HTML of `url`."""
    found = set()
    try:
        html = await fetch_html(url=url, session=session, **kwargs)
    except (
        aiohttp.ClientError,
        aiohttp.http_exceptions.HttpProcessingError,
    ) as e:
        logger.error(
            "aiohttp exception for %s [%s]: %s",
            url,
            getattr(e, "status", None),
            getattr(e, "message", None),
        )
        return found
    except Exception as e:
        logger.exception(
            "Non-aiohttp exception occured:  %s", getattr(e, "__dict__", {})
        )
        return found
    else:
        for link in HREF_RE.findall(html):
            try:
                abslink = urllib.parse.urljoin(url, link)
            except (urllib.error.URLError, ValueError):
                logger.exception("Error parsing URL: %s", link)
                pass
            else:
                found.add(abslink)
        logger.info("Found %d links for %s", len(found), url)
        return found

async def write_one(file: IO, url: str, **kwargs) -> None:
    """Write the found HREFs from `url` to `file`."""
    res = await parse(url=url, **kwargs)
    if not res:
        return None
    async with aiofiles.open(file, "a") as f:
        for p in res:
            await f.write(f"{url}\t{p}\n")
        logger.info("Wrote results for source URL: %s", url)

async def bulk_crawl_and_write(file: IO, urls: set, **kwargs) -> None:
    """Crawl & write concurrently to `file` for multiple `urls`."""
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                write_one(file=file, url=url, session=session, **kwargs)
            )
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    outpath = here.joinpath("foundurls.txt")
    with open(outpath, "w") as outfile:
        outfile.write("source_url\tparsed_url\n")

    asyncio.run(bulk_crawl_and_write(file=outpath, urls=urls))

HREF_RE是一個正則表達式,用於提取我們最終在HTML中搜索的href標籤:

>>> HREF_RE.search('Go to <a href="https://realpython.com/">Real Python</a>')
<re.Match object; span=(15, 45), match='href="https://realpython.com/"'>

協程 fetch_html() 是GET請求的包裝,用於發出請求並解碼生成的頁面HTML。它發出請求,等待響應,並在非200狀態下立即引發:

resp = await session.request(method="GET", url=url, **kwargs)
resp.raise_for_status()

如果狀態正常,則 fetch_html() 返回頁面HTML(字串型態

html = await resp.text()

我們正在等待session.request()resp.text(),因為它們是等待的協程。否則,請求/響應週期將是應用程序的長尾,耗時的部分,但是對於異步IO,fetch_html()可使事件循環在其他易於使用的作業(例如,解析和寫入已獲取的URL)上工作。

協同程序鏈中的下一個是 parse(),它在 fetch_html() 中等待給定的URL,然後從該頁面的HTML中提取所有href標記,確保每個標記均有效並將其格式化為絕對路徑。

誠然,parse()的第二部分是阻塞的,但是它由快速的正則表達式匹配組成,並確保將發現的連結設置為絕對路徑。

在這種特定情況下,此同步代碼應該快速而不起眼。但是,請記住,給定協程中的任何行都會阻塞其他協程,除非該行使用yield,await或return。如果解析是一個比較繁瑣的過程,則可能需要考慮使用loop.run_in_executor() 在其自己的程序中運行此部分。

接下來,協程 write()接收一個文件對象和一個URL,然後等待 parse()返回一組已解析的URL,通過使用 aiofiles 將每個URL異步寫入文件中。

最後,bulk_crawl_and_write()作為腳本協程鏈的主要入口。它使用一個會話(session),並為每個最終從 urls.txt 中讀取的URL創建一個任務。

以下是一些值得一提的其他要點:

  • 默認的 ClientSession 具有一個最多具有100個連線的調配器(adapter)。要更改此設置,請將asyncio.connector.TCPConnector 的實例傳遞給 ClientSession。您還可以基於每個主機指定限制。

  • 您可以為整個會話和單個請求指定最大超時限制。

  • 該腳本還使用 async with,它與異步上下文管理器(asynchronous context manager)一起使用。我沒有專門討論這個概念,因為從同步上下文管理器到異步上下文管理器的過渡非常簡單。後者必須定義.__ aenter __().__ aexit __()而不是.__ exit __().__ enter __()。如您所料,async with只能在用 async def 聲明的協程函數中使用。

這是所有執行過程的結果,areq.py可以在一秒鐘之內獲取,解析並保存9個網址的結果:

$ python3 areq.py
21:33:22 DEBUG:asyncio: Using selector: KqueueSelector
21:33:22 INFO:areq: Got response [200] for URL: https://www.mediamatters.org/
21:33:22 INFO:areq: Found 115 links for https://www.mediamatters.org/
21:33:22 INFO:areq: Got response [200] for URL: https://www.nytimes.com/guides/
21:33:22 INFO:areq: Got response [200] for URL: https://www.politico.com/tipsheets/morning-money
21:33:22 INFO:areq: Got response [200] for URL: https://www.ietf.org/rfc/rfc2616.txt
21:33:22 ERROR:areq: aiohttp exception for https://docs.python.org/3/this-url-will-404.html [404]: Not Found
21:33:22 INFO:areq: Found 120 links for https://www.nytimes.com/guides/
21:33:22 INFO:areq: Found 143 links for https://www.politico.com/tipsheets/morning-money
21:33:22 INFO:areq: Wrote results for source URL: https://www.mediamatters.org/
21:33:22 INFO:areq: Found 0 links for https://www.ietf.org/rfc/rfc2616.txt
21:33:22 INFO:areq: Got response [200] for URL: https://1.1.1.1/
21:33:22 INFO:areq: Wrote results for source URL: https://www.nytimes.com/guides/
21:33:22 INFO:areq: Wrote results for source URL: https://www.politico.com/tipsheets/morning-money
21:33:22 INFO:areq: Got response [200] for URL: https://www.bloomberg.com/markets/economics
21:33:22 INFO:areq: Found 3 links for https://www.bloomberg.com/markets/economics
21:33:22 INFO:areq: Wrote results for source URL: https://www.bloomberg.com/markets/economics
21:33:23 INFO:areq: Found 36 links for https://1.1.1.1/
21:33:23 INFO:areq: Got response [200] for URL: https://regex101.com/
21:33:23 INFO:areq: Found 23 links for https://regex101.com/
21:33:23 INFO:areq: Wrote results for source URL: https://regex101.com/
21:33:23 INFO:areq: Wrote results for source URL: https://1.1.1.1/

Next Steps: If you’d like to up the ante, make this webcrawler recursive. You can use aio-redis to keep track of which URLs have been crawled within the tree to avoid requesting them twice, and connect links with Python’s networkx library.

Remember to be nice. Sending 1000 concurrent requests to a small, unsuspecting website is bad, bad, bad. There are ways to limit how many concurrent requests you’re making in one batch, such as in using the sempahore objects of asyncio or using a pattern like this one. If you don’t heed this warning, you may get a massive batch of TimeoutError exceptions and only end up hurting your own program.

Async IO in Context

在經過這麼多的討論過後,那麼讓我們退一步,考慮一下什麼時候異步IO是理想的選擇,以及如何進行比較以得出結論或選擇其他並發模型。

When and Why Is Async IO the Right Choice?

本教程沒有關於異步IO,線程與多處理的擴展論述。但是,了解異步IO何時可能是三個中的最佳候選者很有用。

異步IO與多處理(multiprocessing)之間的鬥爭根本不是一場爭鬥。實際上,它們可以一起使用。如果您有多個 CPU-bound 的任務(一個很好的例子是在scikit-learn或keras之類的庫中進行網格搜索),那麼多處理應(multiprocessing)該是一個明顯的選擇。

如果所有函數都使用阻塞形式的,那使用異步IO是一個壞主意。(實際上可能會使您的代碼變慢。)但是,如前所述,異步IO和多處理(multiprocessing)在某些地方可以和諧共處。

異步IO與線程(thread)之間的競爭更為直接。我在引言中提到"threading is hard."。全文是,即使在線程似乎易於實現的情況下,由於競爭條件(race condition)和內存使用等原因,它仍然可能導致無法被追蹤的 bugs。

由於線程是具有有限可用性的系統資源,因此線程的擴展也往往比異步IO優雅。創建數千個線程將在許多計算機上失敗,因此我不建議您首先嘗試。創建數千個異步IO任務是完全可行的。

當您有多個IO-bound任務時,那異步IO則是非常適合解決這些任務,否則這些任務將花長時間的等待,例如:

  • 網絡IO,無論您的程序是服務器端還是客戶端

  • 無服務器設計,例如對等,多用戶網絡(如群組聊天室)

  • Read/write operations where you want to mimic a "fire-and-forget" style but worry less about holding a lock on whatever you’re reading and writing to (不確定怎麼翻就不翻了)

Odds and Ends

在接下來的幾節中,我們將介紹 async / await 的其他各個部分。

Other Top-Level asyncio Functions

除了 asyncio.run()之外,您還看到了其他一些包級功能,例如 asyncio.create_task()asyncio.gather()

您可以使用 create_task() 安排協程對象的執行,然後使用 asyncio.run()

>>> import asyncio

>>> async def coro(seq) -> list:
...     """'IO' wait time is proportional to the max element."""
...     await asyncio.sleep(max(seq))
...     return list(reversed(seq))
...
>>> async def main():
...     # This is a bit redundant in the case of one task
...     # We could use `await coro([3, 2, 1])` on its own
...     t = asyncio.create_task(coro([3, 2, 1]))  # Python 3.7+
...     await t
...     print(f't: type {type(t)}')
...     print(f't done: {t.done()}')
...
>>> t = asyncio.run(main())
t: type <class '_asyncio.Task'>
t done: True

這種寫法有個點要特別留意:如果您不等待 main()中的內容,它可能會在 main()本身表示已完成之前就結束了。因為 asyncio.run(main()) 呼叫 loop.run_until_complete(main()),所以事件循環只關心 main()是否已經完成,而不關心在 main()中創建的任務是否完成。如果沒有加這行 await t 循環中的其他任務可能會在完成之前被取消。如果想要獲取當前待處理任務的列表,可以使用 asyncio.Task.all_tasks()

Note: asyncio.create_task() was introduced in Python 3.7. In Python 3.6 or lower, use asyncio.ensure_future() in place of create_task().

另外還有 asyncio.gather()。儘管它沒有做任何特別的事情,但是 gather() 的目的是將協程(futures)入單個協程中,它最終返回一個 future 的對象,如果你使用 await asyncio.gather() 並指定多個任務或協程,你將會等待這些任務或協程全部完成。 (這與我們前面的示例 queue.join()有點類似。)

>>> import time
>>> async def main():
...     t = asyncio.create_task(coro([3, 2, 1]))
...     t2 = asyncio.create_task(coro([10, 5, 0]))  # Python 3.7+
...     print('Start:', time.strftime('%X'))
...     a = await asyncio.gather(t, t2)
...     print('End:', time.strftime('%X'))  # Should be 10 seconds
...     print(f'Both tasks done: {all((t.done(), t2.done()))}')
...     return a
...
>>> a = asyncio.run(main())
Start: 16:20:11
End: 16:20:21
Both tasks done: True
>>> a
[[1, 2, 3], [0, 5, 10]]

您可能已經註意到 gather() 會等待傳入的 futures 或是協程的全部結果。或者您可以搭配迴圈來執行 asyncio.as_completed()以按完成順序獲取任務完成時的任務。該函數返回一個迭代器。下面,coro([3,2,1])的結果會比 coro([10, 5, 0]) 先完成,下面同時也是一個不使用 gather 的範例:

>>> async def main():
...     t = asyncio.create_task(coro([3, 2, 1]))
...     t2 = asyncio.create_task(coro([10, 5, 0]))
...     print('Start:', time.strftime('%X'))
...     for res in asyncio.as_completed((t, t2)):
...         compl = await res
...         print(f'res: {compl} completed at {time.strftime("%X")}')
...     print('End:', time.strftime('%X'))
...     print(f'Both tasks done: {all((t.done(), t2.done()))}')
...
>>> a = asyncio.run(main())
Start: 09:49:07
res: [1, 2, 3] completed at 09:49:10
res: [0, 5, 10] completed at 09:49:17
End: 09:49:17
Both tasks done: True

最後,您可能還會看到 asyncio.ensure_future()。您幾乎不需要它,因為它是一個較低級的管道API,並在很大程度上被 create_task()所取代。

The Precedence of await

儘管它們的行為有些相似,但是 await 關鍵字的優先級比 yield 的優先級高得多。有關更多信息,請參見PEP 492中的await表達式示例。

Conclusion

現在您已經了解和可以使用 async / await 的相關套件。以下是您所涵蓋內容的回顧:

  • 異步IO作為與語言無關的模型,以及通過使協程彼此間接通信來實現並發的方法

  • Python新的async和await關鍵字,用於標記和定義協程

  • asyncio,提供用於運行和管理協程的API

Reference