操作链路概述

本文将引导您快速体验 SOFAStack 消息队列,从创建资源、配置接入点到使用 SDK 收发消息。

具体操作步骤如下:

  1. 创建资源

    1. 创建工作空间

    2. 创建 Topic

    3. 创建 Group ID

    4. 获取 AK(AccessKey ID)和 SK(AccessKey Secret)

  2. 获取接入配置

  3. 发送消息

  4. 订阅消息

创建资源

注意事项

在使用 SOFAStack 消息队列时,请注意以下网络访问限制:

  • Topic 和 Group ID 需创建在同一个地域(Region)下的同一个工作空间中才能互通。例如,当某 Topic 创建在华东 1(杭州)下的工作空间 A 中,那么该 Topic 只能被在华东 1(杭州)下的工作空间 A 中创建的 Group ID 对应的生产端和消费端访问。

  • 目前不支持公网访问,生产端和消费端需要部署在相同地域的 ECS 上,或者保证网络联通。

创建工作空间

要使用消息队列,您需要确保 SOFAStack 控制台已创建至少一个工作空间。如 SOFAStack 未创建工作空间或您需要创建一个新的工作空间,可参见 添加工作空间。创建好工作空间后,将为您自动创建一个消息队列实例。

创建 Topic

Topic 是消息队列里对消息的一级归类。消息生产者将消息发送到一个 Topic,而消息消费者则通过订阅该 Topic 来获取和消费消息。

  1. 登录 SOFAStack 控制台

  2. 在左侧导航栏,选择 中间件 > 消息队列 > Topic 管理

  3. 单击 创建 Topic,然后在 创建 Topic 对话框配置 Topic 信息:

    参数

    是否必填

    说明

    Topic

    必填

    Topic 格式要求如下:

    • Topic 只能包含英文、数字、 短横线(-)和下划线(_),其中英文和数字必须要有一种, 短横线(-)和下划线(_)可选。

    • 长度需控制在 3~64 个字符之间。

    • 命名不能以“CID”和“GID”开头。

    消息类型

    必填

    支持的消息类型有普通消息、分区顺序消息、事务消息和定时消息。详细消息类型的说明,可参见 消息类型

    描述

    选填

    对该 Topic 的备注信息,长度限制在 256 个字符以内。

  4. 单击 确定

创建 Group ID

创建完 Topic 后,您需要为消息的消费者(或生产者)创建客户端 ID ,即 Group ID 作为标识。

Group ID 和 Topic 的关系是 N:N,即一个消费者可以订阅多个 Topic,同一个 Topic 也可以被多个消费者订阅;一个生产者可以向多个 Topic 发送消息,同一个 Topic 也可以接收来自多个生产者的消息。

说明

消费者必须有对应的 Group ID,生产者不做强制要求。

  1. 登录 SOFAStack 控制台

  2. 在左侧导航栏,选择 中间件 > 消息队列 > Group 管理

  3. 单击 创建 Group ID,然后在 创建 Group ID 对话框配置 Group ID 信息:

    参数

    是否必填

    说明

    Group ID

    必填

    Group ID 格式要求如下:

    • 命名以 “GID_” 或者 “GID-” 开头。

    • 只能包含字母、数字、短横线和下划线。

    • 长度限制在 7~64 字符之间。

    说明

    Group ID 一旦创建,则无法修改。

    描述

    选填

    对该 Group ID 的备注信息,长度限制在 256 个字符以内。

  4. 单击 确定

获取 AK(AccessKey ID)和 SK(AccessKey Secret)

阿里云 AccessKey 用于收发消息时进行账户鉴权。

在调用 SDK 发送和订阅消息的时候,除了需要指定创建的 Topic 和 Group ID 以外,还需输入您在 RAM 控制台创建的身份验证信息,即 AccessKey。AccessKey 的信息包含 AccessKeyId 和 AccessKeySecret。

说明

由于 RAM 是阿里云产品,非阿里云飞天底座输出的环境中可使用蚂蚁 IAM 创建访问密钥(AccessKey)做身份验证。

创建 AccessKey 的具体步骤,参见 创建 AccessKey

获取接入配置

在控制台创建好资源后,您需通过控制台获取工作空间的接入点。在收发消息时,您需要为生产端和消费端配置该接入点,以此接入某个具体工作空间或地域的服务。

  1. 在左侧导航栏,选择 中间件 > 消息队列 > 概览

  2. 在页面底部的 接入配置 找到 实例 ID内网接入点接入配置

  3. 内网接入点 配置到客户端的 SDK 代码的 ENDPOINT 参数。

  4. 实例 ID 配置到客户端的 SDK 代码的 INSTANCE_ID 参数。

发送消息

您可以通过控制台发送测试消息或通过调用 TCP Java SDK 发送消息。

发送测试消息

用于快速验证 Topic 资源的可用性,主要用作测试。

  1. 在左侧导航栏,选择 中间件 > 消息队列 > Topic 管理

  2. 在 Topic 管理页面,找到您刚刚创建的 Topic,单击右侧操作列的 发送测试消息

  3. 发送测试消息 对话框中的 消息体 一栏,输入消息的具体内容,单击 确定。控制台即会返回消息发送成功通知以及相应的 Message ID。

