RabbitMQ入门操作(三)

郎家岭伯爵 2023年01月30日 272次浏览

背景

前面实现了 SpringBoot 对 RabbitMQ 的基础整合。本文我们将继续介绍 MQ 的其它部分内容。

实践

RabbitMQ的组成部分

从前面两篇文章中我们大概也能体会到 RabbitMQ 的组成大致有如下部分:

  • Broker:消息队列服务进程。此进程包括两个部分:Exchange和Queue。
  • Exchange:消息队列交换机。按一定的规则将消息路由转发到某个队列
  • Queue:消息队列,存储消息的队列。
  • Producer:消息生产者。生产方客户端将消息同交换机路由发送到队列中。
  • Consumer:消息消费者。消费队列中存储的消息。

它们之间的协同流程大致如下:

  • 消息生产者连接到RabbitMQ Broker,创建connection,开启channel。
  • 生产者声明交换机类型、名称、是否持久化等。
  • 生产者发送消息,并指定消息是否持久化等属性和routing key。
  • exchange收到消息之后,根据routing key路由到跟当前交换机绑定的相匹配的队列里面。
  • 消费者监听接收到消息之后开始业务处理。

Exchange的四种类型及用法

从上面的工作流程可以看出,实际上有个关键的组件 Exchange,因为消息发送到 RabbitMQ 后首先要经过 Exchange 路由才能找到对应的 Queue

实际上 Exchange 类型有四种,根据不同的类型工作的方式也有所不同。在 HelloWord 例子中,我们就使用了比较简单的 Direct Exchange,翻译就是直连交换机。其余三种分别是:Fanout exchangeTopic exchangeHeaders exchange

Direct Exchange

见文知意,直连交换机意思是此交换机需要绑定一个队列,要求该消息与一个特定的路由键完全匹配。简单点说就是一对一的,点对点的发送。

完整的代码就是上面的 HelloWord 的例子,不再重复代码。

Fanout exchange

这种类型的交换机需要将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。简单点说就是发布订阅。

代码怎么写呢,演示一下:

首先要先配置交换机和队列的名称:

public class RabbitMQConfig {
    /**
     * RabbitMQ的FANOUT_EXCHANG交换机类型的队列 A 的名称
     */
    public static final String FANOUT_EXCHANGE_QUEUE_TOPIC_A = "fanout.A";

    /**
     * RabbitMQ的FANOUT_EXCHANG交换机类型的队列 B 的名称
     */
    public static final String FANOUT_EXCHANGE_QUEUE_TOPIC_B = "fanout.B";

    /**
     * RabbitMQ的FANOUT_EXCHANG交换机类型的名称
     */
    public static final String FANOUT_EXCHANGE_DEMO_NAME = "fanout.exchange.demo.name";

}

再配置 FanoutExchange 类型的交换机和A、B两个队列,并且绑定。这种类型不需要配置 routing key:

@Component
public class DirectRabbitConfig implements BeanPostProcessor {
    @Resource
    private RabbitAdmin rabbitAdmin;
    
    @Bean
    public Queue fanoutExchangeQueueA() {
        //队列A
        return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A, true, false, false);
    }

    @Bean
    public Queue fanoutExchangeQueueB() {
        //队列B
        return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B, true, false, false);
    }

    @Bean
    public FanoutExchange rabbitmqDemoFanoutExchange() {
        //创建FanoutExchange类型交换机
        return new FanoutExchange(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, true, false);
    }

    @Bean
    public Binding bindFanoutA() {
        //队列A绑定到FanoutExchange交换机
        return BindingBuilder.bind(fanoutExchangeQueueA()).to(rabbitmqDemoFanoutExchange());
    }

    @Bean
    public Binding bindFanoutB() {
        //队列B绑定到FanoutExchange交换机
        return BindingBuilder.bind(fanoutExchangeQueueB()).to(rabbitmqDemoFanoutExchange());
    }
    
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        //启动项目即创建交换机和队列
        rabbitAdmin.declareExchange(rabbitmqDemoFanoutExchange());
        rabbitAdmin.declareQueue(fanoutExchangeQueueB());
        rabbitAdmin.declareQueue(fanoutExchangeQueueA());
        return null;
    }
}

创建 service 发布消息的方法:

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Resource
    private RabbitTemplate rabbitTemplate;
    
    //发布消息
    @Override
    public String sendMsgByFanoutExchange(String msg) throws Exception {
        Map<String, Object> message = getMessage(msg);
        try {
            rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, "", message);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
    //组装消息体
    private Map<String, Object> getMessage(String msg) {
        String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
        String sendTime = sdf.format(new Date());
        Map<String, Object> map = new HashMap<>();
        map.put("msgId", msgId);
        map.put("sendTime", sendTime);
        map.put("msg", msg);
        return map;
    }
}

Controller 接口:

@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
    /**
     * 发布消息
     *
     * @author 
     */
    @PostMapping("/publish")
    public String publish(@RequestParam(name = "msg") String msg) throws Exception {
        return rabbitMQService.sendMsgByFanoutExchange(msg);
    }
}

接着在消费者项目这边,创建两个队列的监听类,监听队列进行消费:

