<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
首先,到底啥是分散式事務呢,比如我們在執行一個業務邏輯的時候有兩步分別操作A資料來源和B資料來源,當我們在A資料來源執行資料更改後,在B資料來源執行時出現執行時異常,那麼我們必須要讓B資料來源的操作回滾,並回滾對A資料來源的操作;這種情況在支付業務時常常出現;比如買票業務在最後支付失敗,那之前的操作必須全部回滾,如果之前的操作分佈在多個資料來源中,那麼這就是典型的分散式事務回滾;
瞭解了什麼是分散式事務,那分散式事務在java的解決方案就是JTA(即Java Transaction API);springboot官方提供了 Atomikos or Bitronix的解決思路;
其實,大多數情況下很多公司是使用訊息佇列的方式實現分散式事務。
本篇文章重點講解springboot環境下,整合 Atomikos +mysql+mybatis+tomcat/jetty;
pom.xml中新增atomikos的springboot相關依賴:
<!--分散式事務--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency>
點進去會發現裡面整合好了:transactions-jms
、transactions-jta
、transactions-jdbc
、javax.transaction-api
把資料來源的相關設定項單獨提煉到一個application.yml中:
注意:
spring.datasource.type
是com.alibaba.druid.pool.xa.DruidXADataSource;
spring.jta.transaction-manager-id
的值在你的電腦中是唯一的,這個詳細請閱讀官方檔案;完整的yml檔案如下:
spring: datasource: type: com.alibaba.druid.pool.xa.DruidXADataSource druid: systemDB: name: systemDB url: jdbc:mysql://localhost:3306/springboot-mybatis?useUnicode=true&characterEncoding=utf-8 username: root password: root # 下面為連線池的補充設定,應用到上面所有資料來源中 # 初始化大小,最小,最大 initialSize: 5 minIdle: 5 maxActive: 20 # 設定獲取連線等待超時的時間 maxWait: 60000 # 設定間隔多久才進行一次檢測,檢測需要關閉的空閒連線,單位是毫秒 timeBetweenEvictionRunsMillis: 60000 # 設定一個連線在池中最小生存的時間,單位是毫秒 minEvictableIdleTimeMillis: 30 validationQuery: SELECT 1 validationQueryTimeout: 10000 testWhileIdle: true testOnBorrow: false testOnReturn: false # 開啟PSCache,並且指定每個連線上PSCache的大小 poolPreparedStatements: true maxPoolPreparedStatementPerConnectionSize: 20 filters: stat,wall # 通過connectProperties屬性來開啟mergeSql功能;慢SQL記錄 connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 # 合併多個DruidDataSource的監控資料 useGlobalDataSourceStat: true businessDB: name: businessDB url: jdbc:mysql://localhost:3306/springboot-mybatis2?useUnicode=true&characterEncoding=utf-8 username: root password: root # 下面為連線池的補充設定,應用到上面所有資料來源中 # 初始化大小,最小,最大 initialSize: 5 minIdle: 5 maxActive: 20 # 設定獲取連線等待超時的時間 maxWait: 60000 # 設定間隔多久才進行一次檢測,檢測需要關閉的空閒連線,單位是毫秒 timeBetweenEvictionRunsMillis: 60000 # 設定一個連線在池中最小生存的時間,單位是毫秒 minEvictableIdleTimeMillis: 30 validationQuery: SELECT 1 validationQueryTimeout: 10000 testWhileIdle: true testOnBorrow: false testOnReturn: false # 開啟PSCache,並且指定每個連線上PSCache的大小 poolPreparedStatements: true maxPoolPreparedStatementPerConnectionSize: 20 filters: stat,wall # 通過connectProperties屬性來開啟mergeSql功能;慢SQL記錄 connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 # 合併多個DruidDataSource的監控資料 useGlobalDataSourceStat: true #jta相關引數設定 jta: log-dir: classpath:tx-logs transaction-manager-id: txManager
在DruidConfig.java中實現多個資料來源的註冊;分散式事務管理器的註冊;druid的註冊
package com.zjt.config; import com.alibaba.druid.filter.stat.StatFilter; import com.alibaba.druid.support.http.StatViewServlet; import com.alibaba.druid.support.http.WebStatFilter; import com.alibaba.druid.wall.WallConfig; import com.alibaba.druid.wall.WallFilter; import com.atomikos.icatch.jta.UserTransactionImp; import com.atomikos.icatch.jta.UserTransactionManager; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean; import org.springframework.boot.web.servlet.FilterRegistrationBean; import org.springframework.boot.web.servlet.ServletRegistrationBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.core.env.Environment; import org.springframework.transaction.jta.JtaTransactionManager; import javax.sql.DataSource; import javax.transaction.UserTransaction; import java.util.Properties; /** * Druid設定 * * */ @Configuration public class DruidConfig { @Bean(name = "systemDataSource") @Primary @Autowired public DataSource systemDataSource(Environment env){ AtomikosDataSourceBean ds = new AtomikosDataSourceBean(); Properties prop = build(env, "spring.datasource.druid.systemDB."); ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource"); ds.setUniqueResourceName("systemDB"); ds.setPoolSize(5); ds.setXaProperties(prop); return ds; } @Autowired @Bean(name = "businessDataSource") public AtomikosDataSourceBean businessDataSource(Environment env){ AtomikosDataSourceBean ds = new AtomikosDataSourceBean(); Properties prop = build(env, "spring.datasource.druid.businessDB."); ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource"); ds.setUniqueResourceName("businessDB"); ds.setPoolSize(5); ds.setXaProperties(prop); return ds; } /** * 注入事物管理器 * @return */ @Bean(name = "xatx") public JtaTransactionManager regTransactionManager (){ UserTransactionManager userTransactionManager = new UserTransactionManager(); UserTransaction userTransaction = new UserTransactionImp(); return new JtaTransactionManager(userTransaction, userTransactionManager); } private Properties build(Environment env, String prefix){ Properties prop = new Properties(); prop.put("url", env.getProperty(prefix + "url")); prop.put("username", env.getProperty(prefix + "username")); prop.put("password", env.getProperty(prefix + "password")); prop.put("driverClassName", env.getProperty(prefix + "driverClassName", "")); prop.put("initialSize", env.getProperty(prefix + "initialSize", Integer.class)); prop.put("maxActive", env.getProperty(prefix + "maxActive", Integer.class)); prop.put("minIdle", env.getProperty(prefix + "minIdle", Integer.class)); prop.put("maxWait", env.getProperty(prefix + "maxWait", Integer.class)); prop.put("poolPreparedStatements", env.getProperty(prefix + "poolPreparedStatements", Boolean.class)); prop.put("maxPoolPreparedStatementPerConnectionSize", env.getProperty(prefix + "maxPoolPreparedStatementPerConnectionSize", Integer.class)); prop.put("maxPoolPreparedStatementPerConnectionSize", env.getProperty(prefix + "maxPoolPreparedStatementPerConnectionSize", Integer.class)); prop.put("validationQuery", env.getProperty(prefix + "validationQuery")); prop.put("validationQueryTimeout", env.getProperty(prefix + "validationQueryTimeout", Integer.class)); prop.put("testOnBorrow", env.getProperty(prefix + "testOnBorrow", Boolean.class)); prop.put("testOnReturn", env.getProperty(prefix + "testOnReturn", Boolean.class)); prop.put("testWhileIdle", env.getProperty(prefix + "testWhileIdle", Boolean.class)); prop.put("timeBetweenEvictionRunsMillis", env.getProperty(prefix + "timeBetweenEvictionRunsMillis", Integer.class)); prop.put("minEvictableIdleTimeMillis", env.getProperty(prefix + "minEvictableIdleTimeMillis", Integer.class)); prop.put("filters", env.getProperty(prefix + "filters")); return prop; } @Bean public ServletRegistrationBean druidServlet(){ ServletRegistrationBean servletRegistrationBean = new ServletRegistrationBean(new StatViewServlet(), "/druid/*"); //控制檯管理使用者,加入下面2行 進入druid後臺就需要登入 //servletRegistrationBean.addInitParameter("loginUsername", "admin"); //servletRegistrationBean.addInitParameter("loginPassword", "admin"); return servletRegistrationBean; } @Bean public FilterRegistrationBean filterRegistrationBean(){ FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean(); filterRegistrationBean.setFilter(new WebStatFilter()); filterRegistrationBean.addUrlPatterns("/*"); filterRegistrationBean.addInitParameter("exclusions", "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*"); filterRegistrationBean.addInitParameter("profileEnable", "true"); return filterRegistrationBean; } @Bean public StatFilter statFilter(){ StatFilter statFilter = new StatFilter(); statFilter.setLogSlowSql(true); //slowSqlMillis用來設定SQL慢的標準,執行時間超過slowSqlMillis的就是慢。 statFilter.setMergeSql(true); //SQL合併設定 statFilter.setSlowSqlMillis(1000);//slowSqlMillis的預設值為3000,也就是3秒。 return statFilter; } @Bean public WallFilter wallFilter(){ WallFilter wallFilter = new WallFilter(); //允許執行多條SQL WallConfig config = new WallConfig(); config.setMultiStatementAllow(true); wallFilter.setConfig(config); return wallFilter; } }
分別設定每個資料來源對應的sqlSessionFactory,以及MapperScan掃描的包
MybatisDatasourceConfig.java
package com.zjt.config; import com.zjt.util.MyMapper; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionFactoryBean; import org.mybatis.spring.SqlSessionTemplate; import org.mybatis.spring.annotation.MapperScan; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.core.io.support.ResourcePatternResolver; import javax.sql.DataSource; /** * * @description */ @Configuration // 精確到 mapper 目錄,以便跟其他資料來源隔離 @MapperScan(basePackages = "com.zjt.mapper", markerInterface = MyMapper.class, sqlSessionFactoryRef = "sqlSessionFactory") public class MybatisDatasourceConfig { @Autowired @Qualifier("systemDataSource") private DataSource ds; @Bean public SqlSessionFactory sqlSessionFactory() throws Exception { SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean(); factoryBean.setDataSource(ds); //指定mapper xml目錄 ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); factoryBean.setMapperLocations(resolver.getResources("classpath:mapper/*.xml")); return factoryBean.getObject(); } @Bean public SqlSessionTemplate sqlSessionTemplate() throws Exception { SqlSessionTemplate template = new SqlSessionTemplate(sqlSessionFactory()); // 使用上面設定的Factory return template; } //關於事務管理器,不管是JPA還是JDBC等都實現自介面 PlatformTransactionManager // 如果你新增的是 spring-boot-starter-jdbc 依賴,框架會預設注入 DataSourceTransactionManager 範例。 //在Spring容器中,我們手工註解@Bean 將被優先載入,框架不會重新範例化其他的 PlatformTransactionManager 實現類。 /*@Bean(name = "transactionManager") @Primary public DataSourceTransactionManager masterTransactionManager() { //MyBatis自動參與到spring事務管理中,無需額外設定,只要org.mybatis.spring.SqlSessionFactoryBean參照的資料來源 // 與DataSourceTransactionManager參照的資料來源一致即可,否則事務管理會不起作用。 return new DataSourceTransactionManager(ds); }*/ }
MybatisDatasource2Config.java
package com.zjt.config; import com.zjt.util.MyMapper; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionFactoryBean; import org.mybatis.spring.SqlSessionTemplate; import org.mybatis.spring.annotation.MapperScan; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.core.io.support.ResourcePatternResolver; import javax.sql.DataSource; /** * * @description */ @Configuration // 精確到 mapper 目錄,以便跟其他資料來源隔離 @MapperScan(basePackages = "com.zjt.mapper2", markerInterface = MyMapper.class, sqlSessionFactoryRef = "sqlSessionFactory2") public class MybatisDatasource2Config { @Autowired @Qualifier("businessDataSource") private DataSource ds; @Bean public SqlSessionFactory sqlSessionFactory2() throws Exception { SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean(); factoryBean.setDataSource(ds); //指定mapper xml目錄 ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); factoryBean.setMapperLocations(resolver.getResources("classpath:mapper2/*.xml")); return factoryBean.getObject(); } @Bean public SqlSessionTemplate sqlSessionTemplate2() throws Exception { SqlSessionTemplate template = new SqlSessionTemplate(sqlSessionFactory2()); // 使用上面設定的Factory return template; } //關於事務管理器,不管是JPA還是JDBC等都實現自介面 PlatformTransactionManager // 如果你新增的是 spring-boot-starter-jdbc 依賴,框架會預設注入 DataSourceTransactionManager 範例。 //在Spring容器中,我們手工註解@Bean 將被優先載入,框架不會重新範例化其他的 PlatformTransactionManager 實現類。 /*@Bean(name = "transactionManager2") @Primary public DataSourceTransactionManager masterTransactionManager() { //MyBatis自動參與到spring事務管理中,無需額外設定,只要org.mybatis.spring.SqlSessionFactoryBean參照的資料來源 // 與DataSourceTransactionManager參照的資料來源一致即可,否則事務管理會不起作用。 return new DataSourceTransactionManager(ds); }*/ }
由於我們本例中只使用一個事務管理器:xatx,故就不在使用TxAdviceInterceptor.java
和TxAdvice2Interceptor.java
中設定的事務管理器了;有需求的童鞋可以自己設定其他的事務管理器;(見DruidConfig.java中檢視)
新建分散式業務測試介面JtaTestService.java和實現類JtaTestServiceImpl.java
其實就是一個很簡單的test01()方法,在該方法中我們分別先後呼叫classService.saveOrUpdateTClass(tClass);
和teacherService.saveOrUpdateTeacher(teacher);
實現先後操作兩個資料來源:然後我們可以自己debug跟蹤事務的提交時機,此外,也可以在在兩個方法全執行結束之後,手動製造一個執行時異常,來檢查分散式事務是否全部回滾;
注意:
在實現類的方法中我使用的是:
@Transactional(transactionManager = "xatx", propagation = Propagation.REQUIRED, rollbackFor = { java.lang.RuntimeException.class })從而指定了使用哪個事務管理器,事務隔離級別(一般都用我這個預設的),回滾的條件(一般可以使用Exception),這三個可以自己根據業務實際修改;
package com.zjt.service3; import java.util.Map; public interface JtaTestService { public Map<String,Object> test01(); }
package com.zjt.service3.impl; import com.zjt.entity.TClass; import com.zjt.entity.Teacher; import com.zjt.service.TClassService; import com.zjt.service2.TeacherService; import com.zjt.service3.JtaTestService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import java.util.LinkedHashMap; import java.util.Map; @Service("jtaTestServiceImpl") public class JtaTestServiceImpl implements JtaTestService{ @Autowired @Qualifier("teacherServiceImpl") private TeacherService teacherService; @Autowired @Qualifier("tclassServiceImpl") private TClassService tclassService; @Override @Transactional(transactionManager = "xatx", propagation = Propagation.REQUIRED, rollbackFor = { java.lang.RuntimeException.class }) public Map<String, Object> test01() { LinkedHashMap<String,Object> resultMap=new LinkedHashMap<String,Object>(); TClass tClass=new TClass(); tClass.setName("8888"); tclassService.saveOrUpdateTClass(tClass); Teacher teacher=new Teacher(); teacher.setName("8888"); teacherService.saveOrUpdateTeacher(teacher); System.out.println(1/0); resultMap.put("state","success"); resultMap.put("message","分散式事務同步成功"); return resultMap; } }
建立JtaTestContoller.java,接受一個來自前端的http請求,觸發JtaTestService 的test01方法
package com.zjt.web; import com.zjt.service3.JtaTestService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import java.util.LinkedHashMap; import java.util.Map; @Controller @RequestMapping("/jtaTest") public class JtaTestContoller { @Autowired @Qualifier("jtaTestServiceImpl") private JtaTestService taTestService; @ResponseBody @RequestMapping("/test01") public Map<String,Object> test01(){ LinkedHashMap<String,Object> resultMap=new LinkedHashMap<String,Object>(); try { return taTestService.test01(); }catch (Exception e){ resultMap.put("state","fail"); resultMap.put("message","分散式事務同步失敗"); return resultMap; } } }
//分散式事務測試 $("#JTATest").click(function(){ $.ajax({ type: "POST", url: "${basePath!}/jtaTest/test01", data: {} , async: false, error: function (request) { layer.alert("與伺服器連線失敗/(ㄒoㄒ)/~~"); return false; }, success: function (data) { if (data.state == 'fail') { layer.alert(data.message); return false; }else if(data.state == 'success'){ layer.alert(data.message); } } }); }); <button class="layui-btn" id="JTATest">同時向班級和老師表插入名為8888的班級和老師</button>
點選這個按鈕,跳轉到controller:
當正常執行了sql語句之後,我們可以發現資料庫並沒有變化,因為整個方法的事務還沒有走完,當我們走到1/0這步時:
丟擲執行時異常,並被spring事務攔截器攔截,並捕獲異常:
在this.completeTransactionAfterThrowing(txInfo, var16);
方法中會將事務全部回滾:
22:09:04.243 logback [http-nio-8080-exec-5] INFO c.a.i.imp.CompositeTransactionImp - rollback() done of transaction 192.168.1.103.tm0000400006
此時,當我們再次開啟資料庫驗證,依舊沒有變化,證明分散式事務設定成功;
大家可以基於我的程式碼自己練習一下,自己嘗試著使用多事務管理器的情況下的靈活設定;
到此這篇關於java SpringBoot 分散式事務的解決方案(JTA+Atomic+多資料來源)的文章就介紹到這了,更多相關java SpringBoot 分散式事務 內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45