薛定谔的风口猪

站在巨人的肩膀上学习,猪都能看得很远

RabbitMQ实现延迟队列

RabbitMQ本身没有延迟队列的支持,但是基于其本身的一些特性,可以做到类似延迟队列的效果:基于死信交换器+TTL。

以下介绍下相关概念及方法

Dead Letter Exchanges

消息在队列满足达到一定的条件,会被认为是死信消息(dead-lettered),这时候,RabbitMQ会重新把这类消息发到另外一个的exchange,这个exchange称为Dead Letter Exchanges.

以下任一条件满足,即可认为是死信:

  • 消息被拒绝消费(basic.reject or basic.nack)并且设置了requeue=fasle
  • 消息的TTL到了(消息过期)
  • 达到了队列的长度限制

需要注意的是,Dead letter exchanges (DLXs) 其实就是普通的exchange,可以和正常的exchange一样的声明或者使用。

死信消息路由

队列中可以设置两个属性:

  • x-dead-letter-exchange
  • x-dead-letter-routing-key

当这个队列里面的消息成为死信之后,就会投递到x-dead-letter-exchange指定的exchange中,其中带着的routing key就是中指定的值x-dead-letter-routing-key。

而如果使用默认的exchange(routing key就是希望指定的队列),则只需要把x-dead-letter-exchange设置为空(不能不设置),类似下面

rabbitmq 延迟队列的配置

死信消息的路由则会根据x-dead-letter-routing-key所指定的进行路由,如果这个值没有指定,则会按照消息一开始发送的时候指定的routing key进行路由

Dead-lettered messages are routed to their dead letter exchange either:

with the routing key specified for the queue they were on; or, if this was not set, with the same routing keys they were originally published with.

例如,如果一开始你对exchange X发送消息,带着routing key “foo”,进入了队列 Q然后消息变死信后,他会被重新发送到 dead letter exchange ,其中发给dead letter exchange带着的routing key 还是foo。 但如果这个队列Q本身是设置了x-dead-letter-routing-key bar, 那么他发送到 dead letter exchange的时候,带着的routing key 就是bar。

需要注意的是,当死信消息重新路由到新的队列的时候,在死信目标队列确认收到这条死信消息之前,原来队列的消息是不会删除的,也就是说在某些异常场景下例如broker突然shutdown,是有机会存在说一个消息既存在于原队列,又存在于死信目标队列。具体可参考官方说明:

Dead-lettered messages are re-published with publisher confirms turned on internally so, the “dead-letter queues” (DLX routing targets) the messages eventually land on must confirm the messages before they are removed from the original queue. In other words, the “publishing” (the one in which messages expired) queue will not remove messages before the dead-letter queues acknowledge receiving them (see Confirms for details on the guarantees made). Note that, in the event of an unclean broker shutdown, the same message may be duplicated on both the original queue and on the dead-lettering destination queues.

Time-To-Live(TTL)

开头我们说过,实现延迟队列除了用死信消息外,还需要利用消息过期的TTL机制,因为只要消息过期了,就会触发死信。

RabbitMQ有两种方法让设置消息的TTL:

直接在消息上设置

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("60000")
.build();
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);

为队列设置消息过期TTL

rabbitmq x-message-ttl

注意,队列还有一个队列TTL,x-expires,这个的意思是队列空置经过一段时间(没有消费者,没有被重新声明,没有人在上面获取消息(basic.get))后,整个队列便会过期删除,不要混淆

如果同时设置了消息的过期和队列消息过期属性,则取两个较小值。

设计延迟队列:

例如,我们需要触发一个推送新闻,30分钟后统计这个新闻的下发情况,我们就需要一个延迟队列,新闻推送后,往延迟队列发送一个消息,这个队列的消息在30分钟后被消费,这时候触发即可统计30分钟的下发情况。我们可以这样设计:

定义一个正常的队列: ARRIVAL_STAT,统计程序监听此队列,进行消费。

定义一个“延迟队列”(RabbitMQ没有这样的队列,这里只是人为的制造一个这样的队列):DELAY_ARRIVAL_STAT,其中设置好对应的x-dead-letter-exchange,x-dead-letter-routing-key。为了简单说明,我使用默认的exchange,那么配置如下:

x-dead-letter-exchange=“”
x-dead-letter-routing-key=“ARRIVAL_STAT”

意思是,消息当这个队列DELAY_ARRIVAL_STAT的消息变死信之后,就会带着routing key “ARRIVAL_STAT”发送默认的空exchange,即队列ARRIVAL_STAT。

并且这个队列不能有消费者消费消息。

这样我们就实现了消息的死信转发。下一步,只需要让消息在这个DELAY_ARRIVAL_STAT在30分钟后过期变死信即可。按照上文所说,有两种方法,我们可以为队列的消息设置30分钟TTL,或者发送消息的时候指定消息的TTL为30分钟即可。

示例如下:

rabbitmq 延迟队列示意

“延迟队列”的堵塞缺陷

由于设置了x-dead-letter-exchange的队列本身也是普通队列,其过期的顺序是按照队列头部顺序的过期的。也就是说,如果你队列头的消息A过期时间是5分钟,后面对这个队列发送消息B的带着过期时间1分钟,那么后面的队列B要等队列A过期了才会触发过期:

Queues that had a per-message TTL applied to them retroactively (when they already had messages) will discard the messages when specific events occur. Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).

所以,对于此类多延迟时间的,可以考虑设置多级延迟队列。例如1分钟,5分钟,10分钟,20分钟这样多级的延迟队列,使得延迟相近的尽量放到同一个队列中减少拥堵的最坏情况。

rabbitmq 多级延迟队列

RabbitMQ常用命令与配置

以下记录RabbitMQ常用的运维命令和配置


常用命令

启动进程:

sbin/rabbitmq-server -detached

关闭进程:

sbin/rabbitmqctl stop

创建账号:

sbin/rabbitmqctl add_user admin ${mq_password}
sbin/rabbitmqctl set_user_tags admin administrator
sbin/rabbitmqctl set_permissions -p '/' admin '.*' '.*' '.*'

启动监控:

#启动监控后,可以用http访问控制台 ip:监控端口(默认原端口+10000)
sbin/rabbitmq-plugins enable rabbitmq_management

加入集群:

#要先停止应用
sbin/rabbitmqctl stop_app
#加入集群,cluster_name为之前启动的那个集群名称,通常为环境变量文件中配置的RABBITMQ_NODE_IP_ADDRESS
sbin/rabbitmqctl join_cluster ${cluster_name}
#再次启动应用    
sbin/rabbitmqctl start_app

命令文档:https://www.rabbitmq.com/rabbitmqctl.8.html


配置文件

rabbitmq-env.conf

RABBITMQ_NODE_IP_ADDRESS= //IP地址,空串bind所有地址,指定地址bind指定网络接口
RABBITMQ_NODE_PORT=       //TCP端口号,默认是5672
RABBITMQ_NODENAME=        //节点名称。默认是rabbit
RABBITMQ_CONFIG_FILE= //配置文件路径 ,即rabbitmq.config文件路径
RABBITMQ_MNESIA_BASE=     //mnesia所在路径
RABBITMQ_LOG_BASE=        //日志所在路径
RABBITMQ_PLUGINS_DIR=     //插件所在路径

rabbitmq.config

tcp_listerners    #设置rabbimq的监听端口,默认为[5672]。
disk_free_limit     #磁盘低水位线,若磁盘容量低于指定值则停止接收数据,默认值为{mem_relative, 1.0},即与内存相关联1:1,也可定制为多少byte.
vm_memory_high_watermark    #设置内存低水位线,若低于该水位线,则开启流控机制,默认值是0.4,即内存总量的40%。
hipe_compile     #将部分rabbimq代码用High Performance Erlang compiler编译,可提升性能,该参数是实验性,若出现erlang vm segfaults,应关掉。
force_fine_statistics    #该参数属于rabbimq_management,若为true则进行精细化的统计,但会影响性能。
frame_max     #包大小,若包小则低延迟,若包则高吞吐,默认是131072=128K。
heartbeat     #客户端与服务端心跳间隔,设置为0则关闭心跳,默认是600秒。

一台机器启动多个实例

以上如果希望一个机器中启动多个实例,简单需要配置的地方仅有

rabbitmq-env.conf:

#改个名字
RABBITMQ_NODENAME=your_new_node_name
#改个端口    
RABBITMQ_NODE_PORT=5673

rabbitmq.config:

%tcp 监听端口对应修改%
{tcp_listeners, [5673]},

rabbitmq_management下面的监听端口对应修改,建议原端口加10000保持与原来默认的统一

{listener, [{port,     15673}]}

文档:http://www.rabbitmq.com/configure.html#configuration-file

RocketMQ 客户端配置

RocketMQ的客户端和服务端采取完全不一样的配置机制,客户端没有配置文件,所有的配置选项需要开发者使用对应的配置的setter进行设置。

注: 以下带 * 的,表示为重要参数。


ClientConfig

RocketMQ的Producer(DefaultMQProducer)和Consumer(DefaultMQPushConsumerDefaultMQPullConsumer),甚至运维相关的的admin类(DefaultMQAdminExt)都继承自ClientConfig。这意味着,其中的配置无论Producer还是Consumer都可以进行设置,其中大部分都是公用的配置(但由于设计的问题,有些配置只会对消费或生产生效)。

namesrvAddr*

配置说明 默认值
NameServer的地址列表,若是集群,用;作为地址的分隔符。 -D系统参数rocketmq.namesrv.addr或环境变量NAMESRV_ADDR

无论生产者还是消费者,只要是客户端需要和服务器broker进行操作,就需要依赖Name Server进行服务发现。具体请看:RocketMQ——组件

instanceName*

配置说明 默认值
NameServer的地址列表,若是集群,用;作为地址的分隔符。 从-D系统参数rocketmq.client.name获取,否则就是DEFAULT

这个值虽然默认写是DEFAULT,但在启动的时候,如果我们没有显示修改还是维持其DEFAULT的话,RocketMQ会更新为当前的进程号:

public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
        this.instanceName = String.valueOf(UtilAll.getPid());
    }
}

RocketMQ用一个叫ClientID的概念,来唯一标记一个客户端实例,一个客户端实例对于Broker而言会开辟一个Netty的客户端实例。 而ClientID是由ClientIP+InstanceName构成,故如果一个进程中多个实例(无论Producer还是Consumer)ClientIP和InstanceName都一样,他们将公用一个内部实例(同一套网络连接,线程资源等)

此外,此ClientID在对于Consumer负载均衡的时候起到唯一标识的作用,一旦多个实例(无论不同进程、不通机器、还是同一进程)的多个Consumer实例有一样的ClientID,负载均衡的时候必然RocketMQ任然会把两个实例当作一个client(因为同样一个clientID)。 故为了避免不必要的问题,ClientIP+instance Name的组合建议唯一,除非有意需要共用连接、资源。

clientIP

配置说明 默认值
客户端IP RemotingUtil.getLocalAddress()

这个值有两个用处: 1. 对于默认的instanceName(后面说明),如果没有显示设置,会使用ip+进程号,其中的ip便是这里的配置值 2. 对于Producer发送消息的时候,消息本身会存储本值到bornHost,用于标记消息从哪台机器产生的

clientCallbackExecutorThreads

配置说明 默认值
客户端通信层接收到网络请求的时候,处理器的核数 Runtime.getRuntime().availableProcessors()

虽然大部分指令的发起方是客户端而处理方是broker/NameServer端,但客户端有时候也需要处理远端对发送给自己的命令,最常见的是一些运维指令如GET_CONSUMER_RUNNING_INFO,或者消费实例上线/下线的推送指令NOTIFY_CONSUMER_IDS_CHANGED,这些指令的处理都在一个线程池处理,clientCallbackExecutorThreads控制这个线程池的核数。

pollNameServerInterval*

配置说明 默认值
轮询从NameServer获取路由信息的时间间隔 30000,单位毫秒

客户端依靠NameServer做服务发现(具体请看:RocketMQ——组件),这个间隔决定了新服务上线/下线,客户端最长多久能探测得到。默认是30秒,就是说如果做broker扩容,最长需要30秒客户端才能感知得到新broker的存在。

heartbeatBrokerInterval*

配置说明 默认值
定期发送注册心跳到broker的间隔 30000,单位毫秒

客户端依靠心跳告诉broker“我是谁(clientID,ConsumerGroup/ProducerGroup)”,“自己是订阅了什么topic”,”要发送什么topic”。以此,broker会记录并维护这些信息。客户端如果动态更新这些信息,最长则需要这个心跳周期才能告诉broker。

persistConsumerOffsetInterval*

配置说明 默认值
作用于Consumer,持久化消费进度的间隔 5000,单位毫秒

RocketMQ采取的是定期批量ack的机制以持久化消费进度。也就是说每次消费消息结束后,并不会立刻ack,而是定期的集中的更新进度。 由于持久化不是立刻持久化的,所以如果消费实例突然退出(如断点)、或者触发了负载均衡分consue queue重排,有可能会有已经消费过的消费进度没有及时更新而导致重新投递。故本配置值越小,重复的概率越低,但同时也会增加网络通信的负担。

vipChannelEnabled

配置说明 默认值
是否启用vip netty通道以发送消息 -D com.rocketmq.sendMessageWithVIPChannel参数的值,若无则是true

broker的netty server会起两个通信服务。两个服务除了服务的端口号不一样,其他都一样。其中一个的端口(配置端口-2)作为vip通道,客户端可以启用本设置项把发送消息此vip通道。


DefaultMQProducer

所有的消息发送都通过DefaultMQProducer作为入口,以下介绍一下单独属于DefaultMQProducer的一些配置项。

producerGroup*

配置说明 默认值
生产组的名称,一类Producer的标识 DEFAULT_PRODUCER

详见 RocketMQ——角色与术语详解

createTopicKey

配置说明 默认值
发送消息的时候,如果没有找到topic,若想自动创建该topic,需要一个key topic,这个值即是key topic的值 TBW102

这是RocketMQ设计非常晦涩的一个概念,整体的逻辑是这样的:

  • 生产者正常的发送消息,都是需要topic预先创建好的
  • 但是RocketMQ服务端是支持,发送消息的时候,如果topic不存在,在发送的同时自动创建该topic
  • 支持的前提是broker 的配置打开autoCreateTopicEnable=true
  • autoCreateTopicEnable=true后,broker会创建一个TBW102的topic,这个就是我们讲的默认的key topic

自动构建topic(以下成为T)的过程:

  1. Producer发送的时候如果发现该T不存在,就会配置有Producer配置的key topic的那个broker发送消息
  2. broker校验客户端的topic key是否在broker存在,且校验其权限最后一位是否是1(topic权限总共有3位,按位存储,分别是读、写、支持自动创建)
  3. 若权限校验通过,先在该broker把T创建,并且权限就是key topic除去最后一位的权限。

为了方便理解,以下贴出broker的具体源码并加入部分注释:

                TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);//key topic的配置信息
                if (defaultTopicConfig != null) {//key topic 存在
                    if (defaultTopic.equals(MixAll.DEFAULT_TOPIC)) {
                        if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
                            defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
                        }
                    }

                    if (PermName.isInherited(defaultTopicConfig.getPerm())) {//检验权限,如果允许自动创建
                        topicConfig = new TopicConfig(topic);//创建topic

                        int queueNums =
                            clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig
                                .getWriteQueueNums() : clientDefaultTopicQueueNums;

                        if (queueNums < 0) {
                            queueNums = 0;
                        }

                        topicConfig.setReadQueueNums(queueNums);
                        topicConfig.setWriteQueueNums(queueNums);
                        int perm = defaultTopicConfig.getPerm();
                        perm &= ~PermName.PERM_INHERIT;//权限按照key topic的来
                        topicConfig.setPerm(perm);
                        topicConfig.setTopicSysFlag(topicSysFlag);
                        topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
                    } else {//权限校验不过,自动创建失败
                        LOG.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]",
                                defaultTopic, defaultTopicConfig.getPerm(), remoteAddress);
                    }
                } else {//key topic不存在,创建失败
                    LOG.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]", defaultTopic, remoteAddress);
                }

             ...//把创建的topic维护起来

总的来说,这个功能设计出来比较晦涩,而从运维的角度上看,topic在大部分场景下也应该预创建,故本特性没有必要的话,也不会用到,这个配置也没有必要特殊的设置。

关于这个TBW102非常不直观的问题,我已经提了issue :https://issues.apache.org/jira/browse/ROCKETMQ-223

