PrimetonMQ CPP Client测试
测试代码demo提供如下:
consumer.cpp
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <pmq/core/PMQConnectionFactory.h>
#include <pmq/core/PMQConnection.h>
#include <pmq/transport/DefaultTransportListener.h>
#include <pmq/library/PMQCPP.h>
#include <decaf/lang/Integer.h>
#include <pmq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
using namespace pmq;
using namespace pmq::core;
using namespace pmq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
////////////////////////////////////////////////////////////////////////////////
class SimpleAsyncConsumer : public ExceptionListener,
public MessageListener,
public DefaultTransportListener {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageConsumer* consumer;
bool useTopic;
std::string brokerURI;
std::string destURI;
bool clientAck;
private:
SimpleAsyncConsumer( const SimpleAsyncConsumer& );
SimpleAsyncConsumer& operator= ( const SimpleAsyncConsumer& );
public:
SimpleAsyncConsumer( const std::string& brokerURI,
const std::string& destURI,
bool useTopic = false,
bool clientAck = false ) :
connection(NULL),
session(NULL),
destination(NULL),
consumer(NULL),
useTopic(useTopic),
brokerURI(brokerURI),
destURI(destURI),
clientAck(clientAck) {
}
virtual ~SimpleAsyncConsumer() {
this->cleanup();
}
void close() {
this->cleanup();
}
void runConsumer() {
try {
//1、创建工厂连接对象,需要制定ip和端口号
PMQConnectionFactory* connectionFactory =
new PMQConnectionFactory( brokerURI );
//2、使用连接工厂创建一个连接对象
connection = connectionFactory->createConnection();
delete connectionFactory;
PMQConnection* pmqConnection =
dynamic_cast<PMQConnection*>( connection );
if( pmqConnection != NULL )
{
pmqConnection->addTransportListener( this );
}
//3、开启连接
connection->start();
connection->setExceptionListener(this);
//4、使用连接对象创建会话(session)对象
if( clientAck ) {
session = connection->createSession(
Session::CLIENT_ACKNOWLEDGE );
} else {
session = connection->createSession(
Session::AUTO_ACKNOWLEDGE );
}
//5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
if( useTopic ) {
destination = session->createTopic( destURI );
} else {
destination = session->createQueue( destURI );
}
//6、使用会话对象创建生产者对象
consumer = session->createConsumer( destination );
//7、向consumer对象中设置一个messageListener对象,用来接收消息
consumer->setMessageListener( this );
} catch (CMSException& e) {
e.printStackTrace();
}
}
//8、程序等待接收用户消息
virtual void onMessage( const Message* message ) {
static int count = 0;
try
{
count++;
const TextMessage* textMessage =
dynamic_cast< const TextMessage* >( message );
string text = "";
if( textMessage != NULL ) {
text = textMessage->getText();
} else {
text = "NOT A TEXTMESSAGE!";
}
if( clientAck ) {
message->acknowledge();
}
printf( "Message #%d Received: %s\n", count, text.c_str() );
} catch (CMSException& e) {
e.printStackTrace();
}
}
virtual void onException( const CMSException& ex PMQCPP_UNUSED ) {
printf("CMS Exception occurred. Shutting down client.\n");
exit(1);
}
virtual void transportInterrupted() {
std::cout <<
"The Connection's Transport has been Interrupted." << std::endl;
}
virtual void transportResumed() {
std::cout <<
"The Connection's Transport has been Restored." << std::endl;
}
private:
//9、关闭资源
void cleanup(){
try{
if( destination != NULL ) delete destination;
}catch (CMSException& e) {}
destination = NULL;
try{
if( consumer != NULL ) delete consumer;
}catch (CMSException& e) {}
consumer = NULL;
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch (CMSException& e) {}
try{
if( session != NULL ) delete session;
}catch (CMSException& e) {}
session = NULL;
try{
if( connection != NULL ) delete connection;
}catch (CMSException& e) {}
connection = NULL;
}
};
/**
 * Entry point of the consumer demo.
 *
 * Usage: consumer <brokerURI>      (e.g. tcp://192.168.17.218:61616)
 *
 * Consumes from the queue "test.Cpp" in auto-acknowledge mode until the user
 * types 'q' or standard input is exhausted.
 *
 * @return 0 on normal termination, 1 when the broker URI argument is missing.
 */
