// Package pipelinedb provides an integrated pipeline database system // 集成了数据库存储和业务管道处理的一体化解决方案 package pipelinedb import ( "encoding/binary" "sync" ) // GroupStatsCache 组统计信息缓存结构 // // 核心功能: // - 缓存每个组的统计信息,提供O(1)的统计查询性能 // - 支持按数据状态分类统计(Hot/Warm/Cold) // - 实时更新,确保统计数据的准确性 // // 设计考虑: // - 使用整数类型提供高效的计数操作 // - 支持JSON序列化,便于数据交换和持久化 // - 结构简单,内存占用小 // // 适用场景: // - 数据库性能监控 // - 组级别的数据分析 // - 缓存命中率统计 type GroupStatsCache struct { TotalRecords int `json:"total_records"` // 总记录数(所有状态的记录总和) HotRecords int `json:"hot_records"` // 热数据记录数(最新、最活跃的数据) WarmRecords int `json:"warm_records"` // 温数据记录数(中等活跃度的数据) ColdRecords int `json:"cold_records"` // 冷数据记录数(较少访问的数据) } // GroupManager 数据组管理器 // // 核心职责: // 1. 管理数据组的生命周期(暂停/恢复) // 2. 维护组级别的ID自增计数器 // 3. 缓存组统计信息,提供高性能统计查询 // 4. 管理组的OnComplete回调执行状态 // 5. 提供组状态的持久化存储 // // 设计思想: // - 使用读写锁支持高并发访问 // - 统计信息缓存在内存中,避免频繁的磁盘I/O // - ID计数器持久化到专用页面,确保重启后的连续性 // - 状态管理支持复杂的业务流程控制 // // 性能特征: // - 统计查询:O(1) - 直接从内存缓存读取 // - ID生成:O(1) - 内存计数器自增 // - 状态检查:O(1) - HashMap查找 // - 并发友好:读多写少的场景下性能优异 // // 适用场景: // - 多租户数据库系统 // - 分组数据处理管道 // - 需要组级别控制的业务系统 type GroupManager struct { pausedGroups map[string]bool // 暂停状态的组集合(key=组名,value=true表示暂停) onCompleteExecuting map[string]bool // 正在执行OnComplete回调的组集合 groupCounters map[string]*int64 // 每个组的ID自增计数器(持久化到磁盘) groupStats map[string]*GroupStatsCache // 组统计信息缓存(内存中维护) mu sync.RWMutex // 读写锁,保护所有内部状态的并发访问 pdb *PipelineDB // 数据库实例引用,用于计数器持久化 } // NewGroupManager 创建一个新的组管理器实例 // // 核心功能:初始化组管理器的所有内部数据结构 // 时间复杂度:O(1) - 只创建空的数据结构 // 空间复杂度:O(1) - 初始状态下不占用额外空间 // // 参数说明: // - pdb: PipelineDB实例引用,用于ID计数器的持久化存储 // 如果为nil,计数器功能将不可用,但其他功能正常 // // 返回值: // - *GroupManager: 完全初始化的组管理器实例 // // 初始化内容: // 1. 创建空的暂停组映射 // 2. 创建空的OnComplete执行状态映射 // 3. 创建空的组计数器映射 // 4. 创建空的组统计缓存映射 // 5. 设置数据库引用 // // 使用示例: // // gm := NewGroupManager(pipelineDB) // gm.PauseGroup("batch1") // 暂停组 // id := gm.GetNextID("user") // 获取ID func NewGroupManager(pdb *PipelineDB) *GroupManager { return &GroupManager{ pausedGroups: make(map[string]bool), // 初始化暂停组映射 onCompleteExecuting: make(map[string]bool), // 初始化OnComplete执行状态映射 groupCounters: make(map[string]*int64), // 初始化组计数器映射 groupStats: make(map[string]*GroupStatsCache), // 初始化组统计缓存映射 pdb: pdb, // 设置数据库引用 // mu 会自动初始化为零值(未锁定状态) } } // PauseGroup 暂停指定组的数据接收功能 // // 核心功能:将组标记为暂停状态,阻止新数据的接收 // 时间复杂度:O(1) - HashMap插入操作 // 空间复杂度:O(1) - 只添加一个映射条目 // // 参数说明: // - group: 要暂停的组名 // // 执行流程: // 1. 获取写锁(独占访问) // 2. 将组添加到暂停组映射中 // 3. 输出暂停确认信息 // // 使用场景: // - 维护期间暂停特定组的数据处理 // - 数据质量问题时临时停止数据接收 // - 系统负载过高时选择性暂停部分组 // - 业务流程控制需要 // // 注意事项: // - 暂停后该组无法接收新数据,但已有数据继续处理 // - 可以通过ResumeGroup恢复 // - 暂停状态不会持久化,重启后会丢失 // // 并发安全:使用写锁保护,确保状态修改的原子性 func (gm *GroupManager) PauseGroup(group string) { gm.mu.Lock() // 获取写锁,独占访问 defer gm.mu.Unlock() // 确保函数退出时释放锁 // 将组标记为暂停状态 gm.pausedGroups[group] = true // 输出暂停确认信息(便于监控和调试) gm.pdb.logger.Info("⏸️ 组已暂停接收数据", "group", group) } // ResumeGroup 恢复指定组的数据接收功能 // // 核心功能:将组从暂停状态中移除,恢复正常的数据接收 // 时间复杂度:O(1) - HashMap删除操作 // 空间复杂度:O(1) - 释放一个映射条目的空间 // // 参数说明: // - group: 要恢复的组名 // // 执行流程: // 1. 获取写锁(独占访问) // 2. 从暂停组映射中删除该组 // 3. 输出恢复确认信息 // // 使用场景: // - 维护完成后恢复组的正常运行 // - 数据质量问题解决后重新启用组 // - 系统负载降低后恢复暂停的组 // - 业务流程需要重新激活组 // // 注意事项: // - 恢复后该组立即可以接收新数据 // - 如果组本来就没有暂停,操作是幂等的(无副作用) // - 恢复状态不会持久化,重启后默认为活跃状态 // // 并发安全:使用写锁保护,确保状态修改的原子性 func (gm *GroupManager) ResumeGroup(group string) { gm.mu.Lock() // 获取写锁,独占访问 defer gm.mu.Unlock() // 确保函数退出时释放锁 // 从暂停组映射中删除该组(恢复活跃状态) delete(gm.pausedGroups, group) // 输出恢复确认信息(便于监控和调试) gm.pdb.logger.Info("▶️ 组已恢复接收数据", "group", group) } // IsPaused 检查指定组是否处于暂停状态 // // 核心功能:查询组的暂停状态,用于数据接收前的状态检查 // 时间复杂度:O(1) - HashMap查找操作 // 空间复杂度:O(1) - 不创建额外数据 // // 参数说明: // - group: 要检查的组名 // // 返回值: // - bool: true表示组已暂停,false表示组处于活跃状态 // // 执行流程: // 1. 获取读锁(允许并发读取) // 2. 在暂停组映射中查找该组 // 3. 返回查找结果 // // 使用场景: // - 数据接收前的状态验证 // - 业务逻辑中的条件判断 // - 监控和状态报告 // - 自动化运维脚本 // // 设计考虑: // - 使用读锁支持高并发查询 // - HashMap查找提供O(1)性能 // - 不存在的组默认返回false(活跃状态) // // 并发安全:使用读锁保护,支持多线程并发调用 func (gm *GroupManager) IsPaused(group string) bool { gm.mu.RLock() // 获取读锁,允许并发读取 defer gm.mu.RUnlock() // 确保函数退出时释放锁 // 在暂停组映射中查找该组 // 如果组不存在,返回false(默认为活跃状态) return gm.pausedGroups[group] } // GetPausedGroups 获取所有暂停的组 func (gm *GroupManager) GetPausedGroups() []string { gm.mu.RLock() defer gm.mu.RUnlock() var groups []string for group := range gm.pausedGroups { groups = append(groups, group) } return groups } // GetGroupStatus 获取组状态 func (gm *GroupManager) GetGroupStatus(group string) string { if gm.IsPaused(group) { return "paused" } return "active" } // IsOnCompleteExecuting 检查组是否正在执行 OnComplete func (gm *GroupManager) IsOnCompleteExecuting(group string) bool { gm.mu.RLock() defer gm.mu.RUnlock() return gm.onCompleteExecuting[group] } // SetOnCompleteExecuting 设置组的 OnComplete 执行状态 func (gm *GroupManager) SetOnCompleteExecuting(group string, executing bool) { gm.mu.Lock() defer gm.mu.Unlock() if executing { gm.onCompleteExecuting[group] = true gm.pdb.logger.Info("🔒 组 OnComplete 开始执行", "group", group) } else { delete(gm.onCompleteExecuting, group) gm.pdb.logger.Info("🔓 组 OnComplete 执行完成", "group", group) } } // GetExecutingGroups 获取所有正在执行 OnComplete 的组 func (gm *GroupManager) GetExecutingGroups() []string { gm.mu.RLock() defer gm.mu.RUnlock() var groups []string for group := range gm.onCompleteExecuting { groups = append(groups, group) } return groups } // GetNextID 为指定组生成下一个唯一的自增ID // // 核心功能:为每个组维护独立的ID序列,确保ID的唯一性和连续性 // 时间复杂度:O(1) - 内存计数器自增 + 持久化写入 // 空间复杂度:O(1) - 只维护单个计数器 // // 参数说明: // - group: 组名,每个组有独立的ID序列 // // 返回值: // - int64: 新生成的唯一ID(从1开始递增) // // 执行流程: // 1. 获取写锁(保证ID生成的原子性) // 2. 检查组计数器是否已加载: // a. 如果未加载:从持久化存储中加载计数器 // b. 如果已加载:直接使用内存中的计数器 // 3. 计数器自增1 // 4. 将新的计数器值持久化到磁盘 // 5. 返回新生成的ID // // 持久化机制: // - 计数器存储在专用的计数器页面中 // - 每次ID生成都会立即持久化,确保重启后的连续性 // - 页面格式:[组名长度][组名][计数器值]... // // 设计考虑: // - 每个组独立的ID序列,避免冲突 // - 立即持久化确保数据安全性 // - 内存缓存提高性能 // - 写锁保证并发安全 // // 使用场景: // - 数据记录的主键生成 // - 业务对象的唯一标识 // - 序列号生成 // // 并发安全:使用写锁保护,确保ID生成的唯一性 func (gm *GroupManager) GetNextID(group string) int64 { gm.mu.Lock() // 获取写锁,保证ID生成的原子性 defer gm.mu.Unlock() // 确保函数退出时释放锁 // 检查组的计数器是否已经加载到内存中 if gm.groupCounters[group] == nil { // 计数器未加载,从持久化存储中加载 counter := gm.loadCounterFromPage(group) gm.groupCounters[group] = &counter } // 计数器自增,生成新的ID *gm.groupCounters[group]++ newID := *gm.groupCounters[group] // 立即将新的计数器值持久化到磁盘 // 这确保了即使系统崩溃,ID序列也不会重复 gm.saveCounterToPage(group, newID) return newID // 返回新生成的唯一ID } // GetGroupCounter 获取组的当前计数器值 func (gm *GroupManager) GetGroupCounter(group string) int64 { gm.mu.RLock() defer gm.mu.RUnlock() if gm.groupCounters[group] == nil { return 0 } return *gm.groupCounters[group] } // loadCounterFromPage 从页面加载组计数器 func (gm *GroupManager) loadCounterFromPage(group string) int64 { if gm.pdb == nil || gm.pdb.header.CounterPage == 0 { return 0 // 如果没有计数器页面,返回0 } // 读取计数器页面 page, err := gm.pdb.readPage(gm.pdb.header.CounterPage) if err != nil { gm.pdb.logger.Warn("⚠️ 读取计数器页面失败", "error", err) return 0 } // 在页面中查找组的计数器 // 页面格式:[组名长度(2字节)][组名][计数器(8字节)]... offset := uint16(0) for offset < uint16(len(page)-10) { // 至少需要2+1+8=11字节 // 读取组名长度 nameLen := binary.LittleEndian.Uint16(page[offset : offset+2]) if nameLen == 0 || offset+2+nameLen+8 > uint16(len(page)) { break } // 读取组名 name := string(page[offset+2 : offset+2+nameLen]) if name == group { // 找到了,读取计数器 counter := binary.LittleEndian.Uint64(page[offset+2+nameLen : offset+2+nameLen+8]) return int64(counter) } // 移动到下一个条目 offset += 2 + nameLen + 8 } return 0 // 没找到,返回0 } // saveCounterToPage 保存组计数器到页面 func (gm *GroupManager) saveCounterToPage(group string, counter int64) { if gm.pdb == nil { return } // 如果没有计数器页面,创建一个 if gm.pdb.header.CounterPage == 0 { gm.initCounterPage() } // 读取计数器页面 page, err := gm.pdb.readPage(gm.pdb.header.CounterPage) if err != nil { gm.pdb.logger.Warn("⚠️ 读取计数器页面失败", "error", err) return } // 查找并更新组的计数器 offset := uint16(0) found := false for offset < uint16(len(page)-10) { // 读取组名长度 nameLen := binary.LittleEndian.Uint16(page[offset : offset+2]) if nameLen == 0 { break } if offset+2+nameLen+8 > uint16(len(page)) { break } // 读取组名 name := string(page[offset+2 : offset+2+nameLen]) if name == group { // 找到了,更新计数器 binary.LittleEndian.PutUint64(page[offset+2+nameLen:offset+2+nameLen+8], uint64(counter)) found = true break } // 移动到下一个条目 offset += 2 + nameLen + 8 } // 如果没找到,添加新条目 if !found { nameLen := uint16(len(group)) if offset+2+nameLen+8 <= uint16(len(page)) { // 写入组名长度 binary.LittleEndian.PutUint16(page[offset:offset+2], nameLen) // 写入组名 copy(page[offset+2:offset+2+nameLen], []byte(group)) // 写入计数器 binary.LittleEndian.PutUint64(page[offset+2+nameLen:offset+2+nameLen+8], uint64(counter)) } } // 写回页面 gm.pdb.cache.Put(gm.pdb.header.CounterPage, page, true) } // initCounterPage 初始化计数器页面 func (gm *GroupManager) initCounterPage() { if gm.pdb == nil { return } // 分配新页面 pageNo := gm.pdb.header.TotalPages gm.pdb.header.TotalPages++ gm.pdb.header.CounterPage = pageNo // 创建空页面 page := make([]byte, PageSize) // 写入页面 gm.pdb.cache.Put(pageNo, page, true) // 保存头部 gm.pdb.saveHeader() gm.pdb.logger.Info("📄 初始化计数器页面", "page", pageNo) } // IncrementStats 增加组的统计计数(新增记录时调用) // // 核心功能:当新记录添加到组时,更新该组的统计信息 // 时间复杂度:O(1) - 直接的计数器操作 // 空间复杂度:O(1) - 可能创建一个新的统计缓存结构 // // 参数说明: // - group: 组名 // - status: 新记录的数据状态(Hot/Warm/Cold) // // 执行流程: // 1. 获取写锁(保证统计更新的原子性) // 2. 确保组的统计缓存已初始化 // 3. 增加总记录数计数 // 4. 根据状态增加对应的状态计数 // // 统计维护: // - TotalRecords: 总记录数自增1 // - 对应状态计数: Hot/Warm/Cold计数自增1 // - 统计信息实时更新,无延迟 // // 使用场景: // - 新数据记录插入时 // - 数据导入过程中 // - 批量数据处理时 // // 并发安全:使用写锁保护,确保统计数据的一致性 func (gm *GroupManager) IncrementStats(group string, status DataStatus) { gm.mu.Lock() // 获取写锁,保证统计更新的原子性 defer gm.mu.Unlock() // 确保函数退出时释放锁 // 确保组的统计缓存已经初始化 if gm.groupStats[group] == nil { gm.groupStats[group] = &GroupStatsCache{} } // 增加总记录数 gm.groupStats[group].TotalRecords++ // 根据数据状态增加对应的计数 switch status { case StatusHot: gm.groupStats[group].HotRecords++ // 增加热数据计数 case StatusWarm: gm.groupStats[group].WarmRecords++ // 增加温数据计数 case StatusCold: gm.groupStats[group].ColdRecords++ // 增加冷数据计数 } } // UpdateStats 更新组统计信息(数据状态转换时调用) // // 核心功能:当记录的状态发生转换时,更新统计信息以保持准确性 // 时间复杂度:O(1) - 直接的计数器操作 // 空间复杂度:O(1) - 可能创建一个新的统计缓存结构 // // 参数说明: // - group: 组名 // - oldStatus: 记录的原始状态 // - newStatus: 记录的新状态 // // 执行流程: // 1. 获取写锁(保证统计更新的原子性) // 2. 确保组的统计缓存已初始化 // 3. 减少原状态的计数 // 4. 增加新状态的计数 // 5. 总记录数保持不变 // // 状态转换处理: // - Hot → Warm: HotRecords-1, WarmRecords+1 // - Warm → Cold: WarmRecords-1, ColdRecords+1 // - 支持任意状态间的转换 // - TotalRecords保持不变(只是状态分布改变) // // 使用场景: // - 数据生命周期管理(Hot→Warm→Cold) // - 缓存策略调整 // - 数据归档过程 // // 并发安全:使用写锁保护,确保统计数据的一致性 func (gm *GroupManager) UpdateStats(group string, oldStatus, newStatus DataStatus) { gm.mu.Lock() // 获取写锁,保证统计更新的原子性 defer gm.mu.Unlock() // 确保函数退出时释放锁 // 确保组的统计缓存已经初始化 if gm.groupStats[group] == nil { gm.groupStats[group] = &GroupStatsCache{} } // 减少原状态的计数 switch oldStatus { case StatusHot: gm.groupStats[group].HotRecords-- // 减少热数据计数 case StatusWarm: gm.groupStats[group].WarmRecords-- // 减少温数据计数 case StatusCold: gm.groupStats[group].ColdRecords-- // 减少冷数据计数 } // 增加新状态的计数 switch newStatus { case StatusHot: gm.groupStats[group].HotRecords++ // 增加热数据计数 case StatusWarm: gm.groupStats[group].WarmRecords++ // 增加温数据计数 case StatusCold: gm.groupStats[group].ColdRecords++ // 增加冷数据计数 } // 注意:TotalRecords保持不变,因为只是状态转换,记录总数没有变化 } // GetFastStats 快速获取所有组的统计信息(O(1)复杂度) // // 核心功能:从内存缓存中快速获取所有组的统计信息 // 时间复杂度:O(n) - n为组的数量(需要复制所有统计数据) // 空间复杂度:O(n) - 创建所有统计数据的副本 // // 返回值: // - map[string]*GroupStatsCache: 所有组的统计信息副本 // key为组名,value为该组的统计信息 // // 执行流程: // 1. 获取读锁(允许并发读取) // 2. 遍历所有组的统计缓存 // 3. 为每个组创建统计信息的副本 // 4. 返回完整的统计信息映射 // // 设计考虑: // - 返回副本而不是原始数据,防止外部修改 // - 使用读锁支持高并发查询 // - 内存缓存提供极快的查询性能 // - 避免磁盘I/O,适合频繁查询 // // 性能优势: // - 相比传统的磁盘扫描统计,性能提升数百倍 // - 支持高频率的监控查询 // - 实时性强,统计数据始终是最新的 // // 使用场景: // - 系统监控和仪表板 // - 性能分析和报告 // - 容量规划和预警 // - API接口的统计查询 // // 并发安全:使用读锁保护,支持多线程并发调用 func (gm *GroupManager) GetFastStats() map[string]*GroupStatsCache { gm.mu.RLock() // 获取读锁,允许并发读取 defer gm.mu.RUnlock() // 确保函数退出时释放锁 // 创建统计数据的完整副本 result := make(map[string]*GroupStatsCache) // 遍历所有组的统计缓存,创建副本 for group, stats := range gm.groupStats { result[group] = &GroupStatsCache{ TotalRecords: stats.TotalRecords, // 复制总记录数 HotRecords: stats.HotRecords, // 复制热数据记录数 WarmRecords: stats.WarmRecords, // 复制温数据记录数 ColdRecords: stats.ColdRecords, // 复制冷数据记录数 } } return result // 返回统计信息副本 }