调用 SDK 发送消息

  1. 通过 Maven 方式引入依赖。Java SDK 的最新版本号,可参见 SDK 版本说明

    <dependency>
        <groupId>com.alipay.sofa</groupId>
        <artifactId>sofamq-client-all</artifactId>
        <version>"XXX"</version>
        //设置为 Java SDK 的最新版本号
    </dependency>
    <repositories>
        <repository>
            <id>antcloudrelease</id>
            <name>Ant Cloud</name>
            <url>http://mvn.cloud.alipay.com/nexus/content/groups/open</url>
        </repository>
    </repositories>
  2. 根据以下说明设置相关参数,运行示例代码:

    import java.util.Properties;
    import com.alipay.sofa.sofamq.client.PropertyKeyConst;
    import io.openmessaging.api.Message;
    import io.openmessaging.api.MessagingAccessPoint;
    import io.openmessaging.api.OMS;
    import io.openmessaging.api.OMSBuiltinKeys;
    import io.openmessaging.api.Producer;
    import io.openmessaging.api.SendResult;
    
    public class Main{
        public static void main(String... args){
            Properties credentials =new Properties();
            // 阿里云账号 AccessKey 拥有所有 API 的访问权限,风险很高。强烈建议您创建并使用 RAM 用户进行 API 访问或日常运维,请登录 RAM 控制台创建 RAM 用户。
            // 此处以把 AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
            // 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
            credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV");        
            credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV");
            
            // 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置
            MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint")
            .withCredentials(credentials).build();
            
            Properties properties =new Properties();
            // 设置用户实例,进入控制台的概览页面查看接入点配置
            properties.setProperty(PropertyKeyConst.INSTANCE_ID,"$instanceId");
            // 您在控制台创建的 Group ID
            properties.setProperty(PropertyKeyConst.GROUP_ID,"YOUR_GROUP");
            Producer producer = accessPoint.createProducer(properties);
            
            producer.start();
            
            Message message =new Message("$topic","YOUR_TAG","hello world".getBytes());
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult);
        }
    }
  3. 消息发送后,您可以在控制台查看消息发送状态,步骤如下:

    1. 在左侧导航栏,选择 中间件 > 消息队列 > 消息查询

    2. 单击 按 Message ID 查询,在搜索框中输入发送消息后返回的 Message ID,单击 搜索 查询消息发送状态。

      存储时间 表示消息队列服务端存储这条消息的时间。如果查询到此消息,表示消息已经成功发送到服务端。

      重要

      此步骤演示的是第一次使用消息队列的场景,此时消费者从未启动过,所以消息状态显示暂无消费数据。要启动消费者并进行消息订阅请继续下一步操作订阅消息。更多消息状态请参见 消息查询查询消息轨迹

订阅消息

消息发送成功后,需要启动消费者来订阅消息。

  1. 调用 TCP Java SDK 订阅消息。您可以运行以下示例代码来启动消费者,并测试订阅消息的功能。请按照说明正确设置相关参数。

    import java.util.Properties;
    import com.alipay.sofa.sofamq.client.PropertyKeyConst;
    import io.openmessaging.api.Action;
    import io.openmessaging.api.ConsumeContext;
    import io.openmessaging.api.Consumer;
    import io.openmessaging.api.Message;
    import io.openmessaging.api.MessageListener;
    import io.openmessaging.api.MessagingAccessPoint;
    import io.openmessaging.api.OMS;
    import io.openmessaging.api.OMSBuiltinKeys;
    
    public class Main{
        public static void main(String... args){
            Properties credentials =new Properties();
            // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
            // 此处以把AccessKey和AccessKeySecret保存在环境变量为例说明。
            // 强烈建议不要把AccessKey和AccessKeySecret保存到代码里,会存在密钥泄漏风险
            credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV");        
            credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV");
            
            // 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置
            MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint")
            .withCredentials(credentials).build();
            
            Properties properties =new Properties();
            // 设置用户实例,进入控制台的概览页面查看接入点配置
            properties.setProperty(PropertyKeyConst.INSTANCE_ID,"$instanceId");
            // 您在控制台创建的 Group ID
            properties.setProperty(PropertyKeyConst.GROUP_ID,"YOUR_GROUP");
            
            Consumer consumer = accessPoint.createConsumer(properties);
            consumer.subscribe("YOUR_TOPIC","YOUR_TAG",new MessageListener(){
                @Override
                public Action consume(Message message,ConsumeContext context){
                    System.out.println(new String(message.getBody()));
                    return Action.CommitMessage;
                }
            });
            consumer.start();
        }
    }
  2. 完成上述步骤后,您可以在控制台查看消费者是否启动成功,即消息订阅是否成功。

    1. 在左侧导航栏,选择 中间件 > 消息队列 > Group 管理

    2. 单击目标 Group ID 名称进入详情页。

    3. 单击 订阅关系

      如果 是否在线 显示为 在线,且订阅关系一致,则说明订阅成功。否则说明订阅失败。