RabbitMQ入门操作(二)

郎家岭伯爵 2023年01月29日 261次浏览

背景

上一篇我们简单介绍了 RabbitMQ 的理论部分及启动了 RabbitMQ 服务。本文我们将实现 SpringBoot 整合 RabbitMQ 。

实践

永远的HelloWorld

公共项目common

通常我们会创建一个公共项目 common ,用于在各项目中共享一些配置,例如:队列主题、交换机名称、路由匹配键名称等。

package com.langjialing.common.config;

/**
 * @author 郎家岭伯爵
 * @time 2023/1/30 9:37
 */
public class RabbitMQConfig {

    /**
     * RabbitMQ的队列主题名称
     */
    public static final String RABBITMQ_DEMO_TOPIC = "rabbitmq.demo.topic";

    /**
     * RabbitMQ的DIRECT交换机名称
     */
    public static final String RABBITMQ_DEMO_DIRECT_EXCHANGE = "rabbitmq.demo.direct.exchange";

    /**
     * RabbitMQ的DIRECT交换机和队列绑定的匹配键 DirectRouting
     */
    public static final String RABBITMQ_DEMO_DIRECT_ROUTING = "rabbitmq.demo.direct.routing";
    
}

生产者

pom.xml添加依赖

这里需要添加 RabbitMQ 的依赖以及 common 包的依赖。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--common包的依赖-->
<dependency>
    <groupId>com.langjialing</groupId>
    <artifactId>common</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <scope>compile</scope>
</dependency>

application.properties

在配置文件中添加 RabbitMQ 的相关配置:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

DirectRabbitConfig类

用于创建一个 Direct 交换机以及队列的配置类:

package com.langjialing.providerdemo.config;

import com.langjialing.common.config.RabbitMQConfig;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 郎家岭伯爵
 * @time 2023/1/30 9:37
 */
@Configuration
public class DirectRabbitConfig {
    @Bean
    public Queue rabbitmqDemoDirectQueue() {
        /**
         * 1、name:    队列名称
         * 2、durable: 是否持久化
         * 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
         * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
         * */
        return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false);
    }

    @Bean
    public DirectExchange rabbitmqDemoDirectExchange() {
        //Direct交换机
        return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
    }

    @Bean
    public Binding bindDirect() {
        //链式写法,绑定交换机和队列,并设置匹配键
        return BindingBuilder
                //绑定队列
                .bind(rabbitmqDemoDirectQueue())
                //到交换机
                .to(rabbitmqDemoDirectExchange())
                //并设置匹配键
                .with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
    }
}

Service层

在 Service 层,我们会创建一个发送消息的类。

Service 接口类:

package com.langjialing.providerdemo.service;

/**
 * @author 郎家岭伯爵
 * @time 2023/1/29 17:14
 */
public interface RabbitMQService {
    /**
     * 发送消息
     * @param msg
     * @return
     * @throws Exception
     */
    String sendMsg(String msg) throws Exception;
}

Service 接口实现类:

package com.langjialing.providerdemo.service.impl;

import com.langjialing.common.config.RabbitMQConfig;
import com.langjialing.providerdemo.service.RabbitMQService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * @author 郎家岭伯爵
 * @time 2023/1/29 17:13
 */
@Service
public class RabbitMQServiceImpl implements RabbitMQService {
    //日期格式化
    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Override
    public String sendMsg(String msg) throws Exception {
        try {
            String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
            String sendTime = sdf.format(new Date());
            Map<String, Object> map = new HashMap<>();
            map.put("msgId", msgId);
            map.put("sendTime", sendTime);
            map.put("msg", msg);
            rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, map);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
}

Controller层

根据业务需求来调用 Service 服务,例如:定时任务、接口等。这里我们创建一个 Controller 来进行调用。

package com.langjialing.providerdemo.controller;

import com.langjialing.providerdemo.service.RabbitMQService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @author 郎家岭伯爵
 * @time 2023/1/29 17:16
 */
@RestController
@RequestMapping("/rabbitmq")
public class RabbitMQController {
    @Resource
    private RabbitMQService rabbitMQService;
    
    @PostMapping("/sendMsg")
    public String sendMsg(@RequestParam(name = "msg") String msg) throws Exception {
        return rabbitMQService.sendMsg(msg);
    }
}

创建消息

调用 Controller 提供的接口来创建消息。

至此,我们已完成生产者部分的代码。

消费者

pom.xml导入依赖

这里同样添加 RabbitMQ 的依赖以及 common 包的依赖。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>com.langjialing</groupId>
    <artifactId>common</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <scope>compile</scope>
</dependency>

application.properties

server.port=8081

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

消费代码

package com.langjialing.consumerdemo.demo;

import com.langjialing.common.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author 郎家岭伯爵
 * @time 2023/1/29 17:21
 */
@Component
@RabbitListener(queues = {RabbitMQConfig.RABBITMQ_DEMO_TOPIC})
public class RabbitConsumer {
    @RabbitHandler
    public void process(Map map){
        System.out.println("消费者RabbitConsumer从RabbitMQ服务端消费消息" + map.toString());
    }
}

这里的注解可以修改为如下,这样可以避免由于因先启动消费者,消息队列尚不存在导致的异常。

// 使用queuesToDeclare属性,如果不存在则会创建队列
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))

消费消息

启动消费者项目,控制台输出:

同时 RabbitMQ 消息队列的消息数显示为零,消息已被消费:

至此,已完成消费者部分的代码。这样我们已初步完成了从服务端搭建,以及生产者创建消息、消费者消费消息的客户端使用。

本文项目 GitHub 地址

总结

本文我们实现了 SpringBoot 对 RabbitMQ 的基础整合。后续我们将继续介绍 MQ 中 Exchange 的四种类型及使用。

赞助页面示例