SpringBoot发送RocketMQ事务消息

(1) 2024-08-22 16:12

Hi,大家好,我是编程小6,很荣幸遇见你,我把这些年在开发过程中遇到的问题或想法写出来,今天说一说
SpringBoot发送RocketMQ事务消息,希望能够帮助你!!!。

事务消息流程图

SpringBoot发送RocketMQ事务消息_https://bianchenghao6.com/blog__第1张

依赖

 <!--RocketMQ相关--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.2</version> </dependency> 

配置

#RocketMQ配置 #消费者只需要配置mq的server地址即可,生产者也要配置 rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=${spring.application.name} # 发送时间超时时间 rocketmq.producer.send-message-timeout= #异步消息发送失败重试次数 rocketmq.producer.retry-times-when-send-async-failed=0 #消息发送失败后的最大重试次数 rocketmq.producer.retry-times-when-send-failed=2 #消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节 rocketmq.producer.compress-message-body-threshold=4096 #消息最大容量 rocketmq.producer.max-message-size= rocketmq.producer.retry-next-server=true 

事务消息生产者

/** * @author: zhangzengxiu * @createDate: 2022/4/25 */ @Service public class ProducerMsg { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发事务消息 * * @param msg * @return */ public boolean sendTransactionMsg(String msg) { try { Message message = new Message(); message.setBody(msg.getBytes()); System.out.println("========sending message========="); TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction("tx-group", "topic-tx", MessageBuilder.withPayload(message).build(), null); System.out.println("========finish send ========="); return true; } catch (MessagingException e) { e.printStackTrace(); return false; } } } 

本地事务状态监听器

package com.test.msg; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.springframework.messaging.Message; /** * 事务消息Listener * * @author: zhangzengxiu * @createDate: 2022/4/25 */ @RocketMQTransactionListener(txProducerGroup = "tx-group") public class TransactionMsgListener implements RocketMQLocalTransactionListener { /** * 执行本地事务 * 如果本地事务返回UNKNOWN,会进行事务补偿,自动执行下面的checkLocalTransaction方法 * * @param msg * @param arg * @return */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { System.out.println("执行本地事务====="); //模拟提交事务 //return RocketMQLocalTransactionState.COMMIT; //模拟回滚事务 //return RocketMQLocalTransactionState.ROLLBACK; //让去check本地事务状态 进行事务补偿 return RocketMQLocalTransactionState.UNKNOWN; } /** * 检测本地事务状态 * 事务补偿过程 * 当消息服务器没有收到消息生产者的事务提交或者回滚确认时,会主动要求消息生产者进行确认, * 消息生产者便会去检测本地事务状态,该过程称为事务补偿过程 * * @param msg * @return */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { System.out.println("执行事务补偿======"); //事务补偿提交 return RocketMQLocalTransactionState.COMMIT; //事务补偿回滚 //return RocketMQLocalTransactionState.ROLLBACK; //如果事务补偿过程还是UNKNOWN 就会一直进行事务补偿,60s一次 //return RocketMQLocalTransactionState.UNKNOWN; } } 

事务消息消费者

package com.test.msg; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * 消费事务消息的消费者 * * @author: zhangzengxiu * @createDate: 2022/4/25 */ @Component @RocketMQMessageListener(topic = "topic-tx", consumerGroup = "consumerMsg3") public class ConsumerMsg3 implements RocketMQListener { @Override public void onMessage(Object message) { System.out.println("xiaofei==="); System.out.println(message); } } 

Java之RocketMQ详解

今天的分享到此就结束了,感谢您的阅读,如果确实帮到您,您可以动动手指转发给其他人。

上一篇

已是最后文章

下一篇

已是最新文章

发表回复