RabbitMQ学习

消息队列

MQ全称为Message Queue,消息队列应用程序和应用程序之间的通信方法。

为什么使用MQ

在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量

应用场景

  1. 任务异步处理
    • 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理,提高了应用程序的响应时间。
  2. 应用程序解耦合
    • MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合
  3. 削峰填谷

AMQP和JMS

MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。

AMQP

AMQP(Advanced Message Queue 高级消息队列协议)是一种协议,更准确的说是一种链接协议。这是和JMS的本职差别,AMQP不是API层进行限定,而是直接定义网络交换的数据格式。

JMS

JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于两个应用程序之间,或分布式系统中发送消息,进行异步通信。

AMQP和JMS的区别

  1. JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式;
  2. JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的;
  3. JMS规定了两种消费模式;而AMQP的消息模式更加丰富。

消息队列产品

  1. ActiveMQ:基于JMS;
  2. ZeroMQ:基于C语言开发
  3. RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
  4. RocketMQ:基于JMS,阿里巴巴产品
  5. Kafka:类似MQ产品;分布式消息系统,高吞吐量

RabbitMQ

RabbitMQ,是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。

RabbitMQ提供了6中模式:简单模式,工作队列模式,Plublish/Subscribe发布/订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ;暂不做介绍)。

RabbitMQ运转流程(以简单模式为例)

生产者发送消息

  1. 生产者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker(服务节点);
  2. 声明队列并设置属性;如是否排它,是否持久化,是否自动删除;
  3. 将路由键(空字符串)与队列绑定起来;
  4. 发送消息值RabbitMQ Broker;
  5. 关闭信道
  6. 关闭连接

代码:

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Producer {
static final String QUEUE_NAME = "simple_queue";

public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
//主机地址:默认为localhost
connectionFactory.setHost("192.168.0.8");
//连接端口
connectionFactory.setPort(5672);
//虚拟主机名称:默认为/
connectionFactory.setVirtualHost("/ligangit");
//连接用户名:默认为guest
connectionFactory.setUsername("ligangit");
//连接密码:默认为guest
connectionFactory.setPassword("ligangit");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建频道
Channel channel = connection.createChannel();
//声明(创建)队列
/**
* 参数一:队列名称
* 参数二:是否定义持久化队列
* 参数三:是否独占本次连接
* 参数四:是否在不使用的时候自动删除队列
* 参数五:队列其他参数
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//要发送的消息
String message = "世界,你好!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("已发送消息:" + message);
//释放资源
channel.close();
connection.close();

}
}

生产者流转过程说明

生产者流转过程

消费者接收消息

  1. 消费者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker;
  2. 向RabbitMQ Broker请求消费相应队列中的消息,设置相应的回调函数;
  3. 等待RabbitMQ Broker回应关闭投递响应队列中的消息,消费者接收消息;
  4. 确认(ack,自动确认)接收到的消息;
  5. RabbitMQ 从队列中删除相应已经被确认的消息;
  6. 关闭信道
  7. 关闭连接

代码:

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
//主机地址:默认为localhost
connectionFactory.setHost("192.168.0.8");
//连接端口
connectionFactory.setPort(5672);
//虚拟主机名称:默认为/
connectionFactory.setVirtualHost("/ligangit");
//连接用户名:默认为guest
connectionFactory.setUsername("ligangit");
//连接密码:默认为guest
connectionFactory.setPassword("ligangit");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建频道
Channel channel = connection.createChannel();
//创建队列,并设置消息处理
channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
//监听消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* consumerTag:消息标签,在 channel.basicConsume 的时候可以指定
* envelope:消息包内容,可从中获取消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送)
* properties:消息属性
* body:消息
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:"+envelope.getRoutingKey());
//交换机
System.out.println("交换机为:"+envelope.getExchange());
//消息id
System.out.println("消息id为:"+envelope.getDeliveryTag());
//收到的消息
System.out.println("接收到的消息:" + new String(body, "UTF-8"));
System.out.println("");
System.out.println("==============================================================");
System.out.println("");
}
};
/**
* 监听消息
* 参数一:队列名称
* 参数二:是否自动确认,设置为true表示消息接收到自动向mq回复接收到了,mq接收到回复消息后会删除消息;设置为false则需要手动确认
*/
channel.basicConsume(Producer.QUEUE_NAME,true,consumer);
//不关闭资源,应该一直监听消息
// channel.close();
// connection.close();
}
}

消费者流转说明

消费者流转过程

最后更新: 2020年08月22日 10:40

原始链接: http://ligangit.com/2020/08/21/RabbitMQ学习/

× 请我吃糖~
打赏二维码