协程asyncio_Asyncio深入浅出
Asyncio是一個(gè)異步編程的框架,可以解決異步編程,協(xié)程調(diào)度問題,線程問題,是整個(gè)異步IO的解決方案。
在學(xué)習(xí)asyncio之前,我們先來理清楚同步/異步的概念:
同步是指完成事務(wù)的邏輯,先執(zhí)行第一個(gè)事務(wù),如果阻塞了,會(huì)一直等待,直到這個(gè)事務(wù)完成,再執(zhí)行第二個(gè)事務(wù),順序執(zhí)行 一直等待
異步是和同步相對的,異步是指在處理調(diào)用這個(gè)事務(wù)的之后,不會(huì)等待這個(gè)事務(wù)的處理結(jié)果,直接處理第二個(gè)事務(wù)去了,通過狀態(tài)、通知、回調(diào)來通知調(diào)用者處理結(jié)果。
異步IO采用消息循環(huán)的模式,重復(fù)“讀取消息—處理消息”的過程,也就是說異步IO模型”需要一個(gè)消息循環(huán),在消息循環(huán)中,主線程不斷地重復(fù)“讀取消息-處理消息”這一過程。
- event_loop 事件循環(huán):程序開啟一個(gè)無限的循環(huán),程序員會(huì)把一些函數(shù)注冊到事件循環(huán)上。當(dāng)滿足事件發(fā)生的時(shí)候,調(diào)用相應(yīng)的協(xié)程函數(shù)。
- coroutine 協(xié)程:協(xié)程對象,指一個(gè)使用async關(guān)鍵字定義的函數(shù),它的調(diào)用不會(huì)立即執(zhí)行函數(shù),而是會(huì)返回一個(gè)協(xié)程對象。協(xié)程對象需要注冊到事件循環(huán),由事件循環(huán)調(diào)用。
- task 任務(wù):一個(gè)協(xié)程對象就是一個(gè)原生可以掛起的函數(shù),任務(wù)則是對協(xié)程進(jìn)一步封裝,其中包含任務(wù)的各種狀態(tài)。
- async/await 關(guān)鍵字: 用于定義協(xié)程的關(guān)鍵字,async定義一個(gè)協(xié)程,await用于掛起阻塞的異步調(diào)用接口。(async和await這兩個(gè)關(guān)鍵詞是在python3.5開始正式提出定義,asyncio是python解決異步io編程的一個(gè)完整框架。關(guān)于定義和原理請參考官方文檔: 協(xié)程與任務(wù), 如何理解await)
其中協(xié)程編程離不開的三大要點(diǎn):
- 事件循環(huán)
- 回調(diào)
- epoll/select(IO多路復(fù)用)
以下內(nèi)容有刪減的摘自 :
Asyncio并發(fā)編程?www.langzi.fun事件循環(huán)
簡單案例(訪問一個(gè)網(wǎng)站)
async def get_url_title(url): # 使用關(guān)鍵詞async定義一個(gè)協(xié)程print('開始訪問網(wǎng)站:{}'.format(url))await asyncio.sleep(2)# 這一步至關(guān)重要# asyncio.sleep(2) 功能:異步非阻塞等待2s,作用是模擬訪問網(wǎng)站消耗的時(shí)間# await 的作用類似 yield,即這個(gè)時(shí)候把線程資源控制權(quán)交出去,監(jiān)聽這個(gè)描述符直到這個(gè)任務(wù)完成# await 后面只能接三種類型'''1. 協(xié)程:Python 協(xié)程屬于 可等待 對象,因此可以在其他協(xié)程中被等待:2. 任務(wù):任務(wù) 被用來設(shè)置日程以便 并發(fā) 執(zhí)行協(xié)程。(當(dāng)一個(gè)協(xié)程通過 asyncio.create_task() 等函數(shù)被打包為一個(gè) 任務(wù),該協(xié)程將自動(dòng)排入日程準(zhǔn)備立即運(yùn)行)3. Future 對象:Future 是一種特殊的 低層級 可等待對象,表示一個(gè)異步操作的 最終結(jié)果。(當(dāng)一個(gè) Future 對象 被等待,這意味著協(xié)程將保持等待直到該 Future 對象在其他地方操作完畢。)如果await time.sleep(2) 是會(huì)報(bào)錯(cuò)的'''print('網(wǎng)站訪問成功')if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()# 一行代碼創(chuàng)造事件循環(huán)loop.run_until_complete(get_url_title('http://www.langzi.fun'))# 這是一個(gè)阻塞的方法,可以理解成多線程中的join方法# 直到get_url_title('http://www.langzi.fun')完成后,才會(huì)繼續(xù)執(zhí)行下面的代碼end_time = time.time()print('消耗時(shí)間:{}'.format(end_time-start_time))返回結(jié)果:
開始訪問網(wǎng)站:http://www.langzi.fun 網(wǎng)站訪問成功 消耗時(shí)間:2.0018768310546875簡單案例(訪問多個(gè)網(wǎng)站)
協(xié)程的優(yōu)勢是多任務(wù)協(xié)作,單任務(wù)訪問網(wǎng)站沒法發(fā)揮出他的功能,一次性訪問多個(gè)網(wǎng)站或者一次性等待多個(gè)IO響應(yīng)時(shí)間才能發(fā)揮它的優(yōu)勢。
# -*- coding:utf-8 -*- import asyncio import timeasync def get_url_title(url):print('開始訪問網(wǎng)站:{}'.format(url))await asyncio.sleep(2)print('網(wǎng)站訪問成功')if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()# 創(chuàng)造一個(gè)事件循環(huán)tasks = [get_url_title('http://www.langzi.fun')for i in range(10)]# 這個(gè)列表代表總?cè)蝿?wù)量,即執(zhí)行10次get_url_title()函數(shù)loop.run_until_complete(asyncio.wait(tasks))# asyncio.wait后面接上非空可迭代對象,一般來說是功能函數(shù)列表# 功能是一次性提交多個(gè)任務(wù),等待完成# loop.run_until_complete(asyncio.gather(*tasks))# 和上面代碼功能一致,但是gather更加高級,如果是列表就需要加上*# 這里會(huì)等到全部的任務(wù)執(zhí)行完后才會(huì)執(zhí)行后面的代碼end_time = time.time()print('消耗時(shí)間:{}'.format(end_time-start_time))對一個(gè)網(wǎng)站發(fā)起10次請求,返回結(jié)果:
開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 消耗時(shí)間:2.0015649795532227gather與wait的區(qū)別:
- gather更擅長于將函數(shù)聚合在一起
- wait更擅長篩選運(yùn)行狀況
即gather更加高級,他可以將任務(wù)分組,也可以取消任務(wù)
import asyncioasync def get_url_title(url):print('開始訪問網(wǎng)站:{}'.format(url))await asyncio.sleep(2)print('網(wǎng)站訪問成功')return 'success'if __name__ == '__main__':loop = asyncio.get_event_loop()# 使用wait方法# tasks = [get_url_title('http://www.langzi.fun')for i in range(10)]# loop.run_until_complete(asyncio.wait(tasks))# 使用gather方法實(shí)現(xiàn)分組導(dǎo)入(方法1)group1 = [get_url_title('http://www.langzi.fun')for i in range(3)]group2 = [get_url_title('http://www.baidu.com')for i in range(5)]loop.run_until_complete(asyncio.gather(*group1,*group2))# 這種方法會(huì)把兩個(gè)全部一次性導(dǎo)入# 使用gather方法實(shí)現(xiàn)分組導(dǎo)入(方法2)group1 = [get_url_title('http://www.langzi.fun')for i in range(3)]group2 = [get_url_title('http://www.baidu.com')for i in range(5)]group1 = asyncio.gather(*group1)group2 = asyncio.gather(*group2)#group2.cancel() 取消group2任務(wù)loop.run_until_complete(asyncio.gather(group1,group2))# 這種方法會(huì)先把group1導(dǎo)入,然后導(dǎo)入group2返回結(jié)果:
開始訪問網(wǎng)站:http://www.baidu.com 開始訪問網(wǎng)站:http://www.baidu.com 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.baidu.com 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.baidu.com 開始訪問網(wǎng)站:http://www.baidu.com 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.baidu.com 開始訪問網(wǎng)站:http://www.baidu.com 開始訪問網(wǎng)站:http://www.baidu.com 開始訪問網(wǎng)站:http://www.baidu.com 開始訪問網(wǎng)站:http://www.baidu.com 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功另外一種使用gather獲取返回結(jié)果:
import asyncioasync def get_url_title(url):print('開始訪問網(wǎng)站:{}'.format(url))await asyncio.sleep(2)print('網(wǎng)站訪問成功')return 'success'if __name__ == '__main__':loop = asyncio.get_event_loop()# 使用gather方法傳遞任務(wù)獲取結(jié)果group1 = asyncio.ensure_future(get_url_title('http://www.langzi.fun'))loop.run_until_complete(asyncio.gather(group1))# 如果不是列表就不需要加*print(group1.result())返回結(jié)果:
開始訪問網(wǎng)站:http://www.langzi.fun 網(wǎng)站訪問成功 success還有一些復(fù)雜的區(qū)別轉(zhuǎn)移到python 異步協(xié)程中查看
協(xié)程的調(diào)用和組合十分靈活,尤其是對于結(jié)果的處理,如何返回,如何掛起,需要逐漸積累經(jīng)驗(yàn)和前瞻的設(shè)計(jì)。
簡單案例(獲取返回值)
# -*- coding:utf-8 -*- import asyncio import timeasync def get_url_title(url):print('開始訪問網(wǎng)站:{}'.format(url))await asyncio.sleep(2)print('網(wǎng)站訪問成功')return 'success'if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()# 創(chuàng)建一個(gè)事件循環(huán)get_future = loop.create_task(get_url_title('http://www.langzi.fun'))#get_future = asyncio.ensure_future(get_url_title('http://www.langzi.fun'))# 這兩行代碼功能用法一模一樣loop.run_until_complete(get_future)print('獲取結(jié)果:{}'.format(get_future.result()))# 獲取結(jié)果end_time = time.time()print('消耗時(shí)間:{}'.format(end_time-start_time))返回結(jié)果:
開始訪問網(wǎng)站:http://www.langzi.fun 網(wǎng)站訪問成功 獲取結(jié)果:success 消耗時(shí)間:2.0019724369049072如果是多個(gè)網(wǎng)址傳入,訪問多個(gè)網(wǎng)址的返回值呢?只需要把前面的知識(shí)點(diǎn)匯總一起即可使用:
if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()# 創(chuàng)建一個(gè)事件循環(huán)tasks = [loop.create_task(get_url_title('http://www.langzi.fun')) for i in range(10)]# 把所有要返回的函數(shù)加載到一個(gè)列表loop.run_until_complete(asyncio.wait(tasks))# 這里和上面用法一樣print('獲取結(jié)果:{}'.format([x.result() for x in tasks]))# 因?yàn)榻Y(jié)果都在一個(gè)列表,在列表中取值即可end_time = time.time()print('消耗時(shí)間:{}'.format(end_time-start_time))返回結(jié)果:
開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 開始訪問網(wǎng)站:http://www.langzi.fun 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 網(wǎng)站訪問成功 獲取結(jié)果:['success', 'success', 'success', 'success', 'success', 'success', 'success', 'success', 'success', 'success'] 消耗時(shí)間:2.0016491413116455簡單案例(回調(diào)函數(shù))
上面的例子是一個(gè)協(xié)程函數(shù),當(dāng)這個(gè)協(xié)程函數(shù)的await xxx執(zhí)行完畢后,想要執(zhí)行另一個(gè)函數(shù)后,然后再返回這個(gè)協(xié)程函數(shù)的返回結(jié)果該這么做:
# -*- coding:utf-8 -*- import asyncio from functools import partial # partial的功能是 固定函數(shù)參數(shù),返回一個(gè)新的函數(shù)。你可以這么理解: ''' from functools import partialdef go(x,y):return x+yg = partial(go,y=2)print(g(1)) 返回結(jié)果:3g = partial(go,x=5,y=2)print(g()) 返回結(jié)果:7''' async def get_url_title(url):print('開始訪問網(wǎng)站:{}'.format(url))await asyncio.sleep(2)print('網(wǎng)站訪問成功')# 當(dāng)這個(gè)協(xié)程函數(shù)快要結(jié)束返回值的時(shí)候,會(huì)調(diào)用下面的call_back函數(shù)# 等待call_back函數(shù)執(zhí)行完畢后,才返回這個(gè)協(xié)程函數(shù)的值return 'success'def call_back(future,url):# 注意這里必須要傳遞future參數(shù),因?yàn)檫@里的future即代表下面的get_future對象print('檢測網(wǎng)址:{}狀態(tài)正常'.format(url))if __name__ == '__main__':loop = asyncio.get_event_loop()# 創(chuàng)建一個(gè)事件循環(huán)get_future = loop.create_task(get_url_title('http://www.langzi.fun'))# 將一個(gè)任務(wù)注冊到loop事件循環(huán)中g(shù)et_future.add_done_callback(partial(call_back,url = 'http://www.langzi.fun'))# 這里是設(shè)置,當(dāng)上面的任務(wù)完成要返回結(jié)果的時(shí)候,執(zhí)行call_back函數(shù)# 注意call_back函數(shù)不能加上(),也就意味著你只能依靠partial方法進(jìn)行傳遞參數(shù)loop.run_until_complete(get_future)# 等待任務(wù)完成print('獲取結(jié)果:{}'.format(get_future.result()))# 獲取結(jié)果返回結(jié)果:
開始訪問網(wǎng)站:http://www.langzi.fun 網(wǎng)站訪問成功 檢測網(wǎng)址:http://www.langzi.fun狀態(tài)正常 獲取結(jié)果:success梳理
取消協(xié)程任務(wù)
存在多個(gè)任務(wù)協(xié)程,想使用ctrl c退出協(xié)程,使用例子講解:
import asyncio async def get_time_sleep(t):print('開始運(yùn)行,等待:{}s'.format(t))await asyncio.sleep(t)print('運(yùn)行結(jié)束')if __name__ == '__main__':loop = asyncio.get_event_loop()# 創(chuàng)建一個(gè)事件循環(huán)task_1 = get_time_sleep(1)task_2 = get_time_sleep(2)task_3 = get_time_sleep(3)tasks = [task_1,task_2,task_3]# 三個(gè)協(xié)程任務(wù)加載到一個(gè)列表try:loop.run_until_complete(asyncio.wait(tasks))except KeyboardInterrupt:# 當(dāng)檢測到鍵盤輸入 ctrl c的時(shí)候all_tasks = asyncio.Task.all_tasks()# 獲取注冊到loop下的所有taskfor task in all_tasks:print('開始取消協(xié)程')task.cancel()# 取消該協(xié)程,如果取消成功則返回Trueloop.stop()# 停止循環(huán)loop.run_forever()# loop事件循環(huán)一直運(yùn)行# 這兩步必須要做finally:loop.close()# 關(guān)閉事件循環(huán)run_forever 會(huì)一直運(yùn)行,直到 stop 被調(diào)用,但是你不能像下面這樣調(diào) stop
loop.run_forever() loop.stop()run_forever 不返回,stop 永遠(yuǎn)也不會(huì)被調(diào)用。所以,只能在協(xié)程中調(diào) stop:
async def do_some_work(loop, x):print('Waiting ' + str(x))await asyncio.sleep(x)print('Done')loop.stop()這樣并非沒有問題,假如有多個(gè)協(xié)程在 loop 里運(yùn)行:
asyncio.ensure_future(do_some_work(loop, 1)) asyncio.ensure_future(do_some_work(loop, 3))loop.run_forever()第二個(gè)協(xié)程沒結(jié)束,loop 就停止了——被先結(jié)束的那個(gè)協(xié)程給停掉的。
要解決這個(gè)問題,可以用 gather 把多個(gè)協(xié)程合并成一個(gè) future,并添加回調(diào),然后在回調(diào)里再去停止 loop。
其實(shí)這基本上就是 run_until_complete 的實(shí)現(xiàn)了,run_until_complete 在內(nèi)部也是調(diào)用 run_forever。
關(guān)于loop.close(),簡單來說,loop 只要不關(guān)閉,就還可以再運(yùn)行。
loop.run_until_complete(do_some_work(loop, 1)) loop.run_until_complete(do_some_work(loop, 3)) loop.close()但是如果關(guān)閉了,就不能再運(yùn)行了:
loop.run_until_complete(do_some_work(loop, 1)) loop.close() loop.run_until_complete(do_some_work(loop, 3)) # 此處異常梳理
協(xié)程相互嵌套
import asyncio async def sum_tion(x,y):print('開始執(zhí)行傳入?yún)?shù)相加:{} + {}'.format(x,y))await asyncio.sleep(1)# 模擬等待1Sreturn (x+y)async def print_sum(x,y):result = await sum_tion(x,y)print(result)if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(print_sum(1000,2000))loop.close()返回結(jié)果:
開始執(zhí)行傳入?yún)?shù)相加:1000 + 2000 3000執(zhí)行流程:
如果想要獲取協(xié)程嵌套函數(shù)返回的值,就必須使用回調(diào):
import asyncio async def sum_tion(x,y)->int:print('開始執(zhí)行傳入?yún)?shù)相加:{} + {}'.format(x,y))await asyncio.sleep(1)# 模擬等待1Sreturn (x+y)async def print_sum(x,y):result = await sum_tion(x,y)return resultdef callback(future):return future.result()if __name__ == '__main__':loop = asyncio.get_event_loop()future = loop.create_task(print_sum(100,200))# 如果想要獲取嵌套協(xié)程返回的值,就必須使用回調(diào)future.add_done_callback(callback)loop.run_until_complete(future)print(future.result())loop.close()返回結(jié)果:
開始執(zhí)行傳入?yún)?shù)相加:100 + 200 300定時(shí)啟動(dòng)任務(wù)
asyncio提供定時(shí)啟動(dòng)協(xié)程任務(wù),通過call_soon,call_later,call_at實(shí)現(xiàn),他們的區(qū)別如下:
call_soon
call_soon是立即執(zhí)行
def callback(sleep_times):print("預(yù)計(jì)消耗時(shí)間 {} s".format(sleep_times)) def stoploop(loop):print('時(shí)間消耗完畢')loop.stop()if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()# 創(chuàng)建一個(gè)事件循環(huán)loop.call_soon(callback,5)# 立即啟動(dòng)callback函數(shù)loop.call_soon(stoploop,loop)# 上面執(zhí)行完畢后,立即啟動(dòng)執(zhí)行stoploop函數(shù)loop.run_forever()#要用這個(gè)run_forever運(yùn)行,因?yàn)闆]有傳入?yún)f(xié)程print('總共耗時(shí):{}'.format(time.time()-start_time))返回結(jié)果:
預(yù)計(jì)消耗時(shí)間 5 s 時(shí)間消耗完畢 總共耗時(shí):0.0010013580322265625call_later
call_later是設(shè)置一定時(shí)間啟動(dòng)執(zhí)行
def callback(sleep_times):print("預(yù)計(jì)消耗時(shí)間 {} s".format(sleep_times)) def stoploop(loop):print('時(shí)間消耗完畢')loop.stop()if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()loop.call_later(1,callback,1.0)# 等待1秒后執(zhí)行callback函數(shù),傳入?yún)?shù)是1.0loop.call_later(5,stoploop,loop)# 等待5秒后執(zhí)行stoploop函數(shù),傳入?yún)?shù)是looploop.run_forever()print('總共耗時(shí):{}'.format(time.time()-start_time))返回結(jié)果:
預(yù)計(jì)消耗時(shí)間 1.0 s 時(shí)間消耗完畢 總共耗時(shí):5.002613544464111call_at
call_at類似與call_later,但是他指定的時(shí)間不再是傳統(tǒng)意義上的時(shí)間,而是loop的內(nèi)部時(shí)鐘時(shí)間,效果和call_later一樣, call_later內(nèi)部其實(shí)調(diào)用了call_later
import time import asynciodef callback(loop):print("傳入loop.time()時(shí)間為: {} s".format(loop.time())) def stoploop(loop):print('時(shí)間消耗完畢')loop.stop()if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()now = loop.time()# loop內(nèi)部的時(shí)鐘時(shí)間loop.call_at(now+1,callback,loop)# 等待loop內(nèi)部時(shí)鐘時(shí)間加上1s后,執(zhí)行callba函數(shù),傳入?yún)?shù)為looploop.call_at(now+3,callback,loop)# 等待loop內(nèi)部時(shí)鐘時(shí)間加上3s后,執(zhí)行callba函數(shù),傳入?yún)?shù)為looploop.call_at(now+5,stoploop,loop)# 等待loop內(nèi)部時(shí)鐘時(shí)間加上1s后,執(zhí)行stoploop函數(shù),傳入?yún)?shù)為loop返回結(jié)果:
傳入loop.time()時(shí)間為: 3989.39 s 傳入loop.time()時(shí)間為: 3991.39 s 時(shí)間消耗完畢 總共耗時(shí):5.002060174942017call_soon_threadsafe 線程安全的call_soon
call_soon_threadsafe用法和call_soon一致。但在涉及多線程時(shí), 會(huì)使用它.
梳理
結(jié)合線程池
Asyncio是異步IO編程的解決方案,異步IO是包括多線程,多進(jìn)程,和協(xié)程的。所以asyncio是可以完成多線程多進(jìn)程和協(xié)程的,在開頭說到,協(xié)程是單線程的,如果遇到阻塞的話,會(huì)阻塞所有的代碼任務(wù),所以是不能加入阻塞IO的,但是比如requests庫是阻塞的,socket如果不設(shè)置setblocking(false)的話,也是阻塞的,這個(gè)時(shí)候可以放到一個(gè)線程中去做也是可以解決的,即在協(xié)程中集成阻塞IO,就加入多線程一起解決問題。
用requests完成異步編程(使用線程池)
from concurrent.futures import ThreadPoolExecutor import requests import asyncio import time import redef get_url_title(url):# 功能是獲取網(wǎng)址的標(biāo)題r = requests.get(url)try:title = re.search('<title>(.*?)</title>',r.content.decode(),re.S|re.I).group(1)except Exception as e:title = eprint(title)if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()# 創(chuàng)建一個(gè)事件循環(huán)p = ThreadPoolExecutor(5)# 創(chuàng)建一個(gè)線程池,開啟5個(gè)線程tasks = [loop.run_in_executor(p,get_url_title,'http://www.langzi.fun')for i in range(10)]# 這一步很重要,使用loop.run_in_executor()函數(shù):內(nèi)部接受的是阻塞的線程池,執(zhí)行的函數(shù),傳入的參數(shù)# 即對網(wǎng)站訪問10次,使用線程池訪問loop.run_until_complete(asyncio.wait(tasks))# 等待所有的任務(wù)完成print(time.time()-start_time)返回結(jié)果:
Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網(wǎng)絡(luò)安全工具,分享Python底層與進(jìn)階知識(shí),漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網(wǎng)絡(luò)安全工具,分享Python底層與進(jìn)階知識(shí),漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網(wǎng)絡(luò)安全工具,分享Python底層與進(jìn)階知識(shí),漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網(wǎng)絡(luò)安全工具,分享Python底層與進(jìn)階知識(shí),漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網(wǎng)絡(luò)安全工具,分享Python底層與進(jìn)階知識(shí),漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網(wǎng)絡(luò)安全工具,分享Python底層與進(jìn)階知識(shí),漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網(wǎng)絡(luò)安全工具,分享Python底層與進(jìn)階知識(shí),漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網(wǎng)絡(luò)安全工具,分享Python底層與進(jìn)階知識(shí),漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網(wǎng)絡(luò)安全工具,分享Python底層與進(jìn)階知識(shí),漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網(wǎng)絡(luò)安全工具,分享Python底層與進(jìn)階知識(shí),漏洞掃描器開發(fā)與爬蟲開發(fā) 5.589502334594727訪問10次消耗時(shí)間為5.5s,嘗試將 p = ThreadPoolExecutor(10),線程數(shù)量設(shè)置成10個(gè)線程,消耗時(shí)間為4.6s,改用從進(jìn)程池p = ProcessPoolExecutor(10),也是一樣可以運(yùn)行的,不過10個(gè)進(jìn)程消耗時(shí)間也是5.5s,并且消耗更多的CPU資源。
### 用socket完成異步編程(使用線程池)
import asyncio from concurrent.futures import ThreadPoolExecutor import socket from urllib.parse import urlparse import time import redef get_url(url):# 通過socket請求htmlurl = urlparse(url)host = url.netlocpath = url.pathif path == "":path = '/'# 建立socket連接client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)client.connect((host, 80))client.send("GET {} HTTP/1.1rnHost:{}rnConnection:closernrn".format(path, host).encode('utf8'))data = b""while True:d = client.recv(1024)if d:data += delse:breakdata = data.decode('utf8')html_data = data.split('rnrn')[1]# 把請求頭信息去掉, 只要網(wǎng)頁內(nèi)容title = re.search('<title>(.*?)</title>',html_data,re.S|re.I).group(1)print(title)client.close()if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()p = ThreadPoolExecutor(3) # 線程池 放3個(gè)線程tasks = [loop.run_in_executor(p,get_url,'http://www.langzi.fun') for i in range(10)]loop.run_until_complete(asyncio.wait(tasks))print('last time:{}'.format(time.time() - start_time))返回結(jié)果:
Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網(wǎng)絡(luò)安全工具,分享Python底層與進(jìn)階知識(shí),漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網(wǎng)絡(luò)安全工具,分享Python底層與進(jìn)階知識(shí),漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網(wǎng)絡(luò)安全工具,分享Python底層與進(jìn)階知識(shí),漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網(wǎng)絡(luò)安全工具,分享Python底層與進(jìn)階知識(shí),漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網(wǎng)絡(luò)安全工具,分享Python底層與進(jìn)階知識(shí),漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網(wǎng)絡(luò)安全工具,分享Python底層與進(jìn)階知識(shí),漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網(wǎng)絡(luò)安全工具,分享Python底層與進(jìn)階知識(shí),漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網(wǎng)絡(luò)安全工具,分享Python底層與進(jìn)階知識(shí),漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網(wǎng)絡(luò)安全工具,分享Python底層與進(jìn)階知識(shí),漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網(wǎng)絡(luò)安全工具,分享Python底層與進(jìn)階知識(shí),漏洞掃描器開發(fā)與爬蟲開發(fā) last time:5.132313966751099使用socket完成http請求(未使用線程池)
import asyncio from urllib.parse import urlparse import timeasync def get_url(url):# 通過socket請求htmlurl = urlparse(url)host = url.netlocpath = url.pathif path == "":path = '/'# 建立socket連接reader, writer = await asyncio.open_connection(host, 80) # 協(xié)程 與服務(wù)端建立連接writer.write("GET {} HTTP/1.1rnHost:{}rnConnection:closernrn".format(path, host).encode('utf8'))all_lines = []async for raw_line in reader: # __aiter__ __anext__魔法方法line = raw_line.decode('utf8')all_lines.append(line)html = 'n'.join(all_lines)return htmlasync def main():tasks = []tasks = [asyncio.ensure_future(get_url('http://www.langzi.fun')) for i in range(10)]for task in asyncio.as_completed(tasks): # 完成一個(gè) print一個(gè)result = await taskprint(result)if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()loop.run_until_complete(main())print('last time:{}'.format(time.time() - start_time))asyncio協(xié)程和之前講解的select事件循環(huán)原理是一樣的
梳理
與多進(jìn)程的結(jié)合
既然異步協(xié)程和多進(jìn)程對網(wǎng)絡(luò)請求都有提升,那么為什么不把二者結(jié)合起來呢?在最新的 PyCon 2018 上,來自 Facebook 的 John Reese 介紹了 asyncio 和 multiprocessing 各自的特點(diǎn),并開發(fā)了一個(gè)新的庫,叫做 aiomultiprocess
這個(gè)庫的安裝方式是:
pip3 install aiomultiprocess需要 Python 3.6 及更高版本才可使用。
使用這個(gè)庫,我們可以將上面的例子改寫如下:
import asyncio import aiohttp import time from aiomultiprocess import Poolstart = time.time()async def get(url):session = aiohttp.ClientSession()response = await session.get(url)result = await response.text()session.close()return resultasync def request():url = 'http://127.0.0.1:5000'urls = [url for _ in range(100)]async with Pool() as pool:result = await pool.map(get, urls)return resultcoroutine = request() task = asyncio.ensure_future(coroutine) loop = asyncio.get_event_loop() loop.run_until_complete(task)end = time.time() print('Cost time:', end - start)這樣就會(huì)同時(shí)使用多進(jìn)程和異步協(xié)程進(jìn)行請求,但在真實(shí)情況下,我們在做爬取的時(shí)候遇到的情況千變?nèi)f化,一方面我們使用異步協(xié)程來防止阻塞,另一方面我們使用 multiprocessing 來利用多核成倍加速,節(jié)省時(shí)間其實(shí)還是非常可觀的。
同步與通信
和多線程多進(jìn)程任務(wù)一樣,協(xié)程也可以實(shí)現(xiàn)和需要進(jìn)行同步與通信。
簡單例子(順序啟動(dòng)多任務(wù))
協(xié)程是單線程的,他的執(zhí)行依賴于事件循環(huán)中最后的loop.run_until_complate()
import asyncionum = 0async def add():global numfor i in range(10):await asyncio.sleep(0.1)num += i async def desc():global numfor i in range(10):await asyncio.sleep(0.2)num -= iif __name__ == '__main__':loop = asyncio.get_event_loop()tasks = [add(),desc()]loop.run_until_complete(asyncio.wait(tasks))# 這里執(zhí)行順序是先執(zhí)行add函數(shù),然后執(zhí)行desc函數(shù)# 所以最后的結(jié)果是0loop.close()print(num)返回結(jié)果:
0這里使用一個(gè)共有變量,協(xié)程下不需要加鎖。
簡單例子(Lock(鎖))
# -*- coding:utf-8 -*- import asyncio import functoolsdef unlock(lock):print('線程鎖釋放成功')lock.release()async def test(locker, lock):print(f'{locker} 等待線程鎖釋放')# ---------------------------------# with await lock:# print(f'{locker} 線程鎖上鎖')# ---------------------------------# 上面這兩行代碼等同于:# ---------------------------------# await lock.acquire()# print(f'{locker} 線程鎖上鎖')# lock.release()# ---------------------------------await lock.acquire()print(f'{locker} 線程鎖上鎖')lock.release()print(f'{locker} 線程鎖釋放')async def main(loop):lock = asyncio.Lock()await lock.acquire()loop.call_later(0.5, functools.partial(unlock, lock))# call_later() 表達(dá)推遲一段時(shí)間的回調(diào), 第一個(gè)參數(shù)是以秒為單位的延遲, 第二個(gè)參數(shù)是回調(diào)函數(shù)await asyncio.wait([test('任務(wù) 1 ', lock), test('任務(wù) 2', lock)])if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(main(loop))loop.close()返回結(jié)果:
任務(wù) 1 等待線程鎖釋放 任務(wù) 2 等待線程鎖釋放 線程鎖釋放成功 任務(wù) 1 線程鎖上鎖 任務(wù) 1 線程鎖釋放 任務(wù) 2 線程鎖上鎖 任務(wù) 2 線程鎖釋放簡單例子(Semaphore(信號(hào)量))
可以使用 Semaphore(信號(hào)量) 來控制并發(fā)訪問的數(shù)量:
import asyncio from aiohttp import ClientSessionasync def fetch(sem,url):async with sem:# 最大訪問數(shù)async with ClientSession() as session:async with session.get(url) as response:status = response.statusres = await response.text()print("{}:{} ".format(response.url, status))return resif __name__ == '__main__':loop = asyncio.get_event_loop()url = "http://www.langzi.fun"sem = asyncio.Semaphore(1000)# 設(shè)置最大并發(fā)數(shù)為1000tasks = [loop.create_task(fetch(sem,url))for i in range(100)]# 對網(wǎng)站訪問100次loop.run_until_complete(asyncio.wait(tasks))簡單例子(Condition(條件))
import asyncioasync def consumer(cond, name, second):# 消費(fèi)者函數(shù)await asyncio.sleep(second)# 等待延遲with await cond:await cond.wait()print('{}: 得到響應(yīng)'.format(name))async def producer(cond):await asyncio.sleep(2)for n in range(1, 3):with await cond:print('生產(chǎn)者 {} 號(hào)'.format(n))cond.notify(n=n) # 挨個(gè)通知單個(gè)消費(fèi)者await asyncio.sleep(0.1)async def producer2(cond):await asyncio.sleep(2)with await cond:print('釋放信號(hào)量,通知所有消費(fèi)者')cond.notify_all()# 一次性通知全部的消費(fèi)者async def main(loop):condition = asyncio.Condition()# 設(shè)置信號(hào)量task = loop.create_task(producer(condition))# producer 和 producer2 是兩個(gè)協(xié)程, 不能使用 call_later(), 需要用到 create_task() 把它們創(chuàng)建成一個(gè) taskconsumers = [consumer(condition, name, index) for index, name in enumerate(('c1', 'c2'))]await asyncio.wait(consumers)task.cancel()print('---分割線---')task = loop.create_task(producer2(condition))consumers = [consumer(condition, name, index) for index, name in enumerate(('c1', 'c2'))]await asyncio.wait(consumers)task.cancel()# 取消任務(wù)if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(main(loop))loop.close()返回結(jié)果:
生產(chǎn)者 1 號(hào) c1: 得到響應(yīng) 生產(chǎn)者 2 號(hào) c2: 得到響應(yīng) ---分割線--- 釋放信號(hào)量,通知所有消費(fèi)者 c1: 得到響應(yīng) c2: 得到響應(yīng)簡單例子(Event(事件))
與 Lock(鎖) 不同的是, 事件被觸發(fā)的時(shí)候, 兩個(gè)消費(fèi)者不用獲取鎖, 就要盡快地執(zhí)行下去了
import asyncio import functoolsdef set_event(event):print('開始設(shè)置事件')event.set()async def test(name, event):print('{} 的事件未設(shè)置'.format(name))await event.wait()print('{} 的事件已設(shè)置'.format(name))async def main(loop):event = asyncio.Event()# 聲明事件print('事件是否設(shè)置: {}'.format(event.is_set()))loop.call_later(0.1, functools.partial(set_event, event))# 在0.1s后執(zhí)行set_event()函數(shù),對事件進(jìn)行設(shè)置await asyncio.wait([test('e1', event), test('e2', event)])print('最終事件狀態(tài): {}'.format(event.is_set()))if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(main(loop))loop.close()返回結(jié)果:
事件是否設(shè)置: False e1 的事件未設(shè)置 e2 的事件未設(shè)置 開始設(shè)置事件 e1 的事件已設(shè)置 e2 的事件已設(shè)置 最終事件狀態(tài): True簡單例子(協(xié)程間通信)
協(xié)程是單線程,因此使用list、dict就可以實(shí)現(xiàn)通信,而不會(huì)有線程安全問題,當(dāng)然可以使用asyncio.Queue
from asyncio import Queue queue = Queue(maxsize=3) # queue的put和get需要用await舉個(gè)例子:
import asyncio from asyncio import Queue import random import string q = Queue(maxsize=100)async def add():while 1:await q.put(random.choice(string.ascii_letters))async def desc():while 1:res = await q.get()print(res)await asyncio.sleep(1)if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait([add(),desc()]))loop.run_forever()返回結(jié)果:
D b S x ...加速asyncio
uvloop,這個(gè)使用庫可以有效的加速asyncio,本庫基于libuv,也就是nodejs用的那個(gè)庫。使用它也非常方便,不過目前不支持windows
import asyncio import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())沒錯(cuò)就是2行代碼,就可以提速asyncio。
tokio同樣可以做異步資源循環(huán)
import tokio asyncio.set_event_loop_policy(tokio.EventLoopPolicy())參考:
python異步編程之a(chǎn)syncio(百萬并發(fā)) - 三只松鼠 - 博客園?www.cnblogs.comAsyncio并發(fā)編程?www.langzi.fun總結(jié)
以上是生活随笔為你收集整理的协程asyncio_Asyncio深入浅出的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: LeetCode 658. 找到 K 个
- 下一篇: LeetCode 669. 修剪二叉搜索