Background

一樣我們繼續對之前提到Python一系列的教學做重點整理。文章連結在此

What Is a Thread?

Thread 是一個分開的執行程序,你可以把他想像成兩個東西同時間發生。(但實際上不是同時發生,只是很像而已)

很容易將 Thread 視為在程序上運行兩個(或更多)不同的處理器,每個處理器同時執行一項獨立的任務。這幾乎是正確的。thread 可能在不同的處理器上運行,但它們一次只運行一個。

花費大量時間等待外部事件(I/O-Bound)的任務通常是很適合 threading 的。但如果是遇到 CPU-Bound 的問題時你應該需要利用 multiprocessing 去解決。

Starting a Thread

Ok那接下來我們來討論如何在 Python 中實現 thread 的機制。Python 提供了threading 模組給我們使用。

single_thread.py

import logging
import threading
import time

def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    logging.info("Main    : before creating thread")
    x = threading.Thread(target=thread_function, args=(1,))
    logging.info("Main    : before running thread")
    x.start()
    logging.info("Main    : wait for the thread to finish")
    # x.join()
    logging.info("Main    : all done")

稍微講解一下程式,可以看到我們有一個 thread_function 那這個 function 主要是要讓 thread 來呼叫使用的,x = threading.Thread(target=thread_function, args=(1,)) 因此我們可以看到這段程式,接著再透過 x.start() 來啟動 thread。

$ python3 single_thread.py
17:45:45: Main    : before creating thread
17:45:45: Main    : before running thread
17:45:45: Thread 1: starting
17:45:45: Main    : wait for the thread to finish
17:45:45: Main    : all done
17:45:47: Thread 1: finishing

在執行的過程中我們可以注意到 thread 比 main section 的程式還要晚完成,這表示我們的 thread 有執行成功。

Daemon Threads

In computer science, a daemon is a process that runs in the background.

如果程序正在運行非 daemon 進程的 thread,則程序將在終止之前等待這些 thread 完成。然而,作為 daemon 進程的 thread 在程序退出時會立刻被終止。

讓我們回顧一下上面程序的輸出。最後兩行是有趣的一點。當你運行程序時,你會注意到在__main__ 印完所有完成消息之後和 thread 完成之前有一個暫停(約2秒)。

這個暫停是 Python 等待非 daemon 的 thread 完成。當 Python 程序結束時,清理 threading routine 是關閉程序的一部分。

如果你有看 Python threading 的原始碼,您會看到 threading._shutdown() 遍歷所有正在運行的 thread,並在沒有設置 daemon flag 的 thread 上呼叫 .join()

所以你的程序等待退出,因為 thread 本身正在等待睡眠。一旦完成印了消息的動作,透過 .join() 將返回並且讓程序可以退出。

ok 那我們來體驗一下 daemon 的作用,向下方一樣多加一個 daemon=True 的參數,接著執行它。

single_thread.py

...
x = threading.Thread(target=thread_function, args=(1,), daemon=True)
...
$ python3 single_thread.py
18:05:59: Main    : before creating thread
18:05:59: Main    : before running thread
18:05:59: Thread 1: starting
18:05:59: Main    : wait for the thread to finish
18:05:59: Main    : all done

跟之前執行結果不一樣的地方是最後一行不見了,thread_function() 並沒有順利的執行完成,原因是因為他是 daemon 的 thread 所以當主程式完成是他也會被強迫結束。

join() a Thread

Daemon threads 很方便,但是當你想等待一個 thread 停止時呢?當你想要這樣做而不退出你的程序時呢?現在讓我們回到程式碼中,看看第二十行註釋掉的內容:

# x.join()

要告訴一個 thread 等待另一個 thread 完成,請利用.join()。如果你.join()了一個thread,該語句將等待任何一種 thread 完成。

Working With Many Threads

到目前為止,範例代碼只使用了兩個 thread:主線程和一個用threading.Thread 對象啟動的 thread。接下來我們來看多thread的範例

multiple_threads.py

import logging
import threading
import time

def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    threads = list()
    for index in range(3):
        logging.info("Main    : create and start thread %d.", index)
        x = threading.Thread(target=thread_function, args=(index,))
        threads.append(x)
        x.start()

    for index, thread in enumerate(threads):
        logging.info("Main    : before joining thread %d.", index)
        thread.join()
        logging.info("Main    : thread %d done", index)

