三 rabbitmq发布确认+集群+其他高级

8.发布确认 8.1. 发布确认版本 8.1.1. 确认机制方案
8.1.2. 代码架构图
8.1.3. 配置文件
在配置文件当中需要添加
spring.rabbitmq.publisher-confirm-type=correlated
经测试有两种效果,其一效果和值一样会触发回调方法,其二在发布消息成功后使用调用或方法等待节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 方法如果返回 false 则会关闭 ,则接下来无法发送消息到
spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=123spring.rabbitmq.publisher-confirm-type=correlated
8.1.4.添加配置类
package com.bcl.mq.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/*** 配置类 发布确认(高级)** @author bcl* @date 2021/9/6*/@Configurationpublic class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; //交换机public static final String CONFIRM_QUEUE_NAME = "confirm.queue";//队列public static final String CONFIRM_ROUTING_KEY = "key1";//routingKey//声明业务 Exchange@Bean("confirmExchange")public DirectExchange confirmExchange() {return new DirectExchange(CONFIRM_EXCHANGE_NAME);}// 声明确认队列@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}// 声明确认队列绑定关系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);}}
8.1.5 消息生产者
package com.bcl.mq.controller;import com.bcl.mq.config.MyCallBack;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.PostConstruct;/*** 发布确认 生产者(高级)** @author bcl* @date 2021/9/6*/@RestController@RequestMapping("/confirm")@Slf4jpublic class Producer {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("sendMessage/{message}")public void sendMessage(@PathVariable String message) {//指定消息 id 为 1CorrelationData correlationData1 = new CorrelationData("1");String routingKey = "key1";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1);CorrelationData correlationData2 = new CorrelationData("2");routingKey = "key2";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME+"111", routingKey, message + routingKey, correlationData2);log.info("发送消息内容:{}", message);}}
8.1.6 消息回调者
package com.bcl.mq.config;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** 发布确认 回调接口(高级)* 生产者发送消息 发不出去 回调接口** @author bcl* @date 2021/9/6*/@Component@Slf4jpublic class MyCallBack implements RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate MyCallBack myCallBack;//依赖注入 rabbitTemplate 之后再设置它的回调对象@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(myCallBack);}/*** 交换机确认回调方法* 1.发消息 交换机接收到了 回调* 1.1 correlationData保存回调信息ID及相关信息* 1.2 交换机收到消息 ack = true* 1.3 case null* 2.发消息 交换机接收失败了 回调* 2.1 correlationData保存回调信息ID及相关信息* 2.2 交换机收到消息 ack = false* 2.3 case 失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {log.info("交换机已经收到 id 为:{}的消息", id);} else {log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);}}}