如果这篇博客帮助到你,可以请我喝一杯咖啡~
CC BY 4.0 (除特别声明或转载文章外)
一 定长的内存池
1. 原理
定长内存池只能分配固定长度的内存,因此也叫对象池。
下面是定长内存池从第一次申请内存,到释放的原理图:
上面实现的定长内存池有以下缺点:
- 一次只能申请 1 个对象大小的空间,如果想申请对象数组,则需要多次申请。
- 如果 memory 指向的内存空间不够一次分配,则会丢弃该内存,这会造成浪费。
- 这种对象池的通病:无法回收空闲内存,由于每一块内存被打乱挂在
free_list
上,因此没法做到全回收。
2. 源码
下面是源代码:
#include <iostream>
template<class Tp>
class FixedSizeMemoryPool
{
enum { DEFAULT_MEMORY_POOL_SIZE = 1024,
ALIGNMENT = 8};
public:
FixedSizeMemoryPool(size_t memory_pool_size = DEFAULT_MEMORY_POOL_SIZE )
: _memory_pool_start( nullptr )
, _memory_pool_end( nullptr )
, _free_list( nullptr )
{ realloc_memory_pool(); }
/*
* size: 返回 Tp 对象的个数
*/
Tp* New()
{
if( _free_list )
{
Tp* obj = (Tp*)_free_list;
_free_list = *(void**)_free_list;
return obj;
}
size_t obj_size = round_up(sizeof(Tp));
if( _memory_pool_end - _memory_pool_start < obj_size )
if( !realloc_memory_pool() )
return nullptr;
Tp* obj = (Tp*)_memory_pool_start;
_memory_pool_start += obj_size;
// 定位 new 表达式,用来调用 Tp 类的构造函数,初始化 obj 指向的对象
new(obj)Tp;
return obj;
}
void Delete(Tp* obj)
{
// 调用 obj 的析构函数
obj->~Tp();
// 将 obj 的前 8 个字节作为 next,指向链表后面的节点
*(void**)obj = _free_list;
_free_list = (void*)obj;
}
private:
inline size_t round_up(size_t size)
{ return (size + ALIGNMENT - 1) & ~(ALIGNMENT - 1); }
bool realloc_memory_pool()
{
static size_t realloc_time = 1;
realloc_time <<= 1;
size_t new_size = realloc_time * DEFAULT_MEMORY_POOL_SIZE;
try{
_memory_pool_start = new char[new_size];
_memory_pool_end = _memory_pool_start + new_size;
}catch(const std::bad_alloc& e){
std::cout << "realloc_memory_pool: new error! " << e.what() << "\n";
// 缩小申请内存的大小,重新尝试,直到把 realloc_time 减到 2
if( realloc_time > 2 )
{
realloc_time /= 4;
realloc_memory_pool();
}
else
return false;
}
return true;
}
private:
char* _memory_pool_start;
char* _memory_pool_end;
void* _free_list;
};
下面是测试代码:
void test2()
{
const size_t Rounds = 3;
// 每轮申请释放多少次
const size_t N = 1000000;
size_t begin1 = clock();
std::vector<ListNode*> v1;
v1.reserve(N);
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v1.push_back(new ListNode);
}
for (int i = 0; i < N; ++i)
{
delete v1[i];
}
v1.clear();
}
size_t end1 = clock();
FixedSizeMemoryPool<ListNode> TNPool;
size_t begin2 = clock();
std::vector<ListNode*> v2;
v2.reserve(N);
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v2.push_back(TNPool.New());
}
for (int i = 0; i < 100000; ++i)
{
TNPool.Delete(v2[i]);
}
v2.clear();
}
size_t end2 = clock();
std::cout <<"new cost time:" <<end1 - begin1 << std::endl;
std::cout <<"object pool cost time:" <<end2 - begin2 << std::endl;
}
输出为:
new cost time:417
object pool cost time:113
可见 Object Pool
比直接调用 new/delete
要快一些。
3. 实现细节
1. 空闲链表如何连起来?
链表每个节点的带大小至少是一个指针,这样才能存储下一个节点的地址。
对于单位大小的内存,只需要取前一个指针大小,将所有节点连接起来即可。
下面是获取链表上的节点的方法:
Tp* obj = (Tp*)_free_list;
_free_list = *(void**)_free_list;
接下来是将释放的内存内存加入空闲链表的方法:
*(void**)obj = _free_list;
_free_list = (void*)obj;
_free_list = *(void**)_free_list;
中 *(void**)
的含义是什么?
_freelist
指向第一块内存的起始地址。空闲链表中下一块内存的地址 以 void*
的形式存放在第一块内存的前 4/8
字节处,因此,*(void**)
就是取出存储在该内存块中的的地址信息。
还有一种更为直观的实现方法,定义一个带 next
的结构体:
struct FreeListNode
{
FreeListNode* _next;
};
template<class Tp>
class FixedSizeMemoryPool
{
enum { DEFAULT_MEMORY_POOL_SIZE = 64,
ALIGNMENT = 8};
public:
FixedSizeMemoryPool(size_t memory_pool_size = DEFAULT_MEMORY_POOL_SIZE )
: _memory_pool_start( nullptr )
, _memory_pool_end( nullptr )
, _free_list( new FreeListNode() )
{ realloc_memory_pool(); _free_list->_next = nullptr; }
/*
* size: 返回 Tp 对象的个数
*/
Tp* New()
{
if( _free_list->_next )
{
Tp* obj = (Tp*)_free_list->_next;
_free_list->_next = _free_list->_next->_next;
return obj;
}
...
// 定位 new 表达式,用来调用 Tp 类的构造函数,初始化 obj 指向的对象
new(obj)Tp;
return obj;
}
void Delete(Tp* obj)
{
// 调用 obj 的析构函数
obj->~Tp();
// 将 obj 的前 8 个字节作为 next,指向链表后面的节点
FreeListNode* free_node = (FreeListNode*)obj;
free_node->_next = _free_list->_next;
_free_list->_next = free_node;
}
private:
inline size_t round_up(size_t size)
{ return (size + ALIGNMENT - 1) & ~(ALIGNMENT - 1); }
private:
char* _memory_pool_start;
char* _memory_pool_end;
FreeListNode* _free_list;
};
2. 扩容的细节
static size_t realloc_time = 1;
realloc_time <<= 1;
每次扩容,将默认的扩容大小增大一倍。符合计算机对内存的使用规律。
try{
_memory_pool_start = new char[new_size];
_memory_pool_end = _memory_pool_start + new_size;
}catch(const std::bad_alloc& e){
std::cout << "realloc_memory_pool: new error! " << e.what() << "\n";
// 缩小申请内存的大小,重新尝试,直到把 realloc_time 减到 2
if( realloc_time > 2 )
{
realloc_time /= 4;
return realloc_memory_pool();
}
else
return false;
}
如果某一次内存申请失败,并且本次不是第一次扩容,那么缩小扩容的大小,重新尝试。
二 Thread Cache
自由链表
1. free list
桶划分
整体控制在最多 10%
左右的内碎片浪费
数据范围 | 字节对齐方式 | freelist 范围 | 链表个数 |
---|---|---|---|
[1, 128] | 8 字节对齐 | [0, 16) | 16 |
(128, 1024] | 16 字节对齐 | [16, 72) | 56 |
(1024, 8 * 1024] | 128 字节对齐 | [72, 128) | 56 |
(8 * 1024, 64 * 1024] | 1024 字节对齐 | [128, 184) | 56 |
(64 * 1024, 256 * 1024] | 8 * 1024 字节对齐 | [184, 208) | 24 |
2.round up
与 index
RoundUp
将大小调整为 alignment
的倍数
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
return ((bytes + alignNum - 1) & ~(alignNum - 1));
}
static inline size_t RoundUp(size_t size)
{
if (size <= 128)
{
return _RoundUp(size, 8);
}
else if (size <= 1024)
{
return _RoundUp(size, 16);
}
else if (size <= 8 * 1024)
{
return _RoundUp(size, 128);
}
else if (size <= 64 * 1024)
{
return _RoundUp(size, 1024);
}
else if (size <= 256 * 1024)
{
return _RoundUp(size, 8 * 1024);
}
else
{
return _RoundUp(size, 1 << PAGE_SHIFT);
}
}
Index
计算 size
大小的内存由第几个 freelist
管理
static inline size_t _Index(size_t bytes, size_t align_shift)
{
// (bytes + (1 << align_shift) - 1) >> align_shift) 计算 bytes 是 (1 << align_shift)
// 的多少倍,并将结果向上取整,所以最后要将结果减去 1
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
// 计算映射的哪一个自由链表桶
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAX_BYTES);
// 每个区间有多少个链
static int group_array[4] = { 16, 56, 56, 56 };
if (bytes <= 128) {
return _Index(bytes, 3);
}
else if (bytes <= 1024) {
return _Index(bytes - 128, 4) + group_array[0];
}
else if (bytes <= 8 * 1024) {
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
}
// 8 * 1024 + 1, 9 * 1024 -> [1, 1024]
else if (bytes <= 64 * 1024) {
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
}
else if (bytes <= 256 * 1024) {
return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
}
else {
assert(false);
}
return -1;
}
3. free list
实现
static const size_t NFREELIST = 208;
static void*& NextObj(void* obj)
{ return *(void**)obj; }
// 管理切分好的小对象的自由链表
class FreeList
{
public:
void Push(void* obj);
void PushRange(void* start, void* end, size_t n);
void PopRange(void*& start, void*& end, size_t n);
void* Pop();
bool Empty()
{ return _freeList == nullptr; }
size_t& MaxSize()
{ return _maxSize; }
size_t Size()
{ return _size; }
private:
void* _freeList = nullptr;
size_t _maxSize = 1; // 慢启动相关
size_t _size = 0; // free list 管理的元素个数
};
4. ThreadCache
结构
class ThreadCache
{
public:
// 申请和释放内存对象
void* Allocate(size_t size);
void Deallocate(void* ptr, size_t size);
// 从中心缓存获取对象
void* FetchFromCentralCache(size_t index, size_t size);
// 释放对象时,链表过长时,回收内存回到中心缓存
void ListTooLong(FreeList& list, size_t size);
private:
FreeList _freeLists[NFREELIST];
};
// TLS thread local storage
static thread_local ThreadCache* pTLSThreadCache = nullptr;
三 TLS
Thread Local Storage
thread_local
声明的变量在线程启动时构造,并在线程退出时析构。
C++ 11 的隐式 thread local
:
#include "ObjectMemoryPool/ObjectMemoryPool.hpp"
#include <thread>
#include <unistd.h>
thread_local std::thread::id thread_id;
void show_x()
{
thread_id = std::this_thread::get_id();
while(1)
{
std::cout << thread_id << std::endl;
sleep(1);
}
}
int main()
{
std::thread t1(show_x);
std::thread t2(show_x);
t1.join();
t2.join();
return 0;
}
四 Central Cache
central cache
也是一个哈希桶结构,他的哈希桶的映射关系跟 thread cache
是一样的。不同的是他的每个哈希桶位置挂是 SpanList
链表结构,不过每个映射桶下面的 span
中的大内存块被按映射关系切成了一个个小内存块对象挂在 span
的自由链表中。
申请内存:
- 当
thread cache
中没有内存时,就会批量向central cache
申请一些内存对象,这里的批量获取对象的数量使用了类似网络tcp
协议拥塞控制的慢开始算法;central cache
也有一个哈希映射的spanlist
,spanlist
中挂着span
,从span
中取出对象给thread cache
,这个过程是需要加锁的,这里使用的是一个桶锁,尽可能提高效率。 central cache
映射的spanlist
中所有span
的都没有内存以后,则需要向page cache
申请一个新的span
对象,拿到span
以后将span
管理的内存按大小切好作为自由链表链接到一起。然后从span
中取对象给thread cache
。central cache
的中挂的span
中use_count
记录分配了多少个对象出去,分配一个对象给thread cache
,就++use_count
。
释放内存:
- 当
thread_cache
过长或者线程销毁,则会将内存释放回central cache
中的,释放回来时use_count
。当use_count
减到 0 时则表示所有对象都回到了span
,则将span
释放回page cache
,page cache
中会对前后相邻的空闲页进行合并。
1. 根据对象大小计算返回对象个数
size
越小,NumMoveSize
返回的值越大,反之越小。
static const size_t MAX_BYTES = 256 * 1024;
static size_t NumMoveSize(size_t size)
{
assert(size > 0);
// [2, 512],一次批量移动多少个对象的(慢启动)上限值
// 小对象一次批量上限高
// 小对象一次批量上限低
int num = MAX_BYTES / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
“慢启动” 算法:
MaxSize()
返回 _freeList
成员 _maxSize
,开始大小为 1,每次向 Central Cache
申请内存时,如果 MaxSize()
比 NumMoveSize()
小,则增长 1。
size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
if (_freeLists[index].MaxSize() == batchNum)
{
_freeLists[index].MaxSize() += 1;
}
每个 free list
都有一个 _maxSize
。
2. span & spanList
结构
typedef unsigned long long PAGE_ID;
// Span管理一个跨度的大块内存
// 管理以页为单位的大块内存
// 管理多个连续页大块内存跨度结构
struct Span
{
PAGE_ID _pageId = 0; // 大块内存起始页的页号
size_t _n = 0; // 页的数量
Span* _next = nullptr; // 双向链表的结构
Span* _prev = nullptr;
size_t _objSize = 0; // 切好的小对象的大小
size_t _useCount = 0; // 切好小块内存,被分配给thread cache的计数
void* _freeList = nullptr; // 切好的小块内存的自由链表
bool _isUse = false; // 是否在被使用
};
// 带头双向循环链表
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
Span* Begin();
Span* End();
bool Empty();
void PushFront(Span* span);
Span* PopFront();
void Insert(Span* pos, Span* newSpan);
void Erase(Span* pos);
private:
Span* _head;
public:
std::mutex _mtx; // 桶锁
};
page_id
为何用?
我们以看一下 page_id 是如何计算的:
这是直接从堆上申请大页内存时,page id 的计算方式:
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
这是将一个 page 切分为两个小块 page 时的操作:
// 在nSpan的头部切一个k页下来
// k页span返回
// nSpan再挂到对应映射的位置
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
page id
不一定从 0 开始,其作用在分配内存时显得有些繁琐,但如果想要将 空闲的 page
合并起来,那么这个页号就起了十分重要的作用。
比如 page1
的页号为 2912,页数为 10,page2
的页号为 2922,页数为 22,那么这两个页就可以合并为一个大页,页号为 2912 页数为 32
3. 从 span 中获取对象
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
// 慢开始反馈调节算法
// 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
// 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限
// 3、size越大,一次向central cache要的batchNum就越小
// 4、size越小,一次向central cache要的batchNum就越大
size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
if (_freeLists[index].MaxSize() == batchNum)
{
_freeLists[index].MaxSize() += 1;
}
void* start = nullptr;
void* end = nullptr;
size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
assert(actualNum > 0);
if (actualNum == 1)
{
assert(start == end);
return start;
}
else
{
_freeLists[index].PushRange(NextObj(start), end, actualNum - 1);
return start;
}
}
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();
Span* span = GetOneSpan(_spanLists[index], size);
assert(span);
assert(span->_freeList);
// 从span中获取batchNum个对象
// 如果不够batchNum个,有多少拿多少
start = span->_freeList;
end = start;
size_t i = 0;
size_t actualNum = 1;
while ( i < batchNum - 1 && NextObj(end) != nullptr)
{
end = NextObj(end);
++i;
++actualNum;
}
span->_freeList = NextObj(end);
NextObj(end) = nullptr;
span->_useCount += actualNum;
_spanLists[index]._mtx.unlock();
return actualNum;
}
void PushRange(void* start, void* end, size_t n)
{
NextObj(end) = _freeList;
_freeList = start;
_size += n;
}
4. 获取一个 span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
// 查看当前的spanlist中是否有还有未分配对象的span
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freeList != nullptr)
{
return it;
}
else
{
it = it->_next;
}
}
// 先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞
list._mtx.unlock();
// 走到这里说没有空闲span了,只能找page cache要
PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_isUse = true;
span->_objSize = size;
PageCache::GetInstance()->_pageMtx.unlock();
// 对获取span进行切分,不需要加锁,因为这会其他线程访问不到这个span
// 计算span的大块内存的起始地址和大块内存的大小(字节数)
char* start = (char*)(span->_pageId << PAGE_SHIFT);
size_t bytes = span->_n << PAGE_SHIFT;
char* end = start + bytes;
// 把大块内存切成自由链表链接起来
// 1、先切一块下来去做头,方便尾插
span->_freeList = start;
start += size;
void* tail = span->_freeList;
int i = 1;
while (start < end)
{
++i;
NextObj(tail) = start;
tail = NextObj(tail); // tail = start;
start += size;
}
NextObj(tail) = nullptr;
list._mtx.lock();
list.PushFront(span);
return span;
}
调用 PageCache
的 NewSpan
前,桶锁释放与否?
调用 NewSpan
说明 Central Cache
中对应桶内的 span
中的 freelist
都为空,那么其他线程再来要,一样会失败,那么不解锁也是合理的?
Central Cache
除了分配内存,还可以 回收内存!如果不把桶锁释放,那么回收内存的操作就会被阻塞!
5. CentalCache
结构
// 单例模式
class CentralCache
{
public:
static CentralCache* GetInstance()
{
return &_sInst;
}
// 获取一个非空的span
Span* GetOneSpan(SpanList& list, size_t byte_size);
// 从中心缓存获取一定数量的对象给thread cache
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
// 将一定数量的对象释放到span跨度
void ReleaseListToSpans(void* start, size_t byte_size);
private:
SpanList _spanLists[NFREELIST];
private:
CentralCache()
{}
CentralCache(const CentralCache&) = delete;
static CentralCache _sInst;
};
五 Page Cache
申请内存:
1)当 central cache
向 page cache
申请内存时,page cache
先检查对应位置有没有 span
,如果没有 则向更大页寻找一个 span
,如果找到则分裂成两个。
比如:申请的是 4 页 page
,4 页 page
后面没有挂 span
,则向后面寻找更大的 span
,假设在 10 页 page
位置找到一个 span
,则将 10 页 page
分裂为一个 4 页 page
和一个 6 页 page
。
2)如果找到 _spanList[128]
都没有合适的 span
,则向系统使用 mmap
、brk
或者是 VirtualAlloc
等方式申请 128 页 page span
挂在自由链表中,再重复 1 中的过程。
需要注意的是 central cache
和 page cache
的核心结构都是 spanlist
的哈希桶,但是他们是有本质区别的,central cache
中哈希桶,是按跟 thread cache
一样的大小对齐关系映射的,他的 spanlist
中挂的 span
中的内存都被按映射关系切好链接成小块内存的自由链表。而 page cache
中的 spanlist
则是按下标桶号映射的,也就是说第 i
号桶中挂的 span
都是 i
页内存。
1. 根据对象大小计算获取的页数
static const size_t NPAGES = 129;
static const size_t PAGE_SHIFT = 13; // 8KB
// 计算一次向系统获取几个页
// 单个对象 8byte
// ...
// 单个对象 256KB
static size_t NumMovePage(size_t size)
{
size_t num = NumMoveSize(size);
size_t npage = num*size;
npage >>= PAGE_SHIFT;
if (npage == 0)
npage = 1;
return npage;
}
2. NewSpan
实现
// 获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0);
// 先检查第k个桶里面有没有span
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();
return kSpan;
}
// 检查一下后面的桶里面有没有span,如果有可以把他它进行切分
for (size_t i = k+1; i < NPAGES; ++i)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
//Span* kSpan = new Span;
Span* kSpan = _spanPool.New();
// 在nSpan的头部切一个k页下来
// k页span返回
// nSpan再挂到对应映射的位置
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
_spanLists[nSpan->_n].PushFront(nSpan);
return kSpan;
}
}
// 走到这个位置就说明后面没有大页的span了
// 这时就去找堆要一个128页的span
//Span* bigSpan = new Span;
Span* bigSpan = _spanPool.New();
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
return NewSpan(k);
}
3. PageCache
结构
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
// 获取从对象到span的映射
Span* MapObjectToSpan(void* obj);
// 释放空闲span回到Pagecache,并合并相邻的span
void ReleaseSpanToPageCache(Span* span);
// 获取一个K页的span
Span* NewSpan(size_t k);
std::mutex _pageMtx;
private:
SpanList _spanLists[NPAGES];
ObjectPool<Span> _spanPool;
TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;
PageCache()
{}
PageCache(const PageCache&) = delete;
static PageCache _sInst;
};
六 释放逻辑
1. Thread Cache
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
// 找对映射的自由链表桶,对象插入进入
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
// 当链表长度大于一次批量申请的内存时就开始还一段list给 central cache
if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
{
ListTooLong(_freeLists[index], size);
}
}
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
void* start = nullptr;
void* end = nullptr;
list.PopRange(start, end, list.MaxSize());
CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
思考:归还方式合理性
思考:当 freelist
的持有空闲对象数大于一次批量申请的对象数的归还方式是否合理?
2. Central Cache
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();
while (start)
{
void* next = NextObj(start);
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
NextObj(start) = span->_freeList;
span->_freeList = start;
span->_useCount--;
// 说明span的切分出去的所有小块内存都回来了
// 这个span就可以再回收给page cache,pagecache可以再尝试去做前后页的合并
if (span->_useCount == 0)
{
_spanLists[index].Erase(span);
span->_freeList = nullptr;
span->_next = nullptr;
span->_prev = nullptr;
// 释放span给page cache时,使用page cache的锁就可以了
// 这时把桶锁解掉
_spanLists[index]._mtx.unlock();
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
_spanLists[index]._mtx.lock();
}
start = next;
}
_spanLists[index]._mtx.unlock();
}
通过对象地址计算得到页号,最终从 idSpanMap
中得到 Span
对象
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
auto ret = (Span*)_idSpanMap.get(id);
assert(ret != nullptr);
return ret;
}
Page Cache
的 NewSpan
在获取返还新的 Span
给 Central Cache
之前,将 Span
的所有 page
都建立与该 Span
的映射。
3. Page Cache
1)span.isUse
对于 页号为 2000,页长为 5 的 span 来说,我们需要向前找页号为 1999 所在的 span
是否空闲,向后找页号为 2005 所在的 span 是否空闲。
并且该空闲的页必须在 page cache
的管辖范围,不能是已经分配给了 central cache
!如何知道这一点呢?通过判断 useCount
是否为 0 是不可行的!原因是:
可能 GetOneSpan
刚调用完 NewSpan
获取到了一个新的 span
,此时该 Span
的 useCount
为 0,如果合并时又将该页取走合并,就会引发线程安全问题!所以我们需要在 span
中引入新的成员 isUse
,用来标记该 span
是否已经分配给了 central cache
2)再谈 NewSpan
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0);
// 大于128 page的直接向堆申请
if (k > NPAGES-1)
{
...
_idSpanMap.set(span->_pageId, span);
return span;
}
if (!_spanLists[k].Empty())
{
...
for (PAGE_ID i = 0; i < kSpan->_n; ++i)
{
_idSpanMap.set(kSpan->_pageId + i, kSpan);
}
return kSpan;
}
for (size_t i = k+1; i < NPAGES; ++i)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = _spanPool.New();
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
_spanLists[nSpan->_n].PushFront(nSpan);
// 存储nSpan的首位页号跟nSpan映射,方便page cache回收内存时
// 进行的合并查找
//_idSpanMap[nSpan->_pageId] = nSpan;
//_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
_idSpanMap.set(nSpan->_pageId, nSpan);
_idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);
// 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; ++i)
{
//_idSpanMap[kSpan->_pageId + i] = kSpan;
_idSpanMap.set(kSpan->_pageId + i, kSpan);
}
return kSpan;
}
}
...
return NewSpan(k);
}
3)ReleaseSpanToPageCache
合并时,不断向前向后合并。合并的过程中,可能合并了被切分后的 span
(比如原本 128 page
切分为 2 page
和 126 page
,2 page
分给了 central cache
,126 page
在 page cache
中,此时可能这 126 page
会被合并),这没有关系,因为 page cache
不会将内存归还给系统了!
void PageCache::ReleaseSpanToPageCache(Span* span)
{
// 大于128 page的直接还给堆
if (span->_n > NPAGES - 1)
{
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
SystemFree(ptr);
//delete span;
_spanPool.Delete(span);
return;
}
// 对span前后的页,尝试进行合并,缓解内存碎片问题
while (1)
{
PAGE_ID prevId = span->_pageId - 1;
auto ret = (Span*)_idSpanMap.get(prevId);
if (ret == nullptr) break;
// 前面相邻页的span在使用,不合并了
Span* prevSpan = ret;
if (prevSpan->_isUse == true) break;
// 合并出超过128页的span没办法管理,不合并了
if (prevSpan->_n + span->_n > NPAGES-1) break;
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
_spanLists[prevSpan->_n].Erase(prevSpan);
//delete prevSpan;
_spanPool.Delete(prevSpan);
}
// 向后合并
while (1)
{
PAGE_ID nextId = span->_pageId + span->_n;
auto ret = (Span*)_idSpanMap.get(nextId);
if (ret == nullptr) break;
Span* nextSpan = ret;
if (nextSpan->_isUse == true) break;
if (nextSpan->_n + span->_n > NPAGES-1) break;
span->_n += nextSpan->_n;
_spanLists[nextSpan->_n].Erase(nextSpan);
_spanPool.Delete(nextSpan);
}
_spanLists[span->_n].PushFront(span);
span->_isUse = false;
_idSpanMap.set(span->_pageId, span);
_idSpanMap.set(span->_pageId + span->_n - 1, span);
}
七 大块内存分配
Thread Cache
和 Central Cache
管理的最大内存为 256 KB,也就是 32 个 page
如果申请的内存在 32 page
到 128 page
之间,也就是 256 KB 到 1 M 之间,可以向 Page Cache
申请内存
如果申请的内存大于 128 page
,也就是大于 1 M,那么 Page Cache
直接向系统申请内存。
static void* ConcurrentAlloc(size_t size)
{
if (size > MAX_BYTES)
{
size_t alignSize = SizeClass::RoundUp(size);
size_t kpage = alignSize >> PAGE_SHIFT;
PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(kpage);
span->_objSize = size;
PageCache::GetInstance()->_pageMtx.unlock();
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
return ptr;
}
else
{
// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
static ObjectPool<ThreadCache> tcPool;
//pTLSThreadCache = new ThreadCache;
pTLSThreadCache = tcPool.New();
}
//cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
return pTLSThreadCache->Allocate(size);
}
}
static void ConcurrentFree(void* ptr)
{
Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
size_t size = span->_objSize;
if (size > MAX_BYTES)
{
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
}
else
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
}
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0);
// 大于128 page的直接向堆申请
if (k > NPAGES - 1)
{
void* ptr = SystemAlloc(k);
//Span* span = new Span;
Span* span = _spanPool.New();
span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
span->_n = k;
//_idSpanMap[span->_pageId] = span;
_idSpanMap.set(span->_pageId, span);
return span;
}
...
}
void PageCache::ReleaseSpanToPageCache(Span* span)
{
// 大于128 page的直接还给堆
if (span->_n > NPAGES - 1)
{
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
SystemFree(ptr);
//delete span;
_spanPool.Delete(span);
return;
}
...
}
八 避免对 malloc 的依赖
由于 本项目的目标在于提供一个多线程环境下比 malloc 跟高效的内存管理机制,所以本项目中应该解除对 malloc 的依赖。所以,所有的 new/delete
都应该被替换。
项目中最常向堆申请的就是 Span
对象,其次是 ThreadCache
对象
我们可以让 FixedSizeMemoryPool
管理这两种定长的对象的申请与释放。
FixedSizeMemoryPool
直接向系统申请内存:
bool realloc_memory_pool()
{
...
try{
_memory_pool_start = SystemAlloc(new_size);
...
} catch(const std::bad_alloc& e) {
...
}
return true;
}
修改申请 span
时对 new/delete
的依赖:
Span* PageCache::NewSpan(size_t k)
{
// 大于128 page的直接向堆申请
if (k > NPAGES-1)
{
void* ptr = SystemAlloc(k);
//Span* span = new Span;
Span* span = _spanPool.New();
...
}
// 检查一下后面的桶里面有没有span,如果有可以把他它进行切分
for (size_t i = k+1; i < NPAGES; ++i)
{
if (!_spanLists[i].Empty())
{
//Span* kSpan = new Span;
Span* kSpan = _spanPool.New();
...
}
}
...
}
void PageCache::ReleaseSpanToPageCache(Span* span)
{
// 大于128 page的直接还给堆
if (span->_n > NPAGES-1)
{
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
SystemFree(ptr);
//delete span;
_spanPool.Delete(span);
return;
}
// 对span前后的页,尝试进行合并,缓解内存碎片问题
while (1)
{
...
_spanLists[prevSpan->_n].Erase(prevSpan);
//delete prevSpan;
_spanPool.Delete(prevSpan);
}
// 向后合并
while (1)
{
...
_spanLists[nextSpan->_n].Erase(nextSpan);
//delete nextSpan;
_spanPool.Delete(nextSpan);
}
...
}
修改 获取 TLS Thread Cache
时对 new/delete
的依赖:
if (pTLSThreadCache == nullptr)
{
static ObjectPool<ThreadCache> tcPool;
//pTLSThreadCache = new ThreadCache;
pTLSThreadCache = tcPool.New();
}
九 Benchmark
程序
#include"ConcurrentAlloc.h"
// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&, k]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(malloc(16));
//v.push_back(malloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
free(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, malloc_costtime);
printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
nworks, rounds, ntimes, free_costtime);
printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}
// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(ConcurrentAlloc(16));
//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
ConcurrentFree(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, malloc_costtime);
printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, free_costtime);
printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}
int main()
{
size_t n = 100000;
cout << "==========================================================" << endl;
BenchmarkConcurrentMalloc(n, 4, 10);
cout << endl << endl;
BenchmarkMalloc(n, 4, 10);
cout << "==========================================================" << endl;
return 0;
}
十 基数数优化
1)单层基数树
调用方式:TCMalloc_PageMap1<32 - PAGE_SHIFT>
BITS
为 array_
数组的大小。对于 32 位程序,由于页的大小为 PAGE_SHIFT
也就是 13,所以最多可以有 2^19
(32 - 13 = 19
) 个页!
// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:
static const int LENGTH = 1 << BITS;
// array_ 是一个 <page_id, span*> 的映射,所以是一个二级指针
void** array_;
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap1() {
//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
// array_ 的大小为 一个指针大小 * 最多可以有的页数
size_t size = sizeof(void*) << BITS;
size_t alignSize = SizeClass::_RoundUp(size, 1<<PAGE_SHIFT);
array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
memset(array_, 0, sizeof(void*) << BITS);
}
// Return the current value for KEY. Returns NULL if not yet set,
// or if k is out of range.
void* get(Number k) const {
// 页数一定小于等于 2^BITS
if ((k >> BITS) > 0) {
return NULL;
}
return array_[k];
}
// REQUIRES "k" is in range "[0,2^BITS-1]".
// REQUIRES "k" has been ensured before.
// Sets the value 'v' for key 'k'.
void set(Number k, void* v) {
array_[k] = v;
}
};
对于 32 位程序,页数最多为 2^19
个,约为 50 万,而指针的大小为 4 字节,所以 array_
的大小约为 200 万字节。
可是对于 64 位程序呢?单层的基数树就不够用了!所以需要多层的基数树。
2)双层基数树
// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
static const int ROOT_BITS = 5;
static const int ROOT_LENGTH = 1 << ROOT_BITS;
static const int LEAF_BITS = BITS - ROOT_BITS; // 14
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes
void* (*allocator_)(size_t); // Memory allocator
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap2() {
//allocator_ = allocator;
memset(root_, 0, sizeof(root_));
PreallocateMoreMemory();
}
void* get(Number k) const {
const Number i1 = k >> LEAF_BITS; // 第一层在高 5 位
const Number i2 = k & (LEAF_LENGTH - 1); // 将高 5 位除去,留下低 14 位
if ((k >> BITS) > 0 || root_[i1] == NULL) {
return NULL;
}
return root_[i1]->values[i2];
}
void set(Number k, void* v) {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
ASSERT(i1 < ROOT_LENGTH);
root_[i1]->values[i2] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> LEAF_BITS;
// Check for overflow
if (i1 >= ROOT_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_[i1] == NULL) {
//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
//if (leaf == NULL) return false;
static ObjectPool<Leaf> leafPool;
Leaf* leaf = (Leaf*)leafPool.New();
memset(leaf, 0, sizeof(*leaf));
root_[i1] = leaf;
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
// Allocate enough to keep track of all possible pages
Ensure(0, 1 << BITS);
}
};
3)三层基数树
// Three-level radix tree
template <int BITS> // 64 - 13 = 51
class TCMalloc_PageMap3 {
private:
// How many bits should we consume at each interior level
static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up 53/3=17
static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;
// How many bits should we consume at leaf level
static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS; // 51-34=17
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Interior node
struct Node {
Node* ptrs[INTERIOR_LENGTH];
};
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Node* root_; // Root of radix tree
void* (*allocator_)(size_t); // Memory allocator
Node* NewNode() {
Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
if (result != NULL) {
memset(result, 0, sizeof(*result));
}
return result;
}
public:
typedef uintptr_t Number;
explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
allocator_ = allocator;
root_ = NewNode();
}
void* get(Number k) const {
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 ||
root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
return NULL;
}
return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
}
void set(Number k, void* v) {
ASSERT(k >> BITS == 0);
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
// Check for overflow
if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_->ptrs[i1] == NULL) {
Node* n = NewNode();
if (n == NULL) return false;
root_->ptrs[i1] = n;
}
// Make leaf node if necessary
if (root_->ptrs[i1]->ptrs[i2] == NULL) {
Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
if (leaf == NULL) return false;
memset(leaf, 0, sizeof(*leaf));
root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
}
};
4)基数树优化原理
- 基数树比
std::map
或std::unordered_map
的查找更加高效。即便是三层基数树,也只需要三次查找! - 其次,基数树的优化可以让查找
page id
到span
的映射无锁!
第二点的解释如下,下面是整个项目中需要查找 idSpanMap
的所有场景:
修改只存在于两个函数中:
Span* PageCache::NewSpan(size_t k); void PageCache::ReleaseSpanToPageCache(Span* span)
这两个函数首先不会同时被调用,因为调用前都会把整个 PageCache 锁住
查找:
Span* PageCache::MapObjectToSpan(void* obj); void PageCache::ReleaseSpanToPageCache(Span* span); static void ConcurrentFree(void* ptr);
比较关键的问题在于,当一个线程调用
ConcurrentFree
释放一个对象时,如果此时对基数树进行查找,而此时另一个线程有无可能正好在对该页所在的结构进行修改?绝无可能!
ConcurrentFree
的场景是ThreadCache
持有该page
,而修改基数树的场景是PageCache
持有该page
十一 遇到的 BUG
1. 合并相邻 page
时,spanLists
出现环
问题复现:
合并前面的页:
prev_span->_page_num += span->_page_num;
_spanPool.Delete(span);
合并后面的页:
span->_page_num += next_span->_page_num;
// 将 span 从 pagesLists 中删除,并删除 Span 结构
_pagesLists[next_span->_page_num - 1].Erase(next_span);
_spanPool.Delete(next_span);
最终将新的 span
加入 _pagesLists
中
_pagesLists[span->_page_num - 1].PushFront(span);
在后面 ShowPagesLists()
函数中会出现死循环:
for (auto span = _pagesLists[i].Begin(); span != _pagesLists[i].End(); span = span->_next)
{
printf(" PageID[%u] PageNum[%u]\n", span->_page_id, span->_page_num);
}
问题的关键就在于合并前面的页时,我们保留了之前 _pagesLists
中的 span
,而合并后面的页时删除了 之前 _pagesLists
中的 span
。但是最终的都会将新收回来的 span
(尽管这个 span 可能是 prev_span
)加入 _pagesLists
中,如果是把已经存在于 _pagesLists
中的 prev_span
再次加入 _pagesLists
就会出问题环的问题!
所以正确的做法是向前合并时不要偷懒,选择和向后合并一样的方式,把可以合并的 prev_span
删掉
span->_page_num += prev_span->_page_num;
span->_page_id = prev_span->_page_id;
// 将 span 从 pagesLists 中删除,并删除 Span 结构
_pagesLists[prev_span->_page_num - 1].Erase(prev_span);
_spanPool.Delete(prev_span);
2. 根据对象大小计算页时,没有考虑到对齐的问题
问题重现:
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
static size_t NumMovePage(size_t size)
{
size_t num = NumMoveSize(size);
size_t npage = num * size;
npage >>= PAGE_SHIFT;
if (npage == 0)
npage = 1;
return npage;
}
NumMovePage
存在两个问题:
第一个问题是
num * size
不一定是 ``PAGE_SHIFT对齐的,这样
npage »= PAGE_SHIFT` 可能会向下取整,这样就会导致实际分配的页不足。第二个问题是导致崩溃的函数
CutNewSpan
void CutNewSpan(Span* span, size_t obj_size) { // 根据 page id 和 page num 计算 span 内存的起始和结束地址 char* start = (char*)(span->_page_id << PAGE_SHIFT); char* end = start + (span->_page_num << PAGE_SHIFT) // 将所有 obj 串联起来 for (char* cur = start; cur != end; cur += obj_size) { if( cur + obj_size != end ) NextObj((void*)cur) = (void*)(cur + obj_size); else NextObj((void*)cur) = nullptr; } span->_freelist = (void*)start; span->_objSize = obj_size; }
这个函数默认 span 持有的内存正好可以被切分为整数个大小为
obj_size
的对象,但是这个假设是难以支持的,尤其是在第一种情况出现的时候。
为了解决这个问题,首先我们要在计算 npage
时向上取整。
其次,我们要把 NumMovePage
中计算出来的 num
交给 CutNewSpan
,只切分出 num
个对象:
// 获取 size 大小的对象 obj_num 个,总共 返回值 个 page
static size_t NumMovePage(size_t size, size_t* obj_num)
{
*obj_num = NumMoveSize(size);
size_t total_bytes = *obj_num * size;
// 计算 page 是要上取整数
size_t npage = (total_bytes + (1 << PAGE_SHIFT) - 1) >> PAGE_SHIFT;
return npage;
}
void CutNewSpan(Span* span, size_t obj_size, size_t obj_num)
{
// 根据 page id 和 page num 计算 span 内存的起始和结束地址
char* start = (char*)(span->_page_id << PAGE_SHIFT);
char* cur = start;
// 将所有 obj 串联起来
for (int i = 0; i < obj_num - 1; cur += obj_size, ++i)
NextObj((void*)cur) = (void*)(cur + obj_size);
NextObj((void*)cur) = nullptr;
span->_freelist = (void*)start;
span->_objSize = obj_size;
}
size_t obj_num;
size_t page_num = SizeClass::NumMovePage(obj_size, &obj_num);
span = pagecache_inst->NewSpan(page_num);
spanlist.CutNewSpan(span, obj_size, obj_num);
原理如下:
十二 性能测试
申请相同大小内存:
申请不同大小内存: