博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【ActiveMQ】- 发布/订阅模式
阅读量:6511 次
发布时间:2019-06-24

本文共 4665 字,大约阅读时间需要 15 分钟。

hot3.png

publish/subscribe

特点:A发送的消息可以被所有监听A的对象的接收,就好比学校的广播,所有的学生都可以收听校园广播信息。


消息生产者:

package com.zhiwei.advanced.mq.activemq.sp;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;/** * 发布/订阅消息模型 测试顺序:先订阅才能收到消息 */public class JMSProducer {	private final static String user = ActiveMQConnection.DEFAULT_USER; // 默认用户名	private final static String password = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码	private final static String brokeURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 链接地址	public static void main(String[] args) throws Exception {		 // 连接工厂		ConnectionFactory factory = new ActiveMQConnectionFactory(JMSProducer.user, JMSProducer.password,JMSProducer.brokeURL);				// 创建连接		Connection connection = factory.createConnection(); 				// 启动连接		connection.start(); 				// 接受或发送消息的线程		Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);				Destination destination = session.createTopic("FirstTopic"); // 创建消息主题:Destination子类:Queue/Topic		MessageProducer messageProducer = session.createProducer(destination); // 创建消息生产者		// 发送文本消息		for (int i = 0; i < 10; i++) {						TextMessage message = session.createTextMessage("JMS Provider发送消息:" + i);						System.out.println("JMS Provider发送消息:" + i);						messageProducer.send(message);		}		// 启用事务时session需要提交		session.commit();		session.close();		connection.close();	}}

消息消费者1:

package com.zhiwei.advanced.mq.activemq.sp;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.MessageConsumer;import javax.jms.Session;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;/** * 发布/订阅消息模型 *  * 特別注意:发布订阅消息模型必须先客户端监听,然后主题发送消息 */public class JMSConsumer01{	private final static String user = ActiveMQConnection.DEFAULT_USER; // 默认用户名	private final static String password = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码	private final static String brokeURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 链接地址	public static void main(String[] args) throws Exception {		ConnectionFactory factory = new ActiveMQConnectionFactory(JMSConsumer01.user, JMSConsumer01.password,JMSConsumer01.brokeURL); // 链接工厂		Connection connection = factory.createConnection(); // 连接		connection.start(); // 启动连接			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 接受或发送消息的线程:消费不需事务		Destination destination = session.createTopic("FirstTopic"); // 创建连接消息主题:Destination子类:Queue/Topic		MessageConsumer messageConsumer = session.createConsumer(destination); // 创建消息生产者        messageConsumer.setMessageListener(new JMSListener());   //注册消息监听 :阻塞监听    	}}

消息消费者2:

package com.zhiwei.advanced.mq.activemq.sp;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.MessageConsumer;import javax.jms.Session;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;/** * 发布/订阅消息模型 */public class JMSConsumer02{	private final static String user = ActiveMQConnection.DEFAULT_USER; // 默认用户名	private final static String password = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码	private final static String brokeURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 链接地址	public static void main(String[] args) throws Exception {		ConnectionFactory factory = new ActiveMQConnectionFactory(JMSConsumer02.user, JMSConsumer02.password,JMSConsumer02.brokeURL); // 链接工厂		Connection connection = factory.createConnection(); // 连接		connection.start(); // 启动连接		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 接受或发送消息的线程:消费不需事务		Destination destination = session.createTopic("FirstTopic"); // 创建连接消息主题:Destination子类:Queue/Topic		MessageConsumer messageConsumer = session.createConsumer(destination); // 创建消息生产者        messageConsumer.setMessageListener(new JMSListener());   //注册消息监听 :阻塞监听    	}}

消息队列监听器:

package com.zhiwei.advanced.mq.activemq.sp;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;public class JMSListener implements MessageListener{	@Override	public void onMessage(Message message) {				if(message instanceof TextMessage){			try {				System.out.println(((TextMessage) message).getText());			} catch (JMSException e) {				e.printStackTrace();			}		}			}}

消息生产者日志:

消息消费者1日志:

消息消费者2日志:


结论

发布/订阅模式下每个消费者都会消费消息队列的所有消息,这个区别于MQ点对点模式下的每一条消息只能消费者消费一次。

转载于:https://my.oschina.net/yangzhiwei256/blog/3014202

你可能感兴趣的文章