八、代码实现-自动/手动应答( 六 )


public static void publishYb() throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//线程安全有序的一个哈希表 , 适合高并发的情况//1.将序号和消息进行关联//2.轻松批量删除条目 只要给到序号//3.支持高并发ConcurrentSkipListMap concurrentSkipListMap = new ConcurrentSkipListMap<>();//--------------------监听器--------------------------//消息确认成功回调函数//1.消息的标记 , 2.是否为批量操作ConfirmCallback ackCallback = (deliveryTag, multiple) -> {//消息接受处理if (multiple) {//批量ConcurrentNavigableMap confirmd = concurrentSkipListMap.headMap(deliveryTag);confirmd.clear();} else {//非批量concurrentSkipListMap.remove(deliveryTag);}System.out.println("确认的消息:" + deliveryTag);};//消息确认失败回调函数//1.消息的标记 , 2.是否为批量操作ConfirmCallback nackCallback = (deliveryTag, multiple) -> {//消息未接受处理String message = concurrentSkipListMap.get(deliveryTag);System.out.println("未确认的消息:" + deliveryTag + ":" + message);};//消息监听器 , 监听失败和成功的消息channel.addConfirmListener(ackCallback, nackCallback);//批量发送消息、确认for (int i = 0; i < 1000; i++) {String message = ("message" + i);channel.basicPublish("", RabbitmqUtils.RABBITMQ_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());//记录发送的消息concurrentSkipListMap.put(channel.getNextPublishSeqNo(), message);}//结束时间long end = System.currentTimeMillis();System.out.println("单独确认花费时间=" + (end - begin));System.out.println(concurrentSkipListMap.size());}
十、代码实现-交换机
前面有说到 , 实际上生产者发送消息 , 消息是直接发给交换机 , 然后再由交换机根据相关规则分配给队列 。而根据分配规则 , 常见的基类交换机分别有:直接交换机() , 主题交换机(topic) , 标题交换机(topic) , 首部交换机()等 。
ps:因为我这里用的是同一个交换机 , 如果同一个交换机 , 并且配置修改了 , 则需要删除原先的交换机 , 否则会报错 。
交换机模式一
这种模式跟广播一样 , 即发送到交换机的所有消息 , 都会发到交换机的所有队列中 , 代码如下:
生产者
package com.rabbitmq5;import com.rabbitmq.client.Channel;import com.rabbitmq.client.MessageProperties;import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/public class Product {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "fanout");for (int i = 0; i < 10; i++) {String message = "发送的消息" + i;//1.交换机 , 简单版本不考虑 , 直接空字符串即可(默认/无名交换机)//2.路由key , 直接写队列名即可//3.参数 , (消息持久化)//4.消息体channel.basicPublish(RabbitmqUtils.RABBITMQ_EXCHANGE, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());}}}
消费者1
package com.rabbitmq5;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/public class Consume01 {//接收消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "fanout");//声明一个临时队列String queueName = channel.queueDeclare().getQueue();//绑定交换机和队列的channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "");//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功 , 否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(queueName, true, deliverCallback, cancelCallback);}}