高并发内存池的设计与实现 Shepard-Wang

一 定长的内存池

1. 原理

定长内存池只能分配固定长度的内存,因此也叫对象池。

下面是定长内存池从第一次申请内存,到释放的原理图:

上面实现的定长内存池有以下缺点:

  1. 一次只能申请 1 个对象大小的空间,如果想申请对象数组,则需要多次申请。
  2. 如果 memory 指向的内存空间不够一次分配,则会丢弃该内存,这会造成浪费。
  3. 这种对象池的通病:无法回收空闲内存,由于每一块内存被打乱挂在 free_list 上,因此没法做到全回收。

2. 源码

下面是源代码:

#include <iostream>

template<class Tp>
class FixedSizeMemoryPool
{
    enum { DEFAULT_MEMORY_POOL_SIZE = 1024,
           ALIGNMENT = 8};
public:
    FixedSizeMemoryPool(size_t memory_pool_size = DEFAULT_MEMORY_POOL_SIZE )
    : _memory_pool_start( nullptr )
    , _memory_pool_end( nullptr )
    , _free_list( nullptr )
    { realloc_memory_pool(); }

    /*
     * size: 返回 Tp 对象的个数
     */
    Tp* New()
    {
        if( _free_list )
        {
            Tp* obj = (Tp*)_free_list;
            _free_list = *(void**)_free_list;
            return obj;
        }

        size_t obj_size = round_up(sizeof(Tp));
        if( _memory_pool_end - _memory_pool_start < obj_size )
            if( !realloc_memory_pool() )
                return nullptr;

        Tp* obj = (Tp*)_memory_pool_start;
        _memory_pool_start += obj_size;

        // 定位 new 表达式,用来调用 Tp 类的构造函数,初始化 obj 指向的对象
        new(obj)Tp;
        return obj;
    }

    void Delete(Tp* obj)
    {
        // 调用 obj 的析构函数
        obj->~Tp();
        // 将 obj 的前 8 个字节作为 next,指向链表后面的节点
        *(void**)obj = _free_list;
        _free_list = (void*)obj;
    }

private:
    inline size_t round_up(size_t size)
    { return (size + ALIGNMENT - 1) & ~(ALIGNMENT - 1); }

    bool realloc_memory_pool()
    {
        static size_t realloc_time = 1;
        realloc_time <<= 1;
        size_t new_size = realloc_time * DEFAULT_MEMORY_POOL_SIZE;

        try{
            _memory_pool_start = new char[new_size];
            _memory_pool_end   = _memory_pool_start + new_size;
        }catch(const std::bad_alloc& e){
            std::cout << "realloc_memory_pool: new error! " << e.what() << "\n";
            // 缩小申请内存的大小,重新尝试,直到把 realloc_time 减到 2
            if( realloc_time > 2 )
            {
                realloc_time /= 4;
                realloc_memory_pool();
            }
            else
                return false;
        }
        return true;
    }

private:
    char* _memory_pool_start;
    char* _memory_pool_end;
    void* _free_list;
};

下面是测试代码:

void test2()
{
    const size_t Rounds = 3;
    // 每轮申请释放多少次
    const size_t N = 1000000;
    size_t begin1 = clock();
    std::vector<ListNode*> v1;
    v1.reserve(N);
    for (size_t j = 0; j < Rounds; ++j)
    {
        for (int i = 0; i < N; ++i)
        {
            v1.push_back(new ListNode);
        }
        for (int i = 0; i < N; ++i)
        {
            delete v1[i];
        }
        v1.clear();
    }
    size_t end1 = clock();
    FixedSizeMemoryPool<ListNode> TNPool;
    size_t begin2 = clock();
    std::vector<ListNode*> v2;
    v2.reserve(N);
    for (size_t j = 0; j < Rounds; ++j)
    {
        for (int i = 0; i < N; ++i)
        {
            v2.push_back(TNPool.New());
        }
        for (int i = 0; i < 100000; ++i)
        {
            TNPool.Delete(v2[i]);
        }
        v2.clear();
    }
    size_t end2 = clock();
    std::cout <<"new cost time:" <<end1 - begin1 << std::endl;
    std::cout <<"object pool cost time:" <<end2 - begin2 << std::endl;
}

输出为:

new cost time:417
object pool cost time:113

可见 Object Pool 比直接调用 new/delete 要快一些。

3. 实现细节

1. 空闲链表如何连起来?

链表每个节点的带大小至少是一个指针,这样才能存储下一个节点的地址。

对于单位大小的内存,只需要取前一个指针大小,将所有节点连接起来即可。

下面是获取链表上的节点的方法:

Tp* obj = (Tp*)_free_list;
_free_list = *(void**)_free_list;

接下来是将释放的内存内存加入空闲链表的方法:

*(void**)obj = _free_list;
_free_list = (void*)obj;

_free_list = *(void**)_free_list;*(void**) 的含义是什么?

_freelist 指向第一块内存的起始地址。空闲链表中下一块内存的地址 以 void* 的形式存放在第一块内存的前 4/8 字节处,因此,*(void**) 就是取出存储在该内存块中的的地址信息。

还有一种更为直观的实现方法,定义一个带 next 的结构体:

struct FreeListNode
{
    FreeListNode* _next;
};

template<class Tp>
class FixedSizeMemoryPool
{
    enum { DEFAULT_MEMORY_POOL_SIZE = 64,
           ALIGNMENT = 8};
public:
    FixedSizeMemoryPool(size_t memory_pool_size = DEFAULT_MEMORY_POOL_SIZE )
    : _memory_pool_start( nullptr )
    , _memory_pool_end( nullptr )
    , _free_list( new FreeListNode() )
    { realloc_memory_pool(); _free_list->_next = nullptr; }