defaultTopicQueueNums

配置说明 默认值
自动创建topic的话,默认queue数量是多少 4

sendMsgTimeout

配置说明 默认值
默认的发送超时时间 3000,单位毫秒

若发送的时候不显示指定timeout,则使用此设置的值作为超时时间。

对于异步发送,超时后会进入回调的onException,对于同步发送,超时则会得到一个RemotingTimeoutException

compressMsgBodyOverHowmuch

配置说明 默认值
消息body需要压缩的阈值 1024 * 4,4K

retryTimesWhenSendFailed

配置说明 默认值
同步发送失败的话,rocketmq内部重试多少次 2

retryTimesWhenSendAsyncFailed

配置说明 默认值
异步发送失败的话,rocketmq内部重试多少次 2

retryAnotherBrokerWhenNotStoreOK

配置说明 默认值
发送的结果如果不是SEND_OK状态,是否当作失败处理而尝试重发 false

发送结果总共有4钟:

SEND_OK, //状态成功,无论同步还是存储
FLUSH_DISK_TIMEOUT, // broker刷盘策略为同步刷盘(SYNC_FLUSH)的话时候,等待刷盘的时候超时
FLUSH_SLAVE_TIMEOUT, // master role采取同步复制策略(SYNC_MASTER)的时候,消息尝试同步到slave超时
SLAVE_NOT_AVAILABLE, //slave不可用

注:从源码上看,此配置项只对同步发送有效,异步、oneway(由于无法获取结果,肯定无效)均无效

retryAnotherBrokerWhenNotStoreOK

配置说明 默认值
客户端验证,允许发送的最大消息体大小 1024 * 1024 * 4,4M

若超过此大小,会得到一个响应码13(MESSAGE_ILLEGAL)的MQClientException异常


TransactionMQProducer

事务生产者,截至至4.1,由于暂时事务回查功能缺失,整体并不完全可用,配置暂时忽略,等后面功能完善后补上。

https://issues.apache.org/jira/browse/ROCKETMQ-123


DefaultMQPushConsumer

最常用的消费者,使用push模式(长轮询),封装了各种拉取的方法和返回结果的判断。下面介绍其配置。

consumerGroup*

配置说明 默认值
消费组的名称,用于标识一类消费者 无默认值,必设

详见 RocketMQ——角色与术语详解

messageModel*

配置说明 默认值
消费模式 MessageModel.CLUSTERING

可选值有两个:

  1. CLUSTERING //集群消费模式
  2. BROADCASTING //广播消费模式

两种模式的区别详见:RocketMQ——角色与术语详解

consumeFromWhere*

配置说明 默认值
消费点策略 ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET

可选值有两个:

  1. CONSUME_FROM_LAST_OFFSET //队列尾消费
  2. CONSUME_FROM_FIRST_OFFSET //队列头消费
  3. CONSUME_FROM_TIMESTAMP //按照日期选择某个位置消费

注:此策略只生效于新在线测consumer group,如果是老的已存在的consumer group,都降按照已经持久化的consume offset进行消费

具体说明祥见: RocketMQ——消息ACK机制及消费进度管理

consumeTimestamp:

配置说明 默认值
CONSUME_FROM_LAST_OFFSET的时候使用,从哪个时间点开始消费 半小时前

格式为yyyyMMddhhmmss 如 20131223171201

allocateMessageQueueStrategy*

配置说明 默认值
负载均衡策略算法 AllocateMessageQueueAveragely(取模平均分配)

这个算法可以自行扩展以使用自定义的算法,目前有以下算法可以使用

  • AllocateMessageQueueAveragely //取模平均
  • AllocateMessageQueueAveragelyByCircle //环形平均
  • AllocateMessageQueueByConfig // 按照配置,传入听死的messageQueueList
  • AllocateMessageQueueByMachineRoom //按机房,从源码上看,必须和阿里的某些broker命名一致才行
  • AllocateMessageQueueConsistentHash //一致性哈希算法,本人于4.1提交的特性。用于解决“惊群效应”。

需要自行扩展的算法的,需要实现org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueStrategy

具体分配consume queue的过程祥见: RocketMQ——水平扩展及负载均衡详解

subscription

配置说明 默认值
订阅关系(topic->sub expression) {}

不建议设置,订阅topic建议直接调用subscribe接口

messageListener

配置说明 默认值
消息处理监听器(回调) null

不建议设置,注册监听的时候应调用registerMessageListener

offsetStore

配置说明 默认值
消息消费进度存储器 null

不建议设置,offsetStore 有两个策略:LocalFileOffsetStoreRemoteBrokerOffsetStore

若没有显示设置的情况下,广播模式将使用LocalFileOffsetStore,集群模式将使用RemoteBrokerOffsetStore,不建议修改。

consumeThreadMin*

配置说明 默认值
消费线程池的core size 20

PushConsumer会内置一个消费线程池,这个配置控制此线程池的core size

consumeThreadMax*

配置说明 默认值
消费线程池的max size 64

PushConsumer会内置一个消费线程池,这个配置控制此线程池的max size

adjustThreadPoolNumsThreshold

配置说明 默认值
动态扩线程核数的消费堆积阈值 1000

相关功能以废弃,不建议设置

consumeConcurrentlyMaxSpan

配置说明 默认值
并发消费下,单条consume queue队列允许的最大offset跨度,达到则触发流控 2000

注:只对并发消费(ConsumeMessageConcurrentlyService)生效

更多分析祥见: RocketMQ——消息ACK机制及消费进度管理

pullThresholdForQueue

配置说明 默认值
consume queue流控的阈值 1000

每条consume queue的消息拉取下来后会缓存到本地,消费结束会删除。当累积达到一个阈值后,会触发该consume queue的流控。

更多分析祥见: RocketMQ——消息ACK机制及消费进度管理

截至到4.1,流控级别只能针对consume queue级别,针对topic级别的流控已经提了issue: https://issues.apache.org/jira/browse/ROCKETMQ-106

pullInterval*

配置说明 默认值
拉取的间隔 0,单位毫秒

由于RocketMQ采取的pull的方式进行消息投递,每此会发起一个异步pull请求,得到请求后会再发起下次请求,这个间隔默认是0,表示立刻再发起。在间隔为0的场景下,消息投递的及时性几乎等同用Push实现的机制。

pullBatchSize*

f | 配置说明 | 默认值 | | ——| —— | |一次最大拉取的批量大小|32|

每次发起pull请求到broker,客户端需要指定一个最大batch size,表示这次拉取消息最多批量拉取多少条。

consumeMessageBatchMaxSize

配置说明 默认值
批量消费的最大消息条数 1

你可能发现了,RocketMQ的注册监听器回调的回调方法签名是类似这样的:

ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs, final ConsumeConcurrentlyContext context);

里面的消息是一个集合List而不是单独的msg,这个consumeMessageBatchMaxSize就是控制这个集合的最大大小。

而由于拉取到的一批消息会立刻拆分成N(取决于consumeMessageBatchMaxSize)批消费任务,所以集合中msgs的最大大小是consumeMessageBatchMaxSizepullBatchSize的较小值。

postSubscriptionWhenPull

配置说明 默认值
每次拉取的时候是否更新订阅关系 false

从源码上看,这个值若是true,且不是class fliter模式,则每次拉取的时候会把subExpression带上到pull的指令中,broker发现这个指令会根据这个上传的表达式重新build出注册数据,而不是直接使用读取的缓存数据。

maxReconsumeTimes

配置说明 默认值
一个消息如果消费失败的话,最多重新消费多少次才投递到死信队列 -1

注,这个值默认值虽然是-1,但是实际使用的时候默认并不是-1。按照消费是并行还是串行消费有所不同的默认值。

并行:默认16次

串行:默认无限大(Interge.MAX_VALUE)。由于顺序消费的特性必须等待前面的消息成功消费才能消费后面的,默认无限大即一直不断消费直到消费完成。

suspendCurrentQueueTimeMillis

配置说明 默认值
串行消费使用,如果返回ROLLBACK或者SUSPEND_CURRENT_QUEUE_A_MOMENT,再次消费的时间间隔 1000,单位毫秒

注:如果消费回调中对ConsumeOrderlyContext中的suspendCurrentQueueTimeMillis进行过设置,则使用用户设置的值作为消费间隔。

consumeTimeout

