八、代码实现-自动/手动应答(11)


消费者2(死信队列)
package com.rabbitmq8;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;import java.util.HashMap;import java.util.Map;import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/public class Consume02 {//死信队列public static String DEAD_QUEUE = "DEAD_QUEUE";//接收消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback);System.out.println("死信队列准备消费消息......");}}
测试1:首先先运行死信队列 , 然后运行生产者 。由于生产的消息没有被消费掉 , 消息超时自动进入死信队列
测试二:运行死信队列和正常队列 , 然后运行生产者 。由于生产的消息被拒绝 , 消息超时自动进入死信队列
十三、代码实现-延迟队列
延迟队列:延迟队列里面的元素 , 是到达指定时候后 , 就对这些元素进行处理 。如果订单功能 , 如果指定时间内不进行支付 , 则会取消订单 。
原理图:
因为我们后续肯定是要在框架运行 , 所以这里需要整合 。
在yml配置文件配置
spring:rabbitmq:host: 192.168.248.10port: 5672username: adminpassword: admin#交换机确认接口publisher-confirms: true
添加配置类
package com.rabbitmq9;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;/*** @author 天真热* @create 2022-02-10 10:09* @desc**/@Configurationpublic class Config {//普通交换机public static final String X_EXCHANGE = "X";//死信交换机public static final String Y_DEAD_LETTER_EXCHANGE = "Y";//普通队列名称public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";public static final String QUEUE_C = "QC";//死信队列名称public static final String DEAD_LETTER_QUEUE = "QD";//声明xExchange 别名@Bean("xExchange")public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}//声明yExchange 别名@Bean("yExchange")public DirectExchange yExchange() {return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明普通队列A ttl 为10s@Bean("queueA")public Queue queuA() {Map argument = new HashMap<>();//设置死信交换机argument.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKeyargument.put("x-dead-letter-routing-key", "YD");//设置ttl,10sargument.put("x-message-ttl", 10000);//创建队列return QueueBuilder.durable(QUEUE_A).withArguments(argument).build();}//声明普通队列B ttl 为10s@Bean("queueB")public Queue queuB() {Map argument = new HashMap<>();//设置死信交换机argument.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKeyargument.put("x-dead-letter-routing-key", "YD");//设置ttl,40sargument.put("x-message-ttl", 40000);//创建队列return QueueBuilder.durable(QUEUE_B).withArguments(argument).build();}//声明普通队列C ttl 为生产者确定@Bean("queueC")public Queue queuC() {Map argument = new HashMap<>();//设置死信交换机argument.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKeyargument.put("x-dead-letter-routing-key", "YD");//创建队列return QueueBuilder.durable(QUEUE_C).withArguments(argument).build();}//死信队列@Bean("queueD")public Queue queuD() {//创建队列return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}//绑定队列A@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//绑定队列B@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//绑定队列C@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueC).to(xExchange).with("XC");}//绑定队列D@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) {return BindingBuilder.bind(queueD).to(yExchange).with("YD");}}