本文将引导您快速体验 SOFAStack 消息队列,从创建资源、配置接入点到使用 SDK 收发消息。
具体操作步骤如下:
创建资源
注意事项
在使用 SOFAStack 消息队列时,请注意以下网络访问限制:
Topic 和 Group ID 需创建在同一个地域(Region)下的同一个工作空间中才能互通。例如,当某 Topic 创建在华东 1(杭州)下的工作空间 A 中,那么该 Topic 只能被在华东 1(杭州)下的工作空间 A 中创建的 Group ID 对应的生产端和消费端访问。
目前不支持公网访问,生产端和消费端需要部署在相同地域的 ECS 上,或者保证网络联通。
创建工作空间
要使用消息队列,您需要确保 SOFAStack 控制台已创建至少一个工作空间。如 SOFAStack 未创建工作空间或您需要创建一个新的工作空间,可参见 添加工作空间。创建好工作空间后,将为您自动创建一个消息队列实例。
创建 Topic
Topic 是消息队列里对消息的一级归类。消息生产者将消息发送到一个 Topic,而消息消费者则通过订阅该 Topic 来获取和消费消息。
登录 SOFAStack 控制台。
在左侧导航栏,选择 中间件 > 消息队列 > Topic 管理。
单击 创建 Topic,然后在 创建 Topic 对话框配置 Topic 信息:
参数
是否必填
说明
Topic
必填
Topic 格式要求如下:
Topic 只能包含英文、数字、 短横线(-)和下划线(_),其中英文和数字必须要有一种, 短横线(-)和下划线(_)可选。
长度需控制在 3~64 个字符之间。
命名不能以“CID”和“GID”开头。
消息类型
必填
支持的消息类型有普通消息、分区顺序消息、事务消息和定时消息。详细消息类型的说明,可参见 消息类型。
描述
选填
对该 Topic 的备注信息,长度限制在 256 个字符以内。
单击 确定。
创建 Group ID
创建完 Topic 后,您需要为消息的消费者(或生产者)创建客户端 ID ,即 Group ID 作为标识。
Group ID 和 Topic 的关系是 N:N,即一个消费者可以订阅多个 Topic,同一个 Topic 也可以被多个消费者订阅;一个生产者可以向多个 Topic 发送消息,同一个 Topic 也可以接收来自多个生产者的消息。
消费者必须有对应的 Group ID,生产者不做强制要求。
登录 SOFAStack 控制台。
在左侧导航栏,选择 中间件 > 消息队列 > Group 管理。
单击 创建 Group ID,然后在 创建 Group ID 对话框配置 Group ID 信息:
参数
是否必填
说明
Group ID
必填
Group ID 格式要求如下:
命名以 “GID_” 或者 “GID-” 开头。
只能包含字母、数字、短横线和下划线。
长度限制在 7~64 字符之间。
说明Group ID 一旦创建,则无法修改。
描述
选填
对该 Group ID 的备注信息,长度限制在 256 个字符以内。
单击 确定。
获取 AK(AccessKey ID)和 SK(AccessKey Secret)
阿里云 AccessKey 用于收发消息时进行账户鉴权。
在调用 SDK 发送和订阅消息的时候,除了需要指定创建的 Topic 和 Group ID 以外,还需输入您在 RAM 控制台创建的身份验证信息,即 AccessKey。AccessKey 的信息包含 AccessKeyId 和 AccessKeySecret。
由于 RAM 是阿里云产品,非阿里云飞天底座输出的环境中可使用蚂蚁 IAM 创建访问密钥(AccessKey)做身份验证。
创建 AccessKey 的具体步骤,参见 创建 AccessKey。
获取接入配置
在控制台创建好资源后,您需通过控制台获取工作空间的接入点。在收发消息时,您需要为生产端和消费端配置该接入点,以此接入某个具体工作空间或地域的服务。
在左侧导航栏,选择 中间件 > 消息队列 > 概览。
在页面底部的 接入配置 找到 实例 ID 及 内网接入点。
将 内网接入点 配置到客户端的 SDK 代码的 ENDPOINT 参数。
将 实例 ID 配置到客户端的 SDK 代码的 INSTANCE_ID 参数。
发送消息
您可以通过控制台发送测试消息或通过调用 TCP Java SDK 发送消息。
发送测试消息
用于快速验证 Topic 资源的可用性,主要用作测试。
在左侧导航栏,选择 中间件 > 消息队列 > Topic 管理。
在 Topic 管理页面,找到您刚刚创建的 Topic,单击右侧操作列的 发送测试消息。
在 发送测试消息 对话框中的 消息体 一栏,输入消息的具体内容,单击 确定。控制台即会返回消息发送成功通知以及相应的 Message ID。
调用 SDK 发送消息
通过 Maven 方式引入依赖。Java SDK 的最新版本号,可参见 SDK 版本说明。
<dependency> <groupId>com.alipay.sofa</groupId> <artifactId>sofamq-client-all</artifactId> <version>"XXX"</version> //设置为 Java SDK 的最新版本号 </dependency> <repositories> <repository> <id>antcloudrelease</id> <name>Ant Cloud</name> <url>http://mvn.cloud.alipay.com/nexus/content/groups/open</url> </repository> </repositories>
根据以下说明设置相关参数,运行示例代码:
import java.util.Properties; import com.alipay.sofa.sofamq.client.PropertyKeyConst; import io.openmessaging.api.Message; import io.openmessaging.api.MessagingAccessPoint; import io.openmessaging.api.OMS; import io.openmessaging.api.OMSBuiltinKeys; import io.openmessaging.api.Producer; import io.openmessaging.api.SendResult; public class Main{ public static void main(String... args){ Properties credentials =new Properties(); // 阿里云账号 AccessKey 拥有所有 API 的访问权限,风险很高。强烈建议您创建并使用 RAM 用户进行 API 访问或日常运维,请登录 RAM 控制台创建 RAM 用户。 // 此处以把 AccessKey 和 AccessKeySecret 保存在环境变量为例说明。 // 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险 credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV"); credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV"); // 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置 MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint") .withCredentials(credentials).build(); Properties properties =new Properties(); // 设置用户实例,进入控制台的概览页面查看接入点配置 properties.setProperty(PropertyKeyConst.INSTANCE_ID,"$instanceId"); // 您在控制台创建的 Group ID properties.setProperty(PropertyKeyConst.GROUP_ID,"YOUR_GROUP"); Producer producer = accessPoint.createProducer(properties); producer.start(); Message message =new Message("$topic","YOUR_TAG","hello world".getBytes()); SendResult sendResult = producer.send(message); System.out.println(sendResult); } }
消息发送后,您可以在控制台查看消息发送状态,步骤如下:
在左侧导航栏,选择 中间件 > 消息队列 > 消息查询。
单击 按 Message ID 查询,在搜索框中输入发送消息后返回的 Message ID,单击 搜索 查询消息发送状态。
存储时间 表示消息队列服务端存储这条消息的时间。如果查询到此消息,表示消息已经成功发送到服务端。
订阅消息
消息发送成功后,需要启动消费者来订阅消息。
调用 TCP Java SDK 订阅消息。您可以运行以下示例代码来启动消费者,并测试订阅消息的功能。请按照说明正确设置相关参数。
import java.util.Properties; import com.alipay.sofa.sofamq.client.PropertyKeyConst; import io.openmessaging.api.Action; import io.openmessaging.api.ConsumeContext; import io.openmessaging.api.Consumer; import io.openmessaging.api.Message; import io.openmessaging.api.MessageListener; import io.openmessaging.api.MessagingAccessPoint; import io.openmessaging.api.OMS; import io.openmessaging.api.OMSBuiltinKeys; public class Main{ public static void main(String... args){ Properties credentials =new Properties(); // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。 // 此处以把AccessKey和AccessKeySecret保存在环境变量为例说明。 // 强烈建议不要把AccessKey和AccessKeySecret保存到代码里,会存在密钥泄漏风险 credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV"); credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV"); // 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置 MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint") .withCredentials(credentials).build(); Properties properties =new Properties(); // 设置用户实例,进入控制台的概览页面查看接入点配置 properties.setProperty(PropertyKeyConst.INSTANCE_ID,"$instanceId"); // 您在控制台创建的 Group ID properties.setProperty(PropertyKeyConst.GROUP_ID,"YOUR_GROUP"); Consumer consumer = accessPoint.createConsumer(properties); consumer.subscribe("YOUR_TOPIC","YOUR_TAG",new MessageListener(){ @Override public Action consume(Message message,ConsumeContext context){ System.out.println(new String(message.getBody())); return Action.CommitMessage; } }); consumer.start(); } }
完成上述步骤后,您可以在控制台查看消费者是否启动成功,即消息订阅是否成功。
在左侧导航栏,选择 中间件 > 消息队列 > Group 管理。
单击目标 Group ID 名称进入详情页。
单击 订阅关系。
如果 是否在线 显示为 在线,且订阅关系一致,则说明订阅成功。否则说明订阅失败。