Python中的并行处理(Pool.map()、Pool.starmap()、Pool.apply()、)
1.并行處理
? ? ?并行處理是一種在同一臺(tái)計(jì)算機(jī)的多個(gè)處理器中同時(shí)運(yùn)行任務(wù)的工作模式。這種工作模式的目的就是減少總的任務(wù)處理時(shí)間,但是進(jìn)程之間的通信會(huì)有額外的開銷,因此對小的任務(wù)而言,總的任務(wù)時(shí)間會(huì)有所增加而不是是減少。
? ? ? 在Python語言中,multiprocessing模塊通過使用子進(jìn)程(而不是線程)來運(yùn)行獨(dú)立的并行進(jìn)程。它可以讓您利用機(jī)器上的多個(gè)處理器(Windows和Unix),也就是說,多個(gè)進(jìn)程可以完全獨(dú)立的在內(nèi)存中運(yùn)行
2.自己的設(shè)備最多可以進(jìn)行多少個(gè)并行處理
? ? ? 一次可以運(yùn)行的最大進(jìn)程數(shù)受計(jì)算機(jī)中處理器數(shù)量的限制。可以使用multiprocessing模塊中的cpu_count()函數(shù)進(jìn)行顯示
import multiprocessing as mpprint("Number of processers: ", mp.cpu_count())像我電腦只有四個(gè):
3.同步執(zhí)行和異步執(zhí)行
? ? 并行處理中,有兩種執(zhí)行類型:? ?同步和異步
? ? 同步執(zhí)行就是各個(gè)進(jìn)程按照啟動(dòng)的先后順序完成。這是通過鎖定主程序直到相應(yīng)的進(jìn)程執(zhí)行完畢來實(shí)現(xiàn)的。
? ??異步執(zhí)行,換句話說,進(jìn)程的執(zhí)行不涉及鎖定,這樣做的結(jié)果就是,進(jìn)程結(jié)果返回的順序可能混淆,但通常情況下,異步執(zhí)行會(huì)更快完成。
multiprocessing模塊中有兩個(gè)對象是用來實(shí)現(xiàn)函數(shù)并行執(zhí)行的:Pool類和Process類
? ??
?
4.解決實(shí)際問題實(shí)例 :?計(jì)算每行中給定數(shù)值范圍內(nèi)的元素個(gè)數(shù)
? ? ? 給定一個(gè)二維矩陣(或者列表和多維列表),計(jì)算每行中給定數(shù)值范圍內(nèi)的元素個(gè)數(shù)
import numpy as np from time import time# RandomState()是一個(gè)偽隨機(jī)數(shù)生成器 np.random.RandomState(100) # 0, 10 : 生成0到10的隨機(jī)整數(shù) # size=[200000, 5] 即生成200000行,一列的 ndarray(二維矩陣的形式,每個(gè)里面5個(gè)元素) arr = np.random.randint(0, 10, size=[200000, 5]) data = arr.tolist() # 將numpy.ndarray 轉(zhuǎn)化為list # 因?yàn)槭请S機(jī)的,所以每次的數(shù)字不確定 data = data[:5] print("數(shù)據(jù)為:", data)""" 運(yùn)行結(jié)果: 數(shù)據(jù)為: [[5, 6, 7, 0, 9], [4, 0, 6, 7, 4], [7, 3, 8, 3, 9], [2, 1, 9, 3, 2], [0, 0, 9, 5, 2]]"""? 4.1?不使用并行處理的參考代碼
? ? 函數(shù)howmany_within_range()進(jìn)行重復(fù)以檢查在范圍內(nèi)的數(shù)有多少個(gè)病返回計(jì)數(shù)
"""不使用并行處理"""def howmany_within_range(row, minimum, maximum):count = 0for n in row:if minimum <= n <= maximum:count += 1return countresult = [] for row in data:result.append(howmany_within_range(row, minimum=4, maximum=8)) print("給定數(shù)值范圍中的元素個(gè)數(shù):", result[:10]) """ 注意:以下只是參考輸出,因?yàn)檩斎胄蛄惺请S機(jī)的,每次輸出結(jié)果并不固定 運(yùn)行結(jié)果: 給定數(shù)值范圍中的元素 [3, 2, 3, 4, 2, 3, 3, 2, 2, 2] """? 4.2 對函數(shù)進(jìn)行并行化處理
? ? ? 對代碼進(jìn)行并行處理通常的做法是取出其中可以多次運(yùn)行的特定函數(shù),將其放在不同的處理器上并運(yùn)行,要做到這一點(diǎn),就需要使用Pool類對數(shù)目為n的處理器進(jìn)行初始化,之后將想要并運(yùn)行的函數(shù)傳遞給Pool類中并行方法。
multipprocessing.Pool()中提供了apply(),map()和starmap()等方法對傳入的函數(shù)并行運(yùn)行。
? apply()和map()? 之間又有什么區(qū)別呢?
apply()和map()都是把要進(jìn)行并行化的函數(shù)作為主要參數(shù),但是不同的是,apply()?接收args參數(shù),通過args將各個(gè)參數(shù)傳送給被并行化處理的函數(shù),而map僅將一個(gè)迭代器作為參數(shù)。
?因此使用,對于簡單的可迭代的操作,使用map()進(jìn)行并行處理更適合,而且能更快完成工作
?4.2.1? Pool.apply()進(jìn)行并行化處理
if __name__ == '__main__':# 1.初始化 multiprocessing.Pool()pool = mp.Pool(mp.cpu_count())# 2.使用apply(), 將函數(shù)howmany_within_range作為主參傳進(jìn)去results = [pool.apply(howmany_within_range, args=(row, 4, 8)) for row in data]# 3. 不要忘記關(guān)閉進(jìn)程pool.close()print(results[:10])? 注意:?使用? if __name__ == '__main__':? 將你的代碼放到下面去執(zhí)行,不然會(huì)報(bào)錯(cuò)
? ? ??The "freeze_support()" line can be omitted if the program
? ? ? ? is not going to be frozen to produce an executable.??
? ?我們?nèi)绻谶@段程序之外打印,會(huì)發(fā)現(xiàn),程序會(huì)有個(gè)并行進(jìn)行運(yùn)行,也就多打印程序外的內(nèi)容多次
4.2.2? Parallelizing using Pool.map()
? ? ? ?Pool.map()僅接受一個(gè)迭代器參數(shù)。對howmany_within_range()函數(shù)進(jìn)行簡單的修改,修改為howmany_within_range_rowonly()把minimum和maximum設(shè)置為固定值,即為? 只接受行數(shù)據(jù)列表迭代器作為輸入,不是最好的辦法,但清楚的顯示了它與apply()的不同之處
import multiprocessing as mpdef howmany_within_range_rowonly(row, minimum=4, maximum=8):count = 0for n in row:if minimum <= n <= maximum:count += 1return countpool = mp.Pool(mp.cpu_count())results = pool.map(howmany_within_range_rowonly,[row for row in data])pool.close()print(results[:10])?4.2.3? 使用Pool.starmap()進(jìn)行并行化
? ? ? ? 與Pool.map()一樣,Pool.starmap()也只僅接受一個(gè)迭代器參數(shù),但在starmap()中,迭代器中的每一個(gè)元件也是一個(gè)迭代器。你可以通過這個(gè)內(nèi)部迭代器向被并行化處理的函數(shù)傳遞參數(shù),在執(zhí)行時(shí)再順序解開,只要傳遞和解開的順序一致就行
?實(shí)際上,Pool.starmap()就像是一個(gè)接受參數(shù)的Pool.map()版本
import multiprocessing as mppool = mp.Pool(mp.cpu_count())results = pool.starmap(howmany_within_range, [(row, 4, 8) for row in data])pool.close()print(results[:10])5.異步并行處理
? ?和同步并行處理對等的異步并行處理函數(shù)apply_async(),map_async()和starmap_async()允許以異步方式并行執(zhí)行進(jìn)程,即下一個(gè)進(jìn)程可以在前一個(gè)進(jìn)程完成時(shí)立即啟動(dòng),而不考慮啟動(dòng)順序。因此,無法保證結(jié)果與輸入的順序相同
6.?使用Pool.apply_async()進(jìn)行并行化
持續(xù)更新
?
總結(jié)
以上是生活随笔為你收集整理的Python中的并行处理(Pool.map()、Pool.starmap()、Pool.apply()、)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python中的__new__(new函
- 下一篇: 1.设计模式中监听模式(观察者模式)(P