Files
pipelinedb/index.go
2025-09-30 15:05:56 +08:00

652 lines
18 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package pipelinedb provides an integrated pipeline database system
// 集成了数据库存储和业务管道处理的一体化解决方案
package pipelinedb
import (
"encoding/binary"
"sync"
"github.com/google/btree"
)
// IndexEntry B+Tree索引条目结构
//
// ...
// 核心功能:
// - 存储记录ID到物理位置的映射关系
// - 实现btree.Item接口支持B+Tree操作
// - 提供O(log n)的查找性能
//
// 设计思想:
// - ID作为主键确保唯一性和有序性
// - PageNo和SlotNo组合定位记录的物理位置
// - 结构紧凑,减少内存占用
//
// 物理位置编码:
// - PageNo: 页面编号16位支持65536个页面
// - SlotNo: 页内槽位编号16位支持65536个槽位
// - 组合可定位约42亿个记录位置
//
// 适用场景:
// - 数据库主键索引
// - 记录快速定位
// - 范围查询支持
type IndexEntry struct {
ID int64 // 记录的唯一标识符(主键)
PageNo uint16 // 记录所在的页面编号
SlotNo uint16 // 记录在页面内的槽位编号
}
// Less 实现btree.Item接口的比较方法
//
// 核心功能定义索引条目的排序规则支持B+Tree的有序存储
// 时间复杂度O(1) - 直接的整数比较
// 空间复杂度O(1) - 不创建额外数据
//
// 参数说明:
// - b: 要比较的另一个btree.Item必须是IndexEntry类型
//
// 返回值:
// - bool: true表示当前条目小于参数条目false表示大于等于
//
// 排序规则:
// - 按记录ID升序排列
// - ID相同时认为相等不小于
// - 支持B+Tree的二分查找和范围查询
//
// 使用场景:
// - B+Tree内部排序和查找
// - 范围查询时的边界确定
// - 索引条目的插入位置确定
//
// 注意事项:
// - 必须确保传入的参数是IndexEntry类型否则会panic
// - 该方法被B+Tree库频繁调用性能要求高
func (a IndexEntry) Less(b btree.Item) bool {
// 按记录ID进行升序比较
// 类型断言将btree.Item转换为IndexEntry
return a.ID < b.(IndexEntry).ID
}
// TableIndex 组级索引管理器注意虽然名称是TableIndex但实际管理的是组索引
//
// 核心职责:
// 1. 维护单个组内所有记录的ID到物理位置的映射
// 2. 提供高效的记录查找、插入、删除操作
// 3. 支持范围查询和统计功能
// 4. 管理索引的生命周期
//
// 设计思想:
// - 使用B+Tree提供O(log n)的查找性能
// - 支持范围查询,适合数据库场景
// - 内存索引,提供极快的访问速度
// - 度数为32的B+Tree平衡内存使用和性能
//
// 性能特征:
// - 查找O(log n) - B+Tree查找
// - 插入O(log n) - B+Tree插入
// - 删除O(log n) - B+Tree删除
// - 范围查询O(log n + k) - k为结果数量
// - 空间O(n) - n为索引条目数量
//
// 适用场景:
// - 数据库表的主键索引
// - 记录快速定位系统
// - 支持范围查询的索引系统
type TableIndex struct {
mu sync.RWMutex // 保护并发访问的读写锁
tree *btree.BTree // B+Tree实例存储IndexEntry
name string // 索引名称(通常是组名)
}
// NewGroupIndex 创建一个新的组索引实例
//
// 核心功能初始化一个空的B+Tree索引用于管理单个组的记录映射
// 时间复杂度O(1) - 只创建数据结构
// 空间复杂度O(1) - 初始状态下只占用基础结构空间
//
// 参数说明:
// - groupName: 组名,用于标识和调试
//
// 返回值:
// - *TableIndex: 初始化完成的索引实例
//
// B+Tree配置
// - 度数32平衡内存使用和查找性能
// - 每个节点最多63个键2*32-1
// - 树高度较低,查找效率高
// - 适合数据库索引场景
//
// 使用示例:
//
// idx := NewGroupIndex("user_data")
// idx.Insert(1001, 5, 10) // 插入记录映射
// pageNo, slotNo, found := idx.Get(1001) // 查找记录
func NewGroupIndex(groupName string) *TableIndex {
return &TableIndex{
tree: btree.New(32), // 创建度数为32的B+Tree
name: groupName, // 设置索引名称
}
}
// Insert 向索引中插入或更新记录映射
//
// 核心功能建立记录ID到物理位置的映射关系
// 时间复杂度O(log n) - B+Tree插入操作
// 空间复杂度O(1) - 只添加一个索引条目
//
// 参数说明:
// - id: 记录的唯一标识符
// - pageNo: 记录所在的页面编号
// - slotNo: 记录在页面内的槽位编号
//
// 执行流程:
// 1. 创建IndexEntry结构
// 2. 调用B+Tree的ReplaceOrInsert方法
// 3. 如果ID已存在更新位置信息
// 4. 如果ID不存在插入新的映射
//
// 设计考虑:
// - 使用ReplaceOrInsert支持位置更新
// - 自动维护B+Tree的平衡性
// - 保持索引的有序性
// 使用场景:
// - 新记录插入时建立映射
// - 记录位置变更时更新映射
// - 索引重建过程
func (idx *TableIndex) Insert(id int64, pageNo, slotNo uint16) {
idx.mu.Lock()
defer idx.mu.Unlock()
// 创建索引条目
entry := IndexEntry{
ID: id, // 记录ID
PageNo: pageNo, // 页面编号
SlotNo: slotNo, // 槽位编号
}
// 插入到B+Tree
// 如果ID已存在会自动替换旧条目
idx.tree.ReplaceOrInsert(entry)
}
// Delete 从索引中删除指定记录的映射
//
// 核心功能移除记录ID到物理位置的映射关系
// 时间复杂度O(log n) - B+Tree删除操作
// 空间复杂度O(1) - 释放一个索引条目的空间
//
// 参数说明:
// - id: 要删除的记录ID
//
// 返回值:
// - bool: true表示删除成功false表示记录不存在
//
// 执行流程:
// 1. 创建只包含ID的IndexEntry用于查找
// 2. 调用B+Tree的Delete方法
// 3. 检查删除结果并返回状态
//
// 设计考虑:
// - 只需要ID即可定位要删除的条目
// - B+Tree自动维护删除后的平衡性
// - 返回删除状态便于错误处理
//
// 使用场景:
// - 记录物理删除时清理映射
// - 索引维护和清理
// - 数据迁移过程
func (idx *TableIndex) Delete(id int64) bool {
idx.mu.Lock()
defer idx.mu.Unlock()
// 创建用于查找的索引条目只需要ID
entry := IndexEntry{ID: id}
// 从B+Tree中删除条目
// Delete返回被删除的项如果为nil表示不存在
return idx.tree.Delete(entry) != nil
}
// Get 根据记录ID查找其物理位置
//
// 核心功能通过记录ID快速定位记录的物理存储位置
// 时间复杂度O(log n) - B+Tree查找操作
// 空间复杂度O(1) - 不创建额外数据
//
// 参数说明:
// - id: 要查找的记录ID
//
// 返回值:
// - uint16: 页面编号(如果找到)
// - uint16: 槽位编号(如果找到)
// - bool: 是否找到记录true=找到false=不存在)
//
// 执行流程:
// 1. 创建只包含ID的IndexEntry用于查找
// 2. 调用B+Tree的Get方法查找
// 3. 如果找到,提取位置信息并返回
// 4. 如果未找到返回零值和false
//
// 设计考虑:
// - 返回三元组提供完整的查找结果
// - 零值返回便于调用者处理未找到的情况
// - 类型断言确保数据类型正确性
//
// 使用场景:
// - 记录读取前的位置查找
// - 记录更新前的位置确认
// - 数据完整性检查
//
// 性能优势:
// - O(log n)查找性能,远优于线性扫描
// - 内存索引无磁盘I/O开销
// - B+Tree缓存友好热点数据访问快
func (idx *TableIndex) Get(id int64) (uint16, uint16, bool) {
idx.mu.RLock()
defer idx.mu.RUnlock()
// 创建用于查找的索引条目只需要ID
entry := IndexEntry{ID: id}
// 在B+Tree中查找对应的条目
if item := idx.tree.Get(entry); item != nil {
// 找到记录,提取位置信息
found := item.(IndexEntry)
return found.PageNo, found.SlotNo, true
}
// 未找到记录返回零值和false
return 0, 0, false
}
// Range 执行范围查询遍历指定ID范围内的所有记录
//
// 核心功能按ID顺序遍历指定范围内的所有索引条目
// 时间复杂度O(log n + k) - n为总条目数k为结果数量
// 空间复杂度O(1) - 不创建额外的数据结构
//
// 参数说明:
// - startID: 范围查询的起始ID包含
// - endID: 范围查询的结束ID包含
// - visitor: 访问者函数,对每个匹配的条目调用
// 函数签名func(id int64, pageNo, slotNo uint16) bool
// 返回值true继续遍历false停止遍历
//
// 执行流程:
// 1. 创建起始和结束的IndexEntry边界
// 2. 调用B+Tree的AscendRange方法
// 3. 对范围内每个条目调用visitor函数
// 4. 根据visitor返回值决定是否继续
//
// 查询特性:
// - 按ID升序遍历结果
// - 支持提前终止遍历
// - 包含边界值(闭区间查询)
// - 利用B+Tree的有序性提供高效遍历
//
// 使用场景:
// - 批量数据处理
// - 数据导出和备份
// - 范围统计和分析
// - 分页查询的底层实现
//
// 性能优势:
// - O(log n)定位起始位置
// - 顺序遍历,缓存友好
// - 支持早期终止,避免不必要的遍历
func (idx *TableIndex) Range(startID, endID int64, visitor func(id int64, pageNo, slotNo uint16) bool) {
idx.mu.RLock()
defer idx.mu.RUnlock()
// 创建范围查询的边界条目
start := IndexEntry{ID: startID} // 起始边界(包含)
// 使用 AscendGreaterOrEqual 从起始点开始遍历
// 然后在回调中检查结束条件
idx.tree.AscendGreaterOrEqual(start, func(item btree.Item) bool {
// 提取索引条目信息
entry := item.(IndexEntry)
// 检查是否超出结束边界
if entry.ID > endID {
return false // 停止遍历
}
// 调用访问者函数,传递记录位置信息
// visitor返回false时停止遍历
return visitor(entry.ID, entry.PageNo, entry.SlotNo)
})
}
// Count 获取索引中的记录总数
//
// 核心功能:返回当前索引中存储的记录条目数量
// 时间复杂度O(1) - B+Tree维护内部计数器
// 空间复杂度O(1) - 不创建额外数据
//
// 返回值:
// - int: 索引中的记录条目数量(>=0
//
// 使用场景:
// - 统计分析和报告
// - 容量规划和监控
// - 性能分析和调优
// - 数据完整性检查
//
// 性能特征:
// - 极快的O(1)查询性能
// - 不需要遍历索引结构
// - 适合频繁的统计查询
//
// 使用示例:
//
// count := idx.Count()
// fmt.Printf("组 %s 包含 %d 条记录\n", idx.name, count)
func (idx *TableIndex) Count() int {
idx.mu.RLock()
defer idx.mu.RUnlock()
// 直接返回B+Tree的长度
// B+Tree内部维护计数器提供O(1)性能
return idx.tree.Len()
}
// Clear 清空索引中的所有条目
//
// 核心功能:移除索引中的所有记录映射,重置为空状态
// 时间复杂度O(n) - 需要释放所有节点
// 空间复杂度O(1) - 释放所有索引占用的空间
//
// 执行流程:
// 1. 调用B+Tree的Clear方法
// 2. 释放所有索引节点的内存
// 3. 重置索引为空状态
//
// 参数说明:
// - false参数表示不添加到空闲列表B+Tree内部参数
//
// 使用场景:
// - 组数据完全清理时
// - 索引重建前的清空操作
// - 内存回收和资源清理
// - 测试环境的数据重置
//
// 注意事项:
// - 操作不可逆,清空后无法恢复
// - 不影响磁盘上的实际数据
// - 只清理内存中的索引映射
// - 清空后Count()返回0
//
// 使用示例:
//
// idx.Clear() // 清空索引
// fmt.Printf("索引已清空,当前记录数:%d\n", idx.Count())
func (idx *TableIndex) Clear() {
idx.mu.Lock()
defer idx.mu.Unlock()
// 清空B+Tree中的所有条目
// false参数表示不将节点添加到空闲列表
idx.tree.Clear(false)
}
// RebuildIndex 从页面链重建组索引
//
// 核心功能:扫描页面链中的所有记录,重新构建内存索引
// 时间复杂度O(n log n) - n为记录数量每次插入O(log n)
// 空间复杂度O(n) - 需要存储所有记录的索引条目
//
// 参数说明:
// - groupName: 组名,用于创建新索引
// - rootPage: 页面链的根页面编号
//
// 返回值:
// - *TableIndex: 重建完成的索引实例
// - error: 重建过程中的错误nil表示成功
//
// 执行流程:
// 1. 创建新的空索引
// 2. 从根页面开始遍历页面链
// 3. 扫描每个页面的所有槽位
// 4. 提取记录ID并建立索引映射
// 5. 跳过已删除和损坏的记录
// 6. 返回完整的索引
//
// 错误处理:
// - 页面读取失败时返回错误
// - 跳过损坏的记录,继续处理
// - 确保索引的完整性和一致性
//
// 使用场景:
// - 数据库启动时的索引恢复
// - 索引损坏后的重建操作
// - 数据迁移和修复过程
//
// 性能考虑:
// - 需要读取所有数据页面
// - 内存使用量与记录数成正比
// - 适合在维护窗口期执行
func (pdb *PipelineDB) RebuildIndex(groupName string, rootPage uint16) (*TableIndex, error) {
// 创建新的空索引
idx := NewGroupIndex(groupName)
// 从根页面开始遍历页面链
pageNo := rootPage
for pageNo != 0 {
// 读取当前页面
p, err := pdb.readPage(pageNo)
if err != nil {
return nil, err // 页面读取失败,返回错误
}
// 获取页面的槽位数组
slots := p.slotArray()
for slotNo, off := range slots {
// 跳过已删除的槽位偏移为0表示已删除
if off == 0 {
continue
}
// 检查记录是否损坏(偏移超出页面范围)
if int(off)+8 > PageSize {
continue // 跳过损坏的记录
}
// 从记录中提取ID记录的前8字节
id := int64(binary.LittleEndian.Uint64(p[off:]))
// 将记录ID和位置信息添加到索引中
idx.Insert(id, pageNo, uint16(slotNo))
}
// 移动到链表中的下一个页面
pageNo = p.nextPage()
}
return idx, nil // 返回重建完成的索引
}
// IndexManager 全局索引管理器
//
// 核心职责:
// 1. 管理所有组的索引实例
// 2. 提供索引的创建、获取、删除功能
// 3. 维护索引的生命周期
// 4. 提供索引统计信息
//
// 设计思想:
// - 使用HashMap管理多个组索引
// - 延迟创建索引,按需分配资源
// - 提供统一的索引管理接口
// - 支持索引的动态添加和删除
//
// 性能特征:
// - 索引查找O(1) - HashMap查找
// - 内存使用O(k) - k为组的数量
// - 支持大量组的高效管理
//
// 适用场景:
// - 多租户数据库系统
// - 分组数据管理
// - 索引集中管理
type IndexManager struct {
mu sync.RWMutex // 保护并发访问的读写锁
indexes map[string]*TableIndex // 组名到索引实例的映射
}
// NewIndexManager 创建一个新的索引管理器实例
//
// 核心功能:初始化空的索引管理器
// 时间复杂度O(1) - 只创建空的HashMap
// 空间复杂度O(1) - 初始状态不占用额外空间
//
// 返回值:
// - *IndexManager: 初始化完成的索引管理器
//
// 使用示例:
//
// im := NewIndexManager()
// idx := im.GetOrCreateIndex("user_data")
func NewIndexManager() *IndexManager {
return &IndexManager{
indexes: make(map[string]*TableIndex), // 初始化空的索引映射
}
}
// GetOrCreateIndex 获取或创建指定组的索引
//
// 核心功能:延迟创建索引,按需分配资源
// 时间复杂度O(1) - HashMap查找和插入
// 空间复杂度O(1) - 可能创建一个新索引
//
// 参数说明:
// - groupName: 组名
//
// 返回值:
// - *TableIndex: 组对应的索引实例
//
// 执行流程:
// 1. 检查索引是否已存在
// 2. 如果存在,直接返回
// 3. 如果不存在,创建新索引并缓存
// 4. 返回索引实例
//
// 设计优势:
// - 延迟创建,节省内存
// - 自动管理索引生命周期
// - 线程安全的索引获取
func (im *IndexManager) GetOrCreateIndex(groupName string) *TableIndex {
// 先尝试读锁快速检查
im.mu.RLock()
if idx, exists := im.indexes[groupName]; exists {
im.mu.RUnlock()
return idx // 返回已存在的索引
}
im.mu.RUnlock()
// 需要创建新索引,获取写锁
im.mu.Lock()
defer im.mu.Unlock()
// 双重检查防止在获取写锁期间其他goroutine已创建
if idx, exists := im.indexes[groupName]; exists {
return idx // 返回已存在的索引
}
// 创建新索引并缓存
idx := NewGroupIndex(groupName)
im.indexes[groupName] = idx
return idx
}
// GetIndex 获取指定组的索引(如果存在)
//
// 核心功能:查找已存在的索引,不创建新索引
// 时间复杂度O(1) - HashMap查找
// 空间复杂度O(1) - 不创建额外数据
//
// 参数说明:
// - groupName: 组名
//
// 返回值:
// - *TableIndex: 索引实例(如果存在)
// - bool: 是否找到索引
//
// 使用场景:
// - 检查索引是否已创建
// - 只读访问现有索引
// - 避免意外创建索引
func (im *IndexManager) GetIndex(groupName string) (*TableIndex, bool) {
im.mu.RLock()
defer im.mu.RUnlock()
idx, exists := im.indexes[groupName]
return idx, exists
}
// DropIndex 删除指定组的索引
//
// 核心功能:清理并删除组索引,释放相关资源
// 时间复杂度O(n) - 需要清空索引中的所有条目
// 空间复杂度O(1) - 释放索引占用的空间
//
// 参数说明:
// - groupName: 要删除的组名
//
// 执行流程:
// 1. 检查索引是否存在
// 2. 如果存在,先清空索引内容
// 3. 从管理器中删除索引引用
// 4. 释放相关资源
//
// 使用场景:
// - 组数据完全删除时
// - 内存回收和资源清理
// - 系统维护和重组
func (im *IndexManager) DropIndex(groupName string) {
im.mu.Lock()
defer im.mu.Unlock()
// 检查索引是否存在
if idx, exists := im.indexes[groupName]; exists {
// 清空索引内容,释放内存
idx.Clear()
// 从管理器中删除索引引用
delete(im.indexes, groupName)
}
// 如果索引不存在,操作是幂等的(无副作用)
}
// GetStats 获取所有组的索引统计信息
//
// 核心功能:收集所有组索引的记录数量统计
// 时间复杂度O(k) - k为组的数量
// 空间复杂度O(k) - 创建统计信息映射
//
// 返回值:
// - map[string]int: 组名到记录数量的映射
//
// 使用场景:
// - 系统监控和报告
// - 容量规划和分析
// - 性能调优和诊断
//
// 使用示例:
//
// stats := im.GetStats()
// for group, count := range stats {
// fmt.Printf("组 %s: %d 条记录\n", group, count)
// }
func (im *IndexManager) GetStats() map[string]int {
// 创建统计信息映射
stats := make(map[string]int)
// 遍历所有索引,收集记录数量
for name, idx := range im.indexes {
stats[name] = idx.Count()
}
return stats
}