Netty组件ByteBuf(四)

目录:

  1. ByteBuf结构及重要API
  2. ByteBuf分类
  3. 内存分配器ByteBufAlloctor
  4. 总结

1. ByteBuf结构及重要API

  1. ByteBuf数据结构

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    /*
    * <pre>
    * BEFORE clear()
    *
    * +-------------------+------------------+------------------+
    * | discardable bytes | readable bytes | writable bytes |
    * +-------------------+------------------+------------------+
    * | | | |
    * 0 <= readerIndex <= writerIndex <= capacity
    *
    *
    * AFTER clear()
    *
    * +---------------------------------------------------------+
    * | writable bytes (got more space) |
    * +---------------------------------------------------------+
    * | |
    * 0 = readerIndex = writerIndex <= capacity
    * </pre>
    */

    readerIndex:读数据从当前这个指针开始读

    writerIndex:写数据从当前指针开始写

    capacity:容量

    maxCapacity:当容量快到达capacity之后,maxCapacity代表最大可以扩容到多大。

0到readerIndex的数据是无效的;

readerIndex到writerIndex代表为可读的数据;

writerIndex到capacity代表这段空间是空闲的,可以进行写数据。

  1. ByteBuf重要Api方法

    1
    2
    3
    4
    5
    6
    read****() //readerIndex的指针会移动,根据所读取的数据移动
    write*****() //writerIndex的指针会移动,根据写入的数据移动
    set****() //不会移动任何指针

    mark****() //标记读写的指针,方便后续回滚到当前指针与reset配合使用
    reset*****() //回滚到之前标记的读写指针

2. ByteBuf分类

  1. Pooled与UnPooled

    根据以下的ByteBuf继承关系可以看出,Pooled与Unpooled这么两大类

    Pooled:Pooled池化内存分配每次从预先分配好的一块内存取一段连续内存封装成ByteBuf提供给应用程序

    Unpooled:这个刚好就与Pooled相反。Unpooled非池化每次进行内存分配的时候调用系统API向操作系统申请一块内存

    我们可以看下它们的源码创建方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf {

    private final ByteBufAllocator alloc;

    private ByteBuffer buffer;
    private ByteBuffer tmpNioBuf;
    private int capacity;
    private boolean doNotFree;

    protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);
    if (alloc == null) {
    throw new NullPointerException("alloc");
    }
    if (initialCapacity < 0) {
    throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
    }
    if (maxCapacity < 0) {
    throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
    }
    if (initialCapacity > maxCapacity) {
    throw new IllegalArgumentException(String.format(
    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
    }

    this.alloc = alloc;
    // 这里是直接拿JDK底层的ByteBuffer去申请堆外内存
    setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
    }


    /**
    * Allocate a new direct {@link ByteBuffer} with the given initialCapacity.
    */
    protected ByteBuffer allocateDirect(int initialCapacity) {
    // 这里是直接拿JDK底层的ByteBuffer去申请堆外内存
    return ByteBuffer.allocateDirect(initialCapacity);
    }
    }


    final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {

    // 核心对象池
    private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
    @Override
    protected PooledDirectByteBuf newObject(Handle handle) {
    return new PooledDirectByteBuf(handle, 0);
    }
    };

    static PooledDirectByteBuf newInstance(int maxCapacity) {
    PooledDirectByteBuf buf = RECYCLER.get();
    buf.setRefCnt(1);
    buf.maxCapacity(maxCapacity);
    return buf;
    }

    private PooledDirectByteBuf(Recycler.Handle recyclerHandle, int maxCapacity){
    super(recyclerHandle, maxCapacity);
    }
    }
  2. Heap与Direct

    Heap:JVM内存地址,即申请的对象是需要占用JVM的内存空间大小的。如果我们申请16k或者32k等都是消耗新生代内存空间大小的。即可以由jdk自动回收内存空间。

    Direct:堆外内存,申请的内存大小是占用机器的内存空间大小。而Direct申请的内存空间,就需要代码手动的去释放内存空间。它不被JVM所在的GC垃圾回收器管理。

    1
    2
    3
    4
    5
    6
    7
    // 堆外内存申请
    ByteBuffer.allocateDirect(initialCapacity)

    protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    // 堆内存申请
    this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);
    }
  3. Unsafe与非Unsafe

    Unsafe:Unsafe直接获取ByteBuf在JVM内存地址调用JDK的Unsafe进行读写操作,通过ByteBuf分配内存首地址和当前指针基于内存偏移地址获取值,

    非Unsafe:非Unsafe不依赖JDK的Unsafe对象,通过内存数组和索引获取值

    1
    2
    3
    4
    5

    protected byte _getByte(int index) {
    // 系统依赖,这里封装的就是Unsafe sun.misc.Unsafe;
    return PlatformDependent.getByte(addr(index));
    }

3. 内存分配器ByteBufAllocator分析

  1. ByteBufAllocator

    实例化对应的ByteBuf,通过上面看也看到有不同种类的ByteBuf,即Pooled、Unpooled、Heap、Direct、Unsafe。那么ByteBufAllocator就定义用来创建这些不同种类的ByteBuf。

  2. AbstractByteBufAllocator
    1
    2
    3
    4
    5
    6
    7
    8
    9
    /**
    * Create a heap {@link ByteBuf} with the given initialCapacity and maxCapacity.
    */
    protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity);

    /**
    * Create a direct {@link ByteBuf} with the given initialCapacity and maxCapacity.
    */
    protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity);
  3. PooledByteBufAllocator

    PooledByteBufAllocator从预先分配好的内存取一段内存,这里的内存分配比UnpooledByteBufAllocator要复杂的多,这里在分配内存的时候又有内存规格这么概念。即tiny、small、normal、huge等不同规格的内存。后续我们在详细再来分析看看是如何分析的。这里面的逻辑还是很复杂的,到底是分配tiny、small、normal、huge。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<byte[]> heapArena = cache.heapArena;

    ByteBuf buf;
    if (heapArena != null) {
    buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
    buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
    }

    PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
    PooledByteBuf<T> buf = newByteBuf(maxCapacity);
    allocate(cache, buf, reqCapacity);
    return buf;
    }

    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    final int normCapacity = normalizeCapacity(reqCapacity);
    if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
    int tableIdx;
    PoolSubpage<T>[] table;
    if (isTiny(normCapacity)) { // < 512
    if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
    // was able to allocate out of the cache so move on
    return;
    }
    tableIdx = tinyIdx(normCapacity);
    table = tinySubpagePools;
    } else {
    if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
    // was able to allocate out of the cache so move on
    return;
    }
    tableIdx = smallIdx(normCapacity);
    table = smallSubpagePools;
    }

    synchronized (this) {
    final PoolSubpage<T> head = table[tableIdx];
    final PoolSubpage<T> s = head.next;
    if (s != head) {
    assert s.doNotDestroy && s.elemSize == normCapacity;
    long handle = s.allocate();
    assert handle >= 0;
    s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
    return;
    }
    }
    } else if (normCapacity <= chunkSize) {
    if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
    // was able to allocate out of the cache so move on
    return;
    }
    } else {
    // Huge allocations are never served via the cache so just call allocateHuge
    allocateHuge(buf, reqCapacity);
    return;
    }
    allocateNormal(buf, reqCapacity, normCapacity);
    }
  4. UnpooledByteBufAllocator

    可以看到对于Unpooled类型的,就相对来说比较简单了。直接通过new byte[]数组就可以了,没有pooled的那么负责。

    1
    2
    3
    4
    5
    6
    7
    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    return new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }

    protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);
    }

4.总结

这里只是大概梳理了下Netty里面有哪些ByteBuf以及对应的分类。从Pooled、Unpooled、Heap、Direct、Unsafe、非Unsafe这些ByteBuf类型。

当中最核心的还有就是内存规格,这里就牵涉到了PoolArena、DirectArena、HeapArena。还有Chunk、Page、Subpage概念。

分享到 评论