在解析PoolArena和PoolByteBuf的过程中会发现有一个类频繁的出现:PooledByteBufAllocator。

PooledByteBufAllocator可以理解为内存分配的工厂类。

PooledByteBufAllocator

继承关系以及成员变量

PooledByteBufAllocator持有一个默认的实例DEFAULT。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledByteBufAllocator.class);
// 定义了默认值值
private static final int DEFAULT_NUM_HEAP_ARENA;
private static final int DEFAULT_NUM_DIRECT_ARENA;

private static final int DEFAULT_PAGE_SIZE;
private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk
private static final int DEFAULT_TINY_CACHE_SIZE;
private static final int DEFAULT_SMALL_CACHE_SIZE;
private static final int DEFAULT_NORMAL_CACHE_SIZE;
private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
private static final int DEFAULT_CACHE_TRIM_INTERVAL;
private static final long DEFAULT_CACHE_TRIM_INTERVAL_MILLIS;
private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;
private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT;
static final int DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK;

private static final int MIN_PAGE_SIZE = 4096;
private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
//
private final Runnable trimTask = new Runnable() {
@Override
public void run() {
PooledByteBufAllocator.this.trimCurrentThreadCache();
}
};
// 默认
public static final PooledByteBufAllocator DEFAULT =
new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
// 堆内存Arena数组
private final PoolArena<byte[]>[] heapArenas;
// jvm外内存Arena数组
private final PoolArena<ByteBuffer>[] directArenas;
// tiny大小内存缓存数
private final int tinyCacheSize;
// small大小内存缓存数
private final int smallCacheSize;
// normal大小内存缓存数
private final int normalCacheSize;
// 指标
private final List<PoolArenaMetric> heapArenaMetrics;
private final List<PoolArenaMetric> directArenaMetrics;
// 线程级缓存
private final PoolThreadLocalCache threadCache;
// chunk大小
private final int chunkSize;
private final PooledByteBufAllocatorMetric metric;
}

PooledByteBufAllocator定义了大量常量,用于内存分配的默认值。这些值可以通过环境变量设置,在静态代码块中初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
static {
int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
Throwable pageSizeFallbackCause = null;
try {
validateAndCalculatePageShifts(defaultPageSize);
} catch (Throwable t) {
pageSizeFallbackCause = t;
defaultPageSize = 8192;
}
DEFAULT_PAGE_SIZE = defaultPageSize;

int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);
Throwable maxOrderFallbackCause = null;
try {
validateAndCalculateChunkSize(DEFAULT_PAGE_SIZE, defaultMaxOrder);
} catch (Throwable t) {
maxOrderFallbackCause = t;
defaultMaxOrder = 11;
}
DEFAULT_MAX_ORDER = defaultMaxOrder;

// Determine reasonable default for nHeapArena and nDirectArena.
// Assuming each arena has 3 chunks, the pool should not consume more than 50% of max memory.
final Runtime runtime = Runtime.getRuntime();

/*
* We use 2 * available processors by default to reduce contention as we use 2 * available processors for the
* number of EventLoops in NIO and EPOLL as well. If we choose a smaller number we will run into hot spots as
* allocation and de-allocation needs to be synchronized on the PoolArena.
*
* See https://github.com/netty/netty/issues/3888.
*/
final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
DEFAULT_NUM_HEAP_ARENA = Math.max(0,
SystemPropertyUtil.getInt(
"io.netty.allocator.numHeapArenas",
(int) Math.min(
defaultMinNumArena,
runtime.maxMemory() / defaultChunkSize / 2 / 3)));
DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
SystemPropertyUtil.getInt(
"io.netty.allocator.numDirectArenas",
(int) Math.min(
defaultMinNumArena,
PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));

// cache sizes
DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);

// 32 kb is the default maximum capacity of the cached buffer. Similar to what is explained in
// 'Scalable memory allocation using jemalloc'
DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt(
"io.netty.allocator.maxCachedBufferCapacity", 32 * 1024);

// the number of threshold of allocations when cached entries will be freed up if not frequently used
DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
"io.netty.allocator.cacheTrimInterval", 8192);

DEFAULT_CACHE_TRIM_INTERVAL_MILLIS = SystemPropertyUtil.getLong(
"io.netty.allocation.cacheTrimIntervalMillis", 0);

DEFAULT_USE_CACHE_FOR_ALL_THREADS = SystemPropertyUtil.getBoolean(
"io.netty.allocator.useCacheForAllThreads", true);

DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT = SystemPropertyUtil.getInt(
"io.netty.allocator.directMemoryCacheAlignment", 0);

// Use 1023 by default as we use an ArrayDeque as backing storage which will then allocate an internal array
// of 1024 elements. Otherwise we would allocate 2048 and only use 1024 which is wasteful.
DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK = SystemPropertyUtil.getInt(
"io.netty.allocator.maxCachedByteBuffersPerChunk", 1023);

