// Package pipelinedb provides an integrated pipeline database system // 集成了数据库存储和业务管道处理的一体化解决方案 package pipelinedb import ( "encoding/binary" "sync" "github.com/google/btree" ) // IndexEntry B+Tree索引条目结构 // // ... // 核心功能: // - 存储记录ID到物理位置的映射关系 // - 实现btree.Item接口,支持B+Tree操作 // - 提供O(log n)的查找性能 // // 设计思想: // - ID作为主键,确保唯一性和有序性 // - PageNo和SlotNo组合定位记录的物理位置 // - 结构紧凑,减少内存占用 // // 物理位置编码: // - PageNo: 页面编号(16位,支持65536个页面) // - SlotNo: 页内槽位编号(16位,支持65536个槽位) // - 组合可定位约42亿个记录位置 // // 适用场景: // - 数据库主键索引 // - 记录快速定位 // - 范围查询支持 type IndexEntry struct { ID int64 // 记录的唯一标识符(主键) PageNo uint16 // 记录所在的页面编号 SlotNo uint16 // 记录在页面内的槽位编号 } // Less 实现btree.Item接口的比较方法 // // 核心功能:定义索引条目的排序规则,支持B+Tree的有序存储 // 时间复杂度:O(1) - 直接的整数比较 // 空间复杂度:O(1) - 不创建额外数据 // // 参数说明: // - b: 要比较的另一个btree.Item(必须是IndexEntry类型) // // 返回值: // - bool: true表示当前条目小于参数条目,false表示大于等于 // // 排序规则: // - 按记录ID升序排列 // - ID相同时认为相等(不小于) // - 支持B+Tree的二分查找和范围查询 // // 使用场景: // - B+Tree内部排序和查找 // - 范围查询时的边界确定 // - 索引条目的插入位置确定 // // 注意事项: // - 必须确保传入的参数是IndexEntry类型,否则会panic // - 该方法被B+Tree库频繁调用,性能要求高 func (a IndexEntry) Less(b btree.Item) bool { // 按记录ID进行升序比较 // 类型断言:将btree.Item转换为IndexEntry return a.ID < b.(IndexEntry).ID } // TableIndex 组级索引管理器(注意:虽然名称是TableIndex,但实际管理的是组索引) // // 核心职责: // 1. 维护单个组内所有记录的ID到物理位置的映射 // 2. 提供高效的记录查找、插入、删除操作 // 3. 支持范围查询和统计功能 // 4. 管理索引的生命周期 // // 设计思想: // - 使用B+Tree提供O(log n)的查找性能 // - 支持范围查询,适合数据库场景 // - 内存索引,提供极快的访问速度 // - 度数为32的B+Tree,平衡内存使用和性能 // // 性能特征: // - 查找:O(log n) - B+Tree查找 // - 插入:O(log n) - B+Tree插入 // - 删除:O(log n) - B+Tree删除 // - 范围查询:O(log n + k) - k为结果数量 // - 空间:O(n) - n为索引条目数量 // // 适用场景: // - 数据库表的主键索引 // - 记录快速定位系统 // - 支持范围查询的索引系统 type TableIndex struct { mu sync.RWMutex // 保护并发访问的读写锁 tree *btree.BTree // B+Tree实例,存储IndexEntry name string // 索引名称(通常是组名) } // NewGroupIndex 创建一个新的组索引实例 // // 核心功能:初始化一个空的B+Tree索引,用于管理单个组的记录映射 // 时间复杂度:O(1) - 只创建数据结构 // 空间复杂度:O(1) - 初始状态下只占用基础结构空间 // // 参数说明: // - groupName: 组名,用于标识和调试 // // 返回值: // - *TableIndex: 初始化完成的索引实例 // // B+Tree配置: // - 度数32:平衡内存使用和查找性能 // - 每个节点最多63个键(2*32-1) // - 树高度较低,查找效率高 // - 适合数据库索引场景 // // 使用示例: // // idx := NewGroupIndex("user_data") // idx.Insert(1001, 5, 10) // 插入记录映射 // pageNo, slotNo, found := idx.Get(1001) // 查找记录 func NewGroupIndex(groupName string) *TableIndex { return &TableIndex{ tree: btree.New(32), // 创建度数为32的B+Tree name: groupName, // 设置索引名称 } } // Insert 向索引中插入或更新记录映射 // // 核心功能:建立记录ID到物理位置的映射关系 // 时间复杂度:O(log n) - B+Tree插入操作 // 空间复杂度:O(1) - 只添加一个索引条目 // // 参数说明: // - id: 记录的唯一标识符 // - pageNo: 记录所在的页面编号 // - slotNo: 记录在页面内的槽位编号 // // 执行流程: // 1. 创建IndexEntry结构 // 2. 调用B+Tree的ReplaceOrInsert方法 // 3. 如果ID已存在,更新位置信息 // 4. 如果ID不存在,插入新的映射 // // 设计考虑: // - 使用ReplaceOrInsert支持位置更新 // - 自动维护B+Tree的平衡性 // - 保持索引的有序性 // 使用场景: // - 新记录插入时建立映射 // - 记录位置变更时更新映射 // - 索引重建过程 func (idx *TableIndex) Insert(id int64, pageNo, slotNo uint16) { idx.mu.Lock() defer idx.mu.Unlock() // 创建索引条目 entry := IndexEntry{ ID: id, // 记录ID PageNo: pageNo, // 页面编号 SlotNo: slotNo, // 槽位编号 } // 插入到B+Tree // 如果ID已存在,会自动替换旧条目 idx.tree.ReplaceOrInsert(entry) } // Delete 从索引中删除指定记录的映射 // // 核心功能:移除记录ID到物理位置的映射关系 // 时间复杂度:O(log n) - B+Tree删除操作 // 空间复杂度:O(1) - 释放一个索引条目的空间 // // 参数说明: // - id: 要删除的记录ID // // 返回值: // - bool: true表示删除成功,false表示记录不存在 // // 执行流程: // 1. 创建只包含ID的IndexEntry(用于查找) // 2. 调用B+Tree的Delete方法 // 3. 检查删除结果并返回状态 // // 设计考虑: // - 只需要ID即可定位要删除的条目 // - B+Tree自动维护删除后的平衡性 // - 返回删除状态便于错误处理 // // 使用场景: // - 记录物理删除时清理映射 // - 索引维护和清理 // - 数据迁移过程 func (idx *TableIndex) Delete(id int64) bool { idx.mu.Lock() defer idx.mu.Unlock() // 创建用于查找的索引条目(只需要ID) entry := IndexEntry{ID: id} // 从B+Tree中删除条目 // Delete返回被删除的项,如果为nil表示不存在 return idx.tree.Delete(entry) != nil } // Get 根据记录ID查找其物理位置 // // 核心功能:通过记录ID快速定位记录的物理存储位置 // 时间复杂度:O(log n) - B+Tree查找操作 // 空间复杂度:O(1) - 不创建额外数据 // // 参数说明: // - id: 要查找的记录ID // // 返回值: // - uint16: 页面编号(如果找到) // - uint16: 槽位编号(如果找到) // - bool: 是否找到记录(true=找到,false=不存在) // // 执行流程: // 1. 创建只包含ID的IndexEntry(用于查找) // 2. 调用B+Tree的Get方法查找 // 3. 如果找到,提取位置信息并返回 // 4. 如果未找到,返回零值和false // // 设计考虑: // - 返回三元组提供完整的查找结果 // - 零值返回便于调用者处理未找到的情况 // - 类型断言确保数据类型正确性 // // 使用场景: // - 记录读取前的位置查找 // - 记录更新前的位置确认 // - 数据完整性检查 // // 性能优势: // - O(log n)查找性能,远优于线性扫描 // - 内存索引,无磁盘I/O开销 // - B+Tree缓存友好,热点数据访问快 func (idx *TableIndex) Get(id int64) (uint16, uint16, bool) { idx.mu.RLock() defer idx.mu.RUnlock() // 创建用于查找的索引条目(只需要ID) entry := IndexEntry{ID: id} // 在B+Tree中查找对应的条目 if item := idx.tree.Get(entry); item != nil { // 找到记录,提取位置信息 found := item.(IndexEntry) return found.PageNo, found.SlotNo, true } // 未找到记录,返回零值和false return 0, 0, false } // Range 执行范围查询,遍历指定ID范围内的所有记录 // // 核心功能:按ID顺序遍历指定范围内的所有索引条目 // 时间复杂度:O(log n + k) - n为总条目数,k为结果数量 // 空间复杂度:O(1) - 不创建额外的数据结构 // // 参数说明: // - startID: 范围查询的起始ID(包含) // - endID: 范围查询的结束ID(包含) // - visitor: 访问者函数,对每个匹配的条目调用 // 函数签名:func(id int64, pageNo, slotNo uint16) bool // 返回值:true继续遍历,false停止遍历 // // 执行流程: // 1. 创建起始和结束的IndexEntry边界 // 2. 调用B+Tree的AscendRange方法 // 3. 对范围内每个条目调用visitor函数 // 4. 根据visitor返回值决定是否继续 // // 查询特性: // - 按ID升序遍历结果 // - 支持提前终止遍历 // - 包含边界值(闭区间查询) // - 利用B+Tree的有序性提供高效遍历 // // 使用场景: // - 批量数据处理 // - 数据导出和备份 // - 范围统计和分析 // - 分页查询的底层实现 // // 性能优势: // - O(log n)定位起始位置 // - 顺序遍历,缓存友好 // - 支持早期终止,避免不必要的遍历 func (idx *TableIndex) Range(startID, endID int64, visitor func(id int64, pageNo, slotNo uint16) bool) { idx.mu.RLock() defer idx.mu.RUnlock() // 创建范围查询的边界条目 start := IndexEntry{ID: startID} // 起始边界(包含) // 使用 AscendGreaterOrEqual 从起始点开始遍历 // 然后在回调中检查结束条件 idx.tree.AscendGreaterOrEqual(start, func(item btree.Item) bool { // 提取索引条目信息 entry := item.(IndexEntry) // 检查是否超出结束边界 if entry.ID > endID { return false // 停止遍历 } // 调用访问者函数,传递记录位置信息 // visitor返回false时停止遍历 return visitor(entry.ID, entry.PageNo, entry.SlotNo) }) } // Count 获取索引中的记录总数 // // 核心功能:返回当前索引中存储的记录条目数量 // 时间复杂度:O(1) - B+Tree维护内部计数器 // 空间复杂度:O(1) - 不创建额外数据 // // 返回值: // - int: 索引中的记录条目数量(>=0) // // 使用场景: // - 统计分析和报告 // - 容量规划和监控 // - 性能分析和调优 // - 数据完整性检查 // // 性能特征: // - 极快的O(1)查询性能 // - 不需要遍历索引结构 // - 适合频繁的统计查询 // // 使用示例: // // count := idx.Count() // fmt.Printf("组 %s 包含 %d 条记录\n", idx.name, count) func (idx *TableIndex) Count() int { idx.mu.RLock() defer idx.mu.RUnlock() // 直接返回B+Tree的长度 // B+Tree内部维护计数器,提供O(1)性能 return idx.tree.Len() } // Clear 清空索引中的所有条目 // // 核心功能:移除索引中的所有记录映射,重置为空状态 // 时间复杂度:O(n) - 需要释放所有节点 // 空间复杂度:O(1) - 释放所有索引占用的空间 // // 执行流程: // 1. 调用B+Tree的Clear方法 // 2. 释放所有索引节点的内存 // 3. 重置索引为空状态 // // 参数说明: // - false参数:表示不添加到空闲列表(B+Tree内部参数) // // 使用场景: // - 组数据完全清理时 // - 索引重建前的清空操作 // - 内存回收和资源清理 // - 测试环境的数据重置 // // 注意事项: // - 操作不可逆,清空后无法恢复 // - 不影响磁盘上的实际数据 // - 只清理内存中的索引映射 // - 清空后Count()返回0 // // 使用示例: // // idx.Clear() // 清空索引 // fmt.Printf("索引已清空,当前记录数:%d\n", idx.Count()) func (idx *TableIndex) Clear() { idx.mu.Lock() defer idx.mu.Unlock() // 清空B+Tree中的所有条目 // false参数表示不将节点添加到空闲列表 idx.tree.Clear(false) } // RebuildIndex 从页面链重建组索引 // // 核心功能:扫描页面链中的所有记录,重新构建内存索引 // 时间复杂度:O(n log n) - n为记录数量,每次插入O(log n) // 空间复杂度:O(n) - 需要存储所有记录的索引条目 // // 参数说明: // - groupName: 组名,用于创建新索引 // - rootPage: 页面链的根页面编号 // // 返回值: // - *TableIndex: 重建完成的索引实例 // - error: 重建过程中的错误(nil表示成功) // // 执行流程: // 1. 创建新的空索引 // 2. 从根页面开始遍历页面链 // 3. 扫描每个页面的所有槽位 // 4. 提取记录ID并建立索引映射 // 5. 跳过已删除和损坏的记录 // 6. 返回完整的索引 // // 错误处理: // - 页面读取失败时返回错误 // - 跳过损坏的记录,继续处理 // - 确保索引的完整性和一致性 // // 使用场景: // - 数据库启动时的索引恢复 // - 索引损坏后的重建操作 // - 数据迁移和修复过程 // // 性能考虑: // - 需要读取所有数据页面 // - 内存使用量与记录数成正比 // - 适合在维护窗口期执行 func (pdb *PipelineDB) RebuildIndex(groupName string, rootPage uint16) (*TableIndex, error) { // 创建新的空索引 idx := NewGroupIndex(groupName) // 从根页面开始遍历页面链 pageNo := rootPage for pageNo != 0 { // 读取当前页面 p, err := pdb.readPage(pageNo) if err != nil { return nil, err // 页面读取失败,返回错误 } // 获取页面的槽位数组 slots := p.slotArray() for slotNo, off := range slots { // 跳过已删除的槽位(偏移为0表示已删除) if off == 0 { continue } // 检查记录是否损坏(偏移超出页面范围) if int(off)+8 > PageSize { continue // 跳过损坏的记录 } // 从记录中提取ID(记录的前8字节) id := int64(binary.LittleEndian.Uint64(p[off:])) // 将记录ID和位置信息添加到索引中 idx.Insert(id, pageNo, uint16(slotNo)) } // 移动到链表中的下一个页面 pageNo = p.nextPage() } return idx, nil // 返回重建完成的索引 } // IndexManager 全局索引管理器 // // 核心职责: // 1. 管理所有组的索引实例 // 2. 提供索引的创建、获取、删除功能 // 3. 维护索引的生命周期 // 4. 提供索引统计信息 // // 设计思想: // - 使用HashMap管理多个组索引 // - 延迟创建索引,按需分配资源 // - 提供统一的索引管理接口 // - 支持索引的动态添加和删除 // // 性能特征: // - 索引查找:O(1) - HashMap查找 // - 内存使用:O(k) - k为组的数量 // - 支持大量组的高效管理 // // 适用场景: // - 多租户数据库系统 // - 分组数据管理 // - 索引集中管理 type IndexManager struct { mu sync.RWMutex // 保护并发访问的读写锁 indexes map[string]*TableIndex // 组名到索引实例的映射 } // NewIndexManager 创建一个新的索引管理器实例 // // 核心功能:初始化空的索引管理器 // 时间复杂度:O(1) - 只创建空的HashMap // 空间复杂度:O(1) - 初始状态不占用额外空间 // // 返回值: // - *IndexManager: 初始化完成的索引管理器 // // 使用示例: // // im := NewIndexManager() // idx := im.GetOrCreateIndex("user_data") func NewIndexManager() *IndexManager { return &IndexManager{ indexes: make(map[string]*TableIndex), // 初始化空的索引映射 } } // GetOrCreateIndex 获取或创建指定组的索引 // // 核心功能:延迟创建索引,按需分配资源 // 时间复杂度:O(1) - HashMap查找和插入 // 空间复杂度:O(1) - 可能创建一个新索引 // // 参数说明: // - groupName: 组名 // // 返回值: // - *TableIndex: 组对应的索引实例 // // 执行流程: // 1. 检查索引是否已存在 // 2. 如果存在,直接返回 // 3. 如果不存在,创建新索引并缓存 // 4. 返回索引实例 // // 设计优势: // - 延迟创建,节省内存 // - 自动管理索引生命周期 // - 线程安全的索引获取 func (im *IndexManager) GetOrCreateIndex(groupName string) *TableIndex { // 先尝试读锁快速检查 im.mu.RLock() if idx, exists := im.indexes[groupName]; exists { im.mu.RUnlock() return idx // 返回已存在的索引 } im.mu.RUnlock() // 需要创建新索引,获取写锁 im.mu.Lock() defer im.mu.Unlock() // 双重检查,防止在获取写锁期间其他goroutine已创建 if idx, exists := im.indexes[groupName]; exists { return idx // 返回已存在的索引 } // 创建新索引并缓存 idx := NewGroupIndex(groupName) im.indexes[groupName] = idx return idx } // GetIndex 获取指定组的索引(如果存在) // // 核心功能:查找已存在的索引,不创建新索引 // 时间复杂度:O(1) - HashMap查找 // 空间复杂度:O(1) - 不创建额外数据 // // 参数说明: // - groupName: 组名 // // 返回值: // - *TableIndex: 索引实例(如果存在) // - bool: 是否找到索引 // // 使用场景: // - 检查索引是否已创建 // - 只读访问现有索引 // - 避免意外创建索引 func (im *IndexManager) GetIndex(groupName string) (*TableIndex, bool) { im.mu.RLock() defer im.mu.RUnlock() idx, exists := im.indexes[groupName] return idx, exists } // DropIndex 删除指定组的索引 // // 核心功能:清理并删除组索引,释放相关资源 // 时间复杂度:O(n) - 需要清空索引中的所有条目 // 空间复杂度:O(1) - 释放索引占用的空间 // // 参数说明: // - groupName: 要删除的组名 // // 执行流程: // 1. 检查索引是否存在 // 2. 如果存在,先清空索引内容 // 3. 从管理器中删除索引引用 // 4. 释放相关资源 // // 使用场景: // - 组数据完全删除时 // - 内存回收和资源清理 // - 系统维护和重组 func (im *IndexManager) DropIndex(groupName string) { im.mu.Lock() defer im.mu.Unlock() // 检查索引是否存在 if idx, exists := im.indexes[groupName]; exists { // 清空索引内容,释放内存 idx.Clear() // 从管理器中删除索引引用 delete(im.indexes, groupName) } // 如果索引不存在,操作是幂等的(无副作用) } // GetStats 获取所有组的索引统计信息 // // 核心功能:收集所有组索引的记录数量统计 // 时间复杂度:O(k) - k为组的数量 // 空间复杂度:O(k) - 创建统计信息映射 // // 返回值: // - map[string]int: 组名到记录数量的映射 // // 使用场景: // - 系统监控和报告 // - 容量规划和分析 // - 性能调优和诊断 // // 使用示例: // // stats := im.GetStats() // for group, count := range stats { // fmt.Printf("组 %s: %d 条记录\n", group, count) // } func (im *IndexManager) GetStats() map[string]int { // 创建统计信息映射 stats := make(map[string]int) // 遍历所有索引,收集记录数量 for name, idx := range im.indexes { stats[name] = idx.Count() } return stats }