int main(int argc, char* argv[]) {
    // The broker URI is mandatory; building a std::string from a missing
    // argv[1] (a null pointer) would be undefined behaviour.
    if( argc < 2 ) {
        std::cerr << "Usage: " << ( argc > 0 ? argv[0] : "consumer" )
                  << " <brokerURI>" << std::endl;
        return 1;
    }

    pmq::library::PMQCPP::initializeLibrary();
    std::cout << "=====================================================\n";
    std::cout << "Starting the example:" << std::endl;
    std::cout << "-----------------------------------------------------\n";

    std::string brokerURI = argv[1];
    std::string destURI = "test.Cpp";
    bool useTopics = false;   // false: consume from a queue
    bool clientAck = false;   // false: AUTO_ACKNOWLEDGE session

    SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );
    consumer.runConsumer();

    std::cout << "Press 'q' to quit" << std::endl;
    // Messages arrive on the listener while this thread waits for 'q'.
    // Checking the stream state ends the wait at EOF or on a read error
    // instead of spinning forever on a stream that can deliver nothing more.
    while( std::cin.good() && std::cin.get() != 'q' ) {}

    consumer.close();
    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished with the example." << std::endl;
    std::cout << "=====================================================\n";
    pmq::library::PMQCPP::shutdownLibrary();
    return 0;
}
producer.cpp
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Long.h>
#include <decaf/util/Date.h>
#include <pmq/core/PMQConnectionFactory.h>
#include <pmq/util/Config.h>
#include <pmq/library/PMQCPP.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>
using namespace pmq;
using namespace pmq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
////////////////////////////////////////////////////////////////////////////////
class SimpleProducer : public Runnable {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageProducer* producer;
bool useTopic;
bool clientAck;
unsigned int numMessages;
std::string brokerURI;
std::string destURI;
private:
SimpleProducer( const SimpleProducer& );
SimpleProducer& operator= ( const SimpleProducer& );
public:
SimpleProducer( const std::string& brokerURI, unsigned int numMessages,
const std::string& destURI, bool useTopic = false,
bool clientAck = false ) :
connection(NULL),
session(NULL),
destination(NULL),
producer(NULL),
useTopic(useTopic),
clientAck(clientAck),
numMessages(numMessages),
brokerURI(brokerURI),
destURI(destURI) {
}
virtual ~SimpleProducer(){
cleanup();
}
void close() {
this->cleanup();
}
virtual void run() {
try {
//1、创建工厂连接对象,需要制定ip和端口号
auto_ptr<PMQConnectionFactory>
connectionFactory(new PMQConnectionFactory( brokerURI ) );
try{
//2、使用连接工厂创建一个连接对象
connection = connectionFactory->createConnection();
//3、开启连接
connection->start();
} catch( CMSException& e ) {
e.printStackTrace();
throw e;
}
//4、使用连接对象创建会话(session)对象
if( clientAck )
{
session =
connection->createSession( Session::CLIENT_ACKNOWLEDGE );
} else
{
session =
connection->createSession( Session::AUTO_ACKNOWLEDGE );
}
//5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
if( useTopic )
{
destination = session->createTopic( destURI );
} else
{
destination = session->createQueue( destURI );
}
//6、使用会话对象创建生产者对象
producer = session->createProducer( destination );
producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
string threadIdStr = Long::toString(
Thread::currentThread()->getId() );
string text =
(string)"Hello world! from thread " + threadIdStr;
for( unsigned int ix=0; ix<numMessages; ++ix )
{
//7、使用会话对象创建一个消息对象
TextMessage* message = session->createTextMessage( text );
message->setIntProperty( "Integer", ix );
printf( "Sent message #%d from thread %s\n", ix+1,
threadIdStr.c_str() );
//8、发送消息
producer->send( message );
delete message;
}
}catch ( CMSException& e ) {
e.printStackTrace();
}
}
private:
//9、关闭资源
void cleanup(){
try{
if( destination != NULL ) delete destination;
}catch ( CMSException& e ) { e.printStackTrace(); }
destination = NULL;
try
{
if( producer != NULL ) delete producer;
}catch ( CMSException& e ) { e.printStackTrace(); }
producer = NULL;
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch ( CMSException& e ) { e.printStackTrace(); }
try{
if( session != NULL ) delete session;
}catch ( CMSException& e ) { e.printStackTrace(); }
session = NULL;
try{
if( connection != NULL ) delete connection;
}catch ( CMSException& e ) { e.printStackTrace(); }
connection = NULL;
}
};
/**
 * Entry point of the producer demo.
 *
 * Usage: producer <brokerURI>      (e.g. tcp://192.168.30.222:61616)
 *
 * Sends 2000 text messages to the queue "test.Cpp" from the calling thread.
 *
 * @return 0 on normal termination, 1 when the broker URI argument is missing.
 */
int main(int argc , char* argv[])
{
    // The broker URI is mandatory; building a std::string from a missing
    // argv[1] (a null pointer) would be undefined behaviour.
    if( argc < 2 ) {
        std::cerr << "Usage: " << ( argc > 0 ? argv[0] : "producer" )
                  << " <brokerURI>" << std::endl;
        return 1;
    }

    pmq::library::PMQCPP::initializeLibrary();
    std::cout << "=====================================================\n";
    std::cout << "Starting produce message:" << std::endl;
    std::cout << "-----------------------------------------------------\n";

    std::string brokerURI = argv[1];
    unsigned int numMessages = 2000;
    std::string destURI = "test.Cpp";
    bool useTopics = false;   // false: send to a queue

    SimpleProducer producer( brokerURI, numMessages, destURI, useTopics );
    // run() is invoked directly, so the messages are sent on this thread.
    producer.run();
    producer.close();

    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished test Cpp" << std::endl;
    std::cout << "=====================================================\n";
    pmq::library::PMQCPP::shutdownLibrary();
    return 0;
}
1、编译生成可执行文件:
g++ consumer.cpp -o consumer -I/usr/local/PMQ-CPP/include/pmq-cpp-3.10.0 \
    -I/usr/local/apr/include/apr-1 -L/usr/local/PMQ-CPP/lib -lpmq-cpp
g++ producer.cpp -o producer -I/usr/local/PMQ-CPP/include/pmq-cpp-3.10.0 \
    -I/usr/local/apr/include/apr-1 -L/usr/local/PMQ-CPP/lib -lpmq-cpp
2、运行
./consumer tcp://192.168.30.222:61616
./producer tcp://192.168.30.222:61616
注:tcp://192.168.30.222:61616 为pmq server 地址