ActiveMQ and ActiveMQ-CPP in Detail

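It took me a full day to figure out how to use ActiveMQ's C++ interface. Most of that time was lost because I did not realize that activemq-cpp is only a client library and needs a running ActiveMQ broker to talk to.

The sections below cover installing and using ActiveMQ and activemq-cpp in turn.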

I. Installing ActiveMQ

       1. Download the binary package: http://activemq.apache.org/download-archives.html

       2. tar zxvf  apache-activemq-xxx.bin.tar.gz -C /usr/local
       3. cd /usr/local/apache-activemq-xxx/bin

       4. chmod 755 activemq

       5. Start the broker:   ./activemq start

       Once it is running, open the admin console at http://localhost:8161/admin. The default username and password are both admin.

II. Installing ActiveMQ-CPP

          It needs quite a few dependencies. For reference, see http://blog.csdn.net/lgh1700/article/details/51055784

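(1) cppunit 
 
Open http://activemq.apache.org/cms/building.html, which lists the dependency libraries needed to build CMS. 
cppunit download page: 
https://sourceforge.net/projects/cppunit/files/cppunit/1.12.1/ 
This guide uses version 1.12.1. Once you have the download URL, fetch it directly on Linux with wget, or download it elsewhere and copy it to the Linux machine. 
 
Extract the archive with tar, enter the directory, and run the usual three build steps: configure, make, make install (install needs root privileges): 
./configure --prefix=/usr/local/cppunit/ 
make 
make install 
Afterwards the headers and libraries are under /usr/local/cppunit/. 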
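(2) apr 
 
APR stands for Apache Portable Runtime. It is one of the many open-source projects under the Apache umbrella. 
 
APR download page: 
http://apr.apache.org/download.cgi 
This guide uses APR 1.5.2, the latest version at the time of writing: 
http://mirrors.hust.edu.cn/apache//apr/apr-1.5.2.tar.gz 
 
As before, extract it, enter the directory, and run the three steps: 
./configure --prefix=/usr/local/apr/ 
make 
make install 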
(3) apr-util 
 
This guide uses APR-util 1.5.4, the latest version at the time of writing: 
http://mirrors.hust.edu.cn/apache//apr/apr-util-1.5.4.tar.gz 
 
Extract and build: 
./configure --prefix=/usr/local/aprutil --with-apr=/usr/local/apr/ 
make 
make install 
(4) apr-iconv 
 
This guide uses APR iconv 1.2.1, the latest version at the time of writing: 
http://mirrors.hust.edu.cn/apache//apr/apr-iconv-1.2.1.tar.gz 
 
Extract and build: 
./configure --prefix=/usr/local/apr-iconv/ --with-apr=/usr/local/apr/ 
make 
make install 
(5) openssl 
 
This guide uses openssl 1.0.0a: 
http://www.openssl.org/source/openssl-1.0.0a.tar.gz 
 
Extract and build: 
./config --prefix=/usr/local/openssl/ 
make 
make install 
 
If make install fails with the error 
cms.pod around line 457: Expected text after =item, not a number 
run rm -f /usr/bin/pod2man as root, then run make install again. 
(6) ActiveMQ-CPP 
 
This guide uses ActiveMQ-CPP 3.9.3, the latest version at the time of writing. Release page: 
http://activemq.apache.org/cms/activemq-cpp-393-release.html 
 
Extract and build: 
./configure --prefix=/usr/local/ActiveMQ-CPP --with-apr=/usr/local/apr/ --with-apr-util=/usr/local/aprutil --with-cppunit=/usr/local/cppunit --with-openssl=/usr/local/openssl 
make 
make install  
/usr/local/ now contains an ActiveMQ-CPP directory that holds the activemq-cpp headers and library files. 

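III. Example

        Start ActiveMQ first, then run the activemq-cpp program. Afterwards, check the queues, topics, and so on at http://localhost:8161/admin.

       The test program is borrowed from someone else: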

/* 
 * Licensed to the Apache Software Foundation (ASF) under one or more 
 * contributor license agreements.  See the NOTICE file distributed with 
 * this work for additional information regarding copyright ownership. 
 * The ASF licenses this file to You under the Apache License, Version 2.0 
 * (the "License"); you may not use this file except in compliance with 
 * the License.  You may obtain a copy of the License at 
 * 
 *	 http://www.apache.org/licenses/LICENSE-2.0 
 * 
 * Unless required by applicable law or agreed to in writing, software 
 * distributed under the License is distributed on an "AS IS" BASIS, 
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 * See the License for the specific language governing permissions and 
 * limitations under the License. 
 */ 
// START SNIPPET: demo 
#include <activemq/library/ActiveMQCPP.h> 
#include <decaf/lang/Thread.h> 
#include <decaf/lang/Runnable.h> 
#include <decaf/util/concurrent/CountDownLatch.h> 
#include <decaf/lang/Integer.h> 
#include <decaf/lang/Long.h> 
#include <decaf/lang/System.h> 
#include <activemq/core/ActiveMQConnectionFactory.h> 
#include <activemq/util/Config.h> 
#include <cms/Connection.h> 
#include <cms/Session.h> 
#include <cms/TextMessage.h> 
#include <cms/BytesMessage.h> 
#include <cms/MapMessage.h> 
#include <cms/ExceptionListener.h> 
#include <cms/MessageListener.h> 
#include <stdlib.h> 
#include <stdio.h> 
#include <iostream> 
#include <memory> 
#include <string> 
#include <vector> 
#include <decaf/util/Random.h> 
using namespace activemq::core; 
using namespace decaf::util::concurrent; 
using namespace decaf::util; 
using namespace decaf::lang; 
using namespace cms; 
using namespace std; 
#define  QUEUE_NAME	"eventQueue" 
#define NAME_BYTE_LEN		16 
#define USE_PRODUCER 1 
class HelloWorldProducer : public ExceptionListener, 
							public MessageListener, 
							public Runnable { 
private: 
	CountDownLatch latch; 
	CountDownLatch doneLatch; 
	Connection* connection; 
	Session* session; 
	Destination* destination; 
	MessageProducer* producer; 
	int numMessages; 
	bool useTopic; 
	bool sessionTransacted; 
	std::string brokerURI; 
	bool bReciveMessage; 
	long waitMillis; 
private: 
	HelloWorldProducer(const HelloWorldProducer&); 
	HelloWorldProducer& operator=(const HelloWorldProducer&); 
public: 
	HelloWorldProducer(const std::string& brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted = false, 
		long waitMillis=3000) : 
		latch(1), 
		doneLatch(numMessages),   
		connection(NULL), 
		session(NULL), 
		destination(NULL), 
		producer(NULL), 
		numMessages(numMessages), 
		useTopic(useTopic), 
		sessionTransacted(sessionTransacted), 
		brokerURI(brokerURI) , 
		bReciveMessage(false), 
		waitMillis(waitMillis) 
		{ } 
	virtual ~HelloWorldProducer(){ 
		cleanup(); 
	} 
	void close() { 
		this->cleanup(); 
	} 
	void waitUntilReady() { 
		latch.await(); 
	} 
	virtual void run() { 
		try { 
            cout<<"producer run"<<endl; 
			// Create a ConnectionFactory 
			auto_ptr<ConnectionFactory> connectionFactory( 
				ConnectionFactory::createCMSConnectionFactory(brokerURI)); 
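            cout<<"connection factory created"<<endl; 
            cout<<"brokerURI = "<<brokerURI<<endl; 
			// Create a Connection 
			connection = connectionFactory->createConnection(); 
            cout<<"connection created"<<endl; 
			connection->start(); 
			// Register for connection errors, as the consumer does. 
			connection->setExceptionListener(this); 
            cout<<"connection started"<<endl; 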
			// Create a Session 
			if (this->sessionTransacted) { 
				session = connection->createSession(Session::SESSION_TRANSACTED); 
			} else { 
				session = connection->createSession(Session::AUTO_ACKNOWLEDGE); 
			} 
            cout<<"session created"<<endl; 
			// Create the destination (Topic or Queue) 
			if (useTopic) { 
				destination = session->createTopic(QUEUE_NAME); 
			} else { 
				destination = session->createQueue(QUEUE_NAME); 
			} 
            cout<<"destination created"<<endl; 
			// Create a MessageProducer from the Session to the Topic or Queue 
			producer = session->createProducer(destination); 
			producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); 
            cout<<"producer created"<<endl; 
			// Create the Thread Id String 
			string threadIdStr = Long::toString(Thread::currentThread()->getId()); 
			// Create the message text 
			string text = (string) "Hello world! from thread " + threadIdStr; 
            cout<<text<<endl; 
            cout<<numMessages<<endl; 
			for (int ix = 0; ix < numMessages; ++ix) { 
				std::auto_ptr<TextMessage> message(session->createTextMessage(text)); 
				// Key part: receive the reply on a temporary queue. 
				std::auto_ptr<Destination> tempDest(session->createTemporaryQueue()); 
				//cms::Destination tempDest=session->createTemporaryTopic() ;  
				MessageConsumer * responseConsumer = session->createConsumer(tempDest.get());   
				responseConsumer->setMessageListener(this);// listen for replies 
				message->setCMSReplyTo(tempDest.get()); 
				Random random; 
				char buffer[NAME_BYTE_LEN]={0}; 
				random.nextBytes((unsigned char *)buffer,NAME_BYTE_LEN); 
				string correlationId=""; 
				for(int i=0;i<NAME_BYTE_LEN;++i) 
				{ 
					char ch[NAME_BYTE_LEN*2]={0}; 
					sprintf(ch,"%02X",(unsigned char)buffer[i]); 
					string str(ch); 
					correlationId+=str; 
				} 
				message->setCMSCorrelationID(correlationId); 
				message->setIntProperty("Integer", ix); 
				printf("Producer Sent message #%d from thread %s\n", ix + 1, threadIdStr.c_str()); 
				producer->send(message.get()); 
				// Indicate we are ready for messages. 
				latch.countDown(); 
				// Wait while asynchronous messages come in. 
				doneLatch.await(waitMillis); 
			}  
		}	 
		catch (CMSException& e) { 
			printf("Producer run() CMSException \n" ); 
			// Indicate we are ready for messages. 
			latch.countDown(); 
			e.printStackTrace(); 
		} 
	} 
	// Called from the Producer since this class is a registered MessageListener. 
	virtual void onMessage(const Message* message) { 
		static int count = 0; 
		try { 
			count++; 
			const TextMessage* textMessage = dynamic_cast<const TextMessage*> (message); 
			//ActiveMQMessageTransformation 
			//std::auto_ptr<TextMessage> responsemessage(session->createTextMessage()); 
			//responsemessage->setCMSCorrelationID(textMessage->getCMSCorrelationID()); 
			//responsemessage->getCMSReplyTo() 
			string text = ""; 
			if (textMessage != NULL) { 
				text = textMessage->getText(); 
			} else { 
				text = "NOT A TEXTMESSAGE!"; 
			} 
			printf("Producer Message #%d Received: %s\n", count, text.c_str()); 
			//producer.send 
		} catch (CMSException& e) { 
			printf("Producer onMessage() CMSException \n" ); 
			e.printStackTrace(); 
		} 
		// Commit all messages. 
		if (this->sessionTransacted) { 
			session->commit(); 
		} 
		// No matter what, tag the count down latch until done. 
		doneLatch.countDown(); 
	} 
	// If something bad happens you see it here as this class has also been 
	// registered as an ExceptionListener with the connection. 
	virtual void onException(const CMSException& ex AMQCPP_UNUSED) { 
		printf("Producer onException() CMS Exception occurred.  Shutting down client. \n" ); 
		ex.printStackTrace(); 
		exit(1); 
	} 
private: 
	void cleanup() { 
		if (connection != NULL) { 
			try { 
				connection->close(); 
			} catch (cms::CMSException& ex) { 
				ex.printStackTrace(); 
			} 
		} 
		// Destroy resources. 
		try { 
			delete destination; 
			destination = NULL; 
			delete producer; 
			producer = NULL; 
			delete session; 
			session = NULL; 
			delete connection; 
			connection = NULL; 
		} catch (CMSException& e) { 
			e.printStackTrace(); 
		} 
	} 
}; 
class HelloWorldConsumer : public ExceptionListener, 
						   public MessageListener, 
						   public Runnable { 
private: 
	CountDownLatch latch; 
	CountDownLatch doneLatch; 
	Connection* connection; 
	Session* session; 
	Destination* destination; 
	MessageConsumer* consumer; 
	MessageProducer *producer; 
	long waitMillis; 
	bool useTopic; 
	bool sessionTransacted; 
	std::string brokerURI; 
private: 
	HelloWorldConsumer(const HelloWorldConsumer&); 
	HelloWorldConsumer& operator=(const HelloWorldConsumer&); 
public: 
	HelloWorldConsumer(const std::string& brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted = false, int waitMillis = 30000) : 
		latch(1), 
		doneLatch(numMessages), 
		connection(NULL), 
		session(NULL), 
		destination(NULL), 
		consumer(NULL), 
		producer(NULL), 
		waitMillis(waitMillis), 
		useTopic(useTopic), 
		sessionTransacted(sessionTransacted), 
		brokerURI(brokerURI) { 
	} 
	virtual ~HelloWorldConsumer() { 
		cleanup(); 
	} 
	void close() { 
		this->cleanup(); 
	} 
	void waitUntilReady() { 
		latch.await(); 
	} 
	virtual void run() { 
		try { 
            cout<<"consumer run"<<endl; 
			// Create a ConnectionFactory 
			auto_ptr<ConnectionFactory> connectionFactory( 
				ConnectionFactory::createCMSConnectionFactory(brokerURI)); 
			// Create a Connection 
			connection = connectionFactory->createConnection(); 
			connection->start(); 
			connection->setExceptionListener(this); 
			// Create a Session 
			if (this->sessionTransacted == true) { 
				session = connection->createSession(Session::SESSION_TRANSACTED); 
			} else { 
				session = connection->createSession(Session::AUTO_ACKNOWLEDGE); 
			} 
			// Create the destination (Topic or Queue) 
			if (useTopic) { 
				destination = session->createTopic(QUEUE_NAME); 
			} else { 
				destination = session->createQueue(QUEUE_NAME); 
			} 
			producer = session->createProducer(); 
			producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); 
			// Create a MessageConsumer from the Session to the Topic or Queue 
			consumer = session->createConsumer(destination); 
			consumer->setMessageListener(this); 
			std::cout.flush(); 
			std::cerr.flush(); 
			// Indicate we are ready for messages. 
			latch.countDown(); 
			// Wait while asynchronous messages come in. 
			doneLatch.await(); 
		} catch (CMSException& e) { 
			printf("Consumer run() CMS Exception occurred.  Shutting down client. \n" ); 
			// Indicate we are ready for messages. 
			latch.countDown(); 
			e.printStackTrace(); 
		} 
	} 
	// Called from the consumer since this class is a registered MessageListener. 
	virtual void onMessage(const Message* message) { 
		static int count = 0; 
		try { 
			count++; 
			// Create the Thread Id String 
			string threadIdStr = Long::toString(Thread::currentThread()->getId()); 
			static bool bPrintf=true; 
			if(bPrintf) 
			{ 
				bPrintf=false; 
				printf("consumer Message threadid: %s\n",  threadIdStr.c_str()); 
			} 
			string strReply="consumer return  xxx,ThreadID="+threadIdStr; 
			const TextMessage* textMessage = dynamic_cast<const TextMessage*> (message); 
			if(NULL==textMessage) 
			{ 
				printf("NULL==textMessage, CMSType: %s\n", message->getCMSType().c_str()); 
				//const cms::MapMessage* mapMsg = dynamic_cast<const cms::MapMessage*>(message); 
				//if(mapMsg) 
				//{ 
				//	 
				//	std::vector<std::string> elements = mapMsg->getMapNames(); 
				//	std::vector<std::string>::iterator iter = elements.begin(); 
				//	for(; iter != elements.end() ; ++iter)  
				//	{ 
				//		std::string key = *iter; 
				//		cms::Message::ValueType elementType = mapMsg->getValueType(key); 
				//		string strxxx; 
				//		int cc=0; 
				//		switch(elementType) { 
				//	case cms::Message::BOOLEAN_TYPE: 
				//		//msg->setBoolean(key, mapMsg->getBoolean(key)); 
				//		break; 
				//	case cms::Message::BYTE_TYPE: 
				//		//msg->setByte(key, mapMsg->getByte(key)); 
				//		break; 
				//	case cms::Message::BYTE_ARRAY_TYPE: 
				//		//msg->setBytes(key, mapMsg->getBytes(key)); 
				//		break; 
				//	case cms::Message::CHAR_TYPE: 
				//		//msg->setChar(key, mapMsg->getChar(key)); 
				//		break; 
				//	case cms::Message::SHORT_TYPE: 
				//		//msg->setShort(key, mapMsg->getShort(key)); 
				//		break; 
				//	case cms::Message::INTEGER_TYPE: 
				//		//msg->setInt(key, mapMsg->getInt(key)); 
				//		break; 
				//	case cms::Message::LONG_TYPE: 
				//		//msg->setLong(key, mapMsg->getLong(key)); 
				//		break; 
				//	case cms::Message::FLOAT_TYPE: 
				//		//msg->setFloat(key, mapMsg->getFloat(key)); 
				//		break; 
				//	case cms::Message::DOUBLE_TYPE: 
				//		//msg->setDouble(key, mapMsg->getDouble(key)); 
				//		break; 
				//	case cms::Message::STRING_TYPE: 
				//		//msg->setString(key, mapMsg->getString(key)); 
				//		strxxx=mapMsg->getString(key); 
				//		cc=1; 
				//		break; 
				//	default: 
				//		break; 
				//		} 
				//	} 
				//} 
				return; 
			} 
			std::auto_ptr<TextMessage> responsemessage(session->createTextMessage(strReply)); 
			responsemessage->setCMSCorrelationID(textMessage->getCMSCorrelationID()); 
			string text = ""; 
			if (textMessage != NULL) { 
				text = textMessage->getText(); 
			} else { 
				text = "NOT A TEXTMESSAGE!"; 
			} 
			int nProPerty=textMessage->getIntProperty("Integer"); 
			printf("consumer Message #%d Received: %s,nProPerty[%d]\n", count, text.c_str(),nProPerty); 
			const cms::Destination* destSend=textMessage->getCMSReplyTo(); 
			if(destSend) 
			{ 
				this->producer->send(destSend,responsemessage.get()); 
				printf("consumer Message #%d send: %s\n", count, strReply.c_str()); 
			} 
		} catch (CMSException& e) { 
			printf("Consumer onMessage() CMS Exception occurred.  Shutting down client. \n" ); 
			e.printStackTrace(); 
		} 
		// Commit all messages. 
		if (this->sessionTransacted) { 
			session->commit(); 
		} 
		// No matter what, tag the count down latch until done. 
		//doneLatch.countDown(); 
	} 
	// If something bad happens you see it here as this class has also been 
	// registered as an ExceptionListener with the connection. 
	virtual void onException(const CMSException& ex AMQCPP_UNUSED) { 
		printf("Consumer onException() CMS Exception occurred.  Shutting down client. \n" ); 
		//printf("CMS Exception occurred.  Shutting down client.\n"); 
		ex.printStackTrace(); 
		exit(1); 
	} 
private: 
	void cleanup() { 
		if (connection != NULL) { 
			try { 
				connection->close(); 
			} catch (cms::CMSException& ex) { 
				ex.printStackTrace(); 
			} 
		} 
		// Destroy resources. 
		try { 
			delete destination; 
			destination = NULL; 
			delete consumer; 
			consumer = NULL; 
			delete producer; 
			producer = NULL; 
			delete session; 
			session = NULL; 
			delete connection; 
			connection = NULL; 
		} catch (CMSException& e) { 
			e.printStackTrace(); 
		} 
	} 
}; 
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) { 
	//if(argc<2) 
	//{ 
	//	printf("argc<2\r\n"); 
	//	return 0; 
	//} 
	activemq::library::ActiveMQCPP::initializeLibrary(); 
	{ 
	std::cout << "=====================================================\n"; 
	std::cout << "Starting the example:" << std::endl; 
	std::cout << "-----------------------------------------------------\n"; 
	// Set the URI to point to the IP Address of your broker. 
	// add any optional params to the url to enable things like 
	// tightMarshalling or tcp logging etc.  See the CMS web site for 
	// a full list of configuration options. 
	// 
	//  http://activemq.apache.org/cms/ 
	// 
	// Wire Format Options: 
	// ========================= 
	// Use either stomp or openwire, the default ports are different for each 
	// 
	// Examples: 
	//	tcp://127.0.0.1:61616					  default to openwire 
	//	tcp://127.0.0.1:61616?wireFormat=openwire  same as above 
	//	tcp://127.0.0.1:61613?wireFormat=stomp	 use stomp instead 
	// 
	// SSL: 
	// ========================= 
	// To use SSL you need to specify the location of the trusted Root CA or the 
	// certificate for the broker you want to connect to.  Using the Root CA allows 
	// you to use failover with multiple servers all using certificates signed by 
	// the trusted root.  If using client authentication you also need to specify 
	// the location of the client Certificate. 
	// 
	//	 System::setProperty( "decaf.net.ssl.keyStore", "<path>/client.pem" ); 
	//	 System::setProperty( "decaf.net.ssl.keyStorePassword", "password" ); 
	//	 System::setProperty( "decaf.net.ssl.trustStore", "<path>/rootCA.pem" ); 
	// 
	// Then you just specify the ssl transport in the URI, for example: 
	// 
	//	 ssl://localhost:61617 
	// 
	std::string brokerURI = 
		"failover:(tcp://localhost:61616" 
//		"?wireFormat=openwire" 
//		"&transport.useInactivityMonitor=false" 
//		"&connection.alwaysSyncSend=true" 
//		"&connection.useAsyncSend=true" 
//		"?transport.commandTracingEnabled=true" 
//		"&transport.tcpTracingEnabled=true" 
//		"&wireFormat.tightEncodingEnabled=true" 
		")"; 
	//============================================================ 
	// set to true to use topics instead of queues 
	// Note in the code above that this causes createTopic or 
	// createQueue to be used in both consumer and producer. 
	//============================================================ 
	bool useTopics = false; 
	bool sessionTransacted = true; 
	int numMessages = 1; 
	bool useConsumer=true; 
	bool useProducer=true; 
	//int nSet=atoi(argv[1]); 
	//if(1==nSet) 
	//{ 
		//#define USE_COMSUMER 
	//} 
	//else 
	//{ 
		//#define USE_PRODUCER 
		// 
	//} 
	long long startTime = System::currentTimeMillis(); 
#ifdef USE_PRODUCER 
	printf("Current mode: USE_PRODUCER \r\n"); 
	int numProducerMessages = 30; 
	int nThreadNumber=10; 
	vector<HelloWorldProducer *> vHelloWorldProducer; 
	for(int i=0;i<nThreadNumber;++i) 
	{ 
		HelloWorldProducer * producerTemp=new HelloWorldProducer(brokerURI, numProducerMessages, useTopics); 
		vHelloWorldProducer.push_back(producerTemp); 
	} 
#endif 
#ifdef USE_COMSUMER 
	printf("Current mode: USE_COMSUMER \r\n"); 
	HelloWorldConsumer consumer(brokerURI, numMessages, useTopics, sessionTransacted); 
	// Start the consumer thread. 
	Thread consumerThread(&consumer); 
	consumerThread.start(); 
	// Wait for the consumer to indicate that its ready to go. 
	consumer.waitUntilReady(); 
#endif 
#ifdef USE_PRODUCER 
	// Start the producer thread. 
	vector<Thread *> vThread; 
	for(int i=0;i<nThreadNumber;++i) 
	{ 
		HelloWorldProducer & ProducerTemp=*vHelloWorldProducer[i]; 
		Thread * threadTemp=new Thread(&ProducerTemp); 
		vThread.push_back(threadTemp); 
		threadTemp->start(); 
		ProducerTemp.waitUntilReady(); 
	} 
	for(int i=0;i<vThread.size();++i) 
	{ 
		Thread * threadTemp=vThread[i]; 
		//threadTemp->join(); 
	} 
	while(1) 
	{ 
		Thread::sleep(10); 
	} 
	//Thread producerThread1(&producer1); 
	//producerThread1.start(); 
	//producer1.waitUntilReady(); 
	//Thread producerThread2(&producer2); 
	//producerThread2.start(); 
	//producer2.waitUntilReady(); 
	//Thread producerThread3(&producer3); 
	//producerThread3.start(); 
	//producer3.waitUntilReady(); 
#endif 
#ifdef USE_PRODUCER 
	// Wait for the threads to complete. 
	//producerThread1.join(); 
	//producerThread2.join(); 
	//producerThread3.join(); 
#endif 
#ifdef USE_COMSUMER 
	consumerThread.join(); 
#endif 
	long long endTime = System::currentTimeMillis(); 
	double totalTime = (double)(endTime - startTime) / 1000.0; 
#ifdef USE_PRODUCER 
	//producer1.close(); 
	//producer2.close(); 
	//producer3.close(); 
	for(int i=0;i<vHelloWorldProducer.size();++i) 
	{ 
		HelloWorldProducer * ProducerTemp=vHelloWorldProducer[i]; 
		ProducerTemp->close(); 
		if(ProducerTemp) 
		{ 
			delete ProducerTemp; 
			ProducerTemp=NULL; 
		} 
	} 
#endif 
#ifdef USE_COMSUMER 
	consumer.close(); 
#endif 
	std::cout << "Time to completion = " << totalTime << " seconds." << std::endl; 
	std::cout << "-----------------------------------------------------\n"; 
	std::cout << "Finished with the example." << std::endl; 
	std::cout << "=====================================================\n"; 
	} 
	activemq::library::ActiveMQCPP::shutdownLibrary(); 
	return 0; 
} 
// END SNIPPET: demo 
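
The long comment in main() describes how the broker URI is put together: a tcp:// address, optional key=value parameters such as wireFormat=openwire, and a failover:( ... ) wrapper around the whole thing. As a rough sketch of that assembly, the helper below builds such a string using only the standard library. buildBrokerUri is a hypothetical name, not an ActiveMQ-CPP function, and it does no escaping or validation of the option values.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper (not part of ActiveMQ-CPP): appends key=value options
// to a tcp:// address and wraps the result in failover:( ... ).
std::string buildBrokerUri(
        const std::string& host, int port,
        const std::vector<std::pair<std::string, std::string> >& options) {
    assert(port > 0 && port <= 65535);
    std::string uri = "tcp://" + host + ":" + std::to_string(port);
    for (std::size_t i = 0; i < options.size(); ++i) {
        // The first option starts the query string, later ones are joined with '&'.
        uri += (i == 0 ? "?" : "&");
        uri += options[i].first + "=" + options[i].second;
    }
    return "failover:(" + uri + ")";
}
```

With host localhost, port 61616, and the single option wireFormat=openwire, this returns failover:(tcp://localhost:61616?wireFormat=openwire), which has the same shape as the brokerURI string in the listing.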
 

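The code above contains both the producer and the consumer, and uses #ifdef to compile them separately. Make two copies of the source: keep #define USE_PRODUCER in one, replace it with #define USE_COMSUMER in the other, and build each copy into its own executable.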
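
The compile-time switch can be illustrated in isolation. The following is a minimal sketch, assuming the same macro names as the listing (including its USE_COMSUMER spelling). The Role enum, kCompiledRole, and roleName are hypothetical names introduced for illustration, and falling back to the producer when neither macro is defined is a choice made only for this sketch.

```cpp
#include <cassert>

// Pick the role at build time, for example with -DUSE_PRODUCER or
// -DUSE_COMSUMER on the compiler command line.
#if !defined(USE_PRODUCER) && !defined(USE_COMSUMER)
#define USE_PRODUCER 1  // sketch-only default when no role is given
#endif

#if defined(USE_PRODUCER) && defined(USE_COMSUMER)
#error "Define only one of USE_PRODUCER or USE_COMSUMER"
#endif

enum Role { ROLE_PRODUCER, ROLE_CONSUMER };

// The role that was compiled into this binary.
#ifdef USE_PRODUCER
const Role kCompiledRole = ROLE_PRODUCER;
#else
const Role kCompiledRole = ROLE_CONSUMER;
#endif

// Human-readable name for a role, e.g. for a startup log line.
const char* roleName(Role role) {
    switch (role) {
        case ROLE_PRODUCER: return "producer";
        case ROLE_CONSUMER: return "consumer";
    }
    assert(false && "unknown role");
    return "unknown";
}
```

Passing the macro on the command line instead of editing a #define would let a single source file serve both builds. The #error guard catches the case where both roles are requested at once.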

Start the producer in terminal 1. The Queues page at http://localhost:8161/admin then shows eventQueue, with messages being enqueued continuously.

Start the consumer in another terminal and you can see the messages in the queue being consumed.
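
The listing pairs each request with its reply through a correlation ID: the producer hex-encodes 16 random bytes into the ID, and the consumer copies that ID onto its response. The listing itself never checks the ID when a reply arrives. As a sketch of how that check could look, the code below reproduces the hex encoding and adds a small table of outstanding requests. toHex and PendingReplies are hypothetical names, and nothing here calls ActiveMQ-CPP.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <map>
#include <string>

// Hex-encodes raw bytes, two uppercase digits per byte, like the "%02X"
// loop in the listing.
std::string toHex(const unsigned char* bytes, std::size_t len) {
    assert(bytes != NULL || len == 0);
    std::string out;
    char buf[3];  // two hex digits plus the terminating NUL
    for (std::size_t i = 0; i < len; ++i) {
        std::snprintf(buf, sizeof(buf), "%02X", static_cast<unsigned>(bytes[i]));
        out += buf;
    }
    return out;
}

// Hypothetical bookkeeping: remembers which request each correlation ID
// belongs to until its reply shows up.
class PendingReplies {
public:
    // Record a request that was just sent.
    void add(const std::string& correlationId, const std::string& requestText) {
        pending_[correlationId] = requestText;
    }
    // Call when a reply arrives. Returns true and fills requestText if the
    // ID matches an outstanding request; the entry is then removed.
    bool resolve(const std::string& correlationId, std::string& requestText) {
        std::map<std::string, std::string>::iterator it = pending_.find(correlationId);
        if (it == pending_.end()) {
            return false;
        }
        requestText = it->second;
        pending_.erase(it);
        return true;
    }
    std::size_t size() const { return pending_.size(); }
private:
    std::map<std::string, std::string> pending_;
};
```

In the producer, add() would go next to setCMSCorrelationID() and resolve() inside onMessage(). Because replies arrive asynchronously, a real version would presumably need a mutex around the map.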

Original article by 奋斗. If you repost it, please credit the source: https://blog.ytso.com/tech/aiops/7469.html