This code uses the same mechanism you saw above to start a thread, create a Thread object, and then call .start(). The program keeps a list of Thread objects so that it can then wait for them later using .join().

此代碼和在上面看到的相同機制來啟動 thread,創建 Thread 對象,然後呼叫 .start()。thread 保留在一個 Thread 的 list,以便稍後可以使用.join()等待它們。

$ python3 multiple_threads.py
18:52:35: Main    : create and start thread 0.
18:52:35: Thread 0: starting
18:52:35: Main    : create and start thread 1.
18:52:35: Thread 1: starting
18:52:35: Main    : create and start thread 2.
18:52:35: Thread 2: starting
18:52:35: Main    : before joining thread 0.
18:52:37: Thread 1: finishing
18:52:37: Thread 2: finishing
18:52:37: Thread 0: finishing
18:52:37: Main    : thread 0 done
18:52:37: Main    : before joining thread 1.
18:52:37: Main    : thread 1 done
18:52:37: Main    : before joining thread 2.
18:52:37: Main    : thread 2 done

如果仔細瀏覽輸出,您將看到所有三個 threads 按照您預期的順序啟動,但這次,它們以相反的順序完成!多次運行將產生不同的排序。查找 Thread x:finishing message,告訴您何時完成每個thread。

thread 的運行順序由操作系統決定,很難預測。它可能(並且可能會)因運行而異,因此在設計使用thread 的演算法時需要注意這一點。

幸運的是,Python為您提供了幾個原函數,您稍後會看到它們來幫助協調 thread 並讓它們一起運行。在此之前,讓我們看看如何更輕鬆地管理一組 thread。

Using a ThreadPoolExecutor

Python3.2 之後提供了 ThreadPoolExecutor 的原件讓你方便的啟動多個 threads。創建它的最簡單方法是作為上下文管理器,使用with語句來管理池的創建和銷毀。

這是使用ThreadPoolExecutor重寫的最後一個範例中的 __main__:

executor.py

import logging
import concurrent.futures
import time

def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(thread_function, range(3))
$ python3 executor.py
19:33:08: Thread 0: starting
19:33:08: Thread 1: starting
19:33:08: Thread 2: starting
19:33:10: Thread 0: finishing
19:33:10: Thread 1: finishing
19:33:10: Thread 2: finishing

Race Conditions

接的我們來討論在寫 thread 程式中會遇到的問題 Race Conditions

一旦了解 race condition 是什麼和怎麼發生,我們會使用標準庫提供的一些的方式來防止 race condition 的發生。

當兩個或多個 thread 訪問共享數據或資源時,可能會發生 race condition。在這個例子中,你將創建一個每次都發生的 race condition 情況,但要注意大多數 race condition 都不是那麼明顯。通常,它們很少發生,並且它們會產生令人困惑的結果,這使得它們很難被除錯。

ok,那我們先看以下的範例:

racecond.py

import concurrent.futures
import logging
import time

class FakeDatabase:
    def __init__(self):
        self.value = 0

    def update(self, name):
        logging.info("Thread %s: starting update", name)
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy
        logging.info("Thread %s: finishing update", name)


if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    database = FakeDatabase()
    logging.info("Testing update. Starting value is %d.", database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.update, index)
    logging.info("Testing update. Ending value is %d.", database.value)

在 FakeDatabase 中我們要注意的是 value 這個變數,我們呼叫多個 threads 來共用這個變數,因此來創造產生 race condition 的狀況。

.update()則是模擬從資料庫中讀取值,對其進行一些計算,然後將新值寫回資料庫。

__name__ 中, 我們透過ThreadPoolExecutor建立了兩個 threads,這兩個 threads 會呼叫 database.update() 這個 function。我們來看執行的結果:

$ python3 racecond.py
15:42:36: Testing update. Starting value is 0.
15:42:36: Thread 0: starting update
15:42:36: Thread 1: starting update
15:42:36: Thread 0: finishing update
15:42:36: Thread 1: finishing update
15:42:36: Testing update. Ending value is 1.

你可能希望 database.value 在最後印出來的結果是2,但實際上卻是1,為什麼呢?詳細原因我們接著討論。

One Thread

