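RabbitTemplate returns replyCode=312, replyText=NO_ROUTE
Problem description
In one of our projects, a call to Spring Boot's RabbitTemplate.convertAndSend(String routingKey, Object message) did not put the message in the queue. Instead the producer's ReturnCallback fired, which showed that the message had failed on the sending side.
Cause and solution
Without further ado, here is the officially provided solution.
Cause
Official explanation 1: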
It causes some kind of deadlock down in the amqp-client code. The simplest solution is to do the send on a separate thread - use a TaskExecutor within the callback...
Official explanation 2:
the callback is invoked directly on the client's connection I/O thread; to do the send, we open a new channel and wait for the Channel.Open-OK result (which is sent by the broker), but now the I/O thread that would handle that result is blocked waiting for that to happen. It does actually end up with a timeout exception after 10 minutes!
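In short: a thread deadlock inside the RabbitMQ client keeps the message from reaching the queue. The callback runs on the connection's I/O thread, so a send issued from inside it waits for a broker reply that only that same thread can process. The explanation also gives the fix: do the resend on a separate thread.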
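The blocking described in the second explanation can be reproduced without a broker. The following is a minimal sketch, not the amqp-client implementation: a single-thread executor stands in for the connection I/O thread, and the "reply" is a task that only that thread can run. The class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Models the connection I/O thread as a single-thread executor (an assumption for illustration). */
class IoThreadDeadlockDemo {

    /**
     * Runs a "callback" on the I/O thread that performs a "send".
     * The send needs a reply that only the I/O thread can process.
     * Returns true if the send completed within 500 ms.
     */
    static boolean sendCompletes(boolean offload) {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch sent = new CountDownLatch(1);

        Runnable send = () -> {
            // The reply is queued for the I/O thread, like Channel.Open-OK from the broker.
            Future<String> reply = ioThread.submit(() -> "Channel.Open-OK");
            try {
                reply.get();        // wait until the I/O thread has handled the reply
                sent.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                // not expected in this sketch
            }
        };

        try {
            ioThread.execute(() -> {
                if (offload) {
                    worker.execute(send); // callback returns at once, I/O thread is free again
                } else {
                    send.run();           // I/O thread waits for a reply it must process itself
                }
            });
            return sent.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            ioThread.shutdownNow();
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("send inside callback completes: " + sendCompletes(false));
        System.out.println("send on separate thread completes: " + sendCompletes(true));
    }
}
```

In this model the inline send never finishes because the only thread able to deliver the reply is the one waiting for it, while the offloaded send completes as soon as the callback returns.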
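Solution
1. The issue was officially confirmed and fixed in the 2.2.x line, so upgrading the RabbitMQ client libraries to 2.2.x resolves it.
2. Inside returnedMessage(), start a new thread and resend the message that came back with replyCode=312, using the original exchange and routing key. The message then reaches the queue.
Problem description and solving it by starting a new thread
Applying the fix
Below is part of the code from my project.
Original code and configuration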
2021-03-08 13:33:42.243 INFO 7817 --- [ntLoopGroup-3-2] c.a.s.ws.TextWebSocketFrameHandler : Message content:type:heartbeat , endpoint:603eea3d660767905c1686af , time: 2021-03-08T13:33:42.226357661+08:00 .
2021-03-08 13:33:42.335 INFO 7817 --- [ntLoopGroup-3-1] c.a.s.ws.TextWebSocketFrameHandler : Message content:type:heartbeat , endpoint:603f489b660767905c1a958e , time: 2021-03-08T13:33:42.333921624+08:00 .
2021-03-08 13:33:42.435 INFO 7817 --- [cTaskExecutor-1] com.Application : Handle task: {"content":"ls","execId":5567,"id":"9257bb4f-2e75-4bd8-b3a4-f2b63c40261f","nodeId":"603f489b660767905c1a958e","resourceName":"<span style=\"color:#8EE7FF\">[192.168.223.2, 172.17.0.1]</span>","type":"adhoc_command"}
2021-03-08 13:33:42.436 INFO 7817 --- [cTaskExecutor-1] com.Application : Sending adhoc_command task 9257bb4f-2e75-4bd8-b3a4-f2b63c40261f
2021-03-08 13:33:42.444 INFO 7817 --- [ntLoopGroup-3-1] c.a.s.ws.TextWebSocketFrameHandler : Message content:type:task , endpoint:603f489b660767905c1a958e , time: 2021-03-08T13:33:42.442484649+08:00 .
2021-03-08 13:33:42.445 INFO 7817 --- [ntLoopGroup-3-1] c.ws.TaskResultHandler : Task 9257bb4f-2e75-4bd8-b3a4-f2b63c40261f executed: 0 false tResult is <span style="color:#8EE7FF">[192.168.223.2, 172.17.0.1]</span>: agent
2021-03-08 13:33:42.453 ERROR 7817 --- [.168.223.5:5672] com.dao.RabbitDao : 消息主体 message : (Body:'{"code":0,"completed":false,"output":"<span style=\"color:#8EE7FF\">[192.168.223.2, 172.17.0.1]</span>: agent","taskId":"9257bb4f-2e75-4bd8-b3a4-f2b63c40261f"}' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2021-03-08 13:33:42.453 ERROR 7817 --- [.168.223.5:5672] com.dao.RabbitDao : 消息使用的consume tag:null
2021-03-08 13:33:42.453 ERROR 7817 --- [.168.223.5:5672] com.dao.RabbitDao : 消息返回码 replyCode: 312
2021-03-08 13:33:42.453 ERROR 7817 --- [.168.223.5:5672] com.dao.RabbitDao : 描述:NO_ROUTE
2021-03-08 13:33:42.453 ERROR 7817 --- [.168.223.5:5672] com.dao.RabbitDao : 消息使用的交换器 exchange :
2021-03-08 13:33:42.453 ERROR 7817 --- [.168.223.5:5672] com.dao.RabbitDao : 消息使用的路由键 routing : 9257bb4f-2e75-4bd8-b3a4-f2b63c40261f
2021-03-08 13:33:42.456 INFO 7817 --- [ntLoopGroup-3-1] c.ws.TaskResultHandler : amqpTemplate send message to queue 9257bb4f-2e75-4bd8-b3a4-f2b63c40261f message body is {"code":0,"completed":false,"output":"<span style=\"color:#8EE7FF\">[192.168.223.2, 172.17.0.1]</span>: agent","taskId":"9257bb4f-2e75-4bd8-b3a4-f2b63c40261f"}
Code that sends the message
@Override
protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception {
    val packet = JSON.parseObject(JSON.toJSONString(msg), new TypeReference<Packet<TaskResult>>() {
    });
    val tResult = packet.getData();
    log.info("Task {} executed: {} {} tResult is {}", tResult.getTaskId(), tResult.getCode(), tResult.isCompleted(), tResult.getOutput());
    // The task ID is used as the queue name (routing key on the default exchange)
    rabbitDao.convertAndSend(tResult.getTaskId(), JSON.toJSONString(tResult));
    log.info("amqpTemplate send message to queue {} message body is {}", tResult.getTaskId(), JSON.toJSONString(tResult));
}
Producer MQ configuration
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

/**
 * RabbitMQ data access class.
 */
@Slf4j
@Component
public class RabbitDao implements RabbitTemplate.ReturnCallback {

    private static final int RABBITMQ_312_CODE = 312;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (!ack) {
                // correlationData is null when the message was sent without one, so guard against an NPE
                String id = correlationData != null ? correlationData.getId() : null;
                log.error("producer send message to exchange confirm callback error correlation id is {} reason is {}", id, cause);
            }
        }
    };

    /**
     * Registers both callbacks once at startup instead of on every send.
     */
    @PostConstruct
    public void init() {
        // Off by default. Fires only when a message reached the exchange but could not be routed to a queue.
        rabbitTemplate.setReturnCallback(this);
        // Off by default. Confirms whether the message reached the exchange.
        rabbitTemplate.setConfirmCallback(confirmCallback);
    }

    public void convertAndSend(String queueName, Object message) {
        rabbitTemplate.convertAndSend(queueName, message);
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("消息主体 message : " + message);
        log.error("消息使用的consume tag:" + message.getMessageProperties().getConsumerTag());
        log.error("消息返回码 replyCode: " + replyCode);
        log.error("描述:" + replyText);
        log.error("消息使用的交换器 exchange : " + exchange);
        log.error("消息使用的路由键 routing : " + routingKey);
    }
}
Spring Boot configuration file
## Enable ConfirmCallback and ReturnCallback
spring:
  rabbitmq:
    publisher-confirms: true
    publisher-returns: true
Updated code
Following the official explanation, the resend simply has to happen on a separate thread rather than on the thread that runs the callback.
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    log.error("消息主体 message : " + message);
    log.error("消息使用的consume tag:" + message.getMessageProperties().getConsumerTag());
    log.error("消息返回码 replyCode: " + replyCode);
    log.error("描述:" + replyText);
    log.error("消息使用的交换器 exchange : " + exchange);
    log.error("消息使用的路由键 routing : " + routingKey);
    if (replyCode == RABBITMQ_312_CODE) {
        // Resend off the connection I/O thread; runAsync uses ForkJoinPool.commonPool()
        CompletableFuture.runAsync(() -> {
            log.info("retry send message one more time when trigger ReturnCallback message");
            rabbitTemplate.convertAndSend(exchange, routingKey, message.getBody());
            log.info("retry send message exchange is {}", exchange);
            log.info("retry send message routingKey is {}", routingKey);
            log.info("retry send message body is {}", new String(message.getBody(), StandardCharsets.UTF_8));
        });
    }
}
Log after the fix
2021-03-08 17:47:03.697 INFO 2698 --- [ntLoopGroup-3-1] c.ws.TaskResultHandler : Task b617e6c5-ebdb-40a5-8c13-d818b2018188 executed: 0 false tResult is <span style="color:#8EE7FF">shahy-test-resetPassword-1</span>: 123
2021-03-08 17:47:03.698 INFO 2698 --- [ntLoopGroup-3-1] c.ws.TaskResultHandler : amqpTemplate send message to queue b617e6c5-ebdb-40a5-8c13-d818b2018188 message body is {"code":0,"completed":false,"output":"<span style=\"color:#8EE7FF\">shahy-test-resetPassword-1</span>: 123","taskId":"b617e6c5-ebdb-40a5-8c13-d818b2018188"}
2021-03-08 17:47:03.701 ERROR 2698 --- [.168.223.5:5672] com.dao.RabbitDao : 消息主体 message : (Body:'{"code":0,"completed":false,"output":"<span style=\"color:#8EE7FF\">shahy-test-resetPassword-1</span>: 123","taskId":"b617e6c5-ebdb-40a5-8c13-d818b2018188"}' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2021-03-08 17:47:03.701 ERROR 2698 --- [.168.223.5:5672] com.dao.RabbitDao : 消息使用的consume tag:null
2021-03-08 17:47:03.701 ERROR 2698 --- [.168.223.5:5672] com.dao.RabbitDao : 消息返回码 replyCode: 312
2021-03-08 17:47:03.701 ERROR 2698 --- [.168.223.5:5672] com.dao.RabbitDao : 描述:NO_ROUTE
2021-03-08 17:47:03.701 ERROR 2698 --- [.168.223.5:5672] com.dao.RabbitDao : 消息使用的交换器 exchange :
2021-03-08 17:47:03.701 ERROR 2698 --- [.168.223.5:5672] com.dao.RabbitDao : 消息使用的路由键 routing : b617e6c5-ebdb-40a5-8c13-d818b2018188
2021-03-08 17:47:03.741 INFO 2698 --- [onPool-worker-1] com.dao.RabbitDao : retry send message one more time when trigger ReturnCallback message
2021-03-08 17:47:03.742 INFO 2698 --- [onPool-worker-1] com.dao.RabbitDao : retry send message exchange is
2021-03-08 17:47:03.742 INFO 2698 --- [onPool-worker-1] com.dao.RabbitDao : retry send message routingKey is b617e6c5-ebdb-40a5-8c13-d818b2018188
2021-03-08 17:47:03.742 INFO 2698 --- [onPool-worker-1] com.dao.RabbitDao : retry send message body is {"code":0,"completed":false,"output":"<span style=\"color:#8EE7FF\">shahy-test-resetPassword-1</span>: 123","taskId":"b617e6c5-ebdb-40a5-8c13-d818b2018188"}
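The log shows that once returnedMessage() fires with replyCode=312, the message is sent again from a different thread (onPool-worker-1 instead of the connection I/O thread). That solved the problem: the consumer now receives the message.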
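One thing the retry leaves open is what happens when the resent message is still unroutable: with returns enabled the broker hands it back again, and the callback resends it again without any upper bound. A minimal sketch of capping this follows, assuming the attempt count travels in a message header and that the resend forwards the headers along with the body. The class name ResendBudget and the header name x-resend-count are hypothetical, and a plain Map stands in for the message headers.

```java
import java.util.HashMap;
import java.util.Map;

/** Counts resend attempts in a header so the return callback can give up after a limit. */
class ResendBudget {

    /** Hypothetical header name, not defined by RabbitMQ or Spring AMQP. */
    static final String HEADER = "x-resend-count";

    /**
     * Returns true and increments the counter while fewer than maxResends
     * attempts were made; returns false once the budget is used up.
     */
    static boolean shouldResend(Map<String, Object> headers, int maxResends) {
        Object value = headers.get(HEADER);
        int count = value instanceof Number ? ((Number) value).intValue() : 0;
        if (count >= maxResends) {
            return false;
        }
        headers.put(HEADER, count + 1);
        return true;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        for (int attempt = 1; attempt <= 4; attempt++) {
            System.out.println("attempt " + attempt + " allowed: " + shouldResend(headers, 3));
        }
    }
}
```

In the callback, a check like this would sit in front of the asynchronous resend, and a message whose budget is exhausted would be logged or stored for manual handling instead of being sent again.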
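More elegant solutions
1. Start an asynchronous thread directly
Starting a thread per returned message carries some risk: if many messages are returned in a short time, many threads are created at once and the load can spike. (CompletableFuture.runAsync without an executor argument runs on the shared ForkJoinPool.commonPool(), so the resends also compete with any other work on that pool.) A dedicated thread pool is the better choice. The full implementation is omitted here; the following line shows the idea: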
exec.execute(() -> template.send(...));
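The one-liner above can be fleshed out roughly as follows. This is a sketch under assumptions, not the author's implementation: the Sender interface is a hypothetical stand-in for RabbitTemplate so the example runs without a broker, and the pool size and queue capacity are placeholder values. The pool is bounded, and a saturated pool rejects the task instead of running it on the caller, because the caller would be the connection I/O thread.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Resends returned messages on a bounded, dedicated thread pool. */
class ResendExecutor {

    /** Hypothetical stand-in for the template's send method. */
    interface Sender {
        void send(String exchange, String routingKey, byte[] body);
    }

    private final Sender sender;
    private final ThreadPoolExecutor pool;

    ResendExecutor(Sender sender, int threads, int queueCapacity) {
        this.sender = sender;
        // Fixed number of threads and a bounded queue; AbortPolicy throws when both are full.
        this.pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy());
    }

    /** Schedules a resend; returns false if the pool is saturated and the task was rejected. */
    boolean resend(String exchange, String routingKey, byte[] body) {
        try {
            pool.execute(() -> sender.send(exchange, routingKey, body));
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    /** Stops accepting work and waits for queued resends; returns true if all finished in time. */
    boolean shutdownAndWait(long timeoutMillis) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ResendExecutor executor = new ResendExecutor(
                (exchange, routingKey, body) -> System.out.println("resent to " + routingKey), 2, 100);
        executor.resend("", "demo-queue", "hello".getBytes(StandardCharsets.UTF_8));
        executor.shutdownAndWait(1000);
    }
}
```

CallerRunsPolicy is deliberately avoided here: it would execute the resend on the thread that called resend(), which inside the return callback is exactly the thread that must not block.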
2. Set up the resend thread through an injected ApplicationRunner bean
@Bean
public ApplicationRunner runner(RabbitTemplate rabbitTemplate) {
    // The callback is registered while the bean is created, before any message is sent
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
        log.error("消息主体 message : " + message);
        log.error("消息使用的consume tag:" + message.getMessageProperties().getConsumerTag());
        log.error("消息返回码 replyCode: " + replyCode);
        log.error("描述:" + replyText);
        log.error("消息使用的交换器 exchange : " + exchange);
        log.error("消息使用的路由键 routing : " + routingKey);
        if (replyCode == 312) {
            // Resend off the connection I/O thread
            CompletableFuture.runAsync(() -> {
                log.info("retry send message one more time when trigger ReturnCallback message");
                rabbitTemplate.convertAndSend(exchange, routingKey, message.getBody());
                log.info("retry send message exchange is {}", exchange);
                log.info("retry send message routingKey is {}", routingKey);
                log.info("retry send message body is {}", new String(message.getBody(), StandardCharsets.UTF_8));
            });
        }
    });
    return args -> {
        log.info("retry send message one more time when trigger ReturnCallback message with replyCode 312");
    };
}
3. Upgrade the version
Upgrade Spring Boot to 2.2.x or later and use the officially provided AsyncRabbitTemplate. I chose Spring Boot 2.2.0 with spring-amqp 2.2.13.
buildscript {
    ext {
        springBootVersion = '2.2.0.RELEASE'
    }
}
dependencies {
    implementation 'org.springframework.amqp:spring-amqp:2.2.13.RELEASE'
    implementation 'org.springframework.amqp:spring-rabbit:2.2.13.RELEASE'
    // The starter follows the Spring Boot version declared above
    implementation "org.springframework.boot:spring-boot-starter-amqp:${springBootVersion}"
}
RabbitTemplate configuration
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

@Slf4j
@Component
public class RabbitDao {

    private static final int RABBITMQ_312_CODE = 312;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Created once and reused; building and starting a new instance on every send would leak resources
    private AsyncRabbitTemplate asyncRabbitTemplate;

    private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (!ack) {
                // correlationData is null when the message was sent without one, so guard against an NPE
                String id = correlationData != null ? correlationData.getId() : null;
                log.error("producer send message to exchange confirm callback error correlation id is {} reason is {}", id, cause);
            }
        }
    };

    private final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            log.error("消息主体 message : " + message);
            log.error("消息使用的consume tag:" + message.getMessageProperties().getConsumerTag());
            log.error("消息返回码 replyCode: " + replyCode);
            log.error("描述:" + replyText);
            log.error("消息使用的交换器 exchange : " + exchange);
            log.error("消息使用的路由键 routing : " + routingKey);
            if (replyCode == RABBITMQ_312_CODE) {
                // Resend off the connection I/O thread
                CompletableFuture.runAsync(() -> {
                    log.info("retry send message one more time when trigger ReturnCallback message");
                    rabbitTemplate.convertAndSend(exchange, routingKey, message.getBody());
                    log.info("retry send message exchange is {}", exchange);
                    log.info("retry send message routingKey is {}", routingKey);
                    log.info("retry send message body is {}", new String(message.getBody(), StandardCharsets.UTF_8));
                });
            }
        }
    };

    @PostConstruct
    public void init() {
        // Off by default. Fires only when a message reached the exchange but could not be routed to a queue.
        rabbitTemplate.setReturnCallback(returnCallback);
        // Off by default. Confirms whether the message reached the exchange.
        rabbitTemplate.setConfirmCallback(confirmCallback);
        asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
        asyncRabbitTemplate.start();
    }

    @PreDestroy
    public void destroy() {
        asyncRabbitTemplate.stop();
    }

    public void convertAndSend(String queueName, Object message) {
        // convertSendAndReceive is a request/reply call; the returned future is not used here
        asyncRabbitTemplate.convertSendAndReceive(queueName, message);
    }
}
Producer sends the message
protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception {
    val packet = JSON.parseObject(JSON.toJSONString(msg), new TypeReference<Packet<TaskResult>>() {
    });
    val tResult = packet.getData();
    log.info("Task {} executed: {} {} tResult is {}", tResult.getTaskId(), tResult.getCode(), tResult.isCompleted(), tResult.getOutput());
    rabbitDao.convertAndSend(tResult.getTaskId(), JSON.toJSONString(tResult));
    log.info("amqpTemplate send message to queue {} message body is {}", tResult.getTaskId(), JSON.toJSONString(tResult));
}