配置说明 默认值
消费的最长超时时间 15, 单位分钟

如果消费超时,RocketMQ会等同于消费失败来处理,更多分析祥见: RocketMQ——消息ACK机制及消费进度管理


DefaultMQPullConsumer

采取主动调用Pull接口的模式的消费者,主动权更大,但是使用难度也相对更大。以下介绍其配置,部分配置和PushConsumer一致。

consumerGroup*

配置说明 默认值
消费组的名称,用于标识一类消费者 无默认值,必设

详见 RocketMQ——角色与术语详解

registerTopics*

配置说明 默认值
消费者需要监听的topic 空集合

由于没有subscribe接口,用户需要自己把想要监听的topic设置到此集合中,RocketMQ内部会依靠此来发送对应心跳数据。

messageModel*

配置说明 默认值
消费模式 MessageModel.CLUSTERING

可选值有两个:

  1. CLUSTERING //集群消费模式
  2. BROADCASTING //广播消费模式

两种模式的区别详见:RocketMQ——角色与术语详解

allocateMessageQueueStrategy*

配置说明 默认值
负载均衡策略算法 AllocateMessageQueueAveragely(取模平均分配)

见DefaultPushConsumer的说明

offsetStore

配置说明 默认值
消息消费进度存储器 null

不建议设置,offsetStore 有两个策略:LocalFileOffsetStoreRemoteBrokerOffsetStore

若没有显示设置的情况下,广播模式将使用LocalFileOffsetStore,集群模式将使用RemoteBrokerOffsetStore,不建议修改。

maxReconsumeTimes

配置说明 默认值
调用sendMessageBack的时候,如果发现重新消费超过这个配置的值,则投递到死信队列 16

由于PullConsumer没有管理消费的线程池和管理器,需要用户自己处理各种消费结果和拉取结果,故需要投递到重试队列或死信队列的时候需要显示调用sendMessageBack

回传消息的时候会带上maxReconsumeTimes的值,broker发现此消息已经消费超过此值,则投递到死信队列,否则投递到重试队列。此逻辑和DefaultPushConsumer是一致的,只是PushConsumer无需用户显示调用。

brokerSuspendMaxTimeMillis

配置说明 默认值
broker在长轮询下,连接最长挂起的时间 20*1000,单位毫秒

长轮询具体逻辑不在本文论述,且RocketMQ不建议修改此值。

consumerTimeoutMillisWhenSuspend

配置说明 默认值
broker在长轮询下,客户端等待broker响应的最长等待超时时间 30*1000,单位毫秒

长轮询具体逻辑不在本文论述,且RocketMQ不建议修改此值,此值一定要大于brokerSuspendMaxTimeMillis

consumerPullTimeoutMillis

配置说明 默认值
pull的socket 超时时间 10*1000,单位毫秒

虽然注释上说是socket超时时间,但是从源码上看,此值的设计是不启动长轮询也不指定timeout的情况下,拉取的超时时间。

messageQueueListener

配置说明 默认值
负载均衡consume queue分配变化的通知监听器 null

由于pull操作需要用户自己去触发,故如果负载均衡发生变化,要有方法告知用户现在分到的新consume queue是什么。使用方可以实现此接口以达到此目的:

/**
 * A MessageQueueListener is implemented by the application and may be specified when a message queue changed
 */
public interface MessageQueueListener {
/**
 * @param topic message topic
 * @param mqAll all queues in this message topic
 * @param mqDivided collection of queues,assigned to the current consumer
 */
void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,final Set<MessageQueue> mqDivided);
}

RocketMQ——消息文件过期原理

RocketMQ——消息ACK机制及消费进度管理 文中提过,所有的消费均是客户端发起Pull请求的,告诉消息的offset位置,broker去查询并返回。但是有一点需要非常明确的是,消息消费后,消息其实并没有物理地被清除,这是一个非常特殊的设计。本文来探索此设计的一些细节。

消费完后的消息去哪里了?

消息的存储是一直存在于CommitLog中的。而由于CommitLog是以文件为单位(而非消息)存在的,CommitLog的设计是只允许顺序写的,且每个消息大小不定长,所以这决定了消息文件几乎不可能按照消息为单位删除(否则性能会极具下降,逻辑也非常复杂)。所以消息被消费了,消息所占据的物理空间并不会立刻被回收。

但消息既然一直没有删除,那RocketMQ怎么知道应该投递过的消息就不再投递?——答案是客户端自身维护——客户端拉取完消息之后,在响应体中,broker会返回下一次应该拉取的位置,PushConsumer通过这一个位置,更新自己下一次的pull请求。这样就保证了正常情况下,消息只会被投递一次。

什么时候清理物理消息文件?

那消息文件到底删不删,什么时候删?

消息存储在CommitLog之后,的确是会被清理的,但是这个清理只会在以下任一条件成立才会批量删除消息文件(CommitLog):

  1. 消息文件过期(默认72小时),且到达清理时点(默认是凌晨4点),删除过期文件。
  2. 消息文件过期(默认72小时),且磁盘空间达到了水位线(默认75%),删除过期文件。
  3. 磁盘已经达到必须释放的上限(85%水位线)的时候,则开始批量清理文件(无论是否过期),直到空间充足。

注:若磁盘空间达到危险水位线(默认90%),出于保护自身的目的,broker会拒绝写入服务。

这样设计带来的好处

消息的物理文件一直存在,消费逻辑只是听客户端的决定而搜索出对应消息进行,这样做,笔者认为,有以下几个好处:

  1. 一个消息很可能需要被N个消费组(设计上很可能就是系统)消费,但消息只需要存储一份,消费进度单独记录即可。这给强大的消息堆积能力提供了很好的支持——一个消息无需复制N份,就可服务N个消费组。

  2. 由于消费从哪里消费的决定权一直都是客户端决定,所以只要消息还在,就可以消费到,这使得RocketMQ可以支持其他传统消息中间件不支持的回溯消费。即我可以通过设置消费进度回溯,就可以让我的消费组重新像放快照一样消费历史消息;或者我需要另一个系统也复制历史的数据,只需要另起一个消费组从头消费即可(前提是消息文件还存在)。

  3. 消息索引服务。只要消息还存在就能被搜索出来。所以可以依靠消息的索引搜索出消息的各种原信息,方便事后排查问题。

注:在消息清理的时候,由于消息文件默认是1GB,所以在清理的时候其实是在删除一个大文件操作,这对于IO的压力是非常大的,这时候如果有消息写入,写入的耗时会明显变高。这个现象可以在凌晨4点(默认删时间时点)后的附近观察得到。

RocketMQ官方建议Linux下文件系统改为Ext4,对于文件删除操作相比Ext3有非常明显的提升。

跳过历史消息的处理

由于消息本身是没有过期的概念,只有文件才有过期的概念。那么对于很多业务场景——一个消息如果太老,是无需要被消费的,是不合适的。

这种需要跳过历史消息的场景,在RocketMQ要怎么实现呢?

对于一个全新的消费组,PushConsumer默认就是跳过以前的消息而从最尾开始消费的,解析请参看RocketMQ——消息ACK机制及消费进度管理相关章节。

但对于已存在的消费组,RocketMQ没有内置的跳过历史消息的实现,但有以下手段可以解决:

  1. 自身的消费代码按照日期过滤,太老的消息直接过滤。如:

         @Override
         public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
             for(MessageExt msg: msgs){
                 if(System.currentTimeMillis()-msg.getBornTimestamp()>60*1000) {//一分钟之前的认为过期
                     continue;//过期消息跳过
                 }
    
                 //do consume here
    
             }
             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         }
    
  2. 自身的消费代码代码判断消息的offset和MAX_OFFSET相差很远,认为是积压了很多,直接return CONSUME_SUCCESS过滤。

         @Override
         public ConsumeConcurrentlyStatus consumeMessage(//
             List<MessageExt> msgs, //
             ConsumeConcurrentlyContext context) {
             long offset = msgs.get(0).getQueueOffset();
             String maxOffset = msgs.get(0).getProperty(MessageConst.PROPERTY_MAX_OFFSET);
             long diff = Long. parseLong(maxOffset) - offset;
             if (diff > 100000) { //消息堆积了10W情况的特殊处理
                 return ConsumeConcurrentlyStatus. CONSUME_SUCCESS;
             }
             //do consume here
             return ConsumeConcurrentlyStatus. CONSUME_SUCCESS;
         }
    
  3. 消费者启动前,先调整该消费组的消费进度,再开始消费。可以人工使用控制台命令resetOffsetByTime把消费进度调整到后面,再启动消费。

  4. 原理同3,但使用代码来控制。代码中调用内部的运维接口,具体代码实例祥见ResetOffsetByTimeCommand.java.

