事务消息就是事务里参杂着发消息的动作
如果一个事务里牵扯到发消息的操作,那么一旦消息被消费了,想要回滚,这时就变的很难
所以在没有commit的时候,消息只会暂时发送存在broker不会被消费,当commit成功的时候,在MQ中会将这个消息设置为真正可用,这时comusmer才会消费消息,如果执行的是rollback,在MQ中就会把这个消息撤回(就是两阶段提交的机制)
下面是事务消息执行的顺序图
HFMessage半消息可以为一种向broker发送的标记,与真正的消息内容完全没有关系,仅用于在broker维持该消息的状态,到底消息是可用还是不可用。
请结合以下具体代码理解上述图文
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* RocketMQ——事务消息
*
* @author Song X.
* @date 2020/03/04
*/
public class Producer {
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("TransactionGroup1");
producer.setNamesrvAddr("192.168.255.131:9876");
producer.setTransactionListener(new TransactionListener() {
/**
* 这是producer执行的本地事务
* begin transaction:
* 这里面的方法应该是同步的按顺序执行
* a()
* b()
* c() -> producer发消息
* d()
* end transaction
* 上面的方法应使用try-catch,若发生异常则return ROLLBACK_MESSAGE
* 若全部成功用COMMIT_MESSAGE
*
* try{
* a()
* b()
* c()
* d()
* } catch () {
* return LocalTransactionState.ROLLBACK_MESSAGE
* }
* return LocalTransactionState.COMMIT_MESSAGE
*
* 在这个方法中一搬不用UNKNOWN
*
* @param msg 就是producer send的消息
* @param arg
* @return LocalTransactionState.COMMIT_MESSAGE 表示执行事务成功,确认提交
* LocalTransactionState.ROLLBACK_MESSAGE 表示回滚消息,broker端删除半消息(HFMessage)
* LocalTransactionState.UNKNOWN 表示为未知状态,等待broker回调查看
*/
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("------executeLocalTransaction------");
System.out.println("msgBody: " + new String(msg.getBody()));
//transactionId就是messageId
System.out.println("msgTransactionID: " + msg.getTransactionId());
return LocalTransactionState.UNKNOW;
}
/**
* 这是broker执行的回调函数
* Broker端回调检查本地事务的执行状态,也就是检查executeLocalTransaction方法的执行情况
*
* if(...){
*
* ... //如果检查了XXX,表示事务成功了
* return LocalTransactionState.COMMIT_MESSAGE;
*
* } else if(...) {
*
* ... //如果检查了XXX,表示可能executeLocalTransaction还在执行,等会回来再检查
* return LocalTransactionState.UNKNOWN
*
* } else {
*
* //事务执行失败了,回滚,回滚的是HFMessage
* return LocalTransactionState.ROLLBACK_MESSAGE
*
* }
*
* @param msg 就是producer send的消息
* @return LocalTransactionState.COMMIT_MESSAGE;
*/
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("------checkLocalTransaction------");
System.out.println("msgBody: " + new String(msg.getBody()));
//transactionId就是messageId
System.out.println("msgTransactionID: " + msg.getTransactionId());
return LocalTransactionState.UNKNOW;
}
});
producer.start();
Message msg = new Message("myTransaction","TagA", "事务消息测试".getBytes(
RemotingHelper.DEFAULT_CHARSET
));
TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(msg, null);
System.out.println("transactionSendResult: " + transactionSendResult);
// producer.shutdown();
}
}
可在两个函数中都是用UNKNOWN观察checkLocalTransaction
函数的回调运行机制
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/20583.html