RocketMQ的通信协议其实很简单,但是无论是官方的用户手册,还是网上的博客,并没有很清晰简单地把其中所有的内容和原理讲明白。 对于需要扩展其他语言SDK的开发来说,意味着必须要深入到Java源码才能弄懂其概念。笔者通过深入源码,本文希望以尽量简短的语言描述清楚协议的每个字段及其意义。
无论是发送消息,拉取消息,还是发送心跳等所有的网络通讯层协议(客户端与broker/nameserver间,broker与nameserver间)都使用一套一样的协议。并且无论请求还是响应,协议是一样的,协议头的字段也是固定的。
通讯协议
协议分为以下四部分:
其中后两部分是通讯的实际数据。前两段都是四个字节的整形,分别表示两段实际数据的长度。
header: 协议的头,数据是序列化后的json。json的每个key字段都是固定的,不同的通讯请求字段不一样。后面解释
body: 请求的二进制实际数据。例如发送消息的网络请求中,body中传输实际的消息内容。
length:2 3 4 端整体的长度。四个字节整数。
header length: header的长度。四个字节整数。
Header
协议header具体标识整个通讯请求的元数据,如请求什么,怎样的方式请求(异步/oneway)请求客户端的版本,语言,请求的具体参数等。
header是序列化的json,以下是json中的所有字段,并阐述起在请求和响应两个阶段的区别。
字段 | 类型 | Request | Response | |
---|---|---|---|---|
code | 整数 | 请求操作码。响应方通过code决定如何处理请求。 | 响应码。0表示成功,非0表示错误码。 | |
language | 字符串 | 标记请求方的语言类型,如JAVA。 | 应答方方的所使用的语言。 | |
version | 整数 | 请求方的版本号 | 应答方的版本号 | |
opaque | 整数 | 在同一个连接上,标记是哪次请求。服务响应的时候会返回这个请求标识码,以达到请求方多线程中复用连接,在收到响应的时候找到对应的请求 | 原请求的opaque。应答方不做修改原值返回。 | |
flag | 整数 | 通信层的标识位。标识这次通信的类型。 | 通信层的标识位。标识这次通信的类型。 | |
remark | 字符串 | 传输的自定义文本 | 应答的文本信息。通常存放错误信息。 | |
extFields | HashMap<String,String> | 请求自定义字段。不同的请求会有不一样的参数列表,这里存放那些请求自定义的参数列表。 | 响应自定义字段。不同的响应会有不一样的参数列表,若有,这里则存放那些请求自定义的参数列表。 |
Header详解:
code:
请求/响应码。所有的请求码参考代码RequestCode.java
。响应码则在ResponseCode.java
中。
language:
由于要支持多语言,所以这一字段可以给通信双方知道对方通信层锁使用的开发语言。
version:
给通信层知道对方的版本号,响应方可以以此做兼容老版本等的特殊操作。
opaque:
请求标识码。在Java版的通信层中,这个只是一个不断自增的整形,为了收到应答方响应的的时候找到对应的请求。
flag: 按位(bit)解释。
第0位标识是这次通信是request还是response,0标识request, 1 标识response。
第1位标识是否是oneway请求,1标识oneway。应答方在处理oneway请求的时候,不会做出响应,请求方也无序等待应答方响应。
remark:
附带的文本信息。常见的如存放一些broker/nameserver返回的一些异常信息,方便开发人员定位问题。
extFields:
这个字段不通的请求/响应不一样,完全自定义。数据结构上是java的hashmap。在Java的每个RemotingCammand中,其实都带有一个CommandCustomHeader的属性成员,可以认为他是一个强类型的extFields,再最后传输的时候,这个CommandCustomHeader会被忽略,而传输前会把其中的所有字段全部都原封不动塞到extFields中,以作传输。
以发送消息为例(code=310),发送消息的自定义header是SendMessageRequestHeaderV2(只是字段名对比SendMessageRequestHeader压缩了)。有以下字段:
@CFNotNull
private String a;// producerGroup;
@CFNotNull
private String b;// topic;
@CFNotNull
private String c;// defaultTopic;
@CFNotNull
private Integer d;// defaultTopicQueueNums;
@CFNotNull
private Integer e;// queueId;
@CFNotNull
private Integer f;// sysFlag;
@CFNotNull
private Long g;// bornTimestamp;
@CFNotNull
private Integer h;// flag;
@CFNullable
private String i;// properties;
@CFNullable
private Integer j;// reconsumeTimes;
@CFNullable
private boolean k;// unitMode = false;
这些字段都会原封不动的去到extFields中做传输,最后看到的发送消息的请求header会类似如:
{
"code":310,
"extFields":{
"f":"0",
"g":"1482158310125",
"d":"4",
"e":"0",
"b":"TopicTest",
"c":"TBW102",
"a":"please_rename_unique_group_name",
"j":"0",
"k":"false",
"h":"0",
"i":"TAGS\u0001TagA\u0002WAIT\u0001true\u0002"
},
"flag":0,
"language":"JAVA",
"opaque":206,
"version":79
}
注:其中fastjson把值为null的remark过滤了。
请求码列表
以下是截至到3.2.6的所有请求码列表
|
|