本文以 TCP 协议下的 Java 为例,提供操作示例帮助您从零开始搭建消息队列测试工程,提供 Spring 和纯 Java 两种使用方式。Demo 工程包含普通消息、顺序消息、事务消息和定时、延时消息的配置以及测试代码。
前提条件
安装 IDE。
您可以使用 IntelliJ IDEA 或者 Eclipse,本文以 IntelliJ IDEA 为例。在 https://www.jetbrains.com/idea/ 下载 IntelliJ IDEA Ultimate 版本,并参考 IntelliJ IDEA 说明进行安装。
下载 Demo 工程。
在 https://github.com/sofastack-guides/sofamq-demo 下载 Demo 工程到本地,然后解压即可看到本地新增了 sofamq-demo 文件夹。
下载安装 JDK。
配置 Demo 工程
将 Demo 工程文件导入 IntelliJ IDEA。
在 IntelliJ IDEA 界面,单击 New > Project From Existing Sources…,选择 sofamq-demo 文件夹。
选择 Import 类型为 Maven。
默认单击 Finish,直到导入完成。Demo 工程需要加载依赖的 JAR 包,因此导入过程需要等待 2-3 分钟。
创建资源。
您需要先到控制台创建所需资源,包括消息队列的工作空间、Topic、Group ID(GID),以及鉴权需要的 AccessKey(AK)。
配置访问凭证。
对接中间件时,因考虑安全问题,您必须做好应用的身份认证,避免非法访问,因此您需要在应用所在机器的环境变量参数中配置账号的 AccessKey、AccessSecret 信息。
操作步骤如下:
Linux 和 macOS 系统配置方法
执行以下命令:
export SOFA_AK_ENV=<access_key_id> export SOFA_SK_ENV=<access_key_secret>
<access_key_id>
替换为已准备好的 AccessKey ID,<access_key_secret>
替换为 AccessKey Secret。Windows 系统配置方法
新建环境变量文件,添加环境变量
SOFA_AK_ENV
和SOFA_SK_ENV
,并写入已准备好的AccessKey ID 和 AccessKey Secret。重启 Windows 系统。
重要禁止使用阿里云账号 AccessKey,因为阿里云账号 AccessKey 泄露会威胁您所有资源的安全。请使用 RAM 用户 AccessKey 进行操作,可有效降低 AccessKey 泄露的风险。
生产环境中,建议单独为中间件创建一个 RAM 用户,以便于您控制权限,方便管理。
您需将在步骤二中创建好的资源信息配置到
MqConfig
类。public static final String TOPIC ="步骤二创建的 Topic"; public static final String GROUP_ID ="步骤二创建的 Group ID"; // 阿里云账号 AccessKey 拥有所有 API 的访问权限,风险很高。强烈建议您创建并使用 RAM 用户进行 API 访问或日常运维,请登录 RAM 控制台创建 RAM 用户。 // 此处以把 AccessKey 和 AccessKeySecret 保存在环境变量为例说明。 // 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险 public static final String ACCESS_KEY = System.getenv("SOFA_AK_ENV"); public static final String SECRECT_KEY = System.getenv("SOFA_SK_ENV"); public static final String INSTANCE ="您的实例 ID,可在控制台概览页底部接入配置获取实例 ID"; public static final String TAG ="您发布订阅时使用的 TAG,如没有可为空"; public static final String ENDPOINT ="您的 TCP 接入点,可在控制台概览页底部接入配置获取 TCP 协议接入点";
以 Main 方式运行 Demo
发送消息。
发送普通消息:
以纯 Java 方式发送普通消息:运行
SimpleProducer
类。以 Spring 方式发送普通消息:运行
ProducerClient
类。
发送事务消息:运行
SimpleTransactionProducer
类。LocalTransactionCheckerImpl 类为本地事务 check 接口类,用于校验事务。详情请参见 收发事务消息。发送顺序消息:运行
SimpleOrderProducer
类。此方式下,消息发布和消费都按顺序进行。详情请参见 收发顺序消息。发送定时(延时)消息:运行
SimpleDelayProducer
类发送消息。延时 3 秒后投递。您也可以指定一个精确的投递时间。
检查方法消息发送成功的步骤如下:
登录 SOFAStack 控制台。
在左侧导航栏,选择 中间件 > 消息队列 > 消息查询。
在 消息查询 页面,您可单击 按 Topic 查询。
单击目标 Topic 名称进行查询,可以看见消息已经发送至 Topic。
接收消息。
接收普通消息:
以纯 Java 方式接收普通消息:运行
SimpleConsumer
类。以 Spring 方式接收普通消息:运行
ConsumerClient
类。
接收事务消息:运行
SimpleConsumer
类。接收顺序消息:运行
SimpleOrderConsumer
类。接收定时(延时)消息:运行
SimpleConsumer
类。可以看到消息被接收打印的日志。因为有初始化,所以需等待几秒,在生产环境中不会经常初始化。
检查方法消息发送成功的步骤如下:
登录 SOFAStack 控制台。
在左侧导航栏,选择 中间件 > 消息队列 > Group 管理。
单击目标 Group ID 进入详情页。
单击 消费者状态,可以看到启动的消费端已经在线,并且订阅关系一致。