线程队列 线程池 协程
1 . 線程隊列
from multiprocessing Queue , JoinableQueue? #進程IPC隊列
from queue import Queue? #線程隊列? 先進先出
from queue import LifoQueue? #后進先出的
方法都是一樣的 : put , get , put_nowait , get_nowait , full , empty , qsize
隊列 Queue : 先進先出 , 自帶鎖 , 數據安全
棧 LifoQueue : 后進先出 , 自帶鎖 , 數據安全
q = LifoQueue(5)
q.put(3)
q.put_nowait(4)
print(q.get()) #誰最后進的,就會先取誰
q.get_nowait()
print(q.full())
pirnt(q.empty())
print(q.qsize())
from queue import PriorityQueue? #優先級隊列
q = PriorityQueue()
q.put((10,"aaa"))
q.put((4,"bbb"))
print(q.get())
?
線程池
Threading 沒有線程池的
Multiprocessing Pool
concurrent.futures? 幫助管理線程池和進程池
import time
from threading import currentThread,get_ident
from concurrent.futures import ThreadPoolExecutor? #幫助啟動線程池
from concurrent.futures import ProcessPoolExecutor? #幫助啟動進程池
def func(i):
time.sleep(1)
print("in%s%s"%(i,currentThread()))
return i**2
def back(fn):
print(fn.result(),currentThread())
#map啟動多線程任務
t = ThreadPoolExecutor(5)
t.map(func,range(20))?? ==? ? ?for i in range(20):
? ? t.submit(func,i)
#submit異步提交任務
t = ThreadPoolExecutor(5)
for i in range(20):
t.submit(func,i)
t.shutdown()
print("main:",currentThread()) #起多少個線程池 , 5*CPU的個數
#獲取任務結果
t = ThreadPoolExecutor(5)
li = []
for i in range(20):
ret = t.submit(func,i)
li.append(ret)
t.shutdown()
for i in li:
print(i.result())
print("main:",currentThread())
#回調函數
t = ThreadPoolExecutor(5)
for i in range(20):
t.submit(func,i).add_done_callback(back)
#回調函數(進程版)
import os,time
from concurrent.futures import ProcessPoolExecutor
def func(i):
print("in%s%s"%(i,os.getpid()))
return i**2
def back(fn):
print(fn.result(),os.getpid())
if __name__ == "__main__":
p = ProcessPoolExecutor(5)
for i in range(20):
p.submit(func,i).add_done_callback(back)
print("main:",os.getpid())
multiprocessing模塊自帶進程池的
threading模塊是沒有線程池的
concurrent.futures 進程池和線程池 : 高度封裝 , 進程池/線程池的統一的使用方式
創建線程池/進程池 ProcessPoolExecutor? ThreadPoolExecutor
ret .result()獲取結果,如果想實現異步效果,應該使用列表
shutdown? == close + join 同步控制
add_done_callback 回調函數,在回調函數內接收的參數是一個對象,需要通過result來獲取返回值. 進程池的回調函數仍然在主進程中執行,但是線程池的回調函數是在線程中執行.
進程 : 資源分配的最小單位? ,? 班級
線程 : CPU調度最小單位 , 人
Cpython線程不能利用多核的 ,多線程無法利用多核 ,一個線程能同時執行多個任務.
協程 : 能在一條線程的基礎上 ,再過個任務之間互相切換 .節省了線程開啟的消耗.?
協程是從python代碼的級別調度的 , 正常的線程是CPU調度的最小單位 ; 協程的調度并不是由操作系統來完成的.
之前學過的協程在兩個任務之間相互切換的是生成器函數:yield
def func():
print(1)
x = yield "aaa"
print(x)
yield? "bbb"
g? = func()
print(next(g))
print(g.send("***"))
在多個函數之間互相切換的功能? ---? 協程
def consumer():
while True:
x = yield
print(x)
def producer():
g = consumer()
next(g)? ? #預激
for i in range(20):
g.send(i)
producer()
yield只有程序之間的切換,沒有重利用任何IO操作的時間
模塊的安裝 :
pip3 install 要安裝的模塊的名字
?
使用協程減少IO操作帶來的時間消耗
from gevent import monkey ; monkey.patch_all()
import gevent,time
def eat():
print("吃")
time.sleep(2)
print("吃完了")
def play():
print("玩")
time.sleep(1)
print("玩完了")
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
gevent.joinall([g1,g2])
沒有執行 , 為什么沒執行???是需要開啟么?
沒有開啟但是切換了
gevent幫我們做了切換,做切換是有條件的,遇到IO才切換
gevent不認識除了gevent這個模塊內以外的IO操作
使用join可以一直阻塞直到協程任務完成
幫助gevent來認識其他模塊中的阻塞
from gevent import monkey;monkey.patch_all() 寫在其他模塊導入之前.
使用協程來實現TCP協議 :
服務器 :
from gevent import monkey;monkey.patch_all()
import gevent,socket
def func(conn):
while 1:
conn.send(b"hello")
print(conn.recv(1024))
sk = socket.socket()
sk.bind(("127.0.0.1",9090))
sk.listen()
while 1:
conn,addr = sk.accept()
gevent.spawn(func,conn)
客戶端 :
import socket
from threading import Thread
def client():
sk = socket.socket()
sk.connect(("127.0.0.1",9090))
while 1:
print(sk.recv(1024))
sk.send(b"bye")
for i in range(20):
Thread(target=client).start()
?
4c 并發50000? qps
5個進程
20個線程
500個協程
協程能夠在單核的情況極大地提高CPU的利用率
協程不存在數據不安全 , 也不存在線程切換/創造的時間開銷 ; 切換時用戶級別的,程序不會因為協程中某一個任務進入阻塞狀態而使整條線程阻塞
線程的切換 :
時間片到了 降低了CPU的效率
IO會切? 提高了CPU的效率
?
轉載于:https://www.cnblogs.com/fengkun125/p/9403360.html
總結
以上是生活随笔為你收集整理的线程队列 线程池 协程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: FileChannel与ByteBuff
- 下一篇: Python3 列表的基本操作