ThreadPoolExecutor 運行每個 thread 時,你告訴它運行哪個函數以及傳遞給它的參數:executor.submit(database.update,index)。結果是 pool 中的每個 thread 都將調用 database.update(index)。每個 thread 都將引用相同的 FakeDatabase Object。每個 thread 還將具有唯一值 index,以使 log 記錄語句更容易閱讀:

Screen-Shot-2019-09-19-at-3.57.44-PM

當 thread 開始運行.update()時,它有自己的函數的 local_copy。這絕對是件好事。否則,運行相同功能的兩個 thread 將始終相互混淆。這意味著對函數作用域(或本地)的所有變量都是安全的。

現在,您可以開始了解如果您使用單個 thread 和單個調用.update()運行上述程序時會發生的情況。

如果只運行一個 thread,下面的圖像將逐步執行.update()。該語句顯示在左側,後面是一個圖表,顯示了 thread 的 local_value 和共享 database.value 中的值:

intro-threading-single-thread.85204fa11210

根據圖表 thread1 啟動時,FakeDatabase.value 為 0。方法中的第一行代碼 local_copy = self.value 將值 0 複製到局部變量。接下來,它使用local_copy + = 1語句遞增local_copy 的值。您可以在 thread1 中看到 .value 設置為1。

調用 .sleep(),這會使當前 thread 暫停並允許其他線程運行。由於此示例中只有一個線程,因此無效。

當 thread1 喚醒並繼續時,它會將新值從 local_copy 複製到 FakeDatabase.value,然後線程完成。您可以看到 database.value 設置為 1。

Two Threads

回到 race condition,兩個 thread 它們都有自己的 local_copy 版本,並且每個版本都指向同一個數據庫。正是這個共享數據庫對象會導致問題。接著回到上述的程式,Thread 1 running .update():

intro-threading-two-threads-part1.c1c0e65a8481

當 Thread 1 呼叫 time.sleep(),這會使得程式切換到另外一個 thread,切換到 Thread 2時他會做一模一樣的事情直到 sleep() 再切換到 Thread 1。

intro-threading-two-threads-part2.df42d4fbfe21

Thread 1 now wakes up and saves its version of local_copy and then terminates, giving Thread 2 a final chance to run. Thread 2 has no idea that Thread 1 ran and updated database.value while it was sleeping. It stores its version of local_copy into database.value, also setting it to one:

接著就依序運行 .update() 的動作,但是 thread2 不知道 thread1 更新了 database.value,所以他也會把自己的 local_copy 的值存儲到 database.value 中。

intro-threading-two-threads-part3.18576920f88f

這兩個 thread 具有對單個共享對象的交叉訪問權限,從而覆蓋彼此的結果。當一個 thread 在另一個 thread 完成訪問之前釋放內存或關閉文件時,就可能會出現類似的 race condition 情形。

Basic Synchronization Using Lock

有許多方法可以避免或解決 race conditions。你不會在這裡看到所有這些,但有一些會經常使用。讓我們從 Lock 開始吧。

要解決上面的 race conditions,您需要找到一種方法,一次只允許一個 thread 進入代碼。最常見的方法是在 Python 中使用 Lock。在其他一些語言中,這個想法被稱為互斥體。

Lock 是一個像大廳通行證的對象。一次只有一個 thread 可以擁有 Lock。任何其他想要 Lock 的thread 必須等到 Lock 的所有者放棄它。

執行此操作的基本功能是 .acquire().release()。thread 將調用my_lock.acquire()來獲取 Lock。如果 Lock 已經被佔用,則調用 thread 將一直等到它被釋放。這裡有一個重點。如果一個線程獲得 Lock 定但從未將其返回,則程序將被卡住,我們會在稍後討論這種情況。

幸運的是,Python 的 Lock 也將作為上下文管理器運行,因此您可以在 with 語句中使用它,並且當 with 因任何原因退出時它會自動釋放。

我們把剛剛上面 racecond.py 程式加入 Lock 的機制。

fixrace.py

import threading
import concurrent.futures
import logging
import time

class FakeDatabase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def locked_update(self, name):
        logging.info("Thread %s: starting update", name)
        logging.debug("Thread %s about to lock", name)
        with self._lock:
            logging.debug("Thread %s has lock", name)
            local_copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy
            logging.debug("Thread %s about to release lock", name)
        logging.debug("Thread %s after release", name)
        logging.info("Thread %s: finishing update", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.DEBUG,
                        datefmt="%H:%M:%S")

    database = FakeDatabase()
    logging.info("Testing update. Starting value is %d.", database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.locked_update, index)
    logging.info("Testing update. Ending value is %d.", database.value)

除了添加一堆除錯用的log以便您可以更清楚地看到鎖定之外,這裡的重大變化是添加一個名為._lock 的成員,它是一個 threading.Lock()對象。此._lock 在解鎖狀態下初始化,並由 with 語句鎖定和釋放。

值得注意的是,運行此函數的 thread 將保持該 Lock,直到完全更新數據庫為止。在這種情況下,這意味著它將在復制,更新,休眠,然後將值寫回數據庫時保持鎖定。

如果運行此版本並將日誌記錄設置為警告級別,您將看到:

$ python3 fixrace.py
16:46:52: Testing update. Starting value is 0.
16:46:52: Thread 0: starting update
16:46:52: Thread 0 about to lock
16:46:52: Thread 0 has lock
16:46:52: Thread 1: starting update
16:46:52: Thread 1 about to lock
16:46:52: Thread 0 about to release lock
16:46:52: Thread 0 after release
16:46:52: Thread 0: finishing update
16:46:52: Thread 1 has lock
16:46:52: Thread 1 about to release lock
16:46:52: Thread 1 after release
16:46:52: Thread 1: finishing update
16:46:52: Testing update. Ending value is 2.

在此輸出中,您可以看到 Thread0 獲取 Lock 並在進入 sleep() 時仍保持鎖定。然後 Thread1啟動並嘗試獲取相同的 Lock。因為 Thread0 仍然持有它,所以 Thread1 必須等待。這是Lock提供的互斥。

Deadlock

在繼續之前,您應該想想在使用 Locks 時會發生哪些常見問題。如您所見,如果已經獲取了Lock,則第二次調用.acquire() 將等到持有Lock的 thread 調用.release()。運行此代碼時,您認為會發生什麼:

import threading

l = threading.Lock()
print("before first acquire")
l.acquire()
print("before second acquire")
l.acquire()
print("acquired lock twice")

當程序第二次調用 l.acquire()時,它會掛起等待 Lock 被釋放。在此示例中,您可以通過刪除第二個調用來修復 deadlock,但 deadlock 通常發生在兩個微妙的事情之一:

  • Lock 未正確釋放的實現錯誤
  • 設計問題,函數需要被可能有或可能沒有 Lock 的函數調用

第一種情況有時會發生,但使用 Lock 作為上下文管理器會大大減少頻率。建議盡可能使用上下文管理器來編寫代碼,因為它們有助於避免異常跳過.release()調用的情況。

在某些語言中,設計問題可能有點棘手。值得慶幸的是,Python的 thread 有第二個對象,稱為RLock,專為這種情況而設計。它允許一個 thread 在調用.release()之前多次.acquire()一個RLock。該thread 仍然需要調用.release()與調用.acquire()的次數相同,但無論如何都應該這樣做。

Lock 和 RLock 是 thread 編程中用於防止 race conditions 的兩個基本工具。

Producer-Consumer Threading

Producer-Consumer 問題是一種標準的計算機科學問題,用於查看 thread 或 process 同步問題。我們藉由 Producer-Consumer 問題來得到一些靈感用以了解 Python 的 thread 模塊提供哪些功能。

在此示例中,該程序需要從網路讀取消息並將其寫入磁盤。該程序必須持續偵聽並接受傳入的消息。消息不會按常規速度傳入,但會突然爆發。我們把這部分的程式稱作為 Producer

另一方面,一旦收到消息,就需要將其寫入數據庫。數據庫訪問很慢,但還是足夠應付一般的情況,但在面對爆發性的訊息時可能就不夠快了。這部分稱作 Consumer

但是足夠快,可以跟上消息的平均速度。當出現大量消息時,它趕不上足夠快的速度。這部分是使用者。

在 Producer 和 Consumer 之間,您將創建一個管道,該管道將隨著您了解不同的同步對象而發生變化。

這是基本佈局。讓我們看一下使用Lock的解決方案。它雖然不能完美運行,但是會使用您已經知道的工具,因此是一個很好的起點。

Producer-Consumer Using Lock

