消息类型

更新时间: 2024-09-05 14:16:04

本文介绍 SOFAStack 消息队列各个消息类型的概念、适用场景以及使用过程中的注意事项等。

普通消息

普通消息是指消息队列中无特性的消息,区别于有特性的定时/延时消息、顺序消息和事务消息。

TCP Java SDK 收发普通消息的示例代码

定时和延时消息

  • 定时消息:Producer 将消息发送到消息队列服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息。

  • 延时消息:Producer 将消息发送到消息队列服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。

适用场景

定时消息和延时消息适用于以下场景:

消息生产和消费有时间窗口要求:比如在蚂蚁森林场景中,相关低碳行为触发时,会发送一条定时消息,在第二天 7 点定时投递给消费者,产生绿色能量;或者发送一条延时消息,24 小时后产生绿色能量。

使用方式

定时消息和延时消息的使用在代码编写上存在略微的区别:

  • 发送定时消息需要明确指定消息发送时间点之后的某一时间点作为消息投递的时间点。

  • 发送延时消息时需要设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。

注意事项

  • 定时和延时消息的 msg.setStartDeliverTime 参数需要设置成当前时间戳之后的某个时刻(单位毫秒)。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。

  • 定时和延时消息的 msg.setStartDeliverTime 参数可设置 3 天内的任何时刻(单位毫秒),超过 3 天消息发送将失败。

  • StartDeliverTime 是服务端开始向消费端投递的时间。 如果消费者当前有消息堆积,那么定时和延时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。

  • 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差。

  • 设置定时和延时消息的投递时间后,依然受 3 天的消息保存时长限制。

TCP 协议示例代码

收发定时消息和延时消息的示例代码(Java),请参见 收发延时消息

顺序消息

顺序消息(FIFO 消息)是消息队列提供的一种严格按照顺序来发布和消费的消息。顺序发布和顺序消费是指对于指定的一个 Topic,生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息一定会先被客户端接收到。

顺序消息目前支持分区顺序消息。

分区顺序消息

对于指定的一个 Topic,所有消息根据 Sharding Key 进行区块分区。同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding Key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。

  • 适用于性能要求高,以 Sharding Key 作为分区字段,在同一个区块中严格地按照 FIFO 原则进行消息发布和消费的场景。

  • 示例说明如下:

    用户注册需要发送发验证码,以用户 ID 作为 Sharding Key,那么同一个用户发送的消息都会按照发布的先后顺序来消费。

注意事项

使用顺序消息时,请注意以下几点:

  • 顺序消息暂不支持广播模式。

  • 建议同一个 Group ID 只对应一种类型的 Topic,即不同时用于顺序消息和无序消息的收发。

  • 顺序消息不支持异步发送方式,否则将无法严格保证顺序。

TCP SDK 示例代码

TCP 协议下的示例代码请参见 收发顺序消息

事务消息

  • 事务消息:消息队列提供类似 X/Open XA 的分布式事务功能,通过消息队列事务消息能达到分布式事务的最终一致。

  • 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。

  • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或 Rollback),该询问过程即消息回查。

适用场景

在转账过程中,比如从支付宝转账到余额宝,两个系统之间的数据需要保持最终一致性,这时可以通过事务消息进行处理。支付宝扣款执行前,发送一条半事务消息,扣款事务执行成功后,将消息状态更新为 Commit,余额宝系统订阅消息队列的扣款消息,做相应的存款业务处理。

交互流程

事务消息交互流程如下图。

交互流程

事务消息发送步骤如下:

  1. 发送方将半事务消息发送至消息队列服务端。

  2. 消息队列服务端将消息持久化成功之后,向发送方返回 Ack 确认消息已经发送成功,此时消息为半事务消息。

  3. 发送方开始执行本地事务逻辑。

  4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或 Rollback),服务端收到 Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查步骤如下:

  1. 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。

  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行操作。

注意事项

  1. 事务消息的 Group ID 不能与其他类型消息的 Group ID 共用。与其他类型的消息不同,事务消息有回查机制,回查时消息队列服务端会根据 Group ID 去查询客户端。

  2. 通过 AccessPoint.getAccessPoint().createTransactionProducer 创建事务消息的 Producer 时必须指定 LocalTransactionChecker 的实现类,处理异常情况下事务消息的回查。

  3. 事务消息发送完成本地事务后,可在 execute 方法中返回以下三种状态:

    • TransactionStatus.CommitTransaction:提交事务,允许订阅方消费该消息。

    • TransactionStatus.RollbackTransaction:回滚事务,消息将被丢弃不允许消费。

    • TransactionStatus.Unknow:暂时无法判断状态,等待固定时间以后消息队列服务端向发送方进行消息回查。

  4. 可通过以下方式给每条消息设定第一次消息回查的最快时间:

    Message message =new Message();
    // 在消息属性中添加第一次消息回查的最快时间,单位秒。例如,以下设置实际第一次回查时间为 120 秒 ~ 125 秒之间
    message.putUserProperties(PropertyKeyConst.CheckImmunityTimeInSeconds,"120");
    // 以上方式只确定事务消息的第一次回查的最快时间,实际回查时间向后浮动 0 秒 ~ 60 秒;如第一次回查后事务仍未提交,后续每隔 60 秒回查一次

TCP SDK 示例代码

收发事务消息的示例代码如下:

TCP Java SDK 收发事务消息

上一篇: 消息功能详解 下一篇: Topic 与 Tag
阿里云首页 金融分布式架构 相关技术圈