本文介绍如何通过 SOFAStack 消息队列的 Java SDK 订阅消息。
订阅方式
消息队列支持以下两种订阅方式:
集群订阅同一个 Group ID 所标识的所有 Consumer 平均分摊消费消息。例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。
// 集群订阅方式设置(不设置的情况下,默认为集群订阅方式) properties.put(PropertyKeyConst.MessageModel,PropertyValueConst.CLUSTERING);
广播订阅同一个 Group ID 所标识的所有 Consumer 都会各自消费某条消息一次。例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在广播消费模式下每个实例都会各自消费 9 条消息。
// 广播订阅方式设置 properties.put(PropertyKeyConst.MessageModel,PropertyValueConst.BROADCASTING);
示例代码
具体的示例代码,请以 消息队列代码库 为准。
import java.util.Properties;
import com.alipay.sofa.sofamq.client.PropertyKeyConst;
import io.openmessaging.api.Action;
import io.openmessaging.api.ConsumeContext;
import io.openmessaging.api.Consumer;
import io.openmessaging.api.Message;
import io.openmessaging.api.MessageListener;
import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.OMS;
import io.openmessaging.api.OMSBuiltinKeys;
public class MConsumer {
public static void main(String... args) {
Properties credentials = new Properties();
// 阿里云账号 AccessKey 拥有所有 API 的访问权限,风险很高。强烈建议您创建并使用 RAM 用户进行 API 访问或日常运维,请登录 RAM 控制台创建 RAM 用户。
// 此处以把 AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
// 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV");
credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV");
// 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置
MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint")
.withCredentials(credentials).build();
Properties properties = new Properties();
// 设置用户实例,进入控制台的概览页面查看接入点配置
properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");
properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");
// 集群订阅方式 (默认)
// properties.put(PropertyKeyConst.MESSAGE_MODEL, PropertyValueConst.CLUSTERING);
// 广播订阅方式
// properties.put(PropertyKeyConst.MESSAGE_MODEL, PropertyValueConst.BROADCASTING);
Consumer consumer = accessPoint.createConsumer(properties);
consumer.subscribe("YOUR_TOPIC", "TAGA||TAGB", new MessageListener() {
@Override
public Action consume (Message message, ConsumeContext context){
System.out.println(new String(message.getBody()));
return Action.CommitMessage;
}
});
consumer.start();
}
}
文档内容是否对您有帮助?