#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Long.h>
#include <decaf/util/Date.h>
#include <pmq/core/PMQConnectionFactory.h>
#include <pmq/util/Config.h>
#include <pmq/library/PMQCPP.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>
using namespace pmq;
using namespace pmq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
class SimpleProducer : public Runnable {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageProducer* producer;
bool useTopic;
bool clientAck;
unsigned int numMessages;
std::string brokerURI;
std::string destURI;
private:
SimpleProducer( const SimpleProducer& );
SimpleProducer& operator= ( const SimpleProducer& );
public:
SimpleProducer( const std::string& brokerURI, unsigned int numMessages,
const std::string& destURI, bool useTopic = false,
bool clientAck = false ) :
connection(NULL),
session(NULL),
destination(NULL),
producer(NULL),
useTopic(useTopic),
clientAck(clientAck),
numMessages(numMessages),
brokerURI(brokerURI),
destURI(destURI) {
}
virtual ~SimpleProducer(){
cleanup();
}
void close() {
this->cleanup();
}
virtual void run() {
try {
auto_ptr<PMQConnectionFactory> connectionFactory(
new PMQConnectionFactory( brokerURI ) );
try{
connection = connectionFactory->createConnection();
connection->start();
} catch( CMSException& e ) {
e.printStackTrace();
throw e;
}
if( clientAck )
{
session =
connection->createSession( Session::CLIENT_ACKNOWLEDGE );
} else {
session =
connection->createSession( Session::AUTO_ACKNOWLEDGE );
}
if( useTopic )
{
destination = session->createTopic( destURI );
} else
{
destination = session->createQueue( destURI );
}
producer = session->createProducer( destination );
producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
string threadIdStr =
Long::toString( Thread::currentThread()->getId() );
string text = (string)"Hello world! from thread " + threadIdStr;
for( unsigned int ix=0; ix<numMessages; ++ix )
{
TextMessage* message = session->createTextMessage( text );
message->setIntProperty( "Integer", ix );
printf( "Sent message #%d from thread %s\n", ix+1,
threadIdStr.c_str() );
producer->send( message );
delete message;
}
}catch ( CMSException& e ) {
e.printStackTrace();
}
}
private:
void cleanup(){
try{
if( destination != NULL ) delete destination;
}catch ( CMSException& e ) { e.printStackTrace(); }
destination = NULL;
try
{
if( producer != NULL ) delete producer;
}catch ( CMSException& e ) { e.printStackTrace(); }
producer = NULL;
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch ( CMSException& e ) { e.printStackTrace(); }
try{
if( session != NULL ) delete session;
}catch ( CMSException& e ) { e.printStackTrace(); }
session = NULL;
try{
if( connection != NULL ) delete connection;
}catch ( CMSException& e ) { e.printStackTrace(); }
connection = NULL;
}
};
int main(int argc , char* argv[])
{
pmq::library::PMQCPP::initializeLibrary();
std::cout << "=====================================================\n";
std::cout << "Starting produce message:" << std::endl;
std::cout << "-----------------------------------------------------\n";
std::string brokerURI = argv[1] ;
unsigned int numMessages = 2000;
std::string destURI = "test.Cpp";
bool useTopics = false;
SimpleProducer producer( brokerURI, numMessages, destURI, useTopics );
producer.run();
producer.close();
std::cout << "-----------------------------------------------------\n";
std::cout << "Finished test Cpp" << std::endl;
std::cout << "=====================================================\n";
pmq::library::PMQCPP::shutdownLibrary();
}