RabbitMQ

小龙 505 2021-07-13

一、RabbitMQ简介

RabbitMQ是一个消息中间件,是由ErLang语言开发的AMQP的开源实现,主要负责接收、存储和转发消息。RabbitMQ模型就是生产者和消费者模型。所有的消息组件永远都只有两个核心处理端:消息生产者、消息消费者。

RabbitMQ模型.png

二、RabbitMQ功能

程序可以基于RabbitMQ实现许多丰富的处理功能,比如:队列消息、主题消息、点对点消息等等。在RabbitMQ中还提供了Exchange(交换机)等概念,有以下一些名词:

  • Broker:消息队列服务主机
  • ExChange:消息交换机,它指定消息按什么规则,路由到那个队列
  • Queue:消息队列载体、每个消息都会被投入到一个或多个队列
  • Binding:绑定,把ExChange和Queue安装路由规则绑定起来
  • Routing Key:路由关键字,ExChange根据这个关键字进行消息投递
  • vhost:虚拟机,一个broker里可以设多个vhost,实现用户权限分离
  • producer:消息生产者
  • Consumer:消息消费者
  • channel:消息通道,在客户端的每个连接里,可以建立多个channel,每个channel代表一个会话任务

三、安装RabbitMQ

RabbitMQ使用Erlang语言开发,在安装RabbitMQ之前还要安装Erlang
在安装Erlang之前需要配置服务器的环境,否则会安装报错:
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel

安装Erlang

1.解压Erlang

tar -xzvf otp_src_24.0.tar.gz -C /usr/local/src/

2.创建目录,存储Erlang编译后的程序

mkdir -p /usr/local/erlang

3.进入解压后的erlang目录

cd /usr/local/src/otp_src_24.0/

4.进行编译,保存到指定目录

./configure --prefix=/usr/local/erlang/

5.配置好环境之后,进行编译安装

make && make install

6.编译好的程序放在“/usr/local/erlang”之中,接下来配置“/ect/profile”环境变量

vim /etc/profile
export ERLOANG_HOME=/usr/local/erlang
export PATH=$ERLANG_HOME/bin:
source /etc/profile

7.判断erlang是否安装好了,可以直接输入erl命令,启动erlang;使用“halt().”退出交互式编程环境

安装RabbitMQ

1.RabbitMQ安装文件是“tar.xz”压缩,需要进行两次解压,第一次将“tar.xz”解压,得到“tar”,再解压“tar”

xz -d rabbitmq-server-generic-unix-3.8.19.tar.xz
tar xvf rabbitmq-server-generic-unix-3.8.19.tar -C /usr/local/
将解压的目录更名,方便操作:mv /usr/local/rabbitmq-server-generic-unix-3.8.19 /usr/local/rabbitmq

2.解压完之后就可以直接启动

/usr/local/rabbitmq/sbin/rabbitmq-server start

看到下面的提示,表示启动成功

  ##  ##      RabbitMQ 3.8.19
  ##  ##
  ##########  Copyright (c) 2007-2021 VMware, Inc. or its affiliates.
  ######  ##
  ##########  Licensed under the MPL 2.0. Website: https://rabbitmq.com

3.上面启动的是前台启动,断掉终端连接之后就会挂掉;可以使用“nohub”命令后台启动

nohup /usr/local/rabbitmq/sbin/rabbitmq-server start > file.log 2>&1 &

4.RabbitMQ提供了一个WEB控制台,要想使用这个控制台,需要创建相应的用户信息

/usr/local/rabbitmq/sbin/rabbitmqctl add_user 用户名 密码
Adding user "sxl" ...

5.创建的用户要想使用还需要为用户进行角色分配,将用户加入到管理组中

/usr/local/rabbitmq/sbin/rabbitmqctl set_user_tags sxl administrator
Setting tags for user "sxl" to [administrator] ...

6.启动WEB管理界面

/usr/local/rabbitmq/sbin/rabbitmq-plugins enable rabbitmq_management

如果执行命令抛出“{:query, :rabbit@centos7, {:badrpc, :timeout}}”错误

  1. 使用:hostnamectl

Static hostname: centos7
Icon name: computer-vm

2.修改:vim /etc/hosts

添加:192.168.xxx.xx centos7
然后就能正常启动

7.http://ip:15672 就能访问RabbitMQ的WEB服务
image.png

image.png

RabbitMQ使用

所有的消息组件程序只有两类:消息生产者、消息消费者;安装传统的做法,要想进行队列操作处理,在消息组件中首先要进行队列的创建,随后再编写程序,但是RabbitMQ充分考虑到程序开发的便捷性,即便消息组件中没有队列也可以自己在程序中创建。如果要在WEB控制台创建队列,则可以直接选择“Add New Queue”
在这其中有几个重要的配置项:

  • Name:队列的名称
  • Durability:表示队列的类型是否为持久(Druable)、瞬时(Transient)
  • Auto Delete:是否自动删除已经消费过的数据
    然后点击Add queue即可创建
    image.png

Java操作RabbitMQ

如果要通过java程序操作RabbitMQ,需要引入相关依赖

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.13.0</version>
</dependency>

创建消息生产者

