SOFAStack 消息队列提供三种方式来发送普通消息:同步发送、异步发送和单向(Oneway)发送。本文介绍了每种发送方式的原理、使用场景、示例代码,以及三种发送方式的对比。
三种发送方式的对比
三者的特点和主要区别如下:
发送方式 | 发送 TPS | 发送结果反馈 | 可靠性 | 适用场景 |
同步发送 | 快 | 有 | 不丢失 | 此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。 |
异步发送 | 快 | 有 | 不丢失 | 异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。 |
单向发送 | 最快 | 无 | 可能丢失 | 适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。 |
同步发送
原理
同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。
示例代码
SOFABOOT
import com.alibaba.fastjson.JSON; import com.alipay.sofa.sofamq.api.Messaging; import com.alipay.sofa.sofamq.api.MessageProducer; import com.alipay.sofa.sofamq.api.Producer; // 请使用 xml 或注解将该类配置为 Bean,只有 @Messaging 扫描不到 @Messaging public class SomeClass { @MessageProducer(group = "GID_XXX", topic = "TP_XXX", payload = OrderPojo.class) public Producer<OrderPojo> producer; public void someMethod() { OrderPojo orderPojo = getPojo(); producer.send(producer.messageBuilder().withTags("TAGA").withValue(orderPojo).build()); } }
非 SOFABOOT
import java.util.Properties; import com.alipay.sofa.sofamq.client.PropertyKeyConst; import io.openmessaging.api.OMS; import io.openmessaging.api.Producer; import io.openmessaging.api.SendResult; public class SomeClass { public static void main(String[] args) { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXX"); properties.setProperty(PropertyKeyConst.CELL, "GZ00B"); Producer producer = OMS.builder().driver("sofamq").endpoint("antvip://sofamqnamesrv-pool.gz00b.alipay.net") .build().createProducer(properties); producer.start(); OrderPojo orderPojo = getPojo(); SendResult sendResult = producer.send(producer.messageBuilder().withTopic("TP_XXX").withTags("TAGA") .withProperty("propkey", "propvalue").withValue(orderPojo).build()); System.out.println(sendResult); } }
异步发送
原理
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息队列的异步发送,需要用户实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
示例代码
SOFABOOT
import com.alipay.sofa.sofamq.api.MessageProducer; import com.alipay.sofa.sofamq.api.Messaging; import com.alipay.sofa.sofamq.api.Producer; import io.openmessaging.api.OnExceptionContext; import io.openmessaging.api.SendCallback; import io.openmessaging.api.SendResult; // 请使用 xml 或注解将该类配置为 Bean,只有 @Messaging 扫描不到 @Messaging public class SomeClass { @MessageProducer(group = "GID_XXX", topic = "TP_XXX", payload = OrderPojo.class) public Producer<OrderPojo> producer; public void someMethod() { OrderPojo orderPojo = getPojo(); Message message = producer.messageBuilder().withTags("TRADE").withValue(orderPojo).build(); producer.sendAsync(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("send success: " + sendResult); } @Override public void onException(OnExceptionContext context) { System.out.println("send exception: " + context); } }); } }
非 SOFABOOT
import java.util.Properties; import com.alipay.sofa.sofamq.client.PropertyKeyConst; import io.openmessaging.api.OMS; import io.openmessaging.api.OnExceptionContext; import io.openmessaging.api.Producer; import io.openmessaging.api.SendCallback; import io.openmessaging.api.SendResult; public class SomeClass { public static void main(String[] args) { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXX"); properties.setProperty(PropertyKeyConst.CELL, "GZ00B"); Producer producer = OMS.builder().driver("sofamq").endpoint("antvip://sofamqnamesrv-pool.gz00b.alipay.net") .build().createProducer(properties); producer.start(); OrderPojo orderPojo = getPojo(); Message message = producer.messageBuilder().withTopic("TP_XXX").withTags("TRADE").withValue(orderPojo).build(); producer.sendAsync(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("send success: " + sendResult); } @Override public void onException(OnExceptionContext context) { System.out.println("send fail: " + context); } }); } }
单向(Oneway)发送
原理
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
示例代码
SOFABOOT
import com.alipay.sofa.sofamq.api.MessageProducer; import com.alipay.sofa.sofamq.api.Messaging; import com.alipay.sofa.sofamq.api.Producer; // 请使用 xml 或注解将该类配置为 Bean,只有 @Messaging 扫描不到 @Messaging public class SomeClass { @MessageProducer(group = "GID_XXX", topic = "TP_XXX", payload = OrderPojo.class) public Producer<OrderPojo> producer; public void someMethod() { OrderPojo orderPojo = getPojo(); Message message = producer.messageBuilder().withTags("TRADE").withValue(orderPojo).build(); producer.sendOneway(message); } }
非 SOFABOOT
import java.util.Properties; import com.alipay.sofa.sofamq.client.PropertyKeyConst; import io.openmessaging.api.OMS; import io.openmessaging.api.Producer; public class SomeClass { public static void main(String[] args) { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXX"); properties.setProperty(PropertyKeyConst.CELL, "GZ00B"); Producer producer = OMS.builder().driver("sofamq").endpoint("antvip://sofamqnamesrv-pool.gz00b.alipay.net") .build().createProducer(properties); producer.start(); OrderPojo orderPojo = getPojo(); Message message = producer.messageBuilder().withTopic("TP_XXX").withTags("TRADE").withValue(orderPojo).build(); producer.sendOneway(message); } }
文档内容是否对您有帮助?