if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA);
logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA);
if (pageSizeFallbackCause == null) {
logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE);
} else {
logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE, pageSizeFallbackCause);
}
if (maxOrderFallbackCause == null) {
logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER);
} else {
logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER, maxOrderFallbackCause);
}
logger.debug("-Dio.netty.allocator.chunkSize: {}", DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER);
logger.debug("-Dio.netty.allocator.tinyCacheSize: {}", DEFAULT_TINY_CACHE_SIZE);
logger.debug("-Dio.netty.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE);
logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE);
logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL);
logger.debug("-Dio.netty.allocator.cacheTrimIntervalMillis: {}", DEFAULT_CACHE_TRIM_INTERVAL_MILLIS);
logger.debug("-Dio.netty.allocator.useCacheForAllThreads: {}", DEFAULT_USE_CACHE_FOR_ALL_THREADS);
logger.debug("-Dio.netty.allocator.maxCachedByteBuffersPerChunk: {}",
DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK);
}
}

构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
super(preferDirect);
// 内部类,继承了FastThreadLocal
threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
this.tinyCacheSize = tinyCacheSize;
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;
chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);

checkPositiveOrZero(nHeapArena, "nHeapArena");
checkPositiveOrZero(nDirectArena, "nDirectArena");

checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
}

if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
throw new IllegalArgumentException("directMemoryCacheAlignment: "
+ directMemoryCacheAlignment + " (expected: power of two)");
}

int pageShifts = validateAndCalculatePageShifts(pageSize);

if (nHeapArena > 0) {
// 初始化Arena数组 new PoolArena[size]
heapArenas = newArenaArray(nHeapArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
for (int i = 0; i < heapArenas.length; i ++) {
PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
pageSize, maxOrder, pageShifts, chunkSize,
directMemoryCacheAlignment);
heapArenas[i] = arena;
metrics.add(arena);
}
heapArenaMetrics = Collections.unmodifiableList(metrics);
} else {
heapArenas = null;
heapArenaMetrics = Collections.emptyList();
}

if (nDirectArena > 0) {
// 初始化Arena数组 new PoolArena[size]
directArenas = newArenaArray(nDirectArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
for (int i = 0; i < directArenas.length; i ++) {
PoolArena.DirectArena arena = new PoolArena.DirectArena(
this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
directArenas[i] = arena;
metrics.add(arena);
}
directArenaMetrics = Collections.unmodifiableList(metrics);
} else {
directArenas = null;
directArenaMetrics = Collections.emptyList();
}
metric = new PooledByteBufAllocatorMetric(this);
}

分配堆内存newHeapBuffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<byte[]> heapArena = cache.heapArena;

final ByteBuf buf;
if (heapArena != null) {
// 通过Arena的allocate方法分配内存
buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
} else {
// 直接构建非池化的ByteBuf
buf = PlatformDependent.hasUnsafe() ?
new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
// 转为具有内存泄漏检测功能的Buffer
return toLeakAwareBuffer(buf);
}

分配jvm外内存newDirectBuffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;

final ByteBuf buf;
if (directArena != null) {
// 通过Arena的allocate方法分配内存
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
// 转为具有内存泄漏检测功能的Buffer
return toLeakAwareBuffer(buf);
}

内存泄漏检测toLeakAwareBuffer

io.netty.buffer.AbstractByteBufAllocator#toLeakAwareBuffer(io.netty.buffer.ByteBuf)

内存泄漏检测的原理可以参考:https://www.jianshu.com/p/7a3bd7b536e6

内存泄漏检测概述

首先弱引用和引用队列关联,正常情况弱引用引用的对象没有强引用发生gc时会将该弱引用加入到引用队列中。如果在发生gc前执行弱引用的clear()方法,则在发生gc后不会将该弱引用加入到引用队列中,也就是两者断开了关系。

利用这个原理就可以实现内存释放的管理,当分配内存是设置一个弱引用,且存储在集合中Set<DefaultResourceLeak<?>>,释放内存的时候必须执行相应方法release()也就是会执行弱引用的clear()方法,这样内存正常释放后不会将弱引用加入到引用队列中,比较集合和引用队列就可以发现是哪些引用发生了内存泄漏(没有正常释放),也就是同时存在集合和引用队列中的弱引用没有正常释放。

接下来解析一下原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
ResourceLeakTracker<ByteBuf> leak;
//获取配置的内存泄漏检测级别,根据不同的级别会返回
//不同的ByteBuf包装类,即下面的SimpleLeakAwareByteBuf
//或者AdvancedLeakAwareByteBuf
switch (ResourceLeakDetector.getLevel()) {
case SIMPLE:
//生成ResourceLeakTracker对象
leak = AbstractByteBuf.leakDetector.track(buf);
if (leak != null) {
buf = new SimpleLeakAwareByteBuf(buf, leak);
}
break;
case ADVANCED:
case PARANOID:
//生成ResourceLeakTracker对象
leak = AbstractByteBuf.leakDetector.track(buf);
if (leak != null) {
buf = new AdvancedLeakAwareByteBuf(buf, leak);
}
break;
default:
break;
}
return buf;
}

