470 lines
16 KiB
Go
470 lines
16 KiB
Go
// Package pipelinedb provides an integrated pipeline database system
|
||
// 集成了数据库存储和业务管道处理的一体化解决方案
|
||
package pipelinedb
|
||
|
||
import (
|
||
"container/list"
|
||
"sync"
|
||
)
|
||
|
||
// PageCache 实现了基于LRU(Least Recently Used)算法的页面缓存系统
|
||
//
|
||
// 核心设计思想:
|
||
// 1. 使用HashMap提供O(1)的页面查找性能
|
||
// 2. 使用双向链表维护页面的访问顺序(LRU顺序)
|
||
// 3. 支持脏页标记和批量刷新机制
|
||
// 4. 提供线程安全的并发访问控制
|
||
// 5. 统计缓存命中率用于性能监控
|
||
//
|
||
// 适用场景:
|
||
// - 数据库页面缓存
|
||
// - 文件系统缓存
|
||
// - 任何需要LRU淘汰策略的缓存系统
|
||
type PageCache struct {
|
||
capacity int // 缓存容量(最大页面数)
|
||
cache map[uint16]*cacheEntry // 页号到缓存条目的映射,提供O(1)查找
|
||
lru *list.List // LRU双向链表,维护访问顺序(头部=最近使用,尾部=最久未使用)
|
||
mu sync.RWMutex // 读写锁,支持多读单写的并发控制
|
||
hits int64 // 缓存命中次数统计
|
||
misses int64 // 缓存未命中次数统计
|
||
}
|
||
|
||
// cacheEntry 缓存条目,存储单个页面的完整信息
|
||
//
|
||
// 设计要点:
|
||
// 1. pageNo: 页面编号,用于标识唯一页面
|
||
// 2. page: 页面数据的完整副本,避免外部修改影响缓存
|
||
// 3. dirty: 脏页标记,标识页面是否被修改过需要写回磁盘
|
||
// 4. elem: 指向LRU链表中对应节点的指针,用于O(1)时间复杂度的链表操作
|
||
type cacheEntry struct {
|
||
pageNo uint16 // 页面编号(16位,支持65536个页面)
|
||
page []byte // 页面数据副本(避免外部修改)
|
||
dirty bool // 脏页标记(true=需要写回磁盘,false=与磁盘一致)
|
||
elem *list.Element // LRU链表节点指针(用于快速移动和删除)
|
||
}
|
||
|
||
// NewPageCache 创建一个新的LRU页面缓存实例
|
||
//
|
||
// 参数说明:
|
||
// - capacity: 缓存容量,即最大可缓存的页面数量
|
||
// 建议根据可用内存和页面大小来设置,例如:
|
||
// 可用内存100MB,页面大小4KB,则capacity可设为25600
|
||
//
|
||
// 返回值:
|
||
// - *PageCache: 初始化完成的页面缓存实例
|
||
//
|
||
// 初始化内容:
|
||
// 1. 设置缓存容量
|
||
// 2. 创建空的HashMap用于O(1)查找
|
||
// 3. 创建空的双向链表用于LRU排序
|
||
// 4. 统计计数器初始化为0(hits和misses)
|
||
func NewPageCache(capacity int) *PageCache {
|
||
return &PageCache{
|
||
capacity: capacity, // 设置最大缓存页面数
|
||
cache: make(map[uint16]*cacheEntry), // 初始化页号到缓存条目的映射
|
||
lru: list.New(), // 初始化LRU双向链表
|
||
// hits和misses会自动初始化为0
|
||
}
|
||
}
|
||
|
||
// Get 从缓存中获取指定页面的数据
|
||
//
|
||
// 核心算法:LRU缓存查找
|
||
// 时间复杂度:O(1) - HashMap查找 + 双向链表移动
|
||
// 空间复杂度:O(1) - 只创建页面数据副本
|
||
//
|
||
// 参数说明:
|
||
// - pageNo: 要获取的页面编号(16位,范围0-65535)
|
||
//
|
||
// 返回值:
|
||
// - []byte: 页面数据的副本(如果找到)
|
||
// - bool: 是否在缓存中找到该页面
|
||
//
|
||
// 执行流程:
|
||
// 1. 获取读锁(支持并发读取)
|
||
// 2. 在HashMap中查找页面
|
||
// 3. 如果找到:
|
||
// a. 将页面移动到LRU链表头部(标记为最近使用)
|
||
// b. 增加命中计数
|
||
// c. 创建并返回页面数据副本(防止外部修改)
|
||
// 4. 如果未找到:
|
||
// a. 增加未命中计数
|
||
// b. 返回nil和false
|
||
//
|
||
// 并发安全:使用读锁保护,支持多个goroutine同时读取
|
||
func (pc *PageCache) Get(pageNo uint16) ([]byte, bool) {
|
||
pc.mu.RLock() // 获取读锁,允许并发读取
|
||
defer pc.mu.RUnlock() // 确保函数退出时释放锁
|
||
|
||
if entry, exists := pc.cache[pageNo]; exists {
|
||
// 缓存命中:执行LRU更新和数据返回
|
||
|
||
// 将访问的页面移动到LRU链表头部
|
||
// 这是LRU算法的核心:最近访问的页面不会被淘汰
|
||
pc.lru.MoveToFront(entry.elem)
|
||
|
||
// 更新命中统计(用于计算缓存命中率)
|
||
pc.hits++
|
||
|
||
// 创建页面数据副本并返回
|
||
// 重要:返回副本而不是原始数据,防止外部修改影响缓存
|
||
pageCopy := make([]byte, len(entry.page))
|
||
copy(pageCopy, entry.page)
|
||
return pageCopy, true
|
||
}
|
||
|
||
// 缓存未命中:更新统计并返回失败
|
||
pc.misses++
|
||
return nil, false
|
||
}
|
||
|
||
// Put 将页面数据放入缓存
|
||
//
|
||
// 核心算法:LRU缓存插入/更新
|
||
// 时间复杂度:O(1) - HashMap操作 + 双向链表操作
|
||
// 空间复杂度:O(1) - 创建页面数据副本
|
||
//
|
||
// 参数说明:
|
||
// - pageNo: 页面编号(16位,范围0-65535)
|
||
// - p: 页面数据(字节数组)
|
||
// - dirty: 脏页标记(true=页面已修改需要写回,false=页面与磁盘一致)
|
||
//
|
||
// 执行流程:
|
||
// 1. 获取写锁(独占访问)
|
||
// 2. 检查页面是否已存在:
|
||
// a. 如果存在:更新页面数据和脏页标记,移动到LRU头部
|
||
// b. 如果不存在:检查容量,必要时淘汰LRU页面,然后插入新页面
|
||
// 3. 所有操作都使用数据副本,确保缓存数据独立性
|
||
//
|
||
// 脏页处理:
|
||
// - 如果新数据标记为dirty,则页面变为脏页
|
||
// - 如果页面已经是脏页,则保持脏页状态(dirty标记具有粘性)
|
||
// - 脏页在淘汰时需要写回磁盘
|
||
//
|
||
// 并发安全:使用写锁保护,确保缓存状态一致性
|
||
func (pc *PageCache) Put(pageNo uint16, p []byte, dirty bool) {
|
||
pc.mu.Lock() // 获取写锁,独占访问缓存
|
||
defer pc.mu.Unlock() // 确保函数退出时释放锁
|
||
|
||
// 情况1:页面已存在,执行更新操作
|
||
if entry, exists := pc.cache[pageNo]; exists {
|
||
// 创建新的页面数据副本
|
||
// 重要:使用副本避免外部修改影响缓存
|
||
pageCopy := make([]byte, len(p))
|
||
copy(pageCopy, p)
|
||
|
||
// 更新页面数据
|
||
entry.page = pageCopy
|
||
|
||
// 更新脏页标记:一旦标记为脏页,就保持脏页状态
|
||
// 使用OR操作确保脏页标记的粘性(sticky)
|
||
entry.dirty = entry.dirty || dirty
|
||
|
||
// 将页面移动到LRU链表头部(标记为最近使用)
|
||
pc.lru.MoveToFront(entry.elem)
|
||
return
|
||
}
|
||
|
||
// 情况2:页面不存在,需要插入新页面
|
||
|
||
// 检查缓存容量,如果已满则淘汰最久未使用的页面
|
||
if len(pc.cache) >= pc.capacity {
|
||
pc.evictLRU()
|
||
}
|
||
|
||
// 创建页面数据副本
|
||
pageCopy := make([]byte, len(p))
|
||
copy(pageCopy, p)
|
||
|
||
// 创建新的缓存条目
|
||
entry := &cacheEntry{
|
||
pageNo: pageNo, // 设置页面编号
|
||
page: pageCopy, // 设置页面数据副本
|
||
dirty: dirty, // 设置脏页标记
|
||
}
|
||
|
||
// 将新条目添加到LRU链表头部(最近使用位置)
|
||
// 同时将链表元素指针保存到条目中,用于后续的O(1)操作
|
||
entry.elem = pc.lru.PushFront(entry)
|
||
|
||
// 将新条目添加到HashMap中,建立页号到条目的映射
|
||
pc.cache[pageNo] = entry
|
||
}
|
||
|
||
// evictLRU 淘汰最久未使用的页面
|
||
//
|
||
// 核心算法:LRU淘汰策略
|
||
// 时间复杂度:O(1) - 直接访问链表尾部
|
||
// 空间复杂度:O(1) - 只释放一个页面的空间
|
||
//
|
||
// 执行流程:
|
||
// 1. 检查LRU链表是否为空
|
||
// 2. 获取链表尾部元素(最久未使用的页面)
|
||
// 3. 检查页面是否为脏页:
|
||
// a. 如果是脏页:需要写回磁盘(当前实现中暂时跳过)
|
||
// b. 如果不是脏页:直接淘汰
|
||
// 4. 从HashMap和LRU链表中移除页面
|
||
//
|
||
// 脏页处理:
|
||
// - 脏页表示页面数据与磁盘不一致,淘汰前必须写回
|
||
// - 当前实现中暂时跳过写回操作(标记为TODO)
|
||
// - 生产环境中应该实现写回机制或者抛出错误
|
||
//
|
||
// 注意事项:
|
||
// - 该方法假设调用者已经持有写锁
|
||
// - 该方法不检查容量,由调用者确保需要淘汰
|
||
func (pc *PageCache) evictLRU() {
|
||
// 安全检查:如果链表为空,直接返回
|
||
if pc.lru.Len() == 0 {
|
||
return
|
||
}
|
||
|
||
// 获取LRU链表尾部元素(最久未使用的页面)
|
||
elem := pc.lru.Back()
|
||
if elem == nil {
|
||
return // 双重检查,防止并发问题
|
||
}
|
||
|
||
// 从链表元素中提取缓存条目
|
||
if elem.Value == nil {
|
||
return // 防止并发竞争导致的nil值
|
||
}
|
||
entry := elem.Value.(*cacheEntry)
|
||
pageNo := entry.pageNo
|
||
|
||
// 脏页处理:检查是否需要写回磁盘
|
||
if entry.dirty {
|
||
// TODO: 实现脏页写回机制
|
||
// 在生产环境中,这里应该:
|
||
// 1. 调用写回函数将页面数据写入磁盘
|
||
// 2. 处理写回失败的情况(重试或报错)
|
||
// 3. 只有写回成功后才能淘汰页面
|
||
//
|
||
// 示例实现:
|
||
// if err := pc.writeBackFunc(pageNo, entry.page); err != nil {
|
||
// // 处理写回失败,可能需要保留页面或记录错误
|
||
// return
|
||
// }
|
||
}
|
||
|
||
// 从HashMap中移除页面映射
|
||
delete(pc.cache, pageNo)
|
||
|
||
// 从LRU链表中移除页面节点
|
||
pc.lru.Remove(elem)
|
||
}
|
||
|
||
// Invalidate 从缓存中强制移除指定页面
|
||
//
|
||
// 使用场景:
|
||
// - 页面数据已知损坏,需要强制从缓存中移除
|
||
// - 外部直接修改了磁盘数据,缓存数据已过期
|
||
// - 内存压力大,需要主动释放特定页面
|
||
//
|
||
// 核心算法:直接删除
|
||
// 时间复杂度:O(1) - HashMap删除 + 链表删除
|
||
// 空间复杂度:O(1) - 释放一个页面的空间
|
||
//
|
||
// 参数说明:
|
||
// - pageNo: 要移除的页面编号
|
||
//
|
||
// 执行流程:
|
||
// 1. 获取写锁(独占访问)
|
||
// 2. 检查页面是否存在于缓存中
|
||
// 3. 如果存在:从HashMap和LRU链表中移除
|
||
// 4. 如果不存在:静默忽略(幂等操作)
|
||
//
|
||
// 注意事项:
|
||
// - 该操作不检查脏页状态,直接丢弃数据
|
||
// - 如果页面是脏页,未保存的修改将丢失
|
||
// - 适用于确定不需要保存数据的场景
|
||
//
|
||
// 并发安全:使用写锁保护,确保操作原子性
|
||
func (pc *PageCache) Invalidate(pageNo uint16) {
|
||
pc.mu.Lock() // 获取写锁,独占访问缓存
|
||
defer pc.mu.Unlock() // 确保函数退出时释放锁
|
||
|
||
// 检查页面是否存在于缓存中
|
||
if entry, exists := pc.cache[pageNo]; exists {
|
||
// 从HashMap中移除页面映射
|
||
delete(pc.cache, pageNo)
|
||
|
||
// 从LRU链表中移除页面节点
|
||
pc.lru.Remove(entry.elem)
|
||
|
||
// 注意:这里不检查脏页状态,直接丢弃
|
||
// 如果需要保护脏页,应该在删除前检查entry.dirty
|
||
}
|
||
// 如果页面不存在,静默忽略(幂等操作)
|
||
}
|
||
|
||
// Flush 将所有脏页刷新到磁盘
|
||
//
|
||
// 使用场景:
|
||
// - 数据库关闭前确保所有修改都已保存
|
||
// - 定期检查点,将内存中的修改持久化
|
||
// - 内存压力大,主动释放脏页占用的内存
|
||
//
|
||
// 核心算法:遍历所有缓存条目,写回脏页
|
||
// 时间复杂度:O(n) - n为缓存中的页面数量
|
||
// 空间复杂度:O(1) - 不创建额外的数据结构
|
||
//
|
||
// 参数说明:
|
||
// - writeFunc: 写回函数,由调用者提供具体的磁盘写入逻辑
|
||
// 函数签名:func(pageNo uint16, p []byte) error
|
||
// 参数:pageNo=页面编号,p=页面数据
|
||
// 返回:error=写入错误(nil表示成功)
|
||
//
|
||
// 返回值:
|
||
// - error: 第一个写回失败的错误(如果有)
|
||
//
|
||
// 执行流程:
|
||
// 1. 获取写锁(独占访问)
|
||
// 2. 遍历所有缓存条目
|
||
// 3. 对于每个脏页:
|
||
// a. 调用writeFunc写回磁盘
|
||
// b. 如果写回成功:清除脏页标记
|
||
// c. 如果写回失败:立即返回错误
|
||
// 4. 所有脏页写回成功后返回nil
|
||
//
|
||
// 错误处理:
|
||
// - 遇到第一个写回错误时立即停止并返回
|
||
// - 已成功写回的页面会清除脏页标记
|
||
// - 失败的页面保持脏页状态,可以稍后重试
|
||
//
|
||
// 并发安全:使用写锁保护,确保刷新过程中缓存状态一致
|
||
func (pc *PageCache) Flush(writeFunc func(pageNo uint16, p []byte) error) error {
|
||
pc.mu.Lock() // 获取写锁,独占访问缓存
|
||
defer pc.mu.Unlock() // 确保函数退出时释放锁
|
||
|
||
// 遍历所有缓存条目,查找脏页
|
||
for pageNo, entry := range pc.cache {
|
||
// 只处理脏页(已修改但未写回磁盘的页面)
|
||
if entry.dirty {
|
||
// 调用外部提供的写回函数
|
||
if err := writeFunc(pageNo, entry.page); err != nil {
|
||
// 写回失败:立即返回错误
|
||
// 注意:已成功写回的页面会保持clean状态
|
||
return err
|
||
}
|
||
|
||
// 写回成功:清除脏页标记
|
||
entry.dirty = false
|
||
}
|
||
}
|
||
|
||
// 所有脏页都成功写回
|
||
return nil
|
||
}
|
||
|
||
// Stats 获取缓存性能统计信息
|
||
//
|
||
// 用途:
|
||
// - 监控缓存性能和效果
|
||
// - 调优缓存大小和策略
|
||
// - 诊断性能问题
|
||
//
|
||
// 核心算法:简单的统计计算
|
||
// 时间复杂度:O(1) - 直接读取计数器
|
||
// 空间复杂度:O(1) - 不创建额外数据
|
||
//
|
||
// 返回值:
|
||
// - hits: 缓存命中次数(成功从缓存获取页面的次数)
|
||
// - misses: 缓存未命中次数(需要从磁盘加载页面的次数)
|
||
// - hitRate: 缓存命中率(hits / (hits + misses))
|
||
//
|
||
// 命中率解读:
|
||
// - 0.0 - 1.0:命中率范围
|
||
// - 0.9+:优秀的缓存效果
|
||
// - 0.7-0.9:良好的缓存效果
|
||
// - 0.5-0.7:一般的缓存效果
|
||
// - <0.5:较差的缓存效果,可能需要增加缓存大小
|
||
//
|
||
// 并发安全:使用读锁保护,支持并发读取统计信息
|
||
func (pc *PageCache) Stats() (hits int64, misses int64, hitRate float64) {
|
||
pc.mu.RLock() // 获取读锁,允许并发读取
|
||
defer pc.mu.RUnlock() // 确保函数退出时释放锁
|
||
|
||
// 读取命中和未命中计数
|
||
hits = pc.hits
|
||
misses = pc.misses
|
||
total := hits + misses
|
||
|
||
// 计算命中率,避免除零错误
|
||
if total > 0 {
|
||
hitRate = float64(hits) / float64(total)
|
||
} else {
|
||
hitRate = 0.0 // 没有访问记录时命中率为0
|
||
}
|
||
|
||
return hits, misses, hitRate
|
||
}
|
||
|
||
// Size 获取当前缓存中的页面数量
|
||
//
|
||
// 用途:
|
||
// - 监控缓存使用情况
|
||
// - 检查缓存是否接近容量上限
|
||
// - 内存使用量估算
|
||
//
|
||
// 核心算法:直接返回HashMap大小
|
||
// 时间复杂度:O(1) - 直接读取map长度
|
||
// 空间复杂度:O(1) - 不创建额外数据
|
||
//
|
||
// 返回值:
|
||
// - int: 当前缓存中的页面数量(0 <= size <= capacity)
|
||
//
|
||
// 使用示例:
|
||
// - 内存使用量 ≈ Size() * 页面大小
|
||
// - 缓存利用率 = Size() / Capacity()
|
||
//
|
||
// 并发安全:使用读锁保护,支持并发读取
|
||
func (pc *PageCache) Size() int {
|
||
pc.mu.RLock() // 获取读锁,允许并发读取
|
||
defer pc.mu.RUnlock() // 确保函数退出时释放锁
|
||
|
||
// 返回HashMap中的条目数量,即缓存的页面数
|
||
return len(pc.cache)
|
||
}
|
||
|
||
// Clear 清空整个缓存
|
||
//
|
||
// 使用场景:
|
||
// - 系统重启或重新初始化
|
||
// - 内存压力极大,需要释放所有缓存
|
||
// - 测试环境中重置缓存状态
|
||
// - 数据库结构发生重大变化
|
||
//
|
||
// 核心算法:重新初始化所有数据结构
|
||
// 时间复杂度:O(1) - 直接创建新的数据结构
|
||
// 空间复杂度:O(1) - 释放所有缓存数据
|
||
//
|
||
// 执行流程:
|
||
// 1. 获取写锁(独占访问)
|
||
// 2. 重新创建空的HashMap
|
||
// 3. 重新创建空的LRU链表
|
||
// 4. 重置所有统计计数器
|
||
//
|
||
// 注意事项:
|
||
// - 该操作会丢失所有脏页数据
|
||
// - 清空后所有页面访问都会导致缓存未命中
|
||
// - 适用于确定不需要保存任何缓存数据的场景
|
||
//
|
||
// 并发安全:使用写锁保护,确保清空操作原子性
|
||
func (pc *PageCache) Clear() {
|
||
pc.mu.Lock() // 获取写锁,独占访问缓存
|
||
defer pc.mu.Unlock() // 确保函数退出时释放锁
|
||
|
||
// 重新创建空的HashMap,释放所有页面数据
|
||
pc.cache = make(map[uint16]*cacheEntry)
|
||
|
||
// 重新创建空的LRU链表,释放所有链表节点
|
||
pc.lru = list.New()
|
||
|
||
// 重置统计计数器
|
||
pc.hits = 0
|
||
pc.misses = 0
|
||
|
||
// 注意:capacity保持不变,因为它是缓存的配置参数
|
||
}
|