使用EMR-Kafka Connect进行数据迁移
1.背景
流式處理中經常會遇到Kafka與其他系統進行數據同步或者Kafka集群間數據遷移的情景。使用EMR Kafka Connect可以方便快速的實現數據同步或者數據遷移。
Kafka Connect是一種可擴展的、可靠的,用于在Kafka和其他系統之間快速地進行流式數據傳輸的工具。例如可以使用Kafka Connect獲取數據庫的binglog數據,將數據庫的數據遷入Kafka集群,以同步數據庫的數據,或者對接下游的流式處理系統。同時,Kafka Connect提供的REST API接口可以方便的進行Kafka Connect的創建和管理。
 Kafka Connect分為standalone和distributed兩種運行模式。standalone模式下,所有的worker都在一個進程中運行;相比之下,distributed模式更具擴展性和容錯性,是最常用的方式,也是生產環境推薦使用的模式。
本文介紹使用EMR Kafka Connect的REST API接口在Kafka集群間進行數據遷移,使用distributed模式。
2.環境準備
創建兩個EMR集群,集群類型為Kafka。EMR Kafka Connect安裝在task節點上,進行數據遷移的目的Kafka集群需要創建task節點。集群創建好后,task節點上EMR Kafka Connect服務會默認啟動,端口號為8083。
注意要保證兩個集群的網路互通,詳細的創建流程見創建集群。
3.數據遷移
3.1準備工作
EMR Kafka Connect的配置文件路徑為/etc/ecm/kafka-conf/connect-distributed.properties。
在源Kafka集群創建需要同步的topic,例如
另外,Kafka Connect會將offsets, configs和任務狀態保存在topic中,topic名對應配置文件中的offset.storage.topic、config.storage.topic 和status.storage.topic三個配置項。默認的,Kafka Connect會自動的使用默認的partition和replication factor創建這三個topic。
3.2創建Kafka Connect
在目的Kafka集群的task節點(例如emr-worker-3節點),使用curl命令通過json數據創建一個Kafka Connect。
json數據中,name字段代表創建的connect的名稱,此處為connect-test;config字段需要根據實際情況進行配置,其中的變量說明如下表
3.3查看Kafka Connect
查看所有的Kafka Connect
查看創建的connect-test的狀態
查看task的信息
3.4數據同步
在源Kafka集群創建需要同步的數據。
3.5查看同步結果
在目的Kafka集群消費同步的數據。
可以看到,在源Kafka集群發送的100000條數據已經遷移到了目的Kafka集群。
4.小結
本文介紹并演示了使用EMR kafka Connect在Kafka集群間進行數據遷移的方法,關于Kafka Connect更詳細的使用請參考Kafka官網資料和REST API使用。
原文鏈接
 本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的使用EMR-Kafka Connect进行数据迁移的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: Spark内置图像数据源初探
 - 下一篇: Soloπ:支付宝开源的Android专