mysql迁移cassandra_使用datax迁移cassandra数据
DataX 是阿里巴巴集團內(nèi)被廣泛使用的離線數(shù)據(jù)同步工具/平臺,實現(xiàn)各種異構(gòu)數(shù)據(jù)源之間高效的數(shù)據(jù)同步功能。最近,阿里云cassandra團隊為datax提供了cassandra讀寫插件,進一步豐富了datax支持的數(shù)據(jù)源,可以很方便實現(xiàn)cassandra之間以及cassandra與其他數(shù)據(jù)源之間的數(shù)據(jù)同步。本文簡單介紹如何使用datax同步cassandra的數(shù)據(jù),針對幾種常見的場景給出配置文件示例,還提供了提升同步性能的建議和實測的性能數(shù)據(jù)。
datax快速入門
使用datax同步數(shù)據(jù)的方法很簡單,一共只需要三步:
部署datax。
編寫同步作業(yè)配置文件。
運行datax,等待同步作業(yè)完成。
datax的部署和運行都很簡單,可以通過datax官方提供的下載地址下載DataX工具包,下載后解壓至本地某個目錄,進入bin目錄,即可運行同步作業(yè):
$ cd {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}
同步作業(yè)的配置格式可以參考datax文檔。
一個典型的配置文件如下:
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}
一個同步作業(yè)的配置文件主要包括兩部分,setting包括任務調(diào)度的一些配置,content描述同步任務的內(nèi)容,里面包含reader插件的配置和writer插件的配置。例如我們需要從mysql同步數(shù)據(jù)到cassandra,那么我們只需要把reader配置為mysqlreader,writer配置為cassandrawriter,并提供相應的插件配置信息即可。在datax項目頁面上面可以看到datax支持的插件列表,點擊對應的鏈接就可以查看相關插件的文檔了解插件需要的配置內(nèi)容和格式要求。例如,cassandra插件的文檔可點擊如下鏈接:讀插件 寫插件。
以下列舉幾種常見的場景。
場景一 cassandra之間的數(shù)據(jù)同步
最常見的場景是把數(shù)據(jù)從一個集群同步到另一個集群,例如機房整體遷移、上云等。這時需要先手動在目標集群創(chuàng)建好keyspace和表的schema,然后使用datax進行同步。作為例子,下面的配置文件把數(shù)據(jù)從cassandra的一個表同步到另一個表:
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "cassandrareader",
"parameter": {
"host": "localhost",
"port": 9042,
"useSSL": false,
"keyspace": "test",
"table": "datax_src",
"column": [
"id",
"name"
]
}
},
"writer": {
"name": "cassandrawriter",
"parameter": {
"host": "localhost",
"port": 9042,
"useSSL": false,
"keyspace": "test",
"table": "datax_dst",
"column": [
"id",
"name"
]
}
}
}
]
}
}
場景二 從mysql同步到cassandra
datax支持多種數(shù)據(jù)源,可以很方便做到cassandra和其他數(shù)據(jù)源之間的數(shù)據(jù)同步。下面的配置把數(shù)據(jù)從mysql同步到cassandra:
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"column": [
"id",
"name"
],
"splitPk": "db_id",
"connection": [
{
"table": [
"table"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/database"
]
}
]
}
},
"writer": {
"name": "cassandrawriter",
"parameter": {
"host": "localhost",
"port": 9042,
"useSSL": false,
"keyspace": "test",
"table": "datax_dst",
"column": [
"id",
"name"
]
}
}
}
]
}
}
場景三 只同步cassandra中的一部分數(shù)據(jù)
我們在讀插件的配置中提供了where關鍵字,可以用來只同步一部分數(shù)據(jù)。例如對于時序數(shù)據(jù)等場景定期同步的情況,就可以通過增加where的條件來實現(xiàn)只同步增量數(shù)據(jù)。where條件的格式和cql相同,例如"where":"textcol='a'"的作用類似于使用select * from table_name where textcol = 'a'進行查詢。另外還有allowFiltering關鍵字配合where使用,作用和cql中的ALLOW FILTERING關鍵字也是相同的。下面給出一個配置的例子:
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "cassandrareader",
"parameter": {
"host": "localhost",
"port": 9042,
"useSSL": false,
"keyspace": "test",
"table": "datax_src",
"column": [
"deviceId",
"time",
"log"
],
"where":"time > '2019-09-25'",
"allowFiltering":true
}
},
"writer": {
"name": "cassandrawriter",
"parameter": {
"host": "localhost",
"port": 9042,
"useSSL": false,
"keyspace": "test",
"table": "datax_dst",
"column": [
"deviceId",
"time",
"log"
]
}
}
}
]
}
}
提高同步速度
以cassandra之間的數(shù)據(jù)同步為例。如下這些配置會對數(shù)據(jù)同步任務的性能產(chǎn)生影響:
(1)并行度
可以通過調(diào)大任務的并行度來提高同步速度。這主要通過job.setting.speed.channel從參數(shù)來實現(xiàn)。例如下面這個配置的效果是會有10個線程并行執(zhí)行同步任務。
"job": {
"setting": {
"speed": {
"channel": 10
}
},
...
需要注意的是,cassandra讀插件里面,切分任務是通過在cql語句中增加token范圍條件來實現(xiàn)的,所以只有使用RandomPartitioner和Murmur3Partitioner的集群才能夠正確切分。如果您的集群使用了其他的Partitioner,cassandrareader插件會忽略channel配置,只用一個線程進行同步。
(2)batch
可以通過配置batchSize關鍵字在cassandra寫插件里面使用UNLOGGED batch來提高寫入速度。但是需要注意cassandra中對batch的使用有一些限制,使用這個關鍵字之前建議先閱讀[《簡析Cassandra的BATCH操作》(https://yq.aliyun.com/articles/719784?spm=a2c4e.11155435.0.0.65386b04OYOsvK))一文中關于batch使用限制的內(nèi)容。
(3)連接池配置
寫插件還提供了連接池相關的配置connectionsPerHost和maxPendingPerConnection。這兩個參數(shù)的具體含義可以參考[java driver文檔](https://docs.datastax.com/en/developer/java-driver/3.7/manual/pooling/)。
(4)一致性配置
讀寫插件中都提供了consistancyLevel關鍵字,默認的讀寫一致性級別都是LOCAL_QUORUM。如果您的業(yè)務場景里面可以允許兩個集群的數(shù)據(jù)有少量不一致,也可以考慮不使用默認一致性級別來提高讀寫性能,例如使用ONE級別來讀數(shù)據(jù)。
性能數(shù)據(jù)
我們通過一個測試來觀察datax同步數(shù)據(jù)的性能。
服務端使用阿里云cassandra,源集群和目標集群均為3節(jié)點,規(guī)格為4CPU 8GB??蛻舳耸褂靡慌_ECS,規(guī)則為4 CPU 16 GB。
首先使用cassandra-stress向源集群寫入500w行數(shù)據(jù):
cassandra-stress write cl=QUORUM n=5000000 -schema "replication(factor=3) keyspace=test" -rate "threads=300" -col "n=FIXED(10) size=FIXED(64)" -errors "retries=32" -mode "native cql3 user=$USER password=$PWD" -node "$NODE"
寫入過程的統(tǒng)計數(shù)據(jù)如下:
然后使用datax將這些數(shù)據(jù)從源集群同步到目標集群。配置文件如下:
{
"job": {
"setting": {
"speed": {
"channel": 10
}
},
"content": [
{
"reader": {
"name": "cassandrareader",
"parameter": {
"host": "",
"port": 9042,
"username":"",
"password":"",
"useSSL": false,
"keyspace": "test",
"table": "standard1",
"column": [
"key",
"\"C0\"",
"\"C1\"",
"\"C2\"",
"\"C3\"",
"\"C4\"",
"\"C5\"",
"\"C6\"",
"\"C7\"",
"\"C8\"",
"\"C9\""
]
}
},
"writer": {
"name": "cassandrawriter",
"parameter": {
"host": "",
"port": 9042,
"username":"",
"password":"",
"useSSL": false,
"keyspace": "test",
"table": "standard1",
"batchSize":6,
"column": [
"key",
"\"C0\"",
"\"C1\"",
"\"C2\"",
"\"C3\"",
"\"C4\"",
"\"C5\"",
"\"C6\"",
"\"C7\"",
"\"C8\"",
"\"C9\""
]
}
}
}
]
}
}
同步過程的統(tǒng)計數(shù)據(jù)如下:
可見,datax同步數(shù)據(jù)的性能和cassandra-stress的性能相當,甚至要好一些。
本文作者:_陸豪
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的mysql迁移cassandra_使用datax迁移cassandra数据的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mac系统python写文件_Mac中p
- 下一篇: pythoncharm快捷键_Pycha