首頁 > 軟體

Seata AT模式啟動過程圖文範例詳解

2022-10-02 14:00:14

背景

為了瞭解Seata AT模式的原理,我通過原始碼解讀的方式畫出了Seata AT模式啟動的圖示:

如果是基於Springboot專案的話,專案啟動的使用,一般約定會先檢視spring.factories檔案,設定了哪些類是需要自動裝配的。Seata也是利用了這個約定,在專案啟動的時候,預設會裝配指定的類,以完成Seata相關元件的初始化。

下面我們來一起根據原始碼解讀Seata AT模式啟動流程。

由上圖可知,Seata AT模式可大概分成以下三部分:

1.與底層資料庫打交道的DataSource,這部分功能處理交給了SeataDataSourceAutoConfiguration。

2.處理@GlobalTransactional註解,實現分散式事務管理功能,這部分交給SeataAutoConfiguration處理。

3.分支事務獲取、銷燬全域性事務XID,這部分功能交給HttpAutoConfiguration。

SeataDataSourceAutoConfiguration

首先,我們來看看Seata是如何處理DataSource的。

// 依賴DataSource
@ConditionalOnBean(DataSource.class)
// 三個設定都要為true
@ConditionalOnExpression("${seata.enabled:true} && ${seata.enableAutoDataSourceProxy:true} && ${seata.enable-auto-data-source-proxy:true}")
@AutoConfigureAfter({SeataCoreAutoConfiguration.class})
public class SeataDataSourceAutoConfiguration {
    /**
     * The bean seataAutoDataSourceProxyCreator.
     */
    @Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
    // 可替換
    @ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
    public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
        return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),
            seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
    }
}

1.@ConditionalOnBean(DataSource.class)意味著我們的專案中一定要有DataSource這個Bean。

2.@ConditionalOnExpression裡面表示要滿足以下三個條件才會建立SeataDataSourceAutoConfiguration:

seata.enabled=true

seata.enableAutoDataSourceProxy=true

seata.enable-auto-data-source-proxy=true

3.@AutoConfigureAfter表示當前Bean建立一定在指定的SeataCoreAutoConfiguration之後。

根據以上分析,我們在引入Seata AT模式的時候,一定要先建立專案的DataSource Bean物件,其次保證相關的設定滿足要求,那麼才能正確地保證DataSource被Seata代理。

下面繼續看SeataAutoDataSourceProxyCreator的建立:

@ConditionalOnMissingBean表示這個Bean的建立其實是可以開發人員自定義的,如果開發人員沒有自定義,那麼就由Seata自己建立。

SeataAutoDataSourceProxyCreator類中,它繼承了AbstractAutoProxyCreator,也就是AOP功能的標準實現。這個時候,我們主要關注wrapIfNecessary方法的實現:

public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
  @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        // 不是DataSource物件不代理
        if (!(bean instanceof DataSource)) {
            return bean;
        }
        // 如果是DataSource物件,但是不是SeataDataSourceProxy物件
        if (!(bean instanceof SeataDataSourceProxy)) {
            // 先呼叫父類別包裝一層
            Object enhancer = super.wrapIfNecessary(bean, beanName, cacheKey);
            // 如果代理後的物件和代理前的物件是同一個物件
            // 說明要麼這個物件之前已經被代理過
            // 要麼SeataDataSourceProxy被開發人員excluded
            if (bean == enhancer) {
                return bean;
            }
            // 如果是正常的DataSource物件的話,那麼就會被自動構建成SeataDataSourceProxy,並返回
            DataSource origin = (DataSource) bean;
            SeataDataSourceProxy proxy = buildProxy(origin, dataSourceProxyMode);
            DataSourceProxyHolder.put(origin, proxy);
            return enhancer;
        }
        /*
         * things get dangerous when you try to register SeataDataSourceProxy bean by yourself!
         * if you insist on doing so, you must make sure your method return type is DataSource,
         * because this processor will never return any subclass of SeataDataSourceProxy
         */
        // Seata是不建議使用者自己構建SeataDataSourceProxy物件的,即使使用者自己構建了SeataDataSourceProxy物件,Seata也會重新處理
        LOGGER.warn("Manually register SeataDataSourceProxy(or its subclass) bean is discouraged! bean name: {}", beanName);
        // 獲取使用者包裝好的代理物件
        SeataDataSourceProxy proxy = (SeataDataSourceProxy) bean;
        // 獲取原生DataSource
        DataSource origin = proxy.getTargetDataSource();
        // 重新包裝,並返回
        Object originEnhancer = super.wrapIfNecessary(origin, beanName, cacheKey);
        // this mean origin is either excluded by user or had been proxy before
        if (origin == originEnhancer) {
            return origin;
        }
        // else, put <origin, proxy> to holder and return originEnhancer
        DataSourceProxyHolder.put(origin, proxy);
        // 返回包裝好的代理物件SeataDataSourceProxy
        return originEnhancer;
    }
}

