<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
我們在很多部落格中都有發現,Seata AT模式裡面的全域性鎖其實是行鎖,這也是Seata AT模式和XA模式在鎖粒度上的最大區別。我們可以在官網看到這樣一個例子:
兩個全域性事務 tx1 和 tx2,分別對 a 表的 m 欄位進行更新操作,m 的初始值 1000。
tx1 先開始,開啟本地事務,拿到本地鎖,更新操作 m = 1000 - 100 = 900。本地事務提交前,先拿到該記錄的 全域性鎖 ,本地提交釋放本地鎖。 tx2 後開始,開啟本地事務,拿到本地鎖,更新操作 m = 900 - 100 = 800。本地事務提交前,嘗試拿該記錄的 全域性鎖 ,tx1 全域性提交前,該記錄的全域性鎖被 tx1 持有,tx2 需要重試等待 全域性鎖 。
tx1 二階段全域性提交,釋放 全域性鎖 。tx2 拿到 全域性鎖 提交本地事務。
如果 tx1 的二階段全域性回滾,則 tx1 需要重新獲取該資料的本地鎖,進行反向補償的更新操作,實現分支的回滾。
此時,如果 tx2 仍在等待該資料的 全域性鎖,同時持有本地鎖,則 tx1 的分支回滾會失敗。分支的回滾會一直重試,直到 tx2 的 全域性鎖 等鎖超時,放棄 全域性鎖 並回滾本地事務釋放本地鎖,tx1 的分支回滾最終成功。
因為整個過程 全域性鎖 在 tx1 結束前一直是被 tx1 持有的,所以不會發生 髒寫 的問題。
那麼你知道Seata AT模式是如何實現行鎖的嘛?為了搞明白AT模式到底是怎麼獲取全域性鎖的,我們深入原始碼來看看。
為了證實全域性鎖就是我們所說的行鎖,經過一番尋找,我在BaseTransactionalExecutor
類中的prepareUndoLog()
方法中找到了這樣一段程式碼:
TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage; String lockKeys = buildLockKey(lockKeyRecords); if (null != lockKeys) { connectionProxy.appendLockKey(lockKeys); SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage); connectionProxy.appendUndoLog(sqlUndoLog); }
beforeImage
生成行鎖標記,否則通過afterImage
生成行鎖標記;比如表名wallet_tbl
,裡面有一個主鍵id
值為1,那麼最終生成的lockKeys
為wallet_tbl:1
,如果有多行記錄id
值分別為1、2、3,那麼最終生成的lockKeys
為wallet_tbl:1,2,3
;多個主鍵索引的話使用_
連線。所以我們可以總結出lockKeys
的生成規則為:tableName:1_A,2_B,3_C
,1
、2
、3
、A
、B
、C
分別為主鍵索引的值。
此時還沒有真正地拿到鎖,只是生成一個鎖的標記。真正地上鎖需要檢視ConnectionProxy.register()
方法:
private void register() throws TransactionException { if (!context.hasUndoLog() || !context.hasLockKey()) { return; } Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(), null, context.getXid(), context.getApplicationData(), context.buildLockKeys()); context.setBranchId(branchId); }
branchRegister()
方法就是RM
向TC
進行分支註冊,同時會申請行鎖。那麼獲取行鎖的核心程式碼應該就是在TC
端了,我們順著branchRegister()
邏輯一路找到BranchSession.lock()
:
public boolean lock(boolean autoCommit, boolean skipCheckLock) throws TransactionException { if (this.getBranchType().equals(BranchType.AT)) { // 只有AT模式需要獲取行鎖 return LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock); } return true; }
下面就要真正地開始進入LockerManager
來申請鎖了:
@Override public boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException { if (branchSession == null) { throw new IllegalArgumentException("branchSession can't be null for memory/file locker."); } String lockKey = branchSession.getLockKey(); if (StringUtils.isNullOrEmpty(lockKey)) { // no lock return true; } // get locks of branch // 將lockKey解析成多行RowLock List<RowLock> locks = collectRowLocks(branchSession); if (CollectionUtils.isEmpty(locks)) { // no lock return true; } return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock); }
這裡做了一步將lockKey解析成多行RowLock,根據上面的tableName:1_A,2_B,3_C
規則,最終解析成3個RowLock
物件:{tableName,1_A},{tableName,2_B},{tableName,3_C}
最終我們追蹤到最後一個關鍵方法LockStoreDataBaseDAO.acquireLock()
:
@Override public boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit, boolean skipCheckLock) { Connection conn = null; PreparedStatement ps = null; ResultSet rs = null; Set<String> dbExistedRowKeys = new HashSet<>(); boolean originalAutoCommit = true; // 如果有多行鎖,那麼先去重 if (lockDOs.size() > 1) { lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList()); } try { conn = lockStoreDataSource.getConnection(); if (originalAutoCommit = conn.getAutoCommit()) { conn.setAutoCommit(false); } List<LockDO> unrepeatedLockDOs = lockDOs; //check lock if (!skipCheckLock) { boolean canLock = true; // 查詢是否已經存在行鎖 // "select row_key, xid, transaction_id, branch_id, reource_id, table_name, pk, status, gmt_create, gmt_modified from lock_table where row_key in (?, ?, ?, ?) order by status desc" // in裡面最多限制1000個 String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, lockDOs.size()); ps = conn.prepareStatement(checkLockSQL); for (int i = 0; i < lockDOs.size(); i++) { ps.setString(i + 1, lockDOs.get(i).getRowKey()); } rs = ps.executeQuery(); String currentXID = lockDOs.get(0).getXid(); boolean failFast = false; while (rs.next()) { String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID); // 如果發現有其他分散式事務和當前申請行鎖的資料一致,那麼加鎖失敗 if (!StringUtils.equals(dbXID, currentXID)) { if (LOGGER.isInfoEnabled()) { String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK); String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME); long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID); LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID, dbBranchId); } if (!autoCommit) { int status = rs.getInt(ServerTableColumnsName.LOCK_TABLE_STATUS); if (status == LockStatus.Rollbacking.getCode()) { failFast = true; } } // 加鎖失敗 canLock = false; break; } dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY)); } // 加鎖失敗,回滾拋異常 if (!canLock) { conn.rollback(); if (failFast) { throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast)); } return false; } // 如果是同一個分散式事務中申請行鎖,那麼剔除重複的鎖資料 if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) { unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey())) .collect(Collectors.toList()); } // 如果剔除後不需要再補充行鎖,那麼直接返回申請成功 if (CollectionUtils.isEmpty(unrepeatedLockDOs)) { conn.rollback(); return true; } } // 申請行鎖,分1行和多行兩種情況 if (unrepeatedLockDOs.size() == 1) { LockDO lockDO = unrepeatedLockDOs.get(0); if (!doAcquireLock(conn, lockDO)) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk()); } conn.rollback(); return false; } } else { if (!doAcquireLocks(conn, unrepeatedLockDOs)) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(), unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList())); } conn.rollback(); return false; } } conn.commit(); return true; } catch (SQLException e) { throw new StoreException(e); } finally { IOUtil.close(rs, ps); if (conn != null) { try { if (originalAutoCommit) { conn.setAutoCommit(true); } conn.close(); } catch (SQLException e) { } } } }
1.先通過查詢語句檢查是否存在鎖衝突,鎖衝突的話,就直接失敗拋異常;
2.不存在鎖衝突,檢查是否鎖重入,重入的話,補充行鎖;
3.新增行鎖;
檢查鎖衝突的SQL語句如下:
select row_key, xid, transaction_id, branch_id, reource_id, table_name, pk, status, gmt_create, gmt_modified from lock_table where row_key in (?, ?, ?, ?) order by status desc
新增行鎖SQL語句如下:
insert into lock_table (row_key, xid, transaction_id, branch_id, reource_id, table_name, pk, status, gmt_create, gmt_modified) values (?, ?, ?, ?, ?, ?, ?, now(), now(), ?)
根據上面加鎖的邏輯,我們發現一直比較的都是row_key
這個主鍵,那麼為什麼row_key
代表的是行鎖呢?這個問題就要回到row_key
是如何產生的:
protected LockDO convertToLockDO(RowLock rowLock) { LockDO lockDO = new LockDO(); lockDO.setBranchId(rowLock.getBranchId()); lockDO.setPk(rowLock.getPk()); lockDO.setResourceId(rowLock.getResourceId()); // row_key的生成 lockDO.setRowKey(getRowKey(rowLock.getResourceId(), rowLock.getTableName(), rowLock.getPk())); lockDO.setXid(rowLock.getXid()); lockDO.setTransactionId(rowLock.getTransactionId()); lockDO.setTableName(rowLock.getTableName()); return lockDO; }
根據上面程式碼,我們很清楚地瞭解到,row_key
是由resource_id
、tableName
、pk
這三個欄位連線生成的,也就意味著row_key
是代表表裡面的具體一行資料,也就是我們的行記錄,所以我們確信AT
模式的全域性鎖其實就是行鎖。
以上就是Seata AT模式如何實現行鎖詳解的詳細內容,更多關於Seata AT模式實現行鎖的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45