Spark Series (8): How the Worker Works
Working Principle Diagram
Source Code Analysis
Package: org.apache.spark.deploy.worker
Entry point for launching a driver: the case LaunchDriver branch of the Worker's message handler
case LaunchDriver(driverId, driverDesc) => {
  logInfo(s"Asked to launch driver $driverId")
  // Create a DriverRunner to launch the driver
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    akkaUrl)
  // Add the driver to the Worker's local cache
  drivers(driverId) = driver
  driver.start()

  // Account for the cores now in use
  coresUsed += driverDesc.cores
  // Account for the memory now in use
  memoryUsed += driverDesc.mem
}
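Note that this handler both caches the DriverRunner in the drivers map and reserves cores and memory on the Worker. That bookkeeping is undone when the driver terminates: DriverRunner (shown next) sends a DriverStateChanged message back to the Worker, which releases the resources and forwards the state to the Master. The following is a simplified sketch of that handler, modelled on the Spark 1.x Worker rather than quoted verbatim:

// Simplified sketch of the Worker's DriverStateChanged handler (modelled on Spark 1.x, not verbatim)
case DriverStateChanged(driverId, state, exception) => {
  // Forward the final state to the Master so it can update its own bookkeeping and reschedule
  master ! DriverStateChanged(driverId, state, exception)
  // Drop the driver from the local cache and release the cores/memory reserved at launch time
  val driver = drivers.remove(driverId).get
  finishedDrivers(driverId) = driver
  memoryUsed -= driver.driverDesc.mem
  coresUsed -= driver.driverDesc.cores
}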
DriverRunner
Manages the execution of a single driver, including automatically restarting it if it fails. This mechanism applies only to standalone cluster deploy mode.
Implementation of the start method in the DriverRunner class:
def start() = {
  // Start a new thread to run the driver
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      try {
        // Create the driver's working directory
        val driverDir = createWorkingDirectory()
        // Download the jar the user application needs
        val localJarFilename = downloadUserJar(driverDir)

        def substituteVariables(argument: String): String = argument match {
          case "{{WORKER_URL}}" => workerUrl
          case "{{USER_JAR}}" => localJarFilename
          case other => other
        }

        // TODO: If we add ability to submit multiple jars they should also be added here
        // Build a ProcessBuilder from the driver launch command and the required memory
        val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
          sparkHome.getAbsolutePath, substituteVariables)
        // Launch the driver process
        launchDriver(builder, driverDir, driverDesc.supervise)
      }
      catch {
        case e: Exception => finalException = Some(e)
      }

      // Work out the driver's final state
      val state =
        if (killed) {
          DriverState.KILLED
        } else if (finalException.isDefined) {
          DriverState.ERROR
        } else {
          finalExitCode match {
            case Some(0) => DriverState.FINISHED
            case _ => DriverState.FAILED
          }
        }

      finalState = Some(state)
      // Send a DriverStateChanged message to the Worker that owns this driver
      worker ! DriverStateChanged(driverId, state, finalException)
    }
  }.start()
}
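launchDriver hands the ProcessBuilder to a retry loop that implements the automatic restart mentioned above when driverDesc.supervise is set. The sketch below illustrates the idea; the method name runWithRetry and the back-off constants are illustrative assumptions, not the exact Spark code (in Spark 1.x the logic lives in DriverRunner's retry helper):

// Illustrative sketch of the supervise retry loop; names and constants are assumptions
private def runWithRetry(builder: ProcessBuilder, supervise: Boolean): Unit = {
  var waitSeconds = 1                    // back-off between restarts
  var keepTrying = !killed
  while (keepTrying) {
    val process = builder.start()        // fork the driver JVM
    val exitCode = process.waitFor()     // block until it exits
    finalExitCode = Some(exitCode)
    // Restart only if supervision is on, the driver failed, and nobody asked to kill it
    keepTrying = supervise && exitCode != 0 && !killed
    if (keepTrying) {
      Thread.sleep(waitSeconds * 1000L)
      waitSeconds *= 2                   // exponential back-off
    }
  }
}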
LaunchExecutor
Handles the LaunchExecutor message sent by the Master and launches the executor:
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }

      // Create local dirs for the executor. These are passed to the executor via the
      // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
      // application finishes.
      val appLocalDirs = appDirectories.get(appId).getOrElse {
        Utils.getOrCreateLocalRootDirs(conf).map { dir =>
          Utils.createDirectory(dir).getAbsolutePath()
        }.toSeq
      }
      appDirectories(appId) = appLocalDirs
      // Create an ExecutorRunner for this executor
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        akkaUrl,
        conf,
        appLocalDirs, ExecutorState.LOADING)
      // Add the executor to the Worker's local cache
      executors(appId + "/" + execId) = manager
      manager.start()
      // Account for the cores now in use on this Worker
      coresUsed += cores_
      // Account for the memory now in use on this Worker
      memoryUsed += memory_
      // Notify the Master with an ExecutorStateChanged message
      master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
    }
    // On failure, clean up and notify the Master with an ExecutorStateChanged FAILED message
    catch {
      case e: Exception => {
        logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
          Some(e.toString), None)
      }
    }
  }
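manager.start() mirrors DriverRunner.start(): it spawns a thread that builds the executor launch command, forks the executor JVM, and reports state changes back to the Worker. A simplified sketch of ExecutorRunner.start(), based on the Spark 1.x source (details may differ between versions):

// Simplified sketch of ExecutorRunner.start() (Spark 1.x; details may vary)
def start() {
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    // Build the executor command, fork the executor JVM and wait for it to exit
    override def run() { fetchAndRunExecutor() }
  }
  workerThread.start()
  // Make sure the executor process is killed if the Worker JVM shuts down
  shutdownHook = new Thread() {
    override def run() { killProcess(Some("Worker shutting down")) }
  }
  Runtime.getRuntime.addShutdownHook(shutdownHook)
}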
Summary
1. After they start, Workers, Drivers, and Applications all register with the Master, which caches them in its in-memory data model.
2. Once registration completes, the Master sends LaunchExecutor and LaunchDriver messages to the Worker.
3. On receiving these messages, the Worker starts the executor and driver processes; the ExecutorRunner and DriverRunner report state changes back to the Worker via ExecutorStateChanged and DriverStateChanged messages.
4. The Worker forwards ExecutorStateChanged and DriverStateChanged to the Master, which handles each according to the reported state; most importantly, it calls schedule() to reschedule the freed resources (see the sketch below).
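To illustrate point 4, the Master-side handling of a terminal driver state looks roughly like the sketch below (modelled on the Spark 1.x Master, not quoted verbatim): removeDriver drops the driver from the Master's in-memory model, releases its resources on the Worker, and then calls schedule() so the freed cores and memory can be assigned to waiting drivers and applications.

// Simplified sketch of the Master's DriverStateChanged handling (Spark 1.x; details may vary)
case DriverStateChanged(driverId, state, exception) =>
  state match {
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      // Remove the driver from the in-memory model, free its resources, then call schedule()
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }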
Reposted from: https://www.cnblogs.com/jianyuan/p/Spark%E7%B3%BB%E5%88%97%E4%B9%8BWorker%E5%B7%A5%E4%BD%9C%E5%8E%9F%E7%90%86.html