Netty组件Channel & Pipeline(三)

目录:

  1. Channel创建(服务端Channel&客户端Channel)
  2. Unsafe创建
  3. Pipeline创建
  4. 读数据传播事件由上而下逻辑(Inbound事件传播)
  5. 写数据传播事件由下而上逻辑(Outbound事件传播)
  6. 异常事件传播
  7. 客户端Channel监听读事件处理逻辑
  8. 总结

1. Channel创建

Channel的创建分为服务端与客户端的Channel即原生jdk对应的ServerSocketChannel、SocketChannel,那么在Netty就分为NioServerSocketChannel与NioSocketChannel,其实在前面两篇的文章记录中都已经有提到了这两个Channel,这里我将详细记录备注

NioServerSocketChannel创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 实例化并且注册绑定selector
final ChannelFuture initAndRegister() {
// 通过反射的方式实例化NioServerSocketChannel
// channelFactory对应的是BootstrapChannelFactory,是在服务端程序入口注册NioServerSocketChannel的时候创建的factory
final Channel channel = channelFactory().newChannel();
try {
// 初始化channel,这里主要是关联相关属性以及绑定ServerBootstrapAcceptor,用于处理客户端的新连接
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
return channel.newFailedFuture(t);
}
// selector注册事件
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
//class对应的是NioServerSocketChannel
private final Class<? extends T> clazz;

BootstrapChannelFactory(Class<? extends T> clazz) {
this.clazz = clazz;
}

@Override
public T newChannel() {
try {
// NioServerSocketChannel实例化
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {

private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}

/**
* Create a new instance
*/
public NioServerSocketChannel() {
// 获取jdk的ServerSocketChannel
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

/**
* Create a new instance using the given {@link SelectorProvider}.
*/
public NioServerSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}

/**
* Create a new instance using the given {@link ServerSocketChannel}.
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
// 向上传递参数,即这个Channel它所关注的是OP_ACCEPT事件
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
}

// 注意这里 服务端的NioServerSocketChannel对应的是MessageChannel
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
}

public abstract class AbstractNioChannel extends AbstractChannel {

private static final InternalLogger logger =
InternalLoggerFactory.getInstance(AbstractNioChannel.class);

private final SelectableChannel ch;
protected final int readInterestOp;
private volatile SelectionKey selectionKey;
private volatile boolean inputShutdown;
private volatile boolean readPending;

/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}

throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
}

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);

static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
static final NotYetConnectedException NOT_YET_CONNECTED_EXCEPTION = new NotYetConnectedException();

static {
CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
NOT_YET_CONNECTED_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
}

private MessageSizeEstimator.Handle estimatorHandle;

private final Channel parent;
private final long hashCode = ThreadLocalRandom.current().nextLong();
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);
private final VoidChannelPromise voidPromise = new VoidChannelPromise(this, true);
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
private final CloseFuture closeFuture = new CloseFuture(this);

private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
private volatile EventLoop eventLoop;
private volatile boolean registered;

/** Cache for the string representation of this channel */
private boolean strValActive;
private String strVal;

/**
* Creates a new instance.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 底层操作类,用于注册绑定channel selector
unsafe = newUnsafe();
// 事件传播初始pipeline
pipeline = new DefaultChannelPipeline(this);
}
}

NioSocketChannel创建

此处是NioServerSocketChannel监听到新的连接的时候就会执行到这个方法,对应的流程可以看NioEventLoop的run方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 此处是NioServerSocketChannel监听到新的连接的时候就会执行到这个方法
protected int doReadMessages(List<Object> buf) throws Exception {
// 接收客户端的新连接,并且生成对应的channel
SocketChannel ch = javaChannel().accept();

try {
if (ch != null) {
// 封装JDKSocketChannel,实例化NioSocketChannel
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);

try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}

return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {

public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
}

// 注意这里,处理客户端的读写Channel它们对应的是ByteChannel,而处理连接的Channel对应的是MessageChannel
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
private Runnable flushTask;

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
// NioSocketChannel需要关注的是OP_READ事件,这里不管是NioServerSocketChannel还是NioSocketChannel它们只是绑定了这个事件,实际上selector还是没有注册好,仅仅只是初始化这些属性值。
super(parent, ch, SelectionKey.OP_READ);
}
}

到这里NioServerSocketChannel,NioSocketChannel创建的代码就基本上完成了。它们当中有几个关键的属性值。unsafe针对channel的具体操作类。

NioServerSocketChannel对应的是NioMessageUnsafe

NioSocketChannel对应的是NioByteUnsafe

1
2
3
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
protected final int readInterestOp;

2. Unsafe创建

可以看到在创建服务端Channel与客户端连接Channel都会创建对应的Unsafe类,这个是由对应的Channel子类实现的。可以看到以下的两个Unsafe,处理思路都是一样,先把连接或者读取的数据处理好,然后再以事件的方式传播出去给到对应的Handler处理。

1
protected abstract AbstractUnsafe newUnsafe();
1
2
3
4
5
6
7
8
9
10
11
12
13
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}
}

public abstract class AbstractNioByteChannel extends AbstractNioChannel {
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioByteUnsafe();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
//处理连接的具体操作类,doReadMessages就会对应的创建NioSocketChannel
private final class NioMessageUnsafe extends AbstractNioUnsafe {

private final List<Object> readBuf = new ArrayList<Object>();

@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
if (!config.isAutoRead() && !isReadPending()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
removeReadOp();
return;
}

final int maxMessagesPerRead = config.getMaxMessagesPerRead();
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
Throwable exception = null;
try {
try {
for (;;) {
// 创建NioSocketChannel,并且添加至readBuf中
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}

// stop reading and remove op
if (!config.isAutoRead()) {
break;
}

if (readBuf.size() >= maxMessagesPerRead) {
break;
}
}
} catch (Throwable t) {
exception = t;
}
setReadPending(false);
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
// 以事件的方式传播出去
pipeline.fireChannelRead(readBuf.get(i));
}

readBuf.clear();
pipeline.fireChannelReadComplete();

if (exception != null) {
if (exception instanceof IOException) {
// ServerChannel should not be closed even on IOException because it can often continue
// accepting incoming connections. (e.g. too many open files)
closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
}

pipeline.fireExceptionCaught(exception);
}

if (closed) {
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead() && !isReadPending()) {
removeReadOp();
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
private final class NioByteUnsafe extends AbstractNioUnsafe {
private RecvByteBufAllocator.Handle allocHandle;
@Override
public void read() {
final ChannelConfig config = config();
if (!config.isAutoRead() && !isReadPending()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
removeReadOp();
return;
}

final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}

ByteBuf byteBuf = null;
int messages = 0;
boolean close = false;
try {
int totalReadAmount = 0;
boolean readPendingReset = false;
do {
byteBuf = allocHandle.allocate(allocator);
int writable = byteBuf.writableBytes();
// 读取数据
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount <= 0) {
// not was read release the buffer
byteBuf.release();
close = localReadAmount < 0;
break;
}
if (!readPendingReset) {
readPendingReset = true;
setReadPending(false);
}
// 传播事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;

if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
// Avoid overflow.
totalReadAmount = Integer.MAX_VALUE;
break;
}

totalReadAmount += localReadAmount;

// stop reading
if (!config.isAutoRead()) {
break;
}

if (localReadAmount < writable) {
// Read less than what the buffer can hold,
// which might mean we drained the recv buffer completely.
break;
}
} while (++ messages < maxMessagesPerRead);

pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);

if (close) {
closeOnRead(pipeline);
close = false;
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead() && !isReadPending()) {
removeReadOp();
}
}
}
}

3.Pipeline创建

每个Channel都会对应一个pipeline链路,都是在实例化Channel的时候一起把这个Pipeline链路给实例化了。可以看到以下实例化DefaultChannelPipeline,每个都会对应有head与tail节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}

final class DefaultChannelPipeline implements ChannelPipeline {

final AbstractChannel channel;

final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;

private final Map<String, AbstractChannelHandlerContext> name2ctx =
new HashMap<String, AbstractChannelHandlerContext>(4);

final Map<EventExecutorGroup, EventExecutor> childExecutors =
new IdentityHashMap<EventExecutorGroup, EventExecutor>();

public DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;

tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}

}

4.Pipeline读数据传播事件由上而下逻辑(Inbound事件传播)

怎么理解这个Pipeline的Inbound事件传播呢,即相当于往pipeline里面添加观察者,然后它是自上而下去通知整条链路上的观察者。相当于就是从head开始通知,直到tail结束。可以看到以下代码添加了A、B、C三个InboundHandler,那么它就是从head->A->B->C->tail。先看下结果,从客户端加一个连接进来,写入字节然后看服务端的打印结果。通过telnet的方式执行以上这个步骤。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public static void main(String args[]) throws InterruptedException {

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,
//用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
.childOption(ChannelOption.SO_BACKLOG, 100)
//设置这样做好的好处就是禁用nagle算法 nagle是尽可能的把小包合并发送一个大包,最大延迟500ms
.childOption(ChannelOption.TCP_NODELAY, true)
// .handler(new ServerHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("init Channel");
ChannelPipeline channelPipeline = ch.pipeline();
channelPipeline.addLast(new AInHander());
channelPipeline.addLast(new BInHandler());
channelPipeline.addLast(new CInHandler());
}
});

ChannelFuture channelFuture = b.bind(6379).sync();
channelFuture.channel().closeFuture().sync();

}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}

public class AInHander extends ChannelInboundHandlerAdapter{

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("this is AInbound handler " + msg);
ctx.fireChannelRead(msg);
}
}

public class BInHandler extends ChannelInboundHandlerAdapter{

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("this is Binbound Handler " + msg);
ctx.fireChannelRead(msg);
}
}

public class CInHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("this is Cinbound handler " + msg);
}
}

可以看到,有连接接入进来的时候,就会回调这个initChannel方法,把这些观察者即Handler加入到监听列表。即有监听到读事件,那么则会从A开始传播直到C,顺序就是谁先加入则哪个handler就会优先通知到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
synchronized (this) {
checkDuplicateName(name);

AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
addLast0(name, newCtx);
}

return this;
}

private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
checkMultiplicity(newCtx);

AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;

name2ctx.put(name, newCtx);

callHandlerAdded(newCtx);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public void read() {
final ChannelPipeline pipeline = pipeline();
try {
do {
// 简化版的读取数据流程

byteBuf = allocHandle.allocate(allocator);
int writable = byteBuf.writableBytes();
// 读取数据
int localReadAmount = doReadBytes(byteBuf);
// 传播数据,这里的pipeline就是在实例化channel的时候创建的DefaultChannelPipeline
pipeline.fireChannelRead(byteBuf);
} while (++ messages < maxMessagesPerRead);
pipeline.fireChannelReadComplete();
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close);
} finally {
}
}

public ChannelPipeline fireChannelRead(Object msg) {
// 通过pipeline到这里后,就开始进行链路传播了
head.fireChannelRead(msg);
return this;
}


public ChannelHandlerContext fireChannelRead(final Object msg) {
if (msg == null) {
throw new NullPointerException("msg");
}
// 比如首次是head到这里,然后到head的next
// 然后在往下记录找,这个就是为什么上面打印的是A->B->C
final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(msg);
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelRead(msg);
}
});
}
return this;
}

private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}

分析下源码到底是如何执行的,这里相当于维护了一个链表数据结构的handler列表,从head->*->tail。根据上面的源码分析之后,首先添加的时候是一个链表,然后消费的时候也是从头开始一路往下消费传播。最终到你写的Handler,然后不再往下传播。也可以看下TailContext这个类,它的很多方法都是空的,相当于到这里就截止了,不再往下传播了。

5.Pipeline写数据传播事件由下而上逻辑(Outbound事件传播)

在看下Outbound的事件流是怎么样的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public static void main(String args[]) throws InterruptedException {

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,
//用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
.childOption(ChannelOption.SO_BACKLOG, 100)
//设置这样做好的好处就是禁用nagle算法 nagle是尽可能的把小包合并发送一个大包,最大延迟500ms
.childOption(ChannelOption.TCP_NODELAY, true)
// .handler(new ServerHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("init Channel");
channelPipeline.addLast(new AOutHandler());
channelPipeline.addLast(new BOutHandler());
channelPipeline.addLast(new COutHandler());
}
});

ChannelFuture channelFuture = b.bind(6379).sync();
channelFuture.channel().closeFuture().sync();

}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public class AOutHandler extends ChannelOutboundHandlerAdapter{
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("AOutHandler: " + msg);
ctx.write(msg);
}

@Override
public void handlerAdded(final ChannelHandlerContext ctx) {
// 这里在添加完handler之后回调到这里,往channel里面写个数据来验证它是怎么传播的
ctx.executor().schedule(() -> {
ctx.channel().write("hello world");
}, 4, TimeUnit.SECONDS);
}
}

public class BOutHandler extends ChannelOutboundHandlerAdapter{
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("BOutHandler: " + msg);
ctx.write(msg);
}
}

public class COutHandler extends ChannelOutboundHandlerAdapter{
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("COutHandler: " + msg);
ctx.write(msg);
}
}

可以看到,Outbound与Inbound是相反的,添加是一样的链表。猜测下也就是消费的时候是反过来的。我们看看源码它是如何消费的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public ChannelFuture write(Object msg) {
return pipeline.write(msg);
}

// 看到这里其实就明白了,它是从尾部开始传播的
public ChannelFuture write(Object msg) {
return tail.write(msg);
}

private void write(Object msg, boolean flush, ChannelPromise promise) {

AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeWrite(msg, promise);
if (flush) {
next.invokeFlush();
}
} else {
int size = channel.estimatorHandle().size(msg);
if (size > 0) {
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.incrementPendingOutboundBytes(size);
}
}
Runnable task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, msg, size, promise);
} else {
task = WriteTask.newInstance(next, msg, size, promise);
}
safeExecute(executor, task, promise, msg);
}
}

到这里就基本上完成了pipeline的Inbound与Outbound的大概运行流程及思路了。

6.异常事件传播

可以试下在Inbound与Outbound当中抛出一个业务异常,可以看到它都是往上继续抛的,即如果在InboundA当中抛出一个异常,那么InboundB照样会接收到这个异常,一直到InboundC。

Outbound那么就是相反。所以在最外层需要去捕获这些异常。

7.客户端Channel监听读事件处理逻辑
客户端的读跟咱们刚才分析的Inbound传播思路是一样的,最终到Handler去处理对应的业务。

8.总结

到这里我们分析的一个大致的流程,然后到NioEventLoop、Channel、Pipeline,基本上就能弄明白Netty的一个大致数据流转过程了。当然了,这里仅仅只是一个大致的流程,里面其实还有很多细节需要去深挖的。

分享到 评论