@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A))
public class FanoutExchangeConsumerA {

    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("队列A收到消息:" + map.toString());
    }

}
@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B))
public class FanoutExchangeConsumerB {

    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("队列B收到消息:" + map.toString());
    }
}

然后启动生产者和消费者两个项目,可以看到管理界面创建了一个 FanoutExchange 交换机和两个队列,并且绑定了:

使用 POSTMAN 进行发送消息,测试:

然后可以看到控制台,两个队列同时都收到了相同的消息,形成了发布订阅的效果:

Topic Exchange

直接翻译的话叫做主题交换机,如果从用法上面翻译可能叫通配符交换机会更加贴切。这种交换机是使用通配符去匹配,路由到对应的队列。通配符有两种:“*” 、 “#”。需要注意的是通配符前面必须要加上"."符号。

*符号:有且只匹配一个词。比如 a.*可以匹配到"a.b"、“a.c”,但是匹配不了"a.b.c"。

#符号:匹配一个或多个词。比如"rabbit.#“既可以匹配到"rabbit.a.b”、“rabbit.a”,也可以匹配到"rabbit.a.b.c"。

代码演示一下:

依然是配置 TopicExchange 名称和三个队列的名称:

/**
 * RabbitMQ的TOPIC_EXCHANGE交换机名称
 */
public static final String TOPIC_EXCHANGE_DEMO_NAME = "topic.exchange.demo.name";

/**
 * RabbitMQ的TOPIC_EXCHANGE交换机的队列A的名称
 */
public static final String TOPIC_EXCHANGE_QUEUE_A = "topic.queue.a";

/**
 * RabbitMQ的TOPIC_EXCHANGE交换机的队列B的名称
 */
public static final String TOPIC_EXCHANGE_QUEUE_B = "topic.queue.b";

/**
 * RabbitMQ的TOPIC_EXCHANGE交换机的队列C的名称
 */
public static final String TOPIC_EXCHANGE_QUEUE_C = "topic.queue.c";

然后还是配置交换机和队列,然后绑定,创建:

@Component
public class DirectRabbitConfig implements BeanPostProcessor {
    //省略...
    
    @Bean
    public TopicExchange rabbitmqDemoTopicExchange() {
        //配置TopicExchange交换机
        return new TopicExchange(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, true, false);
    }

    @Bean
    public Queue topicExchangeQueueA() {
        //创建队列1
        return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A, true, false, false);
    }

    @Bean
    public Queue topicExchangeQueueB() {
        //创建队列2
        return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B, true, false, false);
    }

    @Bean
    public Queue topicExchangeQueueC() {
        //创建队列3
        return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C, true, false, false);
    }

    @Bean
    public Binding bindTopicA() {
        //队列A绑定到FanoutExchange交换机
        return BindingBuilder.bind(topicExchangeQueueB())
                .to(rabbitmqDemoTopicExchange())
                .with("a.*");
    }

    @Bean
    public Binding bindTopicB() {
        //队列A绑定到FanoutExchange交换机
        return BindingBuilder.bind(topicExchangeQueueC())
                .to(rabbitmqDemoTopicExchange())
                .with("a.*");
    }

    @Bean
    public Binding bindTopicC() {
        //队列A绑定到FanoutExchange交换机
        return BindingBuilder.bind(topicExchangeQueueA())
                .to(rabbitmqDemoTopicExchange())
                .with("rabbit.#");
    }
    
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        rabbitAdmin.declareExchange(rabbitmqDemoTopicExchange());
        rabbitAdmin.declareQueue(topicExchangeQueueA());
        rabbitAdmin.declareQueue(topicExchangeQueueB());
        rabbitAdmin.declareQueue(topicExchangeQueueC());
        return null;
    }
}

然后写一个发送消息的 service 方法:

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
    @Override
    public String sendMsgByTopicExchange(String msg, String routingKey) throws Exception {
        Map<String, Object> message = getMessage(msg);
        try {
            //发送消息
            rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, routingKey, message);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
}

写一个 Controller 接口:

@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
    @Resource
    private RabbitMQService rabbitMQService;
    
    /**
     * 通配符交换机发送消息
     *
     * @author java技术爱好者
     */
    @PostMapping("/topicSend")
    public String topicSend(@RequestParam(name = "msg") String msg, @RequestParam(name = "routingKey") String routingKey) throws Exception {
        return rabbitMQService.sendMsgByTopicExchange(msg, routingKey);
    }
}

生产者这边写完,就写消费端,消费端比较简单,写三个监听类:

@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A))
public class TopicExchangeConsumerA {

    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("队列[" + RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A + "]收到消息:" + map.toString());
    }
}

@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B))
public class TopicExchangeConsumerB {

    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("队列[" + RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B+ "]收到消息:" + map.toString());
    }
}

@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C))
public class TopicExchangeConsumerC {

    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("队列[" + RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C + "]收到消息:" + map.toString());
    }
}

然后启动项目开始调试。启动成功后可以看到队列和路由键绑定的关系:

通过 POSTMAN 进行测试,测试一下 rabbit.# 的路由键是否能够匹配成功:

测试成功,队列A消费到消息:

接着测试 a.* 路由键,发送 routingKey = a.b :

比较常用的就是以上三种:直连(DirectExchange),发布订阅(FanoutExchange),通配符(TopicExchange)。熟练运用这三种交换机类型,基本上可以解决大部分的业务场景。

实际上稍微思考一下,可以发现通配符(TopicExchange)这种模式其实是可以达到直连(DirectExchange)和发布订阅(FanoutExchange)这两种的效果的。

FanoutExchange 不需要绑定 routingKey,所以性能相对 TopicExchange 会好一点。

Headers Exchange

这种交换机用的相对没这么多。它跟上面三种有点区别:它的路由不是用routingKey进行路由匹配,而是在匹配请求头中所带的键值进行路由。如图所示:

创建队列需要设置绑定的头部信息,有两种模式:全部匹配和部分匹配。如上图所示,交换机会根据生产者发送过来的头部信息携带的键值去匹配队列绑定的键值,路由到对应的队列。代码怎么实现呢,往下看演示代码:

首先还是需要定义交换机名称,队列名称:

/**
 * HEADERS_EXCHANGE交换机名称
 */
public static final String HEADERS_EXCHANGE_DEMO_NAME = "headers.exchange.demo.name";

/**
 * RabbitMQ的HEADERS_EXCHANGE交换机的队列A的名称
 */
public static final String HEADERS_EXCHANGE_QUEUE_A = "headers.queue.a";

/**
 * RabbitMQ的HEADERS_EXCHANGE交换机的队列B的名称
 */
public static final String HEADERS_EXCHANGE_QUEUE_B = "headers.queue.b";

然后设置交换机,队列,进行绑定:

@Component
public class DirectRabbitConfig implements BeanPostProcessor {
    @Bean
    public Queue headersQueueA() {
        return new Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A, true, false, false);
    }

    @Bean
    public Queue headersQueueB() {
        return new Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B, true, false, false);
    }

    @Bean
    public HeadersExchange rabbitmqDemoHeadersExchange() {
        return new HeadersExchange(RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, true, false);
    }

    @Bean
    public Binding bindHeadersA() {
        Map<String, Object> map = new HashMap<>();
        map.put("key_one", "java");
        map.put("key_two", "rabbit");
        //全匹配
        return BindingBuilder.bind(headersQueueA())
                .to(rabbitmqDemoHeadersExchange())
                .whereAll(map).match();
    }

    @Bean
    public Binding bindHeadersB() {
        Map<String, Object> map = new HashMap<>();
        map.put("headers_A", "coke");
        map.put("headers_B", "sky");
        //部分匹配
        return BindingBuilder.bind(headersQueueB())
                .to(rabbitmqDemoHeadersExchange())
                .whereAny(map).match();
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        rabbitAdmin.declareExchange(rabbitmqDemoHeadersExchange());
        rabbitAdmin.declareQueue(headersQueueA());
        rabbitAdmin.declareQueue(headersQueueB());
        return null;
    }
}

再写一个 Service 方法发送消息:

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
    @Resource
    private RabbitTemplate rabbitTemplate;
    
    @Override
    public String sendMsgByHeadersExchange(String msg, Map<String, Object> map) throws Exception {
        try {
            MessageProperties messageProperties = new MessageProperties();
            //消息持久化
            messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            messageProperties.setContentType("UTF-8");
            //添加消息
            messageProperties.getHeaders().putAll(map);
            Message message = new Message(msg.getBytes(), messageProperties);
            rabbitTemplate.convertAndSend(RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, null, message);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
}

再写一个 Controller 接口:

@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
    @Resource
    private RabbitMQService rabbitMQService;
    
    @PostMapping("/headersSend")
    @SuppressWarnings("unchecked")
    public String headersSend(@RequestParam(name = "msg") String msg,
                              @RequestParam(name = "json") String json) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        Map<String, Object> map = mapper.readValue(json, Map.class);
        return rabbitMQService.sendMsgByHeadersExchange(msg, map);
    }
}

生产者这边写完了,再写两个队列的监听类进行消费:

@Component
public class HeadersExchangeConsumerA {
    @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A))
    public void process(Message message) throws Exception {
        MessageProperties messageProperties = message.getMessageProperties();
        String contentType = messageProperties.getContentType();
        System.out.println("队列[" + RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A + "]收到消息:" + new String(message.getBody(), contentType));
    }
}

@Component
public class HeadersExchangeConsumerB {
    @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B))
    public void process(Message message) throws Exception {
        MessageProperties messageProperties = message.getMessageProperties();
        String contentType = messageProperties.getContentType();
        System.out.println("队列[" + RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B + "]收到消息:" + new String(message.getBody(), contentType));
    }
}

启动项目,打开管理界面,我们可以看到交换机绑定队列的信息:

跟上面示意图一样~证明没有问题。使用 POSTMAN 发送,测试全匹配的队列A:

再测试部分匹配的队列B:

总结

至此,我们已完成 RabbitMQ 入门操作的内容。但 MQ 的使用远不止这些,还有事务机制、负载均衡等等内容,后续再看情况更新吧!

赞助页面示例