RocketMQ事务消息怎样实现

这篇文章主要介绍RocketMQ事务消息怎样实现,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理。首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中。

官方版本未发布之前,从apache
rocketmq第一个版本上线后,代码中存在与事务消息相关的代码,例如COMMIT、ROLLBACK、PREPARED,在事务消息未开源之前网上对于事务消息的“声音”基本上是使用类似二阶段提交,主要是根据消息系统标志MessageSysFlag中定义来推测的:

  • TRANSACTION_PREPARED_TYPE

  • TRANSACTION_COMMIT_TYPE

  • TRANSACTION_ROLLBACK_TYPE

消息发送者首先发送TRANSACTION_PREPARED_TYPE类型的消息,然后根据事务状态来决定是提交或回滚事务发送commit请求或rollback请求,如果commit/rollback请求丢失后,rocketmq会在指定超时时间后回查事务状态来决定提交或回滚事务。

让我们各自带着自己的理解和猜测,从阅读RocketMQ官方提供的Demo程序入手,试图窥探一些大体的信息。

Demo示例程序位于:/rocketmq-example/src/main/java/org/apache/rocketmq/example/transaction包中。该包中未放置消息消费者,为了验证事务的消息消费情况,我们可以从其他包copy一个消费者,从而先运行生产者,然后运行消费者,判断事务消息的预发放、提交、回滚等效果,二话不说,先运行一下,看下效果再说:
消息发送端运行结果:

SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5767EC0000, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57680F0001, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57681E0002, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57682B0003, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=0], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768380004, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768490005, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768560006, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=3], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768640007, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=0], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768730008, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768800009, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=9]

消息消费端效果:

Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715812, bornHost=/192.168.1.5:55482, storeTimestamp=1532745749010, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F0000000000001DE8, commitLogOffset=7656, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=5477, toString()=Message{topic='transaction_topic_test', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY7, TRAN_MSG=true, CONSUME_START_TIME=1532746024360, UNIQ_KEY=C0A8010518DC6D06D69C8D5768640007, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagC, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55], transactionId='C0A8010518DC6D06D69C8D5768640007'}]] 
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=1, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715768, bornHost=/192.168.1.5:55482, storeTimestamp=1532745749008, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F0000000000001B91, commitLogOffset=7057, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=4496, toString()=Message{topic='transaction_topic_test', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY4, TRAN_MSG=true, CONSUME_START_TIME=1532746024361, UNIQ_KEY=C0A8010518DC6D06D69C8D5768380004, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagE, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='C0A8010518DC6D06D69C8D5768380004'}]] 
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=2, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715727, bornHost=/192.168.1.5:55482, storeTimestamp=1532745748834, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F000000000000193A, commitLogOffset=6458, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=3515, toString()=Message{topic='transaction_topic_test', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY1, TRAN_MSG=true, CONSUME_START_TIME=1532746024368, UNIQ_KEY=C0A8010518DC6D06D69C8D57680F0001, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagB, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='C0A8010518DC6D06D69C8D57680F0001'}]]

综上所述,服务端发送了10条消息,而消费端只收到3条消息,应该是由于事务回滚,造成只提交了3条消息,为了更加严谨,可以安装一个rocketmq-consonse,更加直观的观察shangshagn's上述结果:
RocketMQ事务消息怎样实现cdn.com/97db9b9f7bc18478739f472e134cf48eb3af8cad.png”>

接下来对示例代码进行解读:

1、生产者端代码解读:

public class TransactionProducer {    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();        // @1
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");                return thread;
            }
        });      // @2
        producer.setExecutorService(executorService);                                // @3
        producer.setTransactionListener(transactionListener);                      // @4
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};        for (int i = 0; i < 10; i++) {                                                                    // @5
            try {
                Message msg =                    new Message("transaction_topic_test", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }        for (int i = 0; i < 100000; i++) {     //这里只是阻止生产者过早退出,导致事务消息的相关机制无法运行
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

代码@1:创建TransactionListener 实例,字面理解为事务消息事件监听器,下文详细对其进行展开。
代码@2:ExecutorService

executorService,创建一个线程池,其线程的名称前缀”client-transaction-msg-check-thread“,从字面理解为客户端事务消息状态检测线程,我们可以大胆的猜测一下是不是这个线程池调用TransactionListener方法,完成对事务消息的检测呢?【这里只是作者的猜测,大家不能当真,在作者后续文章发布后,如果该观点错误,会加以修复,这里写出来,主要是想分享一下我读源码的方法】。
代码@3:为事务消息发送者设置线程池。
代码@4:为事务消息发送者设置事务监听器。
代码@5:发送10条消息。

2、TransactionListener代码解读

public class TransactionListenerImpl implements TransactionListener {    private AtomicInteger transactionIndex = new AtomicInteger(0);    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {        int value = transactionIndex.getAndIncrement();        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);        return LocalTransactionState.UNKNOW;
    }    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());        if (null != status) {            switch (status) {                case 0:                    return LocalTransactionState.UNKNOW;                case 1:                    return LocalTransactionState.COMMIT_MESSAGE;                case 2:                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
  1. executeLocalTransaction方法:记录本地事务的事务状态,这里其实现就是循环设置事务消息的状态为0,1,2,demo中是把消息的状态数据存放在一个Map中。实际应用时通常会持久化消息的事务状态,例如数据库或缓存。

  2. checkLocalTransaction方法,事务回查业务实现,查本地事务表,判断事务的状态如为0:UNKNOW,1:COMMIT_MESSAGE;ROLLBACK_MESSAGE。这里就能解释,生产者连续发10条消息,因为只有3条消息的事务状态为COMMIT_MESSAGE,故消息消费者只能消费3条。

以上是“RocketMQ事务消息怎样实现”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注亿速云行业资讯频道!

原创文章,作者:kirin,如若转载,请注明出处:https://blog.ytso.com/219954.html

(0)
上一篇 2022年1月2日
下一篇 2022年1月2日

相关推荐

发表回复

登录后才能评论