python线程唤醒_Python 并发编程(一)之线程
常用用法
t.is_alive()
Python中線程會在一個單獨的系統(tǒng)級別線程中執(zhí)行(比如一個POSIX線程或者一個Windows線程)
這些線程將由操作系統(tǒng)來全權管理。線程一旦啟動,將獨立執(zhí)行直到目標函數(shù)返回。可以通過查詢
一個線程對象的狀態(tài),看它是否還在執(zhí)行t.is_alive()
t.join()
可以把一個線程加入到當前線程,并等待它終止
Python 解釋器在所有線程都終止后才繼續(xù)執(zhí)行代碼剩余的部分
daemon
對于需要長時間運行的線程或者需要一直運行的后臺任務,可以用后臺線程(也稱為守護線程)
例:
t = Thread(target = func, args(1,), daemon = True)
t.start()
后臺線程無法等待,這些線程會在主線程終止時自動銷毀
小結:
后臺線程無法等待,不過,這些線程會在主線程終止時自動銷毀。你無法結束一個線程,無法給它發(fā)送信
號,無法調整它的調度,也無法執(zhí)行其他高級操作。如果需要這些特性,你需要自己添加。比如說,
如果你需要終止線程,那么這個線程必須通過編程在某個特定點輪詢來退出
如果線程執(zhí)行一些像 I/O 這樣的阻塞操作,那么通過輪詢來終止線程將使得線程之間的協(xié)調變得非常棘手。
比如,如果一個線程一直阻塞在一個 I/O 操作上,它就永遠無法返回,也就無法檢查自己是否已經被結束了。
要正確處理這些問題,需要利用超時循環(huán)來小心操作線程。
線程間通信
queue
一個線程向另外一個線程發(fā)送數(shù)據(jù)最安全的方式應該就是queue庫中的隊列
先看一下使用例子,這里是一個簡單的生產者和消費者模型:
1 from queue importQueue2 from threading importThread3 importrandom4 importtime5
6
7 _sentinel =object()8
9
10 defproducer(out_q):11 n = 10
12 whilen:13 time.sleep(1)14 data = random.randint(0, 10)15 out_q.put(data)16 print("生產者生產了數(shù)據(jù){0}".format(data))17 n -= 1
18 out_q.put(_sentinel)19
20
21 defconsumer(in_q):22 whileTrue:23 data =in_q.get()24 print("消費者消費了{0}".format(data))25 if data is_sentinel:26 in_q.put(_sentinel)27 break
28
29
30 q =Queue()31 t1 = Thread(target=consumer, args=(q,))32 t2 = Thread(target=producer, args=(q,))33
34 t1.start()35 t2.start()
上述代碼中設置了一個特殊值_sentinel用于當獲取到這個值的時候終止執(zhí)行
關于queue的功能有個需要注意的地方:
Queue對象雖然已經包含了必要的鎖,主要有q.put和q.get
而q.size(),q.full(),q.empty()等方法不是線程安全的
使用隊列進行線程通信是一個單向、不確定的過程。通常情況下,是沒有辦法知道接收數(shù)據(jù)的線程是什么時候接收到的數(shù)據(jù)并開始工作的。但是隊列提供了一些基本的特性:q.task_done()和q.join()
如果一個線程需要在另外一個線程處理完特定的數(shù)據(jù)任務后立即得到通知,可以把要發(fā)送的數(shù)據(jù)和一個Event放到一起使用
關于線程中的Event
線程有一個非常關鍵的特性:每個線程都是獨立運行的,且狀態(tài)不可預測
如果程序中的其他線程需要通過判斷每個線程的狀態(tài)來確定自己下一步的操作,這時線程同步問題就會比較麻煩。
解決方法:
使用threading庫中的Event
Event對象包含一個可由線程設置的信號標志,它允許線程等待某些事件的發(fā)生。
在初始化狀態(tài)下,event對象中的信號標志被設置為假。
如果有線程等待一個event對象,而這個event的標志為假,這個線程將一直被阻塞知道該標志為真。
一個線程如果把event對象的標志設置為真,就會喚醒所有等待這個event對象的線程。
通過一個代碼例子理解:
1 from threading importThread, Event2 importtime3
4
5 defcountdown(n, started_evt):6 print("countdown starting")7 #set將event的標識設置為True
8 started_evt.set()9 while n >0:10 print("T-mins", n)11 n -= 1
12 time.sleep(2)13
14 #初始化的started_evt為False
15 started_evt =Event()16 print("Launching countdown")17 t = Thread(target=countdown, args=(10, started_evt,))18 t.start()19 #會一直等待直到event的標志為True的時候
20 started_evt.wait()21 print("countdown is running")
而結果,我們也可以看出當線程執(zhí)行了set之后,才打印running
實際用event對象最好是單次使用,創(chuàng)建一個event對象,讓某個線程等待這個對象,一旦對象被設置為Tru,就應該丟棄它,我們雖然可以通過clear()方法重置event對象,但是這個沒法確保安全的清理event對象并對它進行重新的賦值。會發(fā)生錯過事件,死鎖等各種問題。
event對象的一個重要特點是它被設置為True時會喚醒所有等待它的線程,如果喚醒單個線程的最好用Condition或信號量Semaphore
和event功能類似的線程中還有一個Condition
關于線程中的Condition
關于Condition官網的一段話:
A condition variable is always associated with some kind of lock; this can be passed in or one will be created by default. Passing one in is useful when several condition variables must share the same lock. The lock is part of the condition object: you don’t have to track it separately.
Other methods must be called with the associated lock held. The wait() method releases the lock, and then blocks until another thread awakens it by calling notify() or notify_all(). Once awakened, wait() re-acquires the lock and returns. It is also possible to specify a timeout.
但是需要注意的是:notify() and notify_all()這兩個方法,不會釋放鎖,這意味著線程或者被喚醒的線程不會立刻執(zhí)行wait()
我們可以通過Conditon對象實現(xiàn)一個周期定時器的功能,每當定時器超時的時候,其他線程都可以檢測到,代碼例子如下:
1 importthreading2 importtime3
4
5 classPeriodicTimer:6 """
7 這里做了一個定時器8 """
9
10 def __init__(self, interval):11 self._interval =interval12 self._flag =013 self._cv =threading.Condition()14
15 defstart(self):16 t = threading.Thread(target=self.run)17 t.daemon =True18 t.start()19
20 defrun(self):21 whileTrue:22 time.sleep(self._interval)23 with self._cv:24 #這個點還是非常有意思的^=
25 self._flag ^= 1
26 self._cv.notify_all()27
28 defwait_for_tick(self):29 with self._cv:30 last_flag =self._flag31
32 while last_flag ==self._flag:33 self._cv.wait()34
35
36 #下面兩個分別為兩個需要定時執(zhí)行的任務
37 defcountdown(nticks):38 while nticks >0:39 ptimer.wait_for_tick()40 print('T-minus', nticks)41 nticks -= 1
42
43
44 defcountup(last):45 n =046 while n <47 ptimer.wait_for_tick print n>
50
51
52 ptimer = PeriodicTimer(5)53 ptimer.start()54
55 threading.Thread(target=countdown, args=(10,)).start()56 threading.Thread(target=countup, args=(5,)).start()
關于線程中鎖的使用
要在多線程中安全使用可變對象,需要使用threading庫中的Lock對象
先看一個關于鎖的基本使用:
1 importthreading2
3
4 classSharedCounter:5
6 def __init__(self, initial_value=0):7 self._value =initial_value8 self._value_lock =threading.Lock()9
10
11 def incr(self,delta = 1):12 with self._value_lock:13 self._value +=delta14
15 def decr(self, delta=1):16 with self._value_lock:17 self._value -= delta
Lock對象和with語句塊一起使用可以保證互斥執(zhí)行,這樣每次就只有一個線程可以執(zhí)行with語句包含的代碼塊。with語句會在這個代碼快執(zhí)行前自動獲取鎖,在執(zhí)行結束后自動釋放所。
線程的調度本質上是不確定的,因此,在多線程程序中錯誤的使用鎖機制可能會導致隨機數(shù)據(jù)
損壞或者其他異常錯誤,我們稱之為競爭條件
你可能看到有些“老python程序員”
還是通過_value_lock.acquire() 和_value_lock.release(),明顯看來
還是with更加方便,不容易出錯,畢竟你無法保證那次就忘記釋放鎖了
為了避免死鎖,使用鎖機制的程序應該設定每個線程一次只能獲取一個鎖
threading庫中還提供了其他的同步原語:RLock,Semaphore對象。但是這兩個使用場景相對來說比較特殊
RLock(可重入鎖)可以被同一個線程多次獲取,主要用來實現(xiàn)基于檢測對象模式的鎖定和同步。在使用這種鎖的時候,當鎖被持有時,只有一個線程可以使用完整的函數(shù)或者類中的方法,例子如下:
1 importthreading2
3
4 classSharedCounter:5
6 _lock =threading.RLock()7
8 def __init__(self,initial_value=0):9 self._value =initial_value10
11 def incr(self,delta=1):12
13 with SharedCounter._lock:14 self._value +=delta15
16 def decr(self,delta=1):17
18 with SharedCounter._lock:19 self.incr(-delta)
這個例子中的鎖是一個類變量,也就是所有實例共享的類級鎖,這樣就保證了一次只有一個線程可以調用這個類的方法。與標準鎖不同的是已經持有這個鎖的方法再調用同樣適用這個鎖的方法時,無需再次獲取鎖,例如上面例子中的decr方法。
這種方法的特點是:無論這個類有多少實例都使用一個鎖。因此在需要使用大量使用計數(shù)器的情況下內存效率更高。
缺點:在程序中使用大量線程并頻繁更新計數(shù)器時會有競爭用鎖的問題。
信號量對象是一個建立在共享計數(shù)器基礎上的同步原語,如果計數(shù)器不為0,with語句講計數(shù)器減1,
線程被允許執(zhí)行。with語句執(zhí)行結束后,計數(shù)器加1。如果計數(shù)器為0,線程將被阻塞,直到其他線程結束并將計數(shù)器加1。但是信號量不推薦使用,增加了復雜性,影響程序性能。
所以信號量更適用于哪些需要在線程之間引入信號或者限制的程序。例如限制一段代碼的并發(fā)量
1 from threading importSemaphore2 importrequests3
4
5 _fetch_url_sema = Semaphore(5)6
7
8 deffetch_url(url):9 with _fetch_url_sema:10 return requests.get(url)
關于防止死鎖的加鎖機制
在多線程程序中,死鎖問題很大一部分是由于多線程同時獲取多個鎖造成的。
舉個例子:一個線程獲取一個第一個鎖,在獲取第二個鎖的時候發(fā)生阻塞,那么這個線程就可能阻塞其他線程執(zhí)行,從而導致整個程序假死。
一種解決方法:為程序中每一個鎖分配一個唯一的id,然后只允許按照升序規(guī)則來使用多個鎖。
1 importthreading2 from contextlib importcontextmanager3
4 #存儲已經請求鎖的信息
5 _local =threading.local()6
7
8 @contextmanager9 def acquire(*locks):10 #把鎖通過id進行排序
11 locks = sorted(locks, key=lambdax: id(x))12
13 acquired = getattr(_local, 'acquired', [])14
15 if acquired and max(id(lock) for lock in acquired) >=id(locks[0]):16 raise RuntimeError("Lock order Violation")17 acquired.extend(locks)18 _local.acquired =acquired19
20 try:21 for lock inlocks:22 lock.acquire()23 yield
24 finally:25 for lock inreversed(locks):26 lock.release()27 del acquired[-len(locks):]28
29
30 x_lock =threading.Lock()31 y_lock =threading.Lock()32
33
34 defthread_1():35 whileTrue:36 with acquire(x_lock,y_lock):37 print("Thread-1")38
39
40 defthread_2():41 whileTrue:42 with acquire(y_lock,x_lock):43 print("Thread-2")44
45
46 t1 = threading.Thread(target=thread_1)47 t1.daemon =True48 t1.start()49
50 t2 = threading.Thread(target=thread_2)51 t2.daemon =True52 t2.start()
通過排序,不管以什么樣的順序來請求鎖,這些鎖都會按照固定的順序被獲取。
這里也用了thread.local()來保存請求鎖的信息
同樣的這個東西也可以用來保存線程的信息,而這個線程對其他的線程是不可見的
47>總結
以上是生活随笔為你收集整理的python线程唤醒_Python 并发编程(一)之线程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: accept 阻塞怎么断开_暖气片放水就
- 下一篇: android 获取serialno_[