八、代码实现-自动/手动应答( 八 )


消费者2
package com.rabbitmq6;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/public class Consume01 {//接收消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "direct");//声明一个临时队列String queueName = channel.queueDeclare().getQueue();//绑定交换机和队列的channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "info");channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "warning");//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功 , 否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(queueName, true, deliverCallback, cancelCallback);System.out.println("队列error等待...");}}
3. 交换机模式之topic
前面说的模式实际上就是绝对匹配 。而topic是的模糊匹配 。
*(星号)代表一个单词
#(井号)可以替代零个或多个单词
代码如下:
生产者
package com.rabbitmq7;import com.rabbitmq.client.Channel;import com.rabbitmq.client.MessageProperties;import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/public class Product {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "topic");for (int i = 0; i < 10; i++) {String message = "发送的消息" + i;//1.交换机 , 简单版本不考虑 , 直接空字符串即可(默认/无名交换机)//2.路由key , 直接写队列名即可//3.参数 , (消息持久化)//4.消息体channel.basicPublish(RabbitmqUtils.RABBITMQ_EXCHANGE, "queue.queue.queue11", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());}}}
消费者1
package com.rabbitmq7;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/public class Consume01 {//接收消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "topic");//声明一个临时队列String queueName = channel.queueDeclare().getQueue();//绑定交换机和队列的channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "*.queue.*");//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功 , 否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(queueName, true, deliverCallback, cancelCallback);System.out.println("消费者|*.queue.*|等待中....");}}