收发定时消息

本文将引导您如何使用 TCP 协议下的 Java SDK 进行定时消息的收发。

前置条件

需要确保您已完成以下操作:

背景信息

通过定时消息,在消息发送后,可以在当前时间点之后的某一个时间点,再投递到消费者进行消费,适用于对消息生产和消费有时间窗口要求,或者利用消息触发定时任务的场景。

关于定时消息的更多信息,参见 消息类型 > 定时和延时消息

说明

对于新用户,建议在正式收发消息前,阅读 Demo 工程来了解搭建消息队列工程的具体步骤。

发送定时消息

具体的示例代码,请以 消息队列代码库 为准。

发送定时消息的示例代码如下。

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import com.alipay.sofa.sofamq.client.PropertyKeyConst;
import io.openmessaging.api.Message;
import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.OMS;
import io.openmessaging.api.OMSBuiltinKeys;
import io.openmessaging.api.Producer;
import io.openmessaging.api.SendResult;

public class DelayProducerTest {
    public static void main(String... args) {
        Properties credentials = new Properties();
        // 阿里云账号 AccessKey 拥有所有 API 的访问权限,风险很高。强烈建议您创建并使用 RAM 用户进行 API 访问或日常运维,请登录 RAM 控制台创建 RAM 用户。
        // 此处以把 AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
        // 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
        credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV");        
        credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV");
        // 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置
        MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint")
                .withCredentials(credentials).build();
        Properties properties = new Properties();
        // 设置用户实例,进入控制台的概览页面查看接入点配置
        properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");
        properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");
        Producer producer = accessPoint.createProducer(properties);
        producer.start();
        Message message = new Message("$topic", "YOUR_TAG", "hello world".getBytes());
        // 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递,例如 2020-03-13 18:27:00
        long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2020-03-13 18:27:00").getTime();
        message.setStartDeliverTime(timeStamp);
        SendResult sendResult = producer.send(message);
        System.out.println(sendResult);
    }
}

订阅定时消息

定时消息的订阅方式与普通消息一致,详见 订阅消息