框架结构

central cache对应的桶里如果没有span对象,需要向page cache层申请一个非空的span,page cache也是哈希桶的结构,同样通过双向链表组织一个个的span对象,但是映射规则和thread cache和central cache层不同,page cache的逻辑抽象结构如下:

33250a692c214c8286b38ba3c6c96847.png

page cache采用直接定址法,几号桶就是挂的几页的span,pageche的span对象是按页为单位,并没有切分成小对象,central cache会申请固定页数的span,然后进行切分

具体实现

page cahe 类

page cache 这里设计span最多有128页(128 * 8 * 1024 = 1m),也可以有设计成其他大小,我们设计的thread cahche层最大申请256kb的对象,128页能切成4个,然后我们让几号桶对应几页的span,为了方便整个page cache类设计为129个桶,空出来0号桶

1
2
3

// page cache桶的个数 - 这里设置为129个,0号桶不用
static const size_t NPAGES = 129;

page cahce类在整个进程里只有一个,我们同样设计成单例模式,这里需要注意的是,多个线程申请对象,thread cache层没有内存需要向central cache申请,central cache只需要加桶锁就可以,但是page cache最好整个加锁,因为page cache可能涉及到多页的span切分的问题,并且当central cache归还span,page cache也会尝试将该span和其他桶的span进行合并,这样就会访问page cache的多个桶,造成频繁的加锁和解锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class PageCache
{
public:
//整个进程只有一份,同样设计成单例模式
static PageCache* GetInstance()
{
return &_sInst;
}
// 要一个k页的span
Span* NewSpan(size_t k);
public:
std::mutex _pageMtx;
private:
PageCache() {};
PageCache(const PageCache&) = delete;
SpanList _spanLists[NPAGES];

static PageCache _sInst;
};

类内提供一个获取k页span的接口

实现逻辑

我们先继续完善central层的逻辑,上节说从中心缓存获取一定数量的对象给thread cache,用了一个FatchRangeObj的函数,这个函数的内部,首先要保证CentralCache层有一个非空的span,那么如何保证有一个非空的span呢?大致思路是先从central cache对应的桶里找,如果没有就往下找Page Cache层要。

1
2
3
4
5
6
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
// ...
return nullptr;
}
我们继续完善这个函数

步骤1、先去size 对应的桶里找非空的span,这里涉及到链表的遍历操作,我们可以完善一下SpanList类,提供相应的接口方便我们操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 在spanlist类中增加一下函数,方便遍历操作
Span* Begin()
{
return _head->_next;
}

Span* End()
{
return _head;
}

bool Empty()
{
return _head == _head->_next;
}

步骤2、若有非空的span直接返回,如果没有,需要向下一层的PageCache申请新的一页Span,这里我们涉及到加锁、解锁过程,以及如果向下一层申请SPan具体是要多个少页呢?

我们可以先解决要多少页的问题,在SizeClass类中增加NumMovePage函数
1
2
3
4
5
6
7
8
9
10
11
12
13
// 计算一次向系统获取多少个页
static size_t NumMovePage(size_t size)
{
// 先获取申请size对象大小的上限(尽可能多要一些)
size_t num = NumMoveSize(size);

// 算一下num个对象总共多少字节,然后右移13位就是多少页
size_t npage = (num * size) >> PAGE_SHITF;
// 最少申请1页
if (npage == 0)
npage = 1;
return npage;
}

步骤3、申请完span后,就要将里面的大块内存进行切分,我们需要直到大块内存的起始地址和结束地址,不断的切成size大小进行尾插,这里尾插有个好处,它的物理地址是连续的,当被线程使用时,可以提高线程的CPU缓存利用率

GetOneSpan整体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
// 1、先去size 对应的桶里找非空的span
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freeList != nullptr)
{
return it;
}
else
{
it = it->_next;
}
}

// 这里可以先将这个桶的桶锁进行解锁,其他线程可以释放内存对象回来不会阻塞
list._mtx.unlock();

// 到这里说明,size对应的CentralCache层里面没有Span对象,那么需要向下一层申请
// 先进行整个page cache层的加锁
PageCache::GetInstance()->_pageMtx.lock();

// 我们要计算向page cache层要多少页的span
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));

// 解锁
PageCache::GetInstance()->_pageMtx.unlock();


// 申请回来的span对象进行切分,这会不需要加锁,因为这会其他线程访问不到这个span(还没挂到spanLists)

// 要计算span的大块内存的起始地址和大块内存的大小
// 页号左移PAGE_SHIFT就是地址
char* start = (char*)(span->_pageId << PAGE_SHITF);

size_t bytes = span->_n << PAGE_SHITF;
char* end = start + bytes;


// 先切一块做头
span->_freeList = start;
start += size;

// 进行尾插
void* tail = span->_freeList;

while(start <end)
{
NextObj(tail) = start;
tail = NextObj(tail);
start += size;
}

// 切好span,将span挂到桶里,加锁
list._mtx.lock();
list.PushFront(span);

return span;
}

完成上边的代码后我们看看底层的PageCache::NewSpan(size_t k) ,我们这个获取一个NewSpan的大致逻辑为:首先检查第k个桶里有没有span,如果没有,向后边的桶找更大的Span,然后分成k和n-k,返回k页的Span,n-k页的Span挂到相应的桶里


步骤1、判断k个桶有没有span如果有,返回头上那个,这时候需要在在SpanList类里面提供一个PopFront函数,同时我们也提供一个PushFront的函数,方便之后的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 头删返回
Span* PopFront()
{
Span* sp = _head->_next;
/*head->_next = sp->_next;
sp->_next = nullptr;
return sp;*/
Erase(sp);
return sp;
}

// 头插
void PushFront(Span* span)
{
Insert(Begin(), span);
}

步骤2、如果没有,遍历后面的桶,如果有,就切成k和n-k,返回k页的span,n-k页的span插入到对应的桶

image.png

步骤3、如果从k+1到NPAGES-1桶都没有span,那么只能通过系统调用向堆申请,这里直接申请128页的span,申请成功后,还是走步骤2的逻辑,可以直接递归处理

整体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// page cache 返回一个span
Span* PageCache::NewSpan(size_t k)
{
// 先断言一下
assert(k > 0 && k < NPAGES);

// 先检查第k个桶里有没有Span
if (!_spanLists[k].Empty())
{
// 头删第一个返回这里去实现PopFront的逻辑
return _spanLists[k].PopFront();
}

//如果k号桶为空,检查后面的桶,找到一个非空桶,直到最后一个桶
for (size_t i = k + 1; i < NPAGES; i++)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
// ?
Span* kSpan = new Span;

// 分成npan 分一个 k页返回,然后n-k页的span挂到n-k的桶上
// 这里注意,只需要改页号和页数就可以了,span挂的内存的地址按页号取
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;

nSpan->_n -= k;
nSpan->_pageId += k;


_spanLists[nSpan->_n].PushFront(nSpan);
return kSpan;
}
}

// 到这里说明后面的所有位置都没有大页的span
// 这时候就需要向堆申请一个128页的span
Span* bigSpan = new Span;
void* ptr = SystemAlloc(NPAGES-1);


bigSpan->_n = NPAGES - 1;
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHITF;

_spanLists[bigSpan->_n].PushFront(bigSpan);

return NewSpan(k);
}

这里注意最后几句的逻辑,我们申请了128页的内存,返回了起始的指针ptr,将这个地址转成整形,右移PAGE_SHIFT位,相当于除以2^13,就是管理这块大块内存的span的页号

image.png