627 lines
20 KiB
Go
627 lines
20 KiB
Go
// Package pipelinedb provides an integrated pipeline database system
|
||
// 集成了数据库存储和业务管道处理的一体化解决方案
|
||
package pipelinedb
|
||
|
||
import (
|
||
"encoding/binary"
|
||
"sync"
|
||
)
|
||
|
||
// GroupStatsCache 组统计信息缓存结构
|
||
//
|
||
// 核心功能:
|
||
// - 缓存每个组的统计信息,提供O(1)的统计查询性能
|
||
// - 支持按数据状态分类统计(Hot/Warm/Cold)
|
||
// - 实时更新,确保统计数据的准确性
|
||
//
|
||
// 设计考虑:
|
||
// - 使用整数类型提供高效的计数操作
|
||
// - 支持JSON序列化,便于数据交换和持久化
|
||
// - 结构简单,内存占用小
|
||
//
|
||
// 适用场景:
|
||
// - 数据库性能监控
|
||
// - 组级别的数据分析
|
||
// - 缓存命中率统计
|
||
type GroupStatsCache struct {
|
||
TotalRecords int `json:"total_records"` // 总记录数(所有状态的记录总和)
|
||
HotRecords int `json:"hot_records"` // 热数据记录数(最新、最活跃的数据)
|
||
WarmRecords int `json:"warm_records"` // 温数据记录数(中等活跃度的数据)
|
||
ColdRecords int `json:"cold_records"` // 冷数据记录数(较少访问的数据)
|
||
}
|
||
|
||
// GroupManager 数据组管理器
|
||
//
|
||
// 核心职责:
|
||
// 1. 管理数据组的生命周期(暂停/恢复)
|
||
// 2. 维护组级别的ID自增计数器
|
||
// 3. 缓存组统计信息,提供高性能统计查询
|
||
// 4. 管理组的OnComplete回调执行状态
|
||
// 5. 提供组状态的持久化存储
|
||
//
|
||
// 设计思想:
|
||
// - 使用读写锁支持高并发访问
|
||
// - 统计信息缓存在内存中,避免频繁的磁盘I/O
|
||
// - ID计数器持久化到专用页面,确保重启后的连续性
|
||
// - 状态管理支持复杂的业务流程控制
|
||
//
|
||
// 性能特征:
|
||
// - 统计查询:O(1) - 直接从内存缓存读取
|
||
// - ID生成:O(1) - 内存计数器自增
|
||
// - 状态检查:O(1) - HashMap查找
|
||
// - 并发友好:读多写少的场景下性能优异
|
||
//
|
||
// 适用场景:
|
||
// - 多租户数据库系统
|
||
// - 分组数据处理管道
|
||
// - 需要组级别控制的业务系统
|
||
type GroupManager struct {
|
||
pausedGroups map[string]bool // 暂停状态的组集合(key=组名,value=true表示暂停)
|
||
onCompleteExecuting map[string]bool // 正在执行OnComplete回调的组集合
|
||
groupCounters map[string]*int64 // 每个组的ID自增计数器(持久化到磁盘)
|
||
groupStats map[string]*GroupStatsCache // 组统计信息缓存(内存中维护)
|
||
mu sync.RWMutex // 读写锁,保护所有内部状态的并发访问
|
||
pdb *PipelineDB // 数据库实例引用,用于计数器持久化
|
||
}
|
||
|
||
// NewGroupManager 创建一个新的组管理器实例
|
||
//
|
||
// 核心功能:初始化组管理器的所有内部数据结构
|
||
// 时间复杂度:O(1) - 只创建空的数据结构
|
||
// 空间复杂度:O(1) - 初始状态下不占用额外空间
|
||
//
|
||
// 参数说明:
|
||
// - pdb: PipelineDB实例引用,用于ID计数器的持久化存储
|
||
// 如果为nil,计数器功能将不可用,但其他功能正常
|
||
//
|
||
// 返回值:
|
||
// - *GroupManager: 完全初始化的组管理器实例
|
||
//
|
||
// 初始化内容:
|
||
// 1. 创建空的暂停组映射
|
||
// 2. 创建空的OnComplete执行状态映射
|
||
// 3. 创建空的组计数器映射
|
||
// 4. 创建空的组统计缓存映射
|
||
// 5. 设置数据库引用
|
||
//
|
||
// 使用示例:
|
||
//
|
||
// gm := NewGroupManager(pipelineDB)
|
||
// gm.PauseGroup("batch1") // 暂停组
|
||
// id := gm.GetNextID("user") // 获取ID
|
||
func NewGroupManager(pdb *PipelineDB) *GroupManager {
|
||
return &GroupManager{
|
||
pausedGroups: make(map[string]bool), // 初始化暂停组映射
|
||
onCompleteExecuting: make(map[string]bool), // 初始化OnComplete执行状态映射
|
||
groupCounters: make(map[string]*int64), // 初始化组计数器映射
|
||
groupStats: make(map[string]*GroupStatsCache), // 初始化组统计缓存映射
|
||
pdb: pdb, // 设置数据库引用
|
||
// mu 会自动初始化为零值(未锁定状态)
|
||
}
|
||
}
|
||
|
||
// PauseGroup 暂停指定组的数据接收功能
|
||
//
|
||
// 核心功能:将组标记为暂停状态,阻止新数据的接收
|
||
// 时间复杂度:O(1) - HashMap插入操作
|
||
// 空间复杂度:O(1) - 只添加一个映射条目
|
||
//
|
||
// 参数说明:
|
||
// - group: 要暂停的组名
|
||
//
|
||
// 执行流程:
|
||
// 1. 获取写锁(独占访问)
|
||
// 2. 将组添加到暂停组映射中
|
||
// 3. 输出暂停确认信息
|
||
//
|
||
// 使用场景:
|
||
// - 维护期间暂停特定组的数据处理
|
||
// - 数据质量问题时临时停止数据接收
|
||
// - 系统负载过高时选择性暂停部分组
|
||
// - 业务流程控制需要
|
||
//
|
||
// 注意事项:
|
||
// - 暂停后该组无法接收新数据,但已有数据继续处理
|
||
// - 可以通过ResumeGroup恢复
|
||
// - 暂停状态不会持久化,重启后会丢失
|
||
//
|
||
// 并发安全:使用写锁保护,确保状态修改的原子性
|
||
func (gm *GroupManager) PauseGroup(group string) {
|
||
gm.mu.Lock() // 获取写锁,独占访问
|
||
defer gm.mu.Unlock() // 确保函数退出时释放锁
|
||
|
||
// 将组标记为暂停状态
|
||
gm.pausedGroups[group] = true
|
||
|
||
// 输出暂停确认信息(便于监控和调试)
|
||
gm.pdb.logger.Info("⏸️ 组已暂停接收数据", "group", group)
|
||
}
|
||
|
||
// ResumeGroup 恢复指定组的数据接收功能
|
||
//
|
||
// 核心功能:将组从暂停状态中移除,恢复正常的数据接收
|
||
// 时间复杂度:O(1) - HashMap删除操作
|
||
// 空间复杂度:O(1) - 释放一个映射条目的空间
|
||
//
|
||
// 参数说明:
|
||
// - group: 要恢复的组名
|
||
//
|
||
// 执行流程:
|
||
// 1. 获取写锁(独占访问)
|
||
// 2. 从暂停组映射中删除该组
|
||
// 3. 输出恢复确认信息
|
||
//
|
||
// 使用场景:
|
||
// - 维护完成后恢复组的正常运行
|
||
// - 数据质量问题解决后重新启用组
|
||
// - 系统负载降低后恢复暂停的组
|
||
// - 业务流程需要重新激活组
|
||
//
|
||
// 注意事项:
|
||
// - 恢复后该组立即可以接收新数据
|
||
// - 如果组本来就没有暂停,操作是幂等的(无副作用)
|
||
// - 恢复状态不会持久化,重启后默认为活跃状态
|
||
//
|
||
// 并发安全:使用写锁保护,确保状态修改的原子性
|
||
func (gm *GroupManager) ResumeGroup(group string) {
|
||
gm.mu.Lock() // 获取写锁,独占访问
|
||
defer gm.mu.Unlock() // 确保函数退出时释放锁
|
||
|
||
// 从暂停组映射中删除该组(恢复活跃状态)
|
||
delete(gm.pausedGroups, group)
|
||
|
||
// 输出恢复确认信息(便于监控和调试)
|
||
gm.pdb.logger.Info("▶️ 组已恢复接收数据", "group", group)
|
||
}
|
||
|
||
// IsPaused 检查指定组是否处于暂停状态
|
||
//
|
||
// 核心功能:查询组的暂停状态,用于数据接收前的状态检查
|
||
// 时间复杂度:O(1) - HashMap查找操作
|
||
// 空间复杂度:O(1) - 不创建额外数据
|
||
//
|
||
// 参数说明:
|
||
// - group: 要检查的组名
|
||
//
|
||
// 返回值:
|
||
// - bool: true表示组已暂停,false表示组处于活跃状态
|
||
//
|
||
// 执行流程:
|
||
// 1. 获取读锁(允许并发读取)
|
||
// 2. 在暂停组映射中查找该组
|
||
// 3. 返回查找结果
|
||
//
|
||
// 使用场景:
|
||
// - 数据接收前的状态验证
|
||
// - 业务逻辑中的条件判断
|
||
// - 监控和状态报告
|
||
// - 自动化运维脚本
|
||
//
|
||
// 设计考虑:
|
||
// - 使用读锁支持高并发查询
|
||
// - HashMap查找提供O(1)性能
|
||
// - 不存在的组默认返回false(活跃状态)
|
||
//
|
||
// 并发安全:使用读锁保护,支持多线程并发调用
|
||
func (gm *GroupManager) IsPaused(group string) bool {
|
||
gm.mu.RLock() // 获取读锁,允许并发读取
|
||
defer gm.mu.RUnlock() // 确保函数退出时释放锁
|
||
|
||
// 在暂停组映射中查找该组
|
||
// 如果组不存在,返回false(默认为活跃状态)
|
||
return gm.pausedGroups[group]
|
||
}
|
||
|
||
// GetPausedGroups 获取所有暂停的组
|
||
func (gm *GroupManager) GetPausedGroups() []string {
|
||
gm.mu.RLock()
|
||
defer gm.mu.RUnlock()
|
||
|
||
var groups []string
|
||
for group := range gm.pausedGroups {
|
||
groups = append(groups, group)
|
||
}
|
||
return groups
|
||
}
|
||
|
||
// GetGroupStatus 获取组状态
|
||
func (gm *GroupManager) GetGroupStatus(group string) string {
|
||
if gm.IsPaused(group) {
|
||
return "paused"
|
||
}
|
||
return "active"
|
||
}
|
||
|
||
// IsOnCompleteExecuting 检查组是否正在执行 OnComplete
|
||
func (gm *GroupManager) IsOnCompleteExecuting(group string) bool {
|
||
gm.mu.RLock()
|
||
defer gm.mu.RUnlock()
|
||
return gm.onCompleteExecuting[group]
|
||
}
|
||
|
||
// SetOnCompleteExecuting 设置组的 OnComplete 执行状态
|
||
func (gm *GroupManager) SetOnCompleteExecuting(group string, executing bool) {
|
||
gm.mu.Lock()
|
||
defer gm.mu.Unlock()
|
||
if executing {
|
||
gm.onCompleteExecuting[group] = true
|
||
gm.pdb.logger.Info("🔒 组 OnComplete 开始执行", "group", group)
|
||
} else {
|
||
delete(gm.onCompleteExecuting, group)
|
||
gm.pdb.logger.Info("🔓 组 OnComplete 执行完成", "group", group)
|
||
}
|
||
}
|
||
|
||
// GetExecutingGroups 获取所有正在执行 OnComplete 的组
|
||
func (gm *GroupManager) GetExecutingGroups() []string {
|
||
gm.mu.RLock()
|
||
defer gm.mu.RUnlock()
|
||
|
||
var groups []string
|
||
for group := range gm.onCompleteExecuting {
|
||
groups = append(groups, group)
|
||
}
|
||
return groups
|
||
}
|
||
|
||
// GetNextID 为指定组生成下一个唯一的自增ID
|
||
//
|
||
// 核心功能:为每个组维护独立的ID序列,确保ID的唯一性和连续性
|
||
// 时间复杂度:O(1) - 内存计数器自增 + 持久化写入
|
||
// 空间复杂度:O(1) - 只维护单个计数器
|
||
//
|
||
// 参数说明:
|
||
// - group: 组名,每个组有独立的ID序列
|
||
//
|
||
// 返回值:
|
||
// - int64: 新生成的唯一ID(从1开始递增)
|
||
//
|
||
// 执行流程:
|
||
// 1. 获取写锁(保证ID生成的原子性)
|
||
// 2. 检查组计数器是否已加载:
|
||
// a. 如果未加载:从持久化存储中加载计数器
|
||
// b. 如果已加载:直接使用内存中的计数器
|
||
// 3. 计数器自增1
|
||
// 4. 将新的计数器值持久化到磁盘
|
||
// 5. 返回新生成的ID
|
||
//
|
||
// 持久化机制:
|
||
// - 计数器存储在专用的计数器页面中
|
||
// - 每次ID生成都会立即持久化,确保重启后的连续性
|
||
// - 页面格式:[组名长度][组名][计数器值]...
|
||
//
|
||
// 设计考虑:
|
||
// - 每个组独立的ID序列,避免冲突
|
||
// - 立即持久化确保数据安全性
|
||
// - 内存缓存提高性能
|
||
// - 写锁保证并发安全
|
||
//
|
||
// 使用场景:
|
||
// - 数据记录的主键生成
|
||
// - 业务对象的唯一标识
|
||
// - 序列号生成
|
||
//
|
||
// 并发安全:使用写锁保护,确保ID生成的唯一性
|
||
func (gm *GroupManager) GetNextID(group string) int64 {
|
||
gm.mu.Lock() // 获取写锁,保证ID生成的原子性
|
||
defer gm.mu.Unlock() // 确保函数退出时释放锁
|
||
|
||
// 检查组的计数器是否已经加载到内存中
|
||
if gm.groupCounters[group] == nil {
|
||
// 计数器未加载,从持久化存储中加载
|
||
counter := gm.loadCounterFromPage(group)
|
||
gm.groupCounters[group] = &counter
|
||
}
|
||
|
||
// 计数器自增,生成新的ID
|
||
*gm.groupCounters[group]++
|
||
newID := *gm.groupCounters[group]
|
||
|
||
// 立即将新的计数器值持久化到磁盘
|
||
// 这确保了即使系统崩溃,ID序列也不会重复
|
||
gm.saveCounterToPage(group, newID)
|
||
|
||
return newID // 返回新生成的唯一ID
|
||
}
|
||
|
||
// GetGroupCounter 获取组的当前计数器值
|
||
func (gm *GroupManager) GetGroupCounter(group string) int64 {
|
||
gm.mu.RLock()
|
||
defer gm.mu.RUnlock()
|
||
|
||
if gm.groupCounters[group] == nil {
|
||
return 0
|
||
}
|
||
return *gm.groupCounters[group]
|
||
}
|
||
|
||
// loadCounterFromPage 从页面加载组计数器
|
||
func (gm *GroupManager) loadCounterFromPage(group string) int64 {
|
||
if gm.pdb == nil || gm.pdb.header.CounterPage == 0 {
|
||
return 0 // 如果没有计数器页面,返回0
|
||
}
|
||
|
||
// 读取计数器页面
|
||
page, err := gm.pdb.readPage(gm.pdb.header.CounterPage)
|
||
if err != nil {
|
||
gm.pdb.logger.Warn("⚠️ 读取计数器页面失败", "error", err)
|
||
return 0
|
||
}
|
||
|
||
// 在页面中查找组的计数器
|
||
// 页面格式:[组名长度(2字节)][组名][计数器(8字节)]...
|
||
offset := uint16(0)
|
||
for offset < uint16(len(page)-10) { // 至少需要2+1+8=11字节
|
||
// 读取组名长度
|
||
nameLen := binary.LittleEndian.Uint16(page[offset : offset+2])
|
||
if nameLen == 0 || offset+2+nameLen+8 > uint16(len(page)) {
|
||
break
|
||
}
|
||
|
||
// 读取组名
|
||
name := string(page[offset+2 : offset+2+nameLen])
|
||
if name == group {
|
||
// 找到了,读取计数器
|
||
counter := binary.LittleEndian.Uint64(page[offset+2+nameLen : offset+2+nameLen+8])
|
||
return int64(counter)
|
||
}
|
||
|
||
// 移动到下一个条目
|
||
offset += 2 + nameLen + 8
|
||
}
|
||
|
||
return 0 // 没找到,返回0
|
||
}
|
||
|
||
// saveCounterToPage 保存组计数器到页面
|
||
func (gm *GroupManager) saveCounterToPage(group string, counter int64) {
|
||
if gm.pdb == nil {
|
||
return
|
||
}
|
||
|
||
// 如果没有计数器页面,创建一个
|
||
if gm.pdb.header.CounterPage == 0 {
|
||
gm.initCounterPage()
|
||
}
|
||
|
||
// 读取计数器页面
|
||
page, err := gm.pdb.readPage(gm.pdb.header.CounterPage)
|
||
if err != nil {
|
||
gm.pdb.logger.Warn("⚠️ 读取计数器页面失败", "error", err)
|
||
return
|
||
}
|
||
|
||
// 查找并更新组的计数器
|
||
offset := uint16(0)
|
||
found := false
|
||
|
||
for offset < uint16(len(page)-10) {
|
||
// 读取组名长度
|
||
nameLen := binary.LittleEndian.Uint16(page[offset : offset+2])
|
||
if nameLen == 0 {
|
||
break
|
||
}
|
||
|
||
if offset+2+nameLen+8 > uint16(len(page)) {
|
||
break
|
||
}
|
||
|
||
// 读取组名
|
||
name := string(page[offset+2 : offset+2+nameLen])
|
||
if name == group {
|
||
// 找到了,更新计数器
|
||
binary.LittleEndian.PutUint64(page[offset+2+nameLen:offset+2+nameLen+8], uint64(counter))
|
||
found = true
|
||
break
|
||
}
|
||
|
||
// 移动到下一个条目
|
||
offset += 2 + nameLen + 8
|
||
}
|
||
|
||
// 如果没找到,添加新条目
|
||
if !found {
|
||
nameLen := uint16(len(group))
|
||
if offset+2+nameLen+8 <= uint16(len(page)) {
|
||
// 写入组名长度
|
||
binary.LittleEndian.PutUint16(page[offset:offset+2], nameLen)
|
||
// 写入组名
|
||
copy(page[offset+2:offset+2+nameLen], []byte(group))
|
||
// 写入计数器
|
||
binary.LittleEndian.PutUint64(page[offset+2+nameLen:offset+2+nameLen+8], uint64(counter))
|
||
}
|
||
}
|
||
|
||
// 写回页面
|
||
gm.pdb.cache.Put(gm.pdb.header.CounterPage, page, true)
|
||
}
|
||
|
||
// initCounterPage 初始化计数器页面
|
||
func (gm *GroupManager) initCounterPage() {
|
||
if gm.pdb == nil {
|
||
return
|
||
}
|
||
|
||
// 分配新页面
|
||
pageNo := gm.pdb.header.TotalPages
|
||
gm.pdb.header.TotalPages++
|
||
gm.pdb.header.CounterPage = pageNo
|
||
|
||
// 创建空页面
|
||
page := make([]byte, PageSize)
|
||
|
||
// 写入页面
|
||
gm.pdb.cache.Put(pageNo, page, true)
|
||
|
||
// 保存头部
|
||
gm.pdb.saveHeader()
|
||
|
||
gm.pdb.logger.Info("📄 初始化计数器页面", "page", pageNo)
|
||
}
|
||
|
||
// IncrementStats 增加组的统计计数(新增记录时调用)
|
||
//
|
||
// 核心功能:当新记录添加到组时,更新该组的统计信息
|
||
// 时间复杂度:O(1) - 直接的计数器操作
|
||
// 空间复杂度:O(1) - 可能创建一个新的统计缓存结构
|
||
//
|
||
// 参数说明:
|
||
// - group: 组名
|
||
// - status: 新记录的数据状态(Hot/Warm/Cold)
|
||
//
|
||
// 执行流程:
|
||
// 1. 获取写锁(保证统计更新的原子性)
|
||
// 2. 确保组的统计缓存已初始化
|
||
// 3. 增加总记录数计数
|
||
// 4. 根据状态增加对应的状态计数
|
||
//
|
||
// 统计维护:
|
||
// - TotalRecords: 总记录数自增1
|
||
// - 对应状态计数: Hot/Warm/Cold计数自增1
|
||
// - 统计信息实时更新,无延迟
|
||
//
|
||
// 使用场景:
|
||
// - 新数据记录插入时
|
||
// - 数据导入过程中
|
||
// - 批量数据处理时
|
||
//
|
||
// 并发安全:使用写锁保护,确保统计数据的一致性
|
||
func (gm *GroupManager) IncrementStats(group string, status DataStatus) {
|
||
gm.mu.Lock() // 获取写锁,保证统计更新的原子性
|
||
defer gm.mu.Unlock() // 确保函数退出时释放锁
|
||
|
||
// 确保组的统计缓存已经初始化
|
||
if gm.groupStats[group] == nil {
|
||
gm.groupStats[group] = &GroupStatsCache{}
|
||
}
|
||
|
||
// 增加总记录数
|
||
gm.groupStats[group].TotalRecords++
|
||
|
||
// 根据数据状态增加对应的计数
|
||
switch status {
|
||
case StatusHot:
|
||
gm.groupStats[group].HotRecords++ // 增加热数据计数
|
||
case StatusWarm:
|
||
gm.groupStats[group].WarmRecords++ // 增加温数据计数
|
||
case StatusCold:
|
||
gm.groupStats[group].ColdRecords++ // 增加冷数据计数
|
||
}
|
||
}
|
||
|
||
// UpdateStats 更新组统计信息(数据状态转换时调用)
|
||
//
|
||
// 核心功能:当记录的状态发生转换时,更新统计信息以保持准确性
|
||
// 时间复杂度:O(1) - 直接的计数器操作
|
||
// 空间复杂度:O(1) - 可能创建一个新的统计缓存结构
|
||
//
|
||
// 参数说明:
|
||
// - group: 组名
|
||
// - oldStatus: 记录的原始状态
|
||
// - newStatus: 记录的新状态
|
||
//
|
||
// 执行流程:
|
||
// 1. 获取写锁(保证统计更新的原子性)
|
||
// 2. 确保组的统计缓存已初始化
|
||
// 3. 减少原状态的计数
|
||
// 4. 增加新状态的计数
|
||
// 5. 总记录数保持不变
|
||
//
|
||
// 状态转换处理:
|
||
// - Hot → Warm: HotRecords-1, WarmRecords+1
|
||
// - Warm → Cold: WarmRecords-1, ColdRecords+1
|
||
// - 支持任意状态间的转换
|
||
// - TotalRecords保持不变(只是状态分布改变)
|
||
//
|
||
// 使用场景:
|
||
// - 数据生命周期管理(Hot→Warm→Cold)
|
||
// - 缓存策略调整
|
||
// - 数据归档过程
|
||
//
|
||
// 并发安全:使用写锁保护,确保统计数据的一致性
|
||
func (gm *GroupManager) UpdateStats(group string, oldStatus, newStatus DataStatus) {
|
||
gm.mu.Lock() // 获取写锁,保证统计更新的原子性
|
||
defer gm.mu.Unlock() // 确保函数退出时释放锁
|
||
|
||
// 确保组的统计缓存已经初始化
|
||
if gm.groupStats[group] == nil {
|
||
gm.groupStats[group] = &GroupStatsCache{}
|
||
}
|
||
|
||
// 减少原状态的计数
|
||
switch oldStatus {
|
||
case StatusHot:
|
||
gm.groupStats[group].HotRecords-- // 减少热数据计数
|
||
case StatusWarm:
|
||
gm.groupStats[group].WarmRecords-- // 减少温数据计数
|
||
case StatusCold:
|
||
gm.groupStats[group].ColdRecords-- // 减少冷数据计数
|
||
}
|
||
|
||
// 增加新状态的计数
|
||
switch newStatus {
|
||
case StatusHot:
|
||
gm.groupStats[group].HotRecords++ // 增加热数据计数
|
||
case StatusWarm:
|
||
gm.groupStats[group].WarmRecords++ // 增加温数据计数
|
||
case StatusCold:
|
||
gm.groupStats[group].ColdRecords++ // 增加冷数据计数
|
||
}
|
||
|
||
// 注意:TotalRecords保持不变,因为只是状态转换,记录总数没有变化
|
||
}
|
||
|
||
// GetFastStats 快速获取所有组的统计信息(O(1)复杂度)
|
||
//
|
||
// 核心功能:从内存缓存中快速获取所有组的统计信息
|
||
// 时间复杂度:O(n) - n为组的数量(需要复制所有统计数据)
|
||
// 空间复杂度:O(n) - 创建所有统计数据的副本
|
||
//
|
||
// 返回值:
|
||
// - map[string]*GroupStatsCache: 所有组的统计信息副本
|
||
// key为组名,value为该组的统计信息
|
||
//
|
||
// 执行流程:
|
||
// 1. 获取读锁(允许并发读取)
|
||
// 2. 遍历所有组的统计缓存
|
||
// 3. 为每个组创建统计信息的副本
|
||
// 4. 返回完整的统计信息映射
|
||
//
|
||
// 设计考虑:
|
||
// - 返回副本而不是原始数据,防止外部修改
|
||
// - 使用读锁支持高并发查询
|
||
// - 内存缓存提供极快的查询性能
|
||
// - 避免磁盘I/O,适合频繁查询
|
||
//
|
||
// 性能优势:
|
||
// - 相比传统的磁盘扫描统计,性能提升数百倍
|
||
// - 支持高频率的监控查询
|
||
// - 实时性强,统计数据始终是最新的
|
||
//
|
||
// 使用场景:
|
||
// - 系统监控和仪表板
|
||
// - 性能分析和报告
|
||
// - 容量规划和预警
|
||
// - API接口的统计查询
|
||
//
|
||
// 并发安全:使用读锁保护,支持多线程并发调用
|
||
func (gm *GroupManager) GetFastStats() map[string]*GroupStatsCache {
|
||
gm.mu.RLock() // 获取读锁,允许并发读取
|
||
defer gm.mu.RUnlock() // 确保函数退出时释放锁
|
||
|
||
// 创建统计数据的完整副本
|
||
result := make(map[string]*GroupStatsCache)
|
||
|
||
// 遍历所有组的统计缓存,创建副本
|
||
for group, stats := range gm.groupStats {
|
||
result[group] = &GroupStatsCache{
|
||
TotalRecords: stats.TotalRecords, // 复制总记录数
|
||
HotRecords: stats.HotRecords, // 复制热数据记录数
|
||
WarmRecords: stats.WarmRecords, // 复制温数据记录数
|
||
ColdRecords: stats.ColdRecords, // 复制冷数据记录数
|
||
}
|
||
}
|
||
|
||
return result // 返回统计信息副本
|
||
}
|