多线程与线程锁
Thread基本方法
#! /usr/bin/env python3 # encoding: utf-8""" @Author: zengchunyun @Date: 2017/7/12 """import threading import sys import timedef read(event):print('get a task ...')print('read is_set', event.is_set()) # 獲取event狀態,如果event狀態為False,調用event.wait()就會進入阻塞狀態,# 直到event狀態為True,才會繼續執行event.wait()后面的代碼event.wait() # 此處會進入等待狀態,不會繼續執行下面的代碼,直到event對象被設置為True,則會通知此wait進入非阻塞狀態,即,會繼續執行下面的代碼print('read is_set', event.is_set()) # 獲取event狀態,如果在wait后,通常該狀態為True,如果wait后,又立即對event調用clear()方法,則該狀態又為Falseevent.clear() # 將event狀態設置為Falseprint('read is_set', event.is_set()) # 獲取event狀態,如果在wait后,通常該狀態為True,如果wait后,又立即對event調用clear()方法,則該狀態又為Falsecurrent_thread = threading.current_thread() # 獲取當前線程對象print('current_thread', current_thread)# current_thread._tstate_lock=None# current_thread._stop()print('is_alive', current_thread.is_alive()) # 通過獲取到當前線程對象后,可以對當前線程做一些操作,比如判斷當前線程狀態,這里對狀態肯定是為True,因為如果為False,那就不可能執行到這里了.也可以通過一些特殊方式,在此處將該結果得到的值為False,不過這種方式對程序而言就沒有太大意義了。current_thread.name = 'read_thread' # 可以對當前線程修改名字print(current_thread.name)print('get_ident', threading.get_ident()) # 返回當前的線程唯一標識符,為非0的整數print('main_thread', threading.main_thread()) # 獲取主線程對象,通常這個主線程是有python解釋器啟動的print("doing now")time.sleep(50)print('doing done.')def write(frame, event, arg):print('write', frame, event, arg, frame.f_lineno)def worker():# threading.settrace(write) # 通過調用sys.settrace方法去獲取執行過程的棧信息,通過這些棧信息可以用于記錄日志分析,也能方便理解程序的運行過程,該函數在run()之前調用,它有call,line,return,exception,c_call,c_return,c_exception這些事件類型# sys.settrace(write)# threading.setprofile(write) # 通過調用sys.setprofile()獲取棧信息,它不是每次都會調用。僅僅是在call一個方法時和方法return時調用。所以它的事件信息只有call,c_call,c_return, return,當一個異常已經發生,這個return事件也會返回print('stack_size', threading.stack_size(36864)) # 設置棧大小,最少為32KiB,即32768,每次增加的大小必須是4KiB,即4096大小,print('get_ident', threading.get_ident()) # 返回當前的線程唯一標識符print('active_count', threading.active_count()) # 當前有多少個正在運行的線程,等同于len(threading.enumerate())print('TIMEOUT_MAX', threading.TIMEOUT_MAX) # 最大超時參數,用于Lock.acquire(),Rlock.acquire(),Condition.wait(),等使用超時參數等方法,如果設置一個比這個值還大等值,會出現OverflowError異常print('enumerate', threading.enumerate())event = threading.Event()t1 = threading.Thread(target=read, args=(event,))t1.daemon = False # 設置子線程是否為守護線程,當為守護線程時,該線程將在后臺運行,主線程執行完成就不會等待子線程執行是否完成,而是直接退出,如果不是守護線程,主線程執行完成,如果子線程沒有完成,會繼續等待# 創建一個線程對象有兩種方式,一種是調用線程的構造方法,或者子類繼承Thread,重寫run方法,要啟動這個線程對象,還需要調用線程的start()方法,然后會在獨立的線程里管理調用run里面的代碼,當線程start().# 我們為認為這個線程就是活動的狀態,直到這個線程的run方法執行完成,或者run方法內部出現異常,則這個線程就是不活躍狀態,可以通過is_alive()去獲取線程狀態t1.start()print('stack_size', threading.stack_size())print('worker is_set', event.is_set())event.clear()print('worker is_set', event.is_set())timer = threading.Timer(interval=5, function=event.set)timer.start()print('active_count', threading.active_count())print('enumerate', threading.enumerate())# 調用線程的join方法會阻塞其它線程的調用,直到調用join的那個線程方法調用終止,即如果有多個線程,都調用里join,那么會依次按誰先調用join,誰就先執行,直到調用join的那個線程執行完成,才會繼續調用下一個線程,這樣就等于把線程變成串行方式執行了,而不是并行# join實際上就是使用了線程鎖,讓同一時刻只能有一個線程在運行。# 如果線程不是daemon線程,分兩張情況,# 一:timeout為None,主線程就會等待子線程執行完畢,才繼續執行主線程后面的代碼,直到主線程執行完,程序才退出# 二:timeout設置了大于0的浮點值,就會在該超時時間內等待子線程的返回,如果這個時間內,子線程沒有執行完成,那么主線程不會繼續等子線程,而是繼續執行主線程后面的代碼,最后如果主線程代碼執行完了,如果子線程還沒有執行完,會繼續等待子線程,直到子線程完全返回,主線程才退出# 如果子線程是daemon線程,也分兩種情況# 一: 如果timeout為None, 主線程會等待子線程執行完畢,才會繼續執行主線程后面的代碼,直到主線程執行完,程序退出# 二:如果timeout設置為大于0的浮點值,就會在該超時時間內等待子線程的返回,如果這個時間內,子線程沒有執行完成,那么主線程不會繼續等待子線程,而是繼續執行主線程后面的代碼,最后如果主線程代碼執行完了,如果子線程還沒有執行完,不會繼續等待子線程,而是直接退出程序# 總結,在沒有timeout時,不管是不是daemon線程,主線程都會等待子線程執行完成后才會繼續執行主線程后面的代碼,直到主線程代碼執行完成,程序退出# 設置了timeout時,子線程超時后,主線程不會繼續等待子線程返回結果,而是繼續執行主線程代碼,最終,主線程執行完成后,在非daemon狀態時,如果子線程還沒執行完成,會繼續等待,如果是daemon狀態,那么這時主線程是不會等待子線程完成,而是直接退出程序# t1.join(timeout=10.1)print('begin ...')print('all task done.')if __name__ == '__main__':worker()線程鎖Lock基本概念
#! /usr/bin/env python3 # encoding: utf-8""" @Author: zengchunyun @Date: 2017/7/12 """ import threading import time""" lock有兩種狀態,一種是locked,一種是unlocked 創建lock時。狀態為unlocked,它有兩個基本的方法,acquire()和release(),當狀態為unlocked,acquire()改變這個狀態為locked, 并立即返回,當狀態是locked時,調用acquire()方法時會阻塞當前線程,直到另一個線程調用它的release()方法,將狀態改為unlocked。 然后這個acquire()調用重置lock為locked狀態,并立即返回,這個release()方法應該在lock狀態為locked時調用,它會改變lock狀態為unlocked,并立即返回, 如果對一個已經是unlocked狀態對lock調用release()時,會拋出RuntimeError錯誤當多個線程調用acquire()時進入阻塞狀態,等待這個lock狀態變為unlocked,只有一個線程會在release()被調用后,lock狀態會變為unlocked后執行, 具體是哪些線程會執行,并沒有一個明確規定條件,且在代碼實現上差異也很大"""""" lock支持使用上下文管理 例如: with lock:# do something...它等效于下面這種寫法 lock.acquire() try:# do something... finally:lock.release() """lock = threading.Lock()def read(lock_obj):print('entry read function...')lock_status = lock_obj.acquire() # acquire接受兩個參數,blocking=True,timeout=-1,兩個默認參數值# 當lock_status狀態為True時,說明獲取到鎖了,當為False說明沒有獲得鎖print(lock_status)time.sleep(5)print('read do something...')lock_obj.release()print('read done.')def write(lock_obj):print('entry write function...')# 當blocking設置為True,默認為True,線程會阻塞,直到lock狀態變為unlocked# 當blocking設置為False,線程不會阻塞,如果一個調用使用blocking為True會被阻塞,并立即返回False,其它情況設置這個lock為locked,并返回True# 如果設置timeout值時,blocking值必須是True,默認值為True,所以可以不用指定blocking,也就是說,通常這兩個參數最好不要同時存在# 當timeout為非-1時。線程會阻塞這個timeout值當秒數,如果超時了,依然沒有獲得鎖,則不繼續阻塞,但是返回值為False,也就是說lock狀態為unlocked,所以當返回狀態為False時,是不能調用lock的release()方法,否則拋異常# 當timeout為-1時,線程會進入無限當的等待狀態,不允許在blocking為False時為timeout指定值lock_status = lock_obj.acquire(blocking=True, timeout=2) # acquire接受兩個參數,blocking=True,timeout=-1,兩個默認參數值,當把blocking設置為False時,即不會阻塞該線程,會繼續執行后面的代碼,print(lock_status)print('write do something...')if lock_status: # 需要判斷是否獲得鎖,如果獲得,狀態為True,則需要釋放鎖lock_obj.release()print('write done.')if __name__ == '__main__':t1 = threading.Thread(target=read, args=(lock,))t1.start()t2 = threading.Thread(target=write, args=(lock,))t2.start() # 首先要明白,獲取鎖時,默認是阻塞狀態,直到獲取到鎖,可以給阻塞狀態獲取鎖時設置等待超時時間,超過時間不管有沒有獲取到都會繼續執行后面的代碼 # 對于一個新的鎖對象,也就是剛創建的鎖,第一次調用acquire()方法去獲取時,不管是不是阻塞,都能獲取到鎖,也就是這個方法會返回True,但是 # 當你再次使用這個鎖對象acquire(False)不阻塞方式去獲取鎖,其結果如果還是True,那其結果只有一種,那就是之前肯定釋放了一次鎖,否則,對于 # 已經拿到了鎖,再次調用acquire(False)其結果將為False# 第一種情況 import threading lock = threading.Lock() print(lock.acquire(True)) # 返回True print(lock.acquire(False)) # 返回Falselock2 = threading.Lock() print(lock2.acquire(False)) # 返回True print(lock2.acquire(False)) # 返回False# 第二種情況 lock3 = threading.Lock() print(lock3.acquire(True)) # 返回True print(lock3.release()) print(lock3.acquire(False)) # 返回Truelock4 = threading.Lock() print(lock4.acquire(False)) # 返回True print(lock4.release()) print(lock4.acquire(False)) # 返回True多把鎖正確使用方式
import threading import time import random""" 2把鎖對兩個不同資源進行多線程操作 """read_lock = threading.Lock() write_lock = threading.Lock()num = 0 num2 = 0def read(r_lock, w_lock):r_lock.acquire() # 先獲取一把鎖,保證同一時刻只能有一個線程操作共享數據global num # 準備對共享數據進行操作time.sleep(random.randint(0, 3)) # 修改該資源比較費時,需要大約0-3秒num += 1 # 對共享數據加1# 這時還想對一個比較耗費時間對資源進行操作,所以又開啟一個新線程,該資源也是共享資源,所以也需要鎖定資源new_task = threading.Thread(target=write, args=(w_lock,))new_task.start()# 然后需要釋放共享資源r_lock.release()def write(w_lock):current_thread = threading.current_thread() # 獲取當前線程print("write %s do something..." % current_thread.name)w_lock.acquire() # 先鎖定資源global num2 # 準備對共享資源進行操作time.sleep(random.randint(0, 2)) # 修改該資源比較費時,需要大約0-5秒num2 += 1 # 花費數秒秒才把資源修改完成w_lock.release() # 資源修改完了if __name__ == '__main__':for task in range(10): # 開啟10個任務t1 = threading.Thread(target=read, args=(read_lock, write_lock)) # 傳入2把鎖,1把用于鎖子線程鎖定共享資源,1把用于給子線程開啟的子線程鎖定共享資源t1.start()print('task done...')print(num)print(num2)while True:# 如果當前剩下1個活動的線程,說明其它子線程任務都完成了,只剩下主線程了if threading.active_count() == 1:print(num)print(num2)break錯誤使用鎖方式
""" 錯誤的加鎖方式 以下代碼加鎖方式不可取,一個線程內,應該加鎖和解鎖是成對的,且加完鎖后,在一個線程內,不該再去加鎖,必須先解鎖后,再加鎖,否則非常容易造成死鎖 """read_lock = threading.Lock() write_lock = threading.Lock()num = 0 num2 = 0def read(r_lock, w_lock):r_lock.acquire() # 先獲取一把鎖,保證同一時刻只能有一個線程操作共享數據global num # 準備對共享數據進行操作time.sleep(random.randint(0, 3)) # 修改該資源比較費時,需要大約0-3秒num += 1 # 對共享數據加1# 這時還想對一個比較耗費時間對資源進行操作,所以又開啟一個新線程,該資源也是共享資源,所以也需要鎖定資源new_task = threading.Thread(target=write, args=(w_lock,)) # 注意,子線程如果使用也需要鎖時,解鎖操作必須在子線程內把鎖釋放完成new_task.start()# 然后需要釋放共享資源w_lock.release() # 不能在此處釋放鎖,這種代碼設計很容易造成死鎖現象,r_lock.release()def write(w_lock):current_thread = threading.current_thread() # 獲取當前線程print("write %s do something..." % current_thread.name)w_lock.acquire() # 先鎖定資源global num2 # 準備對共享資源進行操作time.sleep(random.randint(0, 2)) # 修改該資源比較費時,需要大約0-5秒num2 += 1 # 花費數秒秒才把資源修改完成w_lock.release() # 此處注釋后,很可能會有鎖沒有得到釋放,這種寫法容易造成死鎖,就是遞歸鎖也不能這樣寫,if __name__ == '__main__':for task in range(10): # 開啟10個任務t1 = threading.Thread(target=read, args=(read_lock, write_lock)) # 傳入2把鎖,1把用于鎖子線程鎖定共享資源,1把用于給子線程開啟的子線程鎖定共享資源t1.start()print('task done...')print(num)print(num2)while True:# 如果當前剩下1個活動的線程,說明其它子線程任務都完成了,只剩下主線程了if threading.active_count() == 1:print(num)print(num2)break常見的死鎖
""" 使用同一把鎖對不同資源加鎖,沒有及時釋放,造成死鎖 """read_lock = threading.Lock()num = 0 num2 = 0def read(r_lock):r_lock.acquire() # 先獲取一把鎖,保證同一時刻只能有一個線程操作共享數據global num # 準備對共享數據進行操作time.sleep(random.randint(0, 1)) # 修改該資源比較費時,需要大約0-3秒num += 1 # 對共享數據加1write(r_lock) # 使用了同一把鎖,且未先釋放鎖,會造成死鎖# 然后需要釋放共享資源r_lock.release()def write(w_lock):current_thread = threading.current_thread() # 獲取當前線程print("write %s do something..." % current_thread.name)w_lock.acquire() # 先鎖定資源global num2 # 準備對共享資源進行操作time.sleep(random.randint(0, 1)) # 修改該資源比較費時,需要大約0-5秒num2 += 1 # 花費數秒秒才把資源修改完成w_lock.release() # 資源修改完了if __name__ == '__main__':for task in range(10): # 開啟10個任務t1 = threading.Thread(target=read, args=(read_lock,)) # 傳入2把鎖,1把用于鎖子線程鎖定共享資源,1把用于給子線程開啟的子線程鎖定共享資源t1.start()print('task done...')print(num)print(num2)while True:# 如果當前剩下1個活動的線程,說明其它子線程任務都完成了,只剩下主線程了if threading.active_count() == 1:print(num)print(num2)break遞歸鎖
上面的這個死鎖解決方式
""" 使用1把遞歸鎖可以解決上面對問題,但是上面對問題是可以避免的,只需先釋放鎖,在調用write(r_lock)方法,即可 """read_lock = threading.RLock()num = 0 num2 = 0def read(r_lock):r_lock.acquire() # 先獲取一把鎖,保證同一時刻只能有一個線程操作共享數據global num # 準備對共享數據進行操作time.sleep(random.randint(0, 3)) # 修改該資源比較費時,需要大約0-3秒num += 1 # 對共享數據加1# 這時還想對一個比較耗費時間對資源進行操作,所以又開啟一個新線程,該資源也是共享資源,所以也需要鎖定資源# new_task = threading.Thread(target=write, args=(w_lock,))# new_task.start()write(r_lock)# 然后需要釋放共享資源r_lock.release()def write(w_lock):current_thread = threading.current_thread() # 獲取當前線程print("write %s do something..." % current_thread.name)w_lock.acquire() # 先鎖定資源global num2 # 準備對共享資源進行操作time.sleep(random.randint(0, 2)) # 修改該資源比較費時,需要大約0-5秒num2 += 1 # 花費數秒秒才把資源修改完成w_lock.release() # 資源修改完了if __name__ == '__main__':for task in range(10): # 開啟10個任務t1 = threading.Thread(target=read, args=(read_lock,)) # 傳入2把鎖,1把用于鎖子線程鎖定共享資源,1把用于給子線程開啟的子線程鎖定共享資源t1.start()print('task done...')print(num)print(num2)while True:# 如果當前剩下1個活動的線程,說明其它子線程任務都完成了,只剩下主線程了if threading.active_count() == 1:print(num)print(num2)break轉載于:https://www.cnblogs.com/zengchunyun/p/7155819.html
總結
- 上一篇: Spark学习笔记(8)---Spark
- 下一篇: 织梦采集文章