<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
我們公司有個專案的資料量高達五千萬,但是因為報表那塊資料不太準確,業務庫和報表庫又是跨庫操作,所以並不能使用 SQL 來進行同步。當時的打算是通過 mysqldump
或者儲存的方式來進行同步,但是嘗試後發現這些方案都不切實際:
mysqldump
:不僅備份需要時間,同步也需要時間,而且在備份的過程,可能還會有資料產出(也就是說同步等於沒同步)
儲存方式:這個效率太慢了,要是資料量少還好,我們使用這個方式的時候,三個小時才同步兩千條資料…
後面在網上檢視後,發現 DataX 這個工具用來同步不僅速度快,而且同步的資料量基本上也相差無幾。
DataX 是阿里雲 DataWorks 資料整合 的開源版本,主要就是用於實現資料間的離線同步。 DataX 致力於實現包括關係型資料庫(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等 各種異構資料來源(即不同的資料庫) 間穩定高效的資料同步功能。
為了 解決異構資料來源同步問題,DataX 將複雜的網狀同步鏈路變成了星型資料鏈路,DataX 作為中間傳輸載體負責連線各種資料來源;
當需要接入一個新的資料來源時,只需要將此資料來源對接到 DataX,便能跟已有的資料來源作為無縫資料同步。
DataX 採用 Framework + Plugin 架構,將資料來源讀取和寫入抽象稱為 Reader/Writer 外掛,納入到整個同步框架中。
角色 | 作用 |
---|---|
Reader(採集模組) | 負責採集資料來源的資料,將資料傳送給 Framework 。 |
Writer(寫入模組) | 負責不斷向 Framework 中取資料,並將資料寫入到目的端。 |
Framework(中間商) | 負責連線 Reader 和 Writer ,作為兩者的資料傳輸通道,並處理緩衝,流控,並行,資料轉換等核心技術問題。 |
DataX 完成單個資料同步的作業,我們稱為 Job,DataX 接收到一個 Job 後,將啟動一個程序來完成整個作業同步過程。DataX Job 模組是單個作業的中樞管理節點,承擔了資料清理、子任務切分、TaskGroup 管理等功能。
DataX Job 啟動後,會根據不同源端的切分策略,將 Job 切分成多個小的 Task (子任務),以便於並行執行。
接著 DataX Job 會呼叫 Scheduler 模組,根據設定的並行數量,將拆分成的 Task 重新組合,組裝成 TaskGroup(任務組)
每一個 Task 都由 TaskGroup 負責啟動,Task 啟動後,會固定啟動 Reader --> Channel --> Writer 執行緒來完成任務同步工作。
DataX 作業執行啟動後,Job 會對 TaskGroup 進行監控操作,等待所有 TaskGroup 完成後,Job 便會成功退出(異常退出時 值非 0)
DataX 排程過程:
首先 DataX Job 模組會根據分庫分表切分成若干個 Task,然後根據使用者設定並行數,來計算需要分配多少個 TaskGroup;
計算過程:Task / Channel = TaskGroup
,最後由 TaskGroup 根據分配好的並行數來執行 Task(任務)
準備工作:
tar
包方式不需要安裝)主機名 | 作業系統 | IP 地址 | 軟體包 |
---|---|---|---|
MySQL-1 | CentOS 7.4 | 192.168.1.1 | jdk-8u181-linux-x64.tar.gz datax.tar.gz |
MySQL-2 | CentOS 7.4 | 192.168.1.2 |
安裝 JDK:下載地址(需要建立 Oracle 賬號)
[root@MySQL-1 ~]# ls anaconda-ks.cfg jdk-8u181-linux-x64.tar.gz [root@MySQL-1 ~]# tar zxf jdk-8u181-linux-x64.tar.gz [root@DataX ~]# ls anaconda-ks.cfg jdk1.8.0_181 jdk-8u181-linux-x64.tar.gz [root@MySQL-1 ~]# mv jdk1.8.0_181 /usr/local/java [root@MySQL-1 ~]# cat <<END >> /etc/profile export JAVA_HOME=/usr/local/java export PATH=$PATH:"$JAVA_HOME/bin" END [root@MySQL-1 ~]# source /etc/profile [root@MySQL-1 ~]# java -version
因為 CentOS 7
上自帶 Python 2.7
的軟體包,所以不需要進行安裝。
[root@MySQL-1 ~]# wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz [root@MySQL-1 ~]# tar zxf datax.tar.gz -C /usr/local/ [root@MySQL-1 ~]# rm -rf /usr/local/datax/plugin/*/._* # 需要刪除隱藏檔案 (重要)
當未刪除時,可能會輸出:
[/usr/local/datax/plugin/reader/._drdsreader/plugin.json] 不存在. 請檢查您的組態檔.
驗證:
[root@MySQL-1 ~]# cd /usr/local/datax/bin [root@MySQL-1 ~]# python datax.py ../job/job.json # 用來驗證是否安裝成功
輸出:
2021-12-13 19:26:28.828 [job-0] INFO JobContainer - PerfTrace not enable! 2021-12-13 19:26:28.829 [job-0] INFO StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.060s | All Task WaitReaderTime 0.068s | Percentage 100.00% 2021-12-13 19:26:28.829 [job-0] INFO JobContainer - 任務啟動時刻 : 2021-12-13 19:26:18 任務結束時刻 : 2021-12-13 19:26:28 任務總計耗時 : 10s 任務平均流量 : 253.91KB/s 記錄寫入速度 : 10000rec/s 讀出記錄總數 : 100000 讀寫失敗總數 : 0
檢視 streamreader --> streamwriter
的模板:
[root@MySQL-1 ~]# python /usr/local/datax/bin/datax.py -r streamreader -w streamwriter
輸出:
DataX (DATAX-OPENSOURCE-3.0), From Alibaba ! Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved. Please refer to the streamreader document: https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md Please refer to the streamwriter document: https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md Please save the following configuration as a json file and use python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json to run the job. { "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { "column": [], "sliceRecordCount": "" } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "", "print": true } } } ], "setting": { "speed": { "channel": "" } } } }
根據模板編寫 json
檔案
[root@MySQL-1 ~]# cat <<END > test.json { "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { "column": [ # 同步的列名 (* 表示所有) { "type":"string", "value":"Hello." }, { "type":"string", "value":"河北彭于晏" }, ], "sliceRecordCount": "3" # 列印數量 } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "utf-8", # 編碼 "print": true } } } ], "setting": { "speed": { "channel": "2" # 並行 (即 sliceRecordCount * channel = 結果) } } } }
輸出:(要是複製我上面的話,需要把 #
帶的內容去掉)
分別在兩臺主機上安裝:
[root@MySQL-1 ~]# yum -y install mariadb mariadb-server mariadb-libs mariadb-devel [root@MySQL-1 ~]# systemctl start mariadb # 安裝 MariaDB 資料庫 [root@MySQL-1 ~]# mysql_secure_installation # 初始化 NOTE: RUNNING ALL PARTS OF THIS SCRIPT IS RECOMMENDED FOR ALL MariaDB SERVERS IN PRODUCTION USE! PLEASE READ EACH STEP CAREFULLY! Enter current password for root (enter for none): # 直接回車 OK, successfully used password, moving on... Set root password? [Y/n] y # 設定 root 密碼 New password: Re-enter new password: Password updated successfully! Reloading privilege tables.. ... Success! Remove anonymous users? [Y/n] y # 移除匿名使用者 ... skipping. Disallow root login remotely? [Y/n] n # 允許 root 遠端登入 ... skipping. Remove test database and access to it? [Y/n] y # 移除測試資料庫 ... skipping. Reload privilege tables now? [Y/n] y # 重新載入表 ... Success!
1)準備同步資料(要同步的兩臺主機都要有這個表)
MariaDB [(none)]> create database `course-study`; Query OK, 1 row affected (0.00 sec) MariaDB [(none)]> create table `course-study`.t_member(ID int,Name varchar(20),Email varchar(30)); Query OK, 0 rows affected (0.00 sec)
因為是使用 DataX 程式進行同步的,所以需要在雙方的資料庫上開放許可權:
grant all privileges on *.* to root@'%' identified by '123123'; flush privileges;
2)建立儲存過程:
DELIMITER $$ CREATE PROCEDURE test() BEGIN declare A int default 1; while (A < 3000000)do insert into `course-study`.t_member values(A,concat("LiSa",A),concat("LiSa",A,"@163.com")); set A = A + 1; END while; END $$ DELIMITER ;
3)呼叫儲存過程(在資料來源設定,驗證同步使用):
call test();
1)生成 MySQL 到 MySQL 同步的模板:
[root@MySQL-1 ~]# python /usr/local/datax/bin/datax.py -r mysqlreader -w mysqlwriter { "job": { "content": [ { "reader": { "name": "mysqlreader", # 讀取端 "parameter": { "column": [], # 需要同步的列 (* 表示所有的列) "connection": [ { "jdbcUrl": [], # 連線資訊 "table": [] # 連線表 } ], "password": "", # 連線使用者 "username": "", # 連線密碼 "where": "" # 描述篩選條件 } }, "writer": { "name": "mysqlwriter", # 寫入端 "parameter": { "column": [], # 需要同步的列 "connection": [ { "jdbcUrl": "", # 連線資訊 "table": [] # 連線表 } ], "password": "", # 連線密碼 "preSql": [], # 同步前. 要做的事 "session": [], "username": "", # 連線使用者 "writeMode": "" # 操作型別 } } } ], "setting": { "speed": { "channel": "" # 指定並行數 } } } }
2)編寫 json
檔案:
[root@MySQL-1 ~]# vim install.json { "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "root", "password": "123123", "column": ["*"], "splitPk": "ID", "connection": [ { "jdbcUrl": [ "jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8" ], "table": ["t_member"] } ] } }, "writer": { "name": "mysqlwriter", "parameter": { "column": ["*"], "connection": [ { "jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8", "table": ["t_member"] } ], "password": "123123", "preSql": [ "truncate t_member" ], "session": [ "set session sql_mode='ANSI'" ], "username": "root", "writeMode": "insert" } } } ], "setting": { "speed": { "channel": "5" } } } }
3)驗證
[root@MySQL-1 ~]# python /usr/local/datax/bin/datax.py install.json
輸出:
2021-12-15 16:45:15.120 [job-0] INFO JobContainer - PerfTrace not enable!
2021-12-15 16:45:15.120 [job-0] INFO StandAloneJobContainerCommunicator - Total 2999999 records, 107666651 bytes | Speed 2.57MB/s, 74999 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 82.173s | All Task WaitReaderTime 75.722s | Percentage 100.00%
2021-12-15 16:45:15.124 [job-0] INFO JobContainer -
任務啟動時刻 : 2021-12-15 16:44:32
任務結束時刻 : 2021-12-15 16:45:15
任務總計耗時 : 42s
任務平均流量 : 2.57MB/s
記錄寫入速度 : 74999rec/s
讀出記錄總數 : 2999999
讀寫失敗總數 : 0
你們可以在目的資料庫進行檢視,是否同步完成。
上面的方式相當於是完全同步,但是當資料量較大時,同步的時候被中斷,是件很痛苦的事情;所以在有些情況下,增量同步還是蠻重要的。
使用 DataX 進行全量同步和增量同步的唯一區別就是:增量同步需要使用 where
進行條件篩選。(即,同步篩選後的 SQL)
1)編寫 json
檔案:
[root@MySQL-1 ~]# vim where.json { "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "root", "password": "123123", "column": ["*"], "splitPk": "ID", "where": "ID <= 1888", "connection": [ { "jdbcUrl": [ "jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8" ], "table": ["t_member"] } ] } }, "writer": { "name": "mysqlwriter", "parameter": { "column": ["*"], "connection": [ { "jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8", "table": ["t_member"] } ], "password": "123123", "preSql": [ "truncate t_member" ], "session": [ "set session sql_mode='ANSI'" ], "username": "root", "writeMode": "insert" } } } ], "setting": { "speed": { "channel": "5" } } } }
需要注意的部分就是:where
(條件篩選) 和 preSql
(同步前,要做的事) 引數。
2)驗證:
[root@MySQL-1 ~]# python /usr/local/data/bin/data.py where.json
輸出:
2021-12-16 17:34:38.534 [job-0] INFO JobContainer - PerfTrace not enable! 2021-12-16 17:34:38.534 [job-0] INFO StandAloneJobContainerCommunicator - Total 1888 records, 49543 bytes | Speed 1.61KB/s, 62 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.002s | All Task WaitReaderTime 100.570s | Percentage 100.00% 2021-12-16 17:34:38.537 [job-0] INFO JobContainer - 任務啟動時刻 : 2021-12-16 17:34:06 任務結束時刻 : 2021-12-16 17:34:38 任務總計耗時 : 32s 任務平均流量 : 1.61KB/s 記錄寫入速度 : 62rec/s 讀出記錄總數 : 1888 讀寫失敗總數 : 0
目標資料庫上檢視:
3)基於上面資料,再次進行增量同步:
主要是 where 設定:"where": "ID > 1888 AND ID <= 2888" # 通過條件篩選來進行增量同步
同時需要將我上面的 preSql 刪除(因為我上面做的操作時 truncate 表)
以上就是高效的資料同步工具DataX的使用及實現範例的詳細內容,更多關於DataX資料同步工具的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45