RocketMQ——消息ACK机制及消费进度管理

RokectMQ——水平扩展及负载均衡详解 中剖析过,consumer的每个实例是靠队列分配来决定如何消费消息的。那么消费进度具体是如何管理的,又是如何保证消息成功消费的?(RocketMQ有保证消息肯定消费成功的特性,失败则重试)?

本文将详细解析消息具体是如何ack的,又是如何保证消费肯定成功的。

由于以上工作所有的机制都实现在PushConsumer中,所以本文的原理均只适用于RocketMQ中的PushConsumer即Java客户端中的DefaultPushConsumer。 若使用了PullConsumer模式,类似的工作如何ack,如何保证消费等均需要使用方自己实现。

注:广播消费和集群消费的处理有部分区别,以下均特指集群消费(CLSUTER),广播(BROADCASTING)下部分可能不适用。

保证消费成功

PushConsumer为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。

消费的时候,我们需要注入一个消费回调,具体sample代码如下:

    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
            doMyJob();//执行真正消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是消费完成的。(具体如何ACK见后面章节)

如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。

为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费租的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。

注:

  1. 如果业务的回调没有处理好而抛出异常,会认为是消费失败当ConsumeConcurrentlyStatus.RECONSUME_LATER处理。
  2. 当使用顺序消费的回调MessageListenerOrderly时,由于顺序消费是要前者消费成功才能继续消费,所以没有RECONSUME_LATER的这个状态,只有SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费。

启动的时候从哪里消费

当新实例启动的时候,PushConsumer会拿到本消费组broker已经记录好的消费进度(consumer offset),按照这个进度发起自己的第一次Pull请求。

如果这个消费进度在Broker并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:

CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息
CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前

所以,社区中经常有人问:“为什么我设了CONSUME_FROM_LAST_OFFSET,历史的消息还是被消费了”? 原因就在于只有全新的消费组才会使用到这些策略,老的消费组都是按已经存储过的消费进度继续消费。

对于老消费组想跳过历史消息需要自身做过滤,或者使用先修改消费进度。示例代码请参看:RocketMQ——消息文件过期原理

消息ACK机制

RocketMQ是以consumer group+queue为单位是管理消费进度的,以一个consumer offset标记这个这个消费组在这条queue上的消费进度。

如果某已存在的消费组出现了新消费实例的时候,依靠这个组的消费进度,就可以判断第一次是从哪里开始拉取的。

每次消息成功后,本地的消费进度会被更新,然后由定时器定时同步到broker,以此持久化消费进度。

但是每次记录消费进度的时候,只会把一批消息中最小的offset值为消费进度值,如下图:

message ack

这钟方式和传统的一条message单独ack的方式有本质的区别。性能上提升的同时,会带来一个潜在的重复问题——由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如 2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的情况。

在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度职能维持在2101,直到2101也消费结束了,本地的消费进度才能标记2200消费结束了(注:consumerOffset=2201)。

在这种设计下,就有消费大量重复的风险。如2101在还没有消费完成的时候消费实例突然退出(机器断电,或者被kill)。这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次。

对于这个场景,RocketMQ暂时无能为力,所以业务必须要保证消息消费的幂等性,这也是RocketMQ官方多次强调的态度。

实际上,从源码的角度上看,RocketMQ可能是考虑过这个问题的,截止到3.2.6的版本的源码中,可以看到为了缓解这个问题的影响面,DefaultMQPushConsumer中有个配置consumeConcurrentlyMaxSpan

/**
 * Concurrently max span offset.it has no effect on sequential consumption
 */
private int consumeConcurrentlyMaxSpan = 2000;

这个值默认是2000,当RocketMQ发现本地缓存的消息的最大值-最小值差距大于这个值(2000)的时候,会触发流控——也就是说如果头尾都卡住了部分消息,达到了这个阈值就不再拉取消息。

但作用实际很有限,像刚刚这个例子,2101的消费是死循环,其他消费非常正常的话,是无能为力的。一旦退出,在不人工干预的情况下,2101后所有消息全部重复!

Ack卡进度解决方案

实际上对于卡住进度的场景,可以选择弃车保帅的方案:把消息卡住那些消息,先ack掉,让进度前移。但要保证这条消息不会因此丢失,ack之前要把消息sendBack回去,这样这条卡住的消息就会必然重复,但会解决潜在的大量重复的场景。 这也是我们公司自己定制的解决方案。

部分源码如下:

class ConsumeRequestWithUnAck implements Runnable {
    final ConsumeRequest consumeRequest;
    final long resendAfterIfStillUnAck;//n毫秒没有消费完,就重发

    ConsumeRequestWithUnAck(ConsumeRequest consumeRequest,long resendAfterIfStillUnAck) {
        this.consumeRequest = consumeRequest;
        this.resendAfterIfStillUnAck = resendAfterIfStillUnAck;
    }

    @Override
    public void run() {
        //每次消费前,计划延时任务,超时则ack并重发
        final WeakReference<ConsumeRequest> crReff = new WeakReference<>(this.consumeRequest);
        ScheduledFuture scheduledFuture=null;
        if(!ConsumeDispatcher.this.ackAndResendScheduler.isShutdown()) {
            scheduledFuture= ConsumeDispatcher.this.ackAndResendScheduler.schedule(new ConsumeTooLongChecker(crReff),resendAfterIfStillUnAck,TimeUnit.MILLISECONDS);
        }
        try{
            this.consumeRequest.run();//正常执行并更新offset
        }
        finally {
            if (scheduledFuture != null) scheduledFuture.cancel(false);//消费结束后,取消任务
        }
    }

}
  1. 定义了一个装饰器,把原来的ConsumeRequest对象包了一层。
  2. 装饰器中,每条消息消费前都会调度一个调度器,定时触发,触发的时候如果发现消息还存在,就执行sendback并ack的操作。

后来RocketMQ显然也发现了这个问题,RocketMQ在3.5.8之后也是采用这样的方案去解决这个问题。只是实现方式上有所不同(事实上我认为RocketMQ的方案还不够完善)

  1. 在pushConsumer中 有一个consumeTimeout字段(默认15分钟),用于设置最大的消费超时时间。消费前会记录一个消费的开始时间,后面用于比对。
  2. 消费者启动的时候,会定期扫描所有消费的消息,达到这个timeout的那些消息,就会触发sendBack并ack的操作。这里扫描的间隔也是consumeTimeout(单位分钟)的间隔。

核心源码如下:

//ConsumeMessageConcurrentlyService.java
public void start() {
    this.CleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            cleanExpireMsg();
        }

    }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
//ConsumeMessageConcurrentlyService.java
private void cleanExpireMsg() {
    Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
            this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<MessageQueue, ProcessQueue> next = it.next();
        ProcessQueue pq = next.getValue();
        pq.cleanExpiredMsg(this.defaultMQPushConsumer);
    }
}

//ProcessQueue.java
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
    if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
        return;
    }

    int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
    for (int i = 0; i < loop; i++) {
        MessageExt msg = null;
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
                    msg = msgTreeMap.firstEntry().getValue();
                } else {

                    break;
                }
            } finally {
                this.lockTreeMap.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getExpiredMsg exception", e);
        }

        try {

            pushConsumer.sendMessageBack(msg, 3);
            log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
            try {
                this.lockTreeMap.writeLock().lockInterruptibly();
                try {
                    if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
                        try {
                            msgTreeMap.remove(msgTreeMap.firstKey());
                        } catch (Exception e) {
                            log.error("send expired msg exception", e);
                        }
                    }
                } finally {
                    this.lockTreeMap.writeLock().unlock();
                }
            } catch (InterruptedException e) {
                log.error("getExpiredMsg exception", e);
            }
        } catch (Exception e) {
            log.error("send expired msg exception", e);
        }
    }
}

通过这个逻辑对比我定制的时间,可以看出有几个不太完善的问题:

  1. 消费timeout的时间非常不精确。由于扫描的间隔是15分钟,所以实际上触发的时候,消息是有可能卡住了接近30分钟(15*2)才被清理。
  2. 由于定时器一启动就开始调度了,中途这个consumeTimeout再更新也不会生效。

RocketMQ——水平扩展及负载均衡详解

RocketMQ是一个分布式具有高度可扩展性的消息中间件。本文旨在探索在broker端,生产端,以及消费端是如何做到横向扩展以及负载均衡的。

Broker端水平扩展

Broker负载均衡

Broker是以group为单位提供服务。一个group里面分master和slave,master和slave存储的数据一样,slave从master同步数据(同步双写或异步复制看配置)。

通过nameserver暴露给客户端后,只是客户端关心(注册或发送)一个个的topic路由信息。路由信息中会细化为message queue的路由信息。而message queue会分布在不同的broker group。所以对于客户端来说,分布在不同broker group的message queue为成为一个服务集群,但客户端会把请求分摊到不同的queue。

而由于压力分摊到了不同的queue,不同的queue实际上分布在不同的Broker group,也就是说压力会分摊到不同的broker进程,这样消息的存储和转发均起到了负载均衡的作用。

Broker一旦需要横向扩展,只需要增加broker group,然后把对应的topic建上,客户端的message queue集合即会变大,这样对于broker的负载则由更多的broker group来进行分担。

并且由于每个group下面的topic的配置都是独立的,也就说可以让group1下面的那个topic的queue数量是4,其他group下的topic queue数量是2,这样group1则得到更大的负载。

commit log

虽然每个topic下面有很多message queue,但是message queue本身并不存储消息。真正的消息存储会写在CommitLog的文件,message queue只是存储CommitLog中对应的位置信息,方便通过message queue找到对应存储在CommitLog的消息。

不同的topic,message queue都是写到相同的CommitLog 文件,也就是说CommitLog完全的顺序写。

具体如下图:

broker负载均衡

Producer

Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如下图:

生产者负载均衡

Consumer负载均衡

集群模式

在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。

而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。

默认的分配算法是AllocateMessageQueueAveragely,如下图:

消费者负载均衡1

还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条queue,只是以环状轮流分queue的形式,如下图:

消费者负载均衡2

需要注意的是,集群模式下,queue都是只允许分配只一个实例,这是由于如果多个实例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue。

通过增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费。

但是如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。

广播模式

由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。

在实现上,其中一个不同就是在consumer分配queue的时候,会所有consumer都分到所有的queue。

消费者广播模式

RocketMQ——通信协议

RocketMQ的通信协议其实很简单,但是无论是官方的用户手册,还是网上的博客,并没有很清晰简单地把其中所有的内容和原理讲明白。 对于需要扩展其他语言SDK的开发来说,意味着必须要深入到Java源码才能弄懂其概念。笔者通过深入源码,本文希望以尽量简短的语言描述清楚协议的每个字段及其意义。

无论是发送消息,拉取消息,还是发送心跳等所有的网络通讯层协议(客户端与broker/nameserver间,broker与nameserver间)都使用一套一样的协议。并且无论请求还是响应,协议是一样的,协议头的字段也是固定的。

通讯协议

协议分为以下四部分:

RocketMQ协议

其中后两部分是通讯的实际数据。前两段都是四个字节的整形,分别表示两段实际数据的长度。

  • header: 协议的头,数据是序列化后的json。json的每个key字段都是固定的,不同的通讯请求字段不一样。后面解释

  • body: 请求的二进制实际数据。例如发送消息的网络请求中,body中传输实际的消息内容。

  • length:2 3 4 端整体的长度。四个字节整数。

  • header length: header的长度。四个字节整数。

Header

协议header具体标识整个通讯请求的元数据,如请求什么,怎样的方式请求(异步/oneway)请求客户端的版本,语言,请求的具体参数等。

header是序列化的json,以下是json中的所有字段,并阐述起在请求和响应两个阶段的区别。

字段 类型 Request Response
code 整数 请求操作码。响应方通过code决定如何处理请求。 响应码。0表示成功,非0表示错误码。
language 字符串 标记请求方的语言类型,如JAVA。 应答方方的所使用的语言。
version 整数 请求方的版本号 应答方的版本号
opaque 整数 在同一个连接上,标记是哪次请求。服务响应的时候会返回这个请求标识码,以达到请求方多线程中复用连接,在收到响应的时候找到对应的请求 原请求的opaque。应答方不做修改原值返回。
flag 整数 通信层的标识位。标识这次通信的类型。 通信层的标识位。标识这次通信的类型。
remark 字符串 传输的自定义文本 应答的文本信息。通常存放错误信息。
extFields HashMap<String,String> 请求自定义字段。不同的请求会有不一样的参数列表,这里存放那些请求自定义的参数列表。 响应自定义字段。不同的响应会有不一样的参数列表,若有,这里则存放那些请求自定义的参数列表。

Header详解:

code:

请求/响应码。所有的请求码参考代码RequestCode.java。响应码则在ResponseCode.java中。

language:

由于要支持多语言,所以这一字段可以给通信双方知道对方通信层锁使用的开发语言。

version:

给通信层知道对方的版本号,响应方可以以此做兼容老版本等的特殊操作。

opaque:

请求标识码。在Java版的通信层中,这个只是一个不断自增的整形,为了收到应答方响应的的时候找到对应的请求。

flag: 按位(bit)解释。

第0位标识是这次通信是request还是response,0标识request, 1 标识response。

第1位标识是否是oneway请求,1标识oneway。应答方在处理oneway请求的时候,不会做出响应,请求方也无序等待应答方响应。

remark:

附带的文本信息。常见的如存放一些broker/nameserver返回的一些异常信息,方便开发人员定位问题。

extFields:

这个字段不通的请求/响应不一样,完全自定义。数据结构上是java的hashmap。在Java的每个RemotingCammand中,其实都带有一个CommandCustomHeader的属性成员,可以认为他是一个强类型的extFields,再最后传输的时候,这个CommandCustomHeader会被忽略,而传输前会把其中的所有字段全部都原封不动塞到extFields中,以作传输。

以发送消息为例(code=310),发送消息的自定义header是SendMessageRequestHeaderV2(只是字段名对比SendMessageRequestHeader压缩了)。有以下字段:

@CFNotNull
private String a;// producerGroup;
@CFNotNull
private String b;// topic;
@CFNotNull
private String c;// defaultTopic;
@CFNotNull
private Integer d;// defaultTopicQueueNums;
@CFNotNull
private Integer e;// queueId;
@CFNotNull
private Integer f;// sysFlag;
@CFNotNull
private Long g;// bornTimestamp;
@CFNotNull
private Integer h;// flag;
@CFNullable
private String i;// properties;
@CFNullable
private Integer j;// reconsumeTimes;
@CFNullable
private boolean k;// unitMode = false;

这些字段都会原封不动的去到extFields中做传输,最后看到的发送消息的请求header会类似如:

{  
    "code":310,
    "extFields":{  
        "f":"0",
        "g":"1482158310125",
        "d":"4",
        "e":"0",
        "b":"TopicTest",
        "c":"TBW102",
        "a":"please_rename_unique_group_name",
        "j":"0",
        "k":"false",
        "h":"0",
        "i":"TAGS\u0001TagA\u0002WAIT\u0001true\u0002"
    },
    "flag":0,
    "language":"JAVA",
    "opaque":206,
    "version":79
}

注:其中fastjson把值为null的remark过滤了。

请求码列表

