Files
pipelinedb/group_manager.go
2025-09-30 15:05:56 +08:00

627 lines
20 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package pipelinedb provides an integrated pipeline database system
// 集成了数据库存储和业务管道处理的一体化解决方案
package pipelinedb
import (
"encoding/binary"
"sync"
)
// GroupStatsCache 组统计信息缓存结构
//
// 核心功能:
// - 缓存每个组的统计信息提供O(1)的统计查询性能
// - 支持按数据状态分类统计Hot/Warm/Cold
// - 实时更新,确保统计数据的准确性
//
// 设计考虑:
// - 使用整数类型提供高效的计数操作
// - 支持JSON序列化便于数据交换和持久化
// - 结构简单,内存占用小
//
// 适用场景:
// - 数据库性能监控
// - 组级别的数据分析
// - 缓存命中率统计
type GroupStatsCache struct {
TotalRecords int `json:"total_records"` // 总记录数(所有状态的记录总和)
HotRecords int `json:"hot_records"` // 热数据记录数(最新、最活跃的数据)
WarmRecords int `json:"warm_records"` // 温数据记录数(中等活跃度的数据)
ColdRecords int `json:"cold_records"` // 冷数据记录数(较少访问的数据)
}
// GroupManager 数据组管理器
//
// 核心职责:
// 1. 管理数据组的生命周期(暂停/恢复)
// 2. 维护组级别的ID自增计数器
// 3. 缓存组统计信息,提供高性能统计查询
// 4. 管理组的OnComplete回调执行状态
// 5. 提供组状态的持久化存储
//
// 设计思想:
// - 使用读写锁支持高并发访问
// - 统计信息缓存在内存中避免频繁的磁盘I/O
// - ID计数器持久化到专用页面确保重启后的连续性
// - 状态管理支持复杂的业务流程控制
//
// 性能特征:
// - 统计查询O(1) - 直接从内存缓存读取
// - ID生成O(1) - 内存计数器自增
// - 状态检查O(1) - HashMap查找
// - 并发友好:读多写少的场景下性能优异
//
// 适用场景:
// - 多租户数据库系统
// - 分组数据处理管道
// - 需要组级别控制的业务系统
type GroupManager struct {
pausedGroups map[string]bool // 暂停状态的组集合key=组名value=true表示暂停
onCompleteExecuting map[string]bool // 正在执行OnComplete回调的组集合
groupCounters map[string]*int64 // 每个组的ID自增计数器持久化到磁盘
groupStats map[string]*GroupStatsCache // 组统计信息缓存(内存中维护)
mu sync.RWMutex // 读写锁,保护所有内部状态的并发访问
pdb *PipelineDB // 数据库实例引用,用于计数器持久化
}
// NewGroupManager 创建一个新的组管理器实例
//
// 核心功能:初始化组管理器的所有内部数据结构
// 时间复杂度O(1) - 只创建空的数据结构
// 空间复杂度O(1) - 初始状态下不占用额外空间
//
// 参数说明:
// - pdb: PipelineDB实例引用用于ID计数器的持久化存储
// 如果为nil计数器功能将不可用但其他功能正常
//
// 返回值:
// - *GroupManager: 完全初始化的组管理器实例
//
// 初始化内容:
// 1. 创建空的暂停组映射
// 2. 创建空的OnComplete执行状态映射
// 3. 创建空的组计数器映射
// 4. 创建空的组统计缓存映射
// 5. 设置数据库引用
//
// 使用示例:
//
// gm := NewGroupManager(pipelineDB)
// gm.PauseGroup("batch1") // 暂停组
// id := gm.GetNextID("user") // 获取ID
func NewGroupManager(pdb *PipelineDB) *GroupManager {
return &GroupManager{
pausedGroups: make(map[string]bool), // 初始化暂停组映射
onCompleteExecuting: make(map[string]bool), // 初始化OnComplete执行状态映射
groupCounters: make(map[string]*int64), // 初始化组计数器映射
groupStats: make(map[string]*GroupStatsCache), // 初始化组统计缓存映射
pdb: pdb, // 设置数据库引用
// mu 会自动初始化为零值(未锁定状态)
}
}
// PauseGroup 暂停指定组的数据接收功能
//
// 核心功能:将组标记为暂停状态,阻止新数据的接收
// 时间复杂度O(1) - HashMap插入操作
// 空间复杂度O(1) - 只添加一个映射条目
//
// 参数说明:
// - group: 要暂停的组名
//
// 执行流程:
// 1. 获取写锁(独占访问)
// 2. 将组添加到暂停组映射中
// 3. 输出暂停确认信息
//
// 使用场景:
// - 维护期间暂停特定组的数据处理
// - 数据质量问题时临时停止数据接收
// - 系统负载过高时选择性暂停部分组
// - 业务流程控制需要
//
// 注意事项:
// - 暂停后该组无法接收新数据,但已有数据继续处理
// - 可以通过ResumeGroup恢复
// - 暂停状态不会持久化,重启后会丢失
//
// 并发安全:使用写锁保护,确保状态修改的原子性
func (gm *GroupManager) PauseGroup(group string) {
gm.mu.Lock() // 获取写锁,独占访问
defer gm.mu.Unlock() // 确保函数退出时释放锁
// 将组标记为暂停状态
gm.pausedGroups[group] = true
// 输出暂停确认信息(便于监控和调试)
gm.pdb.logger.Info("⏸️ 组已暂停接收数据", "group", group)
}
// ResumeGroup 恢复指定组的数据接收功能
//
// 核心功能:将组从暂停状态中移除,恢复正常的数据接收
// 时间复杂度O(1) - HashMap删除操作
// 空间复杂度O(1) - 释放一个映射条目的空间
//
// 参数说明:
// - group: 要恢复的组名
//
// 执行流程:
// 1. 获取写锁(独占访问)
// 2. 从暂停组映射中删除该组
// 3. 输出恢复确认信息
//
// 使用场景:
// - 维护完成后恢复组的正常运行
// - 数据质量问题解决后重新启用组
// - 系统负载降低后恢复暂停的组
// - 业务流程需要重新激活组
//
// 注意事项:
// - 恢复后该组立即可以接收新数据
// - 如果组本来就没有暂停,操作是幂等的(无副作用)
// - 恢复状态不会持久化,重启后默认为活跃状态
//
// 并发安全:使用写锁保护,确保状态修改的原子性
func (gm *GroupManager) ResumeGroup(group string) {
gm.mu.Lock() // 获取写锁,独占访问
defer gm.mu.Unlock() // 确保函数退出时释放锁
// 从暂停组映射中删除该组(恢复活跃状态)
delete(gm.pausedGroups, group)
// 输出恢复确认信息(便于监控和调试)
gm.pdb.logger.Info("▶️ 组已恢复接收数据", "group", group)
}
// IsPaused 检查指定组是否处于暂停状态
//
// 核心功能:查询组的暂停状态,用于数据接收前的状态检查
// 时间复杂度O(1) - HashMap查找操作
// 空间复杂度O(1) - 不创建额外数据
//
// 参数说明:
// - group: 要检查的组名
//
// 返回值:
// - bool: true表示组已暂停false表示组处于活跃状态
//
// 执行流程:
// 1. 获取读锁(允许并发读取)
// 2. 在暂停组映射中查找该组
// 3. 返回查找结果
//
// 使用场景:
// - 数据接收前的状态验证
// - 业务逻辑中的条件判断
// - 监控和状态报告
// - 自动化运维脚本
//
// 设计考虑:
// - 使用读锁支持高并发查询
// - HashMap查找提供O(1)性能
// - 不存在的组默认返回false活跃状态
//
// 并发安全:使用读锁保护,支持多线程并发调用
func (gm *GroupManager) IsPaused(group string) bool {
gm.mu.RLock() // 获取读锁,允许并发读取
defer gm.mu.RUnlock() // 确保函数退出时释放锁
// 在暂停组映射中查找该组
// 如果组不存在返回false默认为活跃状态
return gm.pausedGroups[group]
}
// GetPausedGroups 获取所有暂停的组
func (gm *GroupManager) GetPausedGroups() []string {
gm.mu.RLock()
defer gm.mu.RUnlock()
var groups []string
for group := range gm.pausedGroups {
groups = append(groups, group)
}
return groups
}
// GetGroupStatus 获取组状态
func (gm *GroupManager) GetGroupStatus(group string) string {
if gm.IsPaused(group) {
return "paused"
}
return "active"
}
// IsOnCompleteExecuting 检查组是否正在执行 OnComplete
func (gm *GroupManager) IsOnCompleteExecuting(group string) bool {
gm.mu.RLock()
defer gm.mu.RUnlock()
return gm.onCompleteExecuting[group]
}
// SetOnCompleteExecuting 设置组的 OnComplete 执行状态
func (gm *GroupManager) SetOnCompleteExecuting(group string, executing bool) {
gm.mu.Lock()
defer gm.mu.Unlock()
if executing {
gm.onCompleteExecuting[group] = true
gm.pdb.logger.Info("🔒 组 OnComplete 开始执行", "group", group)
} else {
delete(gm.onCompleteExecuting, group)
gm.pdb.logger.Info("🔓 组 OnComplete 执行完成", "group", group)
}
}
// GetExecutingGroups 获取所有正在执行 OnComplete 的组
func (gm *GroupManager) GetExecutingGroups() []string {
gm.mu.RLock()
defer gm.mu.RUnlock()
var groups []string
for group := range gm.onCompleteExecuting {
groups = append(groups, group)
}
return groups
}
// GetNextID 为指定组生成下一个唯一的自增ID
//
// 核心功能为每个组维护独立的ID序列确保ID的唯一性和连续性
// 时间复杂度O(1) - 内存计数器自增 + 持久化写入
// 空间复杂度O(1) - 只维护单个计数器
//
// 参数说明:
// - group: 组名每个组有独立的ID序列
//
// 返回值:
// - int64: 新生成的唯一ID从1开始递增
//
// 执行流程:
// 1. 获取写锁保证ID生成的原子性
// 2. 检查组计数器是否已加载:
// a. 如果未加载:从持久化存储中加载计数器
// b. 如果已加载:直接使用内存中的计数器
// 3. 计数器自增1
// 4. 将新的计数器值持久化到磁盘
// 5. 返回新生成的ID
//
// 持久化机制:
// - 计数器存储在专用的计数器页面中
// - 每次ID生成都会立即持久化确保重启后的连续性
// - 页面格式:[组名长度][组名][计数器值]...
//
// 设计考虑:
// - 每个组独立的ID序列避免冲突
// - 立即持久化确保数据安全性
// - 内存缓存提高性能
// - 写锁保证并发安全
//
// 使用场景:
// - 数据记录的主键生成
// - 业务对象的唯一标识
// - 序列号生成
//
// 并发安全使用写锁保护确保ID生成的唯一性
func (gm *GroupManager) GetNextID(group string) int64 {
gm.mu.Lock() // 获取写锁保证ID生成的原子性
defer gm.mu.Unlock() // 确保函数退出时释放锁
// 检查组的计数器是否已经加载到内存中
if gm.groupCounters[group] == nil {
// 计数器未加载,从持久化存储中加载
counter := gm.loadCounterFromPage(group)
gm.groupCounters[group] = &counter
}
// 计数器自增生成新的ID
*gm.groupCounters[group]++
newID := *gm.groupCounters[group]
// 立即将新的计数器值持久化到磁盘
// 这确保了即使系统崩溃ID序列也不会重复
gm.saveCounterToPage(group, newID)
return newID // 返回新生成的唯一ID
}
// GetGroupCounter 获取组的当前计数器值
func (gm *GroupManager) GetGroupCounter(group string) int64 {
gm.mu.RLock()
defer gm.mu.RUnlock()
if gm.groupCounters[group] == nil {
return 0
}
return *gm.groupCounters[group]
}
// loadCounterFromPage 从页面加载组计数器
func (gm *GroupManager) loadCounterFromPage(group string) int64 {
if gm.pdb == nil || gm.pdb.header.CounterPage == 0 {
return 0 // 如果没有计数器页面返回0
}
// 读取计数器页面
page, err := gm.pdb.readPage(gm.pdb.header.CounterPage)
if err != nil {
gm.pdb.logger.Warn("⚠️ 读取计数器页面失败", "error", err)
return 0
}
// 在页面中查找组的计数器
// 页面格式:[组名长度(2字节)][组名][计数器(8字节)]...
offset := uint16(0)
for offset < uint16(len(page)-10) { // 至少需要2+1+8=11字节
// 读取组名长度
nameLen := binary.LittleEndian.Uint16(page[offset : offset+2])
if nameLen == 0 || offset+2+nameLen+8 > uint16(len(page)) {
break
}
// 读取组名
name := string(page[offset+2 : offset+2+nameLen])
if name == group {
// 找到了,读取计数器
counter := binary.LittleEndian.Uint64(page[offset+2+nameLen : offset+2+nameLen+8])
return int64(counter)
}
// 移动到下一个条目
offset += 2 + nameLen + 8
}
return 0 // 没找到返回0
}
// saveCounterToPage 保存组计数器到页面
func (gm *GroupManager) saveCounterToPage(group string, counter int64) {
if gm.pdb == nil {
return
}
// 如果没有计数器页面,创建一个
if gm.pdb.header.CounterPage == 0 {
gm.initCounterPage()
}
// 读取计数器页面
page, err := gm.pdb.readPage(gm.pdb.header.CounterPage)
if err != nil {
gm.pdb.logger.Warn("⚠️ 读取计数器页面失败", "error", err)
return
}
// 查找并更新组的计数器
offset := uint16(0)
found := false
for offset < uint16(len(page)-10) {
// 读取组名长度
nameLen := binary.LittleEndian.Uint16(page[offset : offset+2])
if nameLen == 0 {
break
}
if offset+2+nameLen+8 > uint16(len(page)) {
break
}
// 读取组名
name := string(page[offset+2 : offset+2+nameLen])
if name == group {
// 找到了,更新计数器
binary.LittleEndian.PutUint64(page[offset+2+nameLen:offset+2+nameLen+8], uint64(counter))
found = true
break
}
// 移动到下一个条目
offset += 2 + nameLen + 8
}
// 如果没找到,添加新条目
if !found {
nameLen := uint16(len(group))
if offset+2+nameLen+8 <= uint16(len(page)) {
// 写入组名长度
binary.LittleEndian.PutUint16(page[offset:offset+2], nameLen)
// 写入组名
copy(page[offset+2:offset+2+nameLen], []byte(group))
// 写入计数器
binary.LittleEndian.PutUint64(page[offset+2+nameLen:offset+2+nameLen+8], uint64(counter))
}
}
// 写回页面
gm.pdb.cache.Put(gm.pdb.header.CounterPage, page, true)
}
// initCounterPage 初始化计数器页面
func (gm *GroupManager) initCounterPage() {
if gm.pdb == nil {
return
}
// 分配新页面
pageNo := gm.pdb.header.TotalPages
gm.pdb.header.TotalPages++
gm.pdb.header.CounterPage = pageNo
// 创建空页面
page := make([]byte, PageSize)
// 写入页面
gm.pdb.cache.Put(pageNo, page, true)
// 保存头部
gm.pdb.saveHeader()
gm.pdb.logger.Info("📄 初始化计数器页面", "page", pageNo)
}
// IncrementStats 增加组的统计计数(新增记录时调用)
//
// 核心功能:当新记录添加到组时,更新该组的统计信息
// 时间复杂度O(1) - 直接的计数器操作
// 空间复杂度O(1) - 可能创建一个新的统计缓存结构
//
// 参数说明:
// - group: 组名
// - status: 新记录的数据状态Hot/Warm/Cold
//
// 执行流程:
// 1. 获取写锁(保证统计更新的原子性)
// 2. 确保组的统计缓存已初始化
// 3. 增加总记录数计数
// 4. 根据状态增加对应的状态计数
//
// 统计维护:
// - TotalRecords: 总记录数自增1
// - 对应状态计数: Hot/Warm/Cold计数自增1
// - 统计信息实时更新,无延迟
//
// 使用场景:
// - 新数据记录插入时
// - 数据导入过程中
// - 批量数据处理时
//
// 并发安全:使用写锁保护,确保统计数据的一致性
func (gm *GroupManager) IncrementStats(group string, status DataStatus) {
gm.mu.Lock() // 获取写锁,保证统计更新的原子性
defer gm.mu.Unlock() // 确保函数退出时释放锁
// 确保组的统计缓存已经初始化
if gm.groupStats[group] == nil {
gm.groupStats[group] = &GroupStatsCache{}
}
// 增加总记录数
gm.groupStats[group].TotalRecords++
// 根据数据状态增加对应的计数
switch status {
case StatusHot:
gm.groupStats[group].HotRecords++ // 增加热数据计数
case StatusWarm:
gm.groupStats[group].WarmRecords++ // 增加温数据计数
case StatusCold:
gm.groupStats[group].ColdRecords++ // 增加冷数据计数
}
}
// UpdateStats 更新组统计信息(数据状态转换时调用)
//
// 核心功能:当记录的状态发生转换时,更新统计信息以保持准确性
// 时间复杂度O(1) - 直接的计数器操作
// 空间复杂度O(1) - 可能创建一个新的统计缓存结构
//
// 参数说明:
// - group: 组名
// - oldStatus: 记录的原始状态
// - newStatus: 记录的新状态
//
// 执行流程:
// 1. 获取写锁(保证统计更新的原子性)
// 2. 确保组的统计缓存已初始化
// 3. 减少原状态的计数
// 4. 增加新状态的计数
// 5. 总记录数保持不变
//
// 状态转换处理:
// - Hot → Warm: HotRecords-1, WarmRecords+1
// - Warm → Cold: WarmRecords-1, ColdRecords+1
// - 支持任意状态间的转换
// - TotalRecords保持不变只是状态分布改变
//
// 使用场景:
// - 数据生命周期管理Hot→Warm→Cold
// - 缓存策略调整
// - 数据归档过程
//
// 并发安全:使用写锁保护,确保统计数据的一致性
func (gm *GroupManager) UpdateStats(group string, oldStatus, newStatus DataStatus) {
gm.mu.Lock() // 获取写锁,保证统计更新的原子性
defer gm.mu.Unlock() // 确保函数退出时释放锁
// 确保组的统计缓存已经初始化
if gm.groupStats[group] == nil {
gm.groupStats[group] = &GroupStatsCache{}
}
// 减少原状态的计数
switch oldStatus {
case StatusHot:
gm.groupStats[group].HotRecords-- // 减少热数据计数
case StatusWarm:
gm.groupStats[group].WarmRecords-- // 减少温数据计数
case StatusCold:
gm.groupStats[group].ColdRecords-- // 减少冷数据计数
}
// 增加新状态的计数
switch newStatus {
case StatusHot:
gm.groupStats[group].HotRecords++ // 增加热数据计数
case StatusWarm:
gm.groupStats[group].WarmRecords++ // 增加温数据计数
case StatusCold:
gm.groupStats[group].ColdRecords++ // 增加冷数据计数
}
// 注意TotalRecords保持不变因为只是状态转换记录总数没有变化
}
// GetFastStats 快速获取所有组的统计信息O(1)复杂度)
//
// 核心功能:从内存缓存中快速获取所有组的统计信息
// 时间复杂度O(n) - n为组的数量需要复制所有统计数据
// 空间复杂度O(n) - 创建所有统计数据的副本
//
// 返回值:
// - map[string]*GroupStatsCache: 所有组的统计信息副本
// key为组名value为该组的统计信息
//
// 执行流程:
// 1. 获取读锁(允许并发读取)
// 2. 遍历所有组的统计缓存
// 3. 为每个组创建统计信息的副本
// 4. 返回完整的统计信息映射
//
// 设计考虑:
// - 返回副本而不是原始数据,防止外部修改
// - 使用读锁支持高并发查询
// - 内存缓存提供极快的查询性能
// - 避免磁盘I/O适合频繁查询
//
// 性能优势:
// - 相比传统的磁盘扫描统计,性能提升数百倍
// - 支持高频率的监控查询
// - 实时性强,统计数据始终是最新的
//
// 使用场景:
// - 系统监控和仪表板
// - 性能分析和报告
// - 容量规划和预警
// - API接口的统计查询
//
// 并发安全:使用读锁保护,支持多线程并发调用
func (gm *GroupManager) GetFastStats() map[string]*GroupStatsCache {
gm.mu.RLock() // 获取读锁,允许并发读取
defer gm.mu.RUnlock() // 确保函数退出时释放锁
// 创建统计数据的完整副本
result := make(map[string]*GroupStatsCache)
// 遍历所有组的统计缓存,创建副本
for group, stats := range gm.groupStats {
result[group] = &GroupStatsCache{
TotalRecords: stats.TotalRecords, // 复制总记录数
HotRecords: stats.HotRecords, // 复制热数据记录数
WarmRecords: stats.WarmRecords, // 复制温数据记录数
ColdRecords: stats.ColdRecords, // 复制冷数据记录数
}
}
return result // 返回统计信息副本
}