在线实时大数据平台Storm集成redis开发(分布锁)
生活随笔
收集整理的這篇文章主要介紹了
在线实时大数据平台Storm集成redis开发(分布锁)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考。
1、需求場景:spout從ftp列表中拿到未讀取的文件,逐行讀取并發射到Bolt,bolt進行業務處理后提交下一Bolt入庫。用redis主要是:保存文件列表對象,使用分布鎖來同步互斥訪問共享對象,使文件處理不重復。
2、topo主函數代碼:
package ct.topo;import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields;import ct.tool.ComUtil;public class TopoMain {public static void main(String[] args) throws InterruptedException {//Configuration Config conf = new Config(); //conf.setMaxSpoutPending(2); //緩存tuple //conf.setMessageTimeoutSecs(5); // 消息處理延時//conf.setNumAckers(2); // 消息處理ackerconf.setNumWorkers(3);//設(shè)置個進程 //提取參數(shù)conf.put("city", args[0]); //地址,如深圳輸入szconf.put("date", args[1]); //日期,如20160808//ftp服務(wù)器字符串格式:IP|port|username|passwordString strFtpSrv="127.0.0.1|21|name|pwd";conf.put("FtpSrv", strFtpSrv);//Topology definition TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("FTPReader",new FTPReader(),3); //根據(jù)IMSI字段匯聚到同一task執(zhí)行builder.setBolt("IMSICounter",new IMSICounter(),300).fieldsGrouping("FTPReader", new Fields("imsi"));builder.setBolt("DBWriter", new DBWriter(),50).shuffleGrouping("IMSICounter"); //集群生產(chǎn)模式 try { //storm jar /mnt/dis.jar ct.topo.TopoMain sz 20160825 > /data/storm/log/debug.logStormSubmitter.submitTopology("O2OTopo", conf, builder.createTopology()); } catch (AlreadyAliveException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InvalidTopologyException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (AuthorizationException e) { // TODO Auto-generated catch block e.printStackTrace(); } //本地調(diào)試模式 //storm jar /mnt/dis.jar ct.topo.TopoMain sz 20160825 > /data/storm/log/debug.log &//conf.setMaxTaskParallelism(1); //LocalCluster cluster = new LocalCluster(); //cluster.submitTopology("O2OTopo", conf, builder.createTopology()); } } 3、spout代碼:? ? ?
package ct.topo;import java.io.BufferedReader; import java.io.FileInputStream; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set;import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils;import ct.tool.ComUtil; import ct.tool.FTPHandle; import ct.tool.RedisDisLock; import redis.clients.jedis.Jedis;public class FTPReader extends BaseRichSpout{private SpoutOutputCollector collector;private String FtpSrv=null;private String FtpCD=null;private Jedis jd;public void ack(Object msgId) {//成功處理tuple//ComUtil.writeLogs("OK:"+msgId);}public void close() {this.jd.close();}public void fail(Object msgId) {//失敗處理tupleComUtil.writeLogs("FTPReader.fail:"+msgId+"emit fail!");}/*** We will create the file and get the collector object* 三個參數(shù),第一個是創(chuàng)建Topology時的配置,第二個是所有的Topology數(shù)據(jù),第三個是用來把Spout的數(shù)據(jù)發(fā)射給bolt * */public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) { this.jd = new Jedis("127.0.0.1",6379); //jedis長連接//初始化發(fā)射器this.collector = collector;//獲取ftp服務(wù)器參數(shù)和文件讀取路徑FtpSrv=conf.get("FtpSrv").toString();String city=conf.get("city").toString();String date=conf.get("date").toString();FtpCD="/"+city+"/"+date;//文件目錄}/*** ftp下載文件并解壓,去除每一行發(fā)射* 下載的文件需要過濾已下載*/public void nextTuple() { //Utils.sleep(200);String fileName=null;try{//從redis的FtpFileMap對象中獲取未讀取的文件名//業(yè)務(wù)處理,獲取一份未讀取的文件名字列表 Iterator<String> iter=jd.hkeys("FtpFileMap").iterator(); while (iter.hasNext()){ String key = iter.next(); List<String> list=jd.hmget("FtpFileMap",key);String value=list.get(0);if(value.equals("n")) {fileName=key;//如果文件未讀取,則提取 
//非阻塞拿鎖,nexttuple不能阻塞,本身就是循環(huán),沒拿到鎖直接返回if (RedisDisLock.acquireLockNonBlocking(jd,"FtpFileLock")){return;} jd.hset("FtpFileMap", key,"y");//設(shè)置為已讀,原子性更新操作//釋放鎖RedisDisLock.releaseLock(jd, "FtpFileLock");break;}} jd.close(); }catch(Exception e){ComUtil.writeLogs("FTPReader.nextTuple:"+e.getMessage());}try{//對未讀取的文件列表下載并解壓、讀取并發(fā)射if(fileName!=null){String localFileName=FTPHandle.FtpFileDownLoad(FtpSrv,FtpCD,fileName);BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(localFileName)));String line=null;List<String> baselist=jd.lrange("BaseList",0,-1);//返回所有元素while((line = reader.readLine()) != null){if (line != null){String[] fields=line.split("\\|",-1);if(fields.length>=12){String imsi=fields[4];String userno=fields[5];String lac=fields[9];String ci=fields[10];String starttime=fields[11];this.collector.emit(new Values(imsi,userno,lac,ci,starttime),imsi); }}}}}reader.close();ComUtil.writeLogs("FTPReader.nextTuple:"+localFileName+":File has been processed successfully!");} }catch(Exception e){ComUtil.writeLogs("FTPReader.nextTuple:"+e.getMessage());}}/*** Declare the output field */public void declareOutputFields(OutputFieldsDeclarer declarer) {String[] fields=new String[]{"imsi","userno","lac","ci","starttime"};declarer.declare(new Fields(fields));} }4、Bolt代碼的業(yè)務(wù)邏輯就不體現(xiàn);
5、總結:集成redis,主要是保存共享的對象,然后在對象訪問時使用分布鎖來互斥操作,鎖內是原子性操作,nextTuple要使用非阻塞鎖。但加鎖會影響性能,在調試過程中出現部分記錄發射失敗且spout/bolt任務停止的情況,需要進一步優化。
本文提供了一種解決思路,就是集成redis,應用其分布鎖,解決多spout重復處理輸入文件的情況。
總結(jié)
以上是生活随笔為你收集整理的在线实时大数据平台Storm集成redis开发(分布锁)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: centos下排查vsftpd出现put
- 下一篇: Ansj中文分词Java开发小记