订阅关系一致

更新时间: 2024-03-20 14:14:35

订阅关系一致指的是同一个消费者 Group ID 下所有消费者实例所订阅的 Topic、Tag 以及对消息的处理逻辑必须完全一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。本文提供订阅关系不一致的示例代码,帮助您顺畅地订阅消息。

订阅关系一致

背景信息

SOFAStack 消息队列里的一个消费者 Group ID 代表一个消费者群组。对于大多数分布式应用来说,一个消费者 Group ID 下通常会挂载多个消费者。

由于消息队列的订阅关系主要由 Topic 和 Tag 共同组成,因此,保持订阅关系一致意味着同一个消费者 Group ID 下所有的消费者需在以下两方面均保持一致:

  • 订阅的 Topic 必须一致

  • 订阅的 Topic 中的 Tag 必须一致

正确订阅关系图片示例

多个 Group ID 订阅了多个 Topic,并且每个 Group ID 里的多个消费者的订阅关系保持了一致。

正确订阅关系

错误订阅关系图片示例

单个 Group ID 订阅了多个 Topic,但是该 Group ID 里的多个消费者的订阅关系并没有保持一致。

错误订阅关系

错误订阅关系代码示例

示例一

同一个 Group ID 下的两个消费者订阅的 Topic 不一致。

  • 消费者 1-1:

      Properties properties = new Properties();
      properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
      Consumer consumer = OMS.builder().driver("sofamq").createConsumer(properties);
      consumer.subscribe("jodie_test_A", "*", new MessageListener() {
          public Action consume(Message message, ConsumeContext context) {
              System.out.println(message.getMsgID());
              return Action.CommitMessage;
          }
      });
  • 消费者 1-2:

      Properties properties = new Properties();
      properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
      Consumer consumer = OMS.builder().driver("sofamq").createConsumer(properties);
      consumer.subscribe("jodie_test_B ", "*", new MessageListener() {
          public Action consume(Message message, ConsumeContext context) {
              System.out.println(message.getMsgID());
              return Action.CommitMessage;
          }
      });

示例二

同一个 Group ID 下订阅 Topic 的 Tag 不一致。消费者 2-1 订阅了 TagA,而消费者 2-2 未指定 Tag。

  • 消费者 2-1:

      Properties properties = new Properties();
      properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
      Consumer consumer = OMS.builder().driver("sofamq").createConsumer(properties);
      consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
          public Action consume(Message message, ConsumeContext context) {
              System.out.println(message.getMsgID());
              return Action.CommitMessage;
          }
      });
  • 消费者 2-2:

      Properties properties = new Properties();
      properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
      Consumer consumer = OMS.builder().driver("sofamq").createConsumer(properties);
      consumer.subscribe("jodie_test_A", "*", new MessageListener() {
          public Action consume(Message message, ConsumeContext context) {
              System.out.println(message.getMsgID());
              return Action.CommitMessage;
          }
      });

示例三

同一个 Group ID 下订阅 Topic 个数不一致。

  • 消费者 3-1:

      Properties properties = new Properties();
      properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
      Consumer consumer = OMS.builder().driver("sofamq").createConsumer(properties);
      consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
          public Action consume(Message message, ConsumeContext context) {
              System.out.println(message.getMsgID());
              return Action.CommitMessage;
          }
      });
      consumer.subscribe("jodie_test_B", "TagB", new MessageListener() {
          public Action consume(Message message, ConsumeContext context) {
              System.out.println(message.getMsgID());
              return Action.CommitMessage;
          }
      });

同一个 Group ID 下订阅 Topic 的 Tag 不一致。

  • 消费者 3-2:

      Properties properties = new Properties();
      properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
      Consumer consumer = OMS.builder().driver("sofamq").createConsumer(properties);
      consumer.subscribe("jodie_test_A", "TagB", new MessageListener() {
          public Action consume(Message message, ConsumeContext context) {
              System.out.println(message.getMsgID());
              return Action.CommitMessage;
          }
      });
上一篇: Topic 与 Tag 下一篇: 集群消费和广播消费
阿里云首页 金融分布式架构 相关技术圈