在单元化架构中,您需要对本地应用进行开发改造,完成单元化功能配置。本文将基于转账、积分等场景分别介绍微服务(MS)中的 SOFARPC、消息队列(MQ)以及分布式事务(DTX)如何完成 LDC 单元化相关的业务开发。
前提条件
路由参数为 userId,格式如 080066600000002,取第一位 + 0 作为分片位(sharding key)。当 userId = 080066600000002 时,分片位(sharding key)= 0 + 0 = 00当 userId = 180066600000000 时,分片位(sharding key)= 1 + 0 = 10
微服务中的 SOFARPC
按照 RPC 服务引用的标准使用规范,有如下要求:
接口入参的第一个参数为
userId
路由信息。SOFARPC 默认从这个参数中提取和生成两位分片位(sharding key),默认提取 UID 的倒数二、三位,参见
com.alipay.sofa.rpc.ldc.DefaultLdcRouteProvider
。
在本 demo 中,因无法满足该规范,将自定义一个 PocLdcRouteProvider,实现提取两位分片位的逻辑,替代 DefaultLdcRouteProvider 的标准实现。
import com.alipay.sofa.rpc.api.ldc.LdcRouteJudgeResult;
import com.alipay.sofa.rpc.api.ldc.LdcRouteProvider;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.zoneclient.api.EnterpriseZoneClientHolder;
public class PocLdcRouteProvider implements LdcRouteProvider {
@Override
public LdcRouteJudgeResult uidGenerator(ConsumerConfig consumerConfig, SofaRequest sofaRequest) {
LdcRouteJudgeResult result = new LdcRouteJudgeResult();
//如果没有开启ldc,那么就直接返回false
if (!EnterpriseZoneClientHolder.isZoneMode()) {
return result;
}
Object[] methodArgs = sofaRequest.getMethodArgs();
if (CommonUtils.isEmpty(methodArgs)) {
result.setSuccess(false);
return result;
}
Object methodArg = methodArgs[0];
if (methodArg instanceof String){
String routeUid = (String) methodArg;
String uid = UIDUtil.parseShardingKeyFromBacc(routeUid);
result.setSuccess(true);
result.setRouteId(uid);
return result;
}
result.setSuccess(false);
return result;
}
@Override
public int order() {
return 2;
}
}
UIDUtil 类的代码如下:
public final class UIDUtil {
private UIDUtil() { }
public static String parseShardingKeyFromBacc(String bacc) {
if (bacc == null) {
throw new NullPointerException("bacc is null");
}
if ("".equals(bacc.trim())) {
throw new IllegalArgumentException("bacc is empty");
}
return bacc.substring(0, 1) + "0";
}
}
交易启动时,首先需要向 SOFARPC 注册定制的路由逻辑,代码如下:
com.alipay.sofa.rpc.ldc.LdcProviderManager.getInstance().registeLdcRouteProvider(new PocLdcRouteProvider());
在取款接口中,添加一个 UID 参数,代码如下:
@TwoPhaseBusinessAction(name = "pocDebitFirstAction", commitMethod = "commit", rollbackMethod = "rollback")
public AccountTransResult debit(String uid, @BusinessActionContextParameter(isParamInProperty = true) AccountTransRequest accountTransRequest,
BusinessActionContext businessActionContext);
在存款接口中,添加一个 UID 参数,代码如下:
@TwoPhaseBusinessAction(name ="pocCreditFirstAction", commitMethod ="commit", rollbackMethod ="rollback")
public AccountTransResult credit(String uid,@BusinessActionContextParameter(isParamInProperty =true)AccountTransRequest accountTransRequest,
BusinessActionContext businessActionContext);
有关 SOFARPC 单元化配置的更多信息,参见 单元化配置。
消息队列
此处假设一个存款加积分的场景:在账户 A 存入一笔钱后,需要增加账户 A 的积分。此场景下,需要消息队列(MQ)中间件通过一条发送事务消息,通知积分中心执行积分增加操作。
发送事务消息
发送事务消息代码示例如下:
// 启动 producer
public void afterPropertiesSet()throwsException {
MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").build();
Properties properties = new Properties();
//替换 GID_PGROUP 为您实际创建的 Group ID
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_PGROUP");
transactionProducer = accessPoint.createTransactionProducer(properties, newLocalTransactionChecker() {
@Override
publicTransactionStatus check (Message msg){
returnTransactionStatus.CommitTransaction;
}
});
transactionProducer.start();
LogUtil.info(LOGGER, "transaction producer started");
}
// 发送消息
public void publishMessage(TxnFlowRequest request) {
try {
PointAcctDTO pointAcctDTO = buildPointReturnDTO(request);
Message message = new Message(TOPIC, EVENT_CODE, hessianSerializer.serialize(pointAcctDTO));
String shardingKey = UIDUtil.parseShardingKeyFromBacc(request.getBacc());
message.putUserProperties(UserPropKey.CELL_UID, shardingKey);
transactionProducer.send(message, (msg, arg) -> {
returnTransactionStatus.CommitTransaction;
}, null);
LogUtil.info(LOGGER, "Public a message, success. TOPIC [{}] EVENTCODE [{}] id [{}] bacc [{}] payload [{}]",
message.getTopic(), EVENT_CODE, message.getMsgID(), request.getBacc(), request);
} catch (Exception e) {
LogUtil.error(LOGGER, e, "Public a message, failure. TOPIC [{}] EVENTCODE [{}] bacc [{}] error [{}]",
TOPIC, EVENT_CODE, request.getBacc(), e.getMessage());
throw new TxnFlowException(PropertyConstant.CODE_SYS_ERR, "call msgBorker error", e);
}
}
接收事务消息
接收事务消息代码示例如下:
// 启动 consumer
@Override
public void afterPropertiesSet() throws Exception {
Properties properties = new Properties();
//替换 GID_PGROUP 为您实际创建的 Group ID
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_SGROUP");
properties.setProperty(PropertyKeyConst.LDC_SUB_MODE, "RZONE");
Consumer consumer = OMS.builder().driver("sofamq").build().createConsumer(properties);
//替换 TP_TEST_POC 为您实际创建的消息 Topic
consumer.subscribe("TP_TEST_POC", "EC_ACCT_POINT", this);
consumer.start();
}
// 消息
public Action consume(Message message, ConsumeContext context) {
try {
PointAcctDTO pointAcctDTO = hessianSerializer.deserialize(message.getBody(),
PointAcctDTO.class.getName());
serviceTemplate.executeWithTransaction(pointAcctDTO, newServiceCallback() {
@Override
public String getServiceName () {
return "returnPoint";
}
@Override
public void checkParam () {
//参数校验,如果为null或者空则消息不用重试
pointAcctDTO.checkParameters();
}
@Override
public void checkIdempotent () {
//消息有可能出现重发的情况,如果这次积分已经更新过了,则直接返回成功。 消息幂等处理有问题
List<TPointOrderDO> tPointOrderDOs = tPointOrderDAO.searchTxnSn(
pointAcctDTO.getBacc(), pointAcctDTO.getTxnSn());
if (tPointOrderDOs != null && tPointOrderDOs.size() != 0) {
throwPcException.valueOf(PcStatusCode.IDEMPOTENT);
}
}
@Override
public void execute () {
//埋点
mockActionServices.mockAction(ActionPointConstant.POINTCENTER_MESSAGE,
pointAcctDTO.getBacc());
List<TPotAcctDO> tPotAcctDOs = tPotAcctDAO.lockForUpdate(pointAcctDTO.getBacc());
//根据原来代码逻辑,如果没找到则打日志;
if (tPotAcctDOs == null || tPotAcctDOs.size() != 1) {
LogUtil.error(LOGGER, "error = {}, payload = {}",
PcStatusCode.BACC_COUNT_ERROR, "select t_pot_acct record:"
+ tPotAcctDOs);
return;
}
TPotAcctDO tPotAcctDO = tPotAcctDOs.get(0);
//根据原来代码逻辑,如果状态不为0则直接返回。
if (!PropertyConstant.ACC_STATUS_0.equals(tPotAcctDO.getStatus())) {
return;
}
tPotAcctDO.setPotBal(tPotAcctDO.getPotBal().add(pointAcctDTO.getPotBal()));
tPotAcctDO.setLastTxnSn(pointAcctDTO.getTxnSn());
tPotAcctDAO.update(tPotAcctDO);
TPointOrderDO tPointOrderDO = new TPointOrderDO(pointAcctDTO.getBacc(),
pointAcctDTO.getPotBal(), pointAcctDTO.getTxnSn());
tPointOrderDAO.insertOrder(tPointOrderDO);
}
});
} catch (CodecException e) {
LogUtil.error(LOGGER, e, "consume pointAcctDTO={} exception.");
}
// do something
return Action.CommitMessage;
}
分布式事务
在事务发起方的 spring 事务模板内,调用 dtxService.start()
方法开启分布式事务,需要在开启时,增加单元化架构下的分片参数,用做服务路由和数据库路由。
代码示例如下:
Map<String,Object> properties =newHashMap<String,Object>();
// 开启分布式事务
String shardingKey =UIDUtil.parseShardingKeyFromBacc(request.getBacc());
dtxService.start("accttrans", request.getTxnSn(), shardingKey, properties);
有关分布式事务单元化配置的更多信息,参见 接入单元化能力。
任务调度
您无需进行额外配置,即可使用单元化的任务调度能力,请参见 使用跨 zone 网关。
建议检查客户端的启动参数是否正常传入:-D com.alipay.ldc.zone=xxx //指定所属的逻辑单元
。
API 网关
您无需进行额外配置,即可使用 API 网关的单元化路由能力,请参见 创建路由规则、创建 API。
应用参数配置
在应用部署时,您还需要传入以下参数。各参数均通过 JVM 的 -D 参数传入到应用中。
中间件相关参数
com.alipay.instanceid
:当前租户中间件实例唯一标识,可以在消息队列控制台概览页 接入配置 > 实例 ID 中获取。com.antcloud.antvip.endpoint
:ACVIP 地址,可以在消息队列控制台概览页 接入配置 > TCP 协议内网接入点 中获取。com.alipay.env
:环境标识,取值 shared,表示运行在共享模式。com.antcloud.mw.access
:中间件访问控制键值。com.antcloud.mw.secret
:中间件访问控制密钥。
同城双活相关参数
com.alipay.ldc.zone
:单元名称。com.alipay.ldc.datacenter
:物理机房名称。
LDC 单元化相关参数
zmode
:LDC 单元化开关。取值为 true 时,表示开启 LDC 单元化。com.alipay.ldc.strictmode
:LDC 单元化严格模式开关。取值为 true 时,表示开启 LDC 单元化严格模式。