RabbitMQ: fixing replyCode=312 NO_ROUTE when returnCallback() fires in Spring Boot


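RabbitTemplate reports replyCode=312, replyText=NO_ROUTE

Problem description

In one of our projects, after calling Spring Boot's RabbitTemplate.convertAndSend(String routingKey, Object message), the message never reached the queue. The producer's ReturnCallback fired at the same time, which confirmed that the problem was on the sending side.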
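For context, reply code 312 is what the broker sends back when a message published with the mandatory flag cannot be routed to any queue. With the two-argument convertAndSend the message goes through the default exchange, which routes only to a queue whose name equals the routing key. The following is a toy in-memory model of that rule, not RabbitMQ code; the class name, the Outcome enum and the shortened task ID are all made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory stand-in for the broker's default exchange; not RabbitMQ code. */
final class DefaultExchangeModel {

    /** What happens to a published message in this model. */
    enum Outcome { ROUTED, RETURNED_312_NO_ROUTE, DROPPED }

    private final Map<String, Queue<String>> queues = new HashMap<>();

    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    /**
     * Publishes through the default exchange, which routes only to a queue
     * whose name equals the routing key.
     */
    Outcome publish(String routingKey, String body, boolean mandatory) {
        Queue<String> queue = queues.get(routingKey);
        if (queue != null) {
            queue.add(body);
            return Outcome.ROUTED;
        }
        // No matching queue: a mandatory message is returned to the publisher,
        // a non-mandatory one is silently dropped.
        return mandatory ? Outcome.RETURNED_312_NO_ROUTE : Outcome.DROPPED;
    }

    int depth(String name) {
        Queue<String> queue = queues.get(name);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        DefaultExchangeModel broker = new DefaultExchangeModel();
        String taskId = "task-9257bb4f"; // made-up routing key, used as the queue name
        System.out.println(broker.publish(taskId, "{}", true)); // queue not declared yet
        broker.declareQueue(taskId);
        System.out.println(broker.publish(taskId, "{}", true)); // queue now exists
    }
}
```

In this model the first publish comes back as RETURNED_312_NO_ROUTE and the second is ROUTED, which matches the shape of the logs further down: empty exchange name, task ID as routing key.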

Cause and solutions

Without further ado, here are the official explanations and fixes.
Cause
Official explanation 1:

It causes some kind of deadlock down in the amqp-client code. The simplest solution is to do the send on a separate thread - use a TaskExecutor within the callback...

Official explanation 2:

the callback is invoked directly on the client's connection I/O thread; to do the send, we open a new channel and wait for the Channel.Open-OK result (which is sent by the broker), but now the I/O thread that would handle that result is blocked waiting for that to happen. It does actually end up with a timeout exception after 10 minutes!

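In short, the callback runs on the amqp-client connection I/O thread, so any send attempted inside it deadlocks: the send must open a channel and wait for the broker's Channel.Open-OK, but the thread that would process that reply is the one that is blocked, and the message never reaches the queue. Both explanations point to the same workaround: do the re-send on a separate thread. (The 312 NO_ROUTE reply itself is the broker's way of saying that a mandatory message could not be routed to any queue.)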
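The blocking pattern described in the two quotes can be reproduced without RabbitMQ at all. The sketch below is only an analogy under my own assumptions: a single-threaded executor stands in for the connection I/O thread, and openChannel is a hypothetical helper whose reply can only be produced by that same thread. None of these names come from amqp-client.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class IoThreadDeadlockDemo {

    /** Pretends to open a channel: the reply can only be produced by the I/O thread. */
    private static boolean openChannel(ExecutorService ioThread, long timeoutMs) {
        Future<String> reply = ioThread.submit(() -> "Channel.Open-OK");
        try {
            reply.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            reply.cancel(true); // nobody was free to produce the reply
            return false;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /** Runs a "return callback" on the I/O thread; reports whether the send inside it got through. */
    static boolean sendFromCallback(boolean handOffToWorker) {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Callable<Boolean> send = () -> openChannel(ioThread, 300);
            Callable<Future<Boolean>> callback = () -> {
                if (handOffToWorker) {
                    // Returns at once, so the I/O thread is free to answer the worker.
                    return worker.submit(send);
                }
                // Blocks the I/O thread while waiting for a reply only it could produce.
                return CompletableFuture.completedFuture(send.call());
            };
            return ioThread.submit(callback).get().get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            ioThread.shutdownNow();
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("send inside callback:  " + sendFromCallback(false));
        System.out.println("send handed to worker: " + sendFromCallback(true));
    }
}
```

The demo uses a 300 ms timeout so it finishes quickly; the quoted explanation mentions roughly ten minutes for the real client. The hand-off variant succeeds for the same reason the workaround does: the callback returns immediately and the I/O thread is available again.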
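Solutions
1. The issue was confirmed officially and fixed in 2.2.x, so upgrading Spring AMQP (the spring-rabbit client library) to 2.2.x resolves it.
2. Inside returnCallBack(), start a new thread and re-send the message that came back with replyCode=312, using the original exchange and routing key. The message then reaches the queue.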
Problem description and the new-thread workaround

Fixing the problem

Below is part of the code from my project.

Original code and configuration

2021-03-08 13:33:42.243  INFO 7817 --- [ntLoopGroup-3-2] c.a.s.ws.TextWebSocketFrameHandler       : Message content:type:heartbeat , endpoint:603eea3d660767905c1686af , time: 2021-03-08T13:33:42.226357661+08:00 .
2021-03-08 13:33:42.335  INFO 7817 --- [ntLoopGroup-3-1] c.a.s.ws.TextWebSocketFrameHandler       : Message content:type:heartbeat , endpoint:603f489b660767905c1a958e , time: 2021-03-08T13:33:42.333921624+08:00 .
2021-03-08 13:33:42.435  INFO 7817 --- [cTaskExecutor-1] com.Application         : Handle task: {"content":"ls","execId":5567,"id":"9257bb4f-2e75-4bd8-b3a4-f2b63c40261f","nodeId":"603f489b660767905c1a958e","resourceName":"<span style=\"color:#8EE7FF\">[192.168.223.2, 172.17.0.1]</span>","type":"adhoc_command"}
2021-03-08 13:33:42.436  INFO 7817 --- [cTaskExecutor-1] com.Application         : Sending adhoc_command task 9257bb4f-2e75-4bd8-b3a4-f2b63c40261f
2021-03-08 13:33:42.444  INFO 7817 --- [ntLoopGroup-3-1] c.a.s.ws.TextWebSocketFrameHandler       : Message content:type:task , endpoint:603f489b660767905c1a958e , time: 2021-03-08T13:33:42.442484649+08:00 .
2021-03-08 13:33:42.445  INFO 7817 --- [ntLoopGroup-3-1] c.ws.TaskResultHandler  : Task 9257bb4f-2e75-4bd8-b3a4-f2b63c40261f executed: 0 false tResult is <span style="color:#8EE7FF">[192.168.223.2, 172.17.0.1]</span>: agent
2021-03-08 13:33:42.453 ERROR 7817 --- [.168.223.5:5672] com.dao.RabbitDao       : 消息主体 message : (Body:'{"code":0,"completed":false,"output":"<span style=\"color:#8EE7FF\">[192.168.223.2, 172.17.0.1]</span>: agent","taskId":"9257bb4f-2e75-4bd8-b3a4-f2b63c40261f"}' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2021-03-08 13:33:42.453 ERROR 7817 --- [.168.223.5:5672] com.dao.RabbitDao       : 消息使用的consume tag:null
2021-03-08 13:33:42.453 ERROR 7817 --- [.168.223.5:5672] com.dao.RabbitDao       : 消息返回码 replyCode: 312
2021-03-08 13:33:42.453 ERROR 7817 --- [.168.223.5:5672] com.dao.RabbitDao       : 描述:NO_ROUTE
2021-03-08 13:33:42.453 ERROR 7817 --- [.168.223.5:5672] com.dao.RabbitDao       : 消息使用的交换器 exchange :
2021-03-08 13:33:42.453 ERROR 7817 --- [.168.223.5:5672] com.dao.RabbitDao       : 消息使用的路由键 routing : 9257bb4f-2e75-4bd8-b3a4-f2b63c40261f
2021-03-08 13:33:42.456  INFO 7817 --- [ntLoopGroup-3-1] c.ws.TaskResultHandler  : amqpTemplate send message to queue 9257bb4f-2e75-4bd8-b3a4-f2b63c40261f message body is {"code":0,"completed":false,"output":"<span style=\"color:#8EE7FF\">[192.168.223.2, 172.17.0.1]</span>: agent","taskId":"9257bb4f-2e75-4bd8-b3a4-f2b63c40261f"}

Code that sends the message

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception {
        val packet = JSON.parseObject(JSON.toJSONString(msg), new TypeReference<Packet<TaskResult>>() {
        });
        val tResult = packet.getData();
        log.info("Task {} executed: {} {} tResult is {}", tResult.getTaskId(), tResult.getCode(), tResult.isCompleted(), tResult.getOutput());
        rabbitDao.convertAndSend(tResult.getTaskId(), JSON.toJSONString(tResult));
        log.info("amqpTemplate send message to queue {} message body is {}", tResult.getTaskId(), JSON.toJSONString(tResult));
    }

Producer MQ configuration

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

/**
 * Data-access class for RabbitMQ
 */
@Slf4j
@Component
public class RabbitDao implements RabbitTemplate.ReturnCallback {

    private static final int RABBITMQ_312_CODE = 312;

    @Autowired
    private RabbitTemplate rabbitTemplate;
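    private RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (!ack) {
                // correlationData is null unless the sender supplied one, and the returned
                // message is only set when the broker returned it, so guard both before use
                Message returned = correlationData == null ? null : correlationData.getReturnedMessage();
                String messageId = returned == null ? "unknown" : returned.getMessageProperties().getMessageId();
                String msg = String.format("producer send message to exchange confirm callback error message id is %s reason is %s", messageId, cause);
                log.error(msg);
            }
        }
    };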

    public void convertAndSend(String queueName, Object message) {
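        // Off by default. Invoked when the message reached the exchange but no bound queue was found; fires only on failure
        rabbitTemplate.setReturnCallback(this);
        // Off by default. Invoked to confirm whether the message reached the exchange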
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.convertAndSend(queueName, message);
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("消息主体 message : " + message);
        log.error("消息使用的consume tag:" + message.getMessageProperties().getConsumerTag());
        log.error("消息返回码 replyCode: " + replyCode);
        log.error("描述:" + replyText);
        log.error("消息使用的交换器 exchange : " + exchange);
        log.error("消息使用的路由键 routing : " + routingKey);
    }
}

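Spring Boot configuration file

## Enable confirmCallback and returnCallback
## (stock Spring Boot reads these keys under spring.rabbitmq; the prefix below is shown as used in this project)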
spring:
  cloud:
    rabbitmq:
      publisher-confirms: true
      publisher-returns: true

Updated code

Following the official explanation, it is enough to do the re-send on a newly started thread.

    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("消息主体 message : " + message);
        log.error("消息使用的consume tag:" + message.getMessageProperties().getConsumerTag());
        log.error("消息返回码 replyCode: " + replyCode);
        log.error("描述:" + replyText);
        log.error("消息使用的交换器 exchange : " + exchange);
        log.error("消息使用的路由键 routing : " + routingKey);
        if(replyCode == RABBITMQ_312_CODE){
            CompletableFuture.runAsync(()->{
                log.info("retry send message one more time when trigger ReturnCallback message");
                rabbitTemplate.convertAndSend(exchange, routingKey, message.getBody());
                log.info("retry send message exchange is {}", exchange);
                log.info("retry send message routingKey is {}", routingKey);
                log.info("retry send message body is {}", new String(message.getBody(), StandardCharsets.UTF_8));
            });
        }
    }

Log output after the fix

2021-03-08 17:47:03.697  INFO 2698 --- [ntLoopGroup-3-1] c.ws.TaskResultHandler  : Task b617e6c5-ebdb-40a5-8c13-d818b2018188 executed: 0 false tResult is <span style="color:#8EE7FF">shahy-test-resetPassword-1</span>: 123
2021-03-08 17:47:03.698  INFO 2698 --- [ntLoopGroup-3-1] c.ws.TaskResultHandler  : amqpTemplate send message to queue b617e6c5-ebdb-40a5-8c13-d818b2018188 message body is {"code":0,"completed":false,"output":"<span style=\"color:#8EE7FF\">shahy-test-resetPassword-1</span>: 123","taskId":"b617e6c5-ebdb-40a5-8c13-d818b2018188"}
2021-03-08 17:47:03.701 ERROR 2698 --- [.168.223.5:5672] com.dao.RabbitDao       : 消息主体 message : (Body:'{"code":0,"completed":false,"output":"<span style=\"color:#8EE7FF\">shahy-test-resetPassword-1</span>: 123","taskId":"b617e6c5-ebdb-40a5-8c13-d818b2018188"}' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2021-03-08 17:47:03.701 ERROR 2698 --- [.168.223.5:5672] com.dao.RabbitDao       : 消息使用的consume tag:null
2021-03-08 17:47:03.701 ERROR 2698 --- [.168.223.5:5672] com.dao.RabbitDao       : 消息返回码 replyCode: 312
2021-03-08 17:47:03.701 ERROR 2698 --- [.168.223.5:5672] com.dao.RabbitDao       : 描述:NO_ROUTE
2021-03-08 17:47:03.701 ERROR 2698 --- [.168.223.5:5672] com.dao.RabbitDao       : 消息使用的交换器 exchange :
2021-03-08 17:47:03.701 ERROR 2698 --- [.168.223.5:5672] com.dao.RabbitDao       : 消息使用的路由键 routing : b617e6c5-ebdb-40a5-8c13-d818b2018188
2021-03-08 17:47:03.741  INFO 2698 --- [onPool-worker-1] com.dao.RabbitDao       : retry send message one more time when trigger ReturnCallback message
2021-03-08 17:47:03.742  INFO 2698 --- [onPool-worker-1] com.dao.RabbitDao       : retry send message exchange is
2021-03-08 17:47:03.742  INFO 2698 --- [onPool-worker-1] com.dao.RabbitDao       : retry send message routingKey is b617e6c5-ebdb-40a5-8c13-d818b2018188
2021-03-08 17:47:03.742  INFO 2698 --- [onPool-worker-1] com.dao.RabbitDao       : retry send message body is {"code":0,"completed":false,"output":"<span style=\"color:#8EE7FF\">shahy-test-resetPassword-1</span>: 123","taskId":"b617e6c5-ebdb-40a5-8c13-d818b2018188"}

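The log shows that once returnCallBack() fires with replyCode=312, the message is sent again, this time on a different thread (onPool-worker-1 in the log). That resolved the problem: the consumer now receives the final message.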
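One thing the retry leaves open, as far as I can tell: the re-sent message goes through the same RabbitTemplate, so if it still cannot be routed it comes back to returnedMessage() and is re-sent again, with no upper bound. A minimal sketch of a cap follows. ReturnRetryLimiter is a hypothetical helper, not part of Spring AMQP, and the choice of key (for example routing key plus a message ID) is an assumption left to the caller.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper that caps how often a returned message is re-sent. */
final class ReturnRetryLimiter {

    private final int maxRetries;
    private final Map<String, AtomicInteger> attempts = new ConcurrentHashMap<>();

    ReturnRetryLimiter(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Returns true if the message identified by key may be re-sent once more. */
    boolean tryAcquire(String key) {
        int attempt = attempts.computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet();
        if (attempt > maxRetries) {
            attempts.remove(key); // give up and forget the key so the map does not grow
            return false;
        }
        return true;
    }

    /** Call when the message is known to have been routed, so its counter is released. */
    void clear(String key) {
        attempts.remove(key);
    }

    public static void main(String[] args) {
        ReturnRetryLimiter limiter = new ReturnRetryLimiter(2);
        String key = "task-b617e6c5"; // made-up key for the demo
        for (int i = 1; i <= 3; i++) {
            // Stand-in for the return callback firing three times for the same message
            System.out.println("return #" + i + " retry allowed: " + limiter.tryAcquire(key));
        }
    }
}
```

Inside returnedMessage() the check would sit in front of the asynchronous re-send: only call CompletableFuture.runAsync(...) when tryAcquire(...) returns true, and log the message as undeliverable otherwise.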

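More elegant solutions

1. Start an asynchronous thread directly

Starting a thread directly still carries some risk: if many messages are returned, many threads have to be created in a short time, which can overload the service. A thread pool is the safer choice. I won't write out the full implementation here; the following line shows the idea.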

exec.execute(() -> template.send(...));
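To make the one-liner above a little more concrete, here is a minimal sketch of a bounded pool for the re-send. The pool sizes, the queue length and the names newRetryPool, submitRetry and shutdownAndWait are my own assumptions, and the Runnable stands in for the actual template.send(...) call. The pool rejects work when it is full instead of using CallerRunsPolicy, because running the task inline would put the send back on the callback thread, which is exactly what the workaround tries to avoid.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class RetryExecutorSketch {

    /** Builds a small bounded pool; all sizes are illustrative. */
    static ThreadPoolExecutor newRetryPool() {
        return new ThreadPoolExecutor(
                2, 4,                                  // core and maximum threads
                30, TimeUnit.SECONDS,                  // idle time before extra threads exit
                new ArrayBlockingQueue<Runnable>(100), // bounded backlog of pending re-sends
                new ThreadPoolExecutor.AbortPolicy()); // reject when full, never run inline
    }

    /** Hands the re-send to the pool; returns false if the pool refused it. */
    static boolean submitRetry(ExecutorService pool, Runnable resend) {
        try {
            pool.execute(resend);
            return true;
        } catch (RejectedExecutionException e) {
            return false; // caller should log and drop, not execute resend itself
        }
    }

    /** Stops accepting work and waits for queued re-sends to finish. */
    static boolean shutdownAndWait(ExecutorService pool, long timeoutMs) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newRetryPool();
        AtomicInteger sent = new AtomicInteger();
        // Stand-in for: () -> template.send(exchange, routingKey, message)
        boolean accepted = submitRetry(pool, sent::incrementAndGet);
        shutdownAndWait(pool, 1000);
        System.out.println("accepted=" + accepted + ", sent=" + sent.get());
    }
}
```

In a Spring application the pool would normally be a shared bean rather than something built per call, so that returned messages compete for a fixed number of threads.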

2. Register the callback through an injected ApplicationRunner bean

    @Bean
    public ApplicationRunner runner(RabbitTemplate rabbitTemplate) {
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.error("消息主体 message : " + message);
            log.error("消息使用的consume tag:" + message.getMessageProperties().getConsumerTag());
            log.error("消息返回码 replyCode: " + replyCode);
            log.error("描述:" + replyText);
            log.error("消息使用的交换器 exchange : " + exchange);
            log.error("消息使用的路由键 routing : " + routingKey);
            if(replyCode == 312){
                CompletableFuture.runAsync(()->{
                    log.info("retry send message one more time when trigger ReturnCallback message");
                    rabbitTemplate.convertAndSend(exchange, routingKey, message.getBody());
                    log.info("retry send message exchange is {}", exchange);
                    log.info("retry send message routingKey is {}", routingKey);
                    log.info("retry send message body is {}", new String(message.getBody(), StandardCharsets.UTF_8));
                });
            }
        });
        return args -> {log.info("retry send message one more time when trigger ReturnCallback message with replyCode 312");};
    }

3. Upgrade the version

Upgrade Spring Boot to 2.2.x or later and use the official AsyncRabbitTemplate. I chose Spring Boot 2.2.0 with spring-amqp 2.2.13.

buildscript {
    ext {
        springBootVersion = '2.2.0.RELEASE'
    }
}
dependencies {
    implementation 'org.springframework.amqp:spring-amqp:2.2.13.RELEASE'
    implementation 'org.springframework.amqp:spring-rabbit:2.2.13.RELEASE'
    implementation 'org.springframework.boot:spring-boot-starter-amqp:2.2.13.RELEASE'
}

RabbitTemplate configuration

@Slf4j
@Component
public class RabbitDao{

    private static final int RABBITMQ_312_CODE = 312;

    @Autowired
    private RabbitTemplate rabbitTemplate;


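    // Created once and reused; building and starting a new AsyncRabbitTemplate on every send would pile up started instances
    private AsyncRabbitTemplate asyncRabbitTemplate;

    private synchronized AsyncRabbitTemplate asyncRabbitTemplate(){
        if (asyncRabbitTemplate == null) {
            // Off by default. Invoked when the message reached the exchange but no bound queue was found; fires only on failure
            rabbitTemplate.setReturnCallback(returnCallback);
            // Off by default. Invoked to confirm whether the message reached the exchange
            rabbitTemplate.setConfirmCallback(confirmCallback);
            asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
            asyncRabbitTemplate.start();
        }
        return asyncRabbitTemplate;
    }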


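    private RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (!ack) {
                // correlationData is null unless the sender supplied one, and the returned
                // message is only set when the broker returned it, so guard both before use
                Message returned = correlationData == null ? null : correlationData.getReturnedMessage();
                String messageId = returned == null ? "unknown" : returned.getMessageProperties().getMessageId();
                String msg = String.format("producer send message to exchange confirm callback error message id is %s reason is %s", messageId, cause);
                log.error(msg);
            }
        }
    };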

    private RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            log.error("消息主体 message : " + message);
            log.error("消息使用的consume tag:" + message.getMessageProperties().getConsumerTag());
            log.error("消息返回码 replyCode: " + replyCode);
            log.error("描述:" + replyText);
            log.error("消息使用的交换器 exchange : " + exchange);
            log.error("消息使用的路由键 routing : " + routingKey);
            if(replyCode == RABBITMQ_312_CODE){
                CompletableFuture.runAsync(()->{
                    log.info("retry send message one more time when trigger ReturnCallback message");
                    rabbitTemplate.convertAndSend(exchange, routingKey, message.getBody());
                    log.info("retry send message exchange is {}", exchange);
                    log.info("retry send message routingKey is {}", routingKey);
                    log.info("retry send message body is {}", new String(message.getBody(), StandardCharsets.UTF_8));
                });
            }
        }
    };

    public void convertAndSend(String queueName, Object message) {
        asyncRabbitTemplate().convertSendAndReceive(queueName, message);
    }
}

Producer sending the message

    protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception {
        val packet = JSON.parseObject(JSON.toJSONString(msg), new TypeReference<Packet<TaskResult>>() {
        });
        val tResult = packet.getData();
        log.info("Task {} executed: {} {} tResult is {}", tResult.getTaskId(), tResult.getCode(), tResult.isCompleted(), tResult.getOutput());
        rabbitDao.convertAndSend(tResult.getTaskId(), JSON.toJSONString(tResult));
        log.info("amqpTemplate send message to queue {} message body is {}", tResult.getTaskId(), JSON.toJSONString(tResult));
    }