三 rabbitmq发布确认+集群+其他高级( 五 )


9.2.2. 如何添加
b.队列中代码添加优先级
Map params = new HashMap();params.put("x-max-priority", 10);
c.消息中代码添加优先级
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
d.注意事项
要让队列实现优先级需要做的事情有如下事情:队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费因为,这样才有机会对消息进行排序
9.2.3. 实战
package com.bcl.mq.night;import com.bcl.mq.utils.RabbitMqUtils;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.util.HashMap;import java.util.Map;/*** hello demo* 生产者** @author bcl* @date 2021/9/1*/public class Producer {private static final String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {//获取信道Channel channel = RabbitMqUtils.getChannel();/*** 生成一个队列* 1.队列名称* 2.队列里面消息是否会持久化* 3.队列是否只供一个消费者进行消费 是否进行消息共享 true可以多个消费之消费 false只能一个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true自动删除 false不自动删除* 5.其他参数*///设置队列的最大优先级 最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPUMap params = new HashMap();params.put("x-max-priority", 10);channel.queueDeclareNoWait(QUEUE_NAME, true, false, false, params);//发消息for (int i = 0; i < 11; i++) {String message = "info:" + i;if (i == 5) {AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());}else {channel.basicPublish("", QUEUE_NAME, null, message.getBytes());}System.out.println("发送消息完成:" + message);}}}
package com.bcl.mq.night;import com.bcl.mq.utils.RabbitMqUtils;import com.rabbitmq.client.*;import java.util.HashMap;import java.util.Map;/*** hello demo* 消费者* 接收消息** @author bcl* @date 2021/8/31*/public class Consumer {// 队列名称private static final String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//channel.queueDeclare(QUEUE_NAME, true, false, false, null);System.out.println("消费者启动等待消费..............");//声明接收消息,推送的消息如何进行消费的接口回调DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("接收消息:" + new String(message.getBody()));};//取消消息时回调CancelCallback cancelCallback = consumerTag -> {System.out.println("消息被中断");};/*** 消费者消费消息* 1.消费那个队列* 2.消费成功后是否要自动应答true代表自动应答false代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调*/channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}}
9.3. 惰性队列 9.3.1. 使用场景
从 3.6.0 版本开始引入了惰性队列的概念 。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储 。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了 。
默认情况下,当生产者将消息发送到的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者 。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份 。当需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息 。虽然的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候 。