创建Maven工程
添加依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.1</version>
</dependency>
生产者
package xyz.haijin.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Message producer for the simple-queue example.
 *
 * <p>Steps: connect to the RabbitMQ server, declare (create) the queue, then publish one
 * message to that queue.
 */
public class RabbitProvider {
    /** Name of the queue the message is published to. */
    private static final String QUEUE_NAME = "simple_queue";

    /**
     * Connects to the broker, declares {@link #QUEUE_NAME} and publishes a single message.
     *
     * @param args unused
     * @throws IOException      if communication with the broker fails
     * @throws TimeoutException if connecting or closing times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Address of the MQ server.
        connectionFactory.setHost("10.0.0.181");
        connectionFactory.setPort(5672);
        // Virtual host.
        connectionFactory.setVirtualHost("vh1");
        // Account.
        connectionFactory.setUsername("test1");
        connectionFactory.setPassword("123456");

        // try-with-resources closes the channel and then the connection even when publishing
        // fails; without closing them the client's threads keep the JVM alive after main returns.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            /*
             * queueDeclare arguments:
             * 1) queue name
             * 2) durable: if true the queue survives a server restart
             * 3) exclusive: the queue is used by only one consumer
             * 4) autoDelete: the queue is deleted once no consumer uses it any more
             * 5) additional arguments (metadata)
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            String message = "你好";
            /*
             * basicPublish arguments:
             * exchange   - the exchange to publish to ("" here)
             * routingKey - the routing key; in simple-queue mode this is the queue name
             * props      - other message properties (routing headers etc.)
             * body       - the message body
             */
            // Encode explicitly as UTF-8 so the bytes do not depend on the platform charset;
            // the consumers decode the body as UTF-8.
            channel.basicPublish("", QUEUE_NAME, null,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("发送消息成功");
        }
    }
}
消费者
package xyz.haijin.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Simple-queue consumer (lambda style): prints every message delivered from the queue.
 */
public class RabbitConsumer {
    private static final String QUEUE_NAME = "simple_queue";

    /**
     * Opens a channel, declares the queue and starts consuming with automatic acknowledgement.
     *
     * @param args unused
     * @throws IOException      if communication with the broker fails
     * @throws TimeoutException if connecting times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel queueChannel = openChannel();
        queueChannel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Message handler: decode the body and print it.
        DeliverCallback onDelivery = (tag, incoming) -> {
            byte[] payload = incoming.getBody();
            String text = new String(payload, "UTF-8");
            System.out.println(" [x] Received '" + text + "'");
        };

        // autoAck = true: the broker is told automatically that the message was consumed
        // (false would mean manual acknowledgement). The cancel callback does nothing.
        queueChannel.basicConsume(QUEUE_NAME, true, onDelivery, tag -> { });
    }

    /** Builds the connection factory, connects, and returns a new channel on that connection. */
    private static Channel openChannel() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("test1");
        factory.setPassword("123456");
        factory.setVirtualHost("vh1");
        factory.setHost("10.0.0.181");
        factory.setPort(5672);
        Connection broker = factory.newConnection();
        return broker.createChannel();
    }
}
非Lambda
package xyz.haijin.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitConsumer {
private static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 连接rabbitmq服务器
ConnectionFactory connectionFactory = new ConnectionFactory();
// 指明mq服务器的地址
connectionFactory.setHost("10.0.0.181");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("vh1");
// 账号
connectionFactory.setUsername("test1");
connectionFactory.setPassword("123456");
// 获得一个连接对象
Connection connection = connectionFactory.newConnection();
// 获得通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//处理消息
Consumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body,"utf-8");
System.out.println("接收到的消息:"+message);
}
};
//监听队列
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
消息队列的收发模式
简单队列模式
默认情况下,是简单队列模式,两个消费者交替接收消息
work模式——能者多劳
一次只处理一个消息,待消息处理完后手动回复
我们发现,如果使用简单队列模式,就无法解决“能者多劳”的问题——即让性能强的消费者多消费消息。使用Work模式就可以解决这个问题。
解决思路:消费者要告知broker一次只接收队列中的一条消息(basicQos(1)),并且要把自动ack改成手动ack。
Ack的目的就是告知broker我已经接收并处理完这条消息了,请发下一条消息过来。
改变消费者
package xyz.haijin.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitConsumer1 {
private static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("10.0.0.181");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("vh1");
connectionFactory.setUsername("test1");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);//一次只处理一个消息,避免消息堆积
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//处理消息
Consumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(body,"utf-8");
System.out.println("消费者1接收到的消息:"+message);
//手动回复 :处理完成 动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//监听队列
/*
autoAck: true 自动回复 向broker告知,消息已消费
false 手动回复
*/
channel.basicConsume(QUEUE_NAME,false,consumer);//不自动回复
}
}
package xyz.haijin.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Publish/subscribe producer: declares a fanout exchange and publishes 100 messages to it,
 * so that every queue bound to the exchange receives each message.
 */