    /*
     * size: 返回 Tp 对象的个数
     */
    Tp* New()
    {
        if( _free_list->_next )
        {
            Tp* obj = (Tp*)_free_list->_next;
            _free_list->_next = _free_list->_next->_next;
            return obj;
        }

        ...

        // 定位 new 表达式,用来调用 Tp 类的构造函数,初始化 obj 指向的对象
        new(obj)Tp;
        return obj;
    }

    void Delete(Tp* obj)
    {
        // 调用 obj 的析构函数
        obj->~Tp();
        // 将 obj 的前 8 个字节作为 next,指向链表后面的节点
        FreeListNode* free_node = (FreeListNode*)obj;
        free_node->_next = _free_list->_next;
        _free_list->_next = free_node;
    }

private:
    inline size_t round_up(size_t size)
    { return (size + ALIGNMENT - 1) & ~(ALIGNMENT - 1); }


private:
    char* _memory_pool_start;
    char* _memory_pool_end;
    FreeListNode* _free_list;
};

2. 扩容的细节

static size_t realloc_time = 1;
realloc_time <<= 1;

每次扩容,将默认的扩容大小增大一倍。符合计算机对内存的使用规律。

try{
    _memory_pool_start = new char[new_size];
    _memory_pool_end   = _memory_pool_start + new_size;
}catch(const std::bad_alloc& e){
    std::cout << "realloc_memory_pool: new error! " << e.what() << "\n";
    // 缩小申请内存的大小,重新尝试,直到把 realloc_time 减到 2
    if( realloc_time > 2 )
    {
        realloc_time /= 4;
        return realloc_memory_pool();
    }
    else
        return false;
}

如果某一次内存申请失败,并且本次不是第一次扩容,那么缩小扩容的大小,重新尝试。

Thread Cache 自由链表

1. free list 桶划分

整体控制在最多 10% 左右的内碎片浪费

数据范围字节对齐方式freelist 范围链表个数
[1, 128]8 字节对齐[0, 16)16
(128, 1024]16 字节对齐[16, 72)56
(1024, 8 * 1024]128 字节对齐[72, 128)56
(8 * 1024, 64 * 1024]1024 字节对齐[128, 184)56
(64 *1024, 256 * 1024]8 * 1024 字节对齐[184, 208)24

2.round upindex

RoundUp 将大小调整为 alignment 的倍数

static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
    return ((bytes + alignNum - 1) & ~(alignNum - 1));
}

static inline size_t RoundUp(size_t size)
{
    if (size <= 128)
    {
        return _RoundUp(size, 8);
    }
    else if (size <= 1024)
    {
        return _RoundUp(size, 16);
    }
    else if (size <= 8 * 1024)
    {
        return _RoundUp(size, 128);
    }
    else if (size <= 64 * 1024)
    {
        return _RoundUp(size, 1024);
    }
    else if (size <= 256 * 1024)
    {
        return _RoundUp(size, 8 * 1024);
    }
    else
    {
        return _RoundUp(size, 1 << PAGE_SHIFT);
    }
}

Index 计算 size 大小的内存由第几个 freelist 管理

static inline size_t _Index(size_t bytes, size_t align_shift)
{
    // (bytes + (1 << align_shift) - 1) >> align_shift) 计算 bytes 是 (1 << align_shift) 
    // 的多少倍,并将结果向上取整,所以最后要将结果减去 1
    return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}

// 计算映射的哪一个自由链表桶
static inline size_t Index(size_t bytes)
{
    assert(bytes <= MAX_BYTES);

    // 每个区间有多少个链
    static int group_array[4] = { 16, 56, 56, 56 };
    if (bytes <= 128) {
        return _Index(bytes, 3);
    }
    else if (bytes <= 1024) {
        return _Index(bytes - 128, 4) + group_array[0];
    } 
    else if (bytes <= 8 * 1024) {
        return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
    }
    // 8 * 1024 + 1, 9 * 1024 -> [1, 1024]
    else if (bytes <= 64 * 1024) {
        return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
    }
    else if (bytes <= 256 * 1024) {
        return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
    }
    else {
        assert(false);
    }

    return -1;
}

3. free list 实现

static const size_t NFREELIST = 208;

static void*& NextObj(void* obj)
{ return *(void**)obj; }

// 管理切分好的小对象的自由链表
class FreeList
{
public:
	void Push(void* obj);

	void PushRange(void* start, void* end, size_t n);

	void PopRange(void*& start, void*& end, size_t n);

	void* Pop();

	bool Empty()
	{ return _freeList == nullptr; }

	size_t& MaxSize()
	{ return _maxSize; }

	size_t Size()
	{ return _size; }

private:
	void* _freeList = nullptr;
	size_t _maxSize = 1;		// 慢启动相关
	size_t _size = 0;			// free list 管理的元素个数
};

4. ThreadCache 结构

class ThreadCache
{
public:
	// 申请和释放内存对象
	void* Allocate(size_t size);
	void Deallocate(void* ptr, size_t size);

	// 从中心缓存获取对象
	void* FetchFromCentralCache(size_t index, size_t size);

	// 释放对象时,链表过长时,回收内存回到中心缓存
	void ListTooLong(FreeList& list, size_t size);
private:
	FreeList _freeLists[NFREELIST];
};

// TLS thread local storage
static thread_local ThreadCache* pTLSThreadCache = nullptr;

TLS Thread Local Storage

thread_local 声明的变量在线程启动时构造,并在线程退出时析构。

C++ 11 的隐式 thread local

#include "ObjectMemoryPool/ObjectMemoryPool.hpp"

#include <thread>
#include <unistd.h>

thread_local std::thread::id thread_id;

void show_x()
{
    thread_id = std::this_thread::get_id();

    while(1)
    {
        std::cout << thread_id << std::endl;
        sleep(1);
    }
}

int main()
{
    std::thread t1(show_x);
    std::thread t2(show_x);

    t1.join();
    t2.join();

    return 0;
}

Central Cache

central cache 也是一个哈希桶结构,他的哈希桶的映射关系跟 thread cache 是一样的。不同的是他的每个哈希桶位置挂是 SpanList 链表结构,不过每个映射桶下面的 span 中的大内存块被按映射关系切成了一个个小内存块对象挂在 span 的自由链表中。

申请内存:

  1. thread cache 中没有内存时,就会批量向 central cache 申请一些内存对象,这里的批量获取对象的数量使用了类似网络 tcp 协议拥塞控制的慢开始算法;central cache 也有一个哈希映射的 spanlistspanlist 中挂着span,从 span 中取出对象给 thread cache,这个过程是需要加锁的,这里使用的是一个桶锁,尽可能提高效率。
  2. central cache 映射的 spanlist 中所有 span 的都没有内存以后,则需要向 page cache 申请一个新的 span 对象,拿到 span 以后将 span 管理的内存按大小切好作为自由链表链接到一起。然后从 span 中取对象给 thread cache
  3. central cache 的中挂的 spanuse_count 记录分配了多少个对象出去,分配一个对象给 thread cache,就++use_count

释放内存:

  1. thread_cache 过长或者线程销毁,则会将内存释放回 central cache 中的,释放回来时 use_count。当 use_count 减到 0 时则表示所有对象都回到了 span,则将 span 释放回 page cachepage cache 中会对前后相邻的空闲页进行合并。

1. 根据对象大小计算返回对象个数

size 越小,NumMoveSize 返回的值越大,反之越小。

static const size_t MAX_BYTES = 256 * 1024;

static size_t NumMoveSize(size_t size)
{
    assert(size > 0);

    // [2, 512],一次批量移动多少个对象的(慢启动)上限值
    // 小对象一次批量上限高
    // 小对象一次批量上限低
    int num = MAX_BYTES / size;
    if (num < 2)
        num = 2;

    if (num > 512)
        num = 512;

    return num;
}

“慢启动” 算法:

MaxSize() 返回 _freeList 成员 _maxSize ,开始大小为 1,每次向 Central Cache 申请内存时,如果 MaxSize()NumMoveSize() 小,则增长 1。

size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
if (_freeLists[index].MaxSize() == batchNum)
{
    _freeLists[index].MaxSize() += 1;
}

每个 free list 都有一个 _maxSize

2. span & spanList 结构

typedef unsigned long long PAGE_ID;

// Span管理一个跨度的大块内存
// 管理以页为单位的大块内存
// 管理多个连续页大块内存跨度结构
struct Span
{
    PAGE_ID _pageId = 0;   		 // 大块内存起始页的页号
    size_t  _n = 0;        		 // 页的数量
    
    Span* _next = nullptr; 		 // 双向链表的结构
    Span* _prev = nullptr;
    
    size_t _objSize = 0;   		 // 切好的小对象的大小
    size_t _useCount = 0;  		 // 切好小块内存,被分配给thread cache的计数
    
    void* _freeList = nullptr;    // 切好的小块内存的自由链表
    
    bool _isUse = false;          // 是否在被使用
};
// 带头双向循环链表
class SpanList
{
public:
     SpanList()
     {
         _head = new Span;
         _head->_next = _head;
         _head->_prev = _head;
     }
    
     Span* Begin();
     Span* End();
     bool Empty();
    
     void PushFront(Span* span);
     Span* PopFront();
    
     void Insert(Span* pos, Span* newSpan);
     void Erase(Span* pos);
private:
	Span* _head;
public:
	std::mutex _mtx; // 桶锁
};

page_id 为何用?

我们以看一下 page_id 是如何计算的:

这是直接从堆上申请大页内存时,page id 的计算方式:

bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;

这是将一个 page 切分为两个小块 page 时的操作:

// 在nSpan的头部切一个k页下来
// k页span返回
// nSpan再挂到对应映射的位置
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;

nSpan->_pageId += k;
nSpan->_n -= k;

page id 不一定从 0 开始,其作用在分配内存时显得有些繁琐,但如果想要将 空闲的 page 合并起来,那么这个页号就起了十分重要的作用。

比如 page1 的页号为 2912,页数为 10,page2 的页号为 2922,页数为 22,那么这两个页就可以合并为一个大页,页号为 2912 页数为 32

3. 从 span 中获取对象

void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
    // 慢开始反馈调节算法
    // 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
    // 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限
    // 3、size越大,一次向central cache要的batchNum就越小
    // 4、size越小,一次向central cache要的batchNum就越大
    size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
    if (_freeLists[index].MaxSize() == batchNum)
    {
        _freeLists[index].MaxSize() += 1;
    }

    void* start = nullptr;
    void* end = nullptr;
    size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
    assert(actualNum > 0);

    if (actualNum == 1)
    {
        assert(start == end);
        return start;
    }
    else
    {
        _freeLists[index].PushRange(NextObj(start), end, actualNum - 1);
        return start;
    }
}
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
	size_t index = SizeClass::Index(size);
	_spanLists[index]._mtx.lock();

	Span* span = GetOneSpan(_spanLists[index], size);
	assert(span);
	assert(span->_freeList);

	// 从span中获取batchNum个对象
	// 如果不够batchNum个,有多少拿多少
	start = span->_freeList;
	end = start;
	size_t i = 0;
	size_t actualNum = 1;
	while ( i < batchNum - 1 && NextObj(end) != nullptr)
	{
		end = NextObj(end);
		++i;
		++actualNum;
	}
	span->_freeList = NextObj(end);
	NextObj(end) = nullptr;
	span->_useCount += actualNum;

	_spanLists[index]._mtx.unlock();

	return actualNum;
}
void PushRange(void* start, void* end, size_t n)
{
    NextObj(end) = _freeList;
    _freeList = start;

    _size += n;
}

4. 获取一个 span

Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
	// 查看当前的spanlist中是否有还有未分配对象的span
	Span* it = list.Begin();
	while (it != list.End())
	{
		if (it->_freeList != nullptr)
		{
			return it;
		}
		else
		{
			it = it->_next;
		}
	}

	// 先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞
	list._mtx.unlock();

	// 走到这里说没有空闲span了,只能找page cache要
	PageCache::GetInstance()->_pageMtx.lock();
	Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
	span->_isUse = true;
	span->_objSize = size;
	PageCache::GetInstance()->_pageMtx.unlock();

	// 对获取span进行切分,不需要加锁,因为这会其他线程访问不到这个span

	// 计算span的大块内存的起始地址和大块内存的大小(字节数)
	char* start = (char*)(span->_pageId << PAGE_SHIFT);
	size_t bytes = span->_n << PAGE_SHIFT;
	char* end = start + bytes;

	// 把大块内存切成自由链表链接起来
	// 1、先切一块下来去做头,方便尾插
	span->_freeList = start;
	start += size;
	void* tail = span->_freeList;
	int i = 1;
	while (start < end)
	{
		++i;
		NextObj(tail) = start;
		tail = NextObj(tail); // tail = start;
		start += size;
	}

	NextObj(tail) = nullptr;
    
	list._mtx.lock();
	list.PushFront(span);

	return span;
}

调用 PageCacheNewSpan 前,桶锁释放与否?

调用 NewSpan 说明 Central Cache 中对应桶内的 span 中的 freelist 都为空,那么其他线程再来要,一样会失败,那么不解锁也是合理的?

Central Cache 除了分配内存,还可以 回收内存!如果不把桶锁释放,那么回收内存的操作就会被阻塞!

5. CentalCache 结构

// 单例模式
class CentralCache
{
public:
	static CentralCache* GetInstance()
	{
		return &_sInst;
	}

	// 获取一个非空的span
	Span* GetOneSpan(SpanList& list, size_t byte_size);

	// 从中心缓存获取一定数量的对象给thread cache
	size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);

	// 将一定数量的对象释放到span跨度
	void ReleaseListToSpans(void* start, size_t byte_size);
private:
	SpanList _spanLists[NFREELIST];

private:
	CentralCache()
	{}

	CentralCache(const CentralCache&) = delete;

	static CentralCache _sInst;
};

Page Cache

申请内存:

1)当 central cachepage cache 申请内存时,page cache 先检查对应位置有没有 span,如果没有 则向更大页寻找一个 span,如果找到则分裂成两个。

比如:申请的是 4 页 page,4 页 page 后面没有挂 span,则向后面寻找更大的 span,假设在 10 页 page 位置找到一个 span,则将 10 页 page 分裂为一个 4 页 page 和一个 6 页 page

2)如果找到 _spanList[128] 都没有合适的 span,则向系统使用 mmapbrk 或者是 VirtualAlloc 等方式申请 128 页 page span 挂在自由链表中,再重复 1 中的过程。

需要注意的是 central cachepage cache 的核心结构都是 spanlist 的哈希桶,但是他们是有本质区别的,central cache 中哈希桶,是按跟 thread cache 一样的大小对齐关系映射的,他的 spanlist 中挂的 span 中的内存都被按映射关系切好链接成小块内存的自由链表。而 page cache 中的 spanlist 则是按下标桶号映射的,也就是说第 i 号桶中挂的 span 都是 i 页内存。

1. 根据对象大小计算获取的页数

static const size_t NPAGES = 129;
static const size_t PAGE_SHIFT = 13; // 8KB

// 计算一次向系统获取几个页
// 单个对象 8byte
// ...
// 单个对象 256KB
static size_t NumMovePage(size_t size)
{
    size_t num = NumMoveSize(size);
    size_t npage = num*size;

    npage >>= PAGE_SHIFT;
    if (npage == 0)
        npage = 1;

    return npage;
}

2. NewSpan 实现

// 获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{
	assert(k > 0);

	// 先检查第k个桶里面有没有span
	if (!_spanLists[k].Empty())
	{
		Span* kSpan = _spanLists[k].PopFront();

		return kSpan;
	}

	// 检查一下后面的桶里面有没有span,如果有可以把他它进行切分
	for (size_t i = k+1; i < NPAGES; ++i)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			//Span* kSpan = new Span;
			Span* kSpan = _spanPool.New();

			// 在nSpan的头部切一个k页下来
			// k页span返回
			// nSpan再挂到对应映射的位置
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;

			_spanLists[nSpan->_n].PushFront(nSpan);

			return kSpan;
		}
	}

	// 走到这个位置就说明后面没有大页的span了
	// 这时就去找堆要一个128页的span
	//Span* bigSpan = new Span;
	Span* bigSpan = _spanPool.New();
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	return NewSpan(k);
}

3. PageCache 结构

class PageCache
{
public:
	static PageCache* GetInstance()
	{
		return &_sInst;
	}

	// 获取从对象到span的映射
	Span* MapObjectToSpan(void* obj);

	// 释放空闲span回到Pagecache,并合并相邻的span
	void ReleaseSpanToPageCache(Span* span);

	// 获取一个K页的span
	Span* NewSpan(size_t k);

	std::mutex _pageMtx;
private:
	SpanList _spanLists[NPAGES];
	ObjectPool<Span> _spanPool;

	TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;

	PageCache()
	{}
	PageCache(const PageCache&) = delete;
    
	static PageCache _sInst;
};

六 释放逻辑

1. Thread Cache

void ThreadCache::Deallocate(void* ptr, size_t size)
{
	assert(ptr);
	assert(size <= MAX_BYTES);

	// 找对映射的自由链表桶,对象插入进入
	size_t index = SizeClass::Index(size);
	_freeLists[index].Push(ptr);

	// 当链表长度大于一次批量申请的内存时就开始还一段list给 central cache
	if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
	{
		ListTooLong(_freeLists[index], size);
	}
}

void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
	void* start = nullptr;
	void* end = nullptr;
	list.PopRange(start, end, list.MaxSize());

	CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}

思考:归还方式合理性

思考:当 freelist 的持有空闲对象数大于一次批量申请的对象数的归还方式是否合理?

2. Central Cache

void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
	size_t index = SizeClass::Index(size);
	_spanLists[index]._mtx.lock();
	while (start)
	{
		void* next = NextObj(start);

		Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
		NextObj(start) = span->_freeList; 
		span->_freeList = start;
		span->_useCount--;

		// 说明span的切分出去的所有小块内存都回来了
		// 这个span就可以再回收给page cache,pagecache可以再尝试去做前后页的合并
		if (span->_useCount == 0)
		{
			_spanLists[index].Erase(span);
			span->_freeList = nullptr;
			span->_next = nullptr;
			span->_prev = nullptr;

			// 释放span给page cache时,使用page cache的锁就可以了
			// 这时把桶锁解掉
			_spanLists[index]._mtx.unlock();

			PageCache::GetInstance()->_pageMtx.lock();
			PageCache::GetInstance()->ReleaseSpanToPageCache(span);
			PageCache::GetInstance()->_pageMtx.unlock();

			_spanLists[index]._mtx.lock();
		}

		start = next;
	}

	_spanLists[index]._mtx.unlock();
}

通过对象地址计算得到页号,最终从 idSpanMap 中得到 Span 对象

Span* PageCache::MapObjectToSpan(void* obj)
{
	PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);

	auto ret = (Span*)_idSpanMap.get(id);
	assert(ret != nullptr);
	return ret;
}

Page CacheNewSpan 在获取返还新的 SpanCentral Cache 之前,将 Span 的所有 page 都建立与该 Span 的映射。

3. Page Cache

1)span.isUse

对于 页号为 2000,页长为 5 的 span 来说,我们需要向前找页号为 1999 所在的 span 是否空闲,向后找页号为 2005 所在的 span 是否空闲。

并且该空闲的页必须在 page cache 的管辖范围,不能是已经分配给了 central cache!如何知道这一点呢?通过判断 useCount 是否为 0 是不可行的!原因是:

可能 GetOneSpan 刚调用完 NewSpan 获取到了一个新的 span,此时该 SpanuseCount 为 0,如果合并时又将该页取走合并,就会引发线程安全问题!所以我们需要在 span 中引入新的成员 isUse,用来标记该 span 是否已经分配给了 central cache

2)再谈 NewSpan

Span* PageCache::NewSpan(size_t k)
{
	assert(k > 0);

	// 大于128 page的直接向堆申请
	if (k > NPAGES-1)
	{
		...
		_idSpanMap.set(span->_pageId, span);

		return span;
	}

	if (!_spanLists[k].Empty())
	{
        ...
            
		for (PAGE_ID i = 0; i < kSpan->_n; ++i)
		{
			_idSpanMap.set(kSpan->_pageId + i, kSpan);
		}

		return kSpan;
	}

	for (size_t i = k+1; i < NPAGES; ++i)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			Span* kSpan = _spanPool.New();

			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;
			nSpan->_pageId += k;
			nSpan->_n -= k;

			_spanLists[nSpan->_n].PushFront(nSpan);
			// 存储nSpan的首位页号跟nSpan映射,方便page cache回收内存时
			// 进行的合并查找
			//_idSpanMap[nSpan->_pageId] = nSpan;
			//_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
			_idSpanMap.set(nSpan->_pageId, nSpan);
			_idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);

			// 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
			for (PAGE_ID i = 0; i < kSpan->_n; ++i)
			{
				//_idSpanMap[kSpan->_pageId + i] = kSpan;
				_idSpanMap.set(kSpan->_pageId + i, kSpan);
			}

			return kSpan;
		}
	}
	
    ...
        
	return NewSpan(k);
}

3)ReleaseSpanToPageCache

合并时,不断向前向后合并。合并的过程中,可能合并了被切分后的 span(比如原本 128 page 切分为 2 page 和 126 page,2 page 分给了 central cache,126 pagepage cache 中,此时可能这 126 page 会被合并),这没有关系,因为 page cache 不会将内存归还给系统了!

void PageCache::ReleaseSpanToPageCache(Span* span)
{
	// 大于128 page的直接还给堆
	if (span->_n > NPAGES - 1)
	{
		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		SystemFree(ptr);
		//delete span;
		_spanPool.Delete(span);

		return;
	}

	// 对span前后的页,尝试进行合并,缓解内存碎片问题
	while (1)
	{
		PAGE_ID prevId = span->_pageId - 1;

		auto ret = (Span*)_idSpanMap.get(prevId);
		if (ret == nullptr)  break;

		// 前面相邻页的span在使用,不合并了
		Span* prevSpan = ret;
		if (prevSpan->_isUse == true)  break;

		// 合并出超过128页的span没办法管理,不合并了
		if (prevSpan->_n + span->_n > NPAGES-1)  break;

		span->_pageId = prevSpan->_pageId;
		span->_n += prevSpan->_n;

		_spanLists[prevSpan->_n].Erase(prevSpan);
		//delete prevSpan;
		_spanPool.Delete(prevSpan);
	}

	// 向后合并
	while (1)
	{
		PAGE_ID nextId = span->_pageId + span->_n;

		auto ret = (Span*)_idSpanMap.get(nextId);
		if (ret == nullptr)  break;

		Span* nextSpan = ret;
		if (nextSpan->_isUse == true)  break;

		if (nextSpan->_n + span->_n > NPAGES-1)  break;

		span->_n += nextSpan->_n;

		_spanLists[nextSpan->_n].Erase(nextSpan);
		_spanPool.Delete(nextSpan);
	}

	_spanLists[span->_n].PushFront(span);
	span->_isUse = false;

	_idSpanMap.set(span->_pageId, span);
	_idSpanMap.set(span->_pageId + span->_n - 1, span);
}

七 大块内存分配

Thread CacheCentral Cache 管理的最大内存为 256 KB,也就是 32 个 page

如果申请的内存在 32 page 到 128 page 之间,也就是 256 KB 到 1 M 之间,可以向 Page Cache 申请内存

如果申请的内存大于 128 page,也就是大于 1 M,那么 Page Cache 直接向系统申请内存。

static void* ConcurrentAlloc(size_t size)
{
	if (size > MAX_BYTES)
	{
		size_t alignSize = SizeClass::RoundUp(size);
		size_t kpage = alignSize >> PAGE_SHIFT;

		PageCache::GetInstance()->_pageMtx.lock();
		Span* span = PageCache::GetInstance()->NewSpan(kpage);
		span->_objSize = size;
		PageCache::GetInstance()->_pageMtx.unlock();

		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		return ptr;
	}
	else
	{
		// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象
		if (pTLSThreadCache == nullptr)
		{
			static ObjectPool<ThreadCache> tcPool;
			//pTLSThreadCache = new ThreadCache;
			pTLSThreadCache = tcPool.New();
		}

		//cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;

		return pTLSThreadCache->Allocate(size);
	}	
}

static void ConcurrentFree(void* ptr)
{
	Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
	size_t size = span->_objSize;

	if (size > MAX_BYTES)
	{
		PageCache::GetInstance()->_pageMtx.lock();
		PageCache::GetInstance()->ReleaseSpanToPageCache(span);
		PageCache::GetInstance()->_pageMtx.unlock();
	}
	else
	{
		assert(pTLSThreadCache);
		pTLSThreadCache->Deallocate(ptr, size);
	}
}
Span* PageCache::NewSpan(size_t k)
{
	assert(k > 0);

	// 大于128 page的直接向堆申请
	if (k > NPAGES - 1)
	{
		void* ptr = SystemAlloc(k);
		//Span* span = new Span;
		Span* span = _spanPool.New();

		span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
		span->_n = k;

		//_idSpanMap[span->_pageId] = span;
		_idSpanMap.set(span->_pageId, span);

		return span;
	}
    
    ...
}

void PageCache::ReleaseSpanToPageCache(Span* span)
{
	// 大于128 page的直接还给堆
	if (span->_n > NPAGES - 1)
	{
		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		SystemFree(ptr);
		//delete span;
		_spanPool.Delete(span);

		return;
	}
    
    ...
}

八 避免对 malloc 的依赖

由于 本项目的目标在于提供一个多线程环境下比 malloc 跟高效的内存管理机制,所以本项目中应该解除对 malloc 的依赖。所以,所有的 new/delete 都应该被替换。

项目中最常向堆申请的就是 Span 对象,其次是 ThreadCache 对象

我们可以让 FixedSizeMemoryPool 管理这两种定长的对象的申请与释放。

FixedSizeMemoryPool 直接向系统申请内存:

bool realloc_memory_pool()
{
	...

    try{
        _memory_pool_start = SystemAlloc(new_size);
        ...
    } catch(const std::bad_alloc& e) {
	    ...
    }
    return true;
}

修改申请 span 时对 new/delete 的依赖:

Span* PageCache::NewSpan(size_t k)
{
	// 大于128 page的直接向堆申请
	if (k > NPAGES-1)
	{
		void* ptr = SystemAlloc(k);
		//Span* span = new Span;
		Span* span = _spanPool.New();

		...
	}


	// 检查一下后面的桶里面有没有span,如果有可以把他它进行切分
	for (size_t i = k+1; i < NPAGES; ++i)
	{
		if (!_spanLists[i].Empty())
		{
			//Span* kSpan = new Span;
			Span* kSpan = _spanPool.New();

			...
		}
	}

	...
}

void PageCache::ReleaseSpanToPageCache(Span* span)
{
	// 大于128 page的直接还给堆
	if (span->_n > NPAGES-1)
	{
		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		SystemFree(ptr);
		//delete span;
		_spanPool.Delete(span);

		return;
	}

	// 对span前后的页,尝试进行合并,缓解内存碎片问题
	while (1)
	{
		...

		_spanLists[prevSpan->_n].Erase(prevSpan);
		//delete prevSpan;
		_spanPool.Delete(prevSpan);
	}

	// 向后合并
	while (1)
	{
		...

		_spanLists[nextSpan->_n].Erase(nextSpan);
		//delete nextSpan;
		_spanPool.Delete(nextSpan);
	}

	...
}

修改 获取 TLS Thread Cache 时对 new/delete 的依赖:

if (pTLSThreadCache == nullptr)
{
    static ObjectPool<ThreadCache> tcPool;
    //pTLSThreadCache = new ThreadCache;
    pTLSThreadCache = tcPool.New();
}

Benchmark 程序

#include"ConcurrentAlloc.h"

// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;

	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&, k]() {
			std::vector<void*> v;
			v.reserve(ntimes);

			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					v.push_back(malloc(16));
					//v.push_back(malloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();

				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					free(v[i]);
				}
				size_t end2 = clock();
				v.clear();

				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
		});
	}

	for (auto& t : vthread)
	{
		t.join();
	}

	printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime);

	printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, free_costtime);

	printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
		nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}


// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;

	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&]() {
			std::vector<void*> v;
			v.reserve(ntimes);

			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					v.push_back(ConcurrentAlloc(16));
					//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();

				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					ConcurrentFree(v[i]);
				}
				size_t end2 = clock();
				v.clear();

				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
		});
	}

	for (auto& t : vthread)
	{
		t.join();
	}

	printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime);

	printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, free_costtime);

	printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
		nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}

int main()
{
	size_t n = 100000;
	cout << "==========================================================" << endl;
	BenchmarkConcurrentMalloc(n, 4, 10);
	cout << endl << endl;

	BenchmarkMalloc(n, 4, 10);
	cout << "==========================================================" << endl;

	return 0;
}

十 基数数优化

1)单层基数树

调用方式:TCMalloc_PageMap1<32 - PAGE_SHIFT>

BITSarray_ 数组的大小。对于 32 位程序,由于页的大小为 PAGE_SHIFT 也就是 13,所以最多可以有 2^1932 - 13 = 19 ) 个页!

// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:
	static const int LENGTH = 1 << BITS;
    // array_ 是一个 <page_id, span*> 的映射,所以是一个二级指针
	void** array_;

public:
	typedef uintptr_t Number;

	//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
	explicit TCMalloc_PageMap1() {
		//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
         // array_ 的大小为 一个指针大小 * 最多可以有的页数
		size_t size = sizeof(void*) << BITS;
		size_t alignSize = SizeClass::_RoundUp(size, 1<<PAGE_SHIFT);
		array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
		memset(array_, 0, sizeof(void*) << BITS);
	}

	// Return the current value for KEY.  Returns NULL if not yet set,
	// or if k is out of range.
	void* get(Number k) const {
        // 页数一定小于等于 2^BITS 
		if ((k >> BITS) > 0) {
			return NULL;
		}
		return array_[k];
	}

	// REQUIRES "k" is in range "[0,2^BITS-1]".
	// REQUIRES "k" has been ensured before.
	// Sets the value 'v' for key 'k'.
	void set(Number k, void* v) {
		array_[k] = v;
	}
};

对于 32 位程序,页数最多为 2^19 个,约为 50 万,而指针的大小为 4 字节,所以 array_ 的大小约为 200 万字节。

可是对于 64 位程序呢?单层的基数树就不够用了!所以需要多层的基数树。

2)双层基数树

// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
	// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
	static const int ROOT_BITS = 5;
	static const int ROOT_LENGTH = 1 << ROOT_BITS;
	
	static const int LEAF_BITS = BITS - ROOT_BITS; // 14
	static const int LEAF_LENGTH = 1 << LEAF_BITS;

	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};

	Leaf* root_[ROOT_LENGTH];             // Pointers to 32 child nodes
	void* (*allocator_)(size_t);          // Memory allocator

public:
	typedef uintptr_t Number;

	//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
	explicit TCMalloc_PageMap2() {
		//allocator_ = allocator;
		memset(root_, 0, sizeof(root_));

		PreallocateMoreMemory();
	}

	void* get(Number k) const {
		const Number i1 = k >> LEAF_BITS; // 第一层在高 5 位
		const Number i2 = k & (LEAF_LENGTH - 1); // 将高 5 位除去,留下低 14 位
		if ((k >> BITS) > 0 || root_[i1] == NULL) {
			return NULL;
		}
		return root_[i1]->values[i2];
	}

	void set(Number k, void* v) {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		ASSERT(i1 < ROOT_LENGTH);
		root_[i1]->values[i2] = v;
	}

	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> LEAF_BITS;

			// Check for overflow
			if (i1 >= ROOT_LENGTH)
				return false;

			// Make 2nd level node if necessary
			if (root_[i1] == NULL) {
				//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
				//if (leaf == NULL) return false;
				static ObjectPool<Leaf>	leafPool;
				Leaf* leaf = (Leaf*)leafPool.New();

				memset(leaf, 0, sizeof(*leaf));
				root_[i1] = leaf;
			}

			// Advance key past whatever is covered by this leaf node
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	void PreallocateMoreMemory() {
		// Allocate enough to keep track of all possible pages
		Ensure(0, 1 << BITS);
	}
};

3)三层基数树

// Three-level radix tree
template <int BITS> // 64 - 13 = 51 
class TCMalloc_PageMap3 {
private:
	// How many bits should we consume at each interior level
	static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up 53/3=17
	static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;

	// How many bits should we consume at leaf level
	static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS; // 51-34=17
	static const int LEAF_LENGTH = 1 << LEAF_BITS;

	// Interior node
	struct Node {
		Node* ptrs[INTERIOR_LENGTH];
	};

	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};

	Node* root_;                          // Root of radix tree
	void* (*allocator_)(size_t);          // Memory allocator

	Node* NewNode() {
		Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
		if (result != NULL) {
			memset(result, 0, sizeof(*result));
		}
		return result;
	}

public:
	typedef uintptr_t Number;

	explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
		allocator_ = allocator;
		root_ = NewNode();
	}

	void* get(Number k) const {
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
		const Number i3 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 ||
			root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
			return NULL;
		}
		return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
	}

	void set(Number k, void* v) {
		ASSERT(k >> BITS == 0);
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
		const Number i3 = k & (LEAF_LENGTH - 1);
		reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
	}

	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
			const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);

			// Check for overflow
			if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
				return false;

			// Make 2nd level node if necessary
			if (root_->ptrs[i1] == NULL) {
				Node* n = NewNode();
				if (n == NULL) return false;
				root_->ptrs[i1] = n;
			}

			// Make leaf node if necessary
			if (root_->ptrs[i1]->ptrs[i2] == NULL) {
				Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
				if (leaf == NULL) return false;
				memset(leaf, 0, sizeof(*leaf));
				root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
			}

			// Advance key past whatever is covered by this leaf node
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	void PreallocateMoreMemory() {
	}
};

4)基数树优化原理

  1. 基数树比 std::mapstd::unordered_map 的查找更加高效。即便是三层基数树,也只需要三次查找!
  2. 其次,基数树的优化可以让查找 page idspan 的映射无锁!

第二点的解释如下,下面是整个项目中需要查找 idSpanMap 的所有场景:

  1. 修改只存在于两个函数中:

    Span* PageCache::NewSpan(size_t k);
    void PageCache::ReleaseSpanToPageCache(Span* span)
    

    这两个函数首先不会同时被调用,因为调用前都会把整个 PageCache 锁住

  2. 查找:

    Span* PageCache::MapObjectToSpan(void* obj);
    void PageCache::ReleaseSpanToPageCache(Span* span);
    static void ConcurrentFree(void* ptr);
    

    比较关键的问题在于,当一个线程调用 ConcurrentFree 释放一个对象时,如果此时对基数树进行查找,而此时另一个线程有无可能正好在对该页所在的结构进行修改?

    绝无可能!ConcurrentFree 的场景是 ThreadCache 持有该 page,而修改基数树的场景是 PageCache 持有该 page

十一 遇到的 BUG

1. 合并相邻 page 时,spanLists 出现环

问题复现:

合并前面的页:

prev_span->_page_num += span->_page_num;

_spanPool.Delete(span);

合并后面的页:

span->_page_num += next_span->_page_num;

// 将 span 从 pagesLists 中删除,并删除 Span 结构
_pagesLists[next_span->_page_num - 1].Erase(next_span);
_spanPool.Delete(next_span);

最终将新的 span 加入 _pagesLists

_pagesLists[span->_page_num - 1].PushFront(span);

在后面 ShowPagesLists() 函数中会出现死循环:

for (auto span = _pagesLists[i].Begin(); span != _pagesLists[i].End(); span = span->_next)
{
    printf(" PageID[%u]	PageNum[%u]\n", span->_page_id, span->_page_num);
}

问题的关键就在于合并前面的页时,我们保留了之前 _pagesLists 中的 span,而合并后面的页时删除了 之前 _pagesLists 中的 span。但是最终的都会将新收回来的 span(尽管这个 span 可能是 prev_span)加入 _pagesLists 中,如果是把已经存在于 _pagesLists 中的 prev_span 再次加入 _pagesLists 就会出问题环的问题!

所以正确的做法是向前合并时不要偷懒,选择和向后合并一样的方式,把可以合并的 prev_span 删掉

span->_page_num += prev_span->_page_num;
span->_page_id   = prev_span->_page_id;

// 将 span 从 pagesLists 中删除,并删除 Span 结构
_pagesLists[prev_span->_page_num - 1].Erase(prev_span);
_spanPool.Delete(prev_span);

2. 根据对象大小计算页时,没有考虑到对齐的问题

问题重现:

Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
static size_t NumMovePage(size_t size)
{
    size_t num = NumMoveSize(size);
    size_t npage = num * size;

    npage >>= PAGE_SHIFT;
    if (npage == 0)
        npage = 1;

    return npage;
}

NumMovePage 存在两个问题:

  1. 第一个问题是 num * size 不一定是 ``PAGE_SHIFT 对齐的,这样 npage »= PAGE_SHIFT` 可能会向下取整,这样就会导致实际分配的页不足。

  2. 第二个问题是导致崩溃的函数 CutNewSpan

    void CutNewSpan(Span* span, size_t obj_size)
    {
        // 根据 page id 和 page num 计算 span 内存的起始和结束地址
        char* start = (char*)(span->_page_id << PAGE_SHIFT);
         char* end  =  start + (span->_page_num << PAGE_SHIFT)
       
        // 将所有 obj 串联起来
        for (char* cur = start; cur != end; cur += obj_size)
        {
            if( cur + obj_size != end )
                 NextObj((void*)cur) = (void*)(cur + obj_size);
           	else 
                NextObj((void*)cur) = nullptr;
        }
        span->_freelist = (void*)start;
        span->_objSize  = obj_size;
    }
    

    这个函数默认 span 持有的内存正好可以被切分为整数个大小为 obj_size 的对象,但是这个假设是难以支持的,尤其是在第一种情况出现的时候。

为了解决这个问题,首先我们要在计算 npage 时向上取整。

其次,我们要把 NumMovePage 中计算出来的 num 交给 CutNewSpan,只切分出 num 个对象:

// 获取 size 大小的对象 obj_num 个,总共 返回值 个 page
static size_t NumMovePage(size_t size, size_t* obj_num)
{
    *obj_num = NumMoveSize(size);
    size_t total_bytes = *obj_num * size;

    // 计算 page 是要上取整数
    size_t npage = (total_bytes + (1 << PAGE_SHIFT) - 1) >> PAGE_SHIFT;

    return npage;
}

void CutNewSpan(Span* span, size_t obj_size, size_t obj_num)
{
    // 根据 page id 和 page num 计算 span 内存的起始和结束地址
    char* start = (char*)(span->_page_id << PAGE_SHIFT);
    char* cur = start;

    // 将所有 obj 串联起来
    for (int i = 0; i < obj_num - 1; cur += obj_size, ++i)
        NextObj((void*)cur) = (void*)(cur + obj_size);

    NextObj((void*)cur) = nullptr;

    span->_freelist = (void*)start;
    span->_objSize  = obj_size;
}
size_t obj_num;
size_t page_num = SizeClass::NumMovePage(obj_size, &obj_num);

span = pagecache_inst->NewSpan(page_num);

spanlist.CutNewSpan(span, obj_size, obj_num);

原理如下:

十二 性能测试

申请相同大小内存:

申请不同大小内存: