百木园-与人分享,
就是让自己快乐。

RabbitMQ

RabbitMQ

img

img

img

img

img

img

img

img

img

img

img

img

img

img

img

img

img

img

img

img

img

img

Messaging that just works — RabbitMQ

案例

pom.xml

 <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

生产者

package com.www.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者:发送消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ProducerHelloWorld {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost(\"192.168.36.100\");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost(\"/ljt\");
        // 用户名 默认值 guest
        connectionFactory.setUsername(\"ljt\");
        // 密码 默认值 guest
        connectionFactory.setPassword(\"ljt\");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 5、创建队列 Queue
        /*
         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         参数:
             1. queue:队列名称
             2. durable:是否持久化,当mq重启之后,还在
             3. exclusive:
                 * 是否独占。只能有一个消费者监听这队列
                 * 当Connection关闭时,是否删除队列
                 *
             4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
             5. arguments:参数。
         */
        // 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare(
                // 队列名称
                \"hello_world\",
                // 是否持久化,当mq重启之后,还在
                true,
                // 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
                false,
                // 是否自动删除。当没有Consumer时,自动删除掉
                false,
                // 参数
                null
        );
        
         /*
             basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            参数:
                1. exchange:交换机名称。简单模式下交换机会使用默认的 \"\"
                2. routingKey:路由名称
                3. props:配置信息
                4. body:发送消息数据
         */
        // 发送消息内容
        String body = \"HelloWorld~~~~~~~~~~\";
        
        // 6、发送消息
        channel.basicPublish(
                // 交换机名称。简单模式下交换机会使用默认的 \"\"
                \"\",
                // 路由名称
                \"hello_world\",
                // 配置信息
                null,
                // 发送消息数据
                body.getBytes()
        );
        
        // 释放资源
        channel.close();
        connection.close();
        
    }
}


消费者

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerHelloWorld {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost(\"192.168.36.100\");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost(\"/ljt\");
        // 用户名 默认值 guest
        connectionFactory.setUsername(\"ljt\");
        // 密码 默认值 guest
        connectionFactory.setPassword(\"ljt\");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 5、创建队列 Queue
        /*
         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         参数:
             1. queue:队列名称
             2. durable:是否持久化,当mq重启之后,还在
             3. exclusive:
                 * 是否独占。只能有一个消费者监听这队列
                 * 当Connection关闭时,是否删除队列
                 *
             4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
             5. arguments:参数。
         */
        // 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare(
                // 队列名称
                \"hello_world\",
                // 是否持久化,当mq重启之后,还在
                true,
                // 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
                false,
                // 是否自动删除。当没有Consumer时,自动删除掉
                false,
                // 参数
                null);
        
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 6、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(\"consumerTag:\" + consumerTag);
                System.out.println(\"Exchange:\" + envelope.getExchange());
                System.out.println(\"RoutingKey:\" + envelope.getRoutingKey());
                System.out.println(\"properties:\" + properties);
                System.out.println(\"body:\" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                \"hello_world\",
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}


img

工作队列

img

生产者

package com.www.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者:发送消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ProducerWorkQueues {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost(\"192.168.36.100\");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost(\"/ljt\");
        // 用户名 默认值 guest
        connectionFactory.setUsername(\"ljt\");
        // 密码 默认值 guest
        connectionFactory.setPassword(\"ljt\");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 5、创建队列 Queue
        /*
         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         参数:
             1. queue:队列名称
             2. durable:是否持久化,当mq重启之后,还在
             3. exclusive:
                 * 是否独占。只能有一个消费者监听这队列
                 * 当Connection关闭时,是否删除队列
                 *
             4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
             5. arguments:参数。
         */
        // 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare(
                // 队列名称
                \"WorkQueue\",
                // 是否持久化,当mq重启之后,还在
                true,
                // 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
                false,
                // 是否自动删除。当没有Consumer时,自动删除掉
                false,
                // 参数
                null
        );
        
         /*
             basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            参数:
                1. exchange:交换机名称。简单模式下交换机会使用默认的 \"\"
                2. routingKey:路由名称
                3. props:配置信息
                4. body:发送消息数据
         */
        for (int i = 0; i < 10; i++) {
            // 发送消息内容
            String body = \"WorkQueue~~~~~~~~~~\" + i;
            
            // 6、发送消息
            channel.basicPublish(
                    // 交换机名称。简单模式下交换机会使用默认的 \"\"
                    \"\",
                    // 路由名称
                    \"WorkQueue\",
                    // 配置信息
                    null,
                    // 发送消息数据
                    body.getBytes()
            );
        }
        
        
        // 释放资源
        channel.close();
        connection.close();
        
    }
}


消费者: 两个

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerWorkQueues1 {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost(\"192.168.36.100\");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost(\"/ljt\");
        // 用户名 默认值 guest
        connectionFactory.setUsername(\"ljt\");
        // 密码 默认值 guest
        connectionFactory.setPassword(\"ljt\");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 5、创建队列 Queue
        /*
         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         参数:
             1. queue:队列名称
             2. durable:是否持久化,当mq重启之后,还在
             3. exclusive:
                 * 是否独占。只能有一个消费者监听这队列
                 * 当Connection关闭时,是否删除队列
                 *
             4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
             5. arguments:参数。
         */
        // 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare(
                // 队列名称
                \"WorkQueue\",
                // 是否持久化,当mq重启之后,还在
                true,
                // 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
                false,
                // 是否自动删除。当没有Consumer时,自动删除掉
                false,
                // 参数
                null);
        
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 6、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(\"consumerTag:\" + consumerTag);
                System.out.println(\"Exchange:\" + envelope.getExchange());
                System.out.println(\"RoutingKey:\" + envelope.getRoutingKey());
                System.out.println(\"properties:\" + properties);
                System.out.println(\"body:\" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                \"WorkQueue\",
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}


订阅模式

img

生产者

package com.www.mq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者:发送消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ProducerPubSub {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost(\"192.168.36.100\");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost(\"/ljt\");
        // 用户名 默认值 guest
        connectionFactory.setUsername(\"ljt\");
        // 密码 默认值 guest
        connectionFactory.setPassword(\"ljt\");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 5、创建交换机
        /*
           exchangeDeclare(
            String exchange,BuiltinExchangeType type,
            boolean durable, boolean autoDelete,
            boolean internal, Map<String, Object> arguments
            )
           参数:
            1. exchange:交换机名称
            2. type:交换机类型
                DIRECT(\"direct\"),:定向
                FANOUT(\"fanout\"),:扇形(广播),发送消息到每一个与之绑定队列。
                TOPIC(\"topic\"),通配符的方式
                HEADERS(\"headers\");参数匹配
            3. durable:是否持久化
            4. autoDelete:自动删除
            5. internal:内部使用。 一般false
            6. arguments:参数
        */
        String exchangeName = \"test_fanout\";
        channel.exchangeDeclare(
                // 交换机名称
                exchangeName,
                // type:交换机类型
                //   DIRECT(\"direct\"),:定向
                //   FANOUT(\"fanout\"),:扇形(广播),发送消息到每一个与之绑定队列。
                //   TOPIC(\"topic\"),通配符的方式
                //   HEADERS(\"headers\");参数匹配
                BuiltinExchangeType.FANOUT,
                // 是否持久化
                true,
                // 内部使用
                false,
                // 参数
                null
        
        );
        
        // 6、创建队列
        String queue1Name = \"test_fanout_queue1\";
        String queue2Name = \"test_fanout_queue2\";
        channel.queueDeclare(
                // 队列名
                queue1Name,
                // 是否持久
                true,
                // 是否独占
                false,
                //是否自动删除
                false,
                // 参数
                null
        );
        channel.queueDeclare(
                // 队列名
                queue2Name,
                // 是否持久
                true,
                // 是否独占
                false,
                //是否自动删除
                false,
                // 参数
                null
        );
        
        // 7、绑定队列和交换机
         /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1. queue:队列名称
                2. exchange:交换机名称
                3. routingKey:路由键,绑定规则
                    如果交换机的类型为fanout ,routingKey设置为\"\"
         */
        channel.queueBind(
                // 队列名
                queue1Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为\"\"
                \"\"
        );
        channel.queueBind(
                // 队列名
                queue2Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为\"\"
                \"\"
        );
        
         /*
             basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            参数:
                1. exchange:交换机名称。简单模式下交换机会使用默认的 \"\"
                2. routingKey:路由名称
                3. props:配置信息
                4. body:发送消息数据
         */
        // 发送消息内容
        String body = \"日志信息:张三调用了findAll方法...日志级别:info...\";
        // 8、发送消息
        channel.basicPublish(
                // 交换机名称。简单模式下交换机会使用默认的 \"\"
                exchangeName,
                // 路由名称 :如果交换机的类型为fanout ,routingKey设置为\"\"
                \"\",
                // 配置信息
                null,
                // 发送消息数据
                body.getBytes()
        );
        
        
        // 9、释放资源
        channel.close();
        connection.close();
        
    }
}


消费者1

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerPubSub1 {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost(\"192.168.36.100\");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost(\"/ljt\");
        // 用户名 默认值 guest
        connectionFactory.setUsername(\"ljt\");
        // 密码 默认值 guest
        connectionFactory.setPassword(\"ljt\");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 队列名
        String queue1Name = \"test_fanout_queue1\";
       
        
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 5、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(\"consumerTag:\" + consumerTag);
                System.out.println(\"Exchange:\" + envelope.getExchange());
                System.out.println(\"RoutingKey:\" + envelope.getRoutingKey());
                System.out.println(\"properties:\" + properties);
                System.out.println(\"body:\" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                queue1Name,
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}


消费者2

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerPubSub2 {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost(\"192.168.36.100\");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost(\"/ljt\");
        // 用户名 默认值 guest
        connectionFactory.setUsername(\"ljt\");
        // 密码 默认值 guest
        connectionFactory.setPassword(\"ljt\");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 队列名
        String queue2Name = \"test_fanout_queue2\";
       
        
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 5、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(\"consumerTag:\" + consumerTag);
                System.out.println(\"Exchange:\" + envelope.getExchange());
                System.out.println(\"RoutingKey:\" + envelope.getRoutingKey());
                System.out.println(\"properties:\" + properties);
                System.out.println(\"body:\" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                queue2Name,
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}


Routing 路由模式

img

img

生产者

package com.www.mq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Routing 工作模式
 * <p>
 * 生产者:发送消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ProducerRouting {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost(\"192.168.36.100\");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost(\"/ljt\");
        // 用户名 默认值 guest
        connectionFactory.setUsername(\"ljt\");
        // 密码 默认值 guest
        connectionFactory.setPassword(\"ljt\");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 5、创建交换机
        /*
           exchangeDeclare(
            String exchange,BuiltinExchangeType type,
            boolean durable, boolean autoDelete,
            boolean internal, Map<String, Object> arguments
            )
           参数:
            1. exchange:交换机名称
            2. type:交换机类型
                DIRECT(\"direct\"),:定向
                FANOUT(\"fanout\"),:扇形(广播),发送消息到每一个与之绑定队列。
                TOPIC(\"topic\"),通配符的方式
                HEADERS(\"headers\");参数匹配
            3. durable:是否持久化
            4. autoDelete:自动删除
            5. internal:内部使用。 一般false
            6. arguments:参数
        */
        String exchangeName = \"test_direct\";
        channel.exchangeDeclare(
                // 交换机名称
                exchangeName,
                // type:交换机类型
                //   DIRECT(\"direct\"),:定向
                //   FANOUT(\"fanout\"),:扇形(广播),发送消息到每一个与之绑定队列。
                //   TOPIC(\"topic\"),通配符的方式
                //   HEADERS(\"headers\");参数匹配
                BuiltinExchangeType.DIRECT,
                // 是否持久化
                true,
                // 内部使用
                false,
                // 参数
                null
        
        );
        
        // 6、创建队列
        String queue1Name = \"test_direct_queue1\";
        String queue2Name = \"test_direct_queue2\";
        channel.queueDeclare(
                // 队列名
                queue1Name,
                // 是否持久
                true,
                // 是否独占
                false,
                //是否自动删除
                false,
                // 参数
                null
        );
        channel.queueDeclare(
                // 队列名
                queue2Name,
                // 是否持久
                true,
                // 是否独占
                false,
                //是否自动删除
                false,
                // 参数
                null
        );
        
        // 7、绑定队列和交换机
         /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1. queue:队列名称
                2. exchange:交换机名称
                3. routingKey:路由键,绑定规则
                    如果交换机的类型为fanout ,routingKey设置为\"\"
         */
        // 队列1
        channel.queueBind(
                // 队列名
                queue1Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为\"\"
                \"error\"
        );
        // 队列2
        channel.queueBind(
                // 队列名
                queue2Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为\"\"
                \"info\"
        );
        channel.queueBind(
                // 队列名
                queue2Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为\"\"
                \"error\"
        );
        channel.queueBind(
                // 队列名
                queue2Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为\"\"
                \"waring\"
        );
        
         /*
             basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            参数:
                1. exchange:交换机名称。简单模式下交换机会使用默认的 \"\"
                2. routingKey:路由名称
                3. props:配置信息
                4. body:发送消息数据
         */
        // 发送消息内容
        String body = \"日志信息:张三调用了delete方法...警告。。。日志级别:waring...\";
        // 8、发送消息
        channel.basicPublish(
                // 交换机名称。简单模式下交换机会使用默认的 \"\"
                exchangeName,
                // 路由名称 :如果交换机的类型为fanout ,routingKey设置为\"\"
                \"waring\",
                // 配置信息
                null,
                // 发送消息数据
                body.getBytes()
        );
        
        
        // 9、释放资源
        channel.close();
        connection.close();
        
    }
}


消费者 1

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerRouting1 {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost(\"192.168.36.100\");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost(\"/ljt\");
        // 用户名 默认值 guest
        connectionFactory.setUsername(\"ljt\");
        // 密码 默认值 guest
        connectionFactory.setPassword(\"ljt\");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 队列名
        String queue2Name = \"test_direct_queue1\";
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 5、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(\"consumerTag:\" + consumerTag);
                System.out.println(\"Exchange:\" + envelope.getExchange());
                System.out.println(\"RoutingKey:\" + envelope.getRoutingKey());
                System.out.println(\"properties:\" + properties);
                System.out.println(\"body:\" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                queue2Name,
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}


消费者2

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerRouting2 {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost(\"192.168.36.100\");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost(\"/ljt\");
        // 用户名 默认值 guest
        connectionFactory.setUsername(\"ljt\");
        // 密码 默认值 guest
        connectionFactory.setPassword(\"ljt\");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 队列名
        String queue2Name = \"test_direct_queue2\";
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 5、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(\"consumerTag:\" + consumerTag);
                System.out.println(\"Exchange:\" + envelope.getExchange());
                System.out.println(\"RoutingKey:\" + envelope.getRoutingKey());
                System.out.println(\"properties:\" + properties);
                System.out.println(\"body:\" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                queue2Name,
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}

Topics 通配符模式

image-20230216202538556

生产者

package com.www.mq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Topics 通配符工作模式
 * <p>
 * 生产者:发送消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ProducerTopics {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost(\"192.168.36.100\");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost(\"/ljt\");
        // 用户名 默认值 guest
        connectionFactory.setUsername(\"ljt\");
        // 密码 默认值 guest
        connectionFactory.setPassword(\"ljt\");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 5、创建交换机
        /*
           exchangeDeclare(
            String exchange,BuiltinExchangeType type,
            boolean durable, boolean autoDelete,
            boolean internal, Map<String, Object> arguments
            )
           参数:
            1. exchange:交换机名称
            2. type:交换机类型
                DIRECT(\"direct\"),:定向
                FANOUT(\"fanout\"),:扇形(广播),发送消息到每一个与之绑定队列。
                TOPIC(\"topic\"),通配符的方式
                HEADERS(\"headers\");参数匹配
            3. durable:是否持久化
            4. autoDelete:自动删除
            5. internal:内部使用。 一般false
            6. arguments:参数
        */
        // 交换机名称
        String exchangeName = \"test_topic\";
        channel.exchangeDeclare(
                // 交换机名称
                exchangeName,
                // type:交换机类型
                //   DIRECT(\"direct\"),:定向
                //   FANOUT(\"fanout\"),:扇形(广播),发送消息到每一个与之绑定队列。
                //   TOPIC(\"topic\"),通配符的方式
                //   HEADERS(\"headers\");参数匹配
                BuiltinExchangeType.TOPIC,
                // 是否持久化
                true,
                // 内部使用
                false,
                // 参数
                null
        
        );
        
        // 6、创建队列
        String queue1Name = \"test_topic_queue1\";
        String queue2Name = \"test_topic_queue2\";
        channel.queueDeclare(
                // 队列名
                queue1Name,
                // 是否持久
                true,
                // 是否独占
                false,
                //是否自动删除
                false,
                // 参数
                null
        );
        channel.queueDeclare(
                // 队列名
                queue2Name,
                // 是否持久
                true,
                // 是否独占
                false,
                //是否自动删除
                false,
                // 参数
                null
        );
        
        // 7、绑定队列和交换机
         /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1. queue:队列名称
                2. exchange:交换机名称
                3. routingKey:路由键,绑定规则
                    如果交换机的类型为fanout ,routingKey设置为\"\"
         */
        // routing key  系统的名称.日志的级别。
        // * :表示一个单词
        // # :表示0或多个单词
        // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
        // 队列1
        channel.queueBind(
                // 队列名
                queue1Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为\"\"
                \"#.error\"
        );
        channel.queueBind(
                // 队列名
                queue1Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为\"\"
                \"order.*\"
        );
        // 队列2
        channel.queueBind(
                // 队列名
                queue2Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为\"\"
                \"*.*\"
        );
    
        
         /*
             basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            参数:
                1. exchange:交换机名称。简单模式下交换机会使用默认的 \"\"
                2. routingKey:路由名称
                3. props:配置信息
                4. body:发送消息数据
         */
        // 发送消息内容
        String body = \"日志信息:张三调用了findAll方法...日志级别:info...\";
        // 8、发送消息
        channel.basicPublish(
                // 交换机名称。简单模式下交换机会使用默认的 \"\"
                exchangeName,
                // 路由名称 :如果交换机的类型为fanout ,routingKey设置为\"\"
                \"order.info\",
                // 配置信息
                null,
                // 发送消息数据
                body.getBytes()
        );
        
        
        // 9、释放资源
        channel.close();
        connection.close();
        
    }
}

消费者1

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerTopic1 {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost(\"192.168.36.100\");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost(\"/ljt\");
        // 用户名 默认值 guest
        connectionFactory.setUsername(\"ljt\");
        // 密码 默认值 guest
        connectionFactory.setPassword(\"ljt\");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 队列名
        String queue2Name = \"test_topic_queue1\";
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 5、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(\"consumerTag:\" + consumerTag);
                System.out.println(\"Exchange:\" + envelope.getExchange());
                System.out.println(\"RoutingKey:\" + envelope.getRoutingKey());
                System.out.println(\"properties:\" + properties);
                System.out.println(\"body:\" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                queue2Name,
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}

消费者2

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerTopic2 {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost(\"192.168.36.100\");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost(\"/ljt\");
        // 用户名 默认值 guest
        connectionFactory.setUsername(\"ljt\");
        // 密码 默认值 guest
        connectionFactory.setPassword(\"ljt\");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 队列名
        String queue2Name = \"test_topic_queue2\";
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 5、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(\"consumerTag:\" + consumerTag);
                System.out.println(\"Exchange:\" + envelope.getExchange());
                System.out.println(\"RoutingKey:\" + envelope.getRoutingKey());
                System.out.println(\"properties:\" + properties);
                System.out.println(\"body:\" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                queue2Name,
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}


来源:https://www.cnblogs.com/wwwljt/p/17128169.html
本站部分图文来源于网络,如有侵权请联系删除。

未经允许不得转载:百木园 » RabbitMQ

相关推荐

  • 暂无文章