<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
為了瞭解Seata AT模式的原理,我通過原始碼解讀的方式畫出了Seata AT模式啟動的圖示:
如果是基於Springboot專案的話,專案啟動的使用,一般約定會先檢視spring.factories檔案,設定了哪些類是需要自動裝配的。Seata也是利用了這個約定,在專案啟動的時候,預設會裝配指定的類,以完成Seata相關元件的初始化。
下面我們來一起根據原始碼解讀Seata AT模式啟動流程。
由上圖可知,Seata AT模式可大概分成以下三部分:
1.與底層資料庫打交道的DataSource,這部分功能處理交給了SeataDataSourceAutoConfiguration。
2.處理@GlobalTransactional註解,實現分散式事務管理功能,這部分交給SeataAutoConfiguration處理。
3.分支事務獲取、銷燬全域性事務XID,這部分功能交給HttpAutoConfiguration。
首先,我們來看看Seata是如何處理DataSource的。
// 依賴DataSource @ConditionalOnBean(DataSource.class) // 三個設定都要為true @ConditionalOnExpression("${seata.enabled:true} && ${seata.enableAutoDataSourceProxy:true} && ${seata.enable-auto-data-source-proxy:true}") @AutoConfigureAfter({SeataCoreAutoConfiguration.class}) public class SeataDataSourceAutoConfiguration { /** * The bean seataAutoDataSourceProxyCreator. */ @Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR) // 可替換 @ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class) public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) { return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(), seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode()); } }
1.@ConditionalOnBean(DataSource.class)意味著我們的專案中一定要有DataSource這個Bean。
2.@ConditionalOnExpression裡面表示要滿足以下三個條件才會建立SeataDataSourceAutoConfiguration:
seata.enabled=true
seata.enableAutoDataSourceProxy=true
seata.enable-auto-data-source-proxy=true
3.@AutoConfigureAfter表示當前Bean建立一定在指定的SeataCoreAutoConfiguration之後。
根據以上分析,我們在引入Seata AT模式的時候,一定要先建立專案的DataSource Bean物件,其次保證相關的設定滿足要求,那麼才能正確地保證DataSource被Seata代理。
下面繼續看SeataAutoDataSourceProxyCreator
的建立:
@ConditionalOnMissingBean表示這個Bean的建立其實是可以開發人員自定義的,如果開發人員沒有自定義,那麼就由Seata自己建立。
在SeataAutoDataSourceProxyCreator
類中,它繼承了AbstractAutoProxyCreator
,也就是AOP功能的標準實現。這個時候,我們主要關注wrapIfNecessary
方法的實現:
public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator { @Override protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) { // 不是DataSource物件不代理 if (!(bean instanceof DataSource)) { return bean; } // 如果是DataSource物件,但是不是SeataDataSourceProxy物件 if (!(bean instanceof SeataDataSourceProxy)) { // 先呼叫父類別包裝一層 Object enhancer = super.wrapIfNecessary(bean, beanName, cacheKey); // 如果代理後的物件和代理前的物件是同一個物件 // 說明要麼這個物件之前已經被代理過 // 要麼SeataDataSourceProxy被開發人員excluded if (bean == enhancer) { return bean; } // 如果是正常的DataSource物件的話,那麼就會被自動構建成SeataDataSourceProxy,並返回 DataSource origin = (DataSource) bean; SeataDataSourceProxy proxy = buildProxy(origin, dataSourceProxyMode); DataSourceProxyHolder.put(origin, proxy); return enhancer; } /* * things get dangerous when you try to register SeataDataSourceProxy bean by yourself! * if you insist on doing so, you must make sure your method return type is DataSource, * because this processor will never return any subclass of SeataDataSourceProxy */ // Seata是不建議使用者自己構建SeataDataSourceProxy物件的,即使使用者自己構建了SeataDataSourceProxy物件,Seata也會重新處理 LOGGER.warn("Manually register SeataDataSourceProxy(or its subclass) bean is discouraged! bean name: {}", beanName); // 獲取使用者包裝好的代理物件 SeataDataSourceProxy proxy = (SeataDataSourceProxy) bean; // 獲取原生DataSource DataSource origin = proxy.getTargetDataSource(); // 重新包裝,並返回 Object originEnhancer = super.wrapIfNecessary(origin, beanName, cacheKey); // this mean origin is either excluded by user or had been proxy before if (origin == originEnhancer) { return origin; } // else, put <origin, proxy> to holder and return originEnhancer DataSourceProxyHolder.put(origin, proxy); // 返回包裝好的代理物件SeataDataSourceProxy return originEnhancer; } }
1.通過以上程式碼解讀,有一個點我們需要注意,就是開發人員不需要自己的構建SeataDataSourceProxy物件,使用原生的DataSource即可,Seata會幫助我們構建SeataDataSourceProxy物件。
SeatAutoConfiguration
主要功能就是建立GlobalTransactionScanner
物件,所以核心功能全部在GlobalTransactionScanner
裡面。
// 設定seata.enabled=true @ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true) // 裝配順序 @AutoConfigureAfter({SeataCoreAutoConfiguration.class}) public class SeataAutoConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class); @Bean(BEAN_NAME_FAILURE_HANDLER) // 失敗處理器,可替換 @ConditionalOnMissingBean(FailureHandler.class) public FailureHandler failureHandler() { return new DefaultFailureHandlerImpl(); } @Bean @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER}) // 開發人員可自定義GlobalTransactionScanner @ConditionalOnMissingBean(GlobalTransactionScanner.class) public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler, ConfigurableListableBeanFactory beanFactory, @Autowired(required = false) List<ScannerChecker> scannerCheckers) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Automatically configure Seata"); } // set bean factory GlobalTransactionScanner.setBeanFactory(beanFactory); // add checkers // '/META-INF/services/io.seata.spring.annotation.ScannerChecker' GlobalTransactionScanner.addScannerCheckers(EnhancedServiceLoader.loadAll(ScannerChecker.class)); // spring beans GlobalTransactionScanner.addScannerCheckers(scannerCheckers); // add scannable packages GlobalTransactionScanner.addScannablePackages(seataProperties.getScanPackages()); // add excludeBeanNames GlobalTransactionScanner.addScannerExcludeBeanNames(seataProperties.getExcludesForScanning()); //set accessKey and secretKey GlobalTransactionScanner.setAccessKey(seataProperties.getAccessKey()); GlobalTransactionScanner.setSecretKey(seataProperties.getSecretKey()); // create global transaction scanner return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler); } }
1.裝配SeataAutoConfiguration要求設定中seata.enabled=true
;
2.我們可以自定義FailureHandler
;這個失敗處理器是專門給TM
使用的;
3.同樣我們也可以自定義GlobalTransactionScanner
,不過基本上不會這麼做,除非有特殊需求;
GlobalTransactionScanner
裡面基本上做兩個事情:
@GlobalTransactional
或@GlobalLock
註解的方法;TM Client
和RM Client
,以便實現和TC
通訊;TC
也就是我們的Seata Server
;public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean { }
AbstractAutoProxyCreator
:通過wrapIfNecessary
方法代理所有被@GlobalTransactional
或@GlobalLock
註解的方法;ConfigurationChangeListener
:通過onChangeEvent
方法監聽設定service.disableGlobalTransaction
的變化;InitializingBean
:通過afterPropertiesSet
方法初始化TM Client
和RM Client
;ApplicationContextAware
:通過setApplicationContext
方法獲取IOC容器;DisposableBean
:當GlobalTransactionScanner被銷燬時,通過destroy
方法來回收資源;我們重點關注wrapIfNecessary
和afterPropertiesSet
方法:
@Override protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) { // 檢查Bean是否符合被代理的要求 // 1. 不能是設定類,比如以Configuration、Properties、Config結尾的Bean名稱 // 2. Bean所在的包名在掃描範圍內 // 3. 不能被@Scope註解 if (!doCheckers(bean, beanName)) { return bean; } try { synchronized (PROXYED_SET) { // 如果已經被代理,就跳過 if (PROXYED_SET.contains(beanName)) { return bean; } interceptor = null; // 檢查是否被TCC註解 if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { // 初始化TCC Fence Clean Task TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext); // 建立TCC代理類 interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName)); ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor); } else { // 如果不是TCC代理,那麼先獲取當前類和它實現的介面 Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean); Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean); // 判斷當前類及相關方法是否被@GlobalTransactional或@GlobalLock註解 if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) { // 沒有被註解,不代理 return bean; } // 準備建立方法攔截器 if (globalTransactionalInterceptor == null) { globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook); ConfigurationCache.addConfigListener( ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor); } // 攔截器建立完畢 interceptor = globalTransactionalInterceptor; } LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName()); // 如果bean不是代理物件,那麼不做方法攔截,直接返回 if (!AopUtils.isAopProxy(bean)) { bean = super.wrapIfNecessary(bean, beanName, cacheKey); } else { // 準備把方法攔截器插入進去 AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean); // 獲取所有的方法攔截器,包括GlobalTransactionalInterceptor Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null)); int pos; // 依次新增進目標物件中 for (Advisor avr : advisor) { // Find the position based on the advisor's order, and add to advisors by pos pos = findAddSeataAdvisorPosition(advised, avr); advised.addAdvisor(pos, avr); } } PROXYED_SET.add(beanName); // 返回被代理的bean return bean; } } catch (Exception exx) { throw new RuntimeException(exx); } }
通過上述原始碼分析可知:Seata是根據類、介面和方法上的@GlobalTransactional
或@GlobalLock
註解來判斷是否需要針對目標方法做攔截的。
@Override public void afterPropertiesSet() { // 如果不允許全域性事務 if (disableGlobalTransaction) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Global transaction is disabled."); } // 新增監聽器,監聽設定的變化 ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)this); return; } if (initialized.compareAndSet(false, true)) { // 準備初始化TM Client、RM Client initClient(); } } private void initClient() { if (LOGGER.isInfoEnabled()) { LOGGER.info("Initializing Global Transaction Clients ... "); } if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) { LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " + "please change your default configuration as soon as possible " + "and we don't recommend you to use default tx-service-group's value provided by seata", DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP); } if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) { throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup)); } //初始化TM Client TMClient.init(applicationId, txServiceGroup, accessKey, secretKey); if (LOGGER.isInfoEnabled()) { LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup); } //初始化RM Client RMClient.init(applicationId, txServiceGroup); if (LOGGER.isInfoEnabled()) { LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup); } if (LOGGER.isInfoEnabled()) { LOGGER.info("Global Transaction Clients are initialized. "); } registerSpringShutdownHook(); }
至此,SeatAutoConfiguration的工作處理完畢;
HttpAutoConfiguration
的工作比較簡單,我們想象一下,RM如何知道它屬於哪一個分散式事務?這就需要一個統一的標識來決定所有的分支事務都屬於同一個分散式事務,這個標識在Seata中叫做XID
;
XID
由TM開啟分散式事務時生成,通過RPC的方式從一個分支事務傳遞到另一個分支事務,所以我們在RM端需要一個從RPC中解析獲取XID
的功能,以及在業務邏輯處理完畢後,銷燬當前執行緒中XID
的功能。
@Configuration(proxyBeanMethods = false) @ConditionalOnWebApplication public class HttpAutoConfiguration extends WebMvcConfigurerAdapter { // 註冊攔截器 @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new TransactionPropagationInterceptor()); } // 新增異常解析處理器 @Override public void extendHandlerExceptionResolvers(List<HandlerExceptionResolver> exceptionResolvers) { exceptionResolvers.add(new HttpHandlerExceptionResolver()); } } public class TransactionPropagationInterceptor extends HandlerInterceptorAdapter { private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationInterceptor.class); // 前置處理邏輯 @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) { // 獲取當前執行緒XID String xid = RootContext.getXID(); // 從rpc中獲取XID String rpcXid = request.getHeader(RootContext.KEY_XID); if (LOGGER.isDebugEnabled()) { LOGGER.debug("xid in RootContext[{}] xid in HttpContext[{}]", xid, rpcXid); } // 如果執行緒中沒有XID,並且從請求中拿到了XID,那麼把請求中的XID繫結到當前執行緒 if (StringUtils.isBlank(xid) && StringUtils.isNotBlank(rpcXid)) { RootContext.bind(rpcXid); if (LOGGER.isDebugEnabled()) { LOGGER.debug("bind[{}] to RootContext", rpcXid); } } return true; } // 後置處理邏輯 @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { // 業務邏輯處理完畢,從當前執行緒中刪除XID if (RootContext.inGlobalTransaction()) { XidResource.cleanXid(request.getHeader(RootContext.KEY_XID)); } } } public class HttpHandlerExceptionResolver extends AbstractHandlerExceptionResolver { // 發生異常後,刪除當前執行緒中的XID @Override protected ModelAndView doResolveException(HttpServletRequest request, HttpServletResponse httpServletResponse, Object o, Exception e) { XidResource.cleanXid(request.getHeader(RootContext.KEY_XID)); return null; } }
通過以上原始碼分析和圖解Seata AT模式,我們可以瞭解到以下幾點:
1.GlobalTransactionInterceptor
屬於TM側,它主要負責通過TM Client
開啟分散式事務、提交分散式事務以及回滾分散式事務;屬於大總管。
2.SeataDataSourceProxy
屬於RM側,它主要負責分支事務的開啟,提交以及回滾,屬於真正幹活的小兵。
3.TM Client
和RM Client
純屬於兩個通訊工具,負責與TC
端建立通訊。
4.TransactionPropagationInterceptor
和HttpHandlerExceptionResolver
服務於分支事務,負責全域性事務XID的獲取以及業務邏輯處理完畢的善後事宜。
以上就是Seata AT模式啟動過程圖文範例詳解的詳細內容,更多關於Seata AT模式啟動過程的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45