《Node.js设计模式》高级异步准则
本系列文章為《Node.js Design Patterns Second Edition》的原文翻譯和讀書筆記,在GitHub連載更新,同步翻譯版鏈接。
歡迎關注我的專欄,之后的博文將在專欄同步:
- Encounter的掘金專欄
- 知乎專欄 Encounter的編程思考
- segmentfault專欄 前端小站
Advanced Asynchronous Recipes
幾乎所有我們迄今為止看到的設計模式都可以被認為是通用的,并且適用于應用程序的許多不同的領域。但是,有一套更具體的模式,專注于解決明確的問題。我們可以調用這些模式。就像現實生活中的烹飪一樣,我們有一套明確的步驟來實現預期的結果。當然,這并不意味著我們不能用一些創意來定制設計模式,以配合我們的客人的口味,對于書寫Node.js程序來說是必要的。在本章中,我們將提供一些常見的解決方案來解決我們在日常Node.js開發中遇到的一些具體問題。這些模式包括以下內容:
- 異步引入模塊并初始化
- 在高并發的應用程序中使用批處理和緩存異步操作的性能優化
- 運行與Node.js處理并發請求的能力相悖的阻塞事件循環的同步CPU綁定操作
異步引入模塊并初始化
在Chapter2-Node.js Essential Patterns中,當我們討論Node.js模塊系統的基本屬性時,我們提到了require()是同步的,并且module.exports也不能異步設置。
這是在核心模塊和許多npm包中存在同步API的主要原因之一,是否同步加載會被作為一個option參數被提供,主要用于初始化任務,而不是替代異步API。
不幸的是,這并不總是可能的。同步API可能并不總是可用的,特別是對于在初始化階段使用網絡的組件,例如執行三次握手協議或在網絡中檢索配置參數。 許多數據庫驅動程序和消息隊列等中間件系統的客戶端都是如此。
廣泛適用的解決方案
我們舉一個例子:一個名為db的模塊,它將會連接到遠程數據庫。 只有在連接和與服務器的握手完成之后,db模塊才能夠接受請求。在這種情況下,我們通常有兩種選擇:
- 在開始使用之前確保模塊已經初始化,否則則等待其初始化。每當我們想要在異步模塊上調用一個操作時,都必須完成這個過程:
- 使用依賴注入(Dependency Injection)而不是直接引入異步模塊。通過這樣做,我們可以延遲一些模塊的初始化,直到它們的異步依賴被完全初始化。 這種技術將管理模塊初始化的復雜性轉移到另一個組件,通常是它的父模塊。 在下面的例子中,這個組件是app.js:
我們可以看出,如果所涉及的異步依賴的數量過多,第一種方案便不太適用了。
另外,使用DI有時也是不理想的,正如我們在Chapter7-Wiring Modules中看到的那樣。在大型項目中,它可能很快變得過于復雜,尤其對于手動完成并使用異步初始化模塊的情況下。如果我們使用一個設計用于支持異步初始化模塊的DI容器,這些問題將會得到緩解。
但是,我們將會看到,還有第三種方案可以讓我們輕松地將模塊從其依賴關系的初始化狀態中分離出來。
預初始化隊列
將模塊與依賴項的初始化狀態分離的簡單模式涉及到使用隊列和命令模式。這個想法是保存一個模塊在尚未初始化的時候接收到的所有操作,然后在所有初始化步驟完成后立即執行這些操作。
實現一個異步初始化的模塊
為了演示這個簡單而有效的技術,我們來構建一個應用程序。首先創建一個名為asyncModule.js的異步初始化模塊:
const asyncModule = module.exports;asyncModule.initialized = false; asyncModule.initialize = callback => {setTimeout(() => {asyncModule.initialized = true;callback();}, 10000); };asyncModule.tellMeSomething = callback => {process.nextTick(() => {if(!asyncModule.initialized) {return callback(new Error('I don\'t have anything to say right now'));}callback(null, 'Current time is: ' + new Date());}); };在上面的代碼中,asyncModule展現了一個異步初始化模塊的設計模式。 它有一個initialize()方法,在10秒的延遲后,將初始化的flag變量設置為true,并通知它的回調調用(10秒對于真實應用程序來說是很長的一段時間了,但是對于具有互斥條件的應用來說可能會顯得力不從心)。
另一個方法tellMeSomething()返回當前的時間,但是如果模塊還沒有初始化,它拋出產生一個異常。
下一步是根據我們剛剛創建的服務創建另一個模塊。 我們設計一個簡單的HTTP請求處理程序,在一個名為routes.js的文件中實現:
在handler中調用asyncModule的tellMeSomething()方法,然后將其結果寫入HTTP響應中。 正如我們所看到的那樣,我們沒有對asyncModule的初始化狀態進行任何檢查,這可能會導致問題。
現在,創建app.js模塊,使用核心http模塊創建一個非常基本的HTTP服務器:
const http = require('http'); const routes = require('./routes'); const asyncModule = require('./asyncModule');asyncModule.initialize(() => {console.log('Async module initialized'); });http.createServer((req, res) => {if (req.method === 'GET' && req.url === '/say') {return routes.say(req, res);}res.writeHead(404);res.end('Not found'); }).listen(8000, () => console.log('Started'));上述模塊是我們應用程序的入口點,它所做的只是觸發asyncModule的初始化并創建一個HTTP服務器,它使用我們以前創建的handler(routes.say())來對網絡請求作出相應。
我們現在可以像往常一樣通過執行app.js模塊來嘗試啟動我們的服務器。
在服務器啟動后,我們可以嘗試使用瀏覽器訪問URL:http://localhost:8000/并查看從asyncModule返回的內容。
和預期的一樣,如果我們在服務器啟動后立即發送請求,結果將是一個錯誤,如下所示:
顯然,在異步模塊加載好了之后:
這意味著asyncModule尚未初始化,但我們仍嘗試使用它,則會拋出一個錯誤。
根據異步初始化模塊的實現細節,幸運的情況是我們可能會收到一個錯誤,乃至丟失重要的信息,崩潰整個應用程序。 總的來說,我們剛剛描述的情況總是必須要避免的。
大多數時候,可能并不會出現上述問題,畢竟初始化一般來說很快,以至于在實踐中,它永遠不會發生。 然而,對于設計用于自動調節的高負載應用和云服務器,情況就完全不同了。
用預初始化隊列包裝模塊
為了維護服務器的健壯性,我們現在要通過使用我們在本節開頭描述的模式來進行異步模塊加載。我們將在asyncModule尚未初始化的這段時間內對所有調用的操作推入一個預初始化隊列,然后在異步模塊加載好后處理它們時立即刷新隊列。這就是狀態模式的一個很好的應用!我們將需要兩個狀態,一個在模塊尚未初始化的時候將所有操作排隊,另一個在初始化完成時將每個方法簡單地委托給原始的asyncModule模塊。
通常,我們沒有機會修改異步模塊的代碼;所以,為了添加我們的排隊層,我們需要圍繞原始的asyncModule模塊創建一個代理。
接下來創建一個名為asyncModuleWrapper.js的新文件,讓我們依照每個步驟逐個構建它。我們需要做的第一件事是創建一個代理,并將原始異步模塊的操作委托給這個代理:
const asyncModule = require('./asyncModule'); const asyncModuleWrapper = module.exports; asyncModuleWrapper.initialized = false; asyncModuleWrapper.initialize = () => {activeState.initialize.apply(activeState, arguments); }; asyncModuleWrapper.tellMeSomething = () => {activeState.tellMeSomething.apply(activeState, arguments); };在前面的代碼中,asyncModuleWrapper將其每個方法簡單地委托給activeState。 讓我們來看看這兩個狀態是什么樣子
從notInitializedState開始,notInitializedState是指還沒初始化的狀態:
// 當模塊沒有被初始化時的狀態 let pending = []; let notInitializedState = {initialize: function(callback) {asyncModule.initialize(function() {asyncModuleWrapper.initalized = true;activeState = initializedState;pending.forEach(function(req) {asyncModule[req.method].apply(null, req.args);});pending = [];callback();});},tellMeSomething: function(callback) {return pending.push({method: 'tellMeSomething',args: arguments});}};當initialize()方法被調用時,我們觸發初始化asyncModule模塊,提供一個回調函數作為參數。 這使我們的asyncModuleWrapper知道什么時候原始模塊被初始化,在初始化后執行預初始化隊列的操作,之后清空預初始化隊列,再調用作為參數的回調函數,以下為具體步驟:
由于此時的模塊尚未初始化,此狀態的tellMeSomething()方法僅創建一個新的Command對象,并將其添加到預初始化隊列中。
此時,當原始的asyncModule模塊尚未初始化時,代理應該已經清楚,我們的代理將簡單地把所有接收到的請求防到預初始化隊列中。 然后,當我們被通知初始化完成時,我們執行所有預初始化隊列的操作,然后將內部狀態切換到initializedState。來看這個代理模塊最后的定義:
let initializedState = asyncModule;不出意外,initializedState對象只是對原始的asyncModule的引用!事實上,初始化完成后,我們可以安全地將任何請求直接發送到原始模塊。
最后,設定異步模塊還沒加載好的的狀態,即notInitializedState
let activeState = notInitializedState;我們現在可以嘗試再次啟動我們的測試服務器,但首先,我們不要忘記用我們新的asyncModuleWrapper對象替換原始的asyncModule模塊的引用; 這必須在app.js和routes.js模塊中完成。
這樣做之后,如果我們試圖再次向服務器發送一個請求,我們會看到在asyncModule模塊尚未初始化的時候,請求不會失敗; 相反,他們會掛起,直到初始化完成,然后才會被實際執行。我們當然可以肯定,比起之前,容錯率變得更高了。
可以看到,在剛剛初始化異步模塊的時候,服務器會等待請求的響應:
在異步模塊加載完成后,服務器才會返回響應的信息:
模式:如果模塊是需要異步初始化的,則對每個操作進行排隊,直到模塊完全初始化釋放隊列。
現在,我們的服務器可以在啟動后立即開始接受請求,并保證這些請求都不會由于其模塊的初始化狀態而失敗。我們能夠在不使用DI的情況下獲得這個結果,也不需要冗長且容易出錯的檢查來驗證異步模塊的狀態。
其它場景的應用
我們剛剛介紹的模式被許多數據庫驅動程序和ORM庫所使用。 最值得注意的是Mongoose,它是MongoDB的ORM。使用Mongoose,不必等待數據庫連接打開,以便能夠發送查詢,因為每個操作都排隊,稍后與數據庫的連接完全建立時執行。 這顯然提高了其API的可用性。
看一下Mongoose的源碼,它的每個方法是如何通過代理添加預初始化隊列。 可以看看實現這中模式的代碼片段:https://github.com/Automattic... for (var i in Collection.prototype) {(function(i){NativeCollection.prototype[i] = function () {if (this.buffer) {// mongoose中,在緩沖區不為空時,只是簡單地把這個操作加入緩沖區內this.addQueue(i, arguments);return;}var collection = this.collection, args = arguments, self = this, debug = self.conn.base.options.debug;if (debug) {if ('function' === typeof debug) {debug.apply(debug, [self.name, i].concat(utils.args(args, 0, args.length-1)));} else {console.error('\x1B[0;36mMongoose:\x1B[0m %s.%s(%s) %s %s %s', self.name, i, print(args[0]), print(args[1]), print(args[2]), print(args[3]))}}return collection[i].apply(collection, args);};})(i); }異步批處理和緩存
在高負載的應用程序中,緩存起著至關重要的作用,幾乎在網絡中的任何地方,從網頁,圖像和樣式表等靜態資源到純數據(如數據庫查詢的結果)都會使用緩存。 在本節中,我們將學習如何將緩存應用于異步操作,以及如何充分利用緩存解決高請求吞吐量的問題。
實現沒有緩存或批處理的服務器
在這之前,我們來實現一個小型的服務器,以便用它來衡量緩存和批處理等技術在解決高負載應用程序的優勢。
讓我們考慮一個管理電子商務公司銷售的web服務器,特別是對于查詢我們的服務器所有特定類型的商品交易的總和的情況。 為此,考慮到LevelUP的簡單性和靈活性,我們將再次使用LevelUP。我們要使用的數據模型是存儲在sales這一個sublevel中的簡單事務列表,它是以下的形式:
transactionId {amount, item}key由transactionId表示,value則是一個JSON對象,它包含amount,表示銷售金額和item,表示項目類型。
要處理的數據是非常基本的,所以讓我們立即在名為的totalSales.js文件中實現API,將如下所示:
該模塊的核心是totalSales函數,它也是唯一exports的API;它進行如下工作:
上述查詢方式可能在性能方面并不好。理想情況下,在實際的應用程序中,我們可以使用索引,甚至使用增量映射來縮短實時計算的時間;但是,由于我們需要體現緩存的優勢,對于上述例子來說,慢速的查詢實際上更好,因為它會突出顯示我們要分析的模式的優點。
為了完成總銷售應用程序,我們只需要從HTTP服務器公開totalSales的API;所以,下一步是構建一個(app.js文件):
const http = require('http'); const url = require('url'); const totalSales = require('./totalSales');http.createServer((req, res) => {const query = url.parse(req.url, true).query;totalSales(query.item, (err, sum) => {res.writeHead(200);res.end(`Total sales for item ${query.item} is ${sum}`);}); }).listen(8000, () => console.log('Started'));我們創建的服務器是非常簡單的;我們只需要它暴露totalSales API。
在我們第一次啟動服務器之前,我們需要用一些示例數據填充數據庫;我們可以使用專用于本節的代碼示例中的populate_db.js腳本來執行此操作。該腳本將在數據庫中創建100K個隨機銷售交易。
好的! 現在,一切都準備好了。 像往常一樣,啟動服務器,我們執行以下命令:
請求這個HTTP接口,訪問至以下URL:
http://localhost:8000/?item=book但是,為了更好地了解服務器的性能,我們需要連續發送多個請求;所以,我們創建一個名為loadTest.js的腳本,它以200 ms的間隔發送請求。它已經被配置為連接到服務器的URL,因此,要運行它,執行以下命令:
node loadTest我們會看到這20個請求需要一段時間才能完成。注意測試的總執行時間,因為我們現在開始我們的服務,并測量我們可以節省多少時間。
批量異步請求
在處理異步操作時,最基本的緩存級別可以通過將一組調用集中到同一個API來實現。這非常簡單:如果我們在調用異步函數的同時在隊列中還有另一個尚未處理的回調,我們可以將回調附加到已經運行的操作上,而不是創建一個全新的請求。看下圖的情況:
前面的圖像顯示了兩個客戶端(它們可以是兩臺不同的機器,或兩個不同的Web請求),使用完全相同的輸入調用相同的異步操作。 當然,描述這種情況的自然方式是由兩個客戶開始兩個單獨的操作,這兩個操作將在兩個不同的時刻完成,如前圖所示。現在考慮下一個場景,如下圖所示:
上圖向我們展示了如何對API的兩個請求進行批處理,或者換句話說,對兩個請求執行到相同的操作。通過這樣做,當操作完成時,兩個客戶端將同時被通知。這代表了一種簡單而又非常強大的方式來降低應用程序的負載,而不必處理更復雜的緩存機制,這通常需要適當的內存管理和緩存失效策略。
在電子商務銷售的Web服務器中使用批處理
現在讓我們在totalSales API上添加一個批處理層。我們要使用的模式非常簡單:如果在API被調用時已經有另一個相同的請求掛起,我們將把這個回調添加到一個隊列中。當異步操作完成時,其隊列中的所有回調立即被調用。
現在,讓我們來改變之前的代碼:創建一個名為totalSalesBatch.js的新模塊。在這里,我們將在原始的totalSales API之上實現一個批處理層:
const totalSales = require('./totalSales');const queues = {}; module.exports = function totalSalesBatch(item, callback) {if(queues[item]) { // [1]console.log('Batching operation');return queues[item].push(callback);}queues[item] = [callback]; // [2]totalSales(item, (err, res) => {const queue = queues[item]; // [3]queues[item] = null;queue.forEach(cb => cb(err, res));}); };totalSalesBatch()函數是原始的totalSales() API的代理,它的工作原理如下:
totalSalesBatch()函數的行為與原始的totalSales() API的行為相同,不同之處在于,現在對于相同內容的請求API進行批處理,從而節省時間和資源。
想知道相比于totalSales() API原始的非批處理版本,在性能方面的優勢是什么?然后,讓我們將HTTP服務器使用的totalSales模塊替換為我們剛剛創建的模塊,修改app.js文件如下:
//const totalSales = require('./totalSales'); const totalSales = require('./totalSalesBatch'); http.createServer(function(req, res) { // ... });如果我們現在嘗試再次啟動服務器并進行負載測試,我們首先看到的是請求被批量返回。
除此之外,我們觀察到請求的總時間大大減少;它應該至少比對原始totalSales() API執行的原始測試快四倍!
這是一個驚人的結果,證明了只需應用一個簡單的批處理層即可獲得巨大的性能提升,比起緩存機制,也沒有顯得太復雜,因為,無需考慮緩存淘汰策略。
批處理模式在高負載應用程序和執行較為緩慢的API中發揮巨大作用,正是由于這種模式的運用,可以批量處理大量的請求。異步請求緩存策略
異步批處理模式的問題之一是對于API的答復越快,我們對于批處理來說,其意義就越小。有人可能會爭辯說,如果一個API已經很快了,那么試圖優化它就沒有意義了。然而,它仍然是一個占用應用程序的資源負載的因素,總結起來,仍然可以有解決方案。另外,如果API調用的結果不會經常改變;因此,這時候批處理將并不會有較好的性能提升。在這種情況下,減少應用程序負載并提高響應速度的最佳方案肯定是更好的緩存模式。
緩存模式很簡單:一旦請求完成,我們將其結果存儲在緩存中,該緩存可以是變量,數據庫中的條目,也可以是專門的緩存服務器。因此,下一次調用API時,可以立即從緩存中檢索結果,而不是產生另一個請求。
對于一個有經驗的開發人員來說,緩存不應該是多么新的技術,但是異步編程中這種模式的不同之處在于它應該與批處理結合在一起,以達到最佳效果。原因是因為多個請求可能并發運行,而沒有設置緩存,并且當這些請求完成時,緩存將會被設置多次,這樣做則會造成緩存資源的浪費。
基于這些假設,異步請求緩存模式的最終結構如下圖所示:
上圖給出了異步緩存算法的兩個步驟:
另外我們需要考慮Zalgo的反作用(我們已經在Chapter 2-Node.js Essential Patterns中看到了它的實際應用)。在處理異步API時,我們必須確保始終以異步方式返回緩存的值,即使訪問緩存只涉及同步操作。
在電子商務銷售的Web服務器中使用異步緩存請求
實踐異步緩存模式的優點,現在讓我們將我們學到的東西應用到totalSales() API。
與異步批處理示例程序一樣,我們創建一個代理,其作用是添加緩存層。
然后創建一個名為totalSalesCache.js的新模塊,代碼如下:
const totalSales = require('./totalSales');const queues = {}; const cache = {};module.exports = function totalSalesBatch(item, callback) {const cached = cache[item];if (cached) {console.log('Cache hit');return process.nextTick(callback.bind(null, null, cached));}if (queues[item]) {console.log('Batching operation');return queues[item].push(callback);}queues[item] = [callback];totalSales(item, (err, res) => {if (!err) {cache[item] = res;setTimeout(() => {delete cache[item];}, 30 * 1000); //30 seconds expiry}const queue = queues[item];queues[item] = null;queue.forEach(cb => cb(err, res));}); };我們可以看到前面的代碼與我們異步批處理的很多地方基本相同。 其實唯一的區別是以下幾點:
- 我們需要做的第一件事就是檢查緩存是否被設置,如果是這種情況,我們將立即使用callback()返回緩存的值,這里必須要使用process.nextTick(),因為緩存可能是異步設定的,需要等到下一次事件輪詢時才能夠保證緩存已經被設定。
- 繼續異步批處理模式,但是這次,當原始API成功完成時,我們將結果保存到緩存中。此外,我們還設置了一個緩存淘汰機制,在30秒后使緩存失效。 一個簡單而有效的技術!
現在,我們準備嘗試我們剛創建的totalSales模塊。 先更改app.js模塊,如下所示:
// const totalSales = require('./totalSales'); // const totalSales = require('./totalSalesBatch'); const totalSales = require('./totalSalesCache');http.createServer(function(req, res) {// ... });現在,重新啟動服務器,并使用loadTest.js腳本進行配置,就像我們在前面的例子中所做的那樣。使用默認的測試參數,與簡單的異步批處理模式相比,很明顯地有了更好的性能提升。 當然,這很大程度上取決于很多因素;例如收到的請求數量,以及一個請求和另一個請求之間的延遲等。當請求數量較高且跨越較長時間時,使用高速緩存批處理的優勢將更為顯著。
Memoization被稱做緩存函數調用的結果的算法。 在npm中,你可以找到許多包來實現異步的memoization,其中最著名的之一之一是memoizee。有關實現緩存機制的說明
我們必須記住,在實際應用中,我們可能想要使用更先進的失效技術和存儲機制。 這可能是必要的,原因如下:
- 大量的緩存值可能會消耗大量內存。 在這種情況下,可以應用最近最少使用(LRU)算法來保持恒定的存儲器利用率。
- 當應用程序分布在多個進程中時,對緩存使用簡單變量可能會導致每個服務器實例返回不同的結果。如果這對于我們正在實現的特定應用程序來說是不希望的,那么解決方案就是使用共享存儲來存儲緩存。 常用的解決方案是Redis和Memcached。
- 與定時淘汰緩存相比,手動淘汰高速緩存可使得高速緩存使用壽命更長,同時提供更新的數據,但當然,管理起緩存來要復雜得多。
使用Promise進行批處理和緩存
在Chapter4-Asynchronous Control Flow Patterns with ES2015 and Beyond中,我們看到了Promise如何極大地簡化我們的異步代碼,但是在處理批處理和緩存時,它則可以提供更大的幫助。
利用Promise進行異步批處理和緩存策略,有如下兩個優點:
- 多個then()監聽器可以附加到相同的Promise實例。
- then()監聽器最多保證被調用一次,即使在Promise已經被resolve了之后,then()也能正常工作。 此外,then()總是會被保證其是異步調用的。
簡而言之,第一個優點正是批處理請求所需要的,而第二個優點則在Promise已經是解析值的緩存時,也會提供同樣的的異步返回緩存值的機制。
下面開始看代碼,我們可以嘗試使用Promises為totalSales()創建一個模塊,在其中添加批處理和緩存功能。創建一個名為totalSalesPromises.js的新模塊:
const pify = require('pify'); // [1] const totalSales = pify(require('./totalSales'));const cache = {}; module.exports = function totalSalesPromises(item) {if (cache[item]) { // [2]return cache[item];}cache[item] = totalSales(item) // [3].then(res => { // [4]setTimeout(() => {delete cache[item]}, 30 * 1000); //30 seconds expiryreturn res;}).catch(err => { // [5]delete cache[item];throw err;});return cache[item]; // [6] };Promise確實很好,下面是上述函數的功能描述:
非常簡單直觀,更重要的是,我們使用Promise也能夠實現批處理和緩存。
如果我們現在要嘗試使用totalSalesPromise()函數,稍微調整app.js模塊,因為現在使用Promise而不是回調函數。 讓我們通過創建一個名為appPromises.js的app模塊來實現:
它的實現與原始應用程序模塊幾乎完全相同,不同的是現在我們使用的是基于Promise的批處理/緩存封裝版本; 因此,我們調用它的方式也略有不同。
運行以下命令開啟這個新版本的服務器:
node appPromises運行與CPU-bound的任務
雖然上面的totalSales()在系統資源上面消耗較大,但是其也不會影響服務器處理并發的能力。 我們在Chapter1-Welcome to the Node.js Platform中了解到有關事件循環的內容,應該為此行為提供解釋:調用異步操作會導致堆棧退回到事件循環,從而使其免于處理其他請求。
但是,當我們運行一個長時間的同步任務時,會發生什么情況,從不會將控制權交還給事件循環?
這種任務也被稱為CPU-bound,因為它的主要特點是CPU利用率較高,而不是I/O操作繁重。
讓我們立即舉一個例子上看看這些類型的任務在Node.js中的具體行為。
解決子集總和問題
現在讓我們做一個CPU占用比較高的高計算量的實驗。下面來看的是子集總和問題,我們計算一個數組中是否具有一個子數組,其總和為0。例如,如果我們有數組[1, 2, -4, 5, -3]作為輸入,則滿足問題的子數組是[1, 2, -3]和[2, -4, 5, -3]。
最簡單的算法是把每一個數組元素做遍歷然后依次計算,時間復雜度為O(2^n),或者換句話說,它隨著輸入的數組長度成指數增長。這意味著一組20個整數則會有多達1, 048, 576中情況,顯然不能夠通過窮舉來做到。當然,這個問題的解決方案可能并不算復雜。為了使事情變得更加困難,我們將考慮數組和問題的以下變化:給定一組整數,我們要計算所有可能的組合,其總和等于給定的任意整數。
const EventEmitter = require('events').EventEmitter; class SubsetSum extends EventEmitter {constructor(sum, set) {super();this.sum = sum;this.set = set;this.totalSubsets = 0;} //... }SubsetSum類是EventEmitter類的子類;這使得我們每次找到一個匹配收到的總和作為輸入的新子集時都會發出一個事件。 我們將會看到,這會給我們很大的靈活性。
接下來,讓我們看看我們如何能夠生成所有可能的子集組合:
開始構建一個這樣的算法。創建一個名為subsetSum.js的新模塊。在其中聲明一個SubsetSum類:
_combine(set, subset) {for(let i = 0; i < set.length; i++) {let newSubset = subset.concat(set[i]);this._combine(set.slice(i + 1), newSubset);this._processSubset(newSubset);} }不管算法其中到底是什么內容,但有兩點要注意:
- _combine()方法是完全同步的;它遞歸地生成每一個可能的子集,而不把CPU控制權交還給事件循環。如果我們考慮一下,這對于不需要任何I/O的算法來說是非常正常的。
- 每當生成一個新的組合時,我們都會將這個組合提供給_processSubset()方法以供進一步處理。
_processSubset()方法負責驗證給定子集的元素總和是否等于我們要查找的數字:
_processSubset(subset) {console.log('Subset', ++this.totalSubsets, subset);const res = subset.reduce((prev, item) => (prev + item), 0);if (res == this.sum) {this.emit('match', subset);} }簡單地說,_processSubset()方法將reduce操作應用于子集,以便計算其元素的總和。然后,當結果總和等于給定的sum參數時,會發出一個match事件。
最后,調用start()方法開始執行算法:
start() {this._combine(this.set, []);this.emit('end'); }通過調用_combine()觸發算法,最后觸發一個end事件,表明所有的組合都被檢查過,并且任何可能的匹配都已經被計算出來。 這是可能的,因為_combine()是同步的; 因此,只要前面的函數返回,end事件就會觸發,這意味著所有的組合都被計算出來了。
接下來,我們在網絡上公開剛剛創建的算法。可以使用一個簡單的HTTP服務器對響應的任務作出響應。 特別是,我們希望以/subsetSum?data=<Array>&sum=<Integer>這樣的請求格式進行響應,傳入給定的數組和sum,使用SubsetSum算法進行匹配。
在一個名為app.js的模塊中實現這個簡單的服務器:
const http = require('http'); const SubsetSum = require('./subsetSum');http.createServer((req, res) => {const url = require('url').parse(req.url, true);if(url.pathname === '/subsetSum') {const data = JSON.parse(url.query.data);res.writeHead(200);const subsetSum = new SubsetSum(url.query.sum, data);subsetSum.on('match', match => {res.write('Match: ' + JSON.stringify(match) + '\n');});subsetSum.on('end', () => res.end());subsetSum.start();} else {res.writeHead(200);res.end('I\m alive!\n');} }).listen(8000, () => console.log('Started'));由于SubsetSum實例使用事件返回結果,所以我們可以在算法生成后立即對匹配的結果使用Stream進行處理。另一個需要注意的細節是,每次我們的服務器都會返回I'm alive!,這樣我們每次發送一個不同于/subsetSum的請求的時候。可以用來檢查我們服務器是否掛掉了,這在稍后將會看到。
開始運行:
node app一旦服務器啟動,我們準備發送我們的第一個請求;讓我們嘗試發送一組17個隨機數,這將導致產生131,071個組合,那么服務器將會處理一段時間:
curl -G http://localhost:8000/subsetSum --data-urlencode "data=[116,119,101,101,-116,109,101,-105,-102,117,-115,-97,119,-116,-104,-105,115]"--data-urlencode "sum=0"這是如果我們在第一個請求仍在運行的時候在另一個終端中嘗試輸入以下命令,我們將發現一個巨大的問題:
curl -G http://localhost:8000
我們會看到直到第一個請求結束之前,最后一個請求一直處于掛起的狀態。服務器沒有返回響應!這正如我們所想的那樣。Node.js事件循環運行在一個單獨的線程中,如果這個線程被一個長的同步計算阻塞,它將不能再執行一個循環來響應I'm alive!,
我們必須知道,這種代碼顯然不能夠用于同時接收到多個請求的應用程序。
但是不要對Node.js中絕望,我們可以通過幾種方式來解決這種情況。我們來分析一下最常見的兩種方案:
使用setImmediate
通常,CPU-bound算法是建立在一定規則之上的。它可以是一組遞歸調用,一個循環,或者基于這些的任何變化/組合。 所以,對于我們的問題,一個簡單的解決方案就是在這些步驟完成后(或者在一定數量的步驟之后),將控制權交還給事件循環。這樣,任何待處理的I / O仍然可以在事件循環在長時間運行的算法產生CPU的時間間隔中處理。對于這個問題而言,解決這一問題的方式是把算法的下一步在任何可能導致掛起的I/O請求之后運行。這聽起來像是setImmediate()方法的完美用例(我們已經在Chapter2-Node.js Essential Patterns中介紹過這一API)。
模式:使用setImmediate()交錯執行長時間運行的同步任務。使用setImmediate進行子集求和算法的步驟
現在我們來看看這個模式如何應用于子集求和算法。 我們所要做的只是稍微修改一下subsetSum.js模塊。 為方便起見,我們將創建一個名為subsetSumDefer.js的新模塊,將原始的subsetSum類的代碼作為起點。
我們要做的第一個改變是添加一個名為_combineInterleaved()的新方法,它是我們正在實現的模式的核心:
正如我們所看到的,我們所要做的只是使用setImmediate()調用原始的同步的_combine()方法。然而,現在的問題是因為該算法不再是同步的,我們更難以知道何時已經完成了所有的組合的計算。
為了解決這個問題,我們必須使用非常類似于我們在Chapter3-Asynchronous Control Flow Patterns with Callbacks看到的異步并行執行的模式來追溯_combine()方法的所有正在運行的實例。 當_combine()方法的所有實例都已經完成運行時,觸發end事件,通知任何監聽器,進程需要做的所有動作都已經完成。
對于最終子集求和算法的重構版本。首先,我們需要將_combine()方法中的遞歸步驟替換為異步:
_combine(set, subset) {for(let i = 0; i < set.length; i++) {let newSubset = subset.concat(set[i]);this._combineInterleaved(set.slice(i + 1), newSubset);this._processSubset(newSubset);} }通過上面的更改,我們確保算法的每個步驟都將使用setImmediate()在事件循環中排隊,在事件循環隊列中I / O請求之后執行,而不是同步運行造成阻塞。
另一個小調整是對于start()方法:
start() {this.runningCombine = 0;this._combineInterleaved(this.set, []); }在前面的代碼中,我們將_combine()方法的運行實例的數量初始化為0.我們還通過調用_combineInterleaved()來將調用替換為_combine(),并移除了end的觸發,因為現在_combineInterleaved()是異步處理的。
通過這個最后的改變,我們的子集求和算法現在應該能夠通過事件循環可以運行的時間間隔交替地運行其可能大量占用CPU的代碼,并且不會再造成阻塞。
最后更新app.js模塊,以便它可以使用新版本的SubsetSum:
const http = require('http'); // const SubsetSum = require('./subsetSum'); const SubsetSum = require('./subsetSumDefer'); http.createServer(function(req, res) {// ... })和之前一樣的方式開始運行,結果如下:
此時,使用異步的方式運行,不再會阻塞CPU了。
interleaving模式
正如我們所看到的,在保持應用程序的響應性的同時運行一個CPU-bound的任務并不復雜,只需要使用setImmediate()把同步執行的代碼變為異步執行即可。但是,這不是效率最好的模式;實際上,延遲執行一個任務會額外帶來一個小的開銷,在這樣的算法中,積少成多,則會產生重大的影響。這通常是我們在運行CPU限制任務時所需要的最后一件事情,特別是如果我們必須將結果直接返回給用戶,這應該在合理的時間內進行響應。 緩解這個問題的一個可能的解決方案是只有在一定數量的步驟之后使用setImmediate(),而不是在每一步中使用它。但是這仍然不能解決問題的根源。
記住,這并不是說一旦我們想要通過異步的模式來執行CPU-bound的任務,我們就應該不惜一切代價來避免這樣的額外開銷,事實上,從更廣闊的角度來看,同步任務并不一定非常漫長和復雜,以至于造成麻煩。在繁忙的服務器中,即使是阻塞事件循環200毫秒的任務也會產生不希望的延遲。 在那些并發量并不高的服務器來說,即使產生一定短時的阻塞,也不會影響性能,使用交錯執行setImmediate()可能是避免阻塞事件循環的最簡單也是最有效的方法。
process.nextTick()不能用于交錯長時間運行的任務。正如我們在Chapter1-Welcome to the Node.js Platform中看到的,nextTick()會在任何未返回的I / O之前調度,并且在重復調用process.nextTick()最終會導致I / O饑餓。 你可以通過在前面的例子中用process.nextTick()替換setImmediate()來驗證。使用多個進程
使用interleaving模式并不是我們用來運行CPU-bound任務的唯一方法;防止事件循環阻塞的另一種模式是使用子進程。我們已經知道Node.js在運行I / O密集型應用程序(如Web服務器)的時候是最好的,因為Node.js可以使得我們可以通過異步來優化資源利用率。
所以,我們必須保持應用程序響應的最好方法是不要在主應用程序的上下文中運行昂貴的CPU-bound任務,而是使用單獨的進程。這有三個主要的優點:
- 同步任務可以全速運行,而不需要交錯執行的步驟
- 在Node.js中處理進程很簡單,可能比修改一個使用setImmediate()的算法更容易,并且多進程允許我們輕松使用多個處理器,而無需擴展主應用程序本身。
- 如果我們真的需要超高的性能,可以使用低級語言,如性能良好的C。
Node.js有一個充足的API庫帶來與外部進程交互。 我們可以在child_process模塊中找到我們需要的所有東西。 而且,當外部進程只是另一個Node.js程序時,將它連接到主應用程序是非常容易的,我們甚至不覺得我們在本地應用程序外部運行任何東西。這得益于child_process.fork()函數,該函數創建一個新的子Node.js進程,并自動創建一個通信管道,使我們能夠使用與EventEmitter非常相似的接口交換信息。來看如何用這個特性來重構我們的子集求和算法。
將子集求和任務委托給其他進程
重構SubsetSum任務的目標是創建一個單獨的子進程,負責處理CPU-bound的任務,使服務器的事件循環專注于處理來自網絡的請求:
實現一個進程池
先從構建processPool.js模塊開始:
const fork = require('child_process').fork; class ProcessPool {constructor(file, poolMax) {this.file = file;this.poolMax = poolMax;this.pool = [];this.active = [];this.waiting = [];} //... }在模塊的第一部分,引入我們將用來創建新進程的child_process.fork()函數。 然后,我們定義ProcessPool的構造函數,該構造函數接受表示要運行的Node.js程序的文件參數以及池中運行的最大實例數poolMax作為參數。然后我們定義三個實例變量:
- pool表示的是準備運行的進程
- active表示的是當前正在運行的進程列表
- waiting包含所有這些請求的任務隊列,保存由于缺少可用的資源而無法立即實現的任務
看ProcessPool類的acquire()方法,它負責取出一個準備好被使用的進程:
acquire(callback) {let worker;if(this.pool.length > 0) { // [1]worker = this.pool.pop();this.active.push(worker);return process.nextTick(callback.bind(null, null, worker));}if(this.active.length >= this.poolMax) { // [2]return this.waiting.push(callback);}worker = fork(this.file); // [3]this.active.push(worker);process.nextTick(callback.bind(null, null, worker)); }函數邏輯如下:
ProcessPool類的最后一個方法是release(),其目的是將一個進程放回進程池中:
release(worker) {if(this.waiting.length > 0) { // [1]const waitingCallback = this.waiting.shift();waitingCallback(null, worker);}this.active = this.active.filter(w => worker !== w); // [2]this.pool.push(worker); }前面的代碼也很簡單,其解釋如下:
- 如果在waiting任務隊列里面有任務需要被執行,我們只需為這個任務分配一個進程worker執行。
- 否則,如果在waiting任務隊列中都沒有需要被執行的任務,我們則把active的進程列表中的進程放回進程池中。
正如我們所看到的,進程從來沒有中斷,只在為其不斷地重新分配任務,使我們可以通過在每個請求不重新啟動一個進程達到節省時間和空間的目的。然而,重要的是要注意,這可能并不總是最好的選擇,這很大程度上取決于我們的應用程序的要求。為減少進程池長期占用內存,可能的調整如下:
- 在一個進程空閑一段時間后,終止進程,釋放內存空間。
- 添加一個機制來終止或重啟沒有響應的或者崩潰了的進程。
父子進程通信
現在我們的ProcessPool類已經準備就緒,我們可以使用它來實現SubsetSumFork模塊,SubsetSumFork的作用是與子進程進行通信得到子集求和的結果。前面曾說到,用child_process.fork()啟動一個進程也給了我們創建了一個簡單的基于消息的管道,通過實現subsetSumFork.js模塊來看看它是如何工作的:
const EventEmitter = require('events').EventEmitter; const ProcessPool = require('./processPool'); const workers = new ProcessPool(__dirname + '/subsetSumWorker.js', 2);class SubsetSumFork extends EventEmitter {constructor(sum, set) {super();this.sum = sum;this.set = set;}start() {workers.acquire((err, worker) => { // [1]worker.send({sum: this.sum, set: this.set});const onMessage = msg => {if (msg.event === 'end') { // [3]worker.removeListener('message', onMessage);workers.release(worker);}this.emit(msg.event, msg.data); // [4]};worker.on('message', onMessage); // [2]});} }module.exports = SubsetSumFork;首先注意,我們在subsetSumWorker.js調用ProcessPool的構造函數創建ProcessPool實例。 我們還將進程池的最大容量設置為2。
另外,我們試圖維持原來的SubsetSum類相同的公共API。實際上,SubsetSumFork是EventEmitter的子類,它的構造函數接受sum和set,而start()方法則觸發算法的執行,而這個SubsetSumFork實例運行在一個單獨的進程上。調用start()方法時會發生的情況:
這就是SubsetSumFork模塊現在我們來實現這個worker應用程序。
與父進程進行通信
現在我們來創建subsetSumWorker.js模塊,我們的應用程序,這個模塊的全部內容將在一個單獨的進程中運行:
const SubsetSum = require('./subsetSum');process.on('message', msg => { // [1]const subsetSum = new SubsetSum(msg.sum, msg.set);subsetSum.on('match', data => { // [2]process.send({event: 'match', data: data});});subsetSum.on('end', data => {process.send({event: 'end', data: data});});subsetSum.start(); });由于我們的handler處于一個單獨的進程中,我們不必擔心這類CPU-bound任務阻塞事件循環,所有的HTTP請求將繼續由主應用程序的事件循環處理,而不會中斷。
當子進程開始啟動時,父進程:
多進程模式
嘗試新版本的子集求和算法,我們只需要替換HTTP服務器使用的模塊(文件app.js):
運行結果如下:
更有趣的是,我們也可以嘗試同時啟動兩個subsetSum任務,我們可以充分看到多核CPU的作用。 相反,如果我們嘗試同時運行三個subsetSum任務,結果應該是最后一個啟動將掛起。這不是因為主進程的事件循環被阻塞,而是因為我們為subsetSum任務設置了兩個進程的并發限制。
正如我們所看到的,多進程模式比interleaving模式更加強大和靈活;然而,由于單個機器提供的CPU和內存資源量仍然是一個硬性限制,所以它仍然不可擴展。在這種情況下,將負載分配到多臺機器上,則是更優秀的解決辦法。
值得一提的是,在運行CPU-bound任務時,多線程可以成為多進程的替代方案。目前,有幾個npm包公開了一個用于處理用戶級模塊的線程的API;其中最流行的是webworker-threads。但是,即使線程更輕量級,完整的進程也可以提供更大的靈活性,并具備更高更可靠的容錯處理。總結
本章講述以下三點:
- 異步初始化模塊
- 批處理和緩存在Node.js異步中的運用
- 使用異步或者多進程來處理CPU-bound的任務
總結
以上是生活随笔為你收集整理的《Node.js设计模式》高级异步准则的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2018,微软可能要在方方面面融入进企业
- 下一篇: 微信公众号管理系统 RhaPHP1.2.