RocketMQ——事务消息详解编程语言

事务消息就是事务里参杂着发消息的动作

如果一个事务里牵扯到发消息的操作,那么一旦消息被消费了,想要回滚,这时就变的很难

在这里插入图片描述
所以在没有commit的时候,消息只会暂时发送存在broker不会被消费,当commit成功的时候,在MQ中会将这个消息设置为真正可用,这时comusmer才会消费消息,如果执行的是rollback,在MQ中就会把这个消息撤回(就是两阶段提交的机制)

下面是事务消息执行的顺序图
在这里插入图片描述

HFMessage半消息可以为一种向broker发送的标记,与真正的消息内容完全没有关系,仅用于在broker维持该消息的状态,到底消息是可用还是不可用。

请结合以下具体代码理解上述图文

import org.apache.rocketmq.client.producer.*; 
import org.apache.rocketmq.common.message.Message; 
import org.apache.rocketmq.common.message.MessageExt; 
import org.apache.rocketmq.remoting.common.RemotingHelper; 
 
/** 
 * RocketMQ——事务消息 
 * 
 * @author Song X. 
 * @date 2020/03/04 
 */ 
public class Producer {
    
 
    public static void main(String[] args) throws Exception {
    
        TransactionMQProducer producer = new TransactionMQProducer("TransactionGroup1"); 
        producer.setNamesrvAddr("192.168.255.131:9876"); 
         
        producer.setTransactionListener(new TransactionListener() {
    
            /** 
             *  这是producer执行的本地事务 
             *  begin transaction: 
             *      这里面的方法应该是同步的按顺序执行 
             *      a() 
             *      b() 
             *      c() -> producer发消息 
             *      d() 
             *  end transaction 
             * 上面的方法应使用try-catch,若发生异常则return ROLLBACK_MESSAGE 
             * 若全部成功用COMMIT_MESSAGE 
             * 
             *      try{ 
             *          a() 
             *          b() 
             *          c() 
             *          d() 
             *      } catch () { 
             *          return LocalTransactionState.ROLLBACK_MESSAGE 
             *      } 
             *      return LocalTransactionState.COMMIT_MESSAGE 
             * 
             * 在这个方法中一搬不用UNKNOWN 
             * 
             * @param msg 就是producer send的消息 
             * @param arg 
             * @return  LocalTransactionState.COMMIT_MESSAGE 表示执行事务成功,确认提交 
             *          LocalTransactionState.ROLLBACK_MESSAGE 表示回滚消息,broker端删除半消息(HFMessage) 
             *          LocalTransactionState.UNKNOWN 表示为未知状态,等待broker回调查看 
             */ 
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    
                System.out.println("------executeLocalTransaction------"); 
                System.out.println("msgBody: " + new String(msg.getBody())); 
                //transactionId就是messageId 
                System.out.println("msgTransactionID: " + msg.getTransactionId()); 
 
                return LocalTransactionState.UNKNOW; 
            } 
 
            /** 
             * 这是broker执行的回调函数 
             * Broker端回调检查本地事务的执行状态,也就是检查executeLocalTransaction方法的执行情况 
             * 
             * if(...){ 
             * 
             *     ... //如果检查了XXX,表示事务成功了 
             *     return LocalTransactionState.COMMIT_MESSAGE; 
             * 
             * } else if(...) { 
             * 
             *     ... //如果检查了XXX,表示可能executeLocalTransaction还在执行,等会回来再检查 
             *     return LocalTransactionState.UNKNOWN 
             * 
             * } else { 
             * 
             *     //事务执行失败了,回滚,回滚的是HFMessage 
             *     return LocalTransactionState.ROLLBACK_MESSAGE 
             * 
             * } 
             * 
             * @param msg 就是producer send的消息 
             * @return LocalTransactionState.COMMIT_MESSAGE; 
             */ 
 
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    
                System.out.println("------checkLocalTransaction------"); 
                System.out.println("msgBody: " + new String(msg.getBody())); 
                //transactionId就是messageId 
                System.out.println("msgTransactionID: " + msg.getTransactionId()); 
 
                return LocalTransactionState.UNKNOW; 
            } 
        }); 
 
 
        producer.start(); 
 
        Message msg = new Message("myTransaction","TagA", "事务消息测试".getBytes( 
                RemotingHelper.DEFAULT_CHARSET 
        )); 
 
        TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(msg, null); 
        System.out.println("transactionSendResult: " + transactionSendResult); 
 
 
  //      producer.shutdown(); 
 
 
    } 
} 

可在两个函数中都是用UNKNOWN观察checkLocalTransaction函数的回调运行机制

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/20583.html

(0)
上一篇 2021年7月19日 23:34
下一篇 2021年7月19日 23:34

相关推荐

发表回复

登录后才能评论