RabbitMQ-消息确认机制

确认并且保证消息被送达,提供了两种方式:发布确认和事务。两者不可同时使用,在channel为事务时,不可引入确认模式,同样channel为确认模式下,不可使用事务。

发布确认

有两种方式:消息发送成功确认和消息发送失败回调。

消息发送成功确认

在rabbitmq的配置文件spring-rabbitmq.xml添加配置:

  1. connectionFactory中启用消息确认:

    1
    2
    3
    4
    5
    6
    7
    8
    <!--publisher-confirms="true" 表示:启用了消息确认-->
    <rabbit:connection-factory id="connectionFactory"
    host="${spring.rabbitmq.host}"
    port="${rabbitmq.port}"
    username="${rabbitmq.username}"
    password="${rabbitmq.password}"
    virtual-host="${rabbitmq.virtual-host}"
    publisher-confirms="true"/>
  2. 编写消息确认回调方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    /**
    * 消息确认与回退
    */
    public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
    if (b) {
    System.out.println("消息确认成功.....");
    } else {
    //处理丢失消息
    System.out.println("消息确认失败,"+s);
    }
    }
    }
  3. 配置消息确认回调方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    <!--publisher-confirms="true" 表示:启用了消息确认-->
    <rabbit:connection-factory id="connectionFactory"
    host="192.168.0.8"
    port="5672"
    username="ligangit"
    password="ligangit"
    virtual-host="/ligangit"
    publisher-confirms="true"/>

    <!--消息回调处理类-->
    <bean id="confirmCallBack" class="com.ligangit.springboot.config.MsgSendConfirmCallBack"/>
    <!--定义rabbitTemplate对象操作,可以在代码中方便发送消息-->
    <!--confirm-callback="confirmCallBack"表示消息成功回调-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" confirm-callback="confirmCallBack"/>
  4. 测试:

    发送消息:spring_queue队列需要手动添加,测试代码中没有

    1
    2
    3
    4
    5
    6
    7
    8
    9
    /**
    * 消息确认测试
    */
    @Test
    void queueTest() {
    rabbitTemplate.convertAndSend(
    "spring_queue",
    "只发队列spring_queue的消息");
    }

    管理界面确认消息发送成功:

    消息确认

    消息确认回调

    消息确认

    消息发送失败回调

    在rabbitmq的配置文件spring-rabbitmq.xml添加配置:

    1. connectionFactory中启用消息确认:注意和确认回调参数是不一样的

      1
      2
      3
      4
      5
      6
      7
      8
      <!--publisher-returns="true" 表示:启用了失败回调-->
      <rabbit:connection-factory id="connectionFactory"
      host="192.168.0.8"
      port="5672"
      username="ligangit"
      password="ligangit"
      virtual-host="/ligangit"
      publisher-returns="true"/>
    2. 编写消息失败回调方法

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      /**
      * 消息发送失败回调
      */
      public class MsgSendReturnCallBack implements RabbitTemplate.ReturnCallback {
      @Override
      public void returnedMessage(Message message, int i, String s, String s1, String s2) {
      String msgJson = new String(message.getBody());
      System.out.println("Return Message: "+msgJson);
      }
      }
    3. 配置消息失败回调方法

      注意:需要配置mandatory=”true”

      1
      2
      3
      4
      5
      6
      7
      <!--消息失败回调处理类-->
      <bean id="returnCallBack" class="com.ligangit.springboot.config.MsgSendReturnCallBack"/>
      <!--定义rabbitTemplate对象操作,可以在代码中方便发送消息-->
      <!--confirm-callback="confirmCallBack"表示消息失败回调-->
      <!--return-callback="returnCallBack"表示消息失败回调,同时配置mandatory="true",否则消息则丢失-->
      <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" confirm-callback="confirmCallBack"
      return-callback="returnCallBack" mandatory="true"/>
    4. 测试

    5. 发送消息:test_fail_exchange交换机需要手动添加,测试代码中没有

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      /**
      * 消息失败回调测试
      */
      @Test
      void testFailQueueTest() {
      //exchange 正确,queue 错误,confirm被回调,ack=true;return被回调 replyTest:NO_ROUTE
      rabbitTemplate.convertAndSend(
      "test_fail_exchange",
      "",
      "只发队列spring_queue的消息");
      }

      失败回调结果如下

      消息失败回调

事务支持

场景:业务处理伴随消息的发送,业务处理失败(事务回滚)后要求消息不发送。rabbitmq使用调用者的外部事务,通常是首选,因为它是非侵入式的(低耦合)。

外部事务的配置:在rabbitmq的配置文件spring-rabbitmq.xml添加配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<!--publisher-confirms="true" 表示:启用了消息确认-->
<!--publisher-returns="true" 表示:启用了失败回调-->
<!--事务和确认机制不能并存-->
<rabbit:connection-factory id="connectionFactory"
host="192.168.0.8"
port="5672"
username="ligangit"
password="ligangit"
virtual-host="/ligangit"
/>


<!--定义rabbitTemplate对象操作,可以在代码中方便发送消息-->
<!--confirm-callback="confirmCallBack"表示消息成功回调-->
<!--return-callback="returnCallBack"表示消息失败回调,同时配置mandatory="true",否则消息则丢失-->
<!--channel-transacted="true"表示:支持事务操作-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" channel-transacted="true"/>
<!--平台事务管理器-->
<bean id="transactionManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    /**
* 确认机制-事务测试
*/
@Test
@Transactional //开启事务 (在测试)
// @Rollback(false) //在测试的时候,需要手动的方式定制回滚的策略
void queueTest2() {
rabbitTemplate.convertAndSend(
"spring_queue",
"只发队列spring_queue的消息--01");
System.out.println("-----------dosomething:可以是数据库的操作,也可以是其他业务类的操作------------");
//模拟业务处理失败
// System.out.println(1 / 0);
rabbitTemplate.convertAndSend(
"spring_queue",
"只发队列spring_queue的消息--02");
}

最后更新: 2020年08月22日 17:10

原始链接: http://ligangit.com/2020/08/22/RabbitMQ-消息确认机制/

× 请我吃糖~
打赏二维码