Netty 解码 Decoder(五)

目录:

  1. Netty解码概述
  2. ByteToMessageDecoder
  3. 固定长度解码器FixedLengthFrameDecoder
  4. 行解码器LineBasedFrameDecoder
  5. 分隔符解码器DelimiterBasedFrameDecoder
  6. 长度解码器LengthFieldBasedFrameDecoder
  7. 总结

1. Netty解码概述

所谓解码就是指把一串二进制的数据流解析成一个个自定义协议的数据包,也就是咱们Netty里的ByteBuf。那么后续的业务处理就可以直接对ByteBuf进行处理。

Netty它做为一个通讯协议框架,对TCP连接做了充分的考虑,比如粘包、拆包导致的半包或者丢包等现象,该如何对数据包进行解码的这么一个过程,它提供了很多已经写好的解码器对这些问题进行处理。

可以通过NioEventLoop的run方法跟踪下,最终接受到可读的事件之后就会走到以下的这个方法,把数据写到ByteBuf中,这里就相当于已经把客户端传递的信息已经读到了ByteBuf中。

1
2
3
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
}
1
pipeline.fireChannelRead(byteBuf);

然后通过pipeline已事件传播的方式往以下的链路进行传播,直接有对应的Inbound对其进行处理。这里一定要注意Netty所提供的解码Decoder其实都是Inbound的子类或者实现类。Decoder是属于Inbound体系的,也就是说它会在整个读事件的传播链路当中。就根据我们上一篇讲的Inbound事件传播一样,需要把自定义的Decoder假如到这条链路中Netty组件Channel & Pipeline(三)

2. ByteToMessageDecoder

  • 累加字节流
  • 调用子类的decode方法进行解析
  • 将解析到的ByteBuf向下传播

ByteToMessageDecoder,所有的解码器包括Netty自定义的解码器都是这个类的子类。在Pipeline注册的解码器,最终就会执行到以下的这个方法。ByteToMessageDecoder相当于定义的解码的模板,具体的即系则由子类去实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 判断传播的对象是否ByteBuf,如果是则进行处理,如果不是则再继续往下传播
if (msg instanceof ByteBuf) {
RecyclableArrayList out = RecyclableArrayList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
// 判断是否第一次
if (first) {
// 第一次则直接赋值
cumulation = data;
} else {
// 判断当前的这个累加器的字节流是否能够放的下这个数据
if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()
|| cumulation.refCnt() > 1) {
// 如果不够则需要扩容
expandCumulation(ctx, data.readableBytes());
}
//把数据累加到这个cumulation当中
cumulation.writeBytes(data);
//释放ByteBuf
data.release();
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
cumulation.release();
cumulation = null;
}
int size = out.size();
decodeWasNull = size == 0;

for (int i = 0; i < size; i ++) {
// 重点,获取到解码的数据后在进行传播
ctx.fireChannelRead(out.get(i));
}
out.recycle();
}
} else {
// 非ByteBuf类型,则直接继续传播
ctx.fireChannelRead(msg);
}
}

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
// 读取所有数据,知道读完为止
while (in.isReadable()) {
int outSize = out.size();
// 记录当前可读数据长度,后续需要用到这个判断每一次的读写是否完成
// 用于处理半包的情况,如果未半包。则不再继续,等到下次在进行处理
int oldInputLength = in.readableBytes();

//调用子类实现进行解码
decode(ctx, in, out);

// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
// 如果decode读到了数据,那么out里面肯定就有值了,这里肯定就不糊相等
// 如果decode没有读取到数据,那么这里则会相等,然后在判断所读取的数据是否与之前相当,相等说以个字节的数据都没有读取完,可能存在半包的问题,则直接跳出
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}

if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}

if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}

3. 固定长度解码器FixedLengthFrameDecoder

可以看到FixedLengthFrameDecoder有个关键的属性:frameLength。这个解码定长为这个属性,即在实例化这个Decoder的时候就需要固定这个解码到底是多长。可以看到它如下就有举例:

比如客户端分三次发送每一次发送的数据为: A | BC | DEFG | HI

那么实际解码也会变成ABC|DEF|GHI,这里它其实就存在半包的问题,即如果数据还没有全部到,那么它还会继续等待下一次的读事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* A decoder that splits the received {@link ByteBuf}s by the fixed number
* of bytes. For example, if you received the following four fragmented packets:
* <pre>
* +---+----+------+----+
* | A | BC | DEFG | HI |
* +---+----+------+----+
* </pre>
* A {@link FixedLengthFrameDecoder}{@code (3)} will decode them into the
* following three packets with the fixed length:
* <pre>
* +-----+-----+-----+
* | ABC | DEF | GHI |
* +-----+-----+-----+
* </pre>
*/
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {

private final int frameLength;

/**
* Creates a new instance.
*
* @param frameLength the length of the frame
*/
public FixedLengthFrameDecoder(int frameLength) {
if (frameLength <= 0) {
throw new IllegalArgumentException(
"frameLength must be a positive integer: " + frameLength);
}
this.frameLength = frameLength;
}

@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}

/**
* Create a frame out of the {@link ByteBuf} and return it.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in the {@link ByteBuf} from which to read data
* @return frame the {@link ByteBuf} which represent the frame or {@code null} if no frame could
* be created.
*/
protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < frameLength) {
return null;
} else {
return in.readSlice(frameLength).retain();
}
}
}

一个简单的Demo使用固定长度解码器。这里通过telnet的方式进行连接,然后依次输入1到9的数值。其实这个地方大家打断点就能知道,在你输入1的时候实际上就已经监听到读事件了,不过长度只有1,则直接把这个字节放到累加器里面,等待继续输入。因为这里定长为3,即3个字节为一个完整的传输包,所以需要等待直到接收到3个包长的数据时才能算一个完整的包,读取完之后然后再向下传递

可以看到通过pipeline增加了3个解码器,首先通过FixedLengthFrameDecoder进行解码,它解码之后就在通过StringDecoder解码即把每个ByteBuf转换为字符串类型,然后在通过自定义的解码器去接收可以再次实现解码。再通过一个out往下传播。

简单的方法就是通过字符串解码器之后,然后再针对字符串进行解码转换为对应的实体对象再进行传播到对应的Hander业务处理即可。相当于具体的业务就是针对字符串解码。

另外一种情况就直接针对ByteBuf进行解码,即字节的长度从哪里到哪里对应哪个字段哪个属性等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class DecoderServerTest {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new FixedLengthFrameDecoder(3));
pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
pipeline.addLast(new SelfFixedLineFrameDecoder());
}
});

ChannelFuture channelFuture = b.bind(6379).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}
}

public class SelfFixedLineFrameDecoder extends MessageToMessageDecoder<String> {

@Override
protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
System.out.println("received msg := " + msg + " out size = " + out.size());
}
}

4. 行解码器LineBasedFrameDecoder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(10));
pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
pipeline.addLast(new SelfFixedLineFrameDecoder());
}
});

ChannelFuture channelFuture = b.bind(6379).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

第一次输入内容:123456\n

第二次输入内容:456789123\n

第三次输入内容:12345678912\n

根据上面简单的demo可以看出来,我再输入10个字符以内然后以换行符结束之后,都可以正常的接收到输入的内容并且打印。

但是一旦超过贵的的限长,即10个字符的时候则直接就报错了。因为最大容纳10个字节的长度。

5. 分隔符解码器DelimiterBasedFrameDecoder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new DelimiterBasedFrameDecoder(10,
Unpooled.wrappedBuffer(new byte[]{'#'})));
pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
pipeline.addLast(new SelfFixedLineFrameDecoder());
}
});

ChannelFuture channelFuture = b.bind(6379).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

第一次输入内容:789#

第二次输入内容:456123#

第三次输入内容:987654321#

第四次输入内容:12345678912#

其实他这个跟LineBasedFrameDecoder差不多,只是它这个解码器可以指定对应的字符来进行解码。

6. 长度解码器LengthFieldBasedFrameDecoder

这个相对来说就复杂很多了,后续专门记录下。

7.总结

以上讲述了Netty针对解码的这么一个过程。从基础类ByteToMessageDecoder,它把基础框架协议都拟好了之后,这里解决一个channel多次发送数据,全部都通过它来接受并且处理,但是最终的解析逻辑还是由实现类去实现。

那么以上主要也描述到了几个主要的解码器:固定长度解码器FixedLengthFrameDecoder、行解码器LineBasedFrameDecoder、分隔符解码器DelimiterBasedFrameDecoder。那还剩下一个最重要的长度解码器LengthFieldBasedFrameDecoder,这个解码器基本上可以解决所有的协议包定义相对来说是一个比较通用的解码器。

分享到 评论