tornado异步请求非阻塞
前言
也許有同學(xué)很迷惑:tornado不是標榜異步非阻塞解決10K問題的嘛?但是我卻發(fā)現(xiàn)不是torando不好,而是你用錯了.比如最近發(fā)現(xiàn)一個事情:某網(wǎng)站打開頁面很慢,服務(wù)器cpu/內(nèi)存都正常.網(wǎng)絡(luò)狀態(tài)也良好. 后來發(fā)現(xiàn),打開頁面會有很多請求后端數(shù)據(jù)庫的訪問,有一個mongodb的數(shù)據(jù)庫業(yè)務(wù)api的rest服務(wù).但是它的tornado卻用錯了,一步步的來研究問題:
?
說明
以下的例子都有2個url,一個是耗時的請求,一個是可以或者說需要立刻返回的請求,我想就算一個對技術(shù)不熟,從道理上來說的用戶, 他希望的是他訪問的請求不會影響也不會被其他人的請求影響
?
#!/bin/env python
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.httpclient
import time
from tornado.options import define, options
define("port", default=8000, help="run on the given port", type=int)
class SleepHandler(tornado.web.RequestHandler):
? ? def get(self):
? ? ? ? time.sleep(5)
? ? ? ? self.write("when i sleep 5s")
class JustNowHandler(tornado.web.RequestHandler):
? ? def get(self):
? ? ? ? self.write("i hope just now see you")
if __name__ == "__main__":
? ? tornado.options.parse_command_line()
? ? app = tornado.web.Application(handlers=[
? ? ? ? ? ? (r"/sleep", SleepHandler), (r"/justnow", JustNowHandler)])
? ? http_server = tornado.httpserver.HTTPServer(app)
? ? http_server.listen(options.port)
? ? tornado.ioloop.IOLoop.instance().start()
假如你使用頁面請求或者使用哪個httpie,curl等工具先訪問http://localhost:8000/sleep,再訪問http://localhost:8000/justnow.你會發(fā)現(xiàn)本來可以立刻返回的/jsutnow的請求會一直阻塞到/sleep請求完才返回.
?
這是為啥?為啥我的請求被/sleep請求阻塞了?如果平時我們的web請求足夠快我們可能不會意識到這個問題,但是事實上經(jīng)常會有一些耗時的進程,意味著應(yīng)用程序被有效的鎖定直至處理結(jié)束.
?
這是時候你有沒有想起@tornado.web.asynchronous這個裝飾器?但是使用這個裝飾器有個前提就是你要耗時的執(zhí)行需要執(zhí)行異步,比如上面的time.sleep,你只是加裝飾器是沒有作用的,而且需要注意的是 Tornado默認在函數(shù)處理返回時關(guān)閉客戶端的連接,但是當你使用@tornado.web.asynchonous裝飾器時,Tornado永遠不會自己關(guān)閉連接,需要顯式的self.finish()關(guān)閉
?
我們大部分的函數(shù)都是阻塞的, 比如上面的time.sleep其實tornado有個異步的實現(xiàn):
?
#!/bin/env python
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.gen
import tornado.httpclient
import tornado.concurrent
import tornado.ioloop
import time
from tornado.options import define, options
define("port", default=8000, help="run on the given port", type=int)
class SleepHandler(tornado.web.RequestHandler):
? ? @tornado.web.asynchronous
? ? @tornado.gen.coroutine
? ? def get(self):
? ? ? ? yield tornado.gen.Task(tornado.ioloop.IOLoop.instance().add_timeout, time.time() + 5)
? ? ? ? self.write("when i sleep 5s")
class JustNowHandler(tornado.web.RequestHandler):
? ? def get(self):
? ? ? ? self.write("i hope just now see you")
if __name__ == "__main__":
? ? tornado.options.parse_command_line()
? ? app = tornado.web.Application(handlers=[
? ? ? ? ? ? (r"/sleep", SleepHandler), (r"/justnow", JustNowHandler)])
? ? http_server = tornado.httpserver.HTTPServer(app)
? ? http_server.listen(options.port)
? ? tornado.ioloop.IOLoop.instance().start()
這里有個新的tornado.gen.coroutine裝飾器, coroutine是3.0之后新增的裝飾器.以前的辦法是用回調(diào),還是看我這個例子:
?
class SleepHandler(tornado.web.RequestHandler):
? ? @tornado.web.asynchronous
? ? def get(self):
? ? ? ? tornado.ioloop.IOLoop.instance().add_timeout(time.time() + 5, callback=self.on_response)
? ? def on_response(self):
? ? ? ? self.write("when i sleep 5s")
? ? ? ? self.finish()
使用了callback, 但是新的裝飾器讓我們通過yield實現(xiàn)同樣的效果:你在打開/sleep之后再點擊/justnow, justnow的請求都是立刻返回不受影響.但是用了asynchronous的裝飾器你的耗時的函數(shù)也需要執(zhí)行異步
?
剛才說的都是沒有意義的例子,下面寫個有點用的:讀取mongodb數(shù)據(jù)庫數(shù)據(jù),然后再前端按行write出來
?
#!/bin/env python
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.gen
import tornado.httpclient
import tornado.concurrent
import tornado.ioloop
import time
# 一個mongodb出品的支持異步的數(shù)據(jù)庫的python驅(qū)動
import motor
from tornado.options import define, options
define("port", default=8000, help="run on the given port", type=int)
# db其實就是test數(shù)據(jù)庫的游標
db = motor.MotorClient().open_sync().test
class SleepHandler(BaseHandler):
? ? @tornado.web.asynchronous
? ? @tornado.gen.coroutine
? ? def get(self):
? ? ? ? # 這一行執(zhí)行還是阻塞需要時間的,我的tt集合有一些數(shù)據(jù)并且沒有索引
? ? ? ? cursor = db.tt.find().sort([('a', -1)])
? ? ? ? # 這部分會異步非阻塞的執(zhí)行二不影響其他頁面請求
? ? ? ? while (yield cursor.fetch_next):
? ? ? ? ? ? message = cursor.next_object()
? ? ? ? ? ? self.write('<li>%s</li>' % message['a'])
? ? ? ? self.write('</ul>')
? ? ? ? self.finish()
? ? def _on_response(self, message, error):
? ? ? ? if error:
? ? ? ? ? ? raise tornado.web.HTTPError(500, error)
? ? ? ? elif message:
? ? ? ? ? ? for i in message:
? ? ? ? ? ? ? ? self.write('<li>%s</li>' % i['a'])
? ? ? ? else:
? ? ? ? ? ? self.write('</ul>')
? ? ? ? ? ? self.finish()
class JustNowHandler(BaseHandler):
? ? def get(self):
? ? ? ? self.write("i hope just now see you")
if __name__ == "__main__":
? ? tornado.options.parse_command_line()
? ? app = tornado.web.Application(handlers=[
? ? ? ? ? ? (r"/sleep", SleepHandler), (r"/justnow", JustNowHandler)])
? ? http_server = tornado.httpserver.HTTPServer(app)
? ? http_server.listen(options.port)
? ? tornado.ioloop.IOLoop.instance().start()
一個同事提示為什么這個耗時的東西不能異步的丟給某工具去執(zhí)行而不阻塞我的請求呢?好吧,我也想到了:celery,正好github有這個東西:tornado-celery
?
執(zhí)行下面的程序首先你要安裝rabbitmq和celery:
?
#!/bin/env python
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.gen
import tornado.httpclient
import tcelery, tasks
import time
from tornado.options import define, options
define("port", default=8000, help="run on the given port", type=int)
tcelery.setup_nonblocking_producer()
class SleepHandler(tornado.web.RequestHandler):
? ? @tornado.web.asynchronous
? ? @tornado.gen.coroutine
? ? def get(self):
? ? ? ? # tornado.gen.Task的參數(shù)是:要執(zhí)行的函數(shù), 參數(shù)
? ? ? ? yield tornado.gen.Task(tasks.sleep.apply_async, args=[5])
? ? ? ? self.write("when i sleep 5s")
? ? ? ? self.finish()
class JustNowHandler(tornado.web.RequestHandler):
? ? def get(self):
? ? ? ? self.write("i hope just now see you")
if __name__ == "__main__":
? ? tornado.options.parse_command_line()
? ? app = tornado.web.Application(handlers=[
? ? ? ? ? ? (r"/sleep", SleepHandler), (r"/justnow", JustNowHandler)])
? ? http_server = tornado.httpserver.HTTPServer(app)
? ? http_server.listen(options.port)
? ? tornado.ioloop.IOLoop.instance().start()
task是celery的任務(wù)定義的文件,包含我們說的time.sleep的函數(shù)
?
import time
from celery import Celery
celery = Celery("tasks", broker="amqp://guest:guest@localhost:5672")
celery.conf.CELERY_RESULT_BACKEND = "amqp"
@celery.task
def sleep(seconds):
? ? time.sleep(float(seconds))
? ? return seconds
if __name__ == "__main__":
? ? celery.start()
然后啟動celelry worker(要不然你的任務(wù)怎么執(zhí)行呢?肯定需要一個消費者取走):
?
celery -A tasks worker --loglevel=info
但是這里的問題也可能很嚴重:我們的異步非阻塞依賴于celery,還是這個隊列的長度,假如任務(wù)很多那么就需要等待,效率很低.有沒有一種辦法把我的同步阻塞函數(shù)變?yōu)楫惒?或者說被tornado的裝飾器理解和識別)呢?
?
#!/bin/env python
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.httpclient
import tornado.gen
from tornado.concurrent import run_on_executor
# 這個并發(fā)庫在python3自帶在python2需要安裝sudo pip install futures
from concurrent.futures import ThreadPoolExecutor
import time
from tornado.options import define, options
define("port", default=8000, help="run on the given port", type=int)
class SleepHandler(tornado.web.RequestHandler):
? ? executor = ThreadPoolExecutor(2)
#executor 是局部變量 ?不是全局的
? ? @tornado.web.asynchronous
? ? @tornado.gen.coroutine
? ? def get(self):
? ? ? ? # 假如你執(zhí)行的異步會返回值被繼續(xù)調(diào)用可以這樣(只是為了演示),否則直接yield就行
? ? ? ? res = yield self.sleep()
? ? ? ? self.write("when i sleep %s s" % res)
? ? ? ? self.finish()
? ? @run_on_executor
? ? def sleep(self):
? ? ? ? time.sleep(5)
? ? ? ? return 5
class JustNowHandler(tornado.web.RequestHandler):
? ? def get(self):
? ? ? ? self.write("i hope just now see you")
if __name__ == "__main__":
? ? tornado.options.parse_command_line()
? ? app = tornado.web.Application(handlers=[
? ? ? ? ? ? (r"/sleep", SleepHandler), (r"/justnow", JustNowHandler)])
? ? http_server = tornado.httpserver.HTTPServer(app)
? ? http_server.listen(options.port)
? ? tornado.ioloop.IOLoop.instance().start()
總結(jié)
以上是生活随笔為你收集整理的tornado异步请求非阻塞的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用python连接数据库
- 下一篇: HDU 1848 Fibonacci a