首頁 > 軟體

tcc分散式事務框架體系解析

2022-03-01 10:00:54

前言碎語

樓主之前推薦過2pc的分散式事務框架LCN。今天來詳細聊聊TCC事務協定。

首先我們瞭解下什麼是tcc,如下圖

tcc分散式事務協定控制整體業務事務分為三個階段。

try:執行業務邏輯

confirm:確定業務邏輯執行無誤後,確定業務邏輯執行完成

cancel:假如try階段有問題,執行cancel階段邏輯,取消try階段的資料

這就需要我們在設計業務時,在try階段多想想業務處理的折中狀態,比如,處理中,支付中,進行中等,在confirm階段變更為處理完成,或者在cancel階段變更為處理失敗。

以電商下單為例

假設我們有一個電商下單的業務,有三個服務組成,訂單服務處理下單邏輯,庫存服務處理減庫存邏輯,支付服務處理減賬戶餘額邏輯。在下單服務裡先後呼叫減庫存和減餘額的方法。如果使用tcc分散式事務來協調事務,我們服務就要做如下設計:

訂單服務:

  • try:支付狀態設定為支付中
  • confirm:設定為支付完成
  • cancel:設定為支付失敗

庫存服務:

多加一個鎖定庫存的欄位記錄,用於記錄業務處理中狀態

  • try:總庫存-1,鎖定庫存+1
  • confirm:鎖定庫存-1
  • cancel:總庫存+1,鎖定庫存-1

支付服務:

多加一個凍結金額的欄位記錄,用於記錄業務處理中狀態

  • try:餘額-1,凍結金額+1
  • confirm:凍結金額-1
  • cancel:餘額+1,凍結金額-1

tcc分散式事務在這裡起到了一個事務協調者的角色。真實業務只需要呼叫try階段的方法。confirm和cancel階段的額方法由tcc框架來幫我們呼叫完成最終業務邏輯。下面我們假設如下三個場景的業務情況,看tcc如何協調業務最終一致的。

  • 服務一切正常:所有服務的try方法執行後都沒有問題,庫存足夠,餘額足夠。tcc事務協調器會觸發訂單服務的confirm方法,將訂單更新為支付完成,觸發庫存服務的confirm方法鎖定庫存-1,觸發支付服務的confirm方法凍結金額-1
  • 庫存服務故障,無法調通:這個時候訂單已經生成,狀態為待支付。當呼叫庫存超時拋異常後,tcc事務協調器會觸發訂單服務的cancel方法將訂單狀態更新為支付失敗。
  • 支付服務故障,無法調通:這個時候訂單已經生成,狀態為待支付,總庫存-1,鎖定庫存+1了。當呼叫支付服務超時拋異常時,tcc事務協調器會觸發訂單服務的cancel方法將訂單狀態更新為支付失敗,觸發庫存服務的cancel方法將庫存+1,鎖定庫存-1。

hmily事務框架怎麼做的?

通過上面對tcc事務協定說明大家應該都瞭解了tcc的處理協調機制,下面我們來看看hmily是怎麼做到的,我們以接入支援dubbo服務為例。

概要:首先最基礎兩個應用點是aop和dubbo的filter機制,其次針對一組事務,定義了啟動事務處理器,參與事務處理器去協調處理不同的事務單元。外加一個disruptor+ScheduledService處理事務紀錄檔,補償處理失敗的事務。

hmily框架以@Hmily註解為切入點,定義了一個環繞織入的切面,註解必填兩個引數confirmMethod和cancelMethod,也就是tcc協調的兩個階段方法。在需要tcc事務的方法上面加上這個註解,也就託管了tcc三個階段的處理流程。下面是aspect切面的抽象類,不同的RPC框架支援會有不同的實現 。其中真正處理業務邏輯需要實現HmilyTransactionInterceptor介面。

實現HmilyTransactionInterceptor介面

@Aspect
public abstract class AbstractHmilyTransactionAspect {
    private HmilyTransactionInterceptor hmilyTransactionInterceptor;
    protected void setHmilyTransactionInterceptor(final HmilyTransactionInterceptor hmilyTransactionInterceptor) {
        this.hmilyTransactionInterceptor = hmilyTransactionInterceptor;
    }
    /**
     * this is point cut with {@linkplain Hmily }.
     */
    @Pointcut("@annotation(org.dromara.hmily.annotation.Hmily)")
    public void hmilyInterceptor() {
    }
    /**
     * this is around in {@linkplain Hmily }.
     * @param proceedingJoinPoint proceedingJoinPoint
     * @return Object
     * @throws Throwable  Throwable
     */
    @Around("hmilyInterceptor()")
    public Object interceptTccMethod(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        return hmilyTransactionInterceptor.interceptor(proceedingJoinPoint);
    }
    /**
     * spring Order.
     *
     * @return int
     */
    public abstract int getOrder();
}