以下是截至到3.2.6的所有请求码列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169

    // Broker 发送消息
    public static final int SEND_MESSAGE = 10;
    // Broker 订阅消息
    public static final int PULL_MESSAGE = 11;
    // Broker 查询消息
    public static final int QUERY_MESSAGE = 12;
    // Broker 查询Broker Offset
    public static final int QUERY_BROKER_OFFSET = 13;
    // Broker 查询Consumer Offset
    public static final int QUERY_CONSUMER_OFFSET = 14;
    // Broker 更新Consumer Offset
    public static final int UPDATE_CONSUMER_OFFSET = 15;
    // Broker 更新或者增加一个Topic
    public static final int UPDATE_AND_CREATE_TOPIC = 17;
    // Broker 获取所有Topic的配置(Slave和Namesrv都会向Master请求此配置)
    public static final int GET_ALL_TOPIC_CONFIG = 21;
    // Broker 获取所有Topic配置(Slave和Namesrv都会向Master请求此配置)
    public static final int GET_TOPIC_CONFIG_LIST = 22;
    // Broker 获取所有Topic名称列表
    public static final int GET_TOPIC_NAME_LIST = 23;
    // Broker 更新Broker上的配置
    public static final int UPDATE_BROKER_CONFIG = 25;
    // Broker 获取Broker上的配置
    public static final int GET_BROKER_CONFIG = 26;
    // Broker 触发Broker删除文件
    public static final int TRIGGER_DELETE_FILES = 27;
    // Broker 获取Broker运行时信息
    public static final int GET_BROKER_RUNTIME_INFO = 28;
    // Broker 根据时间查询队列的Offset
    public static final int SEARCH_OFFSET_BY_TIMESTAMP = 29;
    // Broker 查询队列最大Offset
    public static final int GET_MAX_OFFSET = 30;
    // Broker 查询队列最小Offset
    public static final int GET_MIN_OFFSET = 31;
    // Broker 查询队列最早消息对应时间
    public static final int GET_EARLIEST_MSG_STORETIME = 32;
    // Broker 根据消息ID来查询消息
    public static final int VIEW_MESSAGE_BY_ID = 33;
    // Broker Client向Client发送心跳,并注册自身
    public static final int HEART_BEAT = 34;
    // Broker Client注销
    public static final int UNREGISTER_CLIENT = 35;
    // Broker Consumer将处理不了的消息发回服务器
    public static final int CONSUMER_SEND_MSG_BACK = 36;
    // Broker Commit或者Rollback事务
    public static final int END_TRANSACTION = 37;
    // Broker 获取ConsumerId列表通过GroupName
    public static final int GET_CONSUMER_LIST_BY_GROUP = 38;
    // Broker 主动向Producer回查事务状态
    public static final int CHECK_TRANSACTION_STATE = 39;
    // Broker Broker通知Consumer列表变化
    public static final int NOTIFY_CONSUMER_IDS_CHANGED = 40;
    // Broker Consumer向Master锁定队列
    public static final int LOCK_BATCH_MQ = 41;
    // Broker Consumer向Master解锁队列
    public static final int UNLOCK_BATCH_MQ = 42;
    // Broker 获取所有Consumer Offset
    public static final int GET_ALL_CONSUMER_OFFSET = 43;
    // Broker 获取所有定时进度
    public static final int GET_ALL_DELAY_OFFSET = 45;
    // Namesrv 向Namesrv追加KV配置
    public static final int PUT_KV_CONFIG = 100;
    // Namesrv 从Namesrv获取KV配置
    public static final int GET_KV_CONFIG = 101;
    // Namesrv 从Namesrv获取KV配置
    public static final int DELETE_KV_CONFIG = 102;
    // Namesrv 注册一个Broker,数据都是持久化的,如果存在则覆盖配置
    public static final int REGISTER_BROKER = 103;
    // Namesrv 卸载一个Broker,数据都是持久化的
    public static final int UNREGISTER_BROKER = 104;
    // Namesrv 根据Topic获取Broker Name、队列数(包含读队列与写队列)
    public static final int GET_ROUTEINTO_BY_TOPIC = 105;
    // Namesrv 获取注册到Name Server的所有Broker集群信息
    public static final int GET_BROKER_CLUSTER_INFO = 106;
    public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200;
    public static final int GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201;
    public static final int GET_TOPIC_STATS_INFO = 202;
    public static final int GET_CONSUMER_CONNECTION_LIST = 203;
    public static final int GET_PRODUCER_CONNECTION_LIST = 204;
    public static final int WIPE_WRITE_PERM_OF_BROKER = 205;

    // 从Name Server获取完整Topic列表
    public static final int GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206;
    // 从Broker删除订阅组
    public static final int DELETE_SUBSCRIPTIONGROUP = 207;
    // 从Broker获取消费状态(进度)
    public static final int GET_CONSUME_STATS = 208;
    // Suspend Consumer消费过程
    public static final int SUSPEND_CONSUMER = 209;
    // Resume Consumer消费过程
    public static final int RESUME_CONSUMER = 210;
    // 重置Consumer Offset
    public static final int RESET_CONSUMER_OFFSET_IN_CONSUMER = 211;
    // 重置Consumer Offset
    public static final int RESET_CONSUMER_OFFSET_IN_BROKER = 212;
    // 调整Consumer线程池数量
    public static final int ADJUST_CONSUMER_THREAD_POOL = 213;
    // 查询消息被哪些消费组消费
    public static final int WHO_CONSUME_THE_MESSAGE = 214;

    // 从Broker删除Topic配置
    public static final int DELETE_TOPIC_IN_BROKER = 215;
    // 从Namesrv删除Topic配置
    public static final int DELETE_TOPIC_IN_NAMESRV = 216;
    // Namesrv 通过 project 获取所有的 server ip 信息
    public static final int GET_KV_CONFIG_BY_VALUE = 217;
    // Namesrv 删除指定 project group 下的所有 server ip 信息
    public static final int DELETE_KV_CONFIG_BY_VALUE = 218;
    // 通过NameSpace获取所有的KV List
    public static final int GET_KVLIST_BY_NAMESPACE = 219;

    // offset 重置
    public static final int RESET_CONSUMER_CLIENT_OFFSET = 220;
    // 客户端订阅消息
    public static final int GET_CONSUMER_STATUS_FROM_CLIENT = 221;
    // 通知 broker 调用 offset 重置处理
    public static final int INVOKE_BROKER_TO_RESET_OFFSET = 222;
    // 通知 broker 调用客户端订阅消息处理
    public static final int INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223;

    // Broker 查询topic被谁消费
    // 2014-03-21 Add By shijia
    public static final int QUERY_TOPIC_CONSUME_BY_WHO = 300;

    // 获取指定集群下的所有 topic
    // 2014-03-26
    public static final int GET_TOPICS_BY_CLUSTER = 224;

    // 向Broker注册Filter Server
    // 2014-04-06 Add By shijia
    public static final int REGISTER_FILTER_SERVER = 301;
    // 向Filter Server注册Class
    // 2014-04-06 Add By shijia
    public static final int REGISTER_MESSAGE_FILTER_CLASS = 302;
    // 根据 topic 和 group 获取消息的时间跨度
    public static final int QUERY_CONSUME_TIME_SPAN = 303;
    // 获取所有系统内置 Topic 列表
    public static final int GET_SYSTEM_TOPIC_LIST_FROM_NS = 304;
    public static final int GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305;

    // 清理失效队列
    public static final int CLEAN_EXPIRED_CONSUMEQUEUE = 306;

    // 通过Broker查询Consumer内存数据
    // 2014-07-19 Add By shijia
    public static final int GET_CONSUMER_RUNNING_INFO = 307;

    // 查找被修正 offset (转发组件)
    public static final int QUERY_CORRECTION_OFFSET = 308;

    // 通过Broker直接向某个Consumer发送一条消息,并立刻消费,返回结果给broker,再返回给调用方
    // 2014-08-11 Add By shijia
    public static final int CONSUME_MESSAGE_DIRECTLY = 309;

    // Broker 发送消息,优化网络数据包
    public static final int SEND_MESSAGE_V2 = 310;

    // 单元化相关 topic
    public static final int GET_UNIT_TOPIC_LIST = 311;
    // 获取含有单元化订阅组的 Topic 列表
    public static final int GET_HAS_UNIT_SUB_TOPIC_LIST = 312;
    // 获取含有单元化订阅组的非单元化 Topic 列表
    public static final int GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313;
    // 克隆某一个组的消费进度到新的组
    public static final int CLONE_GROUP_OFFSET = 314;

    // 查看Broker上的各种统计信息
    public static final int VIEW_BROKER_STATS_DATA = 315;

RocketMQ——角色与术语详解

RocketMQ中有很多概念,其中包括一些术语和角色。

理清楚基本的概念能有效的帮助理解RocketMQ的原理以及排查问题。

角色:

Producer

生产者。发送消息的客户端角色。发送消息的时候需要指定Topic。

Consumer

消费者。消费消息的客户端角色。通常是后台处理异步消费的系统。 RocketMQ中Consumer有两种实现:PushConsumer和PullConsumer。

PushConsumer

推送模式(虽然RocketMQ使用的是长轮询)的消费者。消息的能及时被消费。使用非常简单,内部已处理如线程池消费、流控、负载均衡、异常处理等等的各种场景。

