单元化开发

本文介绍如何设置 LDC 单元化相关参数。

说明

该功能仅适用于支持 LDC 单元化架构的环境。

准备环境

通过 Maven 方式引入依赖。

<dependency>
            <groupId>com.alipay.zoneclient</groupId>
            <artifactId>zoneclient-core</artifactId>
            <version>1.2.1.antcloud</version>
</dependency>

SOFABoot 生产者

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import io.openmessaging.api.OMS;
import io.openmessaging.api.Producer;
import java.util.Properties;

@Configuration
public class ProducerClient {
    @Autowired
    Producer producer;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public Producer buildProducer() {
        Properties properties = new Properties();
        // sofaboot will pass this properties by system property, if not, you can manually set it
        properties.setProperty(PropertyKeyConst.LDC, "true"); // -Dzmode=true
        properties.setProperty(PropertyKeyConst.CELL, "RZXX"); // -Dcom.alipay.ldc.zone=RZ00B
        properties.setProperty(PropertyKeyConst.INSTANCE_ID, "XXX"); // -Dcom.alipay.instanceid=XXX
        properties.setProperty(PropertyKeyConst.DATA_CENTER, "XXX"); // -Dcom.alipay.ldc.datacenter=XXX
        properties.setProperty(PropertyKeyConst.ENDPOINT, "acvip://1.2.X.X"); // -Dcom.antcloud.antvip.endpoint=1.2.X.X
        // 阿里云账号 AccessKey 拥有所有 API 的访问权限,风险很高。强烈建议您创建并使用 RAM 用户进行 API 访问或日常运维,请登录 RAM 控制台创建 RAM 用户。
        // 此处以把 AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
        // 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险  
        properties.setProperty(PropertyKeyConst.ACCESS_KEY, "SOFA_AK_ENV"); // -Dcom.antcloud.mw.access=XXX
        properties.setProperty(PropertyKeyConst.SECRET_KEY, "SOFA_SK_ENV"); // -Dcom.antcloud.mw.secret=XXX
        properties.setProperty(PropertyKeyConst.GROUP_ID, "XXXX");
        Producer producer = OMS.builder().driver("sofamq").build().createProducer(properties);
        return producer;
    }

    public void send() {
        Message message = new Message("TP_XXX", "TAGXXX", "body".getBytes());
        // 如果需要路由到 RZONE,需要设置 UID
        message.putUserProperties(UserPropKey.CELL_UID, "XX");
        SendResult sendResult = producer.send(message);
    }
}

SOFABoot 消费者

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import io.openmessaging.api.OMS;
import io.openmessaging.api.Producer;
import java.util.Properties;

@Configuration
public class ConsumerClient {
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public Consumer buildConsumer() {
        Properties properties = new Properties();
        // sofaboot will pass this properties by system property, if not, you can manually set it
        properties.setProperty(PropertyKeyConst.LDC, "true"); // -Dzmode=true
        properties.setProperty(PropertyKeyConst.CELL, "RZXX"); // -Dcom.alipay.ldc.zone=RZ00B
        properties.setProperty(PropertyKeyConst.INSTANCE_ID, "XXX"); // -Dcom.alipay.instanceid=XXX
        properties.setProperty(PropertyKeyConst.DATA_CENTER, "XXX"); // -Dcom.alipay.ldc.datacenter=XXX
        //        properties.setProperty(PropertyKeyConst.ENDPOINT, "acvip://1.2.X.X"); // -Dcom.antcloud.antvip.endpoint=1.2.X.X
        // 阿里云账号 AccessKey 拥有所有API的访问权限,风险很高。强烈建议您创建并使用 RAM 用户进行 API 访问或日常运维,请登录 RAM 控制台创建 RAM 用户。
        // 此处以把 AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
        // 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险  
        properties.setProperty(PropertyKeyConst.ACCESS_KEY, "SOFA_AK_ENV"); // -Dcom.antcloud.mw.access=XXX
        properties.setProperty(PropertyKeyConst.SECRET_KEY, "SOFA_SK_ENV"); // -Dcom.antcloud.mw.secret=XXX
        properties.setProperty(PropertyKeyConst.GROUP_ID, "XXXX");
        properties.setProperty(PropertyKeyConst.SHARED_MODE, "shared");  // -Dcom.alipay.env=shared
        properties.setProperty(PropertyKeyConst.LDC_SUB_MODE, LdcSubMode.DEFAULT.name());
        Consumer consumer = OMS.builder().driver("sofamq").build().createConsumer(properties);
        consumer.subscribe("TP_XXX", "TAGXXX", new MessageListener() {
          @Override public Action consume(Message message, ConsumeContext context) {
            System.out.println("msgId=" + message.getMsgID() + " ;  body=" + new String(message.getBody()));
            return Action.CommitMessage;
          }
        });
        return consumer;
    }

}

其中订阅模式 LDC_SUB_MODE 包括:

  • DEFAULT:不做任何消息过滤。

  • LOCAL:只消费本 CELL 发出的消息。

  • RZONE:只在 RZONE 启动消费端,并且只消费目标 RZONE CELL 为本 CELL 的消息。需要在管控台 > 消息路由 中配置目标单元 RZONE 的消息路由。

  • GZONE:只在 GZONE 启动消费端,并且只消费目标 GZONE CELL 为本 CELL 的消息。需要在管控台 > 消息路由 中配置目标单元 GZONE 的消息路由。

  • CZONE:只在 CZONE 启动消费端,并且只消费目标 CZONE CELL 为本 CELL 的消息。需要在管控台 > 消息路由 中配置目标单元 GZONE 的消息路由。

消息路由