Background

在網路上看到一系列的教學關於 Python 的 Concurrency,連結在此。想說找個時間把它全部讀完並且做個重點整理。

What Is Concurrency?

Concurrency 在字典中的意思是 simultaneous occurrence (同時發生)。那在 Python 中 simultaneous occurrence 又可以用不同的名字來表示 (thread, task, process),那當然這三個(thread, task, process)本質上還是有不一樣的地方,稍後會說明。

only multiprocessing actually runs these trains of thought at literally the same time. Threading and asyncio both run on a single processor and therefore only run one at a time.

上面這段話其實在 Python 中是一段很重要的觀念,在Python 中因為 Global Interpreter Lock 的關係 threading 和 asyncio 是不能跨 CPU 執行的。

In threading, the operating system actually knows about each thread and can interrupt it at any time to start running a different thread. This is called preemptive multitasking since the operating system can pre-empt your thread to make the switch.

preemptive multitasking 表示 OS 會要求每一個運行中的程序,定時放棄自己的執行權利,這樣有個好處,確保系統部會因為某個程式而卡住進而造成當機。

Asyncio, on the other hand, uses cooperative multitasking. The tasks must cooperate by announcing when they are ready to be switched out. That means that the code in the task has to change slightly to make this happen.

跟 Thread 不同的是 Asyncio 中的 Task 要做切換時必須等另外一個 Task 確認他已經 ready 才能做切換。再來跟 Thread 不一樣的事 Thread 的程式碼基本上不用做特別的修改,但 Asyncio 的程式碼需要。

What Is Parallelism?

So far, you’ve looked at concurrency that happens on a single processor.

With multiprocessing, Python creates new processes. A process here can be thought of as almost a completely different program.

Because they are different processes, each of your trains of thought in a multiprocessing program can run on a different core.

我們知道在 Python 中 concurrency 只會發生在 single processor,那如果我們需要程式跑在 muti processores 上的話我們需要用到 multiprocessing。下面有文章中列出的整理表:

Screen-Shot-2019-09-07-at-3.17.13-PM

When Is Concurrency Useful?

Concurrency can make a big difference for two types of problems. These are generally called CPU-bound and I/O-bound.

通常我們會想要加速我們的程式有以下兩種狀況: CPU-bound & I/O-bound

I/O-bound problems cause your program to slow down because it frequently must wait for input/output (I/O) from some external resource. They arise frequently when your program is working with things that are much slower than your CPU.

I/O-bound 的問題很單純因為 CPU 的執行速度很快所以大量的 I/O 會使得 CPU 一直在等待。最常遇到的就是文件讀去&網路連結的情況。文章中利用下圖來表示 I/O-bound 的情況:

Screen-Shot-2019-09-07-at-3.34.44-PM

如上圖所示我們可以看到 CPU 做完之後就再等I/O處理進而浪費時間。

那有另外一種情況是程式可能進行大量的計算而不與網路等做互動,像是建立AI的模型等需要大量的運算,那這種情況就是 CPU-bound。如下圖所示:

Screen-Shot-2019-09-07-at-3.39.06-PM

下面是 CPU-bound & I/O-bound 整理表:

Screen-Shot-2019-09-07-at-3.43.08-PM

How to Speed Up an I/O-Bound Program

Let’s start by focusing on I/O-bound programs and a common problem: downloading content over the network. For our example, you will be downloading web pages from a few sites, but it really could be any network traffic. It’s just easier to visualize and set up with web pages.

接著文章中舉了一個例子: 在不同的網站中下載不同的 web pages,那接著我們會思考如何去 Speed up 這個程式。

Synchronous Version

先看一個 non-concurrent 的程式,我們通過一個迴圈來依序下載web page:

synchronous.py

import requests
import time


def download_site(url, session):
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with requests.Session() as session:
        for url in sites:
            download_site(url, session)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")
$ python3 synchronous.py
.....
Downloaded 160 in 23.88224196434021 seconds

那我們可以看到這個程式花了大概 24 秒左右,那目前的情況也符合我們上面所提到 I/O-Bound 的問題。


threading Version

那接著我們來看 threading 的版本:

thread.py

import concurrent.futures
import requests
import threading
import time


thread_local = threading.local()


def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session


def download_site(url):
    session = get_session()
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_site, sites)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")
$ python3 thread.py
....
Downloaded 160 in 4.80456018447876 seconds

從Python3.2開始,標準庫為我們提供了concurrent.futures模組,這是一組高階 API 提供給使用者操作非同步執行的任務。

concurrent.futures 模組的基礎是 Exectuor,Executor 是一個抽象類,它不能被直接使用。但是它提供的兩個子類 ThreadPoolExecutorProcessPoolExecutor 卻非常有用,兩者分別被用來建立執行緒池和程序池的程式碼。我們可以直接將tasks直接放入執行緒池/程序池中,不需要維護 Queue 來操心死鎖的問題,執行緒池/程序池會自動幫我們排程。

ThreadPoolExecutor = Thread + Pool + Executor,簡單的想法就是 Pool 是一群 Threads 存放的地方,Executor 則是控制 Thread 用的。

Threading.local() creates an object that look like a global but is specific to each individual thread.

在編寫 Thread 的程式時常用的變數有兩種, Local Variable 和 Global Variable。各有各的優缺點,但懶惰如工程師有時候比較貪心想要方便存儲又不想像 Global Variable 那樣 lock 來 lock 去,也不想像 Local Variable 一直 pass 來 pass 去,因此有 Threading.local 的產生。

thread 操作的順序會像下圖一樣,

Screen-Shot-2019-09-11-at-11.45.40-AM

The Problems with the threading Version

當然使用 thread 還是有些問題需要注意的,第一程式上也不太好撰寫,尤其是有 bug 的時候,更不好發現問題出在哪。再來因為是多個 thread 在作切換,所以需要留意 race condition 的問題。


asyncio Version

在看 code 之前我們先討論一下 asycio 的觀念。

The general concept of asyncio is that a single Python object, called the event loop, controls how and when each task gets run.

asycio 是事件循環的的物件,用來管理事件中的 tasks。

實際上,task 可以有很多狀態,但為了方便思考我們先假設只有兩種狀態。(ready & waiting)

The ready state will indicate that a task has work to do and is ready to be run, and the waiting state means that the task is waiting for some external thing to finish, such as a network operation.

有了這兩個狀態之後,我們可以把 event loop 想成這兩個狀態的 list 集合。我們會根據 Task 實際狀態而給他放置在相對應的 list 中。

An important point of asyncio is that they never get interrupted in the middle of an operation. This allows us to share resources a bit more easily in asyncio than in threading.

上面這段話又提到 asyncio 跟 threading 的差別,上面有提到過了,所以這邊就不在翻譯。

async and await

  • await as the magic that allows the task to hand control back to the event loop.
  • async as a flag to Python telling it that the function about to be defined uses await.

you do need to remember that any function that calls await needs to be marked with async.

await 像是一個動作 async 是一個標示符號來表示這個 function 目前是 await 的狀態。

asyncio_ex.py

import asyncio
import time
import aiohttp


async def download_site(session, url):
    async with session.get(url) as response:
        print("Read {0} from {1}".format(response.content_length, url))


async def download_all_sites(sites):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in sites:
            task = asyncio.ensure_future(download_site(session, url))
            tasks.append(task)
        await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    asyncio.run(download_all_sites(sites))
    #asyncio.get_event_loop().run_until_complete(download_all_sites(sites))
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} sites in {duration} seconds")
$ python3 asyncio_ex.py
....
Downloaded 160 sites in 3.513211250305176 seconds
  • Context Manager:支持上下文管理協議的對象,這種對象實現__enter__()__exit __() 方法。上下文管理器定義執行 with 語句時要建立的運行時上下文,負責執行 with 語句塊上下文中的進入與退出操作。

You can share the session across all tasks, so the session is created here as a context manager.

download_all_sites 的 function 中,你可以在所有的 tasks 中共用同一個 session,因此這個 session 在這裡被當作一個 Context Manager的對象。那 tasks 能共用同一個 session 原因是他們都是在同一個 thead 上運行,加上 asyncio 的特性,task 之間不會任意打斷。

在 context manager 中,我們創建了 list of tasks,同時利用 asyncio.ensure_future() 這個 function 創建了 task 並把它塞在 list of tasks 裡面。(Python3.7 後提供了 asyncio.create_task 來取代 asyncio.ensure_future()) 接著我們利用 asyncio.gather() 使得 session context 能持續存活直到所有 tasks 完成。

asyncio 的一個很酷的優點是它比 thread 更好地擴展。 與 thread 相比,每個 task 所需的資源和創建時間都少得多,因此創建和運行更多的 tasks 時效果很好。

