目录:
- ByteBuf结构及重要API
- ByteBuf分类
- 内存分配器ByteBufAlloctor
- 总结
1. ByteBuf结构及重要API
ByteBuf数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20/*
* <pre>
* BEFORE clear()
*
* +-------------------+------------------+------------------+
* | discardable bytes | readable bytes | writable bytes |
* +-------------------+------------------+------------------+
* | | | |
* 0 <= readerIndex <= writerIndex <= capacity
*
*
* AFTER clear()
*
* +---------------------------------------------------------+
* | writable bytes (got more space) |
* +---------------------------------------------------------+
* | |
* 0 = readerIndex = writerIndex <= capacity
* </pre>
*/readerIndex:读数据从当前这个指针开始读
writerIndex:写数据从当前指针开始写
capacity:容量
maxCapacity:当容量快到达capacity之后,maxCapacity代表最大可以扩容到多大。
0到readerIndex的数据是无效的;
readerIndex到writerIndex代表为可读的数据;
writerIndex到capacity代表这段空间是空闲的,可以进行写数据。
ByteBuf重要Api方法
1
2
3
4
5
6read****() //readerIndex的指针会移动,根据所读取的数据移动
write*****() //writerIndex的指针会移动,根据写入的数据移动
set****() //不会移动任何指针
mark****() //标记读写的指针,方便后续回滚到当前指针与reset配合使用
reset*****() //回滚到之前标记的读写指针
2. ByteBuf分类
Pooled与UnPooled
根据以下的ByteBuf继承关系可以看出,Pooled与Unpooled这么两大类
Pooled:Pooled池化内存分配每次从预先分配好的一块内存取一段连续内存封装成ByteBuf提供给应用程序
Unpooled:这个刚好就与Pooled相反。Unpooled非池化每次进行内存分配的时候调用系统API向操作系统申请一块内存
我们可以看下它们的源码创建方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf {
private final ByteBufAllocator alloc;
private ByteBuffer buffer;
private ByteBuffer tmpNioBuf;
private int capacity;
private boolean doNotFree;
protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
if (alloc == null) {
throw new NullPointerException("alloc");
}
if (initialCapacity < 0) {
throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
}
if (maxCapacity < 0) {
throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
}
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
// 这里是直接拿JDK底层的ByteBuffer去申请堆外内存
setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
}
/**
* Allocate a new direct {@link ByteBuffer} with the given initialCapacity.
*/
protected ByteBuffer allocateDirect(int initialCapacity) {
// 这里是直接拿JDK底层的ByteBuffer去申请堆外内存
return ByteBuffer.allocateDirect(initialCapacity);
}
}
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
// 核心对象池
private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
protected PooledDirectByteBuf newObject(Handle handle) {
return new PooledDirectByteBuf(handle, 0);
}
};
static PooledDirectByteBuf newInstance(int maxCapacity) {
PooledDirectByteBuf buf = RECYCLER.get();
buf.setRefCnt(1);
buf.maxCapacity(maxCapacity);
return buf;
}
private PooledDirectByteBuf(Recycler.Handle recyclerHandle, int maxCapacity){
super(recyclerHandle, maxCapacity);
}
}Heap与Direct
Heap:JVM内存地址,即申请的对象是需要占用JVM的内存空间大小的。如果我们申请16k或者32k等都是消耗新生代内存空间大小的。即可以由jdk自动回收内存空间。
Direct:堆外内存,申请的内存大小是占用机器的内存空间大小。而Direct申请的内存空间,就需要代码手动的去释放内存空间。它不被JVM所在的GC垃圾回收器管理。
1
2
3
4
5
6
7// 堆外内存申请
ByteBuffer.allocateDirect(initialCapacity)
protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
// 堆内存申请
this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);
}Unsafe与非Unsafe
Unsafe:Unsafe直接获取ByteBuf在JVM内存地址调用JDK的Unsafe进行读写操作,通过ByteBuf分配内存首地址和当前指针基于内存偏移地址获取值,
非Unsafe:非Unsafe不依赖JDK的Unsafe对象,通过内存数组和索引获取值
1
2
3
4
5
protected byte _getByte(int index) {
// 系统依赖,这里封装的就是Unsafe sun.misc.Unsafe;
return PlatformDependent.getByte(addr(index));
}
3. 内存分配器ByteBufAllocator分析
ByteBufAllocator
实例化对应的ByteBuf,通过上面看也看到有不同种类的ByteBuf,即Pooled、Unpooled、Heap、Direct、Unsafe。那么ByteBufAllocator就定义用来创建这些不同种类的ByteBuf。
AbstractByteBufAllocator
1
2
3
4
5
6
7
8
9/**
* Create a heap {@link ByteBuf} with the given initialCapacity and maxCapacity.
*/
protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity);
/**
* Create a direct {@link ByteBuf} with the given initialCapacity and maxCapacity.
*/
protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity);PooledByteBufAllocator
PooledByteBufAllocator从预先分配好的内存取一段内存,这里的内存分配比UnpooledByteBufAllocator要复杂的多,这里在分配内存的时候又有内存规格这么概念。即tiny、small、normal、huge等不同规格的内存。后续我们在详细再来分析看看是如何分析的。这里面的逻辑还是很复杂的,到底是分配tiny、small、normal、huge。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<byte[]> heapArena = cache.heapArena;
ByteBuf buf;
if (heapArena != null) {
buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
allocate(cache, buf, reqCapacity);
return buf;
}
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity);
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
if (isTiny(normCapacity)) { // < 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
synchronized (this) {
final PoolSubpage<T> head = table[tableIdx];
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
return;
}
}
} else if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
} else {
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
return;
}
allocateNormal(buf, reqCapacity, normCapacity);
}UnpooledByteBufAllocator
可以看到对于Unpooled类型的,就相对来说比较简单了。直接通过new byte[]数组就可以了,没有pooled的那么负责。
1
2
3
4
5
6
7protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
return new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);
}
4.总结
这里只是大概梳理了下Netty里面有哪些ByteBuf以及对应的分类。从Pooled、Unpooled、Heap、Direct、Unsafe、非Unsafe这些ByteBuf类型。
当中最核心的还有就是内存规格,这里就牵涉到了PoolArena、DirectArena、HeapArena。还有Chunk、Page、Subpage概念。