OpenMessaging访问RocketMQ教程(开发实战)

目前网上关于 OpenMessaging 的技术文章还很少,我个人预测它将来会很火。而阿里捐献给 Apache 的 RocketMQ 对 OpenMessaging 提供了部分实现。本文将讲解他们之间的开发实战。

前面我已经介绍过 《OpenMessaging 的架构原理》,没阅读过的可以看看,加深理解。

openmessaging 架构图

OpenMessaging包括建立行业准则和消息传递,流媒体规范为金融,电​​子商务,物联网和大数据区域提供了一个共同的框架。设计原则是分布式异构环境中面向云,简单,灵活和语言无关。符合这些规范将有可能在所有主要平台和操作系统上开发异构消息应用程序。

以下示例显示如何在同步,异步或单向传输中向RocketMQ代理发送消息。ProducerApp 生产者:

package io.openmessaging.samples.producer;
import io.openmessaging.Future;
import io.openmessaging.FutureListener;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
import java.nio.charset.Charset;
public class ProducerApp {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://localhost:10911/namespace");
		// :www.xttblog.com
        final Producer producer = messagingAccessPoint.createProducer();
        messagingAccessPoint.startup();
        System.out.println("MessagingAccessPoint startup OK");
        producer.startup();
        System.out.println("Producer startup OK");
        //Add a shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                producer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));
        //Sync www.xttblog.com
        {
            SendResult sendResult = producer.send(producer.createTopicBytesMessage(
                "HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));

            System.out.println("Send sync message OK, message id is: " + sendResult.messageId());
        }
        //Async with Promise
        {
            final Future<SendResult> result = producer.sendAsync(producer.createTopicBytesMessage(
                "HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));

            final SendResult sendResult = result.get(3000L);
            System.out.println("Send async message OK, message id is: " + sendResult.messageId());
        }
        //Async with FutureListener
        {
            final Future<SendResult> result = producer.sendAsync(producer.createTopicBytesMessage(
                "HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            result.addListener(new FutureListener<SendResult>() {
                @Override
                public void operationSucceeded(Future<SendResult> promise) {
                    System.out.println("Send async message OK, message id is: " + promise.get().messageId());
                }
                @Override
                public void operationFailed(Future<SendResult> promise) {
                    System.out.println("Send async message Failed, cause is: " + promise.getThrowable().getMessage());
                }
            });
        }
        //Oneway
        {
            producer.sendOneway(producer.createTopicBytesMessage(
                "HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            System.out.println("Send oneway message OK");
        }
    }
}

使用PullConsumerApp轮询来自指定队列的消息。PullConsumerApp 消费者:

package io.openmessaging.samples.consumer;
import io.openmessaging.ResourceManager;
import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.consumer.PullConsumer;
public class PullConsumerApp {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://localhost:10911/namespace");
        messagingAccessPoint.startup();
        System.out.println("MessagingAccessPoint startup OK");
        ResourceManager resourceManager = messagingAccessPoint.getResourceManager();
		// www.xttblog.com
        resourceManager.createQueue("NS1", "HELLO_QUEUE", OMS.newKeyValue());
        //PullConsumer only can pull messages from one queue.
        final PullConsumer pullConsumer = messagingAccessPoint.createPullConsumer("HELLO_QUEUE");
        pullConsumer.startup();
        //Poll one message from queue.
        Message message = pullConsumer.poll();
        //Acknowledges the consumed message
        pullConsumer.ack(message.sysHeaders().getString(Message.BuiltinKeys.MessageId));
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                pullConsumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));
    }
}

将PushConsumer附加到指定的队列,并通过MessageListener消费消息。

package io.openmessaging.samples.consumer;
import io.openmessaging.ResourceManager;
import io.openmessaging.Message;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.exception.OMSResourceNotExistException;
public class PushConsumerApp {
    public static void main(String[] args) throws OMSResourceNotExistException {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://localhost:10911/namespace");
        messagingAccessPoint.startup();
        System.out.println("MessagingAccessPoint startup OK");
        ResourceManager resourceManager = messagingAccessPoint.getResourceManager();
        final PushConsumer consumer = messagingAccessPoint.createPushConsumer();
        // Consume messages from a simple queue.
        {   // :www.xttblog.com
            String simpleQueue = "HELLO_QUEUE";
            resourceManager.createQueue("NS1", simpleQueue, OMS.newKeyValue());
            //This queue doesn't has a source topic, so only the message delivered to the queue directly can
            //be consumed by this consumer.
            consumer.attachQueue(simpleQueue, new MessageListener() {
                @Override
                public void onReceived(Message message, Context context) {
                    System.out.println("Received one message: " + message);
                    context.ack();
                }

            });
            consumer.startup();
            System.out.println("Consumer startup OK");
        }
        //Consume messages from a complex queue.
        final PushConsumer anotherConsumer = messagingAccessPoint.createPushConsumer();
        {
            String complexQueue = "QUEUE_HAS_SOURCE_TOPIC";
            String sourceTopic = "SOURCE_TOPIC";

            //Create the complex queue.
            resourceManager.createQueue("NS_01", complexQueue, OMS.newKeyValue());
            //Create the source topic.
            resourceManager.createTopic("NS_01", sourceTopic, OMS.newKeyValue());

        }
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.shutdown();
                anotherConsumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));
    }
}

相关源代码,请一步阅读:https://github.com/openmessaging/openmessaging-java。

OpenMessaging访问RocketMQ教程(开发实战)

: » OpenMessaging访问RocketMQ教程(开发实战)

原创文章,作者:Carrie001128,如若转载,请注明出处:https://blog.ytso.com/tech/java/251686.html

(0)
上一篇 2022年5月3日 10:09
下一篇 2022年5月3日 10:14

相关推荐

发表回复

登录后才能评论