1.通過以上程式碼解讀,有一個點我們需要注意,就是開發人員不需要自己的構建SeataDataSourceProxy物件,使用原生的DataSource即可,Seata會幫助我們構建SeataDataSourceProxy物件。

SeatAutoConfiguration

SeatAutoConfiguration主要功能就是建立GlobalTransactionScanner物件,所以核心功能全部在GlobalTransactionScanner裡面。

// 設定seata.enabled=true
@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
// 裝配順序
@AutoConfigureAfter({SeataCoreAutoConfiguration.class})
public class SeataAutoConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);
    @Bean(BEAN_NAME_FAILURE_HANDLER)
    // 失敗處理器,可替換
    @ConditionalOnMissingBean(FailureHandler.class)
    public FailureHandler failureHandler() {
        return new DefaultFailureHandlerImpl();
    }
    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    // 開發人員可自定義GlobalTransactionScanner
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler,
            ConfigurableListableBeanFactory beanFactory,
            @Autowired(required = false) List<ScannerChecker> scannerCheckers) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Automatically configure Seata");
        }
        // set bean factory
        GlobalTransactionScanner.setBeanFactory(beanFactory);
        // add checkers
        // '/META-INF/services/io.seata.spring.annotation.ScannerChecker'
        GlobalTransactionScanner.addScannerCheckers(EnhancedServiceLoader.loadAll(ScannerChecker.class));
        // spring beans
        GlobalTransactionScanner.addScannerCheckers(scannerCheckers);
        // add scannable packages
        GlobalTransactionScanner.addScannablePackages(seataProperties.getScanPackages());
        // add excludeBeanNames
        GlobalTransactionScanner.addScannerExcludeBeanNames(seataProperties.getExcludesForScanning());
        //set accessKey and secretKey
        GlobalTransactionScanner.setAccessKey(seataProperties.getAccessKey());
        GlobalTransactionScanner.setSecretKey(seataProperties.getSecretKey());
        // create global transaction scanner
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }
}

1.裝配SeataAutoConfiguration要求設定中seata.enabled=true

2.我們可以自定義FailureHandler;這個失敗處理器是專門給TM使用的;

3.同樣我們也可以自定義GlobalTransactionScanner,不過基本上不會這麼做,除非有特殊需求;

GlobalTransactionScanner裡面基本上做兩個事情:

  • 代理所有被@GlobalTransactional@GlobalLock註解的方法;
  • 使用Neety初始化TM ClientRM Client,以便實現和TC通訊;TC也就是我們的Seata Server
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
        implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
  
}
  • AbstractAutoProxyCreator:通過wrapIfNecessary方法代理所有被@GlobalTransactional@GlobalLock註解的方法;
  • ConfigurationChangeListener:通過onChangeEvent方法監聽設定service.disableGlobalTransaction的變化;
  • InitializingBean:通過afterPropertiesSet方法初始化TM ClientRM Client
  • ApplicationContextAware:通過setApplicationContext方法獲取IOC容器;
  • DisposableBean:當GlobalTransactionScanner被銷燬時,通過destroy方法來回收資源;

我們重點關注wrapIfNecessaryafterPropertiesSet方法:

@Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        // 檢查Bean是否符合被代理的要求
        // 1. 不能是設定類,比如以Configuration、Properties、Config結尾的Bean名稱
        // 2. Bean所在的包名在掃描範圍內
        // 3. 不能被@Scope註解
        if (!doCheckers(bean, beanName)) {
            return bean;
        }
        try {
            synchronized (PROXYED_SET) {
                // 如果已經被代理,就跳過
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;
                // 檢查是否被TCC註解
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    // 初始化TCC Fence Clean Task
                   TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
                    // 建立TCC代理類
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener)interceptor);
                } else {
                    // 如果不是TCC代理,那麼先獲取當前類和它實現的介面
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    // 判斷當前類及相關方法是否被@GlobalTransactional或@GlobalLock註解
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        // 沒有被註解,不代理
                        return bean;
                    }
                  
                    // 準備建立方法攔截器
                    if (globalTransactionalInterceptor == null) {
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(
                                ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    // 攔截器建立完畢
                    interceptor = globalTransactionalInterceptor;
                }
                LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                // 如果bean不是代理物件,那麼不做方法攔截,直接返回
                if (!AopUtils.isAopProxy(bean)) {
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    // 準備把方法攔截器插入進去
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    // 獲取所有的方法攔截器,包括GlobalTransactionalInterceptor
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    int pos;
                    // 依次新增進目標物件中
                    for (Advisor avr : advisor) {
                        // Find the position based on the advisor's order, and add to advisors by pos
                        pos = findAddSeataAdvisorPosition(advised, avr);
                        advised.addAdvisor(pos, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                // 返回被代理的bean
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }

通過上述原始碼分析可知:Seata是根據類、介面和方法上的@GlobalTransactional@GlobalLock註解來判斷是否需要針對目標方法做攔截的。

@Override
    public void afterPropertiesSet() {
        // 如果不允許全域性事務
        if (disableGlobalTransaction) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
          // 新增監聽器,監聽設定的變化
            ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                    (ConfigurationChangeListener)this);
            return;
        }
        
        if (initialized.compareAndSet(false, true)) {
          // 準備初始化TM Client、RM Client
            initClient();
        }
    }
private void initClient() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Initializing Global Transaction Clients ... ");
        }
        if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {
            LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " +
                    "please change your default configuration as soon as possible " +
                    "and we don't recommend you to use default tx-service-group's value provided by seata",
                    DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
        }
        if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
            throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
        }
        //初始化TM Client
        TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
        }
        //初始化RM Client
        RMClient.init(applicationId, txServiceGroup);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global Transaction Clients are initialized. ");
        }
        registerSpringShutdownHook();
    }

至此,SeatAutoConfiguration的工作處理完畢;

HttpAutoConfiguration

HttpAutoConfiguration的工作比較簡單,我們想象一下,RM如何知道它屬於哪一個分散式事務?這就需要一個統一的標識來決定所有的分支事務都屬於同一個分散式事務,這個標識在Seata中叫做XID

XID由TM開啟分散式事務時生成,通過RPC的方式從一個分支事務傳遞到另一個分支事務,所以我們在RM端需要一個從RPC中解析獲取XID的功能,以及在業務邏輯處理完畢後,銷燬當前執行緒中XID的功能。

@Configuration(proxyBeanMethods = false)
@ConditionalOnWebApplication
public class HttpAutoConfiguration extends WebMvcConfigurerAdapter {
    // 註冊攔截器
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new TransactionPropagationInterceptor());
    }
  
    // 新增異常解析處理器
    @Override
    public void extendHandlerExceptionResolvers(List<HandlerExceptionResolver> exceptionResolvers) {
        exceptionResolvers.add(new HttpHandlerExceptionResolver());
    }
}
public class TransactionPropagationInterceptor extends HandlerInterceptorAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationInterceptor.class);
    // 前置處理邏輯
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        // 獲取當前執行緒XID
        String xid = RootContext.getXID();
        // 從rpc中獲取XID
        String rpcXid = request.getHeader(RootContext.KEY_XID);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid in RootContext[{}] xid in HttpContext[{}]", xid, rpcXid);
        }
        // 如果執行緒中沒有XID,並且從請求中拿到了XID,那麼把請求中的XID繫結到當前執行緒
        if (StringUtils.isBlank(xid) && StringUtils.isNotBlank(rpcXid)) {
            RootContext.bind(rpcXid);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("bind[{}] to RootContext", rpcXid);
            }
        }
        return true;
    }
    // 後置處理邏輯
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        // 業務邏輯處理完畢,從當前執行緒中刪除XID
        if (RootContext.inGlobalTransaction()) {
            XidResource.cleanXid(request.getHeader(RootContext.KEY_XID));
        }
    }
}
public class HttpHandlerExceptionResolver extends AbstractHandlerExceptionResolver {
    // 發生異常後,刪除當前執行緒中的XID
    @Override
    protected ModelAndView doResolveException(HttpServletRequest request, HttpServletResponse httpServletResponse, Object o, Exception e) {
        XidResource.cleanXid(request.getHeader(RootContext.KEY_XID));
        return null;
    }
}

小結

通過以上原始碼分析和圖解Seata AT模式,我們可以瞭解到以下幾點:

1.GlobalTransactionInterceptor屬於TM側,它主要負責通過TM Client開啟分散式事務、提交分散式事務以及回滾分散式事務;屬於大總管。

2.SeataDataSourceProxy屬於RM側,它主要負責分支事務的開啟,提交以及回滾,屬於真正幹活的小兵。

3.TM ClientRM Client純屬於兩個通訊工具,負責與TC端建立通訊。

4.TransactionPropagationInterceptorHttpHandlerExceptionResolver服務於分支事務,負責全域性事務XID的獲取以及業務邏輯處理完畢的善後事宜。

以上就是Seata AT模式啟動過程圖文範例詳解的詳細內容,更多關於Seata AT模式啟動過程的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com