rabbitMQ知识点整理

重新温习了AMQP协议以及rabbitMQ的知识点,做下记录。基础的东西就不记了,就记录一些关键的知识点。

channel(信道)的两种模式

confirm模式

channel设置成confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。

在channel被设置成confirm模式之后,所有被publish的后续消息都将被confirm(即ack)或者被nack一次。但是没有对消息被confirm的快慢做任何保证,并且同一条消息不会同时被confirm和nack 。

如何开启confirm模式

调用channel的confirmSelect方法将channel设置为confirm模式,如果没有设置no-wait标志的话,broker会返回confirm.select-ok表示同意发送者将当前channel信道设置为confirm模式(从目前RabbitMQ最新版本3.6来看,如果调用了channel.confirmSelect方法,默认情况下是直接将no-wait设置成false的,也就是默认情况下broker是必须回传confirm.select-ok的)。

异步confirm模式

Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms()方法也是通过SortedSet维护消息序号的。

transaction模式

提供的三个方法

  • txSelect() 打开事务
  • tCommit() 提交事务
  • txRollback() 回滚事务

通过txSelect开启事务之后,便可以发布消息给broker代理服务器了,如果txCommit提交成功,则消息一定到达了broker,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务。

采用事务机制实现会降低RabbitMQ的消息吞吐量,且不符合消息中间件异步解耦的特性。

Message Acknowledgement(ACK)

用于保证消息从队列中可靠的发送给订阅者,即消息回执。

rabbitMQ在声明消费者的时候可以设置autoAck。

  • true: 接收到消息后会自动ack,适用于对于消息吞吐量以及可靠性要求低的场景
  • false: 接收到消息后,需要手动进行ack,适用于对消息可靠性要求较高的场景
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    /**
    * Start a non-nolocal, non-exclusive consumer, with
    * a server-generated consumerTag.
    * @param queue the name of the queue
    * @param autoAck true if the server should consider messages
    * acknowledged once delivered; false if the server should expect
    * explicit acknowledgements
    * @param callback an interface to the consumer object
    * @return the consumerTag generated by the server
    * @throws java.io.IOException if an error is encountered
    * @see com.rabbitmq.client.AMQP.Basic.Consume
    * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
    * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
    */
    String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

rabbitMQ提供了以下几个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Acknowledge one or several received
* messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
* containing the received message being acknowledged.
* @see com.rabbitmq.client.AMQP.Basic.Ack
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param multiple true to acknowledge all messages up to and
* including the supplied delivery tag; false to acknowledge just
* the supplied delivery tag.
* @throws java.io.IOException if an error is encountered
*/
void basicAck(long deliveryTag, boolean multiple) throws IOException;

/**
* Reject one or several received messages.
*
* Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
* @see com.rabbitmq.client.AMQP.Basic.Nack
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param multiple true to reject all messages up to and including
* the supplied delivery tag; false to reject just the supplied
* delivery tag.
* @param requeue true if the rejected message(s) should be requeued rather
* than discarded/dead-lettered
* @throws java.io.IOException if an error is encountered
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

/**
* @throws java.io.IOException if an error is encountered
* Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
* containing the received message being rejected.
* @see com.rabbitmq.client.AMQP.Basic.Reject
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered
*/
void basicReject(long deliveryTag, boolean requeue) throws IOException;

关于重试

重试只能是宏观的从组件层面解决问题,如网络问题等,而真正的业务失败应当从业务层面去解决,所以即便是重试也无法解决。

show me code

设置autoAck为false,手动实现消息的ack和reject。通过basicNack设置requeue重入队列。,如果失败次数过多则直接reject该消息(默认业务上已经无法处理该消息)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
MessageResponse result = MessageResponse.ERROR;

try {
result = this.processor.process(SerializeCode.deSerialize(body, this.processor.getMsgType()));
} catch (Exception e) {
logger.error("", e);
}

if (null != result && !"error".equals(result.getResultMsg())) {
logger.info("消息 {} 处理成功", envelope.getDeliveryTag());
this.channel.basicAck(envelope.getDeliveryTag(), false);
this.retryCount = 0;
} else {
logger.info("消息 {} 处理失败", envelope.getDeliveryTag());
this.dealMessageAck(envelope);
}

}

private void dealMessageAck(final Envelope envelope) throws IOException {
++this.retryCount;
if (this.retryCount < 10) {
logger.info("消费消息 {} 失败.第{}次 重新放入队列", envelope.getDeliveryTag(), this.retryCount);
this.channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
logger.info("消费消息 {} 处理失败已经超过10次. 拒绝该消息", envelope.getDeliveryTag(), this.retryCount);
this.channel.basicReject(envelope.getDeliveryTag(), false);
}

}

持久化

exchange和queue的持久化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Actively declare a non-autodelete exchange with no extra arguments
* @see com.rabbitmq.client.AMQP.Exchange.Declare
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
* @param exchange the name of the exchange
* @param type the exchange type
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
* @throws java.io.IOException if an error is encountered
* @return a declaration-confirm method to indicate the exchange was successfully declared
*/
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;

/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;

@param durable

  • true if we are declaring a durable exchange (the exchange will survive a server restart)
  • true if we are declaring a durable queue (the queue will survive a server restart)

声明exchange和queue的时候就可以设置durable为true进行持久化。

消息的持久化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Publish a message.
*
* Publishing to a non-existent exchange will result in a channel-level
* protocol exception, which closes the channel.
*
* Invocations of <code>Channel#basicPublish</code> will eventually block if a
* <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
*
* @see com.rabbitmq.client.AMQP.Basic.Publish
* @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
* @param exchange the exchange to publish the message to
* @param routingKey the routing key
* @param props other properties for the message - routing headers etc
* @param body the message body
* @throws java.io.IOException if an error is encountered
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

直接看BasicProperties,有很多属性,但是都没有注释,其实这个有定义一个常量类,com.rabbitmq.client.MessageProperties。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// Copyright (c) 2007-Present Pivotal Software, Inc.  All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.

package com.rabbitmq.client;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.impl.AMQContentHeader;

/**
* Constant holder class with useful static instances of {@link AMQContentHeader}.
* These are intended for use with {@link Channel#basicPublish} and other Channel methods.
*/
public class MessageProperties {

/** Empty basic properties, with no fields set */
public static final BasicProperties MINIMAL_BASIC =
new BasicProperties(null, null, null, null,
null, null, null, null,
null, null, null, null,
null, null);
/** Empty basic properties, with only deliveryMode set to 2 (persistent) */
public static final BasicProperties MINIMAL_PERSISTENT_BASIC =
new BasicProperties(null, null, null, 2,
null, null, null, null,
null, null, null, null,
null, null);

/** Content-type "application/octet-stream", deliveryMode 1 (nonpersistent), priority zero */
public static final BasicProperties BASIC =
new BasicProperties("application/octet-stream",
null,
null,
1,
0, null, null, null,
null, null, null, null,
null, null);

/** Content-type "application/octet-stream", deliveryMode 2 (persistent), priority zero */
public static final BasicProperties PERSISTENT_BASIC =
new BasicProperties("application/octet-stream",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null);

/** Content-type "text/plain", deliveryMode 1 (nonpersistent), priority zero */
public static final BasicProperties TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
1,
0, null, null, null,
null, null, null, null,
null, null);

/** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null);
}
  • deliveryMode 1 (nonpersistent)
  • deliveryMode 2 (persistent)
    这里就写的,deliveryMode 2就是持久化,所以在需要持久化的时候,这里根据消息的类型使用这个常量就可以,一般是用BasicProperties.PERSISTENT_TEXT_PLAIN。

参考资料

投食入口