背景
上一篇我们简单介绍了 RabbitMQ 的理论部分及启动了 RabbitMQ 服务。本文我们将实现 SpringBoot 整合 RabbitMQ 。
实践
永远的HelloWorld
公共项目common
通常我们会创建一个公共项目 common ,用于在各项目中共享一些配置,例如:队列主题、交换机名称、路由匹配键名称等。
package com.langjialing.common.config;
/**
* @author 郎家岭伯爵
* @time 2023/1/30 9:37
*/
public class RabbitMQConfig {
/**
* RabbitMQ的队列主题名称
*/
public static final String RABBITMQ_DEMO_TOPIC = "rabbitmq.demo.topic";
/**
* RabbitMQ的DIRECT交换机名称
*/
public static final String RABBITMQ_DEMO_DIRECT_EXCHANGE = "rabbitmq.demo.direct.exchange";
/**
* RabbitMQ的DIRECT交换机和队列绑定的匹配键 DirectRouting
*/
public static final String RABBITMQ_DEMO_DIRECT_ROUTING = "rabbitmq.demo.direct.routing";
}
生产者
pom.xml添加依赖
这里需要添加 RabbitMQ 的依赖以及 common 包的依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--common包的依赖-->
<dependency>
<groupId>com.langjialing</groupId>
<artifactId>common</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
application.properties
在配置文件中添加 RabbitMQ 的相关配置:
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
DirectRabbitConfig类
用于创建一个 Direct 交换机以及队列的配置类:
package com.langjialing.providerdemo.config;
import com.langjialing.common.config.RabbitMQConfig;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 郎家岭伯爵
* @time 2023/1/30 9:37
*/
@Configuration
public class DirectRabbitConfig {
@Bean
public Queue rabbitmqDemoDirectQueue() {
/**
* 1、name: 队列名称
* 2、durable: 是否持久化
* 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
* 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
* */
return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false);
}
@Bean
public DirectExchange rabbitmqDemoDirectExchange() {
//Direct交换机
return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
}
@Bean
public Binding bindDirect() {
//链式写法,绑定交换机和队列,并设置匹配键
return BindingBuilder
//绑定队列
.bind(rabbitmqDemoDirectQueue())
//到交换机
.to(rabbitmqDemoDirectExchange())
//并设置匹配键
.with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
}
}
Service层
在 Service 层,我们会创建一个发送消息的类。
Service 接口类:
package com.langjialing.providerdemo.service;
/**
* @author 郎家岭伯爵
* @time 2023/1/29 17:14
*/
public interface RabbitMQService {
/**
* 发送消息
* @param msg
* @return
* @throws Exception
*/
String sendMsg(String msg) throws Exception;
}
Service 接口实现类:
package com.langjialing.providerdemo.service.impl;
import com.langjialing.common.config.RabbitMQConfig;
import com.langjialing.providerdemo.service.RabbitMQService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @author 郎家岭伯爵
* @time 2023/1/29 17:13
*/
@Service
public class RabbitMQServiceImpl implements RabbitMQService {
//日期格式化
private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Resource
private RabbitTemplate rabbitTemplate;
@Override
public String sendMsg(String msg) throws Exception {
try {
String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
String sendTime = sdf.format(new Date());
Map<String, Object> map = new HashMap<>();
map.put("msgId", msgId);
map.put("sendTime", sendTime);
map.put("msg", msg);
rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, map);
return "ok";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
}
Controller层
根据业务需求来调用 Service 服务,例如:定时任务、接口等。这里我们创建一个 Controller 来进行调用。
package com.langjialing.providerdemo.controller;
import com.langjialing.providerdemo.service.RabbitMQService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @author 郎家岭伯爵
* @time 2023/1/29 17:16
*/
@RestController
@RequestMapping("/rabbitmq")
public class RabbitMQController {
@Resource
private RabbitMQService rabbitMQService;
@PostMapping("/sendMsg")
public String sendMsg(@RequestParam(name = "msg") String msg) throws Exception {
return rabbitMQService.sendMsg(msg);
}
}
创建消息
调用 Controller 提供的接口来创建消息。
至此,我们已完成生产者部分的代码。
消费者
pom.xml导入依赖
这里同样添加 RabbitMQ 的依赖以及 common 包的依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.langjialing</groupId>
<artifactId>common</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
application.properties
server.port=8081
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
消费代码
package com.langjialing.consumerdemo.demo;
import com.langjialing.common.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @author 郎家岭伯爵
* @time 2023/1/29 17:21
*/
@Component
@RabbitListener(queues = {RabbitMQConfig.RABBITMQ_DEMO_TOPIC})
public class RabbitConsumer {
@RabbitHandler
public void process(Map map){
System.out.println("消费者RabbitConsumer从RabbitMQ服务端消费消息" + map.toString());
}
}
这里的注解可以修改为如下,这样可以避免由于因先启动消费者,消息队列尚不存在导致的异常。
// 使用queuesToDeclare属性,如果不存在则会创建队列
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
消费消息
启动消费者项目,控制台输出:
同时 RabbitMQ 消息队列的消息数显示为零,消息已被消费:
至此,已完成消费者部分的代码。这样我们已初步完成了从服务端搭建,以及生产者创建消息、消费者消费消息的客户端使用。
本文项目 GitHub 地址。
总结
本文我们实现了 SpringBoot 对 RabbitMQ 的基础整合。后续我们将继续介绍 MQ 中 Exchange 的四种类型及使用。