百木园-与人分享,
就是让自己快乐。

使用RocketMQ消费消息

RocketMQ消费端

今天要来跟大家学习怎样使用RocketMQ来进行消息的消费

先简单创建个Maven项目使用

  • 添加依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.2</version>
</dependency>
  • 启动消费者

    package mq.consumer;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    
    import java.util.List;
    
    public class BroadcastConsumer {
        public static void main(String[] args) throws MQClientException {
            //创建一个push模式的消费组
            DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(\"pushConsumer\");
            pushConsumer.setNamesrvAddr(\"localhost:9876\");
            //集群模式
            pushConsumer.setMessageModel(MessageModel.CLUSTERING);
            //  订阅的topic tag
            pushConsumer.subscribe(\"topic_test01\",\"Tag1 || Tag2\");
    
            pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    System.out.printf(Thread.currentThread().getName() + \" Receive New Messages: \" + msgs + \"%n\");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            pushConsumer.start();
            System.out.printf(\"Broadcast Consumer Started.%n\");
    
        }
    
    
  • 启动生产者

    package mq.producer;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.io.UnsupportedEncodingException;
    
    public class SyncProducerV2 {
        /**
         * 同步消息发送
         *
         * @param args
         * @throws MQClientException
         * @throws MQBrokerException
         * @throws RemotingException
         * @throws InterruptedException
         * @throws UnsupportedEncodingException
         */
        public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException, UnsupportedEncodingException {
            System.out.println(\"SyncProducer start......\");
            DefaultMQProducer defaultMQProducer = new DefaultMQProducer(\"pg_sync_01\");
            defaultMQProducer.setNamesrvAddr(\"localhost:9876\");
            defaultMQProducer.start();
            for (int i = 0; i < 10; i++) {
                send(defaultMQProducer, i, i % 3);
            }
            defaultMQProducer.shutdown();
            System.out.println(\"SyncProducer end......\");
    
        }
    
        private static void send(DefaultMQProducer defaultMQProducer, Integer i, int tag) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException {
            SendResult sendResult = defaultMQProducer.send(new Message(\"topic_test01\", \"Tag\" + tag, (\"hello this is sync message_\" + i + \"!\").getBytes(RemotingHelper.DEFAULT_CHARSET)));
            System.out.println(sendResult);
        }
    }
    
  • 消费者消费

可以看到消费了Tag为Tag1、Tag2的消息

image

其它Tag会被过滤掉

image

消费分类

RocketMQ的消费模式分为两种:BROADCASTING(广播)和CLUSTERING(集群)

那这两种模式有什么区别呢?

  • 广播:相同消费组下的实例会重复消费同一个Topic的消息,可以理解为大家做同样的工作,消费进度存储在客户端,有可能会导致部分消息没有被消费
  • 集群:相同消费组下的实例会负载均衡地消费同一个Topic的消息,可以理解为分工合作,消费进度存储在Broker端

所以大部分系统都会使用集群模式去消费信息,毕竟可以水平拓展消费者来承受更大的消费压力

广播模式相对来说使用比较少,一般都是一些消息通知同步的场景,想同步刷新缓存等

本文由博客一文多发平台 OpenWrite 发布!


来源:https://www.cnblogs.com/interviewClever/p/16163325.html
本站部分图文来源于网络,如有侵权请联系删除。

未经允许不得转载:百木园 » 使用RocketMQ消费消息

相关推荐

  • 暂无文章