dubbo的aspect抽象實現

@Aspect
@Component
public class DubboHmilyTransactionAspect extends AbstractHmilyTransactionAspect implements Ordered {
    @Autowired
    public DubboHmilyTransactionAspect(final DubboHmilyTransactionInterceptor dubboHmilyTransactionInterceptor) {
        super.setHmilyTransactionInterceptor(dubboHmilyTransactionInterceptor);
    }
    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}

dubbo的HmilyTransactionInterceptor實現

@Component
public class DubboHmilyTransactionInterceptor implements HmilyTransactionInterceptor {
    private final HmilyTransactionAspectService hmilyTransactionAspectService;
    @Autowired
    public DubboHmilyTransactionInterceptor(final HmilyTransactionAspectService hmilyTransactionAspectService) {
        this.hmilyTransactionAspectService = hmilyTransactionAspectService;
    }
  @Override
public Object interceptor(final ProceedingJoinPoint pjp) throws Throwable {
    final String context = RpcContext.getContext().getAttachment(CommonConstant.HMILY_TRANSACTION_CONTEXT);
    HmilyTransactionContext hmilyTransactionContext;
    //判斷dubbo上下文中是否攜帶了tcc事務,如果有就取出反序列化為事務上下文物件
    if (StringUtils.isNoneBlank(context)) {
        hmilyTransactionContext = GsonUtils.getInstance().fromJson(context, HmilyTransactionContext.class);
        RpcContext.getContext().getAttachments().remove(CommonConstant.HMILY_TRANSACTION_CONTEXT);
    } else {
        //如果dubbo上下文中沒有,就從當前上下文中獲取。如果是事務發起者,這裡其實也獲取不到事務
        hmilyTransactionContext = HmilyTransactionContextLocal.getInstance().get();
    }
    return hmilyTransactionAspectService.invoke(hmilyTransactionContext, pjp);
}
}

這裡主要判斷了dubbo上下文中是否攜帶了tcc事務。如果沒有就從當前執行緒上下文中獲取,如果是事務的發起者,這裡其實獲取不到事務上下文物件的。在invoke裡有個獲取事務處理器的邏輯,如果事務上下文入參 為null,那麼獲取到的就是啟動事務處理器。

啟動事務處理器處理邏輯如下

public Object handler(final ProceedingJoinPoint point, final HmilyTransactionContext context)
        throws Throwable {
    System.err.println("StarterHmilyTransactionHandler");
    Object returnValue;
    try {
        HmilyTransaction hmilyTransaction = hmilyTransactionExecutor.begin(point);
        try {
            //execute try
            returnValue = point.proceed();
            hmilyTransaction.setStatus(HmilyActionEnum.TRYING.getCode());
            hmilyTransactionExecutor.updateStatus(hmilyTransaction);
        } catch (Throwable throwable) {
            //if exception ,execute cancel
            final HmilyTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction();
            executor.execute(() -> hmilyTransactionExecutor
                    .cancel(currentTransaction));
            throw throwable;
        }
        //execute confirm
        final HmilyTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction();
        executor.execute(() -> hmilyTransactionExecutor.confirm(currentTransaction));
    } finally {
        HmilyTransactionContextLocal.getInstance().remove();
        hmilyTransactionExecutor.remove();
    }
    return returnValue;
}

真正業務處理方法,point.proceed();被try,catch包起來了,如果try裡面的方法出現異常,就會走hmilyTransactionExecutor.cancel(currentTransaction)的邏輯,如果成功,就走hmilyTransactionExecutor.confirm(currentTransaction)邏輯。其中cancel和confirm裡都有協調參與者事務的處理邏輯,以confirm邏輯為例。

public void confirm(final HmilyTransaction currentTransaction) throws HmilyRuntimeException {
    LogUtil.debug(LOGGER, () -> "tcc confirm .......!start");
    if (Objects.isNull(currentTransaction) || CollectionUtils.isEmpty(currentTransaction.getHmilyParticipants())) {
        return;
    }
    currentTransaction.setStatus(HmilyActionEnum.CONFIRMING.getCode());
    updateStatus(currentTransaction);
    final ListhmilyParticipants = currentTransaction.getHmilyParticipants();
    ListfailList = Lists.newArrayListWithCapacity(hmilyParticipants.size());
    boolean success = true;
    if (CollectionUtils.isNotEmpty(hmilyParticipants)) {
        for (HmilyParticipant hmilyParticipant : hmilyParticipants) {
            try {
                HmilyTransactionContext context = new HmilyTransactionContext();
                context.setAction(HmilyActionEnum.CONFIRMING.getCode());
                context.setRole(HmilyRoleEnum.START.getCode());
                context.setTransId(hmilyParticipant.getTransId());
                HmilyTransactionContextLocal.getInstance().set(context);
                executeParticipantMethod(hmilyParticipant.getConfirmHmilyInvocation());
            } catch (Exception e) {
                LogUtil.error(LOGGER, "execute confirm :{}", () -> e);
                success = false;
                failList.add(hmilyParticipant);
            } finally {
                HmilyTransactionContextLocal.getInstance().remove();
            }
        }
        executeHandler(success, currentTransaction, failList);
    }
}

可以看到executeParticipantMethod(hmilyParticipant.getConfirmHmilyInvocation()),這裡執行了事務參與者的confirm方法。同理cancel裡面也有類似程式碼,執行事務參與者的cancel方法。那麼事務參與者的資訊是怎麼獲取到的呢?我們需要回到一開始提到的dubbo的filter機制。

@Activate(group = {Constants.SERVER_KEY, Constants.CONSUMER})
public class DubboHmilyTransactionFilter implements Filter {
    private HmilyTransactionExecutor hmilyTransactionExecutor;
    /**
     * this is init by dubbo spi
     * set hmilyTransactionExecutor.
     *
     * @param hmilyTransactionExecutor {@linkplain HmilyTransactionExecutor }
     */
    public void setHmilyTransactionExecutor(final HmilyTransactionExecutor hmilyTransactionExecutor) {
        this.hmilyTransactionExecutor = hmilyTransactionExecutor;
    }
    @Override
    @SuppressWarnings("unchecked")
    public Result invoke(final Invoker invoker, final Invocation invocation) throws RpcException {
        String methodName = invocation.getMethodName();
        Class clazz = invoker.getInterface();
        Class[] args = invocation.getParameterTypes();
        final Object[] arguments = invocation.getArguments();
        converterParamsClass(args, arguments);
        Method method = null;
        Hmily hmily = null;
        try {
            method = clazz.getMethod(methodName, args);
            hmily = method.getAnnotation(Hmily.class);
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        }
        if (Objects.nonNull(hmily)) {
            try {
                final HmilyTransactionContext hmilyTransactionContext = HmilyTransactionContextLocal.getInstance().get();
                if (Objects.nonNull(hmilyTransactionContext)) {
                    if (hmilyTransactionContext.getRole() == HmilyRoleEnum.LOCAL.getCode()) {
                        hmilyTransactionContext.setRole(HmilyRoleEnum.INLINE.getCode());
                    }
                    RpcContext.getContext().setAttachment(CommonConstant.HMILY_TRANSACTION_CONTEXT, GsonUtils.getInstance().toJson(hmilyTransactionContext));
                }
                final Result result = invoker.invoke(invocation);
                //if result has not exception
                if (!result.hasException()) {
                    final HmilyParticipant hmilyParticipant = buildParticipant(hmilyTransactionContext, hmily, method, clazz, arguments, args);
                    if (hmilyTransactionContext.getRole() == HmilyRoleEnum.INLINE.getCode()) {
                        hmilyTransactionExecutor.registerByNested(hmilyTransactionContext.getTransId(),
                                hmilyParticipant);
                    } else {
                        hmilyTransactionExecutor.enlistParticipant(hmilyParticipant);
                    }
                } else {
                    throw new HmilyRuntimeException("rpc invoke exception{}", result.getException());
                }
                return result;
            } catch (RpcException e) {
                e.printStackTrace();
                throw e;
            }
        } else {
            return invoker.invoke(invocation);
        }
    }
    @SuppressWarnings("unchecked")
    private HmilyParticipant buildParticipant(final HmilyTransactionContext hmilyTransactionContext,
                                              final Hmily hmily,
                                              final Method method, final Class clazz,
                                              final Object[] arguments, final Class... args) throws HmilyRuntimeException {
        if (Objects.isNull(hmilyTransactionContext)
                || (HmilyActionEnum.TRYING.getCode() != hmilyTransactionContext.getAction())) {
            return null;
        }
        //獲取協調方法
        String confirmMethodName = hmily.confirmMethod();
        if (StringUtils.isBlank(confirmMethodName)) {
            confirmMethodName = method.getName();
        }
        String cancelMethodName = hmily.cancelMethod();
        if (StringUtils.isBlank(cancelMethodName)) {
            cancelMethodName = method.getName();
        }
        HmilyInvocation confirmInvocation = new HmilyInvocation(clazz, confirmMethodName, args, arguments);
        HmilyInvocation cancelInvocation = new HmilyInvocation(clazz, cancelMethodName, args, arguments);
        //封裝呼叫點
        return new HmilyParticipant(hmilyTransactionContext.getTransId(), confirmInvocation, cancelInvocation);
    }
    private void converterParamsClass(final Class[] args, final Object[] arguments) {
        if (arguments == null || arguments.length < 1) {
            return;
        }
        for (int i = 0; i < arguments.length; i++) {
            args[i] = arguments[i].getClass();
        }
    }
}

需要注意三個地方

  • 一個是filter的group定義@Activate(group = {Constants.SERVER_KEY, Constants.CONSUMER}),這裡這樣定義後,就只有服務的消費者會生效,也就是事務的發起者,服務的呼叫方會進filter的invoke邏輯。
  • 只有加@Hmily註解的方法或進事務處理邏輯,其他的方法直接跳過處理
  • 最關鍵的是buildParticipant(hmilyTransactionContext, hmily, method, clazz, arguments, args)方法。dubbo的filter唯一的作用就是收集事務參與者資訊並更新當前事務上線文資訊。那麼在事務協調時就能夠從當前事務上線文裡面獲取到需要協調的事務參與者資訊了。

引數者事務處理器

public Object handler(final ProceedingJoinPoint point, final HmilyTransactionContext context) throws Throwable {
    HmilyTransaction hmilyTransaction = null;
    HmilyTransaction currentTransaction;
    switch (HmilyActionEnum.acquireByCode(context.getAction())) {
        case TRYING:
            try {
                hmilyTransaction = hmilyTransactionExecutor.beginParticipant(context, point);
                final Object proceed = point.proceed();
                hmilyTransaction.setStatus(HmilyActionEnum.TRYING.getCode());
                //update log status to try
                hmilyTransactionExecutor.updateStatus(hmilyTransaction);
                return proceed;
            } catch (Throwable throwable) {
                //if exception ,delete log.
                hmilyTransactionExecutor.deleteTransaction(hmilyTransaction);
                throw throwable;
            } finally {
               HmilyTransactionContextLocal.getInstance().remove();
            }
        case CONFIRMING:
            currentTransaction = HmilyTransactionCacheManager.getInstance().getTccTransaction(context.getTransId());
            hmilyTransactionExecutor.confirm(currentTransaction);
            break;
        case CANCELING:
            currentTransaction = HmilyTransactionCacheManager.getInstance().getTccTransaction(context.getTransId());
            hmilyTransactionExecutor.cancel(currentTransaction);
            break;
        default:
            break;
    }
    Method method = ((MethodSignature) (point.getSignature())).getMethod();
    logger.error(HmilyActionEnum.acquireByCode(context.getAction()).getDesc());
    return DefaultValueUtils.getDefaultValue(method.getReturnType());
}

參與者事務處理器的邏輯比啟動事務處理器要簡單很多,try階段記錄事務紀錄檔用於事務補償的時候使用。其他的confirm和cancel都是由啟動事務管理器來觸發呼叫執行的。這個地方之前糾結了樓主幾個小時,怎麼一個環繞織入的切面會被觸發執行兩次,其實是啟動事務處理器裡的confirm或cancel觸發的。

disruptor+ScheduledService處理事務紀錄檔,補償處理失敗的事務

這個不細聊了,簡述下。disruptor是一個高效能的佇列。對事務紀錄檔落地的所有操作都是通過disruptor來非同步完成的。ScheduledService預設128秒執行一次,來檢查是否有處理失敗的事務紀錄檔,用於補償事務協調失敗的事務

文末結語

相比較2pc的LCN而言,tcc分散式事務對業務侵入性更高。也因2pc的長時間佔用事務資源,tcc的效能肯定比2pc要好。兩者之間本身不存在誰優誰劣的問題。所以在做分散式事務選型時,選一個對的適合自身業務的分散式事務框架就比較重要了。

以上就是tcc分散式事務框架體系解析的詳細內容,更多關於tcc分散式事務框架的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com