0、参考上一篇文章:RabbitMq学习之RabbitMQ使用
1、消息队列的高级用法
消息的持久化
在发送消息时,可以设置一个basicProperties对象,在对象中封装一些配置信息。
Map<String, Object> map = new HashMap<String, Object>();
map.put("company","hhh");
map.put("name","haijin");
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
.deliveryMode(2) //2 标识持久化消息 服务器重启,消息还在 1不会持久化
.expiration("10000") //消息10s后会过期,也就是说10秒后消息如果没有被消费,那就消失了(死信)
.contentEncoding("utf-8")
.correlationId(UUID.randomUUID().toString()) //消息的id
.headers(map) //可以将该消息的一些元数据信息封装在这个map里面,然后携带过去
.build();
//===发消息==
for (int i = 0; i < 100; i++) {
String message = "消息:"+i;
channel.basicPublish("",QUEUE_NAME,basicProperties,message.getBytes());
}
2、如何在消费者端做限流
3、Comfirm_listener消息队列模式
上半场ack和下半场ack的区别
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.2.156");
connectionFactory.setPort(5672);
connectionFactory.setUsername("rabbit");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("nz1905");
//==================
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//===声明队列===
/*
队列可以持久化的
交换机也可以是持久化的
消息可以持久化吗?——可以设置消息的持久化
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
channel.exchangeDeclare("myexchange","topic",true,false,null);
//开启消息投递模式(确认模式)
channel.confirmSelect();
Map<String, Object> map = new HashMap<String, Object>();
map.put("company","hhh");
map.put("name","haijin");
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
.deliveryMode(2) //2 标识持久化消息 服务器重启,消息还在 1不会持久化
.expiration("10000") //消息10s后会过期,也就是说10秒后消息如果没有被消费,那就消失了(死信)
.contentEncoding("utf-8")
.correlationId(UUID.randomUUID().toString()) //消息的id
.headers(map) //可以将该消息的一些元数据信息封装在这个map里面,然后携带过去
.build();
//===发消息==
for (int i = 0; i < 100; i++) {
String message = "消息:"+i;
/**
* 消息确认的监听:监听消息的接收情况
*/
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息已收到");
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息没有收到");
}
});
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
}
}
4、Ack和Nack 和Reject的区别
1)Ack对于明确要消费的消息。可以通过Ack的方式告知mq,mq就会发送下一条消息给消费者
2)Nack 告知mq,这样的消息我不处理,丢弃掉,此时消息会成为死信。
if("1".equals((String)(properties.getHeaders().get("tag")))){
channel.basicNack(envelope.getDeliveryTag(),false,false);
}else{
String message = new String(body,"utf-8");
System.out.println(message+"==="+properties);
channel.basicAck(envelope.getDeliveryTag(),false);
}
channel.basicNack(envelope.getDeliveryTag(),false,false);
第一个参数:envelope.getDeliveryTag() 这是表示消息的唯一标识ID,是一个正整数,是mq来自增设置的。
第二个参数:是否批量处理 可以提高效率 channel.basicNack(5,true,false);
第三个参数:是否重回队列
如果是重回队列的话,那么下一次还会继续消费这条消息,所以一般情况下不会重回队列。
3)Reject
reject与nack的用法相同,但是与nack只有一个区别,nack一次可以拒签多条消息(multiple:true),但是reject一次只能拒签一条消息
4、Return_listener 消息不可达时该如何处理
当发送消息时交互机不存在、或者队列不存在,或者Routing-key错误,那么相应的return listener就可以工作。
注意,使用这个功能时,要把basicPublish中的mandatory开启
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.2.156");
connectionFactory.setPort(5672);
connectionFactory.setUsername("rabbit");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("nz1905");
//==================
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//===声明队列===
/*
队列可以持久化的
交换机也可以是持久化的
消息可以持久化吗?——可以设置消息的持久化
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
channel.exchangeDeclare("myexchange","topic",true,false,null);
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息不可达");
}
});
//===发消息==
for (int i = 0; i < 100; i++) {
String message = "消息:"+i;
/*
mandatory:
false: 当消息不可达时,此消息会被删除
true: 当消息不可达时,之前设置好的Return Listener将会处理。
*/
channel.basicPublish("","hhhhh",true,null,message.getBytes());
}
}
5、死信队列
DLX定义
DLX为Dead Letter Exchange,死信队列。当一个消息在一个队列中变成死信(dead message)之后,它能重新publish到另一个Exchange,那么这个让消息变为死信的Exchange就是DLX(死信队列)。
消息变成死信的几种情况
1、消息被拒绝,ack为false,并且 requeue=false;
2、消息TTL(Time To Live)过期,指消息达到了过期时间;
3、队列达到最大长度。
通过消息的过期成为死信的机制,死信队列很容易就能做出一个延迟队列的效果。