PrmetonMQ CPP Client测试

测试代码demo提供如下:

consumer.cpp

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <pmq/core/PMQConnectionFactory.h>
#include <pmq/core/PMQConnection.h>
#include <pmq/transport/DefaultTransportListener.h>
#include <pmq/library/PMQCPP.h>
#include <decaf/lang/Integer.h>
#include <pmq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>

using namespace pmq;
using namespace pmq::core;
using namespace pmq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;

////////////////////////////////////////////////////////////////////////////////
class SimpleAsyncConsumer : public ExceptionListener,
                            public MessageListener,
                            public DefaultTransportListener {
private:

    Connection* connection;
    Session* session;
    Destination* destination;
    MessageConsumer* consumer;
    bool useTopic;
    std::string brokerURI;
    std::string destURI;
    bool clientAck;

private:

    SimpleAsyncConsumer( const SimpleAsyncConsumer& );
    SimpleAsyncConsumer& operator= ( const SimpleAsyncConsumer& );

public:

    SimpleAsyncConsumer( const std::string& brokerURI,
                         const std::string& destURI,
                         bool useTopic = false,
                         bool clientAck = false ) :
        connection(NULL),
        session(NULL),
        destination(NULL),
        consumer(NULL),
        useTopic(useTopic),
        brokerURI(brokerURI),
        destURI(destURI),
        clientAck(clientAck) {
    }

    virtual ~SimpleAsyncConsumer() {
        this->cleanup();
    }

    void close() {
        this->cleanup();
    }

    void runConsumer() {

        try {
    //1、创建工厂连接对象,需要制定ip和端口号
            PMQConnectionFactory* connectionFactory = 
                        new PMQConnectionFactory( brokerURI );
    //2、使用连接工厂创建一个连接对象

            connection = connectionFactory->createConnection();
            delete connectionFactory;

            PMQConnection* pmqConnection = 
                    dynamic_cast<PMQConnection*>( connection );
            if( pmqConnection != NULL ) 
            {
                pmqConnection->addTransportListener( this );
            }
     //3、开启连接
            connection->start();
            connection->setExceptionListener(this);
    //4、使用连接对象创建会话(session)对象        
            if( clientAck ) {
                session = connection->createSession( 
                                Session::CLIENT_ACKNOWLEDGE );
            } else {
                session = connection->createSession( 
                                  Session::AUTO_ACKNOWLEDGE );
            }
     //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
            if( useTopic ) {
                destination = session->createTopic( destURI );
            } else {
                destination = session->createQueue( destURI );
            }
    //6、使用会话对象创建生产者对象
            consumer = session->createConsumer( destination );
    //7、向consumer对象中设置一个messageListener对象,用来接收消息
            consumer->setMessageListener( this );

        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }
    //8、程序等待接收用户消息
    virtual void onMessage( const Message* message ) {
        static int count = 0;
        try
        {
            count++;
            const TextMessage* textMessage =
                dynamic_cast< const TextMessage* >( message );
            string text = "";

            if( textMessage != NULL ) {
                text = textMessage->getText();
            } else {
                text = "NOT A TEXTMESSAGE!";
            }

            if( clientAck ) {
                message->acknowledge();
            }

            printf( "Message #%d Received: %s\n", count, text.c_str() );
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

    virtual void onException( const CMSException& ex PMQCPP_UNUSED ) {
        printf("CMS Exception occurred.  Shutting down client.\n");
        exit(1);
    }

    virtual void transportInterrupted() {
        std::cout << 
            "The Connection's Transport has been Interrupted." << std::endl;
    }

    virtual void transportResumed() {
        std::cout <<
            "The Connection's Transport has been Restored." << std::endl;
    }

private:
     //9、关闭资源
    void cleanup(){
        try{
            if( destination != NULL ) delete destination;
        }catch (CMSException& e) {}
        destination = NULL;

        try{
            if( consumer != NULL ) delete consumer;
        }catch (CMSException& e) {}
        consumer = NULL;

        try{
            if( session != NULL ) session->close();
            if( connection != NULL ) connection->close();
        }catch (CMSException& e) {}

        try{
            if( session != NULL ) delete session;
        }catch (CMSException& e) {}
        session = NULL;

        try{
            if( connection != NULL ) delete connection;
        }catch (CMSException& e) {}
        connection = NULL;
    }
};

int main(int argc, char* argv[]) {

    pmq::library::PMQCPP::initializeLibrary();

    std::cout << "=====================================================\n";
    std::cout << "Starting the example:" << std::endl;
    std::cout << "-----------------------------------------------------\n";

    std::string brokerURI =  argv[1] ;//"tcp://192.168.17.218:61616";

    std::string destURI = "test.Cpp"; 
    bool useTopics = false;
    bool clientAck = false;
    SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );
    consumer.runConsumer();
    std::cout << "Press 'q' to quit" << std::endl;
    while( std::cin.get() != 'q') {}
    consumer.close();

    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished with the example." << std::endl;
    std::cout << "=====================================================\n";

    pmq::library::PMQCPP::shutdownLibrary();
}

producer.cpp

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Long.h>
#include <decaf/util/Date.h>
#include <pmq/core/PMQConnectionFactory.h>
#include <pmq/util/Config.h>
#include <pmq/library/PMQCPP.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>

using namespace pmq;
using namespace pmq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;

////////////////////////////////////////////////////////////////////////////////
class SimpleProducer : public Runnable {
private:

    Connection* connection;
    Session* session;
    Destination* destination;
    MessageProducer* producer;
    bool useTopic;
    bool clientAck;
    unsigned int numMessages;
    std::string brokerURI;
    std::string destURI;

private:

    SimpleProducer( const SimpleProducer& );
    SimpleProducer& operator= ( const SimpleProducer& );

public:

    SimpleProducer( const std::string& brokerURI, unsigned int numMessages,
                    const std::string& destURI, bool useTopic = false,
                     bool clientAck = false ) :
        connection(NULL),
        session(NULL),
        destination(NULL),
        producer(NULL),
        useTopic(useTopic),
        clientAck(clientAck),
        numMessages(numMessages),
        brokerURI(brokerURI),
        destURI(destURI) {
    }

    virtual ~SimpleProducer(){
        cleanup();
    }

    void close() {
        this->cleanup();
    }

    virtual void run() {
        try {
        //1、创建工厂连接对象,需要制定ip和端口号
            auto_ptr<PMQConnectionFactory> 
            connectionFactory(new PMQConnectionFactory( brokerURI ) );

           try{
        //2、使用连接工厂创建一个连接对象
                connection = connectionFactory->createConnection();
        //3、开启连接
                connection->start();
            } catch( CMSException& e ) {
                e.printStackTrace();
                throw e;
            }
        //4、使用连接对象创建会话(session)对象 
            if( clientAck ) 
            {
                session = 
                connection->createSession( Session::CLIENT_ACKNOWLEDGE );
            } else 
            {
                session = 
                connection->createSession( Session::AUTO_ACKNOWLEDGE );
            }
          //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
            if( useTopic ) 
            {
                destination = session->createTopic( destURI );
            } else 
            {
                destination = session->createQueue( destURI );
            }
         //6、使用会话对象创建生产者对象
            producer = session->createProducer( destination );
            producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );

            string threadIdStr = Long::toString( 
                                   Thread::currentThread()->getId() );
            string text = 
                    (string)"Hello world! from thread " + threadIdStr;

            for( unsigned int ix=0; ix<numMessages; ++ix )
            {
        //7、使用会话对象创建一个消息对象
                TextMessage* message = session->createTextMessage( text );
                message->setIntProperty( "Integer", ix );
                printf( "Sent message #%d from thread %s\n", ix+1, 
                                                    threadIdStr.c_str() );
        //8、发送消息
                producer->send( message );
                delete message;
            }

        }catch ( CMSException& e ) {
            e.printStackTrace();
        }
    }

private:
        //9、关闭资源
    void cleanup(){
        try{
            if( destination != NULL ) delete destination;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        destination = NULL;

        try
        {
            if( producer != NULL ) delete producer;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        producer = NULL;

        try{
            if( session != NULL ) session->close();
            if( connection != NULL ) connection->close();
        }catch ( CMSException& e ) { e.printStackTrace(); }

        try{
            if( session != NULL ) delete session;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        session = NULL;

        try{
            if( connection != NULL ) delete connection;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        connection = NULL;
    }
};

int main(int argc , char* argv[]) 
{

    pmq::library::PMQCPP::initializeLibrary();
    std::cout << "=====================================================\n";
    std::cout << "Starting produce message:" << std::endl;
    std::cout << "-----------------------------------------------------\n";

    std::string brokerURI = argv[1] ;//"tcp://192.168.30.222:61616";
    unsigned int numMessages = 2000;
    std::string destURI = "test.Cpp";

    bool useTopics = false;
    SimpleProducer producer( brokerURI, numMessages, destURI, useTopics );
    producer.run();
    producer.close();

    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished test Cpp" << std::endl;
    std::cout << "=====================================================\n";

    pmq::library::PMQCPP::shutdownLibrary();
}

1、编译生成可执行文件:

g++ consumer.cpp -o consumer -I/usr/local/PMQ-CPP/include/pmq-cpp-3.10.0 
-I/usr/local/apr/include/apr-1 -L/usr/local/PMQ-CPP/lib -lpmq-cpp

g++ producer.cpp -o producer -I/usr/local/PMQ-CPP/include/pmq-cpp-3.10.0 
-I/usr/local/apr/include/apr-1 -L/usr/local/PMQ-CPP/lib -lpmq-cpp

2、运行

./consumer tcp://192.168.30.222:61616
./producer tcp://192.168.30.222:61616

注:tcp://192.168.30.222:61616 为pmq server 地址

results matching ""

    No results matching ""