八、代码实现-自动/手动应答( 十 )


八、代码实现-自动/手动应答

文章插图
测试 , 启动生产者 , 再启动消费者
十二、代码实现-死信队列
死信指的是无法被消费的消息 。这些消息因为一些如网络超时等原因 , 导致无法被消费 , 就成了死信消息 。所以为了保证这些数据不丢失 , 就有了死信队列 , 专门对死信消息进行处理 。
工作图:
代码如下:
生成者:
package com.rabbitmq8;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.MessageProperties;import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/public class Product {//正常交换机public static String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机(不需要重复声明)//channel.exchangeDeclare(NORMAL_EXCHANGE, "direct");//设置ttl时间为10s,过期则进入死信队列AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();//发送死信消息for (int i = 0; i < 10; i++) {String message = "发送的消息" + i;//1.交换机 , 简单版本不考虑 , 直接空字符串即可(默认/无名交换机)//2.路由key , 直接写队列名即可//3.参数 , (消息持久化)//4.消息体channel.basicPublish(NORMAL_EXCHANGE, "normalQueue", properties, message.getBytes());}}}
消费者(正常队列)
package com.rabbitmq8;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;import java.util.HashMap;import java.util.Map;import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/public class Consume01 {//正常交换机public static String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";//死信交换机public static String DEAD_EXCHANGE = "DEAD_EXCHANGE";//正常队列public static String NORMAL_QUEUE = "NORMAL_QUEUE";//死信队列public static String DEAD_QUEUE = "DEAD_QUEUE";//接收消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明普通和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, "direct");channel.exchangeDeclare(DEAD_EXCHANGE, "direct");//声明死信队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);//设置参数Map arguments = new HashMap<>();//设置死信交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key", "deadQueue");//设置正常队列长度arguments.put("x-max-length", 6);//设置过期时间,10s(一般不在这里设置 , 而是在生产者端配置 , 这样子过期时间可以由生产者随意改动)//arguments.put("x-message-ttl", "10000");//声明普通队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);//绑定普通交换机和队列的channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normalQueue");//绑定死信交换机和队列的channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "deadQueue");//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("拒绝");//拒绝 , 并且不放回队列channel.basicReject(message.getEnvelope().getDeliveryTag(), false);};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);System.out.println("正常队列准备消费消息......");}}