发布于 2016-01-06 12:45:34 | 3753 次阅读 | 评论: 0 | 来源: PHPERZ

这里有新鲜出炉的精品教程,程序狗速度看过来!

RocketMQ 消息中间件

RocketMQ 是一款分布式、队列模型的消息中间件


 

 上一篇博客着重讲解了一下RocketMQ中的Producer,那么接下来这篇博客来带大家来了解一下RocketMQ中的Consumer角色

 

 上述就是MQ中有关Consumer的类图,下面来介绍一下每个类

 1.MQAdmin:底层类,上篇博客已经提过,就不再此重提

 2.MQConsumer:Consumer公共的接口,常用的方法如下

 如果消费失败的话,消息将会返回到broker中,并且延迟一会消费的时间

   void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)  throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

 3.MQPushConsumer:Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立刻回调Listener接口方法

4.MQPullConsumer:Consumer的一种,应用通常主动调用Consumer的拉消息方法从Broker拉消息,主动权由应用控制

 在上图中出现了两类的消费者分别是PushConsumer和PullConsumer,下面来看一下

 PushConsumer:通过注册监听的方式来消费信息

 

/**     
 * @FileName: Consumer.java   
 * @Package:com.test   
 * @Description: TODO  
 * @author: LUCKY    
 * @date:2015年12月28日 下午2:43:23   
 * @version V1.0     
 */
package com.test;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
 * @ClassName: Consumer
 * @Description: 模拟消费者
 * @author: LUCKY
 * @date:2015年12月28日 下午2:43:23
 */
public class ConsumerTest {

	public static void main(String[] args) {
		DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("broker-a");
		consumer.setNamesrvAddr("100.66.154.81:9876");
		try {
			
			// 订阅PushTopic下Tag为push的消息,都订阅消息
			consumer.subscribe("PushTopic", "push");
		    
			// 程序第一次启动从消息队列头获取数据
			consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
			//可以修改每次消费消息的数量,默认设置是每次消费一条
			// consumer.setConsumeMessageBatchMaxSize(10);

			//注册消费的监听
			consumer.registerMessageListener(new MessageListenerConcurrently() {
               //在此监听中消费信息,并返回消费的状态信息
				public ConsumeConcurrentlyStatus consumeMessage(
						List<MessageExt> msgs,
						ConsumeConcurrentlyContext context) {
					
					// msgs中只收集同一个topic,同一个tag,并且key相同的message
					// 会把不同的消息分别放置到不同的队列中
					for(Message msg:msgs){
			
						System.out.println(new String(msg.getBody()));
					}	
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				}
			});

			consumer.start();
			Thread.sleep(5000);
			//5秒后挂载消费端消费
			consumer.suspend();
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

 PullConsumer:通过拉去的方式来消费消息

/**     
 * @FileName: Consumer.java   
 * @Package:com.test   
 * @Description: TODO  
 * @author: LUCKY    
 * @date:2015年12月28日 下午2:43:23   
 * @version V1.0     
 */
package com.test;

import java.util.Set;

import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.MessageQueueListener;
import com.alibaba.rocketmq.common.message.MessageQueue;

/**
 * @ClassName: Consumer
 * @Description: 模拟消费者
 * @author: LUCKY
 * @date:2015年12月28日 下午2:43:23
 */
public class ConsumerPullTest {

	public static void main(String[] args) {
		DefaultMQPullConsumer consumer=new DefaultMQPullConsumer();
		consumer.setNamesrvAddr("100.66.154.81:9876");
       consumer.setConsumerGroup("broker");
		try {
			consumer.start();
		Set<MessageQueue> messageQueues=	consumer.fetchSubscribeMessageQueues("PushTopic");		

		for(MessageQueue messageQueue:messageQueues){
		
			System.out.println(messageQueue.getTopic());
		}
		
		
		//消息队列的监听
		consumer.registerMessageQueueListener("", new MessageQueueListener() {
			
			@Override
			//消息队列有改变,就会触发
			public void messageQueueChanged(String topic, Set<MessageQueue> mqAll,
					Set<MessageQueue> mqDivided) {
				// TODO Auto-generated method stub
				
			}
		});
			
	
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}


一般在应用中都会采用push的方法来自动的消费信息

 


最新网友评论  共有(0)条评论 发布评论 返回顶部

Copyright © 2007-2017 PHPERZ.COM All Rights Reserved   冀ICP备14009818号  版权声明  广告服务