步骤三:开发单元化应用

在单元化架构中,您需要对本地应用进行开发改造,完成单元化功能配置。本文将基于转账、积分等场景分别介绍微服务(MS)中的 SOFARPC、消息队列(MQ)以及分布式事务(DTX)如何完成 LDC 单元化相关的业务开发。

前提条件

路由参数为 userId,格式如 080066600000002,取第一位 + 0 作为分片位(sharding key)。当 userId = 080066600000002 时,分片位(sharding key)= 0 + 0 = 00当 userId = 180066600000000 时,分片位(sharding key)= 1 + 0 = 10

微服务中的 SOFARPC

按照 RPC 服务引用的标准使用规范,有如下要求:

  1. 接口入参的第一个参数为userId 路由信息。

  2. SOFARPC 默认从这个参数中提取和生成两位分片位(sharding key),默认提取 UID 的倒数二、三位,参见 com.alipay.sofa.rpc.ldc.DefaultLdcRouteProvider

在本 demo 中,因无法满足该规范,将自定义一个 PocLdcRouteProvider,实现提取两位分片位的逻辑,替代 DefaultLdcRouteProvider 的标准实现。

import com.alipay.sofa.rpc.api.ldc.LdcRouteJudgeResult;
import com.alipay.sofa.rpc.api.ldc.LdcRouteProvider;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.zoneclient.api.EnterpriseZoneClientHolder;

public class PocLdcRouteProvider implements LdcRouteProvider {
    @Override
    public LdcRouteJudgeResult uidGenerator(ConsumerConfig consumerConfig, SofaRequest sofaRequest) {
        LdcRouteJudgeResult result = new LdcRouteJudgeResult();

        //如果没有开启ldc,那么就直接返回false
        if (!EnterpriseZoneClientHolder.isZoneMode()) {
            return result;
        }

        Object[] methodArgs = sofaRequest.getMethodArgs();
        if (CommonUtils.isEmpty(methodArgs)) {
            result.setSuccess(false);
            return result;
        }
        Object methodArg = methodArgs[0];
        if (methodArg instanceof String){
            String routeUid = (String) methodArg;
            String uid = UIDUtil.parseShardingKeyFromBacc(routeUid);
            result.setSuccess(true);
            result.setRouteId(uid);
            return result;
        }
        result.setSuccess(false);
        return result;
    }

    @Override
    public int order() {
        return 2;
    }
}

UIDUtil 类的代码如下:

public final class UIDUtil {
    private UIDUtil() { }

    public static String parseShardingKeyFromBacc(String bacc) {
        if (bacc == null) {
            throw new NullPointerException("bacc is null");
        }
        if ("".equals(bacc.trim())) {
            throw new IllegalArgumentException("bacc is empty");
        }
        return bacc.substring(0, 1) + "0";
    }
}

交易启动时,首先需要向 SOFARPC 注册定制的路由逻辑,代码如下:

com.alipay.sofa.rpc.ldc.LdcProviderManager.getInstance().registeLdcRouteProvider(new PocLdcRouteProvider());

在取款接口中,添加一个 UID 参数,代码如下:

 @TwoPhaseBusinessAction(name = "pocDebitFirstAction", commitMethod = "commit", rollbackMethod = "rollback")
    public AccountTransResult debit(String uid, @BusinessActionContextParameter(isParamInProperty = true) AccountTransRequest accountTransRequest,
                                    BusinessActionContext businessActionContext);

在存款接口中,添加一个 UID 参数,代码如下:

@TwoPhaseBusinessAction(name ="pocCreditFirstAction", commitMethod ="commit", rollbackMethod ="rollback")
public AccountTransResult credit(String uid,@BusinessActionContextParameter(isParamInProperty =true)AccountTransRequest accountTransRequest,
BusinessActionContext businessActionContext);

有关 SOFARPC 单元化配置的更多信息,参见 单元化配置

消息队列

此处假设一个存款加积分的场景:在账户 A 存入一笔钱后,需要增加账户 A 的积分。此场景下,需要消息队列(MQ)中间件通过一条发送事务消息,通知积分中心执行积分增加操作。

发送事务消息

发送事务消息代码示例如下:

 // 启动 producer
    public void afterPropertiesSet()throwsException {
        MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").build();

        Properties properties = new Properties();
        //替换 GID_PGROUP 为您实际创建的 Group ID
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_PGROUP");
        transactionProducer = accessPoint.createTransactionProducer(properties, newLocalTransactionChecker() {
            @Override
            publicTransactionStatus check (Message msg){
                returnTransactionStatus.CommitTransaction;
            }
        });
        transactionProducer.start();
        LogUtil.info(LOGGER, "transaction producer started");
    }

    // 发送消息
    public void publishMessage(TxnFlowRequest request) {
        try {
            PointAcctDTO pointAcctDTO = buildPointReturnDTO(request);
            Message message = new Message(TOPIC, EVENT_CODE,     hessianSerializer.serialize(pointAcctDTO));
            String shardingKey = UIDUtil.parseShardingKeyFromBacc(request.getBacc());
            message.putUserProperties(UserPropKey.CELL_UID, shardingKey);
            transactionProducer.send(message, (msg, arg) -> {
                returnTransactionStatus.CommitTransaction;
            }, null);
            LogUtil.info(LOGGER, "Public a message, success. TOPIC [{}] EVENTCODE [{}] id [{}] bacc [{}] payload [{}]",
                    message.getTopic(), EVENT_CODE, message.getMsgID(), request.getBacc(), request);
        } catch (Exception e) {
            LogUtil.error(LOGGER, e, "Public a message, failure. TOPIC [{}] EVENTCODE [{}] bacc [{}] error [{}]",
                    TOPIC, EVENT_CODE, request.getBacc(), e.getMessage());
            throw new TxnFlowException(PropertyConstant.CODE_SYS_ERR, "call msgBorker error", e);
        }
    }

接收事务消息

接收事务消息代码示例如下:

// 启动 consumer
    @Override
    public void afterPropertiesSet() throws Exception {
        Properties properties = new Properties();
        //替换 GID_PGROUP 为您实际创建的 Group ID
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_SGROUP");
        properties.setProperty(PropertyKeyConst.LDC_SUB_MODE, "RZONE");
        Consumer consumer = OMS.builder().driver("sofamq").build().createConsumer(properties);
        //替换 TP_TEST_POC 为您实际创建的消息 Topic
        consumer.subscribe("TP_TEST_POC", "EC_ACCT_POINT", this);
        consumer.start();
    }

    // 消息
    public Action consume(Message message, ConsumeContext context) {
        try {
            PointAcctDTO pointAcctDTO = hessianSerializer.deserialize(message.getBody(),
                    PointAcctDTO.class.getName());


            serviceTemplate.executeWithTransaction(pointAcctDTO, newServiceCallback() {

                @Override
                public String getServiceName () {
                    return "returnPoint";
                }

                @Override
                public void checkParam () {

                    //参数校验,如果为null或者空则消息不用重试
                    pointAcctDTO.checkParameters();
                }

                @Override
                public void checkIdempotent () {
                    //消息有可能出现重发的情况,如果这次积分已经更新过了,则直接返回成功。 消息幂等处理有问题
                    List<TPointOrderDO> tPointOrderDOs = tPointOrderDAO.searchTxnSn(
                            pointAcctDTO.getBacc(), pointAcctDTO.getTxnSn());
                    if (tPointOrderDOs != null && tPointOrderDOs.size() != 0) {
                        throwPcException.valueOf(PcStatusCode.IDEMPOTENT);
                    }
                }

                @Override
                public void execute () {

                    //埋点
                    mockActionServices.mockAction(ActionPointConstant.POINTCENTER_MESSAGE,
                            pointAcctDTO.getBacc());

                    List<TPotAcctDO> tPotAcctDOs = tPotAcctDAO.lockForUpdate(pointAcctDTO.getBacc());

                    //根据原来代码逻辑,如果没找到则打日志;
                    if (tPotAcctDOs == null || tPotAcctDOs.size() != 1) {
                        LogUtil.error(LOGGER, "error = {}, payload = {}",
                                PcStatusCode.BACC_COUNT_ERROR, "select t_pot_acct record:"
                                        + tPotAcctDOs);
                        return;
                    }
                    TPotAcctDO tPotAcctDO = tPotAcctDOs.get(0);

                    //根据原来代码逻辑,如果状态不为0则直接返回。
                    if (!PropertyConstant.ACC_STATUS_0.equals(tPotAcctDO.getStatus())) {
                        return;
                    }

                    tPotAcctDO.setPotBal(tPotAcctDO.getPotBal().add(pointAcctDTO.getPotBal()));
                    tPotAcctDO.setLastTxnSn(pointAcctDTO.getTxnSn());
                    tPotAcctDAO.update(tPotAcctDO);
                    TPointOrderDO tPointOrderDO = new TPointOrderDO(pointAcctDTO.getBacc(),
                            pointAcctDTO.getPotBal(), pointAcctDTO.getTxnSn());
                    tPointOrderDAO.insertOrder(tPointOrderDO);

                }
            });

        } catch (CodecException e) {
            LogUtil.error(LOGGER, e, "consume pointAcctDTO={} exception.");
        }
        // do something
        return Action.CommitMessage;
    }

分布式事务

在事务发起方的 spring 事务模板内,调用 dtxService.start() 方法开启分布式事务,需要在开启时,增加单元化架构下的分片参数,用做服务路由和数据库路由。

代码示例如下:

Map<String,Object> properties =newHashMap<String,Object>();

// 开启分布式事务
String shardingKey =UIDUtil.parseShardingKeyFromBacc(request.getBacc());
dtxService.start("accttrans", request.getTxnSn(), shardingKey, properties);

有关分布式事务单元化配置的更多信息,参见 接入单元化能力

任务调度

您无需进行额外配置,即可使用单元化的任务调度能力,请参见 使用跨 zone 网关

说明

建议检查客户端的启动参数是否正常传入:-D com.alipay.ldc.zone=xxx //指定所属的逻辑单元

API 网关

您无需进行额外配置,即可使用 API 网关的单元化路由能力,请参见 创建路由规则创建 API

应用参数配置

在应用部署时,您还需要传入以下参数。各参数均通过 JVM 的 -D 参数传入到应用中。

中间件相关参数

  • com.alipay.instanceid:当前租户中间件实例唯一标识,可以在消息队列控制台概览页 接入配置 > 实例 ID 中获取。

  • com.antcloud.antvip.endpoint:ACVIP 地址,可以在消息队列控制台概览页 接入配置 > TCP 协议内网接入点 中获取。

  • com.alipay.env:环境标识,取值 shared,表示运行在共享模式。

  • com.antcloud.mw.access:中间件访问控制键值。

  • com.antcloud.mw.secret:中间件访问控制密钥。

同城双活相关参数

  • com.alipay.ldc.zone:单元名称。

  • com.alipay.ldc.datacenter:物理机房名称。

LDC 单元化相关参数

  • zmode:LDC 单元化开关。取值为 true 时,表示开启 LDC 单元化。

  • com.alipay.ldc.strictmode:LDC 单元化严格模式开关。取值为 true 时,表示开启 LDC 单元化严格模式。