// Package pipelinedb provides an integrated pipeline database system // 集成了数据库存储和业务管道处理的一体化解决方案 package pipelinedb import ( "container/list" "sync" ) // PageCache 实现了基于LRU(Least Recently Used)算法的页面缓存系统 // // 核心设计思想: // 1. 使用HashMap提供O(1)的页面查找性能 // 2. 使用双向链表维护页面的访问顺序(LRU顺序) // 3. 支持脏页标记和批量刷新机制 // 4. 提供线程安全的并发访问控制 // 5. 统计缓存命中率用于性能监控 // // 适用场景: // - 数据库页面缓存 // - 文件系统缓存 // - 任何需要LRU淘汰策略的缓存系统 type PageCache struct { capacity int // 缓存容量(最大页面数) cache map[uint16]*cacheEntry // 页号到缓存条目的映射,提供O(1)查找 lru *list.List // LRU双向链表,维护访问顺序(头部=最近使用,尾部=最久未使用) mu sync.RWMutex // 读写锁,支持多读单写的并发控制 hits int64 // 缓存命中次数统计 misses int64 // 缓存未命中次数统计 } // cacheEntry 缓存条目,存储单个页面的完整信息 // // 设计要点: // 1. pageNo: 页面编号,用于标识唯一页面 // 2. page: 页面数据的完整副本,避免外部修改影响缓存 // 3. dirty: 脏页标记,标识页面是否被修改过需要写回磁盘 // 4. elem: 指向LRU链表中对应节点的指针,用于O(1)时间复杂度的链表操作 type cacheEntry struct { pageNo uint16 // 页面编号(16位,支持65536个页面) page []byte // 页面数据副本(避免外部修改) dirty bool // 脏页标记(true=需要写回磁盘,false=与磁盘一致) elem *list.Element // LRU链表节点指针(用于快速移动和删除) } // NewPageCache 创建一个新的LRU页面缓存实例 // // 参数说明: // - capacity: 缓存容量,即最大可缓存的页面数量 // 建议根据可用内存和页面大小来设置,例如: // 可用内存100MB,页面大小4KB,则capacity可设为25600 // // 返回值: // - *PageCache: 初始化完成的页面缓存实例 // // 初始化内容: // 1. 设置缓存容量 // 2. 创建空的HashMap用于O(1)查找 // 3. 创建空的双向链表用于LRU排序 // 4. 统计计数器初始化为0(hits和misses) func NewPageCache(capacity int) *PageCache { return &PageCache{ capacity: capacity, // 设置最大缓存页面数 cache: make(map[uint16]*cacheEntry), // 初始化页号到缓存条目的映射 lru: list.New(), // 初始化LRU双向链表 // hits和misses会自动初始化为0 } } // Get 从缓存中获取指定页面的数据 // // 核心算法:LRU缓存查找 // 时间复杂度:O(1) - HashMap查找 + 双向链表移动 // 空间复杂度:O(1) - 只创建页面数据副本 // // 参数说明: // - pageNo: 要获取的页面编号(16位,范围0-65535) // // 返回值: // - []byte: 页面数据的副本(如果找到) // - bool: 是否在缓存中找到该页面 // // 执行流程: // 1. 获取读锁(支持并发读取) // 2. 在HashMap中查找页面 // 3. 如果找到: // a. 将页面移动到LRU链表头部(标记为最近使用) // b. 增加命中计数 // c. 创建并返回页面数据副本(防止外部修改) // 4. 如果未找到: // a. 增加未命中计数 // b. 返回nil和false // // 并发安全:使用读锁保护,支持多个goroutine同时读取 func (pc *PageCache) Get(pageNo uint16) ([]byte, bool) { pc.mu.RLock() // 获取读锁,允许并发读取 defer pc.mu.RUnlock() // 确保函数退出时释放锁 if entry, exists := pc.cache[pageNo]; exists { // 缓存命中:执行LRU更新和数据返回 // 将访问的页面移动到LRU链表头部 // 这是LRU算法的核心:最近访问的页面不会被淘汰 pc.lru.MoveToFront(entry.elem) // 更新命中统计(用于计算缓存命中率) pc.hits++ // 创建页面数据副本并返回 // 重要:返回副本而不是原始数据,防止外部修改影响缓存 pageCopy := make([]byte, len(entry.page)) copy(pageCopy, entry.page) return pageCopy, true } // 缓存未命中:更新统计并返回失败 pc.misses++ return nil, false } // Put 将页面数据放入缓存 // // 核心算法:LRU缓存插入/更新 // 时间复杂度:O(1) - HashMap操作 + 双向链表操作 // 空间复杂度:O(1) - 创建页面数据副本 // // 参数说明: // - pageNo: 页面编号(16位,范围0-65535) // - p: 页面数据(字节数组) // - dirty: 脏页标记(true=页面已修改需要写回,false=页面与磁盘一致) // // 执行流程: // 1. 获取写锁(独占访问) // 2. 检查页面是否已存在: // a. 如果存在:更新页面数据和脏页标记,移动到LRU头部 // b. 如果不存在:检查容量,必要时淘汰LRU页面,然后插入新页面 // 3. 所有操作都使用数据副本,确保缓存数据独立性 // // 脏页处理: // - 如果新数据标记为dirty,则页面变为脏页 // - 如果页面已经是脏页,则保持脏页状态(dirty标记具有粘性) // - 脏页在淘汰时需要写回磁盘 // // 并发安全:使用写锁保护,确保缓存状态一致性 func (pc *PageCache) Put(pageNo uint16, p []byte, dirty bool) { pc.mu.Lock() // 获取写锁,独占访问缓存 defer pc.mu.Unlock() // 确保函数退出时释放锁 // 情况1:页面已存在,执行更新操作 if entry, exists := pc.cache[pageNo]; exists { // 创建新的页面数据副本 // 重要:使用副本避免外部修改影响缓存 pageCopy := make([]byte, len(p)) copy(pageCopy, p) // 更新页面数据 entry.page = pageCopy // 更新脏页标记:一旦标记为脏页,就保持脏页状态 // 使用OR操作确保脏页标记的粘性(sticky) entry.dirty = entry.dirty || dirty // 将页面移动到LRU链表头部(标记为最近使用) pc.lru.MoveToFront(entry.elem) return } // 情况2:页面不存在,需要插入新页面 // 检查缓存容量,如果已满则淘汰最久未使用的页面 if len(pc.cache) >= pc.capacity { pc.evictLRU() } // 创建页面数据副本 pageCopy := make([]byte, len(p)) copy(pageCopy, p) // 创建新的缓存条目 entry := &cacheEntry{ pageNo: pageNo, // 设置页面编号 page: pageCopy, // 设置页面数据副本 dirty: dirty, // 设置脏页标记 } // 将新条目添加到LRU链表头部(最近使用位置) // 同时将链表元素指针保存到条目中,用于后续的O(1)操作 entry.elem = pc.lru.PushFront(entry) // 将新条目添加到HashMap中,建立页号到条目的映射 pc.cache[pageNo] = entry } // evictLRU 淘汰最久未使用的页面 // // 核心算法:LRU淘汰策略 // 时间复杂度:O(1) - 直接访问链表尾部 // 空间复杂度:O(1) - 只释放一个页面的空间 // // 执行流程: // 1. 检查LRU链表是否为空 // 2. 获取链表尾部元素(最久未使用的页面) // 3. 检查页面是否为脏页: // a. 如果是脏页:需要写回磁盘(当前实现中暂时跳过) // b. 如果不是脏页:直接淘汰 // 4. 从HashMap和LRU链表中移除页面 // // 脏页处理: // - 脏页表示页面数据与磁盘不一致,淘汰前必须写回 // - 当前实现中暂时跳过写回操作(标记为TODO) // - 生产环境中应该实现写回机制或者抛出错误 // // 注意事项: // - 该方法假设调用者已经持有写锁 // - 该方法不检查容量,由调用者确保需要淘汰 func (pc *PageCache) evictLRU() { // 安全检查:如果链表为空,直接返回 if pc.lru.Len() == 0 { return } // 获取LRU链表尾部元素(最久未使用的页面) elem := pc.lru.Back() if elem == nil { return // 双重检查,防止并发问题 } // 从链表元素中提取缓存条目 if elem.Value == nil { return // 防止并发竞争导致的nil值 } entry := elem.Value.(*cacheEntry) pageNo := entry.pageNo // 脏页处理:检查是否需要写回磁盘 if entry.dirty { // TODO: 实现脏页写回机制 // 在生产环境中,这里应该: // 1. 调用写回函数将页面数据写入磁盘 // 2. 处理写回失败的情况(重试或报错) // 3. 只有写回成功后才能淘汰页面 // // 示例实现: // if err := pc.writeBackFunc(pageNo, entry.page); err != nil { // // 处理写回失败,可能需要保留页面或记录错误 // return // } } // 从HashMap中移除页面映射 delete(pc.cache, pageNo) // 从LRU链表中移除页面节点 pc.lru.Remove(elem) } // Invalidate 从缓存中强制移除指定页面 // // 使用场景: // - 页面数据已知损坏,需要强制从缓存中移除 // - 外部直接修改了磁盘数据,缓存数据已过期 // - 内存压力大,需要主动释放特定页面 // // 核心算法:直接删除 // 时间复杂度:O(1) - HashMap删除 + 链表删除 // 空间复杂度:O(1) - 释放一个页面的空间 // // 参数说明: // - pageNo: 要移除的页面编号 // // 执行流程: // 1. 获取写锁(独占访问) // 2. 检查页面是否存在于缓存中 // 3. 如果存在:从HashMap和LRU链表中移除 // 4. 如果不存在:静默忽略(幂等操作) // // 注意事项: // - 该操作不检查脏页状态,直接丢弃数据 // - 如果页面是脏页,未保存的修改将丢失 // - 适用于确定不需要保存数据的场景 // // 并发安全:使用写锁保护,确保操作原子性 func (pc *PageCache) Invalidate(pageNo uint16) { pc.mu.Lock() // 获取写锁,独占访问缓存 defer pc.mu.Unlock() // 确保函数退出时释放锁 // 检查页面是否存在于缓存中 if entry, exists := pc.cache[pageNo]; exists { // 从HashMap中移除页面映射 delete(pc.cache, pageNo) // 从LRU链表中移除页面节点 pc.lru.Remove(entry.elem) // 注意:这里不检查脏页状态,直接丢弃 // 如果需要保护脏页,应该在删除前检查entry.dirty } // 如果页面不存在,静默忽略(幂等操作) } // Flush 将所有脏页刷新到磁盘 // // 使用场景: // - 数据库关闭前确保所有修改都已保存 // - 定期检查点,将内存中的修改持久化 // - 内存压力大,主动释放脏页占用的内存 // // 核心算法:遍历所有缓存条目,写回脏页 // 时间复杂度:O(n) - n为缓存中的页面数量 // 空间复杂度:O(1) - 不创建额外的数据结构 // // 参数说明: // - writeFunc: 写回函数,由调用者提供具体的磁盘写入逻辑 // 函数签名:func(pageNo uint16, p []byte) error // 参数:pageNo=页面编号,p=页面数据 // 返回:error=写入错误(nil表示成功) // // 返回值: // - error: 第一个写回失败的错误(如果有) // // 执行流程: // 1. 获取写锁(独占访问) // 2. 遍历所有缓存条目 // 3. 对于每个脏页: // a. 调用writeFunc写回磁盘 // b. 如果写回成功:清除脏页标记 // c. 如果写回失败:立即返回错误 // 4. 所有脏页写回成功后返回nil // // 错误处理: // - 遇到第一个写回错误时立即停止并返回 // - 已成功写回的页面会清除脏页标记 // - 失败的页面保持脏页状态,可以稍后重试 // // 并发安全:使用写锁保护,确保刷新过程中缓存状态一致 func (pc *PageCache) Flush(writeFunc func(pageNo uint16, p []byte) error) error { pc.mu.Lock() // 获取写锁,独占访问缓存 defer pc.mu.Unlock() // 确保函数退出时释放锁 // 遍历所有缓存条目,查找脏页 for pageNo, entry := range pc.cache { // 只处理脏页(已修改但未写回磁盘的页面) if entry.dirty { // 调用外部提供的写回函数 if err := writeFunc(pageNo, entry.page); err != nil { // 写回失败:立即返回错误 // 注意:已成功写回的页面会保持clean状态 return err } // 写回成功:清除脏页标记 entry.dirty = false } } // 所有脏页都成功写回 return nil } // Stats 获取缓存性能统计信息 // // 用途: // - 监控缓存性能和效果 // - 调优缓存大小和策略 // - 诊断性能问题 // // 核心算法:简单的统计计算 // 时间复杂度:O(1) - 直接读取计数器 // 空间复杂度:O(1) - 不创建额外数据 // // 返回值: // - hits: 缓存命中次数(成功从缓存获取页面的次数) // - misses: 缓存未命中次数(需要从磁盘加载页面的次数) // - hitRate: 缓存命中率(hits / (hits + misses)) // // 命中率解读: // - 0.0 - 1.0:命中率范围 // - 0.9+:优秀的缓存效果 // - 0.7-0.9:良好的缓存效果 // - 0.5-0.7:一般的缓存效果 // - <0.5:较差的缓存效果,可能需要增加缓存大小 // // 并发安全:使用读锁保护,支持并发读取统计信息 func (pc *PageCache) Stats() (hits int64, misses int64, hitRate float64) { pc.mu.RLock() // 获取读锁,允许并发读取 defer pc.mu.RUnlock() // 确保函数退出时释放锁 // 读取命中和未命中计数 hits = pc.hits misses = pc.misses total := hits + misses // 计算命中率,避免除零错误 if total > 0 { hitRate = float64(hits) / float64(total) } else { hitRate = 0.0 // 没有访问记录时命中率为0 } return hits, misses, hitRate } // Size 获取当前缓存中的页面数量 // // 用途: // - 监控缓存使用情况 // - 检查缓存是否接近容量上限 // - 内存使用量估算 // // 核心算法:直接返回HashMap大小 // 时间复杂度:O(1) - 直接读取map长度 // 空间复杂度:O(1) - 不创建额外数据 // // 返回值: // - int: 当前缓存中的页面数量(0 <= size <= capacity) // // 使用示例: // - 内存使用量 ≈ Size() * 页面大小 // - 缓存利用率 = Size() / Capacity() // // 并发安全:使用读锁保护,支持并发读取 func (pc *PageCache) Size() int { pc.mu.RLock() // 获取读锁,允许并发读取 defer pc.mu.RUnlock() // 确保函数退出时释放锁 // 返回HashMap中的条目数量,即缓存的页面数 return len(pc.cache) } // Clear 清空整个缓存 // // 使用场景: // - 系统重启或重新初始化 // - 内存压力极大,需要释放所有缓存 // - 测试环境中重置缓存状态 // - 数据库结构发生重大变化 // // 核心算法:重新初始化所有数据结构 // 时间复杂度:O(1) - 直接创建新的数据结构 // 空间复杂度:O(1) - 释放所有缓存数据 // // 执行流程: // 1. 获取写锁(独占访问) // 2. 重新创建空的HashMap // 3. 重新创建空的LRU链表 // 4. 重置所有统计计数器 // // 注意事项: // - 该操作会丢失所有脏页数据 // - 清空后所有页面访问都会导致缓存未命中 // - 适用于确定不需要保存任何缓存数据的场景 // // 并发安全:使用写锁保护,确保清空操作原子性 func (pc *PageCache) Clear() { pc.mu.Lock() // 获取写锁,独占访问缓存 defer pc.mu.Unlock() // 确保函数退出时释放锁 // 重新创建空的HashMap,释放所有页面数据 pc.cache = make(map[uint16]*cacheEntry) // 重新创建空的LRU链表,释放所有链表节点 pc.lru = list.New() // 重置统计计数器 pc.hits = 0 pc.misses = 0 // 注意:capacity保持不变,因为它是缓存的配置参数 }