由於這是一篇有關 Python thread 的文章,並且您剛剛閱讀了 Lock 相關功能,所以我們嘗試使用一個或是多個 Lock 來解決此問題。

設計的想法是,有一個producer thread 從假網路讀取數據並將消息放入管道中,以下是完整個程式碼:

prodcom_lock.py

import random 
import logging
import threading
import concurrent.futures

class Pipeline:
    """
    Class to allow a single element pipeline between producer and consumer.
    """
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()

    def get_message(self, name):
        logging.debug("%s:about to acquire getlock", name)
        self.consumer_lock.acquire()
        logging.debug("%s:have getlock", name)
        message = self.message
        logging.debug("%s:about to release setlock", name)
        self.producer_lock.release()
        logging.debug("%s:setlock released", name)
        return message

    def set_message(self, message, name):
        logging.debug("%s:about to acquire setlock", name)
        self.producer_lock.acquire()
        logging.debug("%s:have setlock", name)
        self.message = message
        logging.debug("%s:about to release getlock", name)
        self.consumer_lock.release()
        logging.debug("%s:getlock released", name)


SENTINEL = object()

def producer(pipeline):
    """Pretend we're getting a message from the network."""
    for index in range(10):
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")

    # Send a sentinel message to tell consumer we're done
    pipeline.set_message(SENTINEL, "Producer")

def consumer(pipeline):
    """Pretend we're saving a number in the database."""
    message = 0
    while message is not SENTINEL:
        message = pipeline.get_message("Consumer")
        if message is not SENTINEL:
            logging.info("Consumer storing message: %s", message)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline)
        executor.submit(consumer, pipeline)

要生成偽造的消息,producer 將獲得一個介於1~100 的隨機數。它在管道上調用.set_message()以將其發送給使用者。

producer 還使用 SENTINEL 值來指示消費者在發送十個值後停止。這有點尷尬,但請放心,您將在完成本示例後看到擺脫 SENTINEL 值的方法。

管道的另一端是consumer

consumer 從管道中讀取一條消息,並將其寫入到偽造的數據庫中,在這種情況下,該數據庫只是將其印到顯示器上。如果獲得 SENTINEL值,則該函數將終止其 thread。

接著我們看回 __main__ 的段落,我們可以發現其實跟之前的範例長得蠻像的。現在讓我們看一下將消息從 producer 傳遞到 consumer 的管道。此版本的代碼中的 Pipeline 具有三個成員:

  1. .message 存儲要傳遞的消息。
  2. .producer_lock 是一個 threading.Lock 對象,它限制生產者 thread 對 message 的訪問。
  3. .consumer_lock也是一個 threading.Lock,它限制使用者 thread 對 message 的訪問。

__init __() 初始化這三個成員,然後在 .consumer_lock 上調用 .acquire()。這是您要開始的狀態。允許 producer 添加新消息,但是 consumer 需要等待直到出現消息為止。

.get_message().set_messages()幾乎相反。 .get_message()在 consumer 鎖上調用 .acquire()。這是使 consumer 等待消息準備就緒的呼叫。

consumer 獲得 .consumer_lock後,它將復制 .message 中的值,然後在 .producer_lock 上調用 .release()。釋放此鎖定是使 producer 可以將下一條消息插入管道的原因。

在繼續使用.set_message()之前,.get_message()中有些微妙的事情很容易被忽略。可以想想為什麼要多這行程式碼 message = self.message 而不是選擇直接 return self.message 在結尾就好?

答案是在 consumer 調用 .producer_lock.release()時,producer 可以開始運行。這可能在會發生在return self.message 之前,因此我們必須先複製一份 self.message 的訊息,確保不會被蓋掉。

轉到 .set_message(),您可以看到事件的另一面。producer 產生一個新的 meessage。它將獲取 .producer_lock,設置 .message,然後在 consumer_lock 上調用 .release(),這將允許 consumer 讀取該值。

整個程式碼執行結果如下:

