public static void publishYb() throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//线程安全有序的一个哈希表 , 适合高并发的情况//1.将序号和消息进行关联//2.轻松批量删除条目 只要给到序号//3.支持高并发ConcurrentSkipListMap
十、代码实现-交换机
前面有说到 , 实际上生产者发送消息 , 消息是直接发给交换机 , 然后再由交换机根据相关规则分配给队列 。而根据分配规则 , 常见的基类交换机分别有:直接交换机() , 主题交换机(topic) , 标题交换机(topic) , 首部交换机()等 。
ps:因为我这里用的是同一个交换机 , 如果同一个交换机 , 并且配置修改了 , 则需要删除原先的交换机 , 否则会报错 。
交换机模式一
这种模式跟广播一样 , 即发送到交换机的所有消息 , 都会发到交换机的所有队列中 , 代码如下:
生产者
package com.rabbitmq5;import com.rabbitmq.client.Channel;import com.rabbitmq.client.MessageProperties;import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/public class Product {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "fanout");for (int i = 0; i < 10; i++) {String message = "发送的消息" + i;//1.交换机 , 简单版本不考虑 , 直接空字符串即可(默认/无名交换机)//2.路由key , 直接写队列名即可//3.参数 , (消息持久化)//4.消息体channel.basicPublish(RabbitmqUtils.RABBITMQ_EXCHANGE, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());}}}
消费者1
package com.rabbitmq5;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/public class Consume01 {//接收消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "fanout");//声明一个临时队列String queueName = channel.queueDeclare().getQueue();//绑定交换机和队列的channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "");//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功 , 否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(queueName, true, deliverCallback, cancelCallback);}}
- 百年老字号茶叶店,老字号的八大祥是那八个?
- 庞太师和八贤王究竟谁比较大包拯和庞太师的关系
- 揭秘云南奇特习俗,天葬、水葬、树葬,云南这些事不能做!
- 食用夜香花的种植和管理
- 金桔树怎么养
- 淘米水浇花的好处和正确使用方法
- 金钱木怎么养才茂盛多头
- 鸡心果树苗的栽培与管理
- 植物组织培养的特点
- 三七种植技术与栽培管理