public class RabbitProvider {
    private static final String EXCHANGE_NAME = "fanout_exchange";

    /**
     * Connects to the broker, declares the fanout exchange and publishes the messages.
     *
     * @param args unused
     * @throws IOException      if communication with the broker fails
     * @throws TimeoutException if connecting or closing times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.0.0.181");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("vh1");
        connectionFactory.setUsername("test1");
        connectionFactory.setPassword("123456");

        // try-with-resources closes channel and connection even if a publish fails, and lets
        // the JVM exit once all messages are sent.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            /*
             * exchangeDeclare arguments:
             * 1) exchange name
             * 2) exchange type - "fanout" is the publish/subscribe type: several queues
             *    receive the same message
             */
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            /*
             * basicPublish arguments:
             * exchange   - the exchange to publish to
             * routingKey - the routing key (empty here)
             * props      - other message properties (routing headers etc.)
             * body       - the message body
             */
            // Send 100 messages.
            for (int i = 1; i <= 100; i++) {
                String message = "message" + i;
                // Explicit UTF-8 so the bytes do not depend on the platform charset.
                channel.basicPublish(EXCHANGE_NAME, "", null,
                        message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                System.out.println("发送第" + i + "个消息成功");
            }
        }
    }
}
改变消费者
package xyz.haijin.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitConsumer1 {
//定义队列名字
private static final String QUEUE_NAME = "fanout_queue1";
//定义交换机名字 注意:交换机的名称要和生产者创建的交换机的名称相同
private static final String EXCHANGE_NAME="fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("10.0.0.181");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("vh1");
connectionFactory.setUsername("test1");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);//一次只处理一个消息,避免消息堆积
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//将队列绑定在交换机上
//====将队列绑定在交换机上====
//参数1:队列的名称 参数2:交换机的名称 参数3:routing-key
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
//处理消息
Consumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(body,"utf-8");
System.out.println("消费者1接收到的消息:"+message);
//手动回复 :处理完成
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//监听队列
/*
autoAck: true 自动回复 向broker告知,消息已消费
false 手动回复
*/
channel.basicConsume(QUEUE_NAME,false,consumer);//不自动回复
}
}
路由模式(direct)要点:
- 在创建交换机时类型要选择 direct
- 在队列绑定交换机的时候,同时要指明routing-key
package xyz.haijin.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Routing-mode producer: declares a direct exchange and publishes one message for each of
 * the routing keys "nba" and "cba".
 */
public class RabbitProvider {
    private static final String EXCHANGE_NAME = "direct_exchange";

    /**
     * Connects to the broker, declares the direct exchange and publishes two messages.
     *
     * @param args unused
     * @throws IOException      if communication with the broker fails
     * @throws TimeoutException if connecting or closing times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.0.0.181");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("vh1");
        connectionFactory.setUsername("test1");
        connectionFactory.setPassword("123456");

        // try-with-resources replaces the trailing close() calls, which were skipped
        // whenever declaring or publishing threw.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the exchange with type "direct".
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            /*
             * basicPublish arguments:
             * exchange   - the exchange to publish to
             * routingKey - the routing key
             * props      - other message properties (routing headers etc.)
             * body       - the message body
             */
            // Explicit UTF-8 so the bytes do not depend on the platform charset.
            channel.basicPublish(EXCHANGE_NAME, "nba", null,
                    "NBA Message".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "cba", null,
                    "cba Message".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("发送消息成功");
        }
    }
}
package xyz.haijin.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitConsumer1 {
//定义队列名字
private static final String QUEUE_NAME = "direct_queue1";
//定义交换机名字
private static final String EXCHANGE_NAME="direct_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("10.0.0.181");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("vh1");
connectionFactory.setUsername("test1");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);//一次只处理一个消息,避免消息堆积
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//将队列绑定在交换机上,并指明路由key
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"nba");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"cba");
//处理消息
Consumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(body,"utf-8");
System.out.println("消费者1接收到的消息:"+message);
//手动回复 :处理完成
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//监听队列
channel.basicConsume(QUEUE_NAME,false,consumer);//不自动回复
}
}
通配符模式
修改交换机类型为topic
让routing-key在匹配的时候达到模糊匹配的效果
通配符*和#号的区别
- * 只匹配当前级。 在消费者端配的队列和交换机之间的映射是 xyz.java.*, 那么在生产者端发送消息时只有这样的routing-key能够命中: xyz.java.orange 或者 xyz.java.banana等等,但是,这样的routing-key是无法命中: xyz.java.banana.yellow
- # 匹配多级。在消费者端配的队列和交换机之间的映射是 xyz.java.#, 那么在生产者端发送消息时无视层级,这些routing-key都可以命中: xyz.java.orange、xyz.java.banana、xyz.java.banana.yellow
生产者
package xyz.haijin.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Topic-mode producer: declares a topic exchange and publishes three messages with the
 * routing keys "product.add", "product.del" and "product.del.other".
 */
public class RabbitProvider {
    private static final String EXCHANGE_NAME = "topic_exchange";

    /**
     * Connects to the broker, declares the topic exchange and publishes the messages.
     *
     * @param args unused
     * @throws IOException      if communication with the broker fails
     * @throws TimeoutException if connecting or closing times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.0.0.181");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("vh1");
        connectionFactory.setUsername("test1");
        connectionFactory.setPassword("123456");

        // try-with-resources replaces the trailing close() calls, which were skipped
        // whenever declaring or publishing threw.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the exchange with type "topic".
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            // The payloads are non-ASCII, so encode explicitly as UTF-8; the consumers
            // decode the body as UTF-8 and a different platform charset would garble it.
            channel.basicPublish(EXCHANGE_NAME, "product.add", null,
                    "添加商品".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "product.del", null,
                    "删除商品".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "product.del.other", null,
                    "删除商品!!".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("发送消息成功");
        }
    }
}
package xyz.haijin.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitConsumer1 {
//定义队列名字
private static final String QUEUE_NAME = "topic_queue1";
//定义交换机名字
private static final String EXCHANGE_NAME="topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("10.0.0.181");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("vh1");
connectionFactory.setUsername("test1");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);//一次只处理一个消息,避免消息堆积
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//将队列绑定在交换机上
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"product.*");
//处理消息
Consumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(body,"utf-8");
System.out.println("消费者1接收到的消息:"+message);
//手动回复 :处理完成
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//监听队列
channel.basicConsume(QUEUE_NAME,false,consumer);//不自动回复
}
}
消费者2
package xyz.haijin.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitConsumer2 {
private static final String QUEUE_NAME = "topic_queue2";
private static final String EXCHANGE_NAME="topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("10.0.0.181");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("vh1");
connectionFactory.setUsername("test1");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);//一次只处理一个消息,避免消息堆积
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"product.#");
//处理消息
Consumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body,"utf-8");
System.out.println("消费者2接收到的消息:"+message);
//手动回复 :处理完成
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//监听队列
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
SpringBoot-RabbitMQ 使用
package xyz.haijin.spring.boot.amqp.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Queue configuration.
 *
 * <p>Registers the queue used by the hello example as a Spring bean.
 */
@Configuration
public class RabbitMQConfiguration {
    /** Name of the queue exposed by {@link #queue()}. */
    private static final String HELLO_QUEUE_NAME = "helloRabbit";

    /**
     * Creates the queue bean.
     *
     * @return a queue named "helloRabbit"
     */
    @Bean
    public Queue queue() {
        Queue helloQueue = new Queue(HELLO_QUEUE_NAME);
        return helloQueue;
    }
}
创建消息提供者
package xyz.haijin.spring.boot.amqp.provider;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
 * Message provider for the hello example.
 *
 * <p>Sends a greeting that includes the current date to the "helloRabbit" routing key.
 */
@Component
public class HelloRabbitProvider {
    // Declared as AmqpTemplate to match both the file's import and the call in send();
    // the previous field (RabbitTemplate rabbitTemplate) used an un-imported type and a
    // different name than the one send() references, so the class did not compile.
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Builds the message "hello" + current date, prints it, and sends it with the routing
     * key "helloRabbit".
     */
    public void send() {
        String context = "hello" + new Date();
        System.out.println("Provider: " + context);
        amqpTemplate.convertAndSend("helloRabbit", context);
    }
}
创建测试用例
package xyz.haijin.spring.boot.amqp.test;
import xyz.haijin.spring.boot.amqp.Application;
import xyz.haijin.spring.boot.amqp.provider.HelloRabbitProvider;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Test case that drives {@link HelloRabbitProvider} inside the Spring Boot test context.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class AmqpTest {
    /** Number of messages sent by {@link #testSender()}. */
    private static final int MESSAGE_COUNT = 10;

    @Autowired
    private HelloRabbitProvider helloRabbitProvider;

    /** Calls the provider's send() ten times. */
    @Test
    public void testSender() {
        int remaining = MESSAGE_COUNT;
        while (remaining > 0) {
            helloRabbitProvider.send();
            remaining--;
        }
    }
}
创建消息消费者
package xyz.haijin.spring.boot.amqp.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Message consumer for the hello example: listens on the "helloRabbit" queue.
 */
@Component
public class HelloRabbitConsumer {
    /**
     * Prints each received message prefixed with "Consumer: ".
     *
     * @param message the message received from the queue
     */
    @RabbitListener(queues = "helloRabbit")
    public void process(String message) {
        String line = "Consumer: " + message;
        System.out.println(line);
    }
}
交换机模式
1.FanoutExchange
对于生产者来说:配置类中要配置交换机
对于消费者来说:配置类中要配置队列、交换机、队列和交换机的绑定关系
编辑配置文件
package xyz.haijin.rabbitmqdemo.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Fanout-exchange configuration: declares the queue, the fanout exchange, and the binding
 * between them.
 */
@Configuration
public class RabbitConfiguration {
    /**
     * @return the queue named "simple_queue"
     */
    @Bean
    public Queue queue() {
        return new Queue("simple_queue");
    }

