Python 多線程(multi-threading) vs 多進程(multi-processing) 

利用一段計算結果相同,算法類似的程式碼,來探討和比較 threading(線程)和 multiprocessing(多進程)的性能差異,以 AMD Ryzen 7 2700X 跑一次80秒以下。

之前寫交易程式聯網的程式,傳統的程式碼執行須得待上一行執行完畢,網路的回應的等待時間,成為這類程式性能的瓶頸,是否可以利用等待網路回應的時間,讓程式做點別的事?亦或者有效利用多核CPU來實現並行計算?現在CPU動輒8核16核,是否可以幫你的人生開掛呢?

之前寫交易程式聯網的程式,傳統的程式碼執行須得待上一行執行完畢,網路的回應的等待時間,成為這類程式性能的瓶頸,是否可以利用等待網路回應的時間,讓程式做點別的事?亦或者有效利用多核CPU來實現並行計算?現在CPU動輒8核16核,是否可以幫你的人生開掛呢?

然後我就會玩了Python 裡2個標準函式庫管模組 threading(多線程)multiprocessing(進程),它們的用法十分類似。頓時來了餿主意,寫一段算法很類似的程式碼比較【多線程】和【多進程】的效能差異,使其計算的結果相同。

這篇文章的程式碼改寫ChatGPT,如果不看原理接跑程式碼請自下方【完整程式碼】,以 AMD Ryzen 7 2700X 跑一次80秒以下。

概念簡介

Threading(線程):

  • 優點:
    • 創建和切換線程的開銷較小,記憶體需求較小。
    • 適合 I/O 密集型任務,例如網絡請求或文件操作,因為 I/O 操作會釋放 GIL,允許其他線程繼續執行。
  • 缺點:
    • 由於 GIL 的存在,在計算密集型任務中無法充分利用多核 CPU 的性能。

Multiprocessing(多進程):

  • 優點:
    • 每個進程都有自己的 Python 解釋器和 GIL,因此多進程可以真正並行地執行計算密集型任務,充分利用多核 CPU 的性能。
  • 缺點:
    • 創建和管理進程的開銷比線程高,記憶體需求很大。
    • 進程之間的通信比線程間的通信更複雜且開銷更大。

Global Interpreter Lock (GIL)

  • GIL 的作用: GIL 是 Python 中的一個機制,它限制了同一時間只有一個原生線程可以執行 Python 字節碼。這意味著,即使在多核 CPU 上,使用多線程在同一時間也只有一個線程在執行 Python 代碼。
  • 對計算密集型任務的影響: 對於計算密集型任務(如平方和的計算),由於 GIL 的存在,Python 的多線程不能真正並行地執行這些任務,而是在線程之間切換,這導致了多線程的性能不如多進程。

算法

預計要計算 0 ~109的平方和,數學公式如下:


為了要利用多線程或多進程(以8為例),我們要先將 0 ~109 分成8等份,每段包含 2.5×108 個數字,分別餵給8個線程或進程,8堆資料都算完平方和之後再加總,完成本次計算。

要特別注意!我們用 for迴圈來分這8堆數字, for迴圈是從0開始計數,需要額外的判斷才不會漏加最後一個數109,下面我們還會再提到。


主程式

#### 主程式 ####
if __name__ == "__main__": 
    
    num_threads_processes = 16         # 線程數和進程數
 
    range_start = 0
    range_end = 1000000000  # 請依測試的電腦性能修改
    step = (range_end - range_start) // num_threads_processes
    
    threading_exp()
    print("---------------------------------------------------")
    multiprocessing_exp()

行4:變數 num_threads_processes 用來設定線程數和進程數,因為我們要做比較所以我們設定程數和進程數相同。

行6~8:range_start 平方和的起始值,本篇為0,range_end 為平方和的結束值,本篇為109step 為每一等份的數,本篇的計算結果為1.25×108

注意:如果要改num_threads_processesrange_startrange_end 要確保第8行的除式為整除,否則結果會計算錯誤,例如把 num_threads_processes 改成3就不行。

行10~12:分別呼叫多線程和多進程函式。

多線程

def threading_exp():
    
    threads = []
    results = [0] * num_threads_processes
    start_time = time.time()

    for i in range(num_threads_processes):

        start = range_start + i * step      #  將資料依線程數分割
        end = start + step - 1

        if i == num_threads_processes - 1:            # 確保最後一個數範圍包含 range_end
            end = range_end

        t = threading.Thread(target=worker, args=(start, end, results, i))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    total_sum = sum(results)
    end_time = time.time()

    print(f"Threading result: {total_sum}")
    print(f"Threading time: {end_time - start_time} seconds")

行16:建立一個空的串列(list),準備用來存線程物件。

行17:建立 results 串列[0]乘以 線程數(以8為例),會變成串列[0,0,0,0,0,0,0,0],準備儲存每個線程或進程計算結果。

行18:計時開始。

行20:跑 for 迴圈,若設定線程數8迴圈就是0到7,每圈要做的事包括,分割資料再將資料分配給每一個線程。

行24~25:參考上圖,透過計算賦予 start 一個區間的的起始值,end 賦予一個區間的結束值,透過已計算出的 step 值,藉由迴圈數 i 向上進行累加式移動。

例如:
i =0 , start = 0 , end = 1.25×108-1
i =1 , start = 1.25×108 , end = 2.5×108-1

行30:建立子線程, target 指定要在該線程執行 worker 函式,設定 args 為要傳給函式的引數,是元祖(tuple)格式。

行31:使用 append(t) 函式將線程物件增加在 threads 串列的尾端

行32:啟用子線程。

行34~35:加入等待線程等待,直到所有線程執行完畢後才往下執行。

行37:呼叫內建函數 sum()results 串列裡所有的值進行加總。

多進程

#### 多進程(process)範例 ####
def multiprocessing_exp():

    processes = []
    manager = multiprocessing.Manager()             # Manager可以避免race condition
    results = manager.list([0] * num_threads_processes)     # 透過Manager物件創建串列
    start_time = time.time()

    for i in range(num_threads_processes):
        
        start = range_start + i * step      # 將資料依進程數分割
        end = start + step - 1
        
        if i == num_threads_processes - 1:          # 確保最後一個數範圍包含 range_end
            end = range_end
        
        p = multiprocessing.Process(target=worker, args=(start, end, results, i))        # 將multiprocessing.Process 物件塞入列表內
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    total_sum = sum(results)
    end_time = time.time()

    print(f"Multiprocessing result: {total_sum}")
    print(f"Multiprocessing time: {end_time - start_time} seconds")

行48~49:使用 manager.list 來創建一個共享的列表,各個進程可以安全地寫入和讀取這個列表。這段代碼的意義在於讓多個進程能夠共享數據並進行同步。

manager.list 是由 multiprocessing.Manager 提供的共享數據結構之一,它允許多個進程之間安全地共享數據。除了 list 之外,Manager 還提供了其他類型的共享數據結構,比如 dict、Namespace 等。

為什麼需要 manager.list?在多進程環境中,每個進程都有自己的獨立內存空間,進程之間無法直接共享數據。因此,透過 manager.list 的串列將各進程產生的數據分區,不會產生競爭危害(競爭危害)。


*另外還有 workersquare_sum 函式,但因為較簡單就不做說明囉~

完整程式碼

# https://deeptek.asia/program/multithreading_vs_multiprocessing/

import threading, multiprocessing
import time

#### 計算平方和 ####
def square_sum(start, end):
    total = 0
    for i in range(start, end + 1):     # 包含 end
        total += i * i
    return total

def worker(start, end, result, index):
    result[index] = square_sum(start, end)

#### 多線程(thread)範例 ####
def threading_exp():
    
    threads = []
    results = [0] * num_threads_processes
    start_time = time.time()

    for i in range(num_threads_processes):

        start = range_start + i * step      #  將資料依線程數分割
        end = start + step - 1

        if i == num_threads_processes - 1:            # 確保最後一個數範圍包含 range_end
            end = range_end

        t = threading.Thread(target=worker, args=(start, end, results, i))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    total_sum = sum(results)
    end_time = time.time()

    print(f"Threading result: {total_sum}")
    print(f"Threading time: {end_time - start_time} seconds")

#### 多進程(process)範例 ####
def multiprocessing_exp():

    processes = []
    manager = multiprocessing.Manager()             # Manager可以避免race condition
    results = manager.list([0] * num_threads_processes)     # 透過Manager物件創建串列
    start_time = time.time()

    for i in range(num_threads_processes):
        
        start = range_start + i * step      # 將資料依進程數分割
        end = start + step - 1
        
        if i == num_threads_processes - 1:          # 確保最後一個數範圍包含 range_end
            end = range_end
        
        p = multiprocessing.Process(target=worker, args=(start, end, results, i))        # 將multiprocessing.Process 物件塞入列表內
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    total_sum = sum(results)
    end_time = time.time()

    print(f"Multiprocessing result: {total_sum}")
    print(f"Multiprocessing time: {end_time - start_time} seconds")


#### 主程式 ####
if __name__ == "__main__": 
    
    num_threads_processes = 8         # 線程數和進程數
 
    range_start = 0
    range_end = 1000000000  # 請依測試的電腦性能修改
    step = (range_end - range_start) // num_threads_processes
    
    threading_exp()
    print("---------------------------------------------------")
    multiprocessing_exp()

執行結果

CPU為AMD Ryzen 7 2700X,陸續補齊~



num_threads_processes = 8

Threading result: 333333333833333333500000000
Threading time: 59.0412712097168 seconds
---------------------------------------------------
Multiprocessing result: 333333333833333333500000000
Multiprocessing time: 9.554643392562866 seconds



num_threads_processes = 4

Threading result: 333333333833333333500000000
Threading time: 58.00310969352722 seconds
---------------------------------------------------
Multiprocessing result: 333333333833333333500000000
Multiprocessing time: 16.429932355880737 seconds



num_threads_processes = 2

Threading result: 333333333833333333500000000
Threading time: 60.96465802192688 seconds
---------------------------------------------------
Multiprocessing result: 333333333833333333500000000
Multiprocessing time: 31.881603479385376 seconds



num_threads_processes = 1

Threading result: 333333333833333333500000000
Threading time: 69.91795206069946 seconds
---------------------------------------------------
Multiprocessing result: 333333333833333333500000000
Multiprocessing time: 63.157376289367676 seconds


參考文獻

[1] ChatGPT

[2] 程序(進程)、執行緒(線程)、協程,傻傻分得清楚!

[3] Python模塊-進程間的通信(Queue,Pipe)與數據共享(Manager)

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *