#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Long.h>
#include <decaf/util/Date.h>
#include <pmq/core/PMQConnectionFactory.h>
#include <pmq/util/Config.h>
#include <pmq/library/PMQCPP.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>

using namespace pmq;
using namespace pmq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;

////////////////////////////////////////////////////////////////////////////////
class SimpleProducer : public Runnable {
private:

    Connection* connection;
    Session* session;
    Destination* destination;
    MessageProducer* producer;
    bool useTopic;
    bool clientAck;
    unsigned int numMessages;
    std::string brokerURI;
    std::string destURI;

private:

    SimpleProducer( const SimpleProducer& );
    SimpleProducer& operator= ( const SimpleProducer& );

public:

    SimpleProducer( const std::string& brokerURI, unsigned int numMessages,
                    const std::string& destURI, bool useTopic = false,
                    bool clientAck = false ) :
        connection(NULL),
        session(NULL),
        destination(NULL),
        producer(NULL),
        useTopic(useTopic),
        clientAck(clientAck),
        numMessages(numMessages),
        brokerURI(brokerURI),
        destURI(destURI) {
    }

    virtual ~SimpleProducer(){
        cleanup();
    }

    void close() {
        this->cleanup();
    }

    virtual void run() {
        try {
        //1、创建工厂连接对象,需要制定ip和端口号
            auto_ptr<PMQConnectionFactory> connectionFactory(
                                new PMQConnectionFactory( brokerURI ) );

           try{
        //2、使用连接工厂创建一个连接对象
                connection = connectionFactory->createConnection();
        //3、开启连接
                connection->start();
            } catch( CMSException& e ) {
                e.printStackTrace();
                throw e;
            }
        //4、使用连接对象创建会话(session)对象 
            if( clientAck ) 
            {
            session = 
            connection->createSession( Session::CLIENT_ACKNOWLEDGE );
            } else {
            session = 
            connection->createSession( Session::AUTO_ACKNOWLEDGE );
            }
          //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
            if( useTopic ) 
            {
                destination = session->createTopic( destURI );
            } else 
            {
                destination = session->createQueue( destURI );
            }
         //6、使用会话对象创建生产者对象
            producer = session->createProducer( destination );
            producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );

            string threadIdStr = 
            Long::toString( Thread::currentThread()->getId() );
            string text = (string)"Hello world! from thread " + threadIdStr;

            for( unsigned int ix=0; ix<numMessages; ++ix )
            {
        //7、使用会话对象创建一个消息对象
                TextMessage* message = session->createTextMessage( text );
                message->setIntProperty( "Integer", ix );
                printf( "Sent message #%d from thread %s\n", ix+1, 
                        threadIdStr.c_str() );
        //8、发送消息
                producer->send( message );
                delete message;
            }

        }catch ( CMSException& e ) {
            e.printStackTrace();
        }
    }

private:
        //9、关闭资源
    void cleanup(){
        try{
            if( destination != NULL ) delete destination;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        destination = NULL;

        try
        {
            if( producer != NULL ) delete producer;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        producer = NULL;

        try{
            if( session != NULL ) session->close();
            if( connection != NULL ) connection->close();
        }catch ( CMSException& e ) { e.printStackTrace(); }

        try{
            if( session != NULL ) delete session;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        session = NULL;

        try{
            if( connection != NULL ) delete connection;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        connection = NULL;
    }
};

int main(int argc , char* argv[]) 
{

    pmq::library::PMQCPP::initializeLibrary();
    std::cout << "=====================================================\n";
    std::cout << "Starting produce message:" << std::endl;
    std::cout << "-----------------------------------------------------\n";

    std::string brokerURI = argv[1] ;//"tcp://192.168.30.222:61616";
    unsigned int numMessages = 2000;
    std::string destURI = "test.Cpp";

    bool useTopics = false;
    SimpleProducer producer( brokerURI, numMessages, destURI, useTopics );
    producer.run();
    producer.close();

    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished test Cpp" << std::endl;
    std::cout << "=====================================================\n";

    pmq::library::PMQCPP::shutdownLibrary();
}

results matching ""

    No results matching ""