18ch
18.2 線程和進程
18.2.1 什么是進程?
18.2.1 什么是進程?
計算機程序只不過是磁盤中可執行的,二進制的數據。它們只有在被讀取到內存中,被操作系統調用的時候才開始它們的生命周期。進程(重量級進程)是程序的一 次執行,每個進程都有自己的地址空間,內存,數據棧以及其它記錄其運行軌跡的輔助數據。操作系統管理在其上運行的所有進程,并為這些進程公平的分配時間。 進程也可以通過fork和spawn操作來完成其它的任務。不過各個進程有自己的內存空間,數據棧等,所以只能使用進程間通訊(IPC),而不能直接共享 信息。
18.2.2 什么是線程
線程(輕量級進程)跟進程有些相似,不同的是:所有的線程運行在同一個進程中,共享相同的運行環境。它們可以想象成是在主進程或“主線程”中并行運行的“迷你進程”。
線程有開始,順序執行和結束三部分。它有一個自己的指令指針,記錄自己運行到什么地方。線程的運行可能被搶占(中斷),或暫時的被掛起(也叫睡眠),讓其 它的線程運行,這叫做讓步。一個進程中的各個線程之間共享同一片數據空間,所以線程之間可以比進程之間更方便的共享數據以及相互通訊。線程一般都是并發執 行的,正式由于這種并行和數據共享的機制使得多個任務的合作變成可能。實際上,在單CPU的系統中,真正的并發是不可能的,每個線程會被安排成每次只運行 一會,然后就把CPU讓出來,讓其它的線程去運行。在進程的整個運行過程中,每個線程都只做自己的事,在需要的時候跟其它的線程共享運行的結果。
當然,這樣的共享并不是完全沒有危險的。如果多個線程共同訪問同一片數據,則由于數據訪問的順序不一樣,有可能導致數據結果的不一致問題。
另一個需要注意的地方是:由于有的函數會在完成之前阻塞住,在沒有特別為多線程做修改的情況下,這種“貪婪”的函數會讓CPU的市價分配有所傾斜。導致各個線程分配到的運行時間可能不盡相同,不盡公平。
18.3.5 Python的threading模塊
核心提示:避免使用thread模塊。更高級別的 threading 模塊更為先進,對線程的支持更為完善,而且使用 thread 模塊里的屬性有可能會與 threading 出現沖突。其次,低級別的 thread 模塊的同步原語很少(實際上只有一個),而 threading 模塊則有很多。
thread 模塊函數
start_new_thread(function,
args, kwargs=None) 產生一個新的線程,在新線程中用指定的參數和可選的
kwargs來調用這個函數。
allocate_lock() 分配一個LockType 類型的鎖對象
exit() 讓線程退出
LockType類型鎖對象方法
acquire(wait=None) 嘗試獲取鎖對象
locked() 如果獲取了鎖對象返回True,否則返回False
release() 釋放鎖
例子1:簡單的多線程
1 from time import sleep, ctime 2 import thread 3 def loop0(): 4 print "start loop 0 at:", ctime() 5 sleep(4) 6 print "loop 0 done at:", ctime() 7 def loop1(): 8 print "start loop 1 at:", ctime() 9 sleep(2) 10 print "loop 1 done at:", ctime() 11 def main(): 12 print "starting at:", ctime() 13 thread.start_new_thread(loop0, ()) 14 thread.start_new_thread(loop1, ()) 15 sleep(6) 16 print "all DONE at:", ctime() 17 18 if __name__ == "__main__": 19 main()
mtsleep1.py
其中sleep(6)是為了讓線程中的sleep(2)和sleep(4)能夠完整運行(而不是隨著主線程結束直接終止)。
我們可以通過使用鎖來保證主線程不會提前結束。
1 #!/usr/bin/env python 2 3 import thread 4 from time import sleep, ctime 5 6 loops = [4,2] 7 8 def loop(nloop, nsec, lock): 9 print 'start loop', nloop, 'at: ', ctime() #運行時這條語句有錯誤輸出 10 sleep(nsec) 11 print 'loop', nloop, 'done at: ', ctime() 12 lock.release() 13 14 def main(): 15 print 'starting at:', ctime() 16 locks = [] 17 nloops = range(len(loops)) 18 19 for i in nloops: 20 lock = thread.allocate_lock() 21 lock.acquire() #嘗試獲得鎖(將鎖鎖上) 22 locks.append(lock) 23 for i in nloops: 24 thread.start_new_thread(loop, (i, loops[i], locks[i])) 25 26 for i in nloops: 27 while locks[i].locked(): 28 pass 29 print 'all DONE at:', ctime() 30 main()
mtsleep2.py
18.5 threading 模塊
接下來,我們要介紹的是更高級別的threading模塊,它不僅提供了Thread類,還提供了各種好用的同步機制。
threading 模塊對象 描述
Thread 表示一個線程的執行的對象
Lock 鎖原語對象(跟thread 模塊里的鎖對象相同)
RLock 可重入鎖對象。使單線程可以再次獲得已經獲得了的鎖(遞歸鎖定)。
Condition 條件變量對象能讓一個線程停下來,等待其它線程滿足了某個“條件”。
如,狀態的改變或值的改變。
Event 通用的條件變量。多個線程可以等待某個事件的發生,在事件發生后,
所有的線程都會被激活。
Semaphore 為等待鎖的線程提供一個類似“等候室”的結構
BoundedSemaphore 與Semaphore 類似,只是它不允許超過初始值
Timer 與Thread 相似,只是,它要等待一段時間后才開始運行。
核心提示:守護線程
另一個避免使用thread 模塊的原因是,它不支持守護線程。當主線程退出時,所有的子線程不論它們是否還在工作,都會被強行退出。有時,我們并不期望這種行為,這時,就引入了守護線程的概念
threading
模塊支持守護線程,它們是這樣工作的:守護線程一般是一個等待客戶請求的服務器,如果沒有客戶提出請求,它就在那等著。如果你設定一個線程為守護線程,就
表示你在說這個線程是不重要的,在進程退出的時候,不用等待這個線程退出。就像你在第16
章網絡編程看到的,服務器線程運行在一個無限循環中,一般不會退出。
如果你的主線程要退出的時候,不用等待那些子線程完成,那就設定這些線程的daemon
屬性。即,在線程開始(調用thread.start())之前,調用setDaemon()函數設定線程的daemon
標志(thread.setDaemon(True))就表示這個線程“不重要”
如果你想要等待子線程完成再退出, 那就什么都不用做,
或者顯式地調用thread.setDaemon(False)以保證其daemon
標志為False。你可以調用thread.isDaemon()函數來判斷其daemon 標志的值。新的子線程會繼承其父線程的daemon
標志。整個Python 會在所有的非守護線程退出后才會結束,即進程中沒有非守護線程存在的時候才結束。
18.5.1Thread類
函數 描述
start() 開始線程的執行
run() 定義線程的功能的函數(一般會被子類重寫)
join(timeout=None) 程序掛起,直到線程結束;如果給了timeout,則最多阻塞timeout 秒
getName() 返回線程的名字
setName(name) 設置線程的名字
isAlive() 布爾標志,表示這個線程是否還在運行中
isDaemon() 返回線程的daemon 標志
setDaemon(daemonic) 把線程的daemon 標志設為daemonic(一定要在調用start()函數前調用)
有三種方法用Thread類創建線程。
1. 創建一個Thread的實例,傳給它一個函數
2. 創建一個Thread的實例,傳給它一個可調用的類對象
3. 從Thread派生出一個子類, 創建一個這個子類的實例
下面分別給出例子
1.創建一個Thread的實例,傳給它一個函數
1 #!/usr/bin/env python 2 3 import threading 4 from time import sleep, ctime 5 6 loops = [4,2] 7 8 def loop(nloop, nsec): 9 print 'start loop', nloop, 'at:', ctime() 10 sleep(nsec) 11 print 'loop', nloop, 'done at:', ctime() 12 13 def main(): 14 print 'starting at:', ctime() 15 threads = [] 16 nloops = range(len(loops)) 17 18 for i in nloops: 19 t = threading.Thread(target = loop, args = (i, loops[i])) 20 threads.append(t) 21 22 for i in nloops: 23 threads[i].start() 24 25 for i in nloops: 26 threads[i].join() 27 28 print 'all DONE at:', ctime() 29 30 if __name__ == '__main__': 31 main()
mtsleep3.py
所有線程都創建了之后,再一起調用start()函數啟動,而不是創建一個啟動一個,而且,不用再管理一堆鎖(分配鎖,獲得鎖,釋放鎖,檢查鎖的狀態),只要簡單地對每個線程調用join()函數就可以了。
join()會等到線程結束,或者在給力timeout參數的情況下等到超時,使用join()會比使用一個等待鎖釋放的無限循環更清晰(這種鎖也被稱為自旋鎖)。
join()的另一個比較重要的方面是它可以完全不被調用。事實上一旦線程啟動以后就會一直運行,直到線程的函數結束。如果你的主線程除了等線程結束以外還有其他的事情要做,那就不用調用join(),只有當你要等待線程結束的時候才調用join()。
2.創建一個Thread的實例,傳給它一個可調用的類對象。
1 #!/usr/bin/env python 2 3 import threading 4 from time import sleep, ctime 5 6 loops = [4,2] 7 8 class ThreadFunc(object): 9 10 def __init__(self, func, args, name=''): 11 self.name = name 12 self.func = func 13 self.args = args 14 15 def __call__(self): 16 self.res = self.func(*self.args) 17 18 def loop(nloop, nsec): 19 print 'start loop', nloop, 'at:', ctime() 20 sleep(nsec) 21 print 'loop', nloop, 'done at:', ctime() 22 23 def main(): 24 print 'starting at:', ctime() 25 threads = [] 26 nloops = range(len(loops)) 27 28 for i in nloops: 29 t = threading.Thread(target = ThreadFunc(loop, (i,loops[i]), 30 loop.__name__)) 31 threads.append(t) 32 33 for i in nloops: 34 threads[i].start() 35 36 for i in nloops: 37 threads[i].join() 38 39 print 'all DONE at:', ctime() 40 41 if __name__ == '__main__': 42 main() 43
mtsleep4.py
3.從Thread類中派生出一個子類,創建一個這個子類的實例
mtsleep5和mtsleep4的最大區別在于1.MyThread子類的構造器一定要先調用其基類的構造器 2.之前的特殊函數__call__()在子類中,名字要改為run()。
1 #!/usr/bin/env python 2 3 import threading 4 from time import sleep, ctime 5 6 loops = (4, 2) 7 8 class MyThread(threading.Thread): 9 def __init__(self, func, args, name=''): 10 threading.Thread.__init__(self) 11 self.name = name 12 self.func = func 13 self.args = args 14 15 def run(self): 16 self.func(*self.args) #apply(self.func, self.args) 17 18 def loop(nloop, nsec): 19 print 'start loop', nloop, 'at:', ctime() 20 sleep(nsec) 21 print 'loop', nloop, 'done at:', ctime() 22 23 def main(): 24 print 'starting at:', ctime() 25 threads = [] 26 nloops = range(len(loops)) 27 28 for i in nloops: 29 t = MyThread(loop, (i, loops[i]), loop.__name__) 30 threads.append(t) 31 32 for i in nloops: 33 threads[i].start() 34 35 for i in nloops: 36 threads[i].join() 37 38 print 'all DONE at:', ctime() 39 40 if __name__ == '__main__': 41 main()
mtsleep5.py
為了讓mtsleep5中的Thread的子類更為通用我們把子類單獨放在一個模塊中,并加上一個getResult()函數用以返回函數的運行結果
1 #!/usr/bin/eny python 2 3 import threading 4 from time import ctime 5 6 class MyThread(threading.Thread): 7 def __init__(self, func, args, name=''): 8 threading.Thread.__init__(self) 9 self.name = name 10 self.func = func 11 self.args = args 12 13 def getResult(self): 14 return self.res 15 16 def run(self): 17 print 'starting', self.name, 'at:'. ctime() 18 self.res = self.func(*self.args) 19 print self.name, 'finished at:', ctime()
myThread.py
接下來給出一個腳本比較遞歸求斐波那契、階乘和累加和函數的運行。腳本先在單線程中運行再在多線程中運行以說明多線程的好處。
1 #!/usr/bin/eny python 2 3 from myThread import MyThread 4 from time import ctime, sleep 5 6 def fib(x): 7 sleep(0.005) 8 if x < 2: return 1 9 return (fib(x-2) + fib(x-1)) 10 11 def fac(x): 12 sleep(0.1) 13 if x < 2:return 1 14 return (x * fac(x-1)) 15 16 def sum(x): 17 sleep(0.1) 18 if x < 2:return 1 19 return (x + sum(x - 1)) 20 21 funcs = [fib, fac, sum] 22 n = 12 23 24 def main(): 25 nfuncs = range(len(funcs)) 26 27 print '*** SINGLE THREAD' 28 for i in nfuncs: 29 print 'starting', funcs[i].__name__, 'at:', ctime() 30 print funcs[i](n) 31 print funcs[i].__name__, 'finished at:', ctime() 32 33 print ' *** MUTIPLE THREADS' 34 threads = [] 35 for i in nfuncs: 36 t = MyThread(funcs[i],(n,), funcs[i].__name__) 37 threads.append(t) 38 39 for i in nfuncs: 40 threads[i].start() 41 42 for i in nfuncs: 43 threads[i].join() 44 print threads[i].getResult() 45 46 print 'all DONE' 47 48 if __name__ == '__main__': 49 main()
mtfacfib.py
18.5.3threading模塊中的其他函數
| 函數 | 描述 |
| activeCount() | 當前活動的線程對象的數量 |
| crrentThread() | 返回當前線程對象 |
| enumerate() | 返回當前活動線程的列表 |
| settrace(func) | 為所有線程設置一個跟蹤函數 |
| setprofile(func) | 為所有線程設置一個profile函數 |
18.5.4 生產者-消費者問題和Queue模塊
生產者-消費者問題,就是生產者把生產的貨物放進隊列一類的數據結構中供消費者使用,其中生產貨物和消費貨物的時間都是不固定的。
Queue模塊可以用來解決生產者-消費者問題,讓各個線程之間通信,所用到的屬性如下:
| 函數 | 描述 |
| Queue模塊函數 | |
| queue(size) | 創建一個大小為size的Queue對象 |
| Queue對象函數 | |
| qsize() | 返回隊列的大小(由于在返回的時候,隊列可能會被其他線程修改,所以這個值是近似值) |
| empty() |
如果隊列為空返回True, 否則返回False |
| full() | 如果隊列已滿返回True,否則返回False |
| put(item, block=0) | 把item放到隊列中,如果給了block,函數會一直阻塞到隊列中有空間為止 |
| get(block=0) | 從隊列中取一個對象,如果給了block,函數會一直阻塞直到隊列中有對象為止。 |
1 #!/usr/bin/env python
2
3 from random import randint
4 from time import sleep
5 from Queue import Queue
6 from myThread import MyThread
7
8 def writeQ(queue):
9 print 'producing object for Q...'
10 queue.put('xxx', 1)
11 print "size now", queue.qsize()
12
13 def readQ(queue):
14 val = queue.get(1)
15 print 'consumed object from Q... size now', queue.qsize()
16
17 def writer(queue, loops):
18 for i in range(loops):
19 writeQ(queue)
20 sleep(randint(1, 3))
21
22 def reader(queue, loops):
23 for i in range(loops):
24 readQ(queue)
25 sleep(randint(1, 3))
26
27 funcs = [writer, reader]
28 nfuncs = range(len(funcs))
29
30 def main():
31 nloops = randint(2, 5)
32 q = Queue(32)
33
34 threads = []
35 for i in nfuncs:
36 t = MyThread(funcs[i], (q, nloops), funcs[i].__name__)
37 threads.append(t)
38
39 for i in nfuncs:
40 threads[i].start()
41
42 for i in nfuncs:
43 threads[i].join()
44
45 print 'all DONE'
46
47 if __name__ == '__main__':
48 main()
procons.py
18.6相關模塊
| 模塊 | 描述 |
| thread | 基本的、低級別的線程模塊 |
| threading | 高級別的線程和同步對象 |
| Queue | 供多線程使用的同步隊列 |
| mutex | 互斥對象 |
| SocketServer | 具有線程控制的TCP和UDP管理器 |
總結
- 上一篇: 使用BOOTICE 恢复系统启动项
- 下一篇: Python函数式编程简介(三)匿名函数