RocketMQ——事务消息详解编程语言

事务消息就是事务里参杂着发消息的动作

如果一个事务里牵扯到发消息的操作,那么一旦消息被消费了,想要回滚,这时就变的很难

在这里插入图片描述
所以在没有commit的时候,消息只会暂时发送存在broker不会被消费,当commit成功的时候,在MQ中会将这个消息设置为真正可用,这时comusmer才会消费消息,如果执行的是rollback,在MQ中就会把这个消息撤回(就是两阶段提交的机制)

下面是事务消息执行的顺序图
在这里插入图片描述

HFMessage半消息可以为一种向broker发送的标记,与真正的消息内容完全没有关系,仅用于在broker维持该消息的状态,到底消息是可用还是不可用。

请结合以下具体代码理解上述图文

import org.apache.rocketmq.client.producer.*; 
import org.apache.rocketmq.common.message.Message; 
import org.apache.rocketmq.common.message.MessageExt; 
import org.apache.rocketmq.remoting.common.RemotingHelper; 
/** 
* RocketMQ——事务消息 
* 
* @author Song X. 
* @date 2020/03/04 
*/ 
public class Producer {
 
public static void main(String[] args) throws Exception {
 
TransactionMQProducer producer = new TransactionMQProducer("TransactionGroup1"); 
producer.setNamesrvAddr("192.168.255.131:9876"); 
producer.setTransactionListener(new TransactionListener() {
 
/** 
*  这是producer执行的本地事务 
*  begin transaction: 
*      这里面的方法应该是同步的按顺序执行 
*      a() 
*      b() 
*      c() -> producer发消息 
*      d() 
*  end transaction 
* 上面的方法应使用try-catch,若发生异常则return ROLLBACK_MESSAGE 
* 若全部成功用COMMIT_MESSAGE 
* 
*      try{ 
*          a() 
*          b() 
*          c() 
*          d() 
*      } catch () { 
*          return LocalTransactionState.ROLLBACK_MESSAGE 
*      } 
*      return LocalTransactionState.COMMIT_MESSAGE 
* 
* 在这个方法中一搬不用UNKNOWN 
* 
* @param msg 就是producer send的消息 
* @param arg 
* @return  LocalTransactionState.COMMIT_MESSAGE 表示执行事务成功,确认提交 
*          LocalTransactionState.ROLLBACK_MESSAGE 表示回滚消息,broker端删除半消息(HFMessage) 
*          LocalTransactionState.UNKNOWN 表示为未知状态,等待broker回调查看 
*/ 
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
 
System.out.println("------executeLocalTransaction------"); 
System.out.println("msgBody: " + new String(msg.getBody())); 
//transactionId就是messageId 
System.out.println("msgTransactionID: " + msg.getTransactionId()); 
return LocalTransactionState.UNKNOW; 
} 
/** 
* 这是broker执行的回调函数 
* Broker端回调检查本地事务的执行状态,也就是检查executeLocalTransaction方法的执行情况 
* 
* if(...){ 
* 
*     ... //如果检查了XXX,表示事务成功了 
*     return LocalTransactionState.COMMIT_MESSAGE; 
* 
* } else if(...) { 
* 
*     ... //如果检查了XXX,表示可能executeLocalTransaction还在执行,等会回来再检查 
*     return LocalTransactionState.UNKNOWN 
* 
* } else { 
* 
*     //事务执行失败了,回滚,回滚的是HFMessage 
*     return LocalTransactionState.ROLLBACK_MESSAGE 
* 
* } 
* 
* @param msg 就是producer send的消息 
* @return LocalTransactionState.COMMIT_MESSAGE; 
*/ 
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
 
System.out.println("------checkLocalTransaction------"); 
System.out.println("msgBody: " + new String(msg.getBody())); 
//transactionId就是messageId 
System.out.println("msgTransactionID: " + msg.getTransactionId()); 
return LocalTransactionState.UNKNOW; 
} 
}); 
producer.start(); 
Message msg = new Message("myTransaction","TagA", "事务消息测试".getBytes( 
RemotingHelper.DEFAULT_CHARSET 
)); 
TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(msg, null); 
System.out.println("transactionSendResult: " + transactionSendResult); 
//      producer.shutdown(); 
} 
} 

可在两个函数中都是用UNKNOWN观察checkLocalTransaction函数的回调运行机制

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/20583.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论