消息生产者主要负责消息内容的生产,对于生产的消息一定会保存到RabbitMQ之中,如果没有被消费者消费,那么会持续保存,一直等待有消费者消费处理;所有的队列都是基于Channel实现创建的,在使用Channel创建队列时的方法有如下的一些定义

    /**
     * Declare a queue (声明一个队列)
     * @see com.rabbitmq.client.AMQP.Queue.Declare
     * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
     * @param queue the name of the queue (队列名称)
     * @param durable 队列是否为持久化队列(设置为true标识为持久化队列,该队列将在服务器重启后继续存在)
     * @param exclusive 是否设置为专用队列,一般将其设置为false
     * @param autoDelete 是否允许进行消息的自动删除
     * @param arguments 队列的其他属性(构造参数)
     * @return a declaration-confirm method to indicate the queue was successfully declared
     * @throws java.io.IOException if an error is encountered
     */
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
package com.rsthe.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
/**
 * 创建生产者,往消息组件中生产消息
 * @author ByXiaoL
 * @date 2021/7/13 20:49
 */
public class MessageProducer {
    private final static String QUEUE_NAME = "sxl.queue.msg"; // 队列名称
    private final static String HOST = "192.168.180.128"; // 服务器ip地址
    private final static int PORT = 5672; // 服务器端口
    private final static String USERNAME = "sxl"; // 用户名
    private final static String PASSWORD = "shuxiaolong"; // 密码
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 要想进行RabbitMQ连接,首先一定要有连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        // 通过连接工厂获取一个连接
        Connection connection = factory.newConnection();
        // 获取到连接之后创建通道
        Channel channel = connection.createChannel();
        // 利用通道创建队列
        channel.queueDeclare(QUEUE_NAME, false, false, true, null);
        long start = System.currentTimeMillis(); // 开始发送时间
        // 设置同步等待,当达到100个线程时,同时发送个RabbitMQ
        CountDownLatch latch = new CountDownLatch(100);
        for (int i = 0; i < 100; i++) {
            int temp = i;
            new Thread(()->{
                String msg = "发送的消息:" + temp;
                try {
                    channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); // 发送消息到RabbitMQ
                    latch.countDown();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        long end = System.currentTimeMillis(); // 结束时间
        latch.await(); // 开始执行
        System.err.println("【消息发送完毕】总耗时:" + (end - start));
        channel.close(); // 关闭通道
        connection.close(); // 关闭连接
    }
}

执行完之后,RabbitMQ中显示内容;在程序中指定队列名称时,在RabbitMQ中一定不要存在,不然会报错,RabbitMQ会自动创建队列
image.png

创建消息消费者

当所有的消息都保存到消息组件之后,这个时候就可以通风管消费端进行消息数据的获取处理,如果要进行消费时肯定会访问指定的消息队列名称,这个时候就需要在进行队列名称声明时一定要与创建好的队列名称保持一致

public class MessageConsumer {
    private final static String QUEUE_NAME = "sxl.queue.msg";
    private final static String HOST = "192.168.180.128";
    private final static int PORT = 5672;
    private final static String USERNAME = "sxl";
    private final static String PASSWORD = "shuxiaolong";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 要想连接RabbitMQ,一定要先创建连接工厂,在连接工厂中定义RabbitMQ服务信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        // 使用连接工厂获取连接
        Connection connection = factory.newConnection();
        // 使用连接创建通道
        Channel channel = connection.createChannel();
        // 利用通道创建一个队列消息
        channel.queueDeclare(QUEUE_NAME,false,false,true,null);
        // 消费者会对通道进行监听,监听可能出现的新内容
        DefaultConsumer consumer = new DefaultConsumer(channel){
            /**             * @param body 消息内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String message = new String(body);
                System.out.println(message);
            }
        };
        // 在通道上开启消费端
        channel.basicConsume(QUEUE_NAME,consumer);
    }
}

执行结果

发送的消息:14
发送的消息:4
发送的消息:56
发送的消息:3
发送的消息:49
...

还可以开启多个消费者同时消费;安装队列的设置原则,多个消费端不会进行重复消费,在进行消费的时候会使用应答机制,来达到不重复消费。但是使用应答机制可能会出现性能下降

@Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String message = new String(body);
                // 在接收到消息之后,给服务器一个应答,表示这条消息,我已经消费了,其他人不要再消费
                channel.basicAck(envelope.getDeliveryTag(),false);
                System.out.println(message);
            }

消息持久化

持久化指的是消息服务关闭之后,未消费的消息是否允许保留的问题,上面的例子创建的消息类型都属于瞬时消息(消息内容只保存在内存中),一旦服务关闭消息也就丢失了。

channel.queueDeclare(QUEUE_NAME, false, false, true, null);

在进行队列声明的时候设置的第二个参数描述的就是消息持久化,如果要启用消息持久化,只需要将第二个参数设置为true即可

channel.queueDeclare(QUEUE_NAME, true, false, true, null);

但是这种设置只会是队列持久化保存,并不会使队列里面的消息持久化保存,要想消息也持久化保存,还需要在生产消息的时候设置消息为持久化保存

channel.basicPublish("",QUEUE_NAME, MessageProperties.MINIMAL_PERSISTENT_BASIC,msg.getBytes()); // 发送消息到RabbitMQ

MessageProperties.MINIMAL_PERSISTENT_BASIC只是设置消息为持久化保存,除此之外没有其它任何功能

虚拟主机VHost

虚拟主机是RabbitMQ之中最为重要的技术,利用虚拟主机可以实现不同种类消息的发送与消费处理
image.png

创建虚拟主机

image.png

为虚拟主机分配权限

image.png

随后就能在程序中操作虚拟主机了

private final static String VHOST = "rsthe"; // 虚拟主机	
	ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setVirtualHost(VHOST)