PullConsumer

拉取模式的消费者。应用主动控制拉取的时机,怎么拉取,怎么消费等。主动权更高。但要自己处理各种场景。

概念术语

Producer Group

标识发送同一类消息的Producer,通常发送逻辑一致。发送普通消息的时候,仅标识使用,并无特别用处。若事务消息,如果某条发送某条消息的producer-A宕机,使得事务消息一直处于PREPARED状态并超时,则broker会回查同一个group的其 他producer,确认这条消息应该commit还是rollback。但开源版本并不支持事务消息。

Consumer Group

标识一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。同一个Consumer Group下的各个实例将共同消费topic的消息,起到负载均衡的作用。

消费进度以Consumer Group为粒度管理,不同Consumer Group之间消费进度彼此不受影响,即消息A被Consumer Group1消费过,也会再给Consumer Group2消费。

注: RocketMQ要求同一个Consumer Group的消费者必须要拥有相同的注册信息,即必须要听一样的topic(并且tag也一样)。

Topic

标识一类消息的逻辑名字,消息的逻辑管理单位。无论消息生产还是消费,都需要指定Topic。

Tag

RocketMQ支持给在发送的时候给topic打tag,同一个topic的消息虽然逻辑管理是一样的。但是消费topic1的时候,如果你订阅的时候指定的是tagA,那么tagB的消息将不会投递。

Message Queue

简称Queue或Q。消息物理管理单位。一个Topic将有若干个Q。若Topic同时创建在不通的Broker,则不同的broker上都有若干Q,消息将物理地存储落在不同Broker结点上,具有水平扩展的能力。

无论生产者还是消费者,实际的生产和消费都是针对Q级别。例如Producer发送消息的时候,会预先选择(默认轮询)好该Topic下面的某一条Q地发送;Consumer消费的时候也会负载均衡地分配若干个Q,只拉取对应Q的消息。

每一条message queue均对应一个文件,这个文件存储了实际消息的索引信息。并且即使文件被删除,也能通过实际纯粹的消息文件(commit log)恢复回来。

Offset

RocketMQ中,有很多offset的概念。但通常我们只关心暴露到客户端的offset。一般我们不特指的话,就是指逻辑Message Queue下面的offset。

注: 逻辑offset的概念在RocketMQ中字面意思实际上和真正的意思有一定差别,这点在设计上显得有点混乱。祥见下面的解释。

可以认为一条逻辑的message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是offset。

max offset

字面上可以理解为这是标识message queue中的max offset表示消息的最大offset。但是从源码上看,这个offset实际上是最新消息的offset+1,即:下一条消息的offset。

min offset:

标识现存在的最小offset。而由于消息存储一段时间后,消费会被物理地从磁盘删除,message queue的min offset也就对应增长。这意味着比min offset要小的那些消息已经不在broker上了,无法被消费。

consumer offset

字面上,可以理解为标记Consumer Group在一条逻辑Message Queue上,消息消费到哪里即消费进度。但从源码上看,这个数值是消费过的最新消费的消息offset+1,即实际上表示的是下次拉取的offset位置

消费者拉取消息的时候需要指定offset,broker不主动推送消息, offset的消息返回给客户端。

consumer刚启动的时候会获取持久化的consumer offset,用以决定从哪里开始消费,consumer以此发起第一次请求。

每次消息消费成功后,这个offset在会先更新到内存,而后定时持久化。在集群消费模式下,会同步持久化到broker,而在广播模式下,则会持久化到本地文件。

集群消费

消费者的一种消费模式。一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。

实际上,每个Consumer是平均分摊Message Queue的去做拉取消费。例如某个Topic有3条Q,其中一个Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的1条Q。

而由Producer发送消息的时候是轮询所有的Q,所以消息会平均散落在不同的Q上,可以认为Q上的消息是平均的。那么实例也就平均地消费消息了。

这种模式下,消费进度的存储会持久化到Broker。

广播消费

消费者的一种消费模式。消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即即使这些 Consumer 属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。

实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。

这种模式下,消费进度会存储持久化到实例本地。

顺序消息

消费消息的顺序要同发送消息的顺序一致。由于Consumer消费消息的时候是针对Message Queue顺序拉取并开始消费,且一条Message Queue只会给一个消费者(集群模式下),所以能够保证同一个消费者实例对于Q上消息的消费是顺序地开始消费(不一定顺序消费完成,因为消费可能并行)。

在RocketMQ中,顺序消费主要指的是都是Queue级别的局部顺序。这一类消息为满足顺序性,必须Producer单线程顺序发送,且发送到同一个队列,这样Consumer就可以按照Producer发送的顺序去消费消息。

生产者发送的时候可以用MessageQueueSelector为某一批消息(通常是有相同的唯一标示id)选择同一个Queue,则这一批消息的消费将是顺序消息(并由同一个consumer完成消息)。或者Message Queue的数量只有1,但这样消费的实例只能有一个,多出来的实例都会空跑。

普通顺序消息

顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生异常,Broker宕机或重启,由于队列总数发生发化,消费者会触发负载均衡,而默认地负载均衡算法采取哈希取模平均,这样负载均衡分配到定位的队列会发化,使得队列可能分配到别的实例上,则会短暂地出现消息顺序不一致。

如果业务能容忍在集群异常情况(如某个 Broker 宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。

严格顺序消息

顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式 Failover 特性,即 Broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。

如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用。(依赖同步双写,主备自动切换,自动切换功能目前并未实现)

目前已知的应用只有数据库 binlog 同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息

RocketMQ——组件

rocketmq部署

RocketMQ服务端的组件有三个,NameServer,Broker,FilterServer(可选,部署于和Broker同一台机器)

下面分别介绍三个组件:

Name Server

Name Server是RocketMQ的寻址服务。用于把Broker的路由信息做聚合。用户端依靠Name Server决定去获取对应topic的路由信息,从而决定对哪些Broker做连接。

  • Name Server是一个几乎无状态的结点,Name Server之间采取share-nothing的设计,互不通信。

  • 对于一个Name Server集群列表,客户端连接Name Server的时候,只会选择随机连接一个结点,以做到负载均衡。

  • Name Server所有状态都从Broker上报而来,本身不存储任何状态,所有数据均在内存。

  • 如果中途所有Name Server全都挂了,影响到路由信息的更新,不会影响和Broker的通信。

Broker

Broker是处理消息存储,转发等处理的服务器。

  • Broker以group分开,每个group只允许一个master,若干个slave。
  • 只有master才能进行写入操作,slave不允许。
  • slave从master中同步数据。同步策略取决于master的配置,可以采用同步双写,异步复制两种。
  • 客户端消费可以从master和slave消费。在默认情况下,消费者都从master消费,在master挂后,客户端由于从Name Server中感知到Broker挂机,就会从slave消费。
  • Broker向所有的NameServer结点建立长连接,注册Topic信息。

Filter Server(可选)

RocketMQ可以允许消费者上传一个Java类给Filter Server进行过滤。

  • Filter Server只能起在Broker所在的机器
  • 可以有若干个Filter Server进程
  • 拉取消息的时候,消息先经过Filter Server,Filter Server靠上传的Java类过滤消息后才推给Consumer消费。
  • 客户端完全可以消费消息的时候做过滤,不需要Filter Server
  • FilterServer存在的目的是用Broker的CPU资源换取网卡资源。因为Broker的瓶颈往往在网卡,而且CPU资源很闲。在客户端过滤会导致无需使用的消息在占用网卡资源。
  • 使用 Java class上传作为过滤表达式是一个双刃剑——一方面方便了应用的过滤操作且节省网卡资源,另一方面也带来了服务器端的安全风险。这就需要应用来保证过滤代码安全——例如在过滤程序里尽可能不做申请大内存,创建线程等操作。避免 Broker 服务器资源泄漏。

在IntilliJ中运行Maven的Tomcat项目

对于一个Maven项目,如果又是一个tomcat项目,在运行tomcat之前应该要进行Maven的构建。

以下是步骤:

1.新建一个Maven Run/Debug configuration:

新建一个Maven Run/Debug configuration

2.在Tomcat Run/Debug Configuration中新增#1的阶段在”Before Launch”中

在Tomcat Run/Debug Configuration中新增#1的阶段在"Before Launch"中