zookeeper系列(二)实战master选举
2019獨角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
master選舉
考慮7*24小時向外提供服務(wù)的系統(tǒng),不能有單點故障,于是我們使用集群,采用的是Master+Slave。集群中有一臺主機(jī)和多臺備機(jī),由主機(jī)向外提供服務(wù),備機(jī)監(jiān)聽主機(jī)狀態(tài),一旦主機(jī)宕機(jī),備機(jī)必需迅速接管主機(jī)繼續(xù)向外提供服務(wù)。在這個過程中,從備機(jī)選出一臺機(jī)作為主機(jī)的過程,就是Master選舉。
架構(gòu)圖
左邊是ZooKeeper集群,右邊是3臺工作服務(wù)器。工作服務(wù)器啟動時,會去ZooKeeper的Servers節(jié)點下創(chuàng)建臨時節(jié)點,并把基本信息寫入臨時節(jié)點。這個過程叫服務(wù)注冊,系統(tǒng)中的其他服務(wù)可以通過獲取Servers節(jié)點的子節(jié)點列表,來了解當(dāng)前系統(tǒng)哪些服務(wù)器可用,這該過程叫做服務(wù)發(fā)現(xiàn)。接著這些服務(wù)器會嘗試創(chuàng)建Master臨時節(jié)點,誰創(chuàng)建成功誰就是Master,其他的兩臺就作為Slave。所有的Work Server必需關(guān)注Master節(jié)點的刪除事件。通過監(jiān)聽Master節(jié)點的刪除事件,來了解Master服務(wù)器是否宕機(jī)(創(chuàng)建臨時節(jié)點的服務(wù)器一旦宕機(jī),它所創(chuàng)建的臨時節(jié)點即會自動刪除)。一旦Master服務(wù)器宕機(jī),必需開始新一輪的Master選舉。
流程圖
核心類圖
WorkServer對應(yīng)架構(gòu)圖的WorkServer,是主工作類;
RunningData用來描述WorkServer的基本信息;
LeaderSelectorZkClient作為調(diào)度器來啟動和停止WorkServer;
代碼實現(xiàn)
/*** 工作服務(wù)器信息*/ public class RunningData implements Serializable {private static final long serialVersionUID = 4260577459043203630L;private Long cid;private String name;public Long getCid() {return cid;}public void setCid(Long cid) {this.cid = cid;}public String getName() {return name;}public void setName(String name) {this.name = name;}} /*** 工作服務(wù)器*/ public class WorkServer {// 記錄服務(wù)器狀態(tài)private volatile boolean running = false;private ZkClient zkClient;// Master節(jié)點對應(yīng)zookeeper中的節(jié)點路徑private static final String MASTER_PATH = "/master";// 監(jiān)聽Master節(jié)點刪除事件private IZkDataListener dataListener;// 記錄當(dāng)前節(jié)點的基本信息private RunningData serverData;// 記錄集群中Master節(jié)點的基本信息private RunningData masterData;private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);private int delayTime = 5;public WorkServer(RunningData rd) {this.serverData = rd; // 記錄服務(wù)器基本信息this.dataListener = new IZkDataListener() {public void handleDataDeleted(String dataPath) throws Exception {//takeMaster();if (masterData != null && masterData.getName().equals(serverData.getName())){// 自己就是上一輪的Master服務(wù)器,則直接搶takeMaster();} else {// 否則,延遲5秒后再搶。主要是應(yīng)對網(wǎng)絡(luò)抖動,給上一輪的Master服務(wù)器優(yōu)先搶占master的權(quán)利,避免不必要的數(shù)據(jù)遷移開銷delayExector.schedule(new Runnable(){public void run(){takeMaster();}}, delayTime, TimeUnit.SECONDS);}}public void handleDataChange(String dataPath, Object data)throws Exception {}};}public ZkClient getZkClient() {return zkClient;}public void setZkClient(ZkClient zkClient) {this.zkClient = zkClient;}// 啟動服務(wù)器public void start() throws Exception {if (running) {throw new Exception("server has startup...");}running = true;// 訂閱Master節(jié)點刪除事件zkClient.subscribeDataChanges(MASTER_PATH, dataListener);// 爭搶Master權(quán)利takeMaster();}// 停止服務(wù)器public void stop() throws Exception {if (!running) {throw new Exception("server has stoped");}running = false;delayExector.shutdown();// 取消Master節(jié)點事件訂閱zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener);// 釋放Master權(quán)利releaseMaster();}// 爭搶Masterprivate void takeMaster() {if (!running)return;try {// 嘗試創(chuàng)建Master臨時節(jié)點zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL);masterData = serverData;System.out.println(serverData.getName()+" is master");// 作為演示,我們讓服務(wù)器每隔5秒釋放一次Master權(quán)利delayExector.schedule(new Runnable() { public void run() {// TODO Auto-generated method stubif (checkMaster()){releaseMaster();}}}, 5, TimeUnit.SECONDS);} catch (ZkNodeExistsException e) { // 已被其他服務(wù)器創(chuàng)建了// 讀取Master節(jié)點信息RunningData runningData = zkClient.readData(MASTER_PATH, true);if (runningData == null) {takeMaster(); // 沒讀到,讀取瞬間Master節(jié)點宕機(jī)了,有機(jī)會再次爭搶} else {masterData = runningData;}} catch (Exception e) {// ignore;}}// 釋放Master權(quán)利private void releaseMaster() {if (checkMaster()) {zkClient.delete(MASTER_PATH);}}// 檢測自己是否為Masterprivate boolean checkMaster() {try {RunningData eventData = zkClient.readData(MASTER_PATH);masterData = eventData;if (masterData.getName().equals(serverData.getName())) {return true;}return false;} catch (ZkNoNodeException e) {return false; // 節(jié)點不存在,自己肯定不是Master了} catch (ZkInterruptedException e) {return checkMaster();} catch (ZkException e) {return false;}}} /*** 調(diào)度器*/ public class LeaderSelectorZkClient {//啟動的服務(wù)個數(shù)private static final int CLIENT_QTY = 10;//zookeeper服務(wù)器的地址private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181";public static void main(String[] args) throws Exception {//保存所有zkClient的列表List<ZkClient> clients = new ArrayList<ZkClient>();//保存所有服務(wù)的列表List<WorkServer> workServers = new ArrayList<WorkServer>();try {for ( int i = 0; i < CLIENT_QTY; ++i ) { // 模擬創(chuàng)建10個服務(wù)器并啟動//創(chuàng)建zkClientZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer());clients.add(client);//創(chuàng)建serverDataRunningData runningData = new RunningData();runningData.setCid(Long.valueOf(i));runningData.setName("Client #" + i);//創(chuàng)建服務(wù)WorkServer workServer = new WorkServer(runningData);workServer.setZkClient(client);workServers.add(workServer);workServer.start();}System.out.println("敲回車鍵退出!\n");new BufferedReader(new InputStreamReader(System.in)).readLine();} finally {System.out.println("Shutting down...");for ( WorkServer workServer : workServers ) {try {workServer.stop();} catch (Exception e) {e.printStackTrace();} }for ( ZkClient client : clients ) {try {client.close();} catch (Exception e) {e.printStackTrace();} }} } }?
/*** 工作服務(wù)器信息*/ public class RunningData implements Serializable {private static final long serialVersionUID = 4260577459043203630L;private Long cid;private String name;public Long getCid() {return cid;}public void setCid(Long cid) {this.cid = cid;}public String getName() {return name;}public void setName(String name) {this.name = name;}} /*** 工作服務(wù)器*/ public class WorkServer {// 記錄服務(wù)器狀態(tài)private volatile boolean running = false;private ZkClient zkClient;// Master節(jié)點對應(yīng)zookeeper中的節(jié)點路徑private static final String MASTER_PATH = "/master";// 監(jiān)聽Master節(jié)點刪除事件private IZkDataListener dataListener;// 記錄當(dāng)前節(jié)點的基本信息private RunningData serverData;// 記錄集群中Master節(jié)點的基本信息private RunningData masterData;private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);private int delayTime = 5;public WorkServer(RunningData rd) {this.serverData = rd; // 記錄服務(wù)器基本信息this.dataListener = new IZkDataListener() {public void handleDataDeleted(String dataPath) throws Exception {//takeMaster();if (masterData != null && masterData.getName().equals(serverData.getName())){// 自己就是上一輪的Master服務(wù)器,則直接搶takeMaster();} else {// 否則,延遲5秒后再搶。主要是應(yīng)對網(wǎng)絡(luò)抖動,給上一輪的Master服務(wù)器優(yōu)先搶占master的權(quán)利,避免不必要的數(shù)據(jù)遷移開銷delayExector.schedule(new Runnable(){public void run(){takeMaster();}}, delayTime, TimeUnit.SECONDS);}}public void handleDataChange(String dataPath, Object data)throws Exception {}};}public ZkClient getZkClient() {return zkClient;}public void setZkClient(ZkClient zkClient) {this.zkClient = zkClient;}// 啟動服務(wù)器public void start() throws Exception {if (running) {throw new Exception("server has startup...");}running = true;// 訂閱Master節(jié)點刪除事件zkClient.subscribeDataChanges(MASTER_PATH, dataListener);// 爭搶Master權(quán)利takeMaster();}// 停止服務(wù)器public void stop() throws Exception {if (!running) {throw new Exception("server has stoped");}running = false;delayExector.shutdown();// 取消Master節(jié)點事件訂閱zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener);// 釋放Master權(quán)利releaseMaster();}// 爭搶Masterprivate void takeMaster() {if (!running)return;try {// 嘗試創(chuàng)建Master臨時節(jié)點zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL);masterData = serverData;System.out.println(serverData.getName()+" is master");// 作為演示,我們讓服務(wù)器每隔5秒釋放一次Master權(quán)利delayExector.schedule(new Runnable() { public void run() {// TODO Auto-generated method stubif (checkMaster()){releaseMaster();}}}, 5, TimeUnit.SECONDS);} catch (ZkNodeExistsException e) { // 已被其他服務(wù)器創(chuàng)建了// 讀取Master節(jié)點信息RunningData runningData = zkClient.readData(MASTER_PATH, true);if (runningData == null) {takeMaster(); // 沒讀到,讀取瞬間Master節(jié)點宕機(jī)了,有機(jī)會再次爭搶} else {masterData = runningData;}} catch (Exception e) {// ignore;}}// 釋放Master權(quán)利private void releaseMaster() {if (checkMaster()) {zkClient.delete(MASTER_PATH);}}// 檢測自己是否為Masterprivate boolean checkMaster() {try {RunningData eventData = zkClient.readData(MASTER_PATH);masterData = eventData;if (masterData.getName().equals(serverData.getName())) {return true;}return false;} catch (ZkNoNodeException e) {return false; // 節(jié)點不存在,自己肯定不是Master了} catch (ZkInterruptedException e) {return checkMaster();} catch (ZkException e) {return false;}}} /*** 調(diào)度器*/ public class LeaderSelectorZkClient {//啟動的服務(wù)個數(shù)private static final int CLIENT_QTY = 10;//zookeeper服務(wù)器的地址private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181";public static void main(String[] args) throws Exception {//保存所有zkClient的列表List<ZkClient> clients = new ArrayList<ZkClient>();//保存所有服務(wù)的列表List<WorkServer> workServers = new ArrayList<WorkServer>();try {for ( int i = 0; i < CLIENT_QTY; ++i ) { // 模擬創(chuàng)建10個服務(wù)器并啟動//創(chuàng)建zkClientZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer());clients.add(client);//創(chuàng)建serverDataRunningData runningData = new RunningData();runningData.setCid(Long.valueOf(i));runningData.setName("Client #" + i);//創(chuàng)建服務(wù)WorkServer workServer = new WorkServer(runningData);workServer.setZkClient(client);workServers.add(workServer);workServer.start();}System.out.println("敲回車鍵退出!\n");new BufferedReader(new InputStreamReader(System.in)).readLine();} finally {System.out.println("Shutting down...");for ( WorkServer workServer : workServers ) {try {workServer.stop();} catch (Exception e) {e.printStackTrace();} }for ( ZkClient client : clients ) {try {client.close();} catch (Exception e) {e.printStackTrace();} }} } }zookeeper系列(一)zookeeper必知
zookeeper系列(二)實戰(zhàn)master選舉
zookeeper系列(三)實戰(zhàn)數(shù)據(jù)發(fā)布訂閱
zookeeper系列(四)實戰(zhàn)負(fù)載均衡
zookeeper系列(五)實戰(zhàn)分布式鎖
zookeeper系列(六)實戰(zhàn)分布式隊列
zookeeper系列(七)實戰(zhàn)分布式命名服務(wù)
zookeeper系列(八)zookeeper運(yùn)維
master選舉
考慮7*24小時向外提供服務(wù)的系統(tǒng),不能有單點故障,于是我們使用集群,采用的是Master+Slave。集群中有一臺主機(jī)和多臺備機(jī),由主機(jī)向外提供服務(wù),備機(jī)監(jiān)聽主機(jī)狀態(tài),一旦主機(jī)宕機(jī),備機(jī)必需迅速接管主機(jī)繼續(xù)向外提供服務(wù)。在這個過程中,從備機(jī)選出一臺機(jī)作為主機(jī)的過程,就是Master選舉。
架構(gòu)圖
左邊是ZooKeeper集群,右邊是3臺工作服務(wù)器。工作服務(wù)器啟動時,會去ZooKeeper的Servers節(jié)點下創(chuàng)建臨時節(jié)點,并把基本信息寫入臨時節(jié)點。這個過程叫服務(wù)注冊,系統(tǒng)中的其他服務(wù)可以通過獲取Servers節(jié)點的子節(jié)點列表,來了解當(dāng)前系統(tǒng)哪些服務(wù)器可用,這該過程叫做服務(wù)發(fā)現(xiàn)。接著這些服務(wù)器會嘗試創(chuàng)建Master臨時節(jié)點,誰創(chuàng)建成功誰就是Master,其他的兩臺就作為Slave。所有的Work Server必需關(guān)注Master節(jié)點的刪除事件。通過監(jiān)聽Master節(jié)點的刪除事件,來了解Master服務(wù)器是否宕機(jī)(創(chuàng)建臨時節(jié)點的服務(wù)器一旦宕機(jī),它所創(chuàng)建的臨時節(jié)點即會自動刪除)。一旦Master服務(wù)器宕機(jī),必需開始新一輪的Master選舉。
流程圖
核心類圖
WorkServer對應(yīng)架構(gòu)圖的WorkServer,是主工作類;
RunningData用來描述WorkServer的基本信息;
LeaderSelectorZkClient作為調(diào)度器來啟動和停止WorkServer;
代碼實現(xiàn)
/*** 工作服務(wù)器信息*/ public class RunningData implements Serializable {private static final long serialVersionUID = 4260577459043203630L;private Long cid;private String name;public Long getCid() {return cid;}public void setCid(Long cid) {this.cid = cid;}public String getName() {return name;}public void setName(String name) {this.name = name;}} /*** 工作服務(wù)器*/ public class WorkServer {// 記錄服務(wù)器狀態(tài)private volatile boolean running = false;private ZkClient zkClient;// Master節(jié)點對應(yīng)zookeeper中的節(jié)點路徑private static final String MASTER_PATH = "/master";// 監(jiān)聽Master節(jié)點刪除事件private IZkDataListener dataListener;// 記錄當(dāng)前節(jié)點的基本信息private RunningData serverData;// 記錄集群中Master節(jié)點的基本信息private RunningData masterData;private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);private int delayTime = 5;public WorkServer(RunningData rd) {this.serverData = rd; // 記錄服務(wù)器基本信息this.dataListener = new IZkDataListener() {public void handleDataDeleted(String dataPath) throws Exception {//takeMaster();if (masterData != null && masterData.getName().equals(serverData.getName())){// 自己就是上一輪的Master服務(wù)器,則直接搶takeMaster();} else {// 否則,延遲5秒后再搶。主要是應(yīng)對網(wǎng)絡(luò)抖動,給上一輪的Master服務(wù)器優(yōu)先搶占master的權(quán)利,避免不必要的數(shù)據(jù)遷移開銷delayExector.schedule(new Runnable(){public void run(){takeMaster();}}, delayTime, TimeUnit.SECONDS);}}public void handleDataChange(String dataPath, Object data)throws Exception {}};}public ZkClient getZkClient() {return zkClient;}public void setZkClient(ZkClient zkClient) {this.zkClient = zkClient;}// 啟動服務(wù)器public void start() throws Exception {if (running) {throw new Exception("server has startup...");}running = true;// 訂閱Master節(jié)點刪除事件zkClient.subscribeDataChanges(MASTER_PATH, dataListener);// 爭搶Master權(quán)利takeMaster();}// 停止服務(wù)器public void stop() throws Exception {if (!running) {throw new Exception("server has stoped");}running = false;delayExector.shutdown();// 取消Master節(jié)點事件訂閱zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener);// 釋放Master權(quán)利releaseMaster();}// 爭搶Masterprivate void takeMaster() {if (!running)return;try {// 嘗試創(chuàng)建Master臨時節(jié)點zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL);masterData = serverData;System.out.println(serverData.getName()+" is master");// 作為演示,我們讓服務(wù)器每隔5秒釋放一次Master權(quán)利delayExector.schedule(new Runnable() { public void run() {// TODO Auto-generated method stubif (checkMaster()){releaseMaster();}}}, 5, TimeUnit.SECONDS);} catch (ZkNodeExistsException e) { // 已被其他服務(wù)器創(chuàng)建了// 讀取Master節(jié)點信息RunningData runningData = zkClient.readData(MASTER_PATH, true);if (runningData == null) {takeMaster(); // 沒讀到,讀取瞬間Master節(jié)點宕機(jī)了,有機(jī)會再次爭搶} else {masterData = runningData;}} catch (Exception e) {// ignore;}}// 釋放Master權(quán)利private void releaseMaster() {if (checkMaster()) {zkClient.delete(MASTER_PATH);}}// 檢測自己是否為Masterprivate boolean checkMaster() {try {RunningData eventData = zkClient.readData(MASTER_PATH);masterData = eventData;if (masterData.getName().equals(serverData.getName())) {return true;}return false;} catch (ZkNoNodeException e) {return false; // 節(jié)點不存在,自己肯定不是Master了} catch (ZkInterruptedException e) {return checkMaster();} catch (ZkException e) {return false;}}} /*** 調(diào)度器*/ public class LeaderSelectorZkClient {//啟動的服務(wù)個數(shù)private static final int CLIENT_QTY = 10;//zookeeper服務(wù)器的地址private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181";public static void main(String[] args) throws Exception {//保存所有zkClient的列表List<ZkClient> clients = new ArrayList<ZkClient>();//保存所有服務(wù)的列表List<WorkServer> workServers = new ArrayList<WorkServer>();try {for ( int i = 0; i < CLIENT_QTY; ++i ) { // 模擬創(chuàng)建10個服務(wù)器并啟動//創(chuàng)建zkClientZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer());clients.add(client);//創(chuàng)建serverDataRunningData runningData = new RunningData();runningData.setCid(Long.valueOf(i));runningData.setName("Client #" + i);//創(chuàng)建服務(wù)WorkServer workServer = new WorkServer(runningData);workServer.setZkClient(client);workServers.add(workServer);workServer.start();}System.out.println("敲回車鍵退出!\n");new BufferedReader(new InputStreamReader(System.in)).readLine();} finally {System.out.println("Shutting down...");for ( WorkServer workServer : workServers ) {try {workServer.stop();} catch (Exception e) {e.printStackTrace();} }for ( ZkClient client : clients ) {try {client.close();} catch (Exception e) {e.printStackTrace();} }} } }轉(zhuǎn)載于:https://my.oschina.net/u/164027/blog/1923921
總結(jié)
以上是生活随笔為你收集整理的zookeeper系列(二)实战master选举的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 用掘金-Markdown 官方语法总结大
- 下一篇: 纪中2018暑假培训day3提高a组改题