RabbitMq学习之RabbitMQ使用

我爱海鲸 2022-03-01 10:31:33 暂无标签

简介Maven-RabbitMQ使用

创建Maven工程

添加依赖

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.1</version>
</dependency>

生产者

package xyz.haijin.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
    消息的生产者:
    1.连接RabbitMQ服务器
    2.声明(创建)队列
    3.往队列上发送消息
     */

public class RabbitProvider {

    //队列名称
    private static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 连接RabbitMQ服务器
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 指明mq服务器的地址
        connectionFactory.setHost("10.0.0.181");
        connectionFactory.setPort(5672);
        // 虚拟主机
        connectionFactory.setVirtualHost("vh1");
        // 账号
        connectionFactory.setUsername("test1");
        connectionFactory.setPassword("123456");
        // 得到连接对象
        Connection connection = connectionFactory.newConnection();
        // 得到一个频道
        Channel channel = connection.createChannel();
        //声明队列
        /*
        参数详解:
        1)队列的名字
        2)是否持久化, 如果是的话,即使服务器重启,队列还在
        3)是否独占: 此队列只被一个消费者占有
        4)是否自动删除: 如果没有消费者再去使用,那么队列就会删除
        5)元数据参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        // 发布消息
         /*
         * @param exchange 指定的交换机
         * @param routingKey the routing key   路由键/目前写的是队列名称
         * @param props other properties for the message - routing headers etc  元数据参 
           数
         * @param body the message body 消息体
         */
        String message = "你好";
        //简单队列模式来发消息
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("发送消息成功");

    }
}

消费者

package xyz.haijin.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitConsumer {

    private static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("10.0.0.181");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("vh1");
        connectionFactory.setUsername("test1");
        connectionFactory.setPassword("123456");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //处理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        //让消费者去监听队列
        /*
        autoAck: true 自动回复   向broker告知,消息已消费
                false  手动回复
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

        
    }

}

非Lambda

package xyz.haijin.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitConsumer {

    private static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 连接rabbitmq服务器
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 指明mq服务器的地址
        connectionFactory.setHost("10.0.0.181");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("vh1");
        // 账号
        connectionFactory.setUsername("test1");
        connectionFactory.setPassword("123456");
        // 获得一个连接对象
        Connection connection = connectionFactory.newConnection();
        // 获得通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //处理消息
        Consumer consumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("接收到的消息:"+message);
            }
        };
        //监听队列
        channel.basicConsume(QUEUE_NAME,true,consumer);


    }

}

消息队列的收发模式

简单队列模式

默认情况下,是简单队列模式,两个消费者交替接收消息

work模式——能者多劳

一次只处理一个消息,待消息处理完后手动回复

我们发现如果使用简单队列模式,是不能解决“能者多劳”问题——性能强的消费者多消费的 问题。所以使用Work模式就可以解决。

解决思路:消费者要告知broker一次只接受队列中的一条消息,basicQos(1)。并且要把自动ack改成手动ack.

Ack的目的就是告知broker我已经接受到消息了,请发下一条消息过来。

改变消费者

 

package xyz.haijin.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitConsumer1 {

    private static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("10.0.0.181");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("vh1");
        connectionFactory.setUsername("test1");
        connectionFactory.setPassword("123456");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();
        channel.basicQos(1);//一次只处理一个消息,避免消息堆积

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //处理消息
        Consumer consumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                String message = new String(body,"utf-8");
                System.out.println("消费者1接收到的消息:"+message);
                //手动回复 :处理完成 动ack
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //监听队列
         /*
        autoAck: true 自动回复   向broker告知,消息已消费
                false  手动回复
         */
        channel.basicConsume(QUEUE_NAME,false,consumer);//不自动回复


    }

}

发布订阅模式--使用fanout交换机

将消息发送给交换机,交换机绑定多个队列,每个队列都能得到该消息。

场景: 一条消息被发送给多个队列,让多个消费者消费。

改变生成者

package xyz.haijin.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitProvider {

    private static final String QUEUE_NAME = "simple_queue";

    private static final String EXCHANGE_NAME="fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("10.0.0.181");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("vh1");
        connectionFactory.setUsername("test1");
        connectionFactory.setPassword("123456");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        //声明交换机
        /*
        第一个参数: 交换机的名称
        第二个参数: 交换机的类型- fanout   发布订阅模式的交换机的类型:多个队列得到同一条消息
        */
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //发布消息
        /*
         * @param exchange 指定的交换机
         * @param routingKey the routing key   路由键/目前写的是队列名称
         * @param props other properties for the message - routing headers etc  元数据参数
         * @param body the message body 消息体
         */
        //发送100条消息

        for (int i = 1; i <= 100; i++) {
            String message = "message"+i;
//            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());//简单队列模式来发消息
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
            System.out.println("发送第"+i+"个消息成功");
        }
        
    }

}

改变消费者

package xyz.haijin.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitConsumer1 {

    //定义队列名字
    private static final String QUEUE_NAME = "fanout_queue1";
    //定义交换机名字 注意:交换机的名称要和生产者创建的交换机的名称相同
    private static final String EXCHANGE_NAME="fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("10.0.0.181");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("vh1");
        connectionFactory.setUsername("test1");
        connectionFactory.setPassword("123456");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();
        channel.basicQos(1);//一次只处理一个消息,避免消息堆积
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //将队列绑定在交换机上
        //====将队列绑定在交换机上====
        //参数1:队列的名称  参数2:交换机的名称  参数3:routing-key
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

        //处理消息
        Consumer consumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                String message = new String(body,"utf-8");
                System.out.println("消费者1接收到的消息:"+message);
                //手动回复 :处理完成
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //监听队列
        /*
        autoAck: true 自动回复   向broker告知,消息已消费
                false  手动回复
         */
        channel.basicConsume(QUEUE_NAME,false,consumer);//不自动回复


    }

}

路由模式

修改交换机类型为direct

要点:

- 在创建交换机时类型要选择 direct

- 在队列绑定交换机的时候,同时要指明routing-key

- 在生产者发送消息时,要带着routing-key

生产者

package xyz.haijin.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitProvider {


    private static final String EXCHANGE_NAME="direct_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("10.0.0.181");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("vh1");
        connectionFactory.setUsername("test1");
        connectionFactory.setPassword("123456");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        //声明交换机-- 改变交换机类型为“direct”
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //指明路由key
         /*
         * @param exchange 指定的交换机
         * @param routingKey the routing key   路由键/目前写的是队列名称
         * @param props other properties for the message - routing headers etc  元数据参数
         * @param body the message body 消息体
         */
        channel.basicPublish(EXCHANGE_NAME,"nba",null,"NBA Message".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"cba",null,"cba Message".getBytes());
        System.out.println("发送消息成功");
        channel.close();
        connection.close();

    }

}

消费者

package xyz.hajin.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitConsumer1 {

    //定义队列名字
    private static final String QUEUE_NAME = "direct_queue1";
    //定义交换机名字
    private static final String EXCHANGE_NAME="direct_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("10.0.0.181");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("vh1");
        connectionFactory.setUsername("test1");
        connectionFactory.setPassword("123456");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();
        channel.basicQos(1);//一次只处理一个消息,避免消息堆积
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //将队列绑定在交换机上,并指明路由key
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"nba");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"cba");
        //处理消息
        Consumer consumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                String message = new String(body,"utf-8");
                System.out.println("消费者1接收到的消息:"+message);
                //手动回复 :处理完成
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //监听队列
        channel.basicConsume(QUEUE_NAME,false,consumer);//不自动回复
    }
}

通配符模式

修改交换机类型为topic

让routing-key在匹配的时候达到模糊匹配的效果

通配符*和#号的区别

- *  只匹配当前级。 在消费者端配的队列和交换机之间的映射是 xyz.java.*, 那么在生产者端发送消息时只有这样的routing-key能够命中:   xyz.java.orange    或者  xyz.java.banana等等,但是,这样的routing-key是无法命中: xyz.java.banana.yellow

-# 匹配多级。在消费者端配的队列和交换机之间的映射是 xyz.java.#, 那么在生产者端发送消息时无视层级,routing-key都可以命中:   xyz.java.orange   、  xyz.java.banana、xyz.java.banana.yellow

生产者

package xyz.haijin.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitProvider {


    private static final String EXCHANGE_NAME="topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("10.0.0.181");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("vh1");
        connectionFactory.setUsername("test1");
        connectionFactory.setPassword("123456");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        //声明交换机-- 改变交换机类型为“topic”
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //指明路由key
        channel.basicPublish(EXCHANGE_NAME,"product.add",null,"添加商品".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"product.del",null,"删除商品".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"product.del.other",null,"删除商品!!".getBytes());
        System.out.println("发送消息成功");

  channel.close();
        connection.close();

    }

}

消费者1

package xyz.haijin.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitConsumer1 {

    //定义队列名字
    private static final String QUEUE_NAME = "topic_queue1";
    //定义交换机名字
    private static final String EXCHANGE_NAME="topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("10.0.0.181");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("vh1");
        connectionFactory.setUsername("test1");
        connectionFactory.setPassword("123456");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();
        channel.basicQos(1);//一次只处理一个消息,避免消息堆积
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //将队列绑定在交换机上
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"product.*");
        //处理消息
        Consumer consumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                String message = new String(body,"utf-8");
                System.out.println("消费者1接收到的消息:"+message);
                //手动回复 :处理完成
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //监听队列
        channel.basicConsume(QUEUE_NAME,false,consumer);//不自动回复


    }

}

消费者2

package xyz.haijin.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitConsumer2 {

    private static final String QUEUE_NAME = "topic_queue2";
    private static final String EXCHANGE_NAME="topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("10.0.0.181");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("vh1");
        connectionFactory.setUsername("test1");
        connectionFactory.setPassword("123456");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();
        channel.basicQos(1);//一次只处理一个消息,避免消息堆积

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"product.#");

        //处理消息
        Consumer consumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("消费者2接收到的消息:"+message);
                //手动回复 :处理完成
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //监听队列
        channel.basicConsume(QUEUE_NAME,false,consumer);


    }

}

SpringBoot-RabbitMQ 使用

生产者

   创建一个名为 `spring-boot-amqp-provider` 的生产者项目

application.yml:

spring:
  application:
    name: spring-boot-amqp
  rabbitmq:
    host: 192.168.75.133
port: 5672
username: rabbit
password: 123456

引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

创建队列配置

package xyz.haijin.spring.boot.amqp.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 队列配置
 * <p>Title: RabbitMQConfiguration</p>
 * <p>Description: </p>
 */
@Configuration
public class RabbitMQConfiguration {
    @Bean
    public Queue queue() {
        return new Queue("helloRabbit");
    }
}

创建消息提供者

package xyz.haijin.spring.boot.amqp.provider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 提供者
 * <p>Title: HelloRabbitProvider</p>
 * <p>Description: </p>
 */
@Component
public class HelloRabbitProvider {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send() {
        String context = "hello" + new Date();
        System.out.println("Provider: " + context);
        amqpTemplate.convertAndSend("helloRabbit", context);
    }
}

创建测试用例

package xyz.haijin.spring.boot.amqp.test;

import com.duo.spring.boot.amqp.Application;
import com.duo.spring.boot.amqp.provider.HelloRabbitProvider;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class AmqpTest {
    @Autowired
    private HelloRabbitProvider helloRabbitProvider;

    @Test
    public void testSender() {
        for (int i = 0; i < 10; i++) {
            helloRabbitProvider.send();
        }
    }
}

消费者

创建一个名为 `spring-boot-amqp-consumer` 的消费者项目

application.yml

spring:
  application:
    name: spring-boot-amqp-consumer
  rabbitmq:
    host: 192.168.75.133
    port: 5672
    username: rabbit
    password: 123456

创建消息消费者

package xyz.haijin.spring.boot.amqp.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class HelloRabbitConsumer {
    @RabbitListener(queues = "helloRabbit")
    public void process(String message) {
        System.out.println("Consumer: " + message);
    }
}

交换机模式

1.FanoutExchange

对于生产者来说:配置类中要配置交换机

对于消费者来说:配置类中要配置队列、交换机、队列和交换机的绑定关系

编辑配置文件

package xyz.haijin.rabbitmqdemo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfiguration {

    @Bean
    public Queue queue(){
        return new Queue("simple_queue");
    }


    @Bean
    public FanoutExchange getFanoutExchange(){
        return  new FanoutExchange("fanout_exchange_new");
    }

    @Bean
    public Binding getBinding(Queue getQueue,FanoutExchange getFanoutExchange){
        return BindingBuilder.bind(getQueue).to(getFanoutExchange);
    }


}

生产者

package xyz.haijin.rabbitmqdemo;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ExchangeProvider {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void send(){
        rabbitTemplate.convertAndSend("fanout_exchange_new","","交换机模式消息");
    }


}

通配符模式

修改配置文件

    @Bean
    public TopicExchange getTopicExchange(){
        return new TopicExchange("topic_exchange_new");
    }

    @Bean
    public Binding getBinding2(Queue getQueue,TopicExchange getTopicExchange){
        return BindingBuilder.bind(getQueue).to(getTopicExchange).with("product.*");
    }

生产者

public void send(){
        rabbitTemplate.convertAndSend("topic_exchange_new","product.add","topic模式消息");
    }

DirectExchange 路由模式

关键点,在创建队列和交换机之间的绑定关系时,同时要指明routing-key

@Bean
public Binding getBinding(Queue queue,DirectExchange directExchange){
    return BindingBuilder.bind(queue).to(directExchange).with("apple");
}

在发送消息时要指明routing-key

public void send(String message){
    rabbitTemplate.convertAndSend("my_direct_exchange","apple",message);
}

你好:我的2025