看一下track的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final ResourceLeakTracker<T> track(T obj) {
return track0(obj);
}

@SuppressWarnings("unchecked")
private DefaultResourceLeak track0(T obj) {
// 获取级别
Level level = ResourceLeakDetector.level;
if (level == Level.DISABLED) {
return null;
}
//如果内存泄漏检测级别为Level.ADVANCED或者Level.SIMPLE
//则会以一定频率进行内存泄漏报告,而不是每次`track`都进行报告。
if (level.ordinal() < Level.PARANOID.ordinal()) {
if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
reportLeak();
//返回DefaultResourceLeak,为一个自定义的弱引用
//并传入引用队列
return new DefaultResourceLeak(obj, refQueue, allLeaks);
}
return null;
}
reportLeak();
//返回DefaultResourceLeak,为一个自定义的弱引用
//并传入引用队列
return new DefaultResourceLeak(obj, refQueue, allLeaks);
}

回到之前toLeakAwareBuffer方法转为具有内存泄漏检测功能的Buffer的步骤

buf = new SimpleLeakAwareByteBuf(buf, leak);
buf = new AdvancedLeakAwareByteBuf(buf, leak);

如果应用程序在使用ByteBuf正确调用了retain和release方法,则在引用计数器为0时,则会清除弱引用持有的实际对象,发生GC时,DefaultResourceLeak也不会被放入引用队列中。

这个是依赖弱引用的clear()方法实现这个逻辑。

AdvancedLeakAwareByteBuf是继承SimpleLeakAwareByteBuf,release方法是一样的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public boolean release() {
if (super.release()) {
//引用计数为0,则清除弱引用
closeLeak();
return true;
}
return false;
}

@Override
public boolean release(int decrement) {
if (super.release(decrement)) {
//引用计数为0,则清除弱引用
closeLeak();
return true;
}
return false;
}

private void closeLeak() {
// Close the ResourceLeakTracker with the tracked ByteBuf as argument. This must be the same that was used when
// calling DefaultResourceLeak.track(...).
boolean closed = leak.close(trackedByteBuf);
assert closed;
}

leak.close -> io.netty.util.ResourceLeakDetector.DefaultResourceLeak#close(T)

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public boolean close() {
// Use the ConcurrentMap remove method, which avoids allocating an iterator.
if (allLeaks.remove(this, LeakEntry.INSTANCE)) {
//显示清空该弱引用持有的实际对象,所以该弱引用自身不会
//在GC时被加入应用队列
// Call clear so the reference is not even enqueued.
clear();
headUpdater.set(this, null);
return true;
}
return false;
}

但是如果应用程序在使用ByteBuf没有正确调用retain和release方法,则不会清除弱引用持有的实际对象,此时如果实际上已经没有强引用指向该ByteBuf,那么在发生GC时,垃圾收集器会回收该ByteBuf,而弱引用DefaultResourceLeak会被放入引用队列中。

线程级缓存PoolThreadLocalCache

PoolThreadLocalCache的功能是为PoolThreadCache实现线程级(FastThreadLocal)缓存。

继承关系及成员变量

1
2
3
4
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
// 默认true,所有线程都使用缓存
private final boolean useCacheForAllThreads;
}

构造器

1
2
3
PoolThreadLocalCache(boolean useCacheForAllThreads) {
this.useCacheForAllThreads = useCacheForAllThreads;
}

获取缓存时初始化initialValue

initialValue是FastThreadLocal类提供的扩展方法,在调用get方法时如果没有获取到对象就会调用initialValue方法获取一个初始对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public final V get(InternalThreadLocalMap threadLocalMap) {
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
// 如果没有获取到则初始化一个,FastThreadLocal默认是返回null
return initialize(threadLocalMap);
}

private V initialize(InternalThreadLocalMap threadLocalMap) {
V v = null;
try {
v = initialValue();
} catch (Exception e) {
PlatformDependent.throwException(e);
}

threadLocalMap.setIndexedVariable(index, v);
addToVariablesToRemove(threadLocalMap, this);
return v;
}

PoolThreadLocalCache的initialValue方法是它的核心方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
protected synchronized PoolThreadCache initialValue() {
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

final Thread current = Thread.currentThread();
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
// 构建对象
final PoolThreadCache cache = new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
// 开启定时任务
if (DEFAULT_CACHE_TRIM_INTERVAL_MILLIS > 0) {
final EventExecutor executor = ThreadExecutorMap.currentExecutor();
if (executor != null) {
// 定时任务最终执行PoolThreadCache.trim()方法释放内存
executor.scheduleAtFixedRate(trimTask, DEFAULT_CACHE_TRIM_INTERVAL_MILLIS,
DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
}
}
return cache;
}
// 没有缓存
// No caching so just use 0 as sizes.
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}

leastUsedArena

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
if (arenas == null || arenas.length == 0) {
return null;
}

PoolArena<T> minArena = arenas[0];
for (int i = 1; i < arenas.length; i++) {
PoolArena<T> arena = arenas[i];
// 取更少线程使用的Arena
if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
minArena = arena;
}
}

return minArena;
}

PoolThreadCache

继承关系及成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final class PoolThreadCache {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
// Arena缓存
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;

// Hold the caches for the different size classes, which are tiny, small and normal.
// 不同大小的缓存
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

// Used for bitshifting when calculate the index of normal caches later
private final int numShiftsNormalDirect;
private final int numShiftsNormalHeap;
private final int freeSweepAllocationThreshold;
private final AtomicBoolean freed = new AtomicBoolean();

private int allocations;
}

构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
this.heapArena = heapArena;
this.directArena = directArena;
if (directArena != null) {
// 构建缓存
tinySubPageDirectCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);

numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena);

directArena.numThreadCaches.getAndIncrement();
} else {
// No directArea is configured so just null out all caches
tinySubPageDirectCaches = null;
smallSubPageDirectCaches = null;
normalDirectCaches = null;
numShiftsNormalDirect = -1;
}
if (heapArena != null) {
// Create the caches for the heap allocations
tinySubPageHeapCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageHeapCaches = createSubPageCaches(
smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);

numShiftsNormalHeap = log2(heapArena.pageSize);
normalHeapCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, heapArena);

heapArena.numThreadCaches.getAndIncrement();
} else {
// No heapArea is configured so just null out all caches
tinySubPageHeapCaches = null;
smallSubPageHeapCaches = null;
normalHeapCaches = null;
numShiftsNormalHeap = -1;
}

// Only check if there are caches in use.
if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
|| tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
&& freeSweepAllocationThreshold < 1) {
throw new IllegalArgumentException("freeSweepAllocationThreshold: "
+ freeSweepAllocationThreshold + " (expected: > 0)");
}
}

构建MemoryRegionCache的数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
int cacheSize, int numCaches, SizeClass sizeClass) {
if (cacheSize > 0 && numCaches > 0) {
@SuppressWarnings("unchecked")
MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) {
// TODO: maybe use cacheSize / cache.length
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
}
return cache;
} else {
return null;
}
}

内存分配allocateTiny

1
2
3
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.tinyIdx(normCapacity);
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
if (cache == null || idx > cache.length - 1) {
return null;
}
return cache[idx];
}
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
if (cache == null) {
// no cache found so just return false here
return false;
}
boolean allocated = cache.allocate(buf, reqCapacity);
if (++ allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim();
}
return allocated;
}

先找到缓存中的MemoryRegionCache再执行MemoryRegionCache#allocate

1
2
3
4
5
6
7
8
9
10
11
12
13
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
// 获取的前提是队列中存有
Entry<T> entry = queue.poll();
if (entry == null) {
return false;
}
initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);
entry.recycle();

// allocations is not thread-safe which is fine as this is only called from the same thread all time.
++ allocations;
return true;
}

在buf内存释放的时候会将其加入缓存,队列中添加的逻辑:

  1. PooledByteBuf#deallocate
  2. PoolArena#free
  3. PoolThreadCache#add
  4. MemoryRegionCache.add

内存释放

内存释放最终调用PoolThreadCache.MemoryRegionCache#free(int, boolean)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private int free(int max, boolean finalizer) {
int numFreed = 0;
for (; numFreed < max; numFreed++) {
Entry<T> entry = queue.poll();
if (entry != null) {
freeEntry(entry, finalizer);
} else {
// all cleared
return numFreed;
}
}
return numFreed;
}
private void freeEntry(Entry entry, boolean finalizer) {
PoolChunk chunk = entry.chunk;
long handle = entry.handle;
ByteBuffer nioBuffer = entry.nioBuffer;

if (!finalizer) {
// recycle now so PoolChunk can be GC'ed. This will only be done if this is not freed because of
// a finalizer.
// 对象池
entry.recycle();
}
// 释放chunk
chunk.arena.freeChunk(chunk, handle, sizeClass, nioBuffer, finalizer);
}