# What Is Concurrency?

Only multiprocessing actually runs these trains of thought at literally the same time. Threading and asyncio both run on a single processor and therefore only run one train of thought at a time.

In threading, the operating system actually knows about each thread and can interrupt it at any time to start running a different thread. This is called preemptive multitasking since the operating system can pre-empt your thread to make the switch.

Asyncio, on the other hand, uses cooperative multitasking. The tasks must cooperate by announcing when they are ready to be switched out. That means that the code in the task has to change slightly to make this happen.
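A minimal sketch of this cooperative hand-off (the task names and the shared `results` list are just for illustration): each `await` is the point where a task announces it is ready to be switched out, so two tasks interleave only at those points.

```python
import asyncio

async def task(name, results):
    # Runs until the await below; nothing can interrupt it before that.
    results.append(f"{name} part 1")
    await asyncio.sleep(0)  # cooperate: hand control back to the event loop
    results.append(f"{name} part 2")

async def main():
    results = []
    # Both tasks share one thread; they interleave only at await points.
    await asyncio.gather(task("A", results), task("B", results))
    return results

print(asyncio.run(main()))
# → ['A part 1', 'B part 1', 'A part 2', 'B part 2']
```

If the `await` is removed, task A runs to completion before B ever starts — the switch only happens where the task allows it.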

# What Is Parallelism?

So far, you’ve looked at concurrency that happens on a single processor.

With multiprocessing, Python creates new processes. A process here can be thought of as almost a completely different program.

Because they are different processes, each of your trains of thought in a multiprocessing program can run on a different core.

# When Is Concurrency Useful?

Concurrency can make a big difference for two types of problems. These are generally called CPU-bound and I/O-bound.

I/O-bound problems cause your program to slow down because it frequently must wait for input/output (I/O) from some external resource. They arise frequently when your program is working with things that are much slower than your CPU.

I/O-bound problems are straightforward: the CPU executes so much faster than I/O that heavy I/O leaves the CPU waiting most of the time. The most common cases are reading files and making network connections. The original article uses the diagram below to illustrate an I/O-bound situation:

# How to Speed Up an I/O-Bound Program

Let’s start by focusing on I/O-bound programs and a common problem: downloading content over the network. For our example, you will be downloading web pages from a few sites, but it really could be any network traffic. It’s just easier to visualize and set up with web pages.

## Synchronous Version

synchronous.py

```python
import requests
import time


def download_site(url, session):
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
    # One Session is reused for every request, but the downloads
    # still happen strictly one after another.
    with requests.Session() as session:
        for url in sites:
            download_site(url, session)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")
```

```
$ python3 synchronous.py
...
Downloaded 160 in 23.88224196434021 seconds
```

The program took about 24 seconds, which matches the I/O-bound behavior described above.

## threading Version

Now let's look at the threading version:

thread.py

```python
import concurrent.futures
import requests
import threading
import time

thread_local = threading.local()


def get_session():
    # Each thread lazily creates and reuses its own Session.
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session


def download_site(url):
    session = get_session()
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_site, sites)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")
```

```
$ python3 thread.py
...
```


The foundation of the concurrent.futures module is Executor. Executor is an abstract class and cannot be used directly, but its two subclasses, ThreadPoolExecutor and ProcessPoolExecutor, are very useful: they create thread pools and process pools, respectively. We can hand tasks straight to a thread pool or process pool without maintaining a Queue or worrying about deadlocks — the pool schedules the tasks for us automatically.
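Because both subclasses implement the same Executor interface, switching between a thread pool and a process pool is a one-line change. A minimal sketch (the `square` task and `run_with` helper are just illustrations):

```python
import concurrent.futures

def square(n):
    # A trivial task; must be a module-level function so a
    # ProcessPoolExecutor can pickle it for the worker processes.
    return n * n

def run_with(executor_cls):
    # Both Executor subclasses expose the same map()/submit() API.
    with executor_cls(max_workers=2) as executor:
        return list(executor.map(square, range(5)))

if __name__ == "__main__":
    print(run_with(concurrent.futures.ThreadPoolExecutor))   # [0, 1, 4, 9, 16]
    print(run_with(concurrent.futures.ProcessPoolExecutor))  # [0, 1, 4, 9, 16]
```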

threading.local() creates an object that looks like a global but is specific to each individual thread.
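A small sketch of that behavior (the `name` attribute is just an illustration): every thread that touches the same `thread_local` object sees its own independent attribute, not one shared value.

```python
import concurrent.futures
import threading

thread_local = threading.local()

def get_name():
    # Each thread stores and reads its own "name" attribute.
    if not hasattr(thread_local, "name"):
        thread_local.name = threading.current_thread().name
    return thread_local.name

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    names = set(executor.map(lambda _: get_name(), range(9)))

# One entry per worker thread that ran tasks, not one shared value.
print(names)
```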

## asyncio Version

The general concept of asyncio is that a single Python object, called the event loop, controls how and when each task gets run.

The ready state will indicate that a task has work to do and is ready to be run, and the waiting state means that the task is waiting for some external thing to finish, such as a network operation.

An important point of asyncio is that tasks never get interrupted in the middle of an operation; they only give up control at an await. This allows us to share resources a bit more easily in asyncio than in threading.
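A small sketch of why sharing is easier (the shared `counter` is just an illustration): a read-modify-write on shared state needs no lock, because a task can only be switched out at an explicit await, never in the middle of the update.

```python
import asyncio

counter = 0

async def increment(n):
    global counter
    for _ in range(n):
        # This read-modify-write can never be interrupted by another
        # task, because control is only handed over at an await.
        counter += 1
    await asyncio.sleep(0)

async def main():
    # Ten tasks all mutate the same counter, with no lock.
    await asyncio.gather(*(increment(1000) for _ in range(10)))

asyncio.run(main())
print(counter)  # → 10000, every time
```

In a threading version the same pattern would need a lock (or rely on implementation details of the interpreter) to be safe.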

### async and await

• Think of await as the magic that allows a task to hand control back to the event loop.
• Think of async as a flag to Python telling it that the function about to be defined uses await.

You do need to remember that any function that calls await needs to be marked with async.

In other words, await is an action (yield control right here), while async is a marker telling Python that this function contains await.
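The rule in its smallest form (the `fetch_value` name is just an illustration): `fetch_value` uses await, so it must be declared async, and `main` awaits `fetch_value`, so it must be async too.

```python
import asyncio

async def fetch_value():
    # Simulate waiting on I/O; await hands control to the event loop.
    await asyncio.sleep(0.01)
    return 42

async def main():
    # Any function that calls await must itself be marked async.
    value = await fetch_value()
    return value

print(asyncio.run(main()))  # → 42
```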

asyncio_ex.py

```python
import asyncio
import time

import aiohttp


async def download_site(session, url):
    async with session.get(url) as response:
        print(f"Read {response.content_length} from {url}")


async def download_all_sites(sites):
    # One session is shared by every task.
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in sites:
            task = asyncio.ensure_future(download_site(session, url))
            tasks.append(task)
        await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    asyncio.run(download_all_sites(sites))
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} sites in {duration} seconds")
```

```
$ python3 asyncio_ex.py
...
Downloaded 160 sites in 3.513211250305176 seconds
```

• Context Manager: an object that supports the context management protocol, i.e. one that implements the __enter__() and __exit__() methods. A context manager defines the runtime context established when a with statement executes, and it handles entering and exiting that context.

You can share the session across all tasks, so the session is created in download_all_sites() as a context manager. The tasks can share one session because they all run on the same thread and, thanks to asyncio's design, never interrupt each other arbitrarily.

Inside the context manager, we build a list of tasks, using asyncio.ensure_future() to create each task and append it to the list. (Python 3.7 introduced asyncio.create_task() as a replacement for asyncio.ensure_future().) We then use asyncio.gather() to keep the session context alive until all the tasks have completed.

A really neat advantage of asyncio is that it scales better than threading: each task takes far fewer resources and far less time to create than a thread, so creating and running many more tasks works well.

Finally, note that I am using Python 3.7.2, so I simply call asyncio.run() in place of asyncio.get_event_loop().run_until_complete().

asyncio's order of operations looks like the diagram below.

## multiprocessing Version

Unlike the approaches above, the multiprocessing version takes full advantage of multiple CPUs.

multiprocess_ex.py

```python
import multiprocessing
import time

import requests

session = None


def set_global_session():
    # Runs once in each worker process, so every process
    # creates exactly one Session of its own.
    global session
    if not session:
        session = requests.Session()


def download_site(url):
    with session.get(url) as response:
        name = multiprocessing.current_process().name
        print(f"{name}: Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with multiprocessing.Pool(initializer=set_global_session) as pool:
        pool.map(download_site, sites)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")
```

```
$ python3 multiprocess_ex.py
...
```


multiprocessing exists in Python's standard library to break through the limitation of the GIL and let a program run on multiple CPUs. Now let's walk through the code.

Each process in the Pool has its own memory space, so we don't want to create a new Session on every call of the function; we want each process to reuse a single session, which is what the initializer argument sets up.

multiprocessing's order of operations looks like the diagram below.

# How to Speed Up a CPU-Bound Program

```python
def cpu_bound(number):
    return sum(i * i for i in range(number))
```


## CPU-Bound Synchronous Version

cpu_synchronous.py

```python
import time


def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    for number in numbers:
        cpu_bound(number)


if __name__ == "__main__":
    numbers = [5_000_000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")
```

```
$ python3 cpu_synchronous.py
Duration 9.869978904724121 seconds
```

The execution order is shown in the diagram below.

## threading and asyncio Versions

threading and asyncio basically do nothing to speed up CPU-bound computation — they can even make it slower because of context switching — so we won't discuss them further here.

## CPU-Bound multiprocessing Version

The point of multiprocessing is to spread a heavy CPU workload across multiple CPUs; the execution looks like the chart below:

cpu_multiprocessing.py

```python
import multiprocessing
import time


def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    # Each number is handed to a worker process, one per CPU by default.
    with multiprocessing.Pool() as pool:
        pool.map(cpu_bound, numbers)


if __name__ == "__main__":
    numbers = [5_000_000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")
```

```
$ python3 cpu_multiprocessing.py
Duration 3.118217945098877 seconds
```


# When to Use Concurrency

Solve CPU-bound problems with multiprocessing; conversely, solve I/O-bound problems with asyncio or threading. There is a rule of thumb in the Python community:

Use asyncio when you can, threading when you must.

asyncio delivers the most effective performance improvement.