目录:
- Netty编码概述
- Encoder Demo举例
- MessageToByteEncoder
- writeAndFlush()方法
- 总结
如何把对象也就是咱们业务当中的对象转换为字节流,最终写入到socket底层?
1. Netty编码概述
1 | channelFuture.channel().writeAndFlush(new RequestParam((byte) 1, (byte) 1, body.length(), body)); |
以上这个就是通过Netty中的channel把对象写入到流中,但是这里仅仅还只是把RequestParam这个对象输入到Netty的Outbound传播时间当中, 也就是相当触发了pipeline当中的writeAndFlush。
writeAndFlush方法具体流程
- 从tail节点开始往前传播
- 逐个调用channelHandler的write方法
- 逐个调用channelHandler的flush方法
2. MessageToByteEncoder
根据上面的代码通过writeAndflush写入到事件中。
NioSocketChannel当中的writeAndFlush方法如下:
1 | public ChannelFuture writeAndFlush(Object msg) { |
ChannelPipeline当中的writeAndFlush方法如下:
1 | public ChannelFuture writeAndFlush(Object msg) { |
根据上面的Channel以及对应的Pipeline中可以看到,Netty当中的把对象写入到传播事件中,首先会经过Tail,也就是咱们之前说的Outbound,它是先触达到末尾,然后再往上传播到你注册的Decoder,最终通过自定义的Decoder把相关的字段写入到通道中。
相当于从上面的流程触发,最终到MessageToByteEncoder。
这里最终又是到达了HeadContext的write方法,然后再通过Unsafe类来操作具体的write和flush。
1 | public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter { |
2. Encoder Demo
可以看如下的Demo代码。发送的协议消息固定字节长度为6,一个变动长度body。
可以看到type、flag、length这3个字段加一起一共6个字节。length当中则记录了最终消息体body的长度。
1 | /* |
1 | public class RequestEncoder extends MessageToByteEncoder<RequestParam> { |
这里写入整个消息包,最终通过事件传播到encoder进行处理
1 | public class RcpClient { |
4.writeAndFlush()方法
HeadContext的writeAndFlush方法
1 | public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { |
write()
- 把ByteBuf转换为堆外内存Direct
- 插入写队列
- 更新设置写状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82public final void write(Object msg, ChannelPromise promise) {
// 判断是否EventLoop线程
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
// 转换为directByteBuf,如果已经是堆外内存则直接返回
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
// 插入写队列并且设置写状态(重要)
// 将已经转化为堆外内存的msg插入到写队列
outboundBuffer.addMessage(msg, size, promise);
}
public void addMessage(Object msg, int size, ChannelPromise promise) {
//包装消息,为写请求Entry
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
// 即将被消费的开始节点
flushedEntry = null;
// 最后一个节点
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
// 被添加的开始节点,但没有准备好被消费。
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
//在添加消息到未刷新写请求链表后,更新待发送的字节数 (这步时统计当前有多少字节需要被写出)
incrementPendingOutboundBytes(size, false);
}
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
//TOTAL_PENDING_SIZE_UPDATER当前缓冲区里面有多少待写的字节
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
//如果待发送的字节数,大于通道写buf大小,则更新通道可状态
//getWriteBufferHighWaterMark() 最高不能超过64k
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
//更新通道写状态
setUnwritable(invokeLater);
}
}
private void setUnwritable(boolean invokeLater) {
// 这里通过自旋和cas操作, 传播一个ChannelWritabilityChanged事件, 最终会调用handler的channelWritabilityChanged方法进行处理
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0 && newValue != 0) {
//写状态改变,则触发通道ChannelWritabilityChanged事件
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}flush()
- 添加刷新标志并设置写状态
- 遍历buffer队列,过滤ByteBuf
- 调用jdk底层api进行自旋写
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
// 添加刷新标志并设置写状态
outboundBuffer.addFlush();
flush0();
}
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
//
// See https://github.com/netty/netty/issues/2577
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
}
}
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
setWritable(invokeLater);
}
}
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else {
outboundBuffer.failFlushed(t, true);
}
} finally {
inFlush0 = false;
}
}
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
boolean setOpWrite = false;
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
continue;
}
boolean done = false;
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i --) {
// 调用jdk写
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
in.progress(flushedAmount);
if (done) {
in.remove();
} else {
// Break the loop and so incompleteWrite(...) is called.
break;
}
} else {
// Should not reach here.
throw new Error();
}
}
incompleteWrite(setOpWrite);
}
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
// 获取jdk channel写
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
5.总结
以上主要讲述了一个编码的过程。当中有两个地方时需要注意,也就是说Netty在写数据的过程当中有一个可写状态,当超过它的最大阈值即64k的时候。你发送的数据实际是不会写入到底层的通道中去的。它会给你触发一个ChannelWritabilityChanged事件。还有一个低水位的说法
WRITE_BUFFER_HIGH_WATER_MARK:Netty参数,写高水位标记,默认值64KB。如果Netty的写缓冲区中的字节超过该值,Channel的isWritable()返回False。
WRITE_BUFFER_LOW_WATER_MARK:Netty参数,写低水位标记,默认值32KB。当Netty的写缓冲区中的字节超过高水位之后若下降到低水位,则Channel的isWritable()返回True。写高低水位标记使用户可以控制写入数据速度,从而实现流量控制。
推荐做法是:每次调用channl.write(msg)方法首先调用channel.isWritable()判断是否可写。然后通过队列的方式进行写入,或者自旋检测增加吞吐。