Background

之前在看一系列 concurrency 的教材,所以想說把常出現的專有名詞整理一下,順便搞懂定義,不然看這些教材真的會看得蠻痛苦的。

Generators

首先我們先看一下 Generator 英文的定義:

Introduced with PEP 255, generator functions are a special kind of function that return a lazy iterator.

在 Python3 的世界中,任何物件帶有 __next__ 方法的都稱作 iterator。(eg. for in 的語法)

那 generator functions 算是一種 iterator,但特別的是您只能對其進行一次迭代。他不像使用 for in 的語法一樣一次就迭代全部,而是一次一次慢慢來,交由程式開發人員來決定什麼情況下才會呼叫下一次迭代,因此被稱作 lazy iterator。

Most of the time generators are implemented as functions. However, they do not return a value, they yield it.

根據上面這句話我們可以知道 generator 算一種 function,但跟一般 function 不一樣的地方是一般的 function 會使用 return 來作為回傳的方式,但 generator 則是使用 yield,那 returnyield 兩者的差別在哪呢?我們來看下面這個範例:

$ python3
Python 3.7.2 (default, Jan 13 2019, 12:50:01)
[Clang 10.0.0 (clang-1000.11.45.5)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> def generator_function():
...     for i in range(3):
...         yield i
...
>>> a = generator_function()
>>> type(a)
<class 'generator'>
>>> print(next(a))
0
>>> print(next(a))
1
>>> print(next(a))
2
>>> print(next(a))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

上面的範例我們宣告了一個 generator_function 的函示,並且使用 yield 來回傳相關變數。接著我們宣告變數 a = generator_function() 使得 a 變成一個 generator,接著我們就可以對 a 這個 generatornext 的操作。根據上面的範例我們可以了解每當我們呼叫 nextgenerator_function 中的 for loop 才會工作。呼應到上面所說的 交由程式開發人員來決定什麼情況下才會呼叫下一次迭代,直到迭代結束才會產生 StopIteration 的 error。

那我們在對 generator function 做個結論: generator function 是一種可以暫停執行並生成多個值的函數(而不僅僅是返回一個值)。可以想像成一個可以中止和控制的迭代器。

Coroutine

一樣我們先看一下 Coroutine 英文的定義:

Coroutines are computer program components that generalize subroutines for non-preemptive multitasking, by allowing execution to be suspended and resumed.

可以多次輸入和退出,每次都暫停和恢復的函數稱為協程,Generators 只是 Coroutine 的簡化形式。利用非同步的想法來解決 consumer-producer 的問題。在最新的版本中我們利用 async/await 的語法來完成對 Coroutine 的宣告。我們來看以下的一個簡單範例:

coroutine_ex.py

import asyncio
import datetime
import random
 
async def display_date(num, loop):
    end_time = loop.time() + 50.0
    while True:
        print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(random.randint(0, 5))
 
 
loop = asyncio.get_event_loop()
 
asyncio.ensure_future(display_date(1, loop))
asyncio.ensure_future(display_date(2, loop))

上述的範例中我們透過 async 的關鍵字把一個 function 定義成一個 native coroutine。透過 asyncio.get_event_loop 來取得當前線程(thread)的事件循環(event loop)。

The event loop is the core of every asyncio application. Event loops run asynchronous tasks and callbacks, perform network IO operations, and run subprocesses.

我們利用 ipyhon 做進一步的討論

$ ipython -i coroutine_ex.py
Python 3.7.2 (default, Jan 13 2019, 12:50:01)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.8.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: whos
Variable       Type                      Data/Info
--------------------------------------------------
asyncio        module                    <module 'asyncio' from '/<...>3.7/asyncio/__init__.py'>
datetime       module                    <module 'datetime' from '<...>b/python3.7/datetime.py'>
display_date   function                  <function display_date at 0x11120de18>
loop           _UnixSelectorEventLoop    <_UnixSelectorEventLoop r<...>closed=False debug=False>
random         module                    <module 'random' from '/u<...>lib/python3.7/random.py'>

我們可以看到 loop 是 EventLoop 的物件,display_date 是一個 function。我們接著初始化一個 function 的物件(a)看看:

In [2]: a = display_date(3, loop)

In [3]: whos
Variable       Type                      Data/Info
--------------------------------------------------
a              coroutine                 <coroutine object display_date at 0x111366548>
asyncio        module                    <module 'asyncio' from '/<...>3.7/asyncio/__init__.py'>
datetime       module                    <module 'datetime' from '<...>b/python3.7/datetime.py'>
display_date   function                  <function display_date at 0x11120de18>
loop           _UnixSelectorEventLoop    <_UnixSelectorEventLoop r<...>closed=False debug=False>
random         module                    <module 'random' from '/u<...>lib/python3.7/random.py'>

我們可以看見 Variable a 的 Type 是一個 coroutine,那在原始的程式碼中我們透過 asyncio.ensure_future 來向 Event Loop 來註冊兩個任務,Python3.7之後我們可以用 asyncio.create_task 來取代 asyncio.ensure_future。(這個概念也可以想成用asyncio.create_task() 將 Coroutine 封裝為 Task)

接著我們來執行 Event Loop 中的任務:

In [4]: loop.run_forever()
Loop: 1 Time: 2019-10-02 15:21:25.900197
Loop: 2 Time: 2019-10-02 15:21:25.900416
Loop: 1 Time: 2019-10-02 15:21:26.901946
Loop: 1 Time: 2019-10-02 15:21:29.905117
Loop: 1 Time: 2019-10-02 15:21:29.905297
Loop: 2 Time: 2019-10-02 15:21:30.903558
Loop: 2 Time: 2019-10-02 15:21:31.907187
Loop: 1 Time: 2019-10-02 15:21:33.909868

我們可以看到執行結果 Loop 1Loop 2 似乎不斷地在做切換,原因是 await asyncio.sleep(random.randint(0, 5)) 這行程式造成的(在範例中我們喜歡用 sleep 來模擬程式執行到耗時的部分)。那透過 await 這個 keyword 可以讓 Event Loop 去切換目前 available 的任務。

asyncio.Future & asyncio.Task

Future 是一個數據結構,表示還未完成的工作結果,事件循環可以監視 Future 對象是否完成。Task 是 Future 的子類別,目的是包裝和管理一個協程的執行,用 Task 來封裝協程,給原本沒有狀態的協程增加一些狀態。

Future is used to accept asynchronous results.
Task is a combination of futures and program objects for event loops.

個人感想不確定對不對,因為這塊的觀念算蠻新的,官方文件也沒說明的很清楚。Future 沒什麼問題就是如上面所說用來接收 asynchronous 的結果。那我自己是有點搞不懂 Task 的目的是什麼,似乎是想要偷懶的樣子,猜測想要 future 的部分功能同時可以去管理在 Task 在 Event Loop 中的行為。

Asynchronous Generators

這也算是蠻新的觀念,詳細的定義落在PEP 525中。整體的想法是傳統的 Genrators 用了一種優雅的方式來編寫複雜的數據生成器,並使它們的行為類似於迭代器。(白話一點就是有一個生產者(producer),他不斷的在生產資料(input),那透過 Generator 我們可以透過類似 for 的方式來使用這些資料(input))。

但是貪心的我們發現,目前沒有異步迭代協議(async for)的等效概念。那這是個怎樣的神奇概念呢?打個比方我今天到了壽司店我要點10個壽司,有三個師傅可以選擇A(function), B(generator), C(Asynchronous Generators)。

  • A師傅(fuction)表示:點10個壽司沒問題,你坐在這邊等我一下,我一次做好給你,所以你一次拿到10個壽司。

  • B師傅(generator)表示: 點10個壽司沒問題,你要的時候就按一下服務鈴,坐著等一下(不能離開喔),我馬上做一個給你,等你什麼時候要吃下一個,再按一次服務鈴,我再做一個給你,直到10個壽司為止。

  • C師傅(Asynchronous Generators)表示: 我跟B師傅基本上差不多,差別在於,當你按下服務鈴之後,不用坐在那邊看我做壽司,你可以去倒湯或是去拿點小菜之類的不用坐在這邊等我做好。

OK,那知道這個神奇的概念之後我們來看程式範例:

class Ticker:
    """Yield numbers from 0 to `to` every `delay` seconds."""

    def __init__(self, delay, to):
        self.delay = delay
        self.i = 0
        self.to = to

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        if i >= self.to:
            raise StopAsyncIteration
        self.i += 1
        if i:
            await asyncio.sleep(self.delay)
        return i

上面的程式可以將其實現為更簡單的異步生成器:

async def ticker(delay, to):
    """Yield numbers from 0 to `to` every `delay` seconds."""
    for i in range(to):
        yield i
        await asyncio.sleep(delay)

async_generator.py

import asyncio

async def ticker(delay, to):
    """Yield numbers from 0 to `to` every `delay` seconds."""
    for i in range(to):
        yield i
        await asyncio.sleep(delay)

async for i in ticker(1, 10):
    print(i)

Summary

我們用以下的例子來快速的表示什麼是 function/generator/coroutine/asynchronous generator:

def func():            # a function
    return

def genfunc():         # a generator function
    yield
    
async def coro():      # a coroutine function
    await smth()

async def asyncgen():  # an asynchronous generator function
    await smth()
    yield 42

Additional

yield vs yield from

這個 section 主要是補充 yield fromyield 的差別。 我們直接看兩個例子(出處)

yield_ex.py

# 字符串
astr='ABC'
# 列表
alist=[1,2,3]
# 字典
adict={"name":"wangbm","age":18}
# 生成器
agen=(i for i in range(4,8))

def gen(*args, **kw):
    for item in args:
        for i in item:
            yield i

new_list=gen(astr, alist, adict, agen)
print(list(new_list))
# ['A', 'B', 'C', 1, 2, 3, 'name', 'age', 4, 5, 6, 7]

yield_from_ex.py

# 字符串
astr='ABC'
# 列表
alist=[1,2,3]
# 字典
adict={"name":"wangbm","age":18}
# 生成器
agen=(i for i in range(4,8))

def gen(*args, **kw):
    for item in args:
        yield from item

new_list=gen(astr, alist, adict, agen)
print(list(new_list))
# ['A', 'B', 'C', 1, 2, 3, 'name', 'age', 4, 5, 6, 7]

從上面兩個範例我們可以大概猜到,yield from 後面帶入的是一個可以迭代的對象,他的功能其實跟跑一個 for 把 item 的內容一個一個 yield 出來是一樣的,那當然目的就是為了簡潔偷懶。當然這是 yield from 最基本的功能,他在實用性上可是非常強大的,詳細內容可以參考這兩個例子的出處。

asyncio.gather vs asyncio.wait

一樣我們先看一段範例:

async def a():
    print('Suspending a')
    await asyncio.sleep(3)
    print('Resuming a')
    return 'A'


async def b():
    print('Suspending b')
    await asyncio.sleep(1)
    print('Resuming b')
    return 'B'

使用 Ipython 執行

In [1]: import asyncio

In [2]: return_value_a, return_value_b = await asyncio.gather(a(), b())
Suspending a
Suspending b
Resuming b
Resuming a

In [3]: return_value_a, return_value_b
Out[3]: ('A', 'B')
In [8]: done, pending = await asyncio.wait([a(), b()])
Suspending b
Suspending a
Resuming b
Resuming a

In [9]: done
Out[9]:
{<Task finished coro=<a() done, defined at /Users/taiker/workspace/async/asyncio_ex.py:1> result='A'>,
 <Task finished coro=<b() done, defined at /Users/taiker/workspace/async/asyncio_ex.py:8> result='B'>}

In [10]: pending
Out[10]: set()

簡單總結一下差別

  • asyncio.gather: 基本上只會返回結果,剩下的部分就跟黑盒子一樣,讓你無腦使用。
  • asyncio.wait: 會回傳封裝過後的 Task 物件,讓你自行操作物件得到你想要的結果,以上面的範例我們可以繼續對 asyncio.wait 回傳的結果做操作像下面範例一樣:
In [11]: task = list(done)[0]

In [12]: task
Out[12]: <Task finished coro=<b() done, defined at /Users/taiker/workspace/async/asyncio_ex.py:8> result='B'>

In [13]: task.result()
Out[13]: 'B'

以上就是 asyncio.gather & asyncio.wait 的差別介紹。

Reference