消息过滤
本文描述 SOFAStack 消息队列的消费者如何根据 Tag 在消息队列服务端完成消息过滤,以确保消费者最终只消费到其关注的消息类型。
Tag,即消息标签,用于对某个 Topic 下的消息进行分类。消息队列的生产者在发送消息时,已经指定消息的 Tag,消费者需根据已经指定的 Tag 来进行订阅。
示例代码
发送消息
发送消息时,每条消息必须指明 Tag:
producer.messageBuilder().withTopic("TP_XXX").withTags("TAGA").withValue(orderPojo).build()
订阅所有 Tag
消费者如需订阅某 Topic 下所有类型的消息,Tag 用符号 * 表示:
SOFABOOT 示例
import com.alipay.sofa.sofamq.api.MessageConsumer; import com.alipay.sofa.sofamq.api.Messaging; // 请使用 xml 或注解将该类配置为 Bean,只有 @Messaging 扫描不到 @Messaging public class SomeClass { @MessageConsumer(group = "GID_XXX", topic = "TP_XXX", filter = "*") public void someMethodReceivePojo(OrderPojo somePojo) { // do something } }
非 SOFABOOT 示例
consumer.subscribe("TP_XXX", "*", new GenericMessageListener<OrderPojo>() { @Override public Class<OrderPojo> payloadClass() { return OrderPojo.class; } @Override public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) { System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue()); // 如果想测试消息重投的功能,可以将Action.CommitMessage 替换成Action.ReconsumeLater return Action.CommitMessage; } });
订阅单个 Tag
消费者如需订阅某 Topic 下某一种类型的消息,请明确标明 Tag:
SOFABOOT 示例
import com.alipay.sofa.sofamq.api.MessageConsumer; import com.alipay.sofa.sofamq.api.Messaging; // 请使用 xml 或注解将该类配置为 Bean,只有 @Messaging 扫描不到 @Messaging public class SomeClass { @MessageConsumer(group = "GID_XXX", topic = "TP_XXX", filter = "TAGA") public void someMethodReceivePojo(OrderPojo somePojo) { // do something } }
非 SOFABOOT 示例
consumer.subscribe("TP_XXX", "TAGA", new GenericMessageListener<OrderPojo>() { @Override public Class<OrderPojo> payloadClass() { return OrderPojo.class; } @Override public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) { System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue()); // 如果想测试消息重投的功能,可以将Action.CommitMessage 替换成Action.ReconsumeLater return Action.CommitMessage; } });
订阅多个 Tag
消费者如需订阅某 Topic 下多种类型的消息,请在多个 Tag 之间用 || 分隔:
consumer.subscribe("MQ_TOPIC", "TagA||TagB", new GenericMessageListener<OrderPojo>() {
@Override
public Class<OrderPojo> payloadClass() {
return OrderPojo.class;
}
@Override
public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
// 如果想测试消息重投的功能,可以将Action.CommitMessage 替换成Action.ReconsumeLater
return Action.CommitMessage;
}
});
SQL92 过滤
过滤表达式中的 TAGS 特指消息的 TAG,其他的变量取自消息属性 Properties
,可通过 Message#putUserProperties
设置。
SOFABOOT 示例
import static com.alipay.sofa.sofamq.api.MessageConsumer.SQL_FILTER; import com.alipay.sofa.sofamq.api.MessageConsumer; import com.alipay.sofa.sofamq.api.Messaging; // 请使用 xml 或注解将该类配置为 Bean,只有 @Messaging 扫描不到 @Messaging public class SomeClass { @MessageConsumer(group = "GID_XXX", topic = "TP_XXX", filter = "(TAGS in ('tag')) and a > 5", filterType = SQL_FILTER) public void someMethodReceivePojo(OrderPojo somePojo) { // do something } }
非 SOFABOOT 示例
consumer.subscribe(MqConfig.TOPIC, MessageSelector.bySql("(TAGS in ('tag')) and a > 5"), new MessageListenerImpl());
错误示例
同一个消费者多次订阅某个 Topic 下的 Tag,以最后一次订阅的 Tag 为准:
//如下错误代码中,Consumer 只能订阅到 MQ_TOPIC 下 TagB 的消息,而不能订阅 TagA 的消息。
consumer.subscribe("MQ_TOPIC", "TagA", new GenericMessageListener<OrderPojo>() {
@Override
public Class<OrderPojo> payloadClass() {
return OrderPojo.class;
}
@Override
public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
// 如果想测试消息重投的功能,可以将Action.CommitMessage 替换成Action.ReconsumeLater
return Action.CommitMessage;
}
});
consumer.subscribe("MQ_TOPIC", "TagB", new GenericMessageListener<OrderPojo>() {
@Override
public Class<OrderPojo> payloadClass() {
return OrderPojo.class;
}
@Override
public Action consume(GenericMessage<OrderPojo> message, MessageConsumeContext context) {
System.out.println(new Date() + " Receive message: " + message.getMsgID() + " " + message.getValue());
// 如果想测试消息重投的功能,可以将Action.CommitMessage 替换成Action.ReconsumeLater
return Action.CommitMessage;
}
});
更多信息
同一个 Group ID 下的消费者实例与 Topic 的订阅关系需保持一致,详情请参见 订阅关系一致。
合理使用 Topic 和 Tag 来过滤消息可以让业务更清晰,详情请参见 Topic 与 Tag。