Sender.java:

package com.mq.demo.demo01;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import com.primeton.pmq.PMQConnection;
import com.primeton.pmq.PMQConnectionFactory;


public class Sender {

    public static void main(String[] args) throws JMSException,
                                                     InterruptedException {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory = new PMQConnectionFactory(
                                             PMQConnection.DEFAULT_USER,
                                             PMQConnection.DEFAULT_PASSWORD,
                                             "tcp://127.0.0.1:61616");

        // Connection :JMS 客户端到JMS Provider 的连接
        Connection connection =  connectionFactory.createConnection();

        connection.start();
        // Session: 一个发送或接收消息的线程
        Session session = connection.createSession(true, 
                                        Session.AUTO_ACKNOWLEDGE);

        // Destination :消息的目的地;消息发送给谁.
        Destination destination =  session.createQueue("demo1");

        // MessageProducer:消息发送者
        MessageProducer producer =  session.createProducer(destination);

        // 设置不持久化,可以更改
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        for(int i=0;i<5;i++){
            //创建文本消息
            TextMessage message = session.createTextMessage("demo1-消息:"+i);

            Thread.sleep(1000);
            //发送消息
            producer.send(message);
            System.out.println("demo1生产者发送消息: "+message.getText());
        }

        session.commit();
        session.close();
        connection.close();
    }
}

Receiver.java

package com.mq.demo.demo01;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.primeton.pmq.PMQConnection;
import com.primeton.pmq.PMQConnectionFactory;

public class Receiver {
    // ConnectionFactory :连接工厂,JMS 用它创建连接
   private static ConnectionFactory connectionFactory =
         new PMQConnectionFactory(PMQConnection.DEFAULT_USER,
                                  PMQConnection.DEFAULT_PASSWORD, 
                                  "tcp://127.0.0.1:61616");

   public static void main(String[] args) throws JMSException {
        // Connection :JMS 客户端到JMS Provider 的连接
        final Connection connection =  connectionFactory.createConnection();

        connection.start();
        // Session: 一个发送或接收消息的线程
        final Session session = connection.createSession(true, 
                                                Session.AUTO_ACKNOWLEDGE);
        // Destination :消息的目的地;消息送谁那获取.
        Destination destination =  session.createQueue("demo1");
        // 消费者,消息接收者
        MessageConsumer consumer1 =  session.createConsumer(destination);
        System.out.println("===========消费者1开始监听消息=========");
        consumer1.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message msg) {

                    try {
                      TextMessage message = (TextMessage)msg ;
                      System.out.println("消费者1收到消息: "+message.getText());
                      session.commit();
                    } catch (JMSException e) {                
                        e.printStackTrace();
                    }

                }
            });                
  }
}

results matching ""

    No results matching ""