publicclassNioServerSocketChannelextendsAbstractNioMessageChannel implementsio.netty.channel.socket.ServerSocketChannel{ privatestatic ServerSocketChannel newSocket(SelectorProvider provider){ try { return provider.openServerSocketChannel(); } catch (IOException e) { thrownew ChannelException( "Failed to open a server socket.", e); } } /** * Create a new instance */ publicNioServerSocketChannel(){ // 获取jdk的ServerSocketChannel this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
/** * Create a new instance using the given {@link SelectorProvider}. */ publicNioServerSocketChannel(SelectorProvider provider){ this(newSocket(provider)); }
/** * Create a new instance using the given {@link ServerSocketChannel}. */ publicNioServerSocketChannel(ServerSocketChannel channel){ // 向上传递参数,即这个Channel它所关注的是OP_ACCEPT事件 super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } }
/** * The future of the current connection attempt. If not null, subsequent * connection attempts will fail. */ private ChannelPromise connectPromise; private ScheduledFuture<?> connectTimeoutFuture; private SocketAddress requestedRemoteAddress;
protectedAbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp){ super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } }
thrownew ChannelException("Failed to enter non-blocking mode.", e); } } }
try { if (ch != null) { // 封装JDKSocketChannel,实例化NioSocketChannel buf.add(new NioSocketChannel(this, ch)); return1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t);
try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } }
privatefinal List<Object> readBuf = new ArrayList<Object>();
@Override publicvoidread(){ asserteventLoop().inEventLoop(); final ChannelConfig config = config(); if (!config.isAutoRead() && !isReadPending()) { // ChannelConfig.setAutoRead(false) was called in the meantime removeReadOp(); return; }
finalint maxMessagesPerRead = config.getMaxMessagesPerRead(); final ChannelPipeline pipeline = pipeline(); boolean closed = false; Throwable exception = null; try { try { for (;;) { // 创建NioSocketChannel,并且添加至readBuf中 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; }
// stop reading and remove op if (!config.isAutoRead()) { break; }
if (readBuf.size() >= maxMessagesPerRead) { break; } } } catch (Throwable t) { exception = t; } setReadPending(false); int size = readBuf.size(); for (int i = 0; i < size; i ++) { // 以事件的方式传播出去 pipeline.fireChannelRead(readBuf.get(i)); }
if (exception != null) { if (exception instanceof IOException) { // ServerChannel should not be closed even on IOException because it can often continue // accepting incoming connections. (e.g. too many open files) closed = !(AbstractNioMessageChannel.thisinstanceof ServerChannel); }
pipeline.fireExceptionCaught(exception); }
if (closed) { if (isOpen()) { close(voidPromise()); } } } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!config.isAutoRead() && !isReadPending()) { removeReadOp(); } } } }
privatefinalclassNioByteUnsafeextendsAbstractNioUnsafe{ private RecvByteBufAllocator.Handle allocHandle; @Override publicvoidread(){ final ChannelConfig config = config(); if (!config.isAutoRead() && !isReadPending()) { // ChannelConfig.setAutoRead(false) was called in the meantime removeReadOp(); return; }
final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); finalint maxMessagesPerRead = config.getMaxMessagesPerRead(); RecvByteBufAllocator.Handle allocHandle = this.allocHandle; if (allocHandle == null) { this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); }
ByteBuf byteBuf = null; int messages = 0; boolean close = false; try { int totalReadAmount = 0; boolean readPendingReset = false; do { byteBuf = allocHandle.allocate(allocator); int writable = byteBuf.writableBytes(); // 读取数据 int localReadAmount = doReadBytes(byteBuf); if (localReadAmount <= 0) { // not was read release the buffer byteBuf.release(); close = localReadAmount < 0; break; } if (!readPendingReset) { readPendingReset = true; setReadPending(false); } // 传播事件 pipeline.fireChannelRead(byteBuf); byteBuf = null;
// stop reading if (!config.isAutoRead()) { break; }
if (localReadAmount < writable) { // Read less than what the buffer can hold, // which might mean we drained the recv buffer completely. break; } } while (++ messages < maxMessagesPerRead);
if (close) { closeOnRead(pipeline); close = false; } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!config.isAutoRead() && !isReadPending()) { removeReadOp(); } } } }