<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
前言…
最近工作中遇到了一個資料同步的問題
我們這邊系統的一個子業務需要依賴另一個系統的資料,當另一個系統資料變更時,我們這邊的資料庫要對資料進行同步…
那麼我自己想到的同步方式呢就兩種:
1、MQ訂閱,另一個系統資料變更後將變更資料方式到MQ 我們這邊訂閱接受
2、資料庫的觸發器
但是呢,兩者都被組長paas了!
1、MQ呢,會造成程式碼侵入,但是另一個系統暫時不會做任何程式碼更改…
2、資料庫的觸發器會直接跟生產資料庫強關聯,會搶佔資源,甚至有可能造成生產資料庫的不穩定…
對此很是苦惱…
於是啊,只能藉由強大的google、百度,看看能不能解決我這個問題!一番搜尋,有學習了一個很有趣的東西…
Canal
canal:阿里開源mysql binlog 資料元件
官網解釋的相當詳細了(國產牛逼)…下邊我也是照搬過來的…
官網地址如下:https://github.com/alibaba/canal/wiki
早期,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的資料庫同步業務,主要是基於trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基於資料庫的紀錄檔解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。ps. 目前內部使用的同步,已經支援mysql5.x和oracle部分版本的紀錄檔解析
canal [kə’næl],譯意為水道/管道/溝渠,主要用途是基於 MySQL 資料庫增量紀錄檔解析,提供增量資料訂閱和消費
工作原理
canal呢,實際是就是運用了Mysql的主從複製原理…
MySQL主從複製實現
複製遵循三步過程:
show binary events
)如何運作
原理很簡單:
通過官網的介紹,讓我們瞭解到,canal實際上就是偽裝為了一個從庫,我們只需要訂閱到資料變更的主庫,那麼canal就會以從庫的身份讀取到其主庫的binlog紀錄檔!我們拿到canal解析好的binlog紀錄檔資訊,就等於拿到了變更的資料啦!…
這樣的話呢,我們即保證了不影響其系統資料庫正常使用,又不會侵入他的專案程式碼,一舉兩得
ok,接下來開始實戰篇…
使用canal呢,有一個前提條件,即被訂閱的資料庫需要開啟binlog
如何檢視是否開啟binlog呢?
登入伺服器上資料庫或在視覺化工具中 執行查詢語句: 如果出現 log_bin ON 表示已開啟Binlog
show variables like 'log_bin';
如果伺服器上的資料庫為自己安裝的,則找到組態檔my.conf 新增以下內容,如果買的雲範例,則詢問廠商開啟即可
在my.conf檔案中的 [mysqld] 下新增以下三行內容
log-bin=mysql-bin # 開啟 binlog binlog-format=ROW # 選擇 ROW 模式 讀行 server_id=1 # 設定 MySQL replaction 需要定義,不要和 canal 的 slaveId 重複
canaltest:作為slave 角色的賬戶 Canal123…:為密碼
CREATE USER canaltest IDENTIFIED BY 'Canal123..'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canaltest'@'%'; GRANT ALL PRIVILEGES ON *.* TO 'canaltest'@'%' ; FLUSH PRIVILEGES;
連線測試
那麼到這裡,準備工作就好了!
可能呢,有的小夥伴有點懵,你這是在幹啥?那麼咱們就來理那麼一理! 敲黑板了哈!
1、事前準備,是針對於訂閱資料庫的(即主庫)
2、實際步驟也就兩步 1:更改設定,開啟binlog 2:設定新賬號,賦予slave許可權,供canal讀取Binlog橋樑使用
3、以上操作與canal本身沒啥關係,僅僅是使用canal的前提條件罷遼…
canal admin 是 一個視覺化的 canal web管理運維工程,脫離以往伺服器運維,面向web…
canal-admin設計上是為canal提供整體設定管理、節點運維等面向運維的功能,提供相對友好的WebUI操作介面,方便更多使用者快速和安全的操作
canal-admin的限定依賴:
下載
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
解壓
mkdir /usr/local/canal-admin tar zxvf canal.admin-1.1.4.tar.gz -C /usr/local/canal-admin
進入canal-admin目錄下檢視
cd /usr/local/canal-admin
修改設定
vim conf/application.yml
裡邊的設定 按照自己的實際情況更改…
server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 #這裡是設定canal-admin 所依賴的資料庫,,,存放web管理中設定的設定等,,, spring.datasource: address: 127.0.0.1:3306 database: canal_manager username: root password: 123456 driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false hikari: maximum-pool-size: 30 minimum-idle: 1 # 連線所用的賬戶密碼 canal: adminUser: admin adminPasswd: leitest
匯入canaladmin 所需要的資料庫檔案
這裡需要注意了,要和 application.yml中的資料庫名對應,你可以選擇命令匯入,也可以Navicat 視覺化拖sql檔案匯入…一切…看你喜歡.
我這個玩canal的伺服器呢,是新安裝的,mysql直接用docker安裝即可,具體可檢視我的部落格:
Docker在CentOS7下不能下載映象timeout的解決辦法(圖解)
需要注意的是,使用docker 安裝的mysql 是無法直接使用 mysql -uroot -p
命令的哦,需要先將指令碼複製到容器中,docker不熟練或覺得麻煩的同鞋,請直接使用Navicat視覺化工具…
匯入canal-admin服務所必需的sql檔案
如果是伺服器軟體軟體安裝的mysql 則直接執行以下命令即可
mysql -uroot -p #......... # 匯入初始化SQL > source conf/canal_manager.sql
啟動
直接執行啟動指令碼即可
cd bin ./startup.sh
預設賬戶密碼:
admin:123456
canal-server 才是canal的核心我們前邊所講的canal的功能,實際上講述的就是canal-server的功能…admin 僅僅只是一個web管理而已,不要搞混主次關係…
下載
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
解壓
mkdir /usr/local/canal-server tar zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal-server
啟動,並連線到canal-admin web端
首先,我們需要修改組態檔
cd /usr/local/canal-server vim /conf/canal_local.properties
注意了,密碼如何加密!!!
要記得,前邊 canal-admin 的 aplication.yml 中設定了賬戶密碼為 admin:leitest
# 連線所用的賬戶密碼 canal: adminUser: admin adminPasswd: leitest
所以,我們這裡需要對明文 leitest 加密並替換即可
使用資料庫函數 PASSWORD 加密即可
SELECT PASSWORD(‘要加密的明文’),然後去掉前邊的* 號就行
啟動並連線到admin
sh bin/startup.sh local
檢視埠看是否有 11110 、11111、11112
netstat -untlp
看了一下,發現沒有,說明server 沒有啟動成功
看下紀錄檔
vim logs/canal/canal.log
解決辦法:
1、canal-admin 先停止後從起
2、canal server 先以之前的形式執行,不輸入後邊 local 命令
3、關閉canal server
4、再以canal server 連線 admin 形式啟動
admin頁面上新建server
修改設定,註釋 (instance連線資訊,我們還是以前邊設定的 admin:leitest 為準,所有這裡需要註釋掉,如果不註釋,那麼我們程式碼中連線則需要使用此賬號以及密碼)
接下來咱們建立instance
如何理解server 和instance 呢,我認為,可以把它當做 java 中的 class 和 bean 即 類和物件
server 為類 instance 為其具體的範例物件 ,可建立多個不同的範例…
而我們這邊監聽到主庫變化的呢,則是根據業務,對不同的範例即(instance )做不同設定即可…
根據自己情況進行過濾資料
canal.instance.filter.regex | mysql 資料解析關注的表,Perl正規表示式.多個正則之間以逗號(,)分隔,跳脫符需要雙斜槓() 常見例子:1. 所有表:.* or .… 2. canal schema下所有表: canal…* 3. canal下的以canal打頭的表:canal.canal.* 4. canal schema下的一張表:canal.test15. 多個規則組合使用:canal…*,mysql.test1,mysql.test2 (逗號分隔) | ||
---|---|---|---|
canal.instance.filter.druid.ddl | 是否使用druid處理所有的ddl解析來獲取庫和表名 | true | |
canal.instance.filter.query.dcl | 是否忽略dcl語句 | false | |
canal.instance.filter.query.dml | 是否忽略dml語句 (mysql5.6之後,在row模式下每條DML語句也會記錄SQL到binlog中,可參考MySQL檔案) | false | |
canal.instance.filter.query.ddl | 是否忽略ddl語句 | false |
更多設定請見官網:https://github.com/alibaba/canal/wiki/AdminGuide
如此一來,一個簡單的canal環境就搭建好了,接下來,咱們開始測試吧!
引入canal所需依賴
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.4</version> </dependency>
設定
canal: # instance 範例所在ip host: 192.168.96.129 # tcp通訊埠 port: 11111 # 賬號 canal-admin application.yml 設定的 username: admin # 密碼 password: leitest #範例名稱 instance: test
程式碼
package com.leilei; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; import java.util.List; /** * @author lei * @version 1.0 * @date 2020/9/27 22:23 * @desc 讀取binlog紀錄檔 */ @Component public class ReadBinLogService implements ApplicationRunner { @Value("${canal.host}") private String host; @Value("${canal.port}") private int port; @Value("${canal.username}") private String username; @Value("${canal.password}") private String password; @Value("${canal.instance}") private String instance; @Override public void run(ApplicationArguments args) throws Exception { CanalConnector conn = getConn(); while (true) { conn.connect(); //訂閱範例中所有的資料庫和表 conn.subscribe(".*\..*"); // 回滾到未進行ack的地方 conn.rollback(); // 獲取資料 每次獲取一百條改變資料 Message message = conn.getWithoutAck(100); long id = message.getId(); int size = message.getEntries().size(); if (id != -1 && size > 0) { // 資料解析 analysis(message.getEntries()); }else { Thread.sleep(1000); } // 確認訊息 conn.ack(message.getId()); // 關閉連線 conn.disconnect(); } } /** * 資料解析 */ private void analysis(List<CanalEntry.Entry> entries) { for (CanalEntry.Entry entry : entries) { // 只解析mysql事務的操作,其他的不解析 if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN) { continue; if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { // 解析binlog CanalEntry.RowChange rowChange = null; try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("解析出現異常 data:" + entry.toString(), e); if (rowChange != null) { // 獲取操作型別 CanalEntry.EventType eventType = rowChange.getEventType(); // 獲取當前操作所屬的資料庫 String dbName = entry.getHeader().getSchemaName(); // 獲取當前操作所屬的表 String tableName = entry.getHeader().getTableName(); // 事務提交時間 long timestamp = entry.getHeader().getExecuteTime(); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { dataDetails(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), dbName, tableName, eventType, timestamp); System.out.println("-------------------------------------------------------------"); } * 解析具體一條Binlog訊息的資料 * * @param dbName 當前操作所屬資料庫名稱 * @param tableName 當前操作所屬表名稱 * @param eventType 當前操作型別(新增、修改、刪除) private static void dataDetails(List<CanalEntry.Column> beforeColumns, List<CanalEntry.Column> afterColumns, String dbName, String tableName, CanalEntry.EventType eventType, long timestamp) { System.out.println("資料庫:" + dbName); System.out.println("表名:" + tableName); System.out.println("操作型別:" + eventType); if (CanalEntry.EventType.INSERT.equals(eventType)) { System.out.println("新增資料:"); printColumn(afterColumns); } else if (CanalEntry.EventType.DELETE.equals(eventType)) { System.out.println("刪除資料:"); printColumn(beforeColumns); } else { System.out.println("更新資料:更新前資料--"); System.out.println("更新資料:更新後資料--"); System.out.println("操作時間:" + timestamp); private static void printColumn(List<CanalEntry.Column> columns) { for (CanalEntry.Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); * 獲取連線 public CanalConnector getConn() { return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password); }
測試檢視
資料庫修改資料庫時
資料新增資料時
刪除資料(把我們才新增的小明刪掉)
當我們操作監控的資料庫DM L操作的時候呢,會被canal監聽到…我們呢,通過canal監聽,拿到修改的庫,修改的表,修改的欄位,便可以根據自己業務進行資料處理了!
哎,這個時候啊,可能有小夥伴就要問了,那麼,我能不能直接獲取其操作的sql語句呢?
目前,我是自己解析其列來手動拼接的sql語句實現了
話不多說,先上效果:
canal 監聽到主庫sql變化----> update students set id = '2', age = '999', name = '小三', city = '11', date = '2020-09-27 17:41:44', birth = '2020-09-27 18:00:48' where id=2
canal 監聽到主庫sql變化----> delete from students where id=6
canal 監聽到主庫sql變化----> insert into students (id,age,name,city,date,birth) VALUES ('89','98','測試新增','深圳','2020-09-27 22:46:53','')
canal 監聽到主庫sql變化----> update students set id = '89', age = '98', name = '測試新增', city = '深圳', date = '2020-09-27 22:46:53', birth = '2020-09-27 22:46:56' where id=89
實際上呢,我們也就是拿到其執行前列資料變化 執行後列資料變化,自己拼接了一個sql罷了…附上程式碼
package com.leilei; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.*; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.exception.CanalClientException; import com.google.protobuf.InvalidProtocolBufferException; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; /** * @author lei * @version 1.0 * @date 2020/9/27 22:33 * @desc 讀取binlog紀錄檔 */ @Component public class ReadBinLogToSql implements ApplicationRunner { //讀取的binlog sql 佇列快取 一邊Push 一邊poll private Queue<String> canalQueue = new ConcurrentLinkedQueue<>(); @Value("${canal.host}") private String host; @Value("${canal.port}") private int port; @Value("${canal.username}") private String username; @Value("${canal.password}") private String password; @Value("${canal.instance}") private String instance; @Override public void run(ApplicationArguments args) throws Exception { CanalConnector conn = getConn(); while (true) { try { conn.connect(); //訂閱範例中所有的資料庫和表 conn.subscribe(".*\..*"); // 回滾到未進行ack的地方 conn.rollback(); // 獲取資料 每次獲取一百條改變資料 Message message = conn.getWithoutAck(100); long id = message.getId(); int size = message.getEntries().size(); if (id != -1 && size > 0) { // 資料解析 analysis(message.getEntries()); } else { Thread.sleep(1000); } // 確認訊息 conn.ack(message.getId()); } catch (CanalClientException | InvalidProtocolBufferException | InterruptedException e) { e.printStackTrace(); } finally { // 關閉連線 conn.disconnect(); } } } private void analysis(List<Entry> entries) throws InvalidProtocolBufferException { for (Entry entry : entries) { if (EntryType.ROWDATA == entry.getEntryType()) { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); EventType eventType = rowChange.getEventType(); if (eventType == EventType.DELETE) { saveDeleteSql(entry); } else if (eventType == EventType.UPDATE) { saveUpdateSql(entry); } else if (eventType == EventType.INSERT) { saveInsertSql(entry); } } } } /** * 儲存更新語句 * * @param entry */ private void saveUpdateSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> dataList = rowChange.getRowDatasList(); for (RowData rowData : dataList) { List<Column> afterColumnsList = rowData.getAfterColumnsList(); StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set "); for (int i = 0; i < afterColumnsList.size(); i++) { sql.append(" ") .append(afterColumnsList.get(i).getName()) .append(" = '").append(afterColumnsList.get(i).getValue()) .append("'"); if (i != afterColumnsList.size() - 1) { sql.append(","); } } sql.append(" where "); List<Column> oldColumnList = rowData.getBeforeColumnsList(); for (Column column : oldColumnList) { if (column.getIsKey()) { sql.append(column.getName()).append("=").append(column.getValue()); break; } } canalQueue.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } /** * 儲存刪除語句 * * @param entry */ private void saveDeleteSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> rowDatasList = rowChange.getRowDatasList(); for (RowData rowData : rowDatasList) { List<Column> columnList = rowData.getBeforeColumnsList(); StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where "); for (Column column : columnList) { if (column.getIsKey()) { sql.append(column.getName()).append("=").append(column.getValue()); break; } } canalQueue.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } /** * 儲存插入語句 * * @param entry */ private void saveInsertSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> datasList = rowChange.getRowDatasList(); for (RowData rowData : datasList) { List<Column> columnList = rowData.getAfterColumnsList(); StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " ("); for (int i = 0; i < columnList.size(); i++) { sql.append(columnList.get(i).getName()); if (i != columnList.size() - 1) { sql.append(","); } } sql.append(") VALUES ("); for (int i = 0; i < columnList.size(); i++) { sql.append("'" + columnList.get(i).getValue() + "'"); if (i != columnList.size() - 1) { sql.append(","); } } sql.append(")"); canalQueue.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } /** * 獲取連線 */ public CanalConnector getConn() { return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password); } /** * 模擬消費canal轉換的sql語句 */ public void executeQueueSql() { int size = canalQueue.size(); for (int i = 0; i < size; i++) { String sql = canalQueue.poll(); System.out.println("canal 監聽到主庫sql變化----> " + sql); } } }
當然了,這只是簡單的demo 演示,您可根據自己的業務進行修改完善即可…
上邊的安裝步驟呢,我也是不斷的測試過,沒有問題,當然可能或多或少有些坑沒有踩到,但是如果您按照我的步驟來,大概率是一馬平川的…
附上專案原始碼:springboot-canal
到此這篇關於Springboot2.3.x整合Canal的文章就介紹到這了,更多相關Springboot2.3.x整合Canal內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45