RabbitMq学习之业务场景以及死信队列

我爱海鲸 2022-03-01 15:06:14 暂无标签

简介业务场景以及死信队列等高级用法

0、参考上一篇文章:RabbitMq学习之RabbitMQ使用

1、消息队列的高级用法

消息的持久化

在发送消息时,可以设置一个basicProperties对象,在对象中封装一些配置信息。

Map<String, Object> map = new HashMap<String, Object>();
map.put("company","hhh");
map.put("name","haijin");



AMQP.BasicProperties basicProperties  = new AMQP.BasicProperties().builder()
        .deliveryMode(2)  //2 标识持久化消息 服务器重启,消息还在    1不会持久化
        .expiration("10000") //消息10s后会过期,也就是说10秒后消息如果没有被消费,那就消失了(死信)
        .contentEncoding("utf-8")
        .correlationId(UUID.randomUUID().toString()) //消息的id
        .headers(map)  //可以将该消息的一些元数据信息封装在这个map里面,然后携带过去
        .build();


//===发消息==
for (int i = 0; i < 100; i++) {
    String message = "消息:"+i;


    channel.basicPublish("",QUEUE_NAME,basicProperties,message.getBytes());
}

2、如何在消费者端做限流

3、Comfirm_listener消息队列模式

上半场ack和下半场ack的区别

public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.2.156");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("rabbit");
    connectionFactory.setPassword("123456");
    connectionFactory.setVirtualHost("nz1905");

    //==================

    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();

    //===声明队列===
    /*
    队列可以持久化的
    交换机也可以是持久化的
    消息可以持久化吗?——可以设置消息的持久化
     */
    channel.queueDeclare(QUEUE_NAME,true,false,false,null);
    channel.exchangeDeclare("myexchange","topic",true,false,null);


    //开启消息投递模式(确认模式)
    channel.confirmSelect();

    Map<String, Object> map = new HashMap<String, Object>();
    map.put("company","hhh");
    map.put("name","haijin");



    AMQP.BasicProperties basicProperties  = new AMQP.BasicProperties().builder()
            .deliveryMode(2)  //2 标识持久化消息 服务器重启,消息还在    1不会持久化
            .expiration("10000") //消息10s后会过期,也就是说10秒后消息如果没有被消费,那就消失了(死信)
            .contentEncoding("utf-8")
            .correlationId(UUID.randomUUID().toString()) //消息的id
            .headers(map)  //可以将该消息的一些元数据信息封装在这个map里面,然后携带过去
            .build();


    //===发消息==
    for (int i = 0; i < 100; i++) {
        String message = "消息:"+i;


        /**
         * 消息确认的监听:监听消息的接收情况
         */
        channel.addConfirmListener(new ConfirmListener() {
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息已收到");
            }

            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息没有收到");
            }
        });


        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
    }



}

4、Ack和Nack 和Reject的区别

1)Ack对于明确要消费的消息。可以通过Ack的方式告知mq,mq就会发送下一条消息给消费者

2)Nack 告知mq,这样的消息我不处理,丢弃掉,此时消息会成为死信。

if("1".equals((String)(properties.getHeaders().get("tag")))){
    channel.basicNack(envelope.getDeliveryTag(),false,false);
}else{
    String message = new String(body,"utf-8");
    System.out.println(message+"==="+properties);
    channel.basicAck(envelope.getDeliveryTag(),false);
}

channel.basicNack(envelope.getDeliveryTag(),false,false);       

第一个参数:envelope.getDeliveryTag() 这是表示消息的唯一标识ID,是一个正整数,是mq来自增设置的。       

第二个参数:是否批量处理  可以提高效率        channel.basicNack(5,true,false);       

第三个参数:是否重回队列              

如果是重回队列的话,那么下一次还会继续消费这条消息,所以一般情况下不会重回队列。

3)Reject

         reject与nack的用法相同,但是与nack只有一个区别,nack一次可以拒签多条消息(multiple:true),但是reject一次只能拒签一条消息

4、Return_listener 消息不可达时该如何处理

当发送消息时交互机不存在、或者队列不存在,或者Routing-key错误,那么相应的return listener就可以工作。

注意,使用这个功能时,要把basicPublish中的mandatory开启

public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.2.156");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("rabbit");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("nz1905");

        //==================

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        //===声明队列===
        /*
        队列可以持久化的
        交换机也可以是持久化的
        消息可以持久化吗?——可以设置消息的持久化
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        channel.exchangeDeclare("myexchange","topic",true,false,null);


        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消息不可达");
            }
        });


        //===发消息==
        for (int i = 0; i < 100; i++) {
            String message = "消息:"+i;
            /*
            mandatory:
                false:  当消息不可达时,此消息会被删除
                true:   当消息不可达时,之前设置好的Return Listener将会处理。
             */
            channel.basicPublish("","hhhhh",true,null,message.getBytes());
        }

    }

5、死信队列

DLX定义
DLX为Dead Letter Exchange,死信队列。当一个消息在一个队列中变成死信(dead message)之后,它能重新publish到另一个Exchange,那么这个让消息变为死信的Exchange就是DLX(死信队列)。

消息变成死信的几种情况
1、消息被拒绝,ack为false,并且 requeue=false;
2、消息TTL(Time To Live)过期,指消息达到了过期时间;
3、队列达到最大长度。

通过消息的过期成为死信的机制,死信队列很容易就能做出一个延迟队列的效果。

你好:我的2025