Spring 集成
本文介绍如何在 SpringBoot 框架下用 SOFAStack 消息队列收发消息。
背景信息
主要包括以下三部分内容:
普通消息生产者和 Spring 集成
事务消息生产者和 Spring 集成
消息消费者和 Spring 集成
请确保同一个 Group ID 下所有 Consumer 实例的订阅关系保持一致。详情请参见 订阅关系一致。
SpringBoot 框架下支持的配置参数和 TCP Java 一致。详情请参见 Java SDK 接口和参数说明。
生产者与 Spring 集成
声明生产者。
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import com.alipay.sofa.sofamq.example.springboot.config.MqConfig; import io.openmessaging.api.OMS; import io.openmessaging.api.Producer; @Configuration public class ProducerClient{ @Autowired private MqConfig mqConfig; @Bean(initMethod ="start", destroyMethod ="shutdown") public Producer buildProducer(){ Producer producer = OMS.builder().driver("sofamq").build(mqConfig.getMqProperties()) .createProducer(mqConfig.getMqProperties()); return producer; } }
通过已经与 Spring 集成好的生产者生产消息。
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.alipay.sofa.sofamq.example.springboot.config.MqConfig; import io.openmessaging.api.Message; import io.openmessaging.api.Producer; import io.openmessaging.api.SendResult; import io.openmessaging.api.exception.OMSRuntimeException; @Component public class SyncProducerTest{ //普通消息的 Producer 已经注册到了 spring 容器中,后面需要使用时可以直接注入到其它类中 @Autowired private Producer producer; @Autowired private MqConfig mqConfig; @Test public void testSend(){ //循环发送消息 for(int i =0; i <100; i++){ Message msg =new Message( // Message 所属的 Topic mqConfig.getTopic(), // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤 mqConfig.getTag(), // Message Body 可以是任何二进制形式的数据, MQ 不做任何干预 // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式 "Hello MQ".getBytes()); // 设置代表消息的业务关键属性,请尽可能全局唯一 // 以方便您在无法正常收到消息情况下,可通过 MQ 控制台查询消息并补发 // 注意:不设置也不会影响消息正常收发 msg.setKey("ORDERID_100"); // 发送消息,只要不抛异常就是成功 try{ SendResult sendResult = producer.send(msg); assert sendResult !=null; System.out.println(sendResult); }catch(OMSRuntimeException e){ System.out.println("发送失败"); //出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。 } } } }
事务消息生产者与 Spring 集成
事务消息的概念详情请参见收发事务消息。
首先需要实现一个
LocalTransactionChecker
,如下所示。 一个消息生产者只能有一个LocalTransactionChecker
。import org.springframework.stereotype.Component; import io.openmessaging.api.Message; import io.openmessaging.api.transaction.LocalTransactionChecker; import io.openmessaging.api.transaction.TransactionStatus; @Component public class DemoLocalTransactionChecker implements LocalTransactionChecker{ @Override public TransactionStatus check(Message msg){ System.out.println("开始回查本地事务状态"); return TransactionStatus.CommitTransaction;//根据本地事务状态检查结果返回不同的 TransactionStatus } }
声明生产者。
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import com.alipay.sofa.sofamq.example.springboot.config.MqConfig; import io.openmessaging.api.OMS; import io.openmessaging.api.transaction.TransactionProducer; @Configuration public class TransactionProducerClient{ @Autowired private MqConfig mqConfig; @Autowired private DemoLocalTransactionChecker localTransactionChecker; @Bean(initMethod ="start", destroyMethod ="shutdown") public TransactionProducer buildTransactionProducer(){ TransactionProducer producer = OMS.builder().driver("sofamq").build(mqConfig.getMqProperties()) .createTransactionProducer(mqConfig.getMqProperties(), localTransactionChecker); return producer; } }
通过已经与 Spring 集成好的生产者生产事务消息。
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;
import io.openmessaging.api.Message;
import io.openmessaging.api.SendResult;
import io.openmessaging.api.transaction.LocalTransactionExecuter;
import io.openmessaging.api.transaction.TransactionProducer;
import io.openmessaging.api.transaction.TransactionStatus;
@RunWith(SpringRunner.class)
@SpringBootTest
public class TransactionProducerTest{
//事务消息的 Producer 已经注册到了 spring 容器中,后面需要使用时可以直接注入到其它类中
@Autowired
private TransactionProducer transactionProducer;
@Autowired
private MqConfig mqConfig;
@Test
public void testSend(){
Message msg =newMessage(mqConfig.getTopic(),"TagA","Hello MQ".getBytes());
SendResult sendResult = transactionProducer.send(msg,newLocalTransactionExecuter(){
@Override
public TransactionStatus execute(Message msg,Object arg){
System.out.println("执行本地事务");
return TransactionStatus.CommitTransaction;//根据本地事务执行结果来返回不同的 TransactionStatus
}
},null);
System.out.println(sendResult);
}
}
消费者与 SpringBoot 集成
创建
MessageListener
,如下所示。import org.springframework.stereotype.Component; import io.openmessaging.api.Action; import io.openmessaging.api.ConsumeContext; import io.openmessaging.api.Message; import io.openmessaging.api.MessageListener; @Component public class DemoMessageListener implements MessageListener{ @Override public Action consume(Message message,ConsumeContext context){ System.out.println("Receive: "+ message); try{ //do something.. return Action.CommitMessage; }catch(Exception e){ //消费失败 return Action.ReconsumeLater; } } }
声明消费者。
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import com.alipay.sofa.sofamq.example.springboot.config.MqConfig; import io.openmessaging.api.Consumer; import io.openmessaging.api.OMS; import org.springframework.context.annotation.Configuration; @Configuration public class ConsumerClient{ @Autowired private MqConfig mqConfig; @Autowired private DemoMessageListener messageListener; @Bean(initMethod ="start", destroyMethod ="shutdown") public Consumer buildConsumer(){ Consumer consumer = OMS.builder().driver("sofamq").build(mqConfig.getMqProperties()) .createConsumer(mqConfig.getMqProperties()); consumer.subscribe(mqConfig.getTopic(), mqConfig.getTag(), messageListener); return consumer; } }