SpringBoot整合RabbitMQ

小龙 702 2021-07-14

依赖

SpringBoot整合RabbitMQ只需要引入AMQP的依赖即可

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.5.2</version>
</dependency>

然后在appilication.yml文件中定义RabbitMQ配置

spring:
  # RabbitMQ相关配置
  rabbitmq:
    host: 192.168.180.128
    port: 5672
    username: sxl
    password: shuxiaolong
    virtual-host: rsthe

SpringBoot对RabbitMQ的操作可以查看“RabbitAutoConfiguration”自动配置类

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {

@EnableConfigurationProperties(RabbitProperties.class),“RabbitProperties”中定义的就是RabbitMQ的相关属性配置

@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
	private static final int DEFAULT_PORT = 5672;
	private static final int DEFAULT_PORT_SECURE = 5671;
	/** RabbitMQ host. Ignored if an address is set.*/
	private String host = "localhost";
	/** RabbitMQ port. Ignored if an address is set. Default to 5672, or 5671 if SSL is
	 * enabled.*/
	private Integer port;
	/** Login user to authenticate to the broker. */
	private String username = "guest";
	/** Login to authenticate against the broker. */
	private String password = "guest";
....
}

继续看RabbitAutoConfiguration

@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean(RabbitOperations.class)
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate();
    configurer.configure(template, connectionFactory);
    return template;
}
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
@ConditionalOnMissingBean
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
    return new RabbitAdmin(connectionFactory);
}

发现向容器中注入两个组件:“RabbitTemplate”和“AmqpAdmin”

RabbitTemplate:可以发送消息、接收消息
AmqpAdmin:可以操作Exchange、Queue、Bindinga等,比如可以创建、删除、解绑

消息组件在在使用的过程之中一定要分为消息的生产者和消息的消费者;消息的生产者一般都会以业务接口的形式出现,而消息的消费端一般以监听的形式出现,在SpringBoot里面可以方便的实现消息组件的连接管理

定义生产端接口

public interface IMessageService {
    void sendMessage(Message message);
}

接口实现

@Service
public class MessageServiceImpl implements IMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendMessage(Message message) {
	/**
	* 一个参数是:Exchange
	* 第二个参数是:RoutingKey
	* 第三个参数是发送的信息主体
	*/
	rabbitTemplate.convertAndSend(MyMQConfig.EXCHANGE_NAME,MyMQConfig.ROUTING_KEY,message);
    }
}

修改“MyMQConfig”配置类,绑定Exchange、Queue、RoutingKey信息

@Configuration
public class MyMQConfig {
    private final static String EXCHANGE_NAME = "sxl_exchange";
    private final static String QUEUE_NAME = "sxl.micro.queue";
    private final static String ROUTING_KEY = "sxl_routing_key";
    /**
     * 定义直连模式的Exchange
     * @return
     */
    @Bean
    public DirectExchange getDirectExchange(){
        return new DirectExchange(EXCHANGE_NAME);
    }

    /**
     * 定义队列信息
     * @return
     */
    @Bean
    public Queue getQueue(){
        return new Queue(QUEUE_NAME,true,false,true);
    }

    /**
     * 将直连交换器和队列绑定
     * @param directExchange 直连交换器
     * @param queue 队列
     * @return
     */
    @Bean
    public Binding bindingExchangeAndQueue(DirectExchange directExchange,Queue queue){
        return BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY);
    }
}

测试生产数据

@ExtendWith(SpringExtension.class) // 使用Junit5测试工具
@WebAppConfiguration // 启动WEB运行环境
@SpringBootTest(classes = StartWebApp.class) // 配置程序启动类
public class TestRabbitTemplate {
    @Autowired
    private IMessageService messageService;
    /**
     * 测试生产数据
     */
    @Test
    public void TestProducer(){
        Message message = new Message();
        message.setTitle("RabbitMQ");
        message.setContent("学习使用RabbitMQ");
        message.setUploaddate(new Date());
        messageService.sendMessage(message);
    }
}

执行结果

impleMessageConverter only supports String, byte[] and Serializable payloads, received: com.rsthe.web.entities.Message x

impleMessageConverter only supports String,只支持字符串的序列化,解决这个异常可以使有“Jackson2JsonMessageConverter”将发送的消息装换为JSON格式
配置MessageConverter 转换器

/**
 * 使用Jackson将RabbitMQ生产的消息格式化成JSON
 */
@Configuration
@EnableRabbit // 开启基于注解的RabbitMQ
public class MyMQConfig {
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
... ...
}

执行结果

{"title":"RabbitMQ","content":"学习使用RabbitMQ","uploaddate":1626310624299}

定义消费端

定义Application.yml

spring:
  # RabbitMQ相关配置
  rabbitmq:
    host: 192.168.180.128
    port: 5672
    username: sxl
    password: shuxiaolong
    virtual-host: rsthe

定义配置文件,绑定Exchange和队列,定义Jackson2JsonMessageConverter转换器,生产端使用了转换器,消费端也一定要使用,不然会报转换错误

@Configuration
@EnableRabbit // 开启基于注解的RabbitMQ
public class MyMQConfig {
    public static final String EXCHANGE_NAME = "sxl_exchange";
    public static final String QUEUE_NAME = "sxl.micro.queue";
    public static final String ROUTING_KEY = "sxl_routing_key";

   @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定义直连模式的Exchange
     * @return
     */
    @Bean
    public DirectExchange getDirectExchange(){
        return new DirectExchange(EXCHANGE_NAME);
    }

    /**
     * 定义队列信息
     * @return
     */
    @Bean
    public Queue getQueue(){
        return new Queue(QUEUE_NAME,true,false,true,null);
    }

    /**
     * 将直连交换器和队列绑定
     * @param directExchange 直连交换器
     * @param queue 队列
     * @return
     */
    @Bean
    public Binding bindingExchangeAndQueue(DirectExchange directExchange,Queue queue){
        return BindingBuilder.bind(queue).to(directExchange).withQueueName();
    }
}

定义监听器

@Component
public class MessageConsumerListener {

    @RabbitListener(queues = "sxl.micro.queue")
    public void receiveMessage(Message message){
        System.out.println(message);
    }
}

定义完成之后就可以启动SpringBoot项目,启动后就会持续监听,一旦RabbitMQ中有了新的消息,立刻会被发现并消费

测试接口
image.png

执行结果

Message{title='RabbitMQ', content='学习!学习', uploaddate=null}