🔧 索引重建优化: - 当数据库为空时跳过索引重建,避免频繁日志 - 只在有数据的页面时记录详细扫描日志 - 添加记录计数统计,显示重建的记录数量 - 减少 CPU 使用率,提高空数据库性能 💻 系统监控功能: - 添加完整的 CPU 和内存监控 - 实时显示 Goroutine 数量和内存使用情况 - 垃圾回收统计和对象分配监控 - 每10秒显示系统资源状态 📊 自动处理示例增强: - 集成系统资源监控到示例中 - 提供性能分析和资源优化指导 - 完善的监控文档和使用说明 🎯 性能提升: - 解决空数据库时的高 CPU 使用问题 - 优化日志输出频率和级别 - 提供实时性能监控能力
1491 lines
42 KiB
Go
1491 lines
42 KiB
Go
// Package pipelinedb provides an integrated pipeline database system
|
||
//
|
||
// # Pipeline Database V4 是一个集成了数据库存储和业务管道处理的一体化解决方案
|
||
//
|
||
// 核心特性:
|
||
// - 基于页面的存储引擎:高效的数据存储和检索
|
||
// - 分组数据管理:支持按业务组织数据,独立管理和统计
|
||
// - 三级数据状态:Hot(热)-> Warm(温)-> Cold(冷)的自动流转
|
||
// - 外部处理器集成:支持自定义业务逻辑处理
|
||
// - 高并发支持:线程安全的索引和存储操作
|
||
// - 页面缓存:提高数据访问性能
|
||
// - 持久化存储:数据安全持久保存
|
||
//
|
||
// 数据流转模型:
|
||
// 1. 数据接收:新数据以Hot状态进入系统
|
||
// 2. 预热处理:Hot数据经过预热处理转为Warm状态
|
||
// 3. 冷却处理:Warm数据经过冷却处理转为Cold状态
|
||
// 4. 完成回调:组内所有数据处理完成后触发回调
|
||
//
|
||
// 使用场景:
|
||
// - 数据管道处理:ETL、数据清洗、数据转换
|
||
// - 业务流程管理:订单处理、用户行为分析
|
||
// - 实时数据处理:日志分析、监控数据处理
|
||
// - 批量数据处理:数据导入、数据同步
|
||
package pipelinedb
|
||
|
||
import (
|
||
"context"
|
||
"encoding/binary"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"log/slog"
|
||
"os"
|
||
"sort"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
// ==================== 系统常量定义 ====================
|
||
|
||
const (
|
||
PageSize = 4096 // 页面大小:4KB,与操作系统页面大小对齐
|
||
HdrSize = 14 // 文件头大小:包含所有头部字段的字节数
|
||
MaxRecSize = 3000 // 最大记录大小:限制单条记录的最大字节数
|
||
)
|
||
|
||
// ==================== 数据结构定义 ====================
|
||
|
||
// Header 数据库文件头结构
|
||
//
|
||
// 文件头存储在文件的开始位置,包含数据库的元数据信息
|
||
// 用于数据库启动时的初始化和状态恢复
|
||
type Header struct {
|
||
Magic uint32 // 魔数:用于文件格式验证
|
||
PageSize uint16 // 页面大小:固定为4096字节
|
||
TotalPages uint16 // 总页面数:当前数据库文件包含的页面数量
|
||
FreeHead uint16 // 空闲页链头:空闲页面管理的起始页面
|
||
RootPage uint16 // 根页面:数据存储的根页面编号
|
||
CounterPage uint16 // 计数器页面:用于生成唯一ID的计数器页面
|
||
}
|
||
|
||
// DataStatus 数据状态枚举
|
||
//
|
||
// 定义数据在管道中的处理状态,支持三级状态流转
|
||
// 状态转换:Hot -> Warm -> Cold
|
||
type DataStatus string
|
||
|
||
const (
|
||
StatusHot DataStatus = "hot" // 热数据:刚接收的新数据,等待预热处理
|
||
StatusWarm DataStatus = "warm" // 温数据:已预热的数据,等待冷却处理
|
||
StatusCold DataStatus = "cold" // 冷数据:已完成处理的数据,可以归档或清理
|
||
)
|
||
|
||
// DataRecord 数据记录结构
|
||
//
|
||
// 表示系统中的一条数据记录,包含完整的数据信息和元数据
|
||
// 支持JSON序列化,便于存储和传输
|
||
type DataRecord struct {
|
||
ID int64 `json:"id"` // 记录唯一标识符
|
||
Group string `json:"group"` // 所属数据组名称
|
||
Status DataStatus `json:"status"` // 当前数据状态
|
||
Data []byte `json:"data"` // 实际数据内容
|
||
CreatedAt time.Time `json:"created_at"` // 创建时间戳
|
||
UpdatedAt time.Time `json:"updated_at"` // 最后更新时间戳
|
||
Metadata string `json:"metadata,omitempty"` // 可选的元数据信息
|
||
}
|
||
|
||
// GroupDataEvent 组数据事件
|
||
//
|
||
// 用于在组数据处理过程中传递事件信息
|
||
// 支持事件驱动的数据处理模式
|
||
type GroupDataEvent struct {
|
||
Group string // 组名:事件所属的数据组
|
||
Data []byte // 数据内容:事件携带的数据
|
||
Done bool // 完成标志:标识该组是否已完成所有数据处理
|
||
}
|
||
|
||
// ==================== 记录序列化方法 ====================
|
||
|
||
// ToBytes 将数据记录序列化为字节数组
|
||
//
|
||
// 返回值:
|
||
// - []byte: 序列化后的字节数组
|
||
// - error: 序列化错误
|
||
//
|
||
// 使用JSON格式进行序列化,便于跨平台兼容和调试
|
||
func (dr *DataRecord) ToBytes() ([]byte, error) {
|
||
return json.Marshal(dr)
|
||
}
|
||
|
||
// FromBytes 从字节数组反序列化数据记录
|
||
//
|
||
// 参数:
|
||
// - data: 序列化的字节数组
|
||
//
|
||
// 返回值:
|
||
// - error: 反序列化错误
|
||
//
|
||
// 从JSON格式的字节数组中恢复数据记录对象
|
||
func (dr *DataRecord) FromBytes(data []byte) error {
|
||
return json.Unmarshal(data, dr)
|
||
}
|
||
|
||
// ==================== 核心数据库结构 ====================
|
||
|
||
// PipelineDB 管道数据库主结构体
|
||
//
|
||
// 集成了存储引擎、索引管理、缓存系统、管道处理等所有核心功能
|
||
// 提供线程安全的数据库操作接口
|
||
type PipelineDB struct {
|
||
// ========== 存储引擎组件 ==========
|
||
file *os.File // 数据库文件句柄
|
||
header *Header // 文件头信息
|
||
cache *PageCache // 页面缓存系统
|
||
freePageMgr *FreePageManager // 空闲页面管理器
|
||
indexMgr *IndexManager // 索引管理器
|
||
rowMutexes map[int64]*sync.RWMutex // 行级锁映射
|
||
mu sync.RWMutex // 数据库级读写锁
|
||
|
||
// ========== 配置和处理器 ==========
|
||
config *Config // 数据库配置
|
||
handler Handler // 数据处理器接口
|
||
logger *slog.Logger // 结构化日志器
|
||
|
||
// ========== 并发控制 ==========
|
||
ctx context.Context // 上下文控制
|
||
cancel context.CancelFunc // 取消函数
|
||
wg sync.WaitGroup // 等待组,用于优雅关闭
|
||
|
||
// ========== 业务组件 ==========
|
||
groupManager *GroupManager // 组管理器
|
||
}
|
||
|
||
// ==================== 配置结构定义 ====================
|
||
|
||
// Config 数据库统一配置结构
|
||
//
|
||
// 包含存储引擎配置和管道处理配置
|
||
// 支持JSON序列化,便于配置文件管理
|
||
type Config struct {
|
||
// ========== 存储引擎配置 ==========
|
||
CacheSize int `json:"cache_size"` // 页缓存大小:缓存的页面数量
|
||
SyncWrites bool `json:"sync_writes"` // 同步写入:是否立即同步到磁盘
|
||
CreateIfMiss bool `json:"create_if_miss"` // 自动创建:文件不存在时是否自动创建
|
||
|
||
// ========== 管道处理配置 ==========
|
||
WarmInterval time.Duration `json:"warm_interval"` // 预热间隔:Hot->Warm状态转换的时间间隔
|
||
ProcessInterval time.Duration `json:"process_interval"` // 处理间隔:Warm->Cold状态转换的时间间隔
|
||
BatchSize int `json:"batch_size"` // 批处理大小:每次处理的记录数量
|
||
EnableMetrics bool `json:"enable_metrics"` // 启用指标:是否收集性能指标
|
||
}
|
||
|
||
// DefaultConfig 返回默认配置
|
||
//
|
||
// 返回值:
|
||
// - *Config: 包含合理默认值的配置对象
|
||
//
|
||
// 默认配置适用于大多数使用场景,可以根据实际需求进行调整
|
||
func DefaultConfig() *Config {
|
||
return &Config{
|
||
CacheSize: 256, // 256页缓存,约1MB内存
|
||
SyncWrites: false, // 异步写入,提高性能
|
||
CreateIfMiss: true, // 自动创建数据库文件
|
||
WarmInterval: 5 * time.Second, // 5秒预热间隔
|
||
ProcessInterval: 10 * time.Second, // 10秒处理间隔
|
||
BatchSize: 100, // 每批处理100条记录
|
||
EnableMetrics: true, // 启用性能指标收集
|
||
}
|
||
}
|
||
|
||
// ==================== 数据处理器接口 ====================
|
||
|
||
// Handler 数据处理器接口
|
||
//
|
||
// 定义了数据在状态流转过程中的处理回调
|
||
// 用户可以实现此接口来定制业务逻辑处理
|
||
type Handler interface {
|
||
// WillWarm 预热处理回调
|
||
//
|
||
// 参数:
|
||
// - ctx: 上下文,用于超时控制和取消操作
|
||
// - group: 数据组名称
|
||
// - data: 待处理的数据
|
||
//
|
||
// 返回值:
|
||
// - []byte: 处理后的数据
|
||
// - error: 处理错误
|
||
//
|
||
// 在数据从Hot状态转换为Warm状态时调用
|
||
// 可以在此阶段进行数据验证、转换、清洗等操作
|
||
WillWarm(ctx context.Context, group string, data []byte) ([]byte, error)
|
||
|
||
// WillCold 冷却处理回调
|
||
//
|
||
// 参数:
|
||
// - ctx: 上下文,用于超时控制和取消操作
|
||
// - group: 数据组名称
|
||
// - data: 待处理的数据
|
||
//
|
||
// 返回值:
|
||
// - []byte: 处理后的数据
|
||
// - error: 处理错误
|
||
//
|
||
// 在数据从Warm状态转换为Cold状态时调用
|
||
// 可以在此阶段进行数据压缩、归档、发送等操作
|
||
WillCold(ctx context.Context, group string, data []byte) ([]byte, error)
|
||
|
||
// OnComplete 组完成回调
|
||
//
|
||
// 参数:
|
||
// - ctx: 上下文,用于超时控制和取消操作
|
||
// - group: 完成处理的数据组名称
|
||
//
|
||
// 返回值:
|
||
// - error: 处理错误
|
||
//
|
||
// 在组内所有数据都完成处理后调用
|
||
// 可以在此阶段进行清理、通知、统计等操作
|
||
OnComplete(ctx context.Context, group string) error
|
||
}
|
||
|
||
// ==================== 数据库选项结构 ====================
|
||
|
||
// Options 数据库打开选项
|
||
//
|
||
// 包含打开数据库所需的所有配置参数
|
||
// 使用选项模式,提供灵活的配置方式
|
||
type Options struct {
|
||
Filename string // 数据库文件路径
|
||
Config *Config // 数据库配置,nil时使用默认配置
|
||
Handler Handler // 数据处理器,可选
|
||
Logger *slog.Logger // 日志器,nil时使用默认日志器
|
||
}
|
||
|
||
// ==================== 数据库核心方法 ====================
|
||
|
||
// Open 打开或创建管道数据库
|
||
//
|
||
// 参数:
|
||
// - opts: 数据库打开选项,包含文件路径、配置、处理器等
|
||
//
|
||
// 返回值:
|
||
// - *PipelineDB: 数据库实例
|
||
// - error: 打开错误
|
||
//
|
||
// 执行流程:
|
||
// 1. 验证和设置默认配置
|
||
// 2. 打开或创建数据库文件
|
||
// 3. 初始化存储引擎组件(缓存、索引、页面管理器等)
|
||
// 4. 加载或创建文件头信息
|
||
// 5. 初始化组管理器和管道处理器
|
||
// 6. 启动后台处理协程
|
||
//
|
||
// 线程安全:返回的数据库实例支持并发访问
|
||
func Open(opts Options) (*PipelineDB, error) {
|
||
// 设置默认配置
|
||
if opts.Config == nil {
|
||
opts.Config = DefaultConfig()
|
||
}
|
||
|
||
// 设置默认 logger
|
||
logger := opts.Logger
|
||
if logger == nil {
|
||
logger = slog.Default()
|
||
}
|
||
|
||
pdb := &PipelineDB{
|
||
config: opts.Config,
|
||
handler: opts.Handler,
|
||
logger: logger,
|
||
rowMutexes: make(map[int64]*sync.RWMutex),
|
||
cache: NewPageCache(opts.Config.CacheSize),
|
||
freePageMgr: NewFreePageManager(),
|
||
indexMgr: NewIndexManager(),
|
||
}
|
||
|
||
// 创建上下文
|
||
pdb.ctx, pdb.cancel = context.WithCancel(context.Background())
|
||
|
||
// 打开文件
|
||
var err error
|
||
if opts.Config.CreateIfMiss {
|
||
pdb.file, err = os.OpenFile(opts.Filename, os.O_RDWR|os.O_CREATE, 0644)
|
||
} else {
|
||
pdb.file, err = os.OpenFile(opts.Filename, os.O_RDWR, 0644)
|
||
}
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 检查文件是否为空
|
||
stat, err := pdb.file.Stat()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
if stat.Size() == 0 {
|
||
// 初始化新文件
|
||
if err := pdb.initializeFile(); err != nil {
|
||
return nil, err
|
||
}
|
||
} else {
|
||
// 加载现有文件
|
||
if err := pdb.loadHeader(); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 加载空闲页信息
|
||
if err := pdb.freePageMgr.LoadFromHeader(pdb.header.FreeHead, pdb.readPageDirect); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 重建索引
|
||
if err := pdb.rebuildIndexes(); err != nil {
|
||
return nil, err
|
||
}
|
||
}
|
||
|
||
// 创建组管理器
|
||
pdb.groupManager = NewGroupManager(pdb)
|
||
|
||
return pdb, nil
|
||
}
|
||
|
||
// Start 启动管道数据库
|
||
func (pdb *PipelineDB) Start(groupEventCh <-chan GroupDataEvent) error {
|
||
pdb.logger.Info("🚀 启动管道数据库...")
|
||
|
||
// 启动自动管道处理器
|
||
pdb.wg.Add(1)
|
||
go func() {
|
||
defer pdb.wg.Done()
|
||
pdb.runAutoPipeline(pdb.ctx)
|
||
}()
|
||
|
||
// 启动组数据事件监听器
|
||
if groupEventCh != nil {
|
||
pdb.wg.Add(1)
|
||
go func() {
|
||
defer pdb.wg.Done()
|
||
pdb.handleGroupDataEvents(groupEventCh)
|
||
}()
|
||
}
|
||
|
||
pdb.logger.Info("✅ 管道数据库启动完成")
|
||
return nil
|
||
}
|
||
|
||
// Stop 停止管道数据库
|
||
func (pdb *PipelineDB) Stop() error {
|
||
pdb.logger.Info("🛑 停止管道数据库...")
|
||
|
||
pdb.cancel()
|
||
pdb.wg.Wait()
|
||
|
||
// 刷新缓存
|
||
if err := pdb.cache.Flush(pdb.writePageDirect); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 保存空闲页信息
|
||
freeHead, err := pdb.freePageMgr.SaveToHeader(pdb.writePageDirect)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 更新文件头
|
||
pdb.header.FreeHead = freeHead
|
||
if err := pdb.saveHeader(); err != nil {
|
||
return err
|
||
}
|
||
|
||
if err := pdb.file.Close(); err != nil {
|
||
return err
|
||
}
|
||
|
||
pdb.logger.Info("✅ 管道数据库已停止")
|
||
return nil
|
||
}
|
||
|
||
// AcceptData Producer 接收数据 (业务入口)
|
||
func (pdb *PipelineDB) AcceptData(group string, data []byte, metadata string) (int64, error) {
|
||
// 检查组是否被暂停
|
||
if pdb.groupManager.IsPaused(group) {
|
||
return 0, fmt.Errorf("组 [%s] 已暂停接收数据", group)
|
||
}
|
||
|
||
// 生成自增ID
|
||
id := pdb.groupManager.GetNextID(group)
|
||
|
||
record := &DataRecord{
|
||
ID: id,
|
||
Group: group,
|
||
Status: StatusHot,
|
||
Data: data,
|
||
CreatedAt: time.Now(),
|
||
UpdatedAt: time.Now(),
|
||
Metadata: metadata,
|
||
}
|
||
|
||
// 序列化记录
|
||
recordBytes, err := record.ToBytes()
|
||
if err != nil {
|
||
return 0, fmt.Errorf("序列化记录失败: %w", err)
|
||
}
|
||
|
||
// 直接使用组名作为表名
|
||
err = pdb.insert(group, id, recordBytes)
|
||
if err != nil {
|
||
return 0, fmt.Errorf("存储数据失败: %w", err)
|
||
}
|
||
|
||
// 更新统计缓存
|
||
pdb.groupManager.IncrementStats(group, StatusHot)
|
||
|
||
pdb.logger.Info("📥 接收数据",
|
||
"group", group,
|
||
"id", id,
|
||
"size", len(data),
|
||
"status", StatusHot)
|
||
return id, nil
|
||
}
|
||
|
||
// GetRecord 获取记录
|
||
func (pdb *PipelineDB) GetRecord(group string, id int64) (*DataRecord, error) {
|
||
data, err := pdb.get(group, id)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
record := &DataRecord{}
|
||
if err := record.FromBytes(data); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return record, nil
|
||
}
|
||
|
||
// updateRecord 内部更新记录(不对外暴露)
|
||
func (pdb *PipelineDB) updateRecord(record *DataRecord) error {
|
||
record.UpdatedAt = time.Now()
|
||
|
||
recordBytes, err := record.ToBytes()
|
||
if err != nil {
|
||
return fmt.Errorf("序列化记录失败: %w", err)
|
||
}
|
||
|
||
return pdb.update(record.Group, record.ID, recordBytes)
|
||
}
|
||
|
||
// GetRecordsByStatus 按状态获取记录(跨所有组)
|
||
func (pdb *PipelineDB) GetRecordsByStatus(status DataStatus, limit int) ([]*DataRecord, error) {
|
||
var records []*DataRecord
|
||
count := 0
|
||
|
||
// 遍历所有组的表
|
||
groups := pdb.getAllGroups()
|
||
for _, group := range groups {
|
||
if count >= limit {
|
||
break
|
||
}
|
||
|
||
// 查询当前组的记录
|
||
err := pdb.rangeQuery(group, 0, int64(^uint64(0)>>1), func(id int64, data []byte) error {
|
||
if count >= limit {
|
||
return fmt.Errorf("达到限制") // 停止遍历
|
||
}
|
||
|
||
record := &DataRecord{}
|
||
if err := record.FromBytes(data); err != nil {
|
||
return nil // 跳过损坏的记录
|
||
}
|
||
|
||
if record.Status == status {
|
||
records = append(records, record)
|
||
count++
|
||
}
|
||
|
||
return nil
|
||
})
|
||
|
||
if err != nil && err.Error() != "达到限制" {
|
||
return nil, fmt.Errorf("查询组 [%s] 失败: %w", group, err)
|
||
}
|
||
}
|
||
|
||
return records, nil
|
||
}
|
||
|
||
// GetRecordsByGroup 按组获取记录(支持分页)
|
||
func (pdb *PipelineDB) GetRecordsByGroup(group string, pageReq *PageRequest) (*PageResponse, error) {
|
||
// 参数验证
|
||
if pageReq.Page < 1 {
|
||
pageReq.Page = 1
|
||
}
|
||
if pageReq.PageSize < 1 {
|
||
pageReq.PageSize = 10 // 默认每页10条
|
||
}
|
||
if pageReq.PageSize > 1000 {
|
||
pageReq.PageSize = 1000 // 最大每页1000条
|
||
}
|
||
|
||
var allRecords []*DataRecord
|
||
|
||
// 收集所有记录
|
||
err := pdb.rangeQuery(group, 0, int64(^uint64(0)>>1), func(id int64, data []byte) error {
|
||
record := &DataRecord{}
|
||
if err := record.FromBytes(data); err != nil {
|
||
return nil // 跳过损坏的记录
|
||
}
|
||
|
||
allRecords = append(allRecords, record)
|
||
return nil
|
||
})
|
||
|
||
if err != nil {
|
||
return nil, fmt.Errorf("查询记录失败: %w", err)
|
||
}
|
||
|
||
// 排序记录
|
||
pdb.sortRecords(allRecords, pageReq.SortField, pageReq.SortOrder)
|
||
|
||
// 计算分页信息
|
||
totalCount := len(allRecords)
|
||
totalPages := (totalCount + pageReq.PageSize - 1) / pageReq.PageSize
|
||
|
||
// 计算当前页的起始和结束索引
|
||
startIndex := (pageReq.Page - 1) * pageReq.PageSize
|
||
endIndex := startIndex + pageReq.PageSize
|
||
|
||
// 边界检查
|
||
if startIndex >= totalCount {
|
||
return &PageResponse{
|
||
Records: []*DataRecord{},
|
||
Page: pageReq.Page,
|
||
PageSize: pageReq.PageSize,
|
||
TotalCount: totalCount,
|
||
TotalPages: totalPages,
|
||
HasNext: false,
|
||
HasPrevious: pageReq.Page > 1,
|
||
}, nil
|
||
}
|
||
|
||
if endIndex > totalCount {
|
||
endIndex = totalCount
|
||
}
|
||
|
||
// 提取当前页的记录
|
||
pageRecords := allRecords[startIndex:endIndex]
|
||
|
||
// 构建响应
|
||
response := &PageResponse{
|
||
Records: pageRecords,
|
||
Page: pageReq.Page,
|
||
PageSize: pageReq.PageSize,
|
||
TotalCount: totalCount,
|
||
TotalPages: totalPages,
|
||
HasNext: pageReq.Page < totalPages,
|
||
HasPrevious: pageReq.Page > 1,
|
||
}
|
||
|
||
return response, nil
|
||
}
|
||
|
||
// SortOrder 排序方向
|
||
type SortOrder string
|
||
|
||
const (
|
||
SortAsc SortOrder = "asc" // 升序
|
||
SortDesc SortOrder = "desc" // 降序
|
||
)
|
||
|
||
// SortField 排序字段
|
||
type SortField string
|
||
|
||
const (
|
||
SortByID SortField = "id" // 按ID排序
|
||
SortByCreatedAt SortField = "created_at" // 按创建时间排序
|
||
SortByUpdatedAt SortField = "updated_at" // 按更新时间排序
|
||
SortByStatus SortField = "status" // 按状态排序
|
||
)
|
||
|
||
// PageRequest 分页请求参数
|
||
type PageRequest struct {
|
||
Page int `json:"page"` // 页码,从1开始
|
||
PageSize int `json:"page_size"` // 每页大小
|
||
SortField SortField `json:"sort_field"` // 排序字段
|
||
SortOrder SortOrder `json:"sort_order"` // 排序方向
|
||
}
|
||
|
||
// PageResponse 分页响应结果
|
||
type PageResponse struct {
|
||
Records []*DataRecord `json:"records"` // 当前页记录
|
||
Page int `json:"page"` // 当前页码
|
||
PageSize int `json:"page_size"` // 每页大小
|
||
TotalCount int `json:"total_count"` // 总记录数
|
||
TotalPages int `json:"total_pages"` // 总页数
|
||
HasNext bool `json:"has_next"` // 是否有下一页
|
||
HasPrevious bool `json:"has_previous"` // 是否有上一页
|
||
}
|
||
|
||
// GetRecordsByGroupAndStatus 按组和状态获取记录(支持分页)
|
||
func (pdb *PipelineDB) GetRecordsByGroupAndStatus(group string, status DataStatus, pageReq *PageRequest) (*PageResponse, error) {
|
||
// 参数验证
|
||
if pageReq.Page < 1 {
|
||
pageReq.Page = 1
|
||
}
|
||
if pageReq.PageSize < 1 {
|
||
pageReq.PageSize = 10 // 默认每页10条
|
||
}
|
||
if pageReq.PageSize > 1000 {
|
||
pageReq.PageSize = 1000 // 最大每页1000条
|
||
}
|
||
|
||
var allRecords []*DataRecord
|
||
|
||
// 首先收集所有匹配的记录
|
||
err := pdb.rangeQuery(group, 0, int64(^uint64(0)>>1), func(id int64, data []byte) error {
|
||
record := &DataRecord{}
|
||
if err := record.FromBytes(data); err != nil {
|
||
return nil // 跳过损坏的记录
|
||
}
|
||
|
||
if record.Status == status {
|
||
allRecords = append(allRecords, record)
|
||
}
|
||
|
||
return nil
|
||
})
|
||
|
||
if err != nil {
|
||
return nil, fmt.Errorf("查询记录失败: %w", err)
|
||
}
|
||
|
||
// 排序记录
|
||
pdb.sortRecords(allRecords, pageReq.SortField, pageReq.SortOrder)
|
||
|
||
// 计算分页信息
|
||
totalCount := len(allRecords)
|
||
totalPages := (totalCount + pageReq.PageSize - 1) / pageReq.PageSize
|
||
|
||
// 计算当前页的起始和结束索引
|
||
startIndex := (pageReq.Page - 1) * pageReq.PageSize
|
||
endIndex := startIndex + pageReq.PageSize
|
||
|
||
// 边界检查
|
||
if startIndex >= totalCount {
|
||
// 页码超出范围,返回空结果
|
||
return &PageResponse{
|
||
Records: []*DataRecord{},
|
||
Page: pageReq.Page,
|
||
PageSize: pageReq.PageSize,
|
||
TotalCount: totalCount,
|
||
TotalPages: totalPages,
|
||
HasNext: false,
|
||
HasPrevious: pageReq.Page > 1,
|
||
}, nil
|
||
}
|
||
|
||
if endIndex > totalCount {
|
||
endIndex = totalCount
|
||
}
|
||
|
||
// 提取当前页的记录
|
||
pageRecords := allRecords[startIndex:endIndex]
|
||
|
||
// 构建响应
|
||
response := &PageResponse{
|
||
Records: pageRecords,
|
||
Page: pageReq.Page,
|
||
PageSize: pageReq.PageSize,
|
||
TotalCount: totalCount,
|
||
TotalPages: totalPages,
|
||
HasNext: pageReq.Page < totalPages,
|
||
HasPrevious: pageReq.Page > 1,
|
||
}
|
||
|
||
return response, nil
|
||
}
|
||
|
||
// GetStats 获取统计信息(高性能版本)
|
||
func (pdb *PipelineDB) GetStats() (*Stats, error) {
|
||
stats := &Stats{
|
||
Timestamp: time.Now(),
|
||
GroupStats: make(map[string]*GroupStats),
|
||
}
|
||
|
||
// 从 GroupManager 快速获取统计缓存(O(1)复杂度)
|
||
groupStatsCache := pdb.groupManager.GetFastStats()
|
||
|
||
// 统计各状态的记录数
|
||
var totalRecords, hotRecords, warmRecords, coldRecords int
|
||
|
||
// 转换缓存数据为 Stats 格式
|
||
for group, cache := range groupStatsCache {
|
||
stats.GroupStats[group] = &GroupStats{
|
||
Group: group,
|
||
TotalRecords: cache.TotalRecords,
|
||
HotRecords: cache.HotRecords,
|
||
WarmRecords: cache.WarmRecords,
|
||
ColdRecords: cache.ColdRecords,
|
||
}
|
||
|
||
// 累计总体统计
|
||
totalRecords += cache.TotalRecords
|
||
hotRecords += cache.HotRecords
|
||
warmRecords += cache.WarmRecords
|
||
coldRecords += cache.ColdRecords
|
||
}
|
||
|
||
// 设置总体统计
|
||
stats.TotalRecords = totalRecords
|
||
stats.HotRecords = hotRecords
|
||
stats.WarmRecords = warmRecords
|
||
stats.ColdRecords = coldRecords
|
||
|
||
// 获取数据库统计
|
||
hits, misses, hitRate := pdb.cache.Stats()
|
||
stats.CacheHits = hits
|
||
stats.CacheMisses = misses
|
||
stats.CacheHitRate = hitRate
|
||
stats.TotalPages = int(pdb.header.TotalPages)
|
||
stats.FreePages = pdb.freePageMgr.FreeCount()
|
||
stats.FileSizeBytes = int64(pdb.header.TotalPages) * PageSize
|
||
|
||
return stats, nil
|
||
}
|
||
|
||
// ValidateStats 验证统计信息(通过遍历实际数据)
|
||
// 用于数据校验和调试,性能较慢但结果准确
|
||
func (pdb *PipelineDB) ValidateStats() (*Stats, error) {
|
||
stats := &Stats{
|
||
Timestamp: time.Now(),
|
||
GroupStats: make(map[string]*GroupStats),
|
||
}
|
||
|
||
// 统计各状态的记录数
|
||
statusCounts := make(map[DataStatus]int)
|
||
|
||
// 遍历所有组的表
|
||
groups := pdb.getAllGroups()
|
||
for _, group := range groups {
|
||
err := pdb.rangeQuery(group, 0, int64(^uint64(0)>>1), func(id int64, data []byte) error {
|
||
record := &DataRecord{}
|
||
if err := record.FromBytes(data); err != nil {
|
||
return nil // 跳过损坏的记录
|
||
}
|
||
|
||
// 更新总体统计
|
||
statusCounts[record.Status]++
|
||
|
||
// 更新组统计
|
||
if stats.GroupStats[record.Group] == nil {
|
||
stats.GroupStats[record.Group] = &GroupStats{
|
||
Group: record.Group,
|
||
}
|
||
}
|
||
stats.GroupStats[record.Group].TotalRecords++
|
||
|
||
// 更新组的状态计数
|
||
switch record.Status {
|
||
case StatusHot:
|
||
stats.GroupStats[record.Group].HotRecords++
|
||
case StatusWarm:
|
||
stats.GroupStats[record.Group].WarmRecords++
|
||
case StatusCold:
|
||
stats.GroupStats[record.Group].ColdRecords++
|
||
}
|
||
|
||
return nil
|
||
})
|
||
|
||
if err != nil {
|
||
return nil, fmt.Errorf("统计组 [%s] 失败: %w", group, err)
|
||
}
|
||
}
|
||
|
||
// 设置总体统计
|
||
stats.TotalRecords = statusCounts[StatusHot] + statusCounts[StatusWarm] + statusCounts[StatusCold]
|
||
stats.HotRecords = statusCounts[StatusHot]
|
||
stats.WarmRecords = statusCounts[StatusWarm]
|
||
stats.ColdRecords = statusCounts[StatusCold]
|
||
|
||
// 获取数据库统计
|
||
hits, misses, hitRate := pdb.cache.Stats()
|
||
stats.CacheHits = hits
|
||
stats.CacheMisses = misses
|
||
stats.CacheHitRate = hitRate
|
||
stats.TotalPages = int(pdb.header.TotalPages)
|
||
stats.FreePages = pdb.freePageMgr.FreeCount()
|
||
stats.FileSizeBytes = int64(pdb.header.TotalPages) * PageSize
|
||
|
||
return stats, nil
|
||
}
|
||
|
||
// PauseGroup 暂停组接收数据
|
||
func (pdb *PipelineDB) PauseGroup(group string) {
|
||
pdb.groupManager.PauseGroup(group)
|
||
}
|
||
|
||
// ResumeGroup 恢复组接收数据
|
||
func (pdb *PipelineDB) ResumeGroup(group string) {
|
||
pdb.groupManager.ResumeGroup(group)
|
||
}
|
||
|
||
// IsGroupPaused 检查组是否暂停
|
||
func (pdb *PipelineDB) IsGroupPaused(group string) bool {
|
||
return pdb.groupManager.IsPaused(group)
|
||
}
|
||
|
||
// GetPausedGroups 获取所有暂停的组
|
||
func (pdb *PipelineDB) GetPausedGroups() []string {
|
||
return pdb.groupManager.GetPausedGroups()
|
||
}
|
||
|
||
// IsGroupReadyForCleanup 检查组是否可以清理(所有数据都是cold)
|
||
func (pdb *PipelineDB) IsGroupReadyForCleanup(group string) (bool, error) {
|
||
// 检查组是否暂停
|
||
if !pdb.groupManager.IsPaused(group) {
|
||
return false, fmt.Errorf("组 [%s] 未暂停,不能清理", group)
|
||
}
|
||
|
||
// 检查是否有非-cold数据
|
||
hotPageReq := &PageRequest{Page: 1, PageSize: 1}
|
||
hotResponse, err := pdb.GetRecordsByGroupAndStatus(group, StatusHot, hotPageReq)
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
if len(hotResponse.Records) > 0 {
|
||
return false, nil // 还有热数据
|
||
}
|
||
|
||
warmPageReq := &PageRequest{Page: 1, PageSize: 1}
|
||
warmResponse, err := pdb.GetRecordsByGroupAndStatus(group, StatusWarm, warmPageReq)
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
warmRecords := warmResponse.Records
|
||
if len(warmRecords) > 0 {
|
||
return false, nil // 还有温数据
|
||
}
|
||
|
||
return true, nil // 所有数据都是cold,可以清理
|
||
}
|
||
|
||
// sortRecords 对记录进行排序
|
||
func (pdb *PipelineDB) sortRecords(records []*DataRecord, sortField SortField, sortOrder SortOrder) {
|
||
if len(records) <= 1 {
|
||
return
|
||
}
|
||
|
||
// 设置默认排序
|
||
if sortField == "" {
|
||
sortField = SortByID
|
||
}
|
||
if sortOrder == "" {
|
||
sortOrder = SortAsc
|
||
}
|
||
|
||
sort.Slice(records, func(i, j int) bool {
|
||
var less bool
|
||
|
||
switch sortField {
|
||
case SortByID:
|
||
less = records[i].ID < records[j].ID
|
||
case SortByCreatedAt:
|
||
less = records[i].CreatedAt.Before(records[j].CreatedAt)
|
||
case SortByUpdatedAt:
|
||
less = records[i].UpdatedAt.Before(records[j].UpdatedAt)
|
||
case SortByStatus:
|
||
// 状态排序:hot < warm < cold
|
||
statusOrder := map[DataStatus]int{
|
||
StatusHot: 1,
|
||
StatusWarm: 2,
|
||
StatusCold: 3,
|
||
}
|
||
less = statusOrder[records[i].Status] < statusOrder[records[j].Status]
|
||
default:
|
||
// 默认按ID排序
|
||
less = records[i].ID < records[j].ID
|
||
}
|
||
|
||
// 如果是降序,反转比较结果
|
||
if sortOrder == SortDesc {
|
||
less = !less
|
||
}
|
||
|
||
return less
|
||
})
|
||
}
|
||
|
||
// CleanupGroup 清理组数据
|
||
func (pdb *PipelineDB) CleanupGroup(group string) error {
|
||
// 检查是否可以清理
|
||
ready, err := pdb.IsGroupReadyForCleanup(group)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if !ready {
|
||
return fmt.Errorf("组 [%s] 还有未处理完成的数据,不能清理", group)
|
||
}
|
||
|
||
pdb.logger.Info("🧽 开始清理组数据", "group", group)
|
||
|
||
// 调用外部处理器的完成回调
|
||
if pdb.handler != nil {
|
||
// 检查是否已经有 OnComplete 在执行
|
||
if pdb.groupManager.IsOnCompleteExecuting(group) {
|
||
pdb.logger.Info("⏳ 组的 OnComplete 正在执行中,等待完成", "group", group)
|
||
// 等待当前执行完成
|
||
for pdb.groupManager.IsOnCompleteExecuting(group) {
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
pdb.logger.Info("✅ 组的 OnComplete 执行完成,继续清理", "group", group)
|
||
} else {
|
||
// 标记为正在执行
|
||
pdb.groupManager.SetOnCompleteExecuting(group, true)
|
||
defer pdb.groupManager.SetOnCompleteExecuting(group, false)
|
||
|
||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||
defer cancel()
|
||
|
||
pdb.logger.Info("🎯 调用组完成回调", "group", group)
|
||
if err := pdb.handler.OnComplete(ctx, group); err != nil {
|
||
pdb.logger.Warn("⚠️ 组完成回调失败", "group", group, "error", err)
|
||
// 不阻止清理过程,只记录错误
|
||
} else {
|
||
pdb.logger.Info("✅ 组完成回调成功", "group", group)
|
||
}
|
||
}
|
||
}
|
||
|
||
// 获取组中所有记录
|
||
pageReq := &PageRequest{Page: 1, PageSize: 10000} // 假设最多10000条
|
||
response, err := pdb.GetRecordsByGroup(group, pageReq)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
records := response.Records
|
||
|
||
// 删除所有记录
|
||
deletedCount := len(records)
|
||
// TODO: 实现实际的记录删除逻辑
|
||
pdb.logger.Info("📋 组中有记录待删除", "group", group, "count", deletedCount)
|
||
|
||
pdb.logger.Info("🧽 组清理完成", "group", group, "deleted_count", deletedCount)
|
||
return nil
|
||
}
|
||
|
||
// checkGroupCompletion 检查组是否完成处理并调用完成回调
|
||
func (pdb *PipelineDB) checkGroupCompletion(group string) {
|
||
// 只检查暂停的组
|
||
if !pdb.groupManager.IsPaused(group) {
|
||
return
|
||
}
|
||
|
||
// 检查是否已经有 OnComplete 在执行
|
||
if pdb.groupManager.IsOnCompleteExecuting(group) {
|
||
pdb.logger.Info("⏳ 组的 OnComplete 正在执行中,跳过重复调用", "group", group)
|
||
return
|
||
}
|
||
|
||
// 检查是否所有数据都已转为 cold
|
||
ready, err := pdb.IsGroupReadyForCleanup(group)
|
||
if err != nil {
|
||
pdb.logger.Warn("⚠️ 检查组完成状态失败", "group", group, "error", err)
|
||
return
|
||
}
|
||
|
||
if ready && pdb.handler != nil {
|
||
// 标记为正在执行
|
||
pdb.groupManager.SetOnCompleteExecuting(group, true)
|
||
|
||
// 异步执行完成回调,避免阻塞
|
||
go func() {
|
||
defer pdb.groupManager.SetOnCompleteExecuting(group, false) // 确保执行完成后清除标记
|
||
|
||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||
defer cancel()
|
||
|
||
pdb.logger.Info("🎯 组所有数据已转为 cold,调用完成回调", "group", group)
|
||
if err := pdb.handler.OnComplete(ctx, group); err != nil {
|
||
pdb.logger.Warn("⚠️ 组完成回调失败", "group", group, "error", err)
|
||
} else {
|
||
pdb.logger.Info("✅ 组完成回调成功", "group", group)
|
||
}
|
||
}()
|
||
}
|
||
}
|
||
|
||
// handleGroupDataEvents 处理组数据事件
|
||
func (pdb *PipelineDB) handleGroupDataEvents(groupEventCh <-chan GroupDataEvent) {
|
||
pdb.logger.Info("📡 启动组数据事件监听器")
|
||
|
||
for {
|
||
select {
|
||
case <-pdb.ctx.Done():
|
||
pdb.logger.Info("📡 组数据事件监听器停止")
|
||
return
|
||
|
||
case event, ok := <-groupEventCh:
|
||
if !ok {
|
||
pdb.logger.Info("📡 组数据事件通道已关闭")
|
||
return
|
||
}
|
||
|
||
pdb.processGroupDataEvent(event)
|
||
}
|
||
}
|
||
}
|
||
|
||
// processGroupDataEvent 处理单个组数据事件
|
||
func (pdb *PipelineDB) processGroupDataEvent(event GroupDataEvent) {
|
||
if event.Done {
|
||
// 组完成,自动暂停
|
||
pdb.logger.Info("🔚 组数据产出完成,自动暂停", "group", event.Group)
|
||
pdb.groupManager.PauseGroup(event.Group)
|
||
|
||
// 可选:触发组完成检查
|
||
go func() {
|
||
// 等待一小段时间让当前数据处理完成
|
||
time.Sleep(100 * time.Millisecond)
|
||
pdb.checkGroupCompletion(event.Group)
|
||
}()
|
||
} else if len(event.Data) > 0 {
|
||
// 有新数据,确保组未暂停
|
||
if pdb.groupManager.IsPaused(event.Group) {
|
||
pdb.logger.Info("▶️ 组有新数据产出,自动恢复", "group", event.Group)
|
||
pdb.groupManager.ResumeGroup(event.Group)
|
||
}
|
||
|
||
// 自动接收数据
|
||
_, err := pdb.AcceptData(event.Group, event.Data, "auto_from_event")
|
||
if err != nil {
|
||
pdb.logger.Warn("⚠️ 自动接收组数据失败", "group", event.Group, "error", err)
|
||
} else {
|
||
pdb.logger.Info("📥 自动接收组数据成功", "group", event.Group)
|
||
}
|
||
}
|
||
}
|
||
|
||
// getAllGroups 获取所有组名(通过索引管理器)
|
||
func (pdb *PipelineDB) getAllGroups() []string {
|
||
// 简化实现:通过索引管理器获取所有组名
|
||
var groups []string
|
||
for groupName := range pdb.indexMgr.indexes {
|
||
groups = append(groups, groupName)
|
||
}
|
||
return groups
|
||
}
|
||
|
||
// GetGroupCleanupStatus 获取组清理状态
|
||
func (pdb *PipelineDB) GetGroupCleanupStatus(group string) (*GroupCleanupStatus, error) {
|
||
status := &GroupCleanupStatus{
|
||
Group: group,
|
||
Paused: pdb.groupManager.IsPaused(group),
|
||
}
|
||
|
||
// 获取组统计
|
||
stats, err := pdb.GetStats()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
if groupStat, exists := stats.GroupStats[group]; exists {
|
||
status.TotalRecords = groupStat.TotalRecords
|
||
status.HotRecords = groupStat.HotRecords
|
||
status.WarmRecords = groupStat.WarmRecords
|
||
status.ColdRecords = groupStat.ColdRecords
|
||
status.ReadyForCleanup = (groupStat.HotRecords == 0 && groupStat.WarmRecords == 0 && status.Paused)
|
||
}
|
||
|
||
return status, nil
|
||
}
|
||
|
||
// Stats 统计信息
|
||
type Stats struct {
|
||
Timestamp time.Time `json:"timestamp"`
|
||
TotalRecords int `json:"total_records"`
|
||
HotRecords int `json:"hot_records"`
|
||
WarmRecords int `json:"warm_records"`
|
||
ColdRecords int `json:"cold_records"`
|
||
GroupStats map[string]*GroupStats `json:"group_stats"`
|
||
TotalPages int `json:"total_pages"`
|
||
FreePages int `json:"free_pages"`
|
||
CacheHits int64 `json:"cache_hits"`
|
||
CacheMisses int64 `json:"cache_misses"`
|
||
CacheHitRate float64 `json:"cache_hit_rate"`
|
||
FileSizeBytes int64 `json:"file_size_bytes"`
|
||
}
|
||
|
||
// GroupStats 组统计信息
|
||
type GroupStats struct {
|
||
Group string `json:"group"`
|
||
TotalRecords int `json:"total_records"`
|
||
HotRecords int `json:"hot_records"`
|
||
WarmRecords int `json:"warm_records"`
|
||
ColdRecords int `json:"cold_records"`
|
||
}
|
||
|
||
// GroupCleanupStatus 组清理状态
|
||
type GroupCleanupStatus struct {
|
||
Group string `json:"group"`
|
||
Paused bool `json:"paused"`
|
||
TotalRecords int `json:"total_records"`
|
||
HotRecords int `json:"hot_records"`
|
||
WarmRecords int `json:"warm_records"`
|
||
ColdRecords int `json:"cold_records"`
|
||
ReadyForCleanup bool `json:"ready_for_cleanup"`
|
||
}
|
||
|
||
// 内部方法
|
||
|
||
func (pdb *PipelineDB) initializeFile() error {
|
||
pdb.header = &Header{
|
||
Magic: 0x57A5DB,
|
||
PageSize: PageSize,
|
||
TotalPages: 2,
|
||
FreeHead: 0,
|
||
RootPage: 1,
|
||
}
|
||
|
||
// 写入文件头
|
||
buf := make([]byte, PageSize)
|
||
pdb.writeHeaderToBuffer(buf)
|
||
if _, err := pdb.file.Write(buf); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 初始化根页
|
||
rootPage := make([]byte, PageSize)
|
||
binary.LittleEndian.PutUint16(rootPage[0:2], 0) // numSlots
|
||
binary.LittleEndian.PutUint16(rootPage[2:4], PageSize) // freeOff
|
||
binary.LittleEndian.PutUint16(rootPage[4:6], 0) // nextPage
|
||
|
||
if _, err := pdb.file.Write(rootPage); err != nil {
|
||
return err
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (pdb *PipelineDB) loadHeader() error {
|
||
buf := make([]byte, HdrSize)
|
||
if _, err := pdb.file.ReadAt(buf, 0); err != nil {
|
||
return err
|
||
}
|
||
|
||
pdb.header = &Header{
|
||
Magic: binary.LittleEndian.Uint32(buf[0:4]),
|
||
PageSize: binary.LittleEndian.Uint16(buf[4:6]),
|
||
TotalPages: binary.LittleEndian.Uint16(buf[6:8]),
|
||
FreeHead: binary.LittleEndian.Uint16(buf[8:10]),
|
||
RootPage: binary.LittleEndian.Uint16(buf[10:12]),
|
||
CounterPage: binary.LittleEndian.Uint16(buf[12:14]),
|
||
}
|
||
|
||
if pdb.header.Magic != 0x57A5DB || pdb.header.PageSize != PageSize {
|
||
return errors.New("invalid database file")
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (pdb *PipelineDB) saveHeader() error {
|
||
buf := make([]byte, PageSize)
|
||
pdb.writeHeaderToBuffer(buf)
|
||
_, err := pdb.file.WriteAt(buf, 0)
|
||
return err
|
||
}
|
||
|
||
func (pdb *PipelineDB) writeHeaderToBuffer(buf []byte) {
|
||
binary.LittleEndian.PutUint32(buf[0:4], pdb.header.Magic)
|
||
binary.LittleEndian.PutUint16(buf[4:6], pdb.header.PageSize)
|
||
binary.LittleEndian.PutUint16(buf[6:8], pdb.header.TotalPages)
|
||
binary.LittleEndian.PutUint16(buf[8:10], pdb.header.FreeHead)
|
||
binary.LittleEndian.PutUint16(buf[10:12], pdb.header.RootPage)
|
||
binary.LittleEndian.PutUint16(buf[12:14], pdb.header.CounterPage)
|
||
}
|
||
|
||
func (pdb *PipelineDB) getRowMutex(id int64) *sync.RWMutex {
|
||
pdb.mu.Lock()
|
||
defer pdb.mu.Unlock()
|
||
|
||
if mutex, exists := pdb.rowMutexes[id]; exists {
|
||
return mutex
|
||
}
|
||
|
||
mutex := &sync.RWMutex{}
|
||
pdb.rowMutexes[id] = mutex
|
||
return mutex
|
||
}
|
||
|
||
// runAutoPipeline 运行自动管道处理器
|
||
func (pdb *PipelineDB) runAutoPipeline(ctx context.Context) {
|
||
pdb.logger.Info("🔥 启动自动管道处理器")
|
||
|
||
// 创建两个独立的定时器
|
||
warmTicker := time.NewTicker(pdb.config.WarmInterval)
|
||
processTicker := time.NewTicker(pdb.config.ProcessInterval)
|
||
|
||
defer warmTicker.Stop()
|
||
defer processTicker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
pdb.logger.Info("🔥 自动管道处理器已停止")
|
||
return
|
||
case <-warmTicker.C:
|
||
// 处理 hot -> warm
|
||
if err := pdb.processHotData(); err != nil {
|
||
pdb.logger.Error("❌ 预热处理错误", "error", err)
|
||
}
|
||
case <-processTicker.C:
|
||
// 处理 warm -> cold
|
||
if err := pdb.processWarmData(); err != nil {
|
||
pdb.logger.Error("❌ 温数据处理错误", "error", err)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// processHotData 处理热数据
|
||
func (pdb *PipelineDB) processHotData() error {
|
||
// 获取热数据
|
||
hotRecords, err := pdb.GetRecordsByStatus(StatusHot, pdb.config.BatchSize)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
if len(hotRecords) == 0 {
|
||
return nil // 没有热数据需要处理
|
||
}
|
||
|
||
pdb.logger.Info("🔥 开始预热热数据", "count", len(hotRecords))
|
||
|
||
processed := 0
|
||
for _, record := range hotRecords {
|
||
if err := pdb.warmupRecord(record); err != nil {
|
||
pdb.logger.Error("❌ 预热记录失败", "id", record.ID, "error", err)
|
||
continue
|
||
}
|
||
processed++
|
||
}
|
||
|
||
pdb.logger.Info("🔥 预热完成", "processed", processed, "total", len(hotRecords))
|
||
return nil
|
||
}
|
||
|
||
// warmupRecord 预热单条记录
|
||
func (pdb *PipelineDB) warmupRecord(record *DataRecord) error {
|
||
// 执行预热逻辑
|
||
if err := pdb.performWarmup(record); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 更新状态为 warm
|
||
record.Status = StatusWarm
|
||
|
||
// 保存更新
|
||
if err := pdb.updateRecord(record); err != nil {
|
||
return err
|
||
}
|
||
|
||
pdb.logger.Info("🔥 记录已预热: hot -> warm", "id", record.ID)
|
||
return nil
|
||
}
|
||
|
||
// performWarmup 执行实际的预热操作
|
||
func (pdb *PipelineDB) performWarmup(record *DataRecord) error {
|
||
// 调用外部处理器的预热方法
|
||
if pdb.handler != nil {
|
||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||
defer cancel()
|
||
|
||
processedData, err := pdb.handler.WillWarm(ctx, record.Group, record.Data)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 更新处理后的数据
|
||
if len(processedData) > 0 {
|
||
record.Data = processedData
|
||
}
|
||
}
|
||
|
||
// 添加预热元数据
|
||
if record.Metadata == "" {
|
||
record.Metadata = "warmed_up"
|
||
} else {
|
||
record.Metadata += ",warmed_up"
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// processWarmData 处理温数据
|
||
func (pdb *PipelineDB) processWarmData() error {
|
||
// 获取 warm 数据
|
||
warmRecords, err := pdb.GetRecordsByStatus(StatusWarm, pdb.config.BatchSize)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
if len(warmRecords) == 0 {
|
||
return nil // 没有温数据需要处理
|
||
}
|
||
|
||
pdb.logger.Info("⚙️ 开始处理温数据", "count", len(warmRecords))
|
||
|
||
processed := 0
|
||
for _, record := range warmRecords {
|
||
if err := pdb.cooldownRecord(record); err != nil {
|
||
pdb.logger.Error("❌ 处理记录失败", "id", record.ID, "error", err)
|
||
continue
|
||
}
|
||
processed++
|
||
}
|
||
|
||
pdb.logger.Info("⚙️ 处理完成", "processed", processed, "total", len(warmRecords))
|
||
return nil
|
||
}
|
||
|
||
// cooldownRecord 处理单条记录 (warm -> cold)
|
||
func (pdb *PipelineDB) cooldownRecord(record *DataRecord) error {
|
||
// 自动处理器处理 warm 数据
|
||
pdb.logger.Info("⚙️ 自动处理器处理记录", "id", record.ID)
|
||
|
||
// 调用外部处理器
|
||
if err := pdb.performCooldown(record); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 更新状态为 cold 并保存回存储
|
||
record.Status = StatusCold
|
||
|
||
if err := pdb.updateRecord(record); err != nil {
|
||
return err
|
||
}
|
||
|
||
pdb.logger.Info("⚙️ 记录处理完成: warm -> cold", "id", record.ID)
|
||
return nil
|
||
}
|
||
|
||
// performCooldown 调用外部处理器
|
||
func (pdb *PipelineDB) performCooldown(record *DataRecord) error {
|
||
pdb.logger.Info("🔗 调用外部处理器处理记录", "id", record.ID)
|
||
|
||
// 创建超时上下文
|
||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||
defer cancel()
|
||
|
||
// 调用外部处理器
|
||
processedData, err := pdb.handler.WillCold(ctx, record.Group, record.Data)
|
||
if err != nil {
|
||
pdb.logger.Error("❌ 外部处理器处理失败", "error", err)
|
||
return err
|
||
}
|
||
|
||
// 更新处理后的数据
|
||
record.Data = processedData
|
||
|
||
// 成功处理
|
||
pdb.logger.Info("✅ 外部处理器成功处理记录", "id", record.ID)
|
||
|
||
// 检查组是否完成处理
|
||
pdb.checkGroupCompletion(record.Group)
|
||
|
||
return nil
|
||
}
|
||
|
||
// rebuildIndexes 重建所有索引(用于数据库重启后恢复)
|
||
func (pdb *PipelineDB) rebuildIndexes() error {
|
||
// 如果没有根页面或总页数为0,跳过重建
|
||
if pdb.header.RootPage == 0 || pdb.header.TotalPages <= 1 {
|
||
pdb.logger.Info("🔄 数据库为空,跳过索引重建")
|
||
return nil
|
||
}
|
||
|
||
pdb.logger.Info("🔄 重建索引...")
|
||
|
||
// 遍历所有数据页重建索引
|
||
pdb.logger.Info("📊 开始扫描页面", "totalPages", pdb.header.TotalPages, "rootPage", pdb.header.RootPage)
|
||
|
||
recordCount := 0
|
||
// 从根页面开始扫描页链
|
||
for pageNo := pdb.header.RootPage; pageNo != 0 && pageNo < pdb.header.TotalPages; {
|
||
// 读取页面
|
||
page, err := pdb.readPage(pageNo)
|
||
if err != nil {
|
||
pdb.logger.Info("⚠️ 跳过无法读取的页面", "pageNo", pageNo, "error", err)
|
||
break // 页链断裂,停止扫描
|
||
}
|
||
|
||
// 检查是否是数据页
|
||
if len(page) < 8 {
|
||
pdb.logger.Info("⚠️ 跳过太小的页面", "pageNo", pageNo, "size", len(page))
|
||
continue
|
||
}
|
||
|
||
// 解析页面头部
|
||
numSlots := binary.LittleEndian.Uint16(page[0:2]) // 正确的槽数量
|
||
freeOff := binary.LittleEndian.Uint16(page[2:4])
|
||
nextPage := binary.LittleEndian.Uint16(page[4:6])
|
||
// 只在有数据时记录详细日志
|
||
if numSlots > 0 {
|
||
pdb.logger.Info("📄 扫描页面", "pageNo", pageNo, "numSlots", numSlots, "freeOff", freeOff, "nextPage", nextPage)
|
||
}
|
||
|
||
if numSlots == 0 {
|
||
pageNo = nextPage
|
||
continue
|
||
}
|
||
|
||
// 遍历页面中的所有记录(使用槽数组)
|
||
for slotNo := uint16(0); slotNo < numSlots && slotNo < 100; slotNo++ { // 限制最大槽数,避免无效数据
|
||
// 读取槽偏移(槽数组从偏移6开始)
|
||
slotOffset := 6 + slotNo*2
|
||
if slotOffset+2 > uint16(len(page)) {
|
||
pdb.logger.Info("⚠️ 槽偏移超出页面", "slotNo", slotNo, "slotOffset", slotOffset, "pageSize", len(page))
|
||
break
|
||
}
|
||
recordOffset := binary.LittleEndian.Uint16(page[slotOffset : slotOffset+2])
|
||
|
||
// 读取记录
|
||
if recordOffset+8 > uint16(len(page)) {
|
||
continue
|
||
}
|
||
|
||
// 读取ID
|
||
id := binary.LittleEndian.Uint64(page[recordOffset : recordOffset+8])
|
||
|
||
// 读取数据长度(变长编码)
|
||
dataLen, n := binary.Uvarint(page[recordOffset+8:])
|
||
if n <= 0 {
|
||
continue
|
||
}
|
||
|
||
// 读取数据
|
||
dataStart := recordOffset + 8 + uint16(n)
|
||
dataEnd := dataStart + uint16(dataLen)
|
||
if dataEnd > uint16(len(page)) {
|
||
continue
|
||
}
|
||
|
||
// 解析JSON记录获取组名
|
||
var record DataRecord
|
||
if err := record.FromBytes(page[dataStart:dataEnd]); err != nil {
|
||
continue
|
||
}
|
||
|
||
// 添加到索引
|
||
idx := pdb.indexMgr.GetOrCreateIndex(record.Group)
|
||
idx.Insert(int64(id), pageNo, slotNo)
|
||
recordCount++
|
||
}
|
||
|
||
// 移动到下一个页面
|
||
nextPageNo := Page(page).nextPage()
|
||
if nextPageNo == 0 {
|
||
break // 页链结束
|
||
}
|
||
pageNo = nextPageNo
|
||
}
|
||
|
||
pdb.logger.Info("✅ 索引重建完成", "recordCount", recordCount)
|
||
return nil
|
||
}
|