上一章介绍了RabbitMQ
在SpringBoot项目中简单队列的使用,介绍RabbitMQ
的死信队列的使用。
为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。死信
是RabbitMQ中的一种消息机制 ,当消息被拒绝或者超出ttl,队列消息数量达到最大数量时,消息会成为死信。如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中。有针对性的分析解决问题(恢复数据)等
使用步骤
- 配置正常业务队列,绑定到业务交换机上
- 为业务队列配置死信队列和路由key
- 为死信交换机配置死信队列
死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。类型可以是Direct、Fanout、Topic等。
环境配置
MAVEN依赖不需要更改,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.16.RELEASE</version> <relativePath/> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.10</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
|
需要调整的是Resource配置文件,添加如下配置项:
spring.rabbitmq.listener.type
是设置队列simple
模式。即简单的点对点消息模型,生产者向mq写消息,消费者监听mq,消费消息。
spring.rabbitmq.listener.simple.default-requeue-rejected
表示被拒绝的消息是否重新入队列。
spring.rabbitmq.listener.simple.acknowledge-mode
表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| # service server: port: 8081 #spring spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / listener: type: simple simple: default-requeue-rejected: false acknowledge-mode: manual
|
项目开发
配置队列
声明常量
1 2 3 4 5 6 7 8 9 10
| public class MQConstant {
public static final String QUEUE_IM_NAME = "queue.demo.im"; public static final String QUEUE_DEAD_NAME = "queue.demo.dead"; }
|
RabbitMQ 交换机和队列的配置。需要注意的是,因为im队列绑定关系变更了,所以需要手动将之前的im队列删除。
此处需要注意的是即时队列与死信队列的路由绑定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| import static org.liangtong.example.rabbit.constant.MQConstant.*;
@Configuration public class RabbitMQConfig {
@Bean("imExchange") public FanoutExchange imExchange() { return new FanoutExchange(EXCHANGE_IM_NAME); } @Bean("deadExchange") public DirectExchange deadExchange() { return new DirectExchange(EXCHANGE_DEAD_NAME); }
@Bean("imQueue") public Queue imQueue() { Map<String, Object> args = new HashMap<>(2); args.put("x-dead-letter-exchange", EXCHANGE_DEAD_NAME); args.put("x-dead-letter-routing-key", ROUTING_KEY_DEAD_NAME); return QueueBuilder.durable(QUEUE_IM_NAME).withArguments(args).build(); } @Bean("deadQueue") public Queue deadQueue() { return new Queue(QUEUE_DEAD_NAME); }
@Bean public Binding imBinding(@Qualifier("imQueue") Queue queue, @Qualifier("imExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue) .to(exchange); } @Bean public Binding deadBinding(@Qualifier("deadQueue") Queue queue, @Qualifier("deadExchange") DirectExchange exchange) { return BindingBuilder.bind(queue) .to(exchange) .with(ROUTING_KEY_DEAD_NAME); } }
|
消费者
为了方便测试,我们修改原消费者代码,在即时消息消费过程中,抛出异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @Slf4j @Component public class ImConsumer {
@RabbitListener(queues = QUEUE_IM_NAME) public void receiveIm(Message message, Channel channel) throws IOException { log.info("收到即时推送消息: " + message); boolean ack = true; Exception exception = null; try { long deliveryTag = message.getMessageProperties().getDeliveryTag(); if (deliveryTag % 2 == 1){ throw new RuntimeException("发生异常!"); } } catch (Exception e) { ack = false; exception = e; } finally {
} if (!ack) { log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } else { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } }
|
死信消息消费者
1 2 3 4 5 6 7 8 9 10 11
| @Slf4j @Component public class DeadConsumer { @RabbitListener(queues = QUEUE_DEAD_NAME) public void receiveDead(Message message, Channel channel) throws IOException { log.info("收到死信消息: " + message); log.info("死信消息properties:{}", message.getMessageProperties()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
|
消息生产者
消息生产者不变,直接放代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Slf4j @Component public class MsgProducer { @Autowired private RabbitTemplate rabbitTemplate;
public void sendMsg(String msg) { log.info("发送消息:{}", msg); rabbitTemplate.convertAndSend(EXCHANGE_IM_NAME, QUEUE_IM_NAME, msg, new CorrelationData() ); } }
|
测试
创建Webcontroller来进行功能测试,代码如下:
1 2 3 4 5 6 7 8 9 10 11
| @RequestMapping("rabbitmq") @RestController public class RabbitMsgController { @Autowired private MsgProducer sender;
@RequestMapping("send") public void sendMsg(String msg){ sender.sendMsg(msg); } }
|
启动SpringBoot项目,然后浏览器输入http://localhost:8081/rabbitmq/send?msg=hello
测试:
1 2 3 4 5 6 7 8 9 10
| 2020-09-16 10:22:37.714 INFO 53522 --- [nio-8081-exec-1] o.l.example.rabbit.producer.MsgProducer : 发送消息:dead msg 2020-09-16 10:22:37.727 INFO 53522 --- [ntContainer 2020-09-16 10:22:37.731 ERROR 53522 --- [ntContainer
java.lang.RuntimeException: 发生异常! at org.liangtong.example.rabbit.consumer.ImConsumer.receiveIm(ImConsumer.java:31) ~[classes/:na] ......
2020-09-16 10:22:37.732 INFO 53522 --- [ntContainer 2020-09-16 10:22:37.733 INFO 53522 --- [ntContainer
|
至此,死信队列功能已经实现,下一章节介绍如何使用RabbitMQ延迟队列
。
RabbitMQ相关的Demo代码已上传至Github,有需要的话可自行下载查阅。
地址:https://github.com/liangtongdev/demo-springboot-rabbitmq
本文标题:RabbitMQ 死信队列
文章作者:梁通
发布时间:2020-09-16
最后更新:2020-09-16
原始链接:http://www.liangtong.site/2020/09/16/java_20200916_springboot_rabbitmq_dead/
版权声明:Copyright© 2016-2020 liangtong 版权所有