$ python3 prodcom_lock.py
16:29:48: Producer got message: 35
16:29:48: Producer:about to acquire setlock
16:29:48: Producer:have setlock
16:29:48: Producer:about to release getlock
16:29:48: Producer:getlock released
16:29:48: Producer got message: 21
16:29:48: Producer:about to acquire setlock
16:29:48: Consumer:about to acquire getlock
16:29:48: Consumer:have getlock
16:29:48: Consumer:about to release setlock
16:29:48: Consumer:setlock released
16:29:48: Consumer storing message: 35
16:29:48: Consumer:about to acquire getlock
16:29:48: Producer:have setlock
16:29:48: Producer:about to release getlock
16:29:48: Producer:getlock released
16:29:48: Producer got message: 64
16:29:48: Producer:about to acquire setlock
16:29:48: Consumer:have getlock
16:29:48: Consumer:about to release setlock
16:29:48: Consumer:setlock released
16:29:48: Consumer storing message: 21
16:29:48: Consumer:about to acquire getlock
16:29:48: Producer:have setlock
16:29:48: Producer:about to release getlock
16:29:48: Producer:getlock released
16:29:48: Producer got message: 95
16:29:48: Producer:about to acquire setlock
16:29:48: Consumer:have getlock
16:29:48: Consumer:about to release setlock
16:29:48: Consumer:setlock released
16:29:48: Producer:have setlock
16:29:48: Consumer storing message: 64
16:29:48: Producer:about to release getlock
16:29:48: Consumer:about to acquire getlock
16:29:48: Producer:getlock released
16:29:48: Consumer:have getlock
16:29:48: Producer got message: 7
16:29:48: Consumer:about to release setlock
16:29:48: Producer:about to acquire setlock
16:29:48: Consumer:setlock released
16:29:48: Producer:have setlock
16:29:48: Consumer storing message: 95
16:29:48: Producer:about to release getlock
16:29:48: Consumer:about to acquire getlock
16:29:48: Producer:getlock released
16:29:48: Consumer:have getlock
16:29:48: Consumer:about to release setlock
16:29:48: Consumer:setlock released
16:29:48: Consumer storing message: 7
16:29:48: Consumer:about to acquire getlock
16:29:48: Producer got message: 44
16:29:48: Producer:about to acquire setlock
16:29:48: Producer:have setlock
16:29:48: Producer:about to release getlock
16:29:48: Producer:getlock released
16:29:48: Producer got message: 11
16:29:48: Producer:about to acquire setlock
16:29:48: Consumer:have getlock
16:29:48: Consumer:about to release setlock
16:29:48: Consumer:setlock released
16:29:48: Consumer storing message: 44
16:29:48: Producer:have setlock
16:29:48: Consumer:about to acquire getlock
16:29:48: Producer:about to release getlock
16:29:48: Producer:getlock released
16:29:48: Consumer:have getlock
16:29:48: Producer got message: 51
16:29:48: Consumer:about to release setlock
16:29:48: Producer:about to acquire setlock
16:29:48: Consumer:setlock released
16:29:48: Producer:have setlock
16:29:48: Consumer storing message: 11
16:29:48: Producer:about to release getlock
16:29:48: Consumer:about to acquire getlock
16:29:48: Producer:getlock released
16:29:48: Consumer:have getlock
16:29:48: Producer got message: 64
16:29:48: Consumer:about to release setlock
16:29:48: Consumer:setlock released
16:29:48: Consumer storing message: 51
16:29:48: Consumer:about to acquire getlock
16:29:48: Producer:about to acquire setlock
16:29:48: Producer:have setlock
16:29:48: Producer:about to release getlock
16:29:48: Producer:getlock released
16:29:48: Producer got message: 12
16:29:48: Producer:about to acquire setlock
16:29:48: Consumer:have getlock
16:29:48: Consumer:about to release setlock
16:29:48: Consumer:setlock released
16:29:48: Consumer storing message: 64
16:29:48: Consumer:about to acquire getlock
16:29:48: Producer:have setlock
16:29:48: Producer:about to release getlock
16:29:48: Producer:getlock released
16:29:48: Producer:about to acquire setlock
16:29:48: Consumer:have getlock
16:29:48: Consumer:about to release setlock
16:29:48: Consumer:setlock released
16:29:48: Consumer storing message: 12
16:29:48: Consumer:about to acquire getlock
16:29:48: Producer:have setlock
16:29:48: Producer:about to release getlock
16:29:48: Producer:getlock released
16:29:48: Consumer:have getlock
16:29:48: Consumer:about to release setlock
16:29:48: Consumer:setlock released