最後說明一下我是用python3 3.7.2 所以直接用 asyncio.run 來取代 asyncio.get_event_loop().run_until_complete 即可。

asyncio 操作的順序會像下圖一樣,

Screen-Shot-2019-09-11-at-7.08.35-PM


multiprocessing Version

與上述的方法不一樣,multiprocessing的版本充分利用了多個CPU。

mutiprocess_ex.py

import requests
import multiprocessing
import time

session = None


def set_global_session():
    global session
    if not session:
        session = requests.Session()


def download_site(url):
    with session.get(url) as response:
        name = multiprocessing.current_process().name
        print(f"{name}:Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with multiprocessing.Pool(initializer=set_global_session) as pool:
        pool.map(download_site, sites)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")
$ python3 mutiprocess_ex.py
...
Downloaded 160 in 3.8495850563049316 seconds

mutiprocess 在 python 的標準庫中的目的是為了打破 GLI 的限制,讓程式能夠運行在多個 CPU 上。接著我們來看一下程式碼,

首先我們建立 multiprocessing.Pool Object 然後逐一 map site 到 download_site fun 中。這裏跟 threading 的用法蠻接近的。

透過 multiprocessing module 不同的 processes 彼此間會自己進行溝通來拿取相對應的東西(這裡指的是網站的網址),並執行指定的 function 但過程中不會造成 race condition 的問題。

創建 multiprocessing pool 不用特別指定要多少的 process 數量,multiprocessing.Pool() 會自動讀取你的電腦有多少顆 CPU 自動填入。當然你也可以自己填入你想要的數字,但記得填入的數字如果超過電腦本身的 CPU 數並不會加快程式的運算,反而有可能變慢。

Pool 中的每個 process 都有自己的內存空間,因此我們不希望每一次呼叫 fucntion 的時候就創建一個新的 session。我們希望每個 process 共用同一個 session 即可。

multiprocessing 操作的順序會像下圖一樣,

Screen-Shot-2019-09-12-at-12.11.19-PM

How to Speed Up a CPU-Bound Program

接下來我們討論 CPU-Bound 的問題,一樣先給一個 CPU-Bound 的程式範例:

def cpu_bound(number):
    return sum(i * i for i in range(number))

這個程式就單存在做運算,沒有其他I/O的問題,因此我們想如何增加這個程式的運行速度。

CPU-Bound Synchronous Version

cpu_synchronous.py

import time


def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    for number in numbers:
        cpu_bound(number)


if __name__ == "__main__":
    numbers = [5_000_000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")
$ python3 cpu_synchronous.py
Duration 9.869978904724121 seconds

執行的順序如下圖所示:

Screen-Shot-2019-09-12-at-12.43.31-PM

threading and asyncio Versions

基本上 thrading 和 asyncio 對 CPU-bound 的運算速度不會有任何幫助,反而還有可能變慢(context switch 的關係),這邊就不多做討論。

CPU-Bound multiprocessing Version

multiprocessing 的目的就是把 CPU 繁重的工作負載分散到不同的 CPU 上,執行的狀況會如下表:

Screen-Shot-2019-09-12-at-12.48.36-PM

cpu_multiprocessing.py

import multiprocessing
import time


def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    with multiprocessing.Pool() as pool:
        pool.map(cpu_bound, numbers)


if __name__ == "__main__":
    numbers = [5_000_000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")
$ python3 cpu_multiprocessing.py
Duration 3.118217945098877 seconds

基本上就是創一個 pool 再加上 map 的 function。使用 multiprocessing 有一些缺點。在現實生活中不一定每個問題都有辦法猜解成多個 CPU 可獨立工作共同解決這個問題。

When to Use Concurrency

讓我們回顧一些關鍵想法,然後討論一些決策點,這些決策點將幫助您確定要在項目中使用哪個(如果有)concurrency module。

此過程的第一步是決定是否應使用 concurrency module。雖然這裡的示例使每個範例都看起來非常簡單,但 concurrency 總是帶來額外的複雜性,並且通常會導致難以找到的錯誤。

一旦你決定優化程序,確定您的程序是否是 CPU bound 或 I/O bound 是一個很好的下一步。

CPU bound 的問題就利用 multiprocessing 來解決,反之 I/O bound 的問題就使用 asyncio or threading 來解決。在 Python 的社群中有一個經驗法則:

Use asyncio when you can, threading when you must.

asyncio 可以讓效能最有效的提升。

Reference