八、代码实现-自动/手动应答( 三 )


创建消费者
package com.rabbitmq1;import com.rabbitmq.client.*;import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 15:34* @desc**/public class Consume {//发消息public static void main(String[] args) throws IOException, TimeoutException {//获取信道Channel channel = RabbitmqUtils.getChannel();//消费者未成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答//3.消费者成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(RabbitmqUtils.RABBITMQ_QUEUE, true, deliverCallback, cancelCallback);}}
启动生产者类 , 查看可视化界面 , 可以发现已经写入了10条
启动消费者 , 效果如下 , 可以看到队列消息已经被消费 , 则成功!
ps:如果报错ERROR com...impl.ndler - Anerror  , 则需要admin授权
八、代码实现-多个消费者
工作原理图
生产者
package com.rabbitmq2;import com.rabbitmq.client.Channel;import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/public class Product {//发消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//0是轮询、1是不公平分发 , 大于1则是预取值 , 默认为0 。预取后 , 再进行不公平分发 。channel.basicQos(0);//生成队列,//1.名称//2.队列消息是否持久化(否:存内存 , 是:存磁盘 。默认否)//3.队列是否只供一个消费者消费 , 默认否//4.最后一个消费者断开连接后 , 是否自动删除 。//5.其他参数channel.queueDeclare(RabbitmqUtils.RABBITMQ_QUEUE, true, false, false, null);//持续发送消息for (int i = 0; i < 10; i++) {String message="this is Product"+i;//1.交换机 , 简单版本不考虑 , 直接空字符串即可(默认/无名交换机)//2.路由key , 直接写队列名即可//3.参数 , 忽略//4.消息体channel.basicPublish("", RabbitmqUtils.RABBITMQ_QUEUE, null, message.getBytes());}System.out.println("消息发送成功");}}
消费者1
package com.rabbitmq2;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 这是一个工作线程** @author 天真热* @create 2022-02-08 16:36* @desc**/public class Consume01 {//接收消息public static void main(String[] args) throws IOException, TimeoutException {System.out.println("这是工作线程1....");Channel channel = RabbitmqUtils.getChannel();//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功 , 否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(RabbitmqUtils.RABBITMQ_QUEUE, true, deliverCallback, cancelCallback);}}
消费者2
package com.rabbitmq2;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 这是一个工作线程** @author 天真热* @create 2022-02-08 16:36* @desc**/public class Consume02 {//接收消息public static void main(String[] args) throws IOException, TimeoutException {System.out.println("这是工作线程2....");Channel channel = RabbitmqUtils.getChannel();//消费者未成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答//3.消费者成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(RabbitmqUtils.RABBITMQ_QUEUE, true, deliverCallback, cancelCallback);}}