/**
 * Creates the executor group and eagerly instantiates {@code nThreads} child executors.
 *
 * @param nThreads      number of child executors to create; must be positive
 * @param threadFactory factory used to create the threads of each child; when
 *                      {@code null}, a default factory from {@link #newDefaultThreadFactory()} is used
 * @param args          extra arguments forwarded verbatim to {@link #newChild(ThreadFactory, Object...)}
 * @throws IllegalArgumentException if {@code nThreads <= 0}
 * @throws IllegalStateException    if creating any child executor fails (cause preserved)
 */
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }
    // Fall back to the group's default thread factory when the caller supplied none.
    if (threadFactory == null) {
        threadFactory = newDefaultThreadFactory();
    }
    // NOTE(review): typical callers size this as 1 for a boss group and
    // (cores * 2) for a worker group — that is determined by the caller,
    // not visible here; confirm against the call sites.
    children = new SingleThreadEventExecutor[nThreads];
    // Pick the next-executor selection strategy: a power-of-two length allows a
    // cheaper bitmask-based chooser instead of a modulo-based one.
    if (isPowerOfTwo(children.length)) {
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        chooser = new GenericEventExecutorChooser();
    }
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // Create one child executor per slot (e.g. one NioEventLoop each).
            children[i] = newChild(threadFactory, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            // On failure, roll back: gracefully shut down every child created so far.
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }
                // NOTE(review): constructor is truncated here in this excerpt —
                // the remainder (awaiting termination, etc.) is not visible.
privatestaticvoiddoBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise){
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. // 通过channel获取到NioEventLoop,然后执行execute方法区执行这个线程。此处execute也是重写了jdk的线程池方法的 channel.eventLoop().execute(new Runnable() { @Override publicvoidrun(){ if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
/**
 * Dispatches a ready {@link SelectionKey} to the channel's unsafe.
 *
 * Invalid keys close the channel immediately. Otherwise the ready-ops are
 * examined and read/accept, write, and connect readiness are each handled.
 *
 * @param k  the selected key for {@code ch}
 * @param ch the channel whose key fired
 */
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    // The unsafe implementation depends on the channel type:
    // a NioServerSocketChannel uses NioMessageUnsafe (new connections, accept path),
    // a NioSocketChannel uses NioByteUnsafe (byte read/write path).
    final NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }
    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // read() polymorphically handles both accept (NioMessageUnsafe)
            // and byte reads (NioByteUnsafe).
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            // NOTE(review): method is truncated here in this excerpt — the
            // finishConnect() call and the CancelledKeyException handler that
            // close out this try block are not visible.
// Reusable per-unsafe scratch buffer that collects the messages (e.g. accepted
// channels) produced by one read() pass before they are fired down the pipeline.
// Cleared-by-convention after each pass; not thread-safe — confined to the event loop.
private final List<Object> readBuf = new ArrayList<Object>();
/**
 * Reads a batch of messages (for a server channel: accepted connections) and
 * fires each one down the pipeline, then propagates read-complete and any
 * exception, closing the channel when the transport reported end-of-stream.
 *
 * Must run on the channel's event loop (asserted below).
 */
@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    if (!config.isAutoRead() && !isReadPending()) {
        // ChannelConfig.setAutoRead(false) was called in the meantime
        removeReadOp();
        return;
    }
    // Upper bound on messages consumed in one pass so one busy channel cannot
    // starve the event loop (commonly 16 — configured, not fixed here).
    final int maxMessagesPerRead = config.getMaxMessagesPerRead();
    final ChannelPipeline pipeline = pipeline();
    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            for (;;) {
                // Drain available messages into readBuf; this loop can accept
                // multiple new connections per selector wakeup.
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    // Nothing more available right now.
                    break;
                }
                if (localRead < 0) {
                    // Transport signalled end-of-stream: close after draining.
                    closed = true;
                    break;
                }
                // stop reading and remove op if auto-read was switched off mid-loop
                if (!config.isAutoRead()) {
                    break;
                }
                // Respect the per-pass batch limit (maxMessagesPerRead).
                if (readBuf.size() >= maxMessagesPerRead) {
                    break;
                }
            }
        } catch (Throwable t) {
            // Remember the failure; already-read messages below are still delivered first.
            exception = t;
        }
        setReadPending(false);
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            // Fire each message (e.g. accepted child channel) down the pipeline;
            // downstream handlers typically register it with a worker loop.
            pipeline.fireChannelRead(readBuf.get(i));
        }
        if (exception != null) {
            if (exception instanceof IOException) {
                // ServerChannel should not be closed even on IOException because it can often continue
                // accepting incoming connections. (e.g. too many open files)
                closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
            }
            pipeline.fireExceptionCaught(exception);
        }
        if (closed) {
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        // When auto-read is off and no read is pending, stop watching for read readiness.
        if (!config.isAutoRead() && !isReadPending()) {
            removeReadOp();
        }
    }
}
} // closes the enclosing inner class (e.g. NioMessageUnsafe)
// NOTE(review): interior fragment of doReadMessages(..) — the accept() call
// that produces `ch` and the method's tail are outside this excerpt.
try {
    if (ch != null) {
        // Wrap the accepted JDK SocketChannel in a Netty NioSocketChannel,
        // parented to this server channel, and hand it to the caller's buffer.
        buf.add(new NioSocketChannel(this, ch));
        // One message (one accepted connection) was produced this call.
        return 1;
    }
} catch (Throwable t) {
    logger.warn("Failed to create a new channel from an accepted socket.", t);
    // Best-effort cleanup: close the raw socket so it does not leak; a failure
    // to close is only logged, never rethrown.
    try {
        ch.close();
    } catch (Throwable t2) {
        logger.warn("Failed to close a socket.", t2);
    }
}