八、代码实现-自动/手动应答(14)


回调接口(CallBack,原文标题误标为“告警配置类”)
package com.rabbitmq11;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** 回调接口** @author 天真热* @create 2022-02-11 15:25* @desc**/@Component@Slf4jpublic class CallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;//注入@PostConstructprivate void init() {//注入rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/*** 交换机回调方法(针对于交换机是否成功接收消息)* 1.发消息 交换机接收到了 回调* 1.1 correlationData 保存回调消息的id及相关信息* 1.2 交换机收到消息 ack=true* 1.3 call null* 2. 发消息 交换机失败了 回调* 2.1 correlationData 保存回调消息的id及相关信息* 2.2 交换机收到消息 ack=false*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {System.out.println("交换机接受成功了");} else {System.out.println("交换机接受失败了");}}/*** 消息不可达到目的地时 , 返回给生产者** @param message消息* @param replayCode 失败码* @param replayText 失败原因* @param exchanges交换机* @param routingKey 路由*/@Overridepublic void returnedMessage(Message message, int replayCode, String replayText, String exchanges, String routingKey) {System.out.println("队列接受失败了");System.out.println("消息:" + message + ";消息码:" + replayCode + ";原因:" + replayText + ";交换机:" + exchanges + ";路由:" + routingKey);}}
发布确认配置类(ConfirmConfig,原文标题误标为“正常消费者”)
package com.rabbitmq11;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;/*** 发布确认** @author 天真热* @create 2022-02-10 10:09* @desc**/@Configurationpublic class ConfirmConfig {//队列public static final String CONFIRM_QUEUE = "CONFIRM_QUEUE";//交换机public static final String CONFIRM_EXCHANGE = "CONFIRM_EXCHANGE";//routingKeypublic static final String CONFIRM_ROUNTING_KEY = "CONFIRM_ROUNTING_KEY";//备份交换机public static final String BACKUP_EXCHANGE = "BACKUP_EXCHANGE";//备份队列public static final String BACKUP_QUEUE = "BACKUP_QUEUE";//告警队列public static final String WARNING_QUEUE = "WARNING_QUEUE";//声明交换机//这里因为用的是之前的交换机 , 所以需要删除原先的交换机才能生效@Beanpublic DirectExchange confirmEchange() {return (DirectExchange) ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true).withArgument("alternate-exchange", BACKUP_EXCHANGE).build();}//声明备份交换机@Beanpublic FanoutExchange backupEchange() {return new FanoutExchange(BACKUP_EXCHANGE);}//声明队列@Beanpublic Queue confirmQueue() {//创建队列return new Queue(CONFIRM_QUEUE);}//声明备份队列@Beanpublic Queue backupQueue() {//创建队列return new Queue(BACKUP_QUEUE);}//声明报警队列@Beanpublic Queue warningQueue() {//创建队列return new Queue(WARNING_QUEUE);}//绑定队列@Beanpublic Binding confirmBindingQueue(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmEchange") DirectExchange confirmEchange) {return BindingBuilder.bind(confirmQueue).to(confirmEchange).with(CONFIRM_ROUNTING_KEY);}//绑定备份队列@Beanpublic Binding backupBindingQueue(@Qualifier("backupQueue") Queue backupQueue, @Qualifier("backupEchange") FanoutExchange backupEchange) {return BindingBuilder.bind(backupQueue).to(backupEchange);}//绑定报警队列@Beanpublic Binding warningBindingQueue(@Qualifier("warningQueue") Queue warningQueue, @Qualifier("backupEchange") FanoutExchange backupEchange) {return 
BindingBuilder.bind(warningQueue).to(backupEchange);}}