Python爬虫进阶五之多线程的用法
前言
我們之前寫的爬蟲都是單個線程的?這怎么夠?一旦一個地方卡到不動了,那不就永遠等待下去了?為此我們可以使用多線程或者多進程來處理。
首先聲明一點!
多線程和多進程是不一樣的!一個是 thread 庫,一個是 multiprocessing 庫。而多線程 thread 在 Python 里面被稱作雞肋的存在!而沒錯!本節介紹的是就是這個庫 thread。
不建議你用這個,不過還是介紹下了,如果想看可以看看下面,不想浪費時間直接看
multiprocessing 多進程
雞肋點
名言:
“Python下多線程是雞肋,推薦使用多進程!”
那當然有同學會問了,為啥?
背景
1、GIL是什么?
GIL的全稱是Global Interpreter Lock(全局解釋器鎖),來源是python設計之初的考慮,為了數據安全所做的決定。
2、每個CPU在同一時間只能執行一個線程(在單核CPU下的多線程其實都只是并發,不是并行,并發和并行從宏觀上來講都是同時處理多路請求的概念。但并發和并行又有區別,并行是指兩個或者多個事件在同一時刻發生;而并發是指兩個或多個事件在同一時間間隔內發生。)
在Python多線程下,每個線程的執行方式:
- 獲取GIL
- 執行代碼直到sleep或者是python虛擬機將其掛起。
- 釋放GIL
可見,某個線程想要執行,必須先拿到GIL,我們可以把GIL看作是“通行證”,并且在一個python進程中,GIL只有一個。拿不到通行證的線程,就不允許進入CPU執行。
在Python2.x里,GIL的釋放邏輯是當前線程遇見IO操作或者ticks計數達到100(ticks可以看作是Python自身的一個計數器,專門做用于GIL,每次釋放后歸零,這個計數可以通過 sys.setcheckinterval 來調整),進行釋放。
而每次釋放GIL鎖,線程進行鎖競爭、切換線程,會消耗資源。并且由于GIL鎖存在,python里一個進程永遠只能同時執行一個線程(拿到GIL的線程才能執行),這就是為什么在多核CPU上,python的多線程效率并不高。
那么是不是python的多線程就完全沒用了呢?
在這里我們進行分類討論:
1、CPU密集型代碼(各種循環處理、計數等等),在這種情況下,由于計算工作多,ticks計數很快就會達到閾值,然后觸發GIL的釋放與再競爭(多個線程來回切換當然是需要消耗資源的),所以python下的多線程對CPU密集型代碼并不友好。
2、IO密集型代碼(文件處理、網絡爬蟲等),多線程能夠有效提升效率(單線程下有IO操作會進行IO等待,造成不必要的時間浪費,而開啟多線程能在線程A等待時,自動切換到線程B,可以不浪費CPU的資源,從而能提升程序執行效率)。所以python的多線程對IO密集型代碼比較友好。
而在python3.x中,GIL不使用ticks計數,改為使用計時器(執行時間達到閾值后,當前線程釋放GIL),這樣對CPU密集型程序更加友好,但依然沒有解決GIL導致的同一時間只能執行一個線程的問題,所以效率依然不盡如人意。
多核性能
多核多線程比單核多線程更差,原因是單核下多線程,每次釋放GIL,喚醒的那個線程都能獲取到GIL鎖,所以能夠無縫執行,但多核下,CPU0釋放GIL后,其他CPU上的線程都會進行競爭,但GIL可能會馬上又被CPU0拿到,導致其他幾個CPU上被喚醒后的線程會醒著等待到切換時間后又進入待調度狀態,這樣會造成線程顛簸(thrashing),導致效率更低
多進程為什么不會這樣?
每個進程有各自獨立的GIL,互不干擾,這樣就可以真正意義上的并行執行,所以在python中,多進程的執行效率優于多線程(僅僅針對多核CPU而言)。
所以在這里說結論:多核下,想做并行提升效率,比較通用的方法是使用多進程,能夠有效提高執行效率。
所以,如果不想浪費時間,可以直接看多進程。
直接利用函數創建多線程
Python中使用線程有兩種方式:函數或者用類來包裝線程對象。
函數式:調用thread模塊中的start_new_thread()函數來產生新線程。語法如下:
?
| 1 | <span class="s1">thread</span><span class="s2">.</span><span class="s1">start_new_thread </span><span class="s2">(</span> <span class="s3">function</span><span class="s2">,</span><span class="s1"> args</span><span class="s2">[,</span><span class="s1"> kwargs</span><span class="s2">]</span> <span class="s2">)</span> |
?
參數說明:
- function – 線程函數。
- args – 傳遞給線程函數的參數,他必須是個tuple類型。
- kwargs – 可選參數。
先用一個實例感受一下:
?
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | # -*- coding: UTF-8 -*- import thread import time # 為線程定義一個函數 def print_time(threadName, delay): ????count = 0 ????while count < 5: ????????time.sleep(delay) ????????count += 1 ????????print "%s: %s" % (threadName, time.ctime(time.time())) # 創建兩個線程 try: ????thread.start_new_thread(print_time, ("Thread-1", 2,)) ????thread.start_new_thread(print_time, ("Thread-2", 4,)) except: ????print "Error: unable to start thread" while 1: ?? pass print "Main Finished" |
?
運行結果如下:
?
| 1 2 3 4 5 6 7 8 9 10 | Thread-1: Thu Nov??3 16:43:01 2016 Thread-2: Thu Nov??3 16:43:03 2016 Thread-1: Thu Nov??3 16:43:03 2016 Thread-1: Thu Nov??3 16:43:05 2016 Thread-2: Thu Nov??3 16:43:07 2016 Thread-1: Thu Nov??3 16:43:07 2016 Thread-1: Thu Nov??3 16:43:09 2016 Thread-2: Thu Nov??3 16:43:11 2016 Thread-2: Thu Nov??3 16:43:15 2016 Thread-2: Thu Nov??3 16:43:19 2016 |
?
可以發現,兩個線程都在執行,睡眠2秒和4秒后打印輸出一段話。
注意到,在主線程寫了
?
| 1 2 | while 1: ?? pass |
?
這是讓主線程一直在等待
如果去掉上面兩行,那就直接輸出
?
| 1 | Main Finished |
?
程序執行結束。
使用Threading模塊創建線程
使用Threading模塊創建線程,直接從threading.Thread繼承,然后重寫init方法和run方法:
?
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | #!/usr/bin/python # -*- coding: UTF-8 -*- import threading import time import thread exitFlag = 0 class myThread (threading.Thread):?? #繼承父類threading.Thread ????def __init__(self, threadID, name, counter): ????????threading.Thread.__init__(self) ????????self.threadID = threadID ????????self.name = name ????????self.counter = counter ????def run(self):?????????????????? #把要執行的代碼寫到run函數里面 線程在創建后會直接運行run函數 ????????print "Starting " + self.name ????????print_time(self.name, self.counter, 5) ????????print "Exiting " + self.name def print_time(threadName, delay, counter): ????while counter: ????????if exitFlag: ????????????thread.exit() ????????time.sleep(delay) ????????print "%s: %s" % (threadName, time.ctime(time.time())) ????????counter -= 1 # 創建新線程 thread1 = myThread(1, "Thread-1", 1) thread2 = myThread(2, "Thread-2", 2) # 開啟線程 thread1.start() thread2.start() print "Exiting Main Thread" |
?
運行結果:
?
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | Starting Thread-1Starting Thread-2 Exiting Main Thread Thread-1: Thu Nov??3 18:42:19 2016 Thread-2: Thu Nov??3 18:42:20 2016 Thread-1: Thu Nov??3 18:42:20 2016 Thread-1: Thu Nov??3 18:42:21 2016 Thread-2: Thu Nov??3 18:42:22 2016 Thread-1: Thu Nov??3 18:42:22 2016 Thread-1: Thu Nov??3 18:42:23 2016 Exiting Thread-1 Thread-2: Thu Nov??3 18:42:24 2016 Thread-2: Thu Nov??3 18:42:26 2016 Thread-2: Thu Nov??3 18:42:28 2016 Exiting Thread-2 |
?
有沒有發現什么奇怪的地方?打印的輸出格式好奇怪。比如第一行之后應該是一個回車的,結果第二個進程就打印出來了。
那是因為什么?因為這幾個線程沒有設置同步。
線程同步
如果多個線程共同對某個數據修改,則可能出現不可預料的結果,為了保證數據的正確性,需要對多個線程進行同步。
使用Thread對象的Lock和Rlock可以實現簡單的線程同步,這兩個對象都有acquire方法和release方法,對于那些需要每次只允許一個線程操作的數據,可以將其操作放到acquire和release方法之間。如下:
多線程的優勢在于可以同時運行多個任務(至少感覺起來是這樣)。但是當線程需要共享數據時,可能存在數據不同步的問題。
考慮這樣一種情況:一個列表里所有元素都是0,線程”set”從后向前把所有元素改成1,而線程”print”負責從前往后讀取列表并打印。
那么,可能線程”set”開始改的時候,線程”print”便來打印列表了,輸出就成了一半0一半1,這就是數據的不同步。為了避免這種情況,引入了鎖的概念。
鎖有兩種狀態——鎖定和未鎖定。每當一個線程比如”set”要訪問共享數據時,必須先獲得鎖定;如果已經有別的線程比如”print”獲得鎖定了,那么就讓線程”set”暫停,也就是同步阻塞;等到線程”print”訪問完畢,釋放鎖以后,再讓線程”set”繼續。
經過這樣的處理,打印列表時要么全部輸出0,要么全部輸出1,不會再出現一半0一半1的尷尬場面。
看下面的例子:
?
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 | # -*- coding: UTF-8 -*- import threading import time class myThread (threading.Thread): ????def __init__(self, threadID, name, counter): ????????threading.Thread.__init__(self) ????????self.threadID = threadID ????????self.name = name ????????self.counter = counter ????def run(self): ????????print "Starting " + self.name ?????? # 獲得鎖,成功獲得鎖定后返回True ?????? # 可選的timeout參數不填時將一直阻塞直到獲得鎖定 ?????? # 否則超時后將返回False ????????threadLock.acquire() ????????print_time(self.name, self.counter, 3) ????????# 釋放鎖 ????????threadLock.release() def print_time(threadName, delay, counter): ????while counter: ????????time.sleep(delay) ????????print "%s: %s" % (threadName, time.ctime(time.time())) ????????counter -= 1 threadLock = threading.Lock() threads = [] # 創建新線程 thread1 = myThread(1, "Thread-1", 1) thread2 = myThread(2, "Thread-2", 2) # 開啟新線程 thread1.start() thread2.start() # 添加線程到線程列表 threads.append(thread1) threads.append(thread2) # 等待所有線程完成 for t in threads: ????t.join() print "Exiting Main Thread" |
?
在上面的代碼中運用了線程鎖還有join等待。
運行結果如下:
?
| 1 2 3 4 5 6 7 8 9 | Starting Thread-1 Starting Thread-2 Thread-1: Thu Nov??3 18:56:49 2016 Thread-1: Thu Nov??3 18:56:50 2016 Thread-1: Thu Nov??3 18:56:51 2016 Thread-2: Thu Nov??3 18:56:53 2016 Thread-2: Thu Nov??3 18:56:55 2016 Thread-2: Thu Nov??3 18:56:57 2016 Exiting Main Thread |
?
這樣一來,你可以發現就不會出現剛才的輸出混亂的結果了。
線程優先級隊列
Python的Queue模塊中提供了同步的、線程安全的隊列類,包括FIFO(先入先出)隊列Queue,LIFO(后入先出)隊列LifoQueue,和優先級隊列PriorityQueue。這些隊列都實現了鎖原語,能夠在多線程中直接使用。可以使用隊列來實現線程間的同步。
Queue模塊中的常用方法:
- Queue.qsize() 返回隊列的大小
- Queue.empty() 如果隊列為空,返回True,反之False
- Queue.full() 如果隊列滿了,返回True,反之False
- Queue.full 與 maxsize 大小對應
- Queue.get([block[, timeout]])獲取隊列,timeout等待時間
- Queue.get_nowait() 相當Queue.get(False)
- Queue.put(item) 寫入隊列,timeout等待時間
- Queue.put_nowait(item) 相當Queue.put(item, False)
- Queue.task_done() 在完成一項工作之后,Queue.task_done()函數向任務已經完成的隊列發送一個信號
- Queue.join() 實際上意味著等到隊列為空,再執行別的操作
用一個實例感受一下:
?
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 | # -*- coding: UTF-8 -*- import Queue import threading import time exitFlag = 0 class myThread (threading.Thread): ????def __init__(self, threadID, name, q): ????????threading.Thread.__init__(self) ????????self.threadID = threadID ????????self.name = name ????????self.q = q ????def run(self): ????????print "Starting " + self.name ????????process_data(self.name, self.q) ????????print "Exiting " + self.name def process_data(threadName, q): ????while not exitFlag: ????????queueLock.acquire() ????????if not workQueue.empty(): ????????????data = q.get() ????????????queueLock.release() ????????????print "%s processing %s" % (threadName, data) ????????else: ????????????queueLock.release() ????????time.sleep(1) threadList = ["Thread-1", "Thread-2", "Thread-3"] nameList = ["One", "Two", "Three", "Four", "Five"] queueLock = threading.Lock() workQueue = Queue.Queue(10) threads = [] threadID = 1 # 創建新線程 for tName in threadList: ????thread = myThread(threadID, tName, workQueue) ????thread.start() ????threads.append(thread) ????threadID += 1 # 填充隊列 queueLock.acquire() for word in nameList: ????workQueue.put(word) queueLock.release() # 等待隊列清空 while not workQueue.empty(): ????pass # 通知線程是時候退出 exitFlag = 1 # 等待所有線程完成 for t in threads: ????t.join() print "Exiting Main Thread" |
?
運行結果:
?
| 1 2 3 4 5 6 7 8 9 10 11 12 | Starting Thread-1 Starting Thread-2 Starting Thread-3 Thread-3 processing One Thread-1 processing Two Thread-2 processing Three Thread-3 processing Four Thread-2 processing Five Exiting Thread-2 Exiting Thread-3 Exiting Thread-1 Exiting Main Thread |
?
上面的例子用了FIFO隊列。當然你也可以換成其他類型的隊列。
參考文章
2.?http://www.runoob.com/python/python-multithreading.html
轉載:靜覓???Python爬蟲進階五之多線程的用法
轉載于:https://www.cnblogs.com/BigFishFly/p/6380048.html
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的Python爬虫进阶五之多线程的用法的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux权限管理 - 特殊权限之文件特
- 下一篇: Spring Boot 入门例子 Hel