RocketMQ的通信协议其实很简单,但是无论是官方的用户手册,还是网上的博客,并没有很清晰简单地把其中所有的内容和原理讲明白。 对于需要扩展其他语言SDK的开发来说,意味着必须要深入到Java源码才能弄懂其概念。笔者通过深入源码,本文希望以尽量简短的语言描述清楚协议的每个字段及其意义。
无论是发送消息,拉取消息,还是发送心跳等所有的网络通讯层协议(客户端与broker/nameserver间,broker与nameserver间)都使用一套一样的协议。并且无论请求还是响应,协议是一样的,协议头的字段也是固定的。
通讯协议
协议分为以下四部分:
其中后两部分是通讯的实际数据。前两段都是四个字节的整形,分别表示两段实际数据的长度。
header: 协议的头,数据是序列化后的json。json的每个key字段都是固定的,不同的通讯请求字段不一样。后面解释
body: 请求的二进制实际数据。例如发送消息的网络请求中,body中传输实际的消息内容。
length:2 3 4 端整体的长度。四个字节整数。
header length: header的长度。四个字节整数。
Header
协议header具体标识整个通讯请求的元数据,如请求什么,怎样的方式请求(异步/oneway)请求客户端的版本,语言,请求的具体参数等。
header是序列化的json,以下是json中的所有字段,并阐述起在请求和响应两个阶段的区别。
字段 | 类型 | Request | Response | |
---|---|---|---|---|
code | 整数 | 请求操作码。响应方通过code决定如何处理请求。 | 响应码。0表示成功,非0表示错误码。 | |
language | 字符串 | 标记请求方的语言类型,如JAVA。 | 应答方方的所使用的语言。 | |
version | 整数 | 请求方的版本号 | 应答方的版本号 | |
opaque | 整数 | 在同一个连接上,标记是哪次请求。服务响应的时候会返回这个请求标识码,以达到请求方多线程中复用连接,在收到响应的时候找到对应的请求 | 原请求的opaque。应答方不做修改原值返回。 | |
flag | 整数 | 通信层的标识位。标识这次通信的类型。 | 通信层的标识位。标识这次通信的类型。 | |
remark | 字符串 | 传输的自定义文本 | 应答的文本信息。通常存放错误信息。 | |
extFields | HashMap<String,String> | 请求自定义字段。不同的请求会有不一样的参数列表,这里存放那些请求自定义的参数列表。 | 响应自定义字段。不同的响应会有不一样的参数列表,若有,这里则存放那些请求自定义的参数列表。 |
Header详解:
code:
请求/响应码。所有的请求码参考代码RequestCode.java
。响应码则在ResponseCode.java
中。
language:
由于要支持多语言,所以这一字段可以给通信双方知道对方通信层锁使用的开发语言。
version:
给通信层知道对方的版本号,响应方可以以此做兼容老版本等的特殊操作。
opaque:
请求标识码。在Java版的通信层中,这个只是一个不断自增的整形,为了收到应答方响应的的时候找到对应的请求。
flag: 按位(bit)解释。
第0位标识是这次通信是request还是response,0标识request, 1 标识response。
第1位标识是否是oneway请求,1标识oneway。应答方在处理oneway请求的时候,不会做出响应,请求方也无序等待应答方响应。
remark:
附带的文本信息。常见的如存放一些broker/nameserver返回的一些异常信息,方便开发人员定位问题。
extFields:
这个字段不通的请求/响应不一样,完全自定义。数据结构上是java的hashmap。在Java的每个RemotingCammand中,其实都带有一个CommandCustomHeader的属性成员,可以认为他是一个强类型的extFields,再最后传输的时候,这个CommandCustomHeader会被忽略,而传输前会把其中的所有字段全部都原封不动塞到extFields中,以作传输。
以发送消息为例(code=310),发送消息的自定义header是SendMessageRequestHeaderV2(只是字段名对比SendMessageRequestHeader压缩了)。有以下字段:
@CFNotNull
private String a;// producerGroup;
@CFNotNull
private String b;// topic;
@CFNotNull
private String c;// defaultTopic;
@CFNotNull
private Integer d;// defaultTopicQueueNums;
@CFNotNull
private Integer e;// queueId;
@CFNotNull
private Integer f;// sysFlag;
@CFNotNull
private Long g;// bornTimestamp;
@CFNotNull
private Integer h;// flag;
@CFNullable
private String i;// properties;
@CFNullable
private Integer j;// reconsumeTimes;
@CFNullable
private boolean k;// unitMode = false;
这些字段都会原封不动的去到extFields中做传输,最后看到的发送消息的请求header会类似如:
{
"code":310,
"extFields":{
"f":"0",
"g":"1482158310125",
"d":"4",
"e":"0",
"b":"TopicTest",
"c":"TBW102",
"a":"please_rename_unique_group_name",
"j":"0",
"k":"false",
"h":"0",
"i":"TAGS\u0001TagA\u0002WAIT\u0001true\u0002"
},
"flag":0,
"language":"JAVA",
"opaque":206,
"version":79
}
注:其中fastjson把值为null的remark过滤了。
请求码列表
以下是截至到3.2.6的所有请求码列表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
|