起初,您可能會覺得奇怪,producer 在 consumer 運行之前就收到了兩條消息。如果回頭查看producer 和 .set_message(),您會注意到,唯一等待 Lock 的地方是當它試圖將消息放入管道中時。因此 producer 將會獲取消息並記錄它具有消息之後,直到當 producer 嘗試發送第二條消息時,它將第二次調用 .set_message()並將阻塞。

操作系統可以隨時交換 thread,但是通常它使每個 thread 在交換掉之前有合理的時間運行。這就是 producer 通常會運行直到它阻塞對.set_message()的第二次調用的原因。

但是,一旦 thread 被阻塞,操作系統將始終將其換出並找到要運行的其他 thread。在這種情況下,唯一需要處理的其他 thread 就是 consumer。

consumer 調用 .get_message(),後者讀取消息並在 .producer_lock 上調用.release(),因此允許 producer 在下次交換 thread 時再次運行。

儘管它適用於這種有限的測試,但通常不能解決 producer-consumer problem,因為它一次只能在管道中提供單個值。當 producer 收到大量消息時,它將無處可放。

讓我們繼續使用 Queue 尋求解決此問題的更好方法。

Producer-Consumer Using Queue

如果您希望一次處理一個在管道中的多個值,對管道來說就需要一個資料結構來存放資料。Python的標準庫具有一個 queue 模塊,反過來說有一個 Queue Class。接著我們將管道更改為使用 queue ,而不只是使用受 Lock 保護的變數。

讓我們從事件開始。 threading.Event 對象允許一個 thread 發出事件信號,而許多其他 thread 可以等待該事件發生。此代碼中的關鍵用法是,等待事件的 thread 不一定需要停止其正在執行的操作,它們可以偶爾檢查一次事件的狀態。

事件的觸發可能有很多事情。在此示例中,主線程將僅休眠一段時間,然後對其進行 .set(),下面為

prodcom_queue.py

import concurrent.futures
import logging
import queue
import random
import threading
import time

def producer(queue, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        queue.put(message)

    logging.info("Producer received event. Exiting")

def consumer(queue, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not queue.empty():
        message = queue.get()
        logging.info(
            "Consumer storing message: %s (size=%d)", message, queue.qsize()
        )

    logging.info("Consumer received event. Exiting")

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()

if __name__ == "__main__": 的段落中有改變的部分是在第6行創建了事件對象,在第8行和第9行傳遞了該事件作為參數,並在第11到13行的最後一部分進行了睡眠一秒鐘,記錄了一條消息,然後調用.set()在事件上。

producer 的段落也沒有變動太多,現在它將持續循環直到看到事件。也不再將 SENTINEL 值放入管道中。

consumer 的段落則是改變的稍微多一點:

跟之前必須取出與 SENTINEL 值相關的代碼不一樣的是,您的確需要做一些稍微複雜的 while 條件它不僅會循環直到設置了事件為止,而且還需要保持循環直到清空管道為止。

在 consumer 完成操作之前確保 Queue 為空可以防止出現另一個有趣的問題。如果 consumer 在管道中還有消息的情況下退出了,則可能發生兩種不良情況。第一種是你將丟失了最後的消息,但是更嚴重的是,producer 可能會被抓住,試圖將消息添加到完整的隊列中而永不返回。

這發生在如果在 producer 檢查了 .is_set()條件之後但在調用 pipeline.set_message()之前觸發了該事件,則會發生這種情況。

如果發生這種情況, producer 有可能醒來並在 Queue 仍然完全滿的情況下退出。然後,producer 將調用.set_message(),它將等待直到 Queue 中有新消息的空間。consumer 已經退出,因此不會發生,producer 也不會退出。

編寫標準庫的核心開發人員知道 Queue 在多線程環境中經常使用,並將所有 Locking 代碼合併到 Queue本身中。因此 Queue is thread-safe

接著我們來執行程式:

$ python3 prodcom_queue.py
Producer got message: 32
Producer got message: 51
Producer got message: 25
Producer got message: 94
Producer got message: 29
Consumer storing message: 32 (queue size=3)
Producer got message: 96
Consumer storing message: 51 (queue size=3)
Producer got message: 6
Consumer storing message: 25 (queue size=3)
Producer got message: 31

[many lines deleted]

Producer got message: 80
Consumer storing message: 94 (queue size=6)
Producer got message: 33
Consumer storing message: 20 (queue size=6)
Producer got message: 48
Consumer storing message: 31 (queue size=6)
Producer got message: 52
Consumer storing message: 98 (queue size=6)
Main: about to set event
Producer got message: 13
Consumer storing message: 59 (queue size=6)
Producer received EXIT event. Exiting
Consumer storing message: 75 (queue size=6)
Consumer storing message: 97 (queue size=5)
Consumer storing message: 80 (queue size=4)
Consumer storing message: 33 (queue size=3)
Consumer storing message: 48 (queue size=2)
Consumer storing message: 52 (queue size=1)
Consumer storing message: 13 (queue size=0)
Consumer received EXIT event. Exiting

Note: Your output will be different. Your output will change from run to run. That’s the fun part of working with threads!

如果在我的範例中仔細閱讀了輸出,您會發現一些有趣的事情正在發生。在頂部,您可以看到生產者必須創建五個消息並將其中四個放置在隊列中。再放置第五個之前被作業系統替換掉。

然後,consumer 運行並提取了第一條消息,並且印出了該消息以及此時 Queue 的深度:

由於第五條消息尚未進入管道,再刪除單個消息後,Queue 的大小減小到三。那因為 Queue 可以容納10條消息,因此 producer 的 thread 不會被 Queue 阻塞。因此又被作業系統替換掉。

當程序開始結束時,您是否可以看到主線程生成了導致 producer 立即退出的事件。consumer 仍然有很多工作要做,因此它一直運行直到清除管道。

嘗試在 producer 或使用者中使用不同的 Queue 大小並調用 time.sleep()來分別模擬更長的網絡或磁盤訪問時間。即使對該程序的這些元素稍作更改,也會在結果上產生很大的差異。

這是解決 producer-consumer 問題的更好的解決方案,但是您可以進一步簡化它。這個問題確實不需要管道。可以利用 queue.Queue 來取代。

Threading Objects

Python thread 模塊還提供了其他一些原始物件。儘管您在上面的示例中並不需要這些,但是它們可以在不同的應用中派上用場,因此熟悉它們是很好的。

Semaphore

要查看的第一個 Python thread 對像是 threading.Semaphore。 Semaphore 具有一些特殊屬性的計數器。第一個是計數是原子的,這意味著可以保證操作系統在遞增或遞減計數器的過程中不會替換thread。

當您調用 .release()時,內部計數器遞增,而當您調用 .acquire()時,內部計數器遞減。

下一個特殊的屬性是,如果一個線程在計數器為零時調用 .acquire(),則該線程將阻塞,直到另一個線程調用.release()並將計數器增加到一個。

Semaphore 通常用於保護容量有限的資源。例如,如果您有一個連接池,並且想要將該池的大小限制為特定數目。

Timer

threading.Timer 是一種在經過一定時間後安排要調用的函數的方法。通過傳遞等待的秒數和要調用的函數來創建計時器:

t = threading.Timer(30.0, my_function)

您可以通過調用.start()啟動計時器。在指定時間之後的某個時間,該函數將在新的 thread 上被呼叫,但是請注意這不能保證將在您希望的時間準確地調用該函數。

如果您想停止已經啟動的計時器,可以通過調用.cancel()來取消它。計時器觸發後調用.cancel()不會執行任何操作,也不會產生異常。

計時器可用於在特定時間段後提示用戶採取措施。如果用戶在計時器到期之前執行了操作,則可以調用.cancel()

Barrier

threading.Barrier 可用於保持固定數量的線程同步。創建 Barrier 時,使用者必須指定要在其上同步的 thread 數。每個 thread 在 Barrier 上調用 .wait()。它們將一直處於阻塞狀態,直到等待指定數量的 thread,然後所有 thread 都將同時釋放。

請記住,thread 是由操作系統調度的,因此,即使所有 thread 都同時釋放,它們也會被調度為一次運行一個。

Barrier 的一種用途是允許thread pool 始化自己。在初始化 thread 之後讓它們在 Barrier 上等待將確保所有 thread 在完成其初始化之前沒有任何 thread 開始運行。

Conclusion: Threading in Python

現在,您已經了解了 Python thread 必須提供的許多功能,以及一些有關如何構建 thread 程序及其解決問題的示例。您還看到了一些編寫和調試 threadd 程序時出現的問題的實例。

Reference