
之前寫交易程式聯網的程式,傳統的程式碼執行須得待上一行執行完畢,網路的回應的等待時間,成為這類程式性能的瓶頸,是否可以利用等待網路回應的時間,讓程式做點別的事?亦或者有效利用多核CPU來實現並行計算?現在CPU動輒8核16核,是否可以幫你的人生開掛呢?
之前寫交易程式聯網的程式,傳統的程式碼執行須得待上一行執行完畢,網路的回應的等待時間,成為這類程式性能的瓶頸,是否可以利用等待網路回應的時間,讓程式做點別的事?亦或者有效利用多核CPU來實現並行計算?現在CPU動輒8核16核,是否可以幫你的人生開掛呢?
然後我就會玩了Python 裡2個標準函式庫管模組 threading(多線程)multiprocessing(進程),它們的用法十分類似。頓時來了餿主意,寫一段算法很類似的程式碼比較【多線程】和【多進程】的效能差異,使其計算的結果相同。
這篇文章的程式碼改寫ChatGPT,如果不看原理接跑程式碼請自下方【完整程式碼】,以 AMD Ryzen 7 2700X 跑一次80秒以下。
概念簡介
Threading(線程):
- 優點:
- 創建和切換線程的開銷較小,記憶體需求較小。
- 適合 I/O 密集型任務,例如網絡請求或文件操作,因為 I/O 操作會釋放 GIL,允許其他線程繼續執行。
- 缺點:
- 由於 GIL 的存在,在計算密集型任務中無法充分利用多核 CPU 的性能。
Multiprocessing(多進程):
- 優點:
- 每個進程都有自己的 Python 解釋器和 GIL,因此多進程可以真正並行地執行計算密集型任務,充分利用多核 CPU 的性能。
- 每個進程都有自己的 Python 解釋器和 GIL,因此多進程可以真正並行地執行計算密集型任務,充分利用多核 CPU 的性能。
- 缺點:
- 創建和管理進程的開銷比線程高,記憶體需求很大。
- 進程之間的通信比線程間的通信更複雜且開銷更大。
Global Interpreter Lock (GIL)
- GIL 的作用: GIL 是 Python 中的一個機制,它限制了同一時間只有一個原生線程可以執行 Python 字節碼。這意味著,即使在多核 CPU 上,使用多線程在同一時間也只有一個線程在執行 Python 代碼。
- 對計算密集型任務的影響: 對於計算密集型任務(如平方和的計算),由於 GIL 的存在,Python 的多線程不能真正並行地執行這些任務,而是在線程之間切換,這導致了多線程的性能不如多進程。
算法
預計要計算 0 ~109的平方和,數學公式如下:

為了要利用多線程或多進程(以8為例),我們要先將 0 ~109 分成8等份,每段包含 2.5×108 個數字,分別餵給8個線程或進程,8堆資料都算完平方和之後再加總,完成本次計算。
要特別注意!我們用 for迴圈來分這8堆數字, for迴圈是從0開始計數,需要額外的判斷才不會漏加最後一個數109,下面我們還會再提到。

主程式
#### 主程式 ####
if __name__ == "__main__":
num_threads_processes = 16 # 線程數和進程數
range_start = 0
range_end = 1000000000 # 請依測試的電腦性能修改
step = (range_end - range_start) // num_threads_processes
threading_exp()
print("---------------------------------------------------")
multiprocessing_exp()行4:變數 num_threads_processes 用來設定線程數和進程數,因為我們要做比較所以我們設定程數和進程數相同。
行6~8:range_start 平方和的起始值,本篇為0,range_end 為平方和的結束值,本篇為109,step 為每一等份的數,本篇的計算結果為1.25×108
注意:如果要改num_threads_processes、range_start 和 range_end 要確保第8行的除式為整除,否則結果會計算錯誤,例如把 num_threads_processes 改成3就不行。
行10~12:分別呼叫多線程和多進程函式。
多線程
def threading_exp():
threads = []
results = [0] * num_threads_processes
start_time = time.time()
for i in range(num_threads_processes):
start = range_start + i * step # 將資料依線程數分割
end = start + step - 1
if i == num_threads_processes - 1: # 確保最後一個數範圍包含 range_end
end = range_end
t = threading.Thread(target=worker, args=(start, end, results, i))
threads.append(t)
t.start()
for t in threads:
t.join()
total_sum = sum(results)
end_time = time.time()
print(f"Threading result: {total_sum}")
print(f"Threading time: {end_time - start_time} seconds")行16:建立一個空的串列(list),準備用來存線程物件。
行17:建立 results 串列[0]乘以 線程數(以8為例),會變成串列[0,0,0,0,0,0,0,0],準備儲存每個線程或進程計算結果。
行18:計時開始。
行20:跑 for 迴圈,若設定線程數8迴圈就是0到7,每圈要做的事包括,分割資料再將資料分配給每一個線程。
行24~25:參考上圖,透過計算賦予 start 一個區間的的起始值,end 賦予一個區間的結束值,透過已計算出的 step 值,藉由迴圈數 i 向上進行累加式移動。
例如:
i =0 , start = 0 , end = 1.25×108-1
i =1 , start = 1.25×108 , end = 2.5×108-1
行27~28:這一步很重要,判斷是否到了迴圈的最後1圈,如果「是」用end = range_end 把最後一個數包含近來,若少了這2行程式依舊可以運行,但會漏掉要加109,將導致最後的計算結果錯誤。
i =7 , start = 8.75×108 , end = 109 <-這一步很重要
行30:建立子線程, target 指定要在該線程執行 worker 函式,設定 args 為要傳給函式的引數,是元祖(tuple)格式。
行31:使用 append(t) 函式將線程物件增加在 threads 串列的尾端
行32:啟用子線程。
行34~35:加入等待線程等待,直到所有線程執行完畢後才往下執行。
行37:呼叫內建函數 sum() 將 results 串列裡所有的值進行加總。
多進程
#### 多進程(process)範例 ####
def multiprocessing_exp():
processes = []
manager = multiprocessing.Manager() # Manager可以避免race condition
results = manager.list([0] * num_threads_processes) # 透過Manager物件創建串列
start_time = time.time()
for i in range(num_threads_processes):
start = range_start + i * step # 將資料依進程數分割
end = start + step - 1
if i == num_threads_processes - 1: # 確保最後一個數範圍包含 range_end
end = range_end
p = multiprocessing.Process(target=worker, args=(start, end, results, i)) # 將multiprocessing.Process 物件塞入列表內
processes.append(p)
p.start()
for p in processes:
p.join()
total_sum = sum(results)
end_time = time.time()
print(f"Multiprocessing result: {total_sum}")
print(f"Multiprocessing time: {end_time - start_time} seconds")行48~49:使用 manager.list 來創建一個共享的列表,各個進程可以安全地寫入和讀取這個列表。這段代碼的意義在於讓多個進程能夠共享數據並進行同步。
manager.list 是由 multiprocessing.Manager 提供的共享數據結構之一,它允許多個進程之間安全地共享數據。除了 list 之外,Manager 還提供了其他類型的共享數據結構,比如 dict、Namespace 等。
為什麼需要 manager.list?在多進程環境中,每個進程都有自己的獨立內存空間,進程之間無法直接共享數據。因此,透過 manager.list 的串列將各進程產生的數據分區,不會產生競爭危害(競爭危害)。
*裡面的串列能用 print(results) 顯示出來,結果為 [0, 0, 0, 0, 0, 0, 0, 0],有沒有發現這玩意和【多線程】函式程式碼裡的 results = [0] * num_threads 一模一樣,這也顯示了我們在多線程和多進程用了相似的算法。
*另外還有 worker、square_sum 函式,但因為較簡單就不做說明囉~
完整程式碼
# https://deeptek.asia/program/multithreading_vs_multiprocessing/
import threading, multiprocessing
import time
#### 計算平方和 ####
def square_sum(start, end):
total = 0
for i in range(start, end + 1): # 包含 end
total += i * i
return total
def worker(start, end, result, index):
result[index] = square_sum(start, end)
#### 多線程(thread)範例 ####
def threading_exp():
threads = []
results = [0] * num_threads_processes
start_time = time.time()
for i in range(num_threads_processes):
start = range_start + i * step # 將資料依線程數分割
end = start + step - 1
if i == num_threads_processes - 1: # 確保最後一個數範圍包含 range_end
end = range_end
t = threading.Thread(target=worker, args=(start, end, results, i))
threads.append(t)
t.start()
for t in threads:
t.join()
total_sum = sum(results)
end_time = time.time()
print(f"Threading result: {total_sum}")
print(f"Threading time: {end_time - start_time} seconds")
#### 多進程(process)範例 ####
def multiprocessing_exp():
processes = []
manager = multiprocessing.Manager() # Manager可以避免race condition
results = manager.list([0] * num_threads_processes) # 透過Manager物件創建串列
start_time = time.time()
for i in range(num_threads_processes):
start = range_start + i * step # 將資料依進程數分割
end = start + step - 1
if i == num_threads_processes - 1: # 確保最後一個數範圍包含 range_end
end = range_end
p = multiprocessing.Process(target=worker, args=(start, end, results, i)) # 將multiprocessing.Process 物件塞入列表內
processes.append(p)
p.start()
for p in processes:
p.join()
total_sum = sum(results)
end_time = time.time()
print(f"Multiprocessing result: {total_sum}")
print(f"Multiprocessing time: {end_time - start_time} seconds")
#### 主程式 ####
if __name__ == "__main__":
num_threads_processes = 8 # 線程數和進程數
range_start = 0
range_end = 1000000000 # 請依測試的電腦性能修改
step = (range_end - range_start) // num_threads_processes
threading_exp()
print("---------------------------------------------------")
multiprocessing_exp()執行結果
CPU為AMD Ryzen 7 2700X,陸續補齊~
num_threads_processes = 8
Threading result: 333333333833333333500000000
Threading time: 59.0412712097168 seconds
---------------------------------------------------
Multiprocessing result: 333333333833333333500000000
Multiprocessing time: 9.554643392562866 seconds
num_threads_processes = 4
Threading result: 333333333833333333500000000
Threading time: 58.00310969352722 seconds
---------------------------------------------------
Multiprocessing result: 333333333833333333500000000
Multiprocessing time: 16.429932355880737 seconds
num_threads_processes = 2
Threading result: 333333333833333333500000000
Threading time: 60.96465802192688 seconds
---------------------------------------------------
Multiprocessing result: 333333333833333333500000000
Multiprocessing time: 31.881603479385376 seconds
num_threads_processes = 1
Threading result: 333333333833333333500000000
Threading time: 69.91795206069946 seconds
---------------------------------------------------
Multiprocessing result: 333333333833333333500000000
Multiprocessing time: 63.157376289367676 seconds
參考文獻
[1] ChatGPT
