RabbitMQ学习
消息队列
MQ全称为Message Queue,消息队列应用程序和应用程序之间的通信方法。
为什么使用MQ
在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
应用场景
- 任务异步处理
- 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理,提高了应用程序的响应时间。
- 应用程序解耦合
- MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合
- 削峰填谷
AMQP和JMS
MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。
AMQP
AMQP(Advanced Message Queue 高级消息队列协议)是一种协议,更准确的说是一种链接协议。这是和JMS的本职差别,AMQP不是API层进行限定,而是直接定义网络交换的数据格式。
JMS
JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于两个应用程序之间,或分布式系统中发送消息,进行异步通信。
AMQP和JMS的区别
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式;
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的;
- JMS规定了两种消费模式;而AMQP的消息模式更加丰富。
消息队列产品
- ActiveMQ:基于JMS;
- ZeroMQ:基于C语言开发
- RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
- RocketMQ:基于JMS,阿里巴巴产品
- Kafka:类似MQ产品;分布式消息系统,高吞吐量
RabbitMQ
RabbitMQ,是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
RabbitMQ提供了6中模式:简单模式,工作队列模式,Plublish/Subscribe发布/订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ;暂不做介绍)。
RabbitMQ运转流程(以简单模式为例)
生产者发送消息
- 生产者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker(服务节点);
- 声明队列并设置属性;如是否排它,是否持久化,是否自动删除;
- 将路由键(空字符串)与队列绑定起来;
- 发送消息值RabbitMQ Broker;
- 关闭信道
- 关闭连接
代码:
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public class Producer { static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.0.8"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/ligangit"); connectionFactory.setUsername("ligangit"); connectionFactory.setPassword("ligangit"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null); String message = "世界,你好!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("已发送消息:" + message); channel.close(); connection.close();
} }
|
生产者流转过程说明

消费者接收消息
- 消费者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker;
- 向RabbitMQ Broker请求消费相应队列中的消息,设置相应的回调函数;
- 等待RabbitMQ Broker回应关闭投递响应队列中的消息,消费者接收消息;
- 确认(ack,自动确认)接收到的消息;
- RabbitMQ 从队列中删除相应已经被确认的消息;
- 关闭信道
- 关闭连接
代码:
消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.0.8"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/ligangit"); connectionFactory.setUsername("ligangit"); connectionFactory.setPassword("ligangit"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("路由key为:"+envelope.getRoutingKey()); System.out.println("交换机为:"+envelope.getExchange()); System.out.println("消息id为:"+envelope.getDeliveryTag()); System.out.println("接收到的消息:" + new String(body, "UTF-8")); System.out.println(""); System.out.println("=============================================================="); System.out.println(""); } };
channel.basicConsume(Producer.QUEUE_NAME,true,consumer);
} }
|
消费者流转说明
