RabbitMQ发布订阅模式

小龙 676 2021-07-14

前言

前一篇的内容完成了RabbitMQ安装,编写代码实现了消息的生产以及消费功能,但是其并不是完整的RabbitMQ处理形式,里面忽略了几个重要的概念:Exchange、RoutingKey
以之前的数据发送为例

程序定义:
channel.basicPublish("",QUEUE_NAME, MessageProperties.MINIMAL_PERSISTENT_BASIC,msg.getBytes());

方法定义
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

我们将Exchange设置为了空字符串,把RoutingKey设置为了队列名称,实际上在整个的RabbitMQ里面,所有可以实现数据发送的操作都视为Exchange,Exchange有不同的描述模式,利用不同的模式可以形成所谓的广播模式(fanout)、直连模式(direct)、主题模式(topic);所有的模式指派都可通过Channel来完成,在Channel类里面提供了Exchange模式的定义

 Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;

可以利用此方法设置Exchange的名称以及具体的操作类型,这样就可以形成不同的消息发送与接收处理

扇形模式(Fanout)

扇形模式的最大特点就是一个消息可以同时被不同的消费者消费处理,但是每个都在监听各自不同的队列信息;Exchange是在队列之上的处理控制
image.png

定义扇形模式生产者

public class FanoutProducer {
    private final static String QUEUE_NAME = "fanout_queue"; // 队列名称
    private final static String HOST = "192.168.180.128"; // 服务器地址
    private final static int PORT = 5672; // 服务器端口
    private final static String USERNAME = "sxl"; // 用户名
    private final static String PASSWORD = "shuxiaolong"; // 密码
    private final static String EXCHANGE_NAME = "rsthe.exchange.fanout"; // Exchange名称
    private final static String VHOST = "rsthe"; // 虚拟主机
    public static void main(String[] args) throws Exception {
        // 实例化得到一个连接工厂,设置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost(VHOST);
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        // 根据连接工厂获得一个连接
        Connection connection = factory.newConnection();
        // 使用连接创建一个通道
        Channel channel = connection.createChannel();
        // 定义要使用的Exchange信息
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout",true);
        // 利用通道创建消息队列
        channel.queueDeclare(QUEUE_NAME,true,false,true,null);
        // 向队列中发送消息
        long start = System.currentTimeMillis();  // 开始发送时间
        CountDownLatch latch = new CountDownLatch(100);
        for (int i = 0; i < 100; i++) {
            int temp = i;
            new Thread(()->{
                String msg = "生产者 - " + temp;
                // 使用通道发送消息
                try {
                    channel.basicPublish(EXCHANGE_NAME,QUEUE_NAME, MessageProperties.MINIMAL_PERSISTENT_BASIC,msg.getBytes());
                    latch.countDown();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        long end = System.currentTimeMillis();
        latch.await();
        System.out.println("消耗时间:" + (end - start) + " ms");
        channel.close();// 关闭通道
        connection.close();// 关闭连接
    }
}

定义扇形模式消费者

在fanout之中,不同的Queue队列都可以实现也同样的数据接收处理,只需要设置好统一的Exchange名称,那么Queue名称就可以随意定义了

public class FanoutConsumer {
    private final static String QUEUE_NAME = "fanout_queue_a"; // 队列名称
    private final static String HOST = "192.168.180.128"; // 服务器地址
    private final static int PORT = 5672; // 服务器端口
    private final static String USERNAME = "sxl"; // 用户名
    private final static String PASSWORD = "shuxiaolong"; // 密码
    private final static String EXCHANGE_NAME = "rsthe.exchange.fanout"; // Exchange名称
    private final static String VHOST = "rsthe"; // 虚拟主机
    public static void main(String[] args) throws Exception {
        // 实例化得到一个连接工厂,设置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost(VHOST);
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        // 根据连接工厂获得一个连接
        Connection connection = factory.newConnection();
        // 使用连接创建一个通道
        Channel channel = connection.createChannel();
        // 定义要使用的Exchange信息
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout",true);
        // 利用通道创建消息队列
        channel.queueDeclare(QUEUE_NAME,true,false,true,null);
        // 绑定队列和Exchange
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        // 消费端会持续进行监听,监听可能出现的新内容
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        // 在通道上开启消费端
        channel.basicConsume(QUEUE_NAME,consumer);
    }
}

如果在声明Exchange的是时候不将“durable”设置为true,在发送消息之后发现没有消费者可消费,会将这个消息抛弃,这时候生产者生产的数据不会在队列中保存,数据发送了之后,如果没有消费者接收,数据将不再被接收

直连模式(direct)

使用广播模式处理消息的时候会将消息针对所有的Queue进行广播,但是有些时候某些数据不希望被广播,只需要由特定人员接收,这个时候就可以设置RoutingKey信息了;

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

我们之前一直是把第二个参数设置为队列名称,但这个参数并不是队列名称,RabbitMQ并不会在操作方法上直接使用队列概念的定义,它是通过RoutiingKeey的形式来定义的
image.png

定义生产者

public class DirectProducer {
    private final static String ROUTING_KEY = "direct_key"; // RoutingKey名称
    private final static String HOST = "192.168.180.128"; // 服务器地址
    private final static int PORT = 5672; // 服务器端口
    private final static String USERNAME = "sxl"; // 用户名
    private final static String PASSWORD = "shuxiaolong"; // 密码
    private final static String EXCHANGE_NAME = "rsthe.exchange.direct"; // Exchange名称
    private final static String VHOST = "rsthe"; // 虚拟主机
    public static void main(String[] args) throws Exception {
        // 实例化得到一个连接工厂,设置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost(VHOST);
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        // 根据连接工厂获得一个连接
        Connection connection = factory.newConnection();
        // 使用连接创建一个通道
        Channel channel = connection.createChannel();
        // 定义要使用的Exchange信息
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        // 向队列中发送消息
        long start = System.currentTimeMillis();  // 开始发送时间
        CountDownLatch latch = new CountDownLatch(100);
        for (int i = 0; i < 100; i++) {
            int temp = i;
            new Thread(()->{
                String msg = "生产者 - " + temp;
                // 使用通道发送消息
                try {
                    channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY, MessageProperties.MINIMAL_PERSISTENT_BASIC,msg.getBytes());
                    latch.countDown();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        long end = System.currentTimeMillis();
        latch.await();
        System.out.println("消耗时间:" + (end - start) + " ms");
        channel.close();// 关闭通道
        connection.close();// 关闭连接
    }
}

在消息生产端没有了队列的概念,但是在消费端还是由队列的

定义消费端

public class DirectConsumer {
    private final static String QUEUE_NAME = "direct_queue_msg"; // RoutingKey名称
    private final static String ROUTING_KEY = "direct_key"; // RoutingKey名称
    private final static String HOST = "192.168.180.128"; // 服务器地址
    private final static int PORT = 5672; // 服务器端口
    private final static String USERNAME = "sxl"; // 用户名
    private final static String PASSWORD = "shuxiaolong"; // 密码
    private final static String EXCHANGE_NAME = "rsthe.exchange.direct"; // Exchange名称
    private final static String VHOST = "rsthe"; // 虚拟主机
    public static void main(String[] args) throws Exception {
        // 实例化得到一个连接工厂,设置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost(VHOST);
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        // 根据连接工厂获得一个连接
        Connection connection = factory.newConnection();
        // 使用连接创建一个通道
        Channel channel = connection.createChannel();
        // 定义要使用的Exchange信息
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        // 利用通道创建消息队列
        channel.queueDeclare(QUEUE_NAME,true,false,true,null);
        // 绑定队列和Exchange
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
        // 消费端会持续进行监听,监听可能出现的新内容
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        // 在通道上开启消费端
        channel.basicConsume(QUEUE_NAME,consumer);
    }
}

如果在声明Exchange的是时候不将“durable”设置为true,在发送消息之后发现没有消费者可消费,会将这个消息抛弃,这时候生产者生产的数据不会在队列中保存,数据发送了之后,如果没有消费者接收,数据将不再被接收

直连模式有一些操作特点:

  • 为同一个队列绑定若干个消费端的时候,如果有一个消费端的RoutingKey设置正确,那么就表示该消费队列中的所有消费端可以实现消息的处理
  • 不同的队列如果RoutingKey相同,那么最终的结果就是不同的队列都可以实现类似于fanout广播的处理,所有的消费端都可以消费同样的数据内容

主题模式(topic)

主题模式的最大特点就是所有的消费端如果订阅的主题信息一致,那么就标识可以重复的进行小内容的接收处理;

定义生产端

public class TopicProducer {
    private final static String ROUTING_KEY = "topic_key"; // RoutingKey名称
    private final static String HOST = "192.168.180.128"; // 服务器地址
    private final static int PORT = 5672; // 服务器端口
    private final static String USERNAME = "sxl"; // 用户名
    private final static String PASSWORD = "shuxiaolong"; // 密码
    private final static String EXCHANGE_NAME = "rsthe.exchange.topic"; // Exchange名称
    private final static String VHOST = "rsthe"; // 虚拟主机
    public static void main(String[] args) throws Exception {
        // 实例化得到一个连接工厂,设置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost(VHOST);
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        // 根据连接工厂获得一个连接
        Connection connection = factory.newConnection();
        // 使用连接创建一个通道
        Channel channel = connection.createChannel();
        // 定义要使用的Exchange信息
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        // 向队列中发送消息
        long start = System.currentTimeMillis();  // 开始发送时间
        CountDownLatch latch = new CountDownLatch(100);
        for (int i = 0; i < 100; i++) {
            int temp = i;
            new Thread(()->{
                String msg = "生产者 - " + temp;
                // 使用通道发送消息
                try {
                    channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY, MessageProperties.MINIMAL_PERSISTENT_BASIC,msg.getBytes());
                    latch.countDown();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        long end = System.currentTimeMillis();
        latch.await();
        System.out.println("消耗时间:" + (end - start) + " ms");
        channel.close();// 关闭通道
        connection.close();// 关闭连接
    }
}

定义消费端

public class TopicConsumer {
    private final static String QUEUE_NAME = "topic_queue_msg"; // 队列名称
    private final static String ROUTING_KEY = "topic_key"; // RoutingKey名称
    private final static String HOST = "192.168.180.128"; // 服务器地址
    private final static int PORT = 5672; // 服务器端口
    private final static String USERNAME = "sxl"; // 用户名
    private final static String PASSWORD = "shuxiaolong"; // 密码
    private final static String EXCHANGE_NAME = "rsthe.exchange.topic"; // Exchange名称
    private final static String VHOST = "rsthe"; // 虚拟主机
    public static void main(String[] args) throws Exception {
        // 实例化得到一个连接工厂,设置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost(VHOST);
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        // 根据连接工厂获得一个连接
        Connection connection = factory.newConnection();
        // 使用连接创建一个通道
        Channel channel = connection.createChannel();
        // 定义要使用的Exchange信息
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        // 利用通道创建消息队列
        channel.queueDeclare(QUEUE_NAME,true,false,true,null);
        // 绑定队列和Exchange
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
        // 消费端会持续进行监听,监听可能出现的新内容
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        // 在通道上开启消费端
        channel.basicConsume(QUEUE_NAME,consumer);
    }
}

如果仅以上面的程序为例,感觉主题模式和直连模式完全相识;实际上主题模式相比直连模式有两个重要的特征:

  1. RabbitMQ之所以提出直连模式和主题模式还是要从概念上加一区分的,直连模式是点到点,不存在多个Queue队列消费问题,而主题模式肯定是能对应多个Queue(但有不是针对所有Queue),因为里面依然存在有RoutingKey的概念
  2. 在使用主题模式的时候还有一项比较重要的通配符匹配技术,他听两个重要的通配符:
    . 消费端使用:“#”:匹配0位、1位或多位数据
    · 消费端使用:“*”:匹配0位或1位数据
    image.png
    以图片上例子创建新的生产者和消费者

生产者

public class HrDeptTopicProducer {
    private final static String ROUTING_KEY = "hr.dept."; // RoutingKey名称
    private final static String HOST = "192.168.180.128"; // 服务器地址
    private final static int PORT = 5672; // 服务器端口
    private final static String USERNAME = "sxl"; // 用户名
    private final static String PASSWORD = "shuxiaolong"; // 密码
    private final static String EXCHANGE_NAME = "rsthe.exchange.topic"; // Exchange名称
    private final static String VHOST = "rsthe"; // 虚拟主机
    public static void main(String[] args) throws Exception {
        // 实例化得到一个连接工厂,设置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost(VHOST);
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        // 根据连接工厂获得一个连接
        Connection connection = factory.newConnection();
        // 使用连接创建一个通道
        Channel channel = connection.createChannel();
        // 定义要使用的Exchange信息
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        // 向队列中发送消息
        long start = System.currentTimeMillis();  // 开始发送时间
        CountDownLatch latch = new CountDownLatch(100);
        for (int i = 0; i < 100; i++) {
            int temp = i;
            new Thread(()->{
                String msg = null;
                String RoutingKey = null;
                if (temp % 4 == 0){
                    msg = "【HR】部门数据增加: " + temp;
                    RoutingKey = ROUTING_KEY + "add";
                }else if (temp % 4 == 1){
                    msg = "【HR】部门数据修改: " + temp;
                    RoutingKey = ROUTING_KEY + "edit";
                }else if (temp % 4 == 2){
                    msg = "【HR】部门数据删除: " + temp;
                    RoutingKey = ROUTING_KEY + "delete";
                }else {
                    msg = "【HR】部门数据列表: " + temp;
                    RoutingKey = ROUTING_KEY + "list";
                }
                // 使用通道发送消息
                try {
                    channel.basicPublish(EXCHANGE_NAME,RoutingKey, MessageProperties.MINIMAL_PERSISTENT_BASIC,msg.getBytes());
                    latch.countDown();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        long end = System.currentTimeMillis();
        latch.await();
        System.out.println("消耗时间:" + (end - start) + " ms");
        channel.close();// 关闭通道
        connection.close();// 关闭连接
    }
}

消费者

下面是定义了4个消费者,每个消费者只需要修改这两个地区即可
========================================================================
private final static String QUEUE_NAME = "topic_queue_msg.a"; // 队列名称
private final static String ROUTING_KEY = "#.dept.add"; // RoutingKey名称
========================================================================
private final static String QUEUE_NAME = "topic_queue_msg.b"; // 队列名称
private final static String ROUTING_KEY = "#.dept.edit"; // RoutingKey名称
========================================================================
private final static String QUEUE_NAME = "topic_queue_msg.c"; // 队列名称
private final static String ROUTING_KEY = "#.dept.delete"; // RoutingKey名称
========================================================================
private final static String QUEUE_NAME = "topic_queue_msg.d"; // 队列名称
private final static String ROUTING_KEY = "#.dept.list"; // RoutingKey名称
...
其它与上面的消费者代码一样