框架结构
central cache对应的桶里如果没有span对象,需要向page cache层申请一个非空的span,page cache也是哈希桶的结构,同样通过双向链表组织一个个的span对象,但是映射规则和thread cache和central cache层不同,page cache的逻辑抽象结构如下:
page cache采用直接定址法,几号桶就是挂的几页的span,pageche的span对象是按页为单位,并没有切分成小对象,central cache会申请固定页数的span,然后进行切分
具体实现
page cahe 类
page cache 这里设计span最多有128页(128 * 8 * 1024 = 1m),也可以有设计成其他大小,我们设计的thread cahche层最大申请256kb的对象,128页能切成4个,然后我们让几号桶对应几页的span,为了方便整个page cache类设计为129个桶,空出来0号桶
1 2 3 static const size_t NPAGES = 129 ;
page cahce类在整个进程里只有一个,我们同样设计成单例模式,这里需要注意的是,多个线程申请对象,thread cache层没有内存需要向central cache申请,central cache只需要加桶锁就可以,但是page cache最好整个加锁,因为page cache可能涉及到多页的span切分的问题,并且当central cache归还span,page cache也会尝试将该span和其他桶的span进行合并,这样就会访问page cache的多个桶,造成频繁的加锁和解锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class PageCache { public : static PageCache* GetInstance () { return &_sInst; } Span* NewSpan (size_t k) ; public : std::mutex _pageMtx; private : PageCache () {}; PageCache (const PageCache&) = delete ; SpanList _spanLists[NPAGES]; static PageCache _sInst; };
类内提供一个获取k页span的接口
实现逻辑
我们先继续完善central层的逻辑,上节说从中心缓存获取一定数量的对象给thread cache,用了一个FatchRangeObj的函数,这个函数的内部,首先要保证CentralCache层有一个非空的span,那么如何保证有一个非空的span呢?大致思路是先从central cache对应的桶里找,如果没有就往下找Page Cache层要。
1 2 3 4 5 6 Span* CentralCache::GetOneSpan (SpanList& list, size_t size) { return nullptr ; }
我们继续完善这个函数
步骤1、先去size 对应的桶里找非空的span,这里涉及到链表的遍历操作,我们可以完善一下SpanList类,提供相应的接口方便我们操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 Span* Begin () { return _head->_next; } Span* End () { return _head; } bool Empty () { return _head == _head->_next; }
步骤2、若有非空的span直接返回,如果没有,需要向下一层的PageCache申请新的一页Span,这里我们涉及到加锁、解锁过程,以及如果向下一层申请SPan具体是要多个少页呢?
我们可以先解决要多少页的问题,在SizeClass类中增加NumMovePage函数
1 2 3 4 5 6 7 8 9 10 11 12 13 static size_t NumMovePage (size_t size) { size_t num = NumMoveSize (size); size_t npage = (num * size) >> PAGE_SHITF; if (npage == 0 ) npage = 1 ; return npage; }
步骤3、申请完span后,就要将里面的大块内存进行切分,我们需要直到大块内存的起始地址和结束地址,不断的切成size大小进行尾插,这里尾插有个好处,它的物理地址是连续的,当被线程使用时,可以提高线程的CPU缓存利用率
GetOneSpan整体代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 Span* CentralCache::GetOneSpan (SpanList& list, size_t size) { Span* it = list.Begin (); while (it != list.End ()) { if (it->_freeList != nullptr ) { return it; } else { it = it->_next; } } list._mtx.unlock (); PageCache::GetInstance ()->_pageMtx.lock (); Span* span = PageCache::GetInstance ()->NewSpan (SizeClass::NumMovePage (size)); PageCache::GetInstance ()->_pageMtx.unlock (); char * start = (char *)(span->_pageId << PAGE_SHITF); size_t bytes = span->_n << PAGE_SHITF; char * end = start + bytes; span->_freeList = start; start += size; void * tail = span->_freeList; while (start <end) { NextObj (tail) = start; tail = NextObj (tail); start += size; } list._mtx.lock (); list.PushFront (span); return span; }
完成上边的代码后我们看看底层的PageCache::NewSpan(size_t k) ,我们这个获取一个NewSpan的大致逻辑为:首先检查第k个桶里有没有span,如果没有,向后边的桶找更大的Span,然后分成k和n-k,返回k页的Span,n-k页的Span挂到相应的桶里
步骤1、判断k个桶有没有span如果有,返回头上那个,这时候需要在在SpanList类里面提供一个PopFront函数,同时我们也提供一个PushFront的函数,方便之后的操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 Span* PopFront () { Span* sp = _head->_next; Erase (sp); return sp; } void PushFront (Span* span) { Insert (Begin (), span); }
步骤2、如果没有,遍历后面的桶,如果有,就切成k和n-k,返回k页的span,n-k页的span插入到对应的桶
步骤3、如果从k+1到NPAGES-1桶都没有span,那么只能通过系统调用向堆申请,这里直接申请128页的span,申请成功后,还是走步骤2的逻辑,可以直接递归处理
整体代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 Span* PageCache::NewSpan (size_t k) { assert (k > 0 && k < NPAGES); if (!_spanLists[k].Empty ()) { return _spanLists[k].PopFront (); } for (size_t i = k + 1 ; i < NPAGES; i++) { if (!_spanLists[i].Empty ()) { Span* nSpan = _spanLists[i].PopFront (); Span* kSpan = new Span; kSpan->_pageId = nSpan->_pageId; kSpan->_n = k; nSpan->_n -= k; nSpan->_pageId += k; _spanLists[nSpan->_n].PushFront (nSpan); return kSpan; } } Span* bigSpan = new Span; void * ptr = SystemAlloc (NPAGES-1 ); bigSpan->_n = NPAGES - 1 ; bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHITF; _spanLists[bigSpan->_n].PushFront (bigSpan); return NewSpan (k); }
这里注意最后几句的逻辑,我们申请了128页的内存,返回了起始的指针ptr,将这个地址转成整形,右移PAGE_SHIFT位,相当于除以2^13,就是管理这块大块内存的span的页号