Sender.java:
package com.mq.demo.demo01;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.primeton.pmq.PMQConnection;
import com.primeton.pmq.PMQConnectionFactory;
/**
 * Demo producer: sends a fixed number of text messages to the {@code demo1}
 * queue through a single transacted JMS session and commits them as one
 * transaction.
 */
public class Sender {
    /** Broker endpoint the demo connects to. */
    private static final String BROKER_URL = "tcp://127.0.0.1:61616";

    /** Name of the queue the messages are sent to. */
    private static final String QUEUE_NAME = "demo1";

    /** Number of messages sent per run. */
    private static final int MESSAGE_COUNT = 5;

    /** Pause before each send, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 1000L;

    /**
     * Connects to the broker, sends {@link #MESSAGE_COUNT} non-persistent text
     * messages, commits the transaction and closes the connection.
     *
     * @param args unused
     * @throws JMSException         if any JMS operation fails
     * @throws InterruptedException if the thread is interrupted while pausing
     *                              between sends
     */
    public static void main(String[] args) throws JMSException,
            InterruptedException {
        // ConnectionFactory: the factory JMS uses to create connections.
        ConnectionFactory connectionFactory = new PMQConnectionFactory(
                PMQConnection.DEFAULT_USER,
                PMQConnection.DEFAULT_PASSWORD,
                BROKER_URL);
        // Connection: the link from this JMS client to the JMS provider.
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            // Session: the single-threaded context used to send messages.
            // The acknowledge-mode argument is ignored for a transacted
            // session, so SESSION_TRANSACTED is passed to make that explicit.
            Session session = connection.createSession(true,
                    Session.SESSION_TRANSACTED);
            // Destination: where the messages are sent.
            Destination destination = session.createQueue(QUEUE_NAME);
            // MessageProducer: the object that actually sends messages.
            MessageProducer producer = session.createProducer(destination);
            // Messages are not persisted by the broker; change if required.
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                // Build the text message.
                TextMessage message = session.createTextMessage("demo1-消息:" + i);
                Thread.sleep(SEND_INTERVAL_MILLIS);
                // Send it (it only becomes visible to consumers on commit).
                producer.send(message);
                System.out.println("demo1生产者发送消息: " + message.getText());
            }
            // Nothing is delivered until the transaction is committed.
            session.commit();
            session.close();
        } finally {
            // Always release the connection, including on failure. Per the JMS
            // Connection contract, closing it also closes its sessions and
            // rolls back any transaction still in progress.
            connection.close();
        }
    }
}
Receiver.java:
package com.mq.demo.demo01;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.primeton.pmq.PMQConnection;
import com.primeton.pmq.PMQConnectionFactory;
/**
 * Demo consumer: registers an asynchronous listener on the {@code demo1}
 * queue using a transacted JMS session and commits after every message it
 * handles.
 */
public class Receiver {
    // ConnectionFactory: the factory JMS uses to create connections.
    private static final ConnectionFactory connectionFactory =
            new PMQConnectionFactory(PMQConnection.DEFAULT_USER,
                    PMQConnection.DEFAULT_PASSWORD,
                    "tcp://127.0.0.1:61616");

    /**
     * Creates the connection, session and consumer, installs the message
     * listener and then starts delivery. The connection is intentionally left
     * open on success so the listener keeps receiving.
     *
     * @param args unused
     * @throws JMSException if the connection, session or consumer cannot be
     *                      set up
     */
    public static void main(String[] args) throws JMSException {
        // Connection: the link from this JMS client to the JMS provider.
        final Connection connection = connectionFactory.createConnection();
        boolean listening = false;
        try {
            // Session: the single-threaded context used to receive messages.
            // The acknowledge-mode argument is ignored for a transacted
            // session, so SESSION_TRANSACTED is passed to make that explicit.
            final Session session = connection.createSession(true,
                    Session.SESSION_TRANSACTED);
            // Destination: where the messages are fetched from.
            Destination destination = session.createQueue("demo1");
            // The consumer that receives the messages.
            MessageConsumer consumer1 = session.createConsumer(destination);
            System.out.println("===========消费者1开始监听消息=========");
            consumer1.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message msg) {
                    try {
                        if (msg instanceof TextMessage) {
                            TextMessage message = (TextMessage) msg;
                            System.out.println("消费者1收到消息: " + message.getText());
                        } else {
                            // Not a text message: report it and still commit
                            // below so it is consumed rather than redelivered.
                            System.err.println(
                                    "consumer1 ignored non-text message: " + msg);
                        }
                        session.commit();
                    } catch (JMSException e) {
                        e.printStackTrace();
                        // Roll back so the failed message is not silently
                        // committed together with the next successful one.
                        rollbackQuietly(session);
                    }
                }
            });
            // Start delivery only once setup is complete, as the JMS
            // Connection documentation recommends.
            connection.start();
            listening = true;
            // NOTE(review): presumably the provider's delivery threads keep
            // the JVM alive after main returns — confirm for this broker.
        } finally {
            if (!listening) {
                // Setup failed part-way: do not leak the connection.
                connection.close();
            }
        }
    }

    /** Rolls back the session's current transaction, reporting (not throwing) any failure. */
    private static void rollbackQuietly(Session session) {
        try {
            session.rollback();
        } catch (JMSException rollbackFailure) {
            rollbackFailure.printStackTrace();
        }
    }
}