收发顺序消息

更新时间: 2023-07-26 16:12:45

顺序消息(FIFO 消息)是 SOFAStack 消息队列提供的一种严格按照顺序来发布和消费的消息类型。本文提供使用 TCP 协议下的 Java SDK 收发顺序消息的示例代码供您参考。

前提条件

您已完成以下操作:

  • 通过 Maven 方式引入依赖。Java SDK 的最新版本号,可参见 SDK 版本说明

    <dependencies>
        <dependency>
            <groupId>com.alipay.sofa</groupId>
            <artifactId>sofamq-client-all</artifactId>
            <version>"XXX"</version>
            //设置为 Java SDK 的最新版本号
        </dependency>
    </dependencies>
    <repositories>
        <repository>
            <id>antcloudrelease</id>
            <name>Ant Cloud</name>
            <url>http://mvn.cloud.alipay.com/nexus/content/groups/open</url>
        </repository>
    </repositories>
  • 准备环境

  • (可选)日志配置

背景信息

分区顺序:对于指定的一个 Topic,所有消息根据 Sharding Key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding Key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。

详情请参见 消息类型 > 顺序消息

说明

对于新手用户,建议在正式收发消息前,阅读 Demo 工程(TCP 版) 来了解搭建消息队列工程的具体步骤。

发送顺序消息

具体的示例代码,请以 消息队列代码库 为准。

示例代码如下。

import java.util.Properties;
import com.alipay.sofa.sofamq.client.PropertyKeyConst;
import io.openmessaging.api.Message;
import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.OMS;
import io.openmessaging.api.OMSBuiltinKeys;
import io.openmessaging.api.SendResult;
import io.openmessaging.api.order.OrderProducer;

public class Main {
    public static void main(String... args) {
        Properties credentials = new Properties();
        // 阿里云账号 AccessKey 拥有所有 API 的访问权限,风险很高。强烈建议您创建并使用 RAM 用户进行 API 访问或日常运维,请登录 RAM 控制台创建 RAM 用户。
        // 此处以把 AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
        // 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
        credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV");        
        credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV");
        // 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置
        MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint")
                .withCredentials(credentials).build();

        Properties properties = new Properties();
        // 设置用户实例,进入控制台的概览页面查看接入点配置
        properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");
        properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");
        OrderProducer producer = accessPoint.createOrderProducer(properties);

        producer.start();

        Message message = new Message("$topic", "YOUR_TAG", "hello world".getBytes());
        // 分区顺序消息中区分不同分区的关键字段,Sharding Key 与普通消息的 key 是完全不同的概念。
        SendResult sendResult = producer.send(message, "YOUR_SHARDING_KEY");
        System.out.println(sendResult);
    }
}

订阅顺序消息

全局顺序消息和分区顺序消息的订阅方式基本一样,示例代码如下。

 import java.util.Properties;
import com.alipay.sofa.sofamq.client.PropertyKeyConst;
import io.openmessaging.api.Message;
import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.OMS;
import io.openmessaging.api.OMSBuiltinKeys;
import io.openmessaging.api.order.ConsumeOrderContext;
import io.openmessaging.api.order.MessageOrderListener;
import io.openmessaging.api.order.OrderAction;
import io.openmessaging.api.order.OrderConsumer;

public class Main {
    public static void main(String... args) {
        Properties credentials = new Properties();
        // 阿里云账号 AccessKey 拥有所有 API 的访问权限,风险很高。强烈建议您创建并使用 RAM 用户进行 API 访问或日常运维,请登录 RAM 控制台创建 RAM 用户。
        // 此处以把 AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
        // 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
        credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV");        
        credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV");

        // 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置
        MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint")
                .withCredentials(credentials).build();

        Properties properties = new Properties();
        // 设置用户实例,进入控制台的概览页面查看接入点配置
        properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");
        properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");

        OrderConsumer consumer = accessPoint.createOrderedConsumer(properties);
        consumer.subscribe("YOUR_TOPIC", "YOUR_TAG", new MessageOrderListener() {
            @Override
            public OrderAction consume(Message message, ConsumeOrderContext context) {
                System.out.println(newString(message.getBody()));
                return OrderAction.Success;
            }
        });
        consumer.start();
    }
}
上一篇: 发送消息(多线程) 下一篇: 收发事务消息
阿里云首页 金融分布式架构 相关技术圈