    /**
     * @return the fanout exchange named "fanout_exchange_new"
     */
    @Bean
    public FanoutExchange getFanoutExchange() {
        return new FanoutExchange("fanout_exchange_new");
    }

    /**
     * Binds the queue to the fanout exchange.
     *
     * <p>The parameter names equal the bean names ({@code queue}, {@code getFanoutExchange});
     * the queue parameter was previously {@code getQueue}, which matched no bean name.
     * NOTE(review): the matching name is what lets Spring pick the right bean if a second
     * bean of the same type is added later - confirm against the Spring autowiring docs.
     *
     * @param queue             the queue bean
     * @param getFanoutExchange the fanout exchange bean
     * @return the binding of the queue to the exchange
     */
    @Bean
    public Binding getBinding(Queue queue, FanoutExchange getFanoutExchange) {
        return BindingBuilder.bind(queue).to(getFanoutExchange);
    }
}
生产者
package xyz.haijin.rabbitmqdemo;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Producer for the fanout-exchange example.
 */
@Component
public class ExchangeProvider {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends one fixed message to the exchange "fanout_exchange_new" with an empty routing key.
     */
    public void send() {
        final String exchange = "fanout_exchange_new";
        final String routingKey = "";
        final String payload = "交换机模式消息";
        rabbitTemplate.convertAndSend(exchange, routingKey, payload);
    }
}
通配符模式
/**
 * @return the topic exchange named "topic_exchange_new"
 */
@Bean
public TopicExchange getTopicExchange() {
    final String exchangeName = "topic_exchange_new";
    return new TopicExchange(exchangeName);
}
/**
 * Binds the queue to the topic exchange with the routing pattern "product.*".
 *
 * <p>The queue parameter is named {@code queue} (previously {@code getQueue}) so that it
 * matches the parameter naming of the direct-exchange binding in this document.
 * NOTE(review): this assumes the queue bean is registered under the name "queue" - confirm
 * against the configuration class this method is added to.
 *
 * @param queue            the queue bean
 * @param getTopicExchange the topic exchange bean
 * @return the binding of the queue to the exchange
 */
@Bean
public Binding getBinding2(Queue queue, TopicExchange getTopicExchange) {
    return BindingBuilder.bind(queue).to(getTopicExchange).with("product.*");
}
生产者
/**
 * Sends one fixed message to the exchange "topic_exchange_new" with the routing key
 * "product.add".
 */
public void send() {
    final String exchange = "topic_exchange_new";
    final String routingKey = "product.add";
    final String payload = "topic模式消息";
    rabbitTemplate.convertAndSend(exchange, routingKey, payload);
}
DirectExchange 路由模式
关键点,在创建队列和交换机之间的绑定关系时,同时要指明routing-key
/**
 * Binds the queue to the direct exchange with the routing key "apple".
 *
 * @param queue          the queue bean
 * @param directExchange the direct exchange bean
 * @return the binding of the queue to the exchange
 */
@Bean
public Binding getBinding(Queue queue, DirectExchange directExchange) {
    final String routingKey = "apple";
    return BindingBuilder
            .bind(queue)
            .to(directExchange)
            .with(routingKey);
}
在发送消息时要指明routing-key
/**
 * Sends the given message to the exchange "my_direct_exchange" with the routing key "apple".
 *
 * @param message the message to send
 */
public void send(String message) {
    final String exchange = "my_direct_exchange";
    final String routingKey = "apple";
    rabbitTemplate.convertAndSend(exchange, routingKey, message);
}