八、代码实现-自动/手动应答(12)


添加生产消息类
package com.rabbitmq9;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/*** @author 天真热* @create 2022-02-10 15:03* @desc**/@Slf4j@RestController@RequestMapping("/ttl")public class SendMessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{message}")public void sendMessage(@PathVariable String message) {log.info("发送消息");rabbitTemplate.convertAndSend("X", "XA", "消息来自10s的ttl:" + message);rabbitTemplate.convertAndSend("X", "XB", "消息来自40s的ttl:" + message);}@GetMapping("/sendExpireMsg/{message}/{ttlTime}")public void sendExpireMsg(@PathVariable String message, @PathVariable String ttlTime) {log.info("发送定时消息");rabbitTemplate.convertAndSend("X", "XC", "消息来自定时消息:" + message, msg -> {//发消息的时候 , 延迟延长msg.getMessageProperties().setExpiration(ttlTime);return msg;});}}
添加消费者类
package com.rabbitmq9;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/*** 队列ttl消费者** @author 天真热* @create 2022-02-10 16:00* @desc**/@Slf4j@Componentpublic class Consume {//接收消息@RabbitListener(queues = Config.DEAD_LETTER_QUEUE)public void receiveD(Message msg, Channel channel) {String message = new String(msg.getBody());log.info("接收到了延迟队列消息:" + message);}}
测试1:http://localhost:8090/ttl/sendMsg/{message}(将 {message} 替换为任意消息内容)
测试2:http://localhost:8090/ttl/sendExpireMsg/5555/100
测试3:连续请求下面两个地址。可以看到延迟消息需要排队:RabbitMQ 只会检查队首消息是否过期,因此即使后发送的消息 TTL 更短,也无法先于前一条消息被投递。这是该方案的弊端,可以使用延迟消息插件来克服这个问题。
http://localhost:8090/ttl/sendExpireMsg/5555/10000
http://localhost:8090/ttl/sendExpireMsg/{message}/1000
十四、代码实现-插件实现延迟队列
安装教程
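下载:从 RabbitMQ 社区插件页面下载 rabbitmq_delayed_message_exchange 插件(版本需与 RabbitMQ 3.8.8 匹配)
将插件放入:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins/ 下
进入目录:cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins/
安装:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启:systemctl restart rabbitmq-server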
工作原理:
代码如下
配置类
package com.rabbitmq10;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.CustomAutowireConfigurer;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;/*** @author 天真热* @create 2022-02-10 10:09* @desc**/@Configurationpublic class DelayConfig {//队列public static final String DELAY_QUEUE = "DELAY_QUEUE";//交换机public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";//routingKeypublic static final String DELAY_ROUNTING_KEY = "DELAY_ROUNTING_KEY";//声明交换机@Beanpublic CustomExchange delayEchange() {Map arguments = new HashMap<>();arguments.put("x-delayed-type", "direct");//1.交换机名称//2.交换机类型//3.是否需要持久化//4.是否需要自动删除//5.其他参数return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, arguments);}//声明队列@Beanpublic Queue delayQueue() {//创建队列return new Queue(DELAY_QUEUE);}//绑定队列@Beanpublic Binding delayBindingQueue(@Qualifier("delayQueue") Queue delayQueue, @Qualifier("delayEchange") CustomExchange delayEchange) {return BindingBuilder.bind(delayQueue).to(delayEchange).with("DELAY_ROUNTING_KEY").noargs();}}
消费者
package com.rabbitmq10;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/*** 队列ttl消费者** @author 天真热* @create 2022-02-10 16:00* @desc**/@Slf4j@Componentpublic class DelayConsume {//接收消息@RabbitListener(queues = DelayConfig.DELAY_QUEUE)public void receiveDelay(Message msg) {String message = new String(msg.getBody());log.info("接收到了插件延迟队列消息:" + message);}}