Files
pipelinedb/pipeline_db.go
Pipeline Database 6f78cdd8a9 🚀 优化索引重建和添加系统监控
🔧 索引重建优化:
- 当数据库为空时跳过索引重建,避免频繁日志
- 只在有数据的页面时记录详细扫描日志
- 添加记录计数统计,显示重建的记录数量
- 减少 CPU 使用率,提高空数据库性能

💻 系统监控功能:
- 添加完整的 CPU 和内存监控
- 实时显示 Goroutine 数量和内存使用情况
- 垃圾回收统计和对象分配监控
- 每10秒显示系统资源状态

📊 自动处理示例增强:
- 集成系统资源监控到示例中
- 提供性能分析和资源优化指导
- 完善的监控文档和使用说明

🎯 性能提升:
- 解决空数据库时的高 CPU 使用问题
- 优化日志输出频率和级别
- 提供实时性能监控能力
2025-09-30 17:48:28 +08:00

1491 lines
42 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package pipelinedb provides an integrated pipeline database system
//
// # Pipeline Database V4 是一个集成了数据库存储和业务管道处理的一体化解决方案
//
// 核心特性:
// - 基于页面的存储引擎:高效的数据存储和检索
// - 分组数据管理:支持按业务组织数据,独立管理和统计
// - 三级数据状态Hot-> Warm-> Cold的自动流转
// - 外部处理器集成:支持自定义业务逻辑处理
// - 高并发支持:线程安全的索引和存储操作
// - 页面缓存:提高数据访问性能
// - 持久化存储:数据安全持久保存
//
// 数据流转模型:
// 1. 数据接收新数据以Hot状态进入系统
// 2. 预热处理Hot数据经过预热处理转为Warm状态
// 3. 冷却处理Warm数据经过冷却处理转为Cold状态
// 4. 完成回调:组内所有数据处理完成后触发回调
//
// 使用场景:
// - 数据管道处理ETL、数据清洗、数据转换
// - 业务流程管理:订单处理、用户行为分析
// - 实时数据处理:日志分析、监控数据处理
// - 批量数据处理:数据导入、数据同步
package pipelinedb
import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"log/slog"
"os"
"sort"
"sync"
"time"
)
// ==================== 系统常量定义 ====================
const (
PageSize = 4096 // 页面大小4KB与操作系统页面大小对齐
HdrSize = 14 // 文件头大小:包含所有头部字段的字节数
MaxRecSize = 3000 // 最大记录大小:限制单条记录的最大字节数
)
// ==================== 数据结构定义 ====================
// Header 数据库文件头结构
//
// 文件头存储在文件的开始位置,包含数据库的元数据信息
// 用于数据库启动时的初始化和状态恢复
type Header struct {
Magic uint32 // 魔数:用于文件格式验证
PageSize uint16 // 页面大小固定为4096字节
TotalPages uint16 // 总页面数:当前数据库文件包含的页面数量
FreeHead uint16 // 空闲页链头:空闲页面管理的起始页面
RootPage uint16 // 根页面:数据存储的根页面编号
CounterPage uint16 // 计数器页面用于生成唯一ID的计数器页面
}
// DataStatus 数据状态枚举
//
// 定义数据在管道中的处理状态,支持三级状态流转
// 状态转换Hot -> Warm -> Cold
type DataStatus string
const (
StatusHot DataStatus = "hot" // 热数据:刚接收的新数据,等待预热处理
StatusWarm DataStatus = "warm" // 温数据:已预热的数据,等待冷却处理
StatusCold DataStatus = "cold" // 冷数据:已完成处理的数据,可以归档或清理
)
// DataRecord 数据记录结构
//
// 表示系统中的一条数据记录,包含完整的数据信息和元数据
// 支持JSON序列化便于存储和传输
type DataRecord struct {
ID int64 `json:"id"` // 记录唯一标识符
Group string `json:"group"` // 所属数据组名称
Status DataStatus `json:"status"` // 当前数据状态
Data []byte `json:"data"` // 实际数据内容
CreatedAt time.Time `json:"created_at"` // 创建时间戳
UpdatedAt time.Time `json:"updated_at"` // 最后更新时间戳
Metadata string `json:"metadata,omitempty"` // 可选的元数据信息
}
// GroupDataEvent 组数据事件
//
// 用于在组数据处理过程中传递事件信息
// 支持事件驱动的数据处理模式
type GroupDataEvent struct {
Group string // 组名:事件所属的数据组
Data []byte // 数据内容:事件携带的数据
Done bool // 完成标志:标识该组是否已完成所有数据处理
}
// ==================== 记录序列化方法 ====================
// ToBytes 将数据记录序列化为字节数组
//
// 返回值:
// - []byte: 序列化后的字节数组
// - error: 序列化错误
//
// 使用JSON格式进行序列化便于跨平台兼容和调试
func (dr *DataRecord) ToBytes() ([]byte, error) {
return json.Marshal(dr)
}
// FromBytes 从字节数组反序列化数据记录
//
// 参数:
// - data: 序列化的字节数组
//
// 返回值:
// - error: 反序列化错误
//
// 从JSON格式的字节数组中恢复数据记录对象
func (dr *DataRecord) FromBytes(data []byte) error {
return json.Unmarshal(data, dr)
}
// ==================== 核心数据库结构 ====================
// PipelineDB 管道数据库主结构体
//
// 集成了存储引擎、索引管理、缓存系统、管道处理等所有核心功能
// 提供线程安全的数据库操作接口
type PipelineDB struct {
// ========== 存储引擎组件 ==========
file *os.File // 数据库文件句柄
header *Header // 文件头信息
cache *PageCache // 页面缓存系统
freePageMgr *FreePageManager // 空闲页面管理器
indexMgr *IndexManager // 索引管理器
rowMutexes map[int64]*sync.RWMutex // 行级锁映射
mu sync.RWMutex // 数据库级读写锁
// ========== 配置和处理器 ==========
config *Config // 数据库配置
handler Handler // 数据处理器接口
logger *slog.Logger // 结构化日志器
// ========== 并发控制 ==========
ctx context.Context // 上下文控制
cancel context.CancelFunc // 取消函数
wg sync.WaitGroup // 等待组,用于优雅关闭
// ========== 业务组件 ==========
groupManager *GroupManager // 组管理器
}
// ==================== 配置结构定义 ====================
// Config 数据库统一配置结构
//
// 包含存储引擎配置和管道处理配置
// 支持JSON序列化便于配置文件管理
type Config struct {
// ========== 存储引擎配置 ==========
CacheSize int `json:"cache_size"` // 页缓存大小:缓存的页面数量
SyncWrites bool `json:"sync_writes"` // 同步写入:是否立即同步到磁盘
CreateIfMiss bool `json:"create_if_miss"` // 自动创建:文件不存在时是否自动创建
// ========== 管道处理配置 ==========
WarmInterval time.Duration `json:"warm_interval"` // 预热间隔Hot->Warm状态转换的时间间隔
ProcessInterval time.Duration `json:"process_interval"` // 处理间隔Warm->Cold状态转换的时间间隔
BatchSize int `json:"batch_size"` // 批处理大小:每次处理的记录数量
EnableMetrics bool `json:"enable_metrics"` // 启用指标:是否收集性能指标
}
// DefaultConfig 返回默认配置
//
// 返回值:
// - *Config: 包含合理默认值的配置对象
//
// 默认配置适用于大多数使用场景,可以根据实际需求进行调整
func DefaultConfig() *Config {
return &Config{
CacheSize: 256, // 256页缓存约1MB内存
SyncWrites: false, // 异步写入,提高性能
CreateIfMiss: true, // 自动创建数据库文件
WarmInterval: 5 * time.Second, // 5秒预热间隔
ProcessInterval: 10 * time.Second, // 10秒处理间隔
BatchSize: 100, // 每批处理100条记录
EnableMetrics: true, // 启用性能指标收集
}
}
// ==================== 数据处理器接口 ====================
// Handler 数据处理器接口
//
// 定义了数据在状态流转过程中的处理回调
// 用户可以实现此接口来定制业务逻辑处理
type Handler interface {
// WillWarm 预热处理回调
//
// 参数:
// - ctx: 上下文,用于超时控制和取消操作
// - group: 数据组名称
// - data: 待处理的数据
//
// 返回值:
// - []byte: 处理后的数据
// - error: 处理错误
//
// 在数据从Hot状态转换为Warm状态时调用
// 可以在此阶段进行数据验证、转换、清洗等操作
WillWarm(ctx context.Context, group string, data []byte) ([]byte, error)
// WillCold 冷却处理回调
//
// 参数:
// - ctx: 上下文,用于超时控制和取消操作
// - group: 数据组名称
// - data: 待处理的数据
//
// 返回值:
// - []byte: 处理后的数据
// - error: 处理错误
//
// 在数据从Warm状态转换为Cold状态时调用
// 可以在此阶段进行数据压缩、归档、发送等操作
WillCold(ctx context.Context, group string, data []byte) ([]byte, error)
// OnComplete 组完成回调
//
// 参数:
// - ctx: 上下文,用于超时控制和取消操作
// - group: 完成处理的数据组名称
//
// 返回值:
// - error: 处理错误
//
// 在组内所有数据都完成处理后调用
// 可以在此阶段进行清理、通知、统计等操作
OnComplete(ctx context.Context, group string) error
}
// ==================== 数据库选项结构 ====================
// Options 数据库打开选项
//
// 包含打开数据库所需的所有配置参数
// 使用选项模式,提供灵活的配置方式
type Options struct {
Filename string // 数据库文件路径
Config *Config // 数据库配置nil时使用默认配置
Handler Handler // 数据处理器,可选
Logger *slog.Logger // 日志器nil时使用默认日志器
}
// ==================== 数据库核心方法 ====================
// Open 打开或创建管道数据库
//
// 参数:
// - opts: 数据库打开选项,包含文件路径、配置、处理器等
//
// 返回值:
// - *PipelineDB: 数据库实例
// - error: 打开错误
//
// 执行流程:
// 1. 验证和设置默认配置
// 2. 打开或创建数据库文件
// 3. 初始化存储引擎组件(缓存、索引、页面管理器等)
// 4. 加载或创建文件头信息
// 5. 初始化组管理器和管道处理器
// 6. 启动后台处理协程
//
// 线程安全:返回的数据库实例支持并发访问
func Open(opts Options) (*PipelineDB, error) {
// 设置默认配置
if opts.Config == nil {
opts.Config = DefaultConfig()
}
// 设置默认 logger
logger := opts.Logger
if logger == nil {
logger = slog.Default()
}
pdb := &PipelineDB{
config: opts.Config,
handler: opts.Handler,
logger: logger,
rowMutexes: make(map[int64]*sync.RWMutex),
cache: NewPageCache(opts.Config.CacheSize),
freePageMgr: NewFreePageManager(),
indexMgr: NewIndexManager(),
}
// 创建上下文
pdb.ctx, pdb.cancel = context.WithCancel(context.Background())
// 打开文件
var err error
if opts.Config.CreateIfMiss {
pdb.file, err = os.OpenFile(opts.Filename, os.O_RDWR|os.O_CREATE, 0644)
} else {
pdb.file, err = os.OpenFile(opts.Filename, os.O_RDWR, 0644)
}
if err != nil {
return nil, err
}
// 检查文件是否为空
stat, err := pdb.file.Stat()
if err != nil {
return nil, err
}
if stat.Size() == 0 {
// 初始化新文件
if err := pdb.initializeFile(); err != nil {
return nil, err
}
} else {
// 加载现有文件
if err := pdb.loadHeader(); err != nil {
return nil, err
}
// 加载空闲页信息
if err := pdb.freePageMgr.LoadFromHeader(pdb.header.FreeHead, pdb.readPageDirect); err != nil {
return nil, err
}
// 重建索引
if err := pdb.rebuildIndexes(); err != nil {
return nil, err
}
}
// 创建组管理器
pdb.groupManager = NewGroupManager(pdb)
return pdb, nil
}
// Start 启动管道数据库
func (pdb *PipelineDB) Start(groupEventCh <-chan GroupDataEvent) error {
pdb.logger.Info("🚀 启动管道数据库...")
// 启动自动管道处理器
pdb.wg.Add(1)
go func() {
defer pdb.wg.Done()
pdb.runAutoPipeline(pdb.ctx)
}()
// 启动组数据事件监听器
if groupEventCh != nil {
pdb.wg.Add(1)
go func() {
defer pdb.wg.Done()
pdb.handleGroupDataEvents(groupEventCh)
}()
}
pdb.logger.Info("✅ 管道数据库启动完成")
return nil
}
// Stop 停止管道数据库
func (pdb *PipelineDB) Stop() error {
pdb.logger.Info("🛑 停止管道数据库...")
pdb.cancel()
pdb.wg.Wait()
// 刷新缓存
if err := pdb.cache.Flush(pdb.writePageDirect); err != nil {
return err
}
// 保存空闲页信息
freeHead, err := pdb.freePageMgr.SaveToHeader(pdb.writePageDirect)
if err != nil {
return err
}
// 更新文件头
pdb.header.FreeHead = freeHead
if err := pdb.saveHeader(); err != nil {
return err
}
if err := pdb.file.Close(); err != nil {
return err
}
pdb.logger.Info("✅ 管道数据库已停止")
return nil
}
// AcceptData Producer 接收数据 (业务入口)
func (pdb *PipelineDB) AcceptData(group string, data []byte, metadata string) (int64, error) {
// 检查组是否被暂停
if pdb.groupManager.IsPaused(group) {
return 0, fmt.Errorf("组 [%s] 已暂停接收数据", group)
}
// 生成自增ID
id := pdb.groupManager.GetNextID(group)
record := &DataRecord{
ID: id,
Group: group,
Status: StatusHot,
Data: data,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
Metadata: metadata,
}
// 序列化记录
recordBytes, err := record.ToBytes()
if err != nil {
return 0, fmt.Errorf("序列化记录失败: %w", err)
}
// 直接使用组名作为表名
err = pdb.insert(group, id, recordBytes)
if err != nil {
return 0, fmt.Errorf("存储数据失败: %w", err)
}
// 更新统计缓存
pdb.groupManager.IncrementStats(group, StatusHot)
pdb.logger.Info("📥 接收数据",
"group", group,
"id", id,
"size", len(data),
"status", StatusHot)
return id, nil
}
// GetRecord 获取记录
func (pdb *PipelineDB) GetRecord(group string, id int64) (*DataRecord, error) {
data, err := pdb.get(group, id)
if err != nil {
return nil, err
}
record := &DataRecord{}
if err := record.FromBytes(data); err != nil {
return nil, err
}
return record, nil
}
// updateRecord 内部更新记录(不对外暴露)
func (pdb *PipelineDB) updateRecord(record *DataRecord) error {
record.UpdatedAt = time.Now()
recordBytes, err := record.ToBytes()
if err != nil {
return fmt.Errorf("序列化记录失败: %w", err)
}
return pdb.update(record.Group, record.ID, recordBytes)
}
// GetRecordsByStatus 按状态获取记录(跨所有组)
func (pdb *PipelineDB) GetRecordsByStatus(status DataStatus, limit int) ([]*DataRecord, error) {
var records []*DataRecord
count := 0
// 遍历所有组的表
groups := pdb.getAllGroups()
for _, group := range groups {
if count >= limit {
break
}
// 查询当前组的记录
err := pdb.rangeQuery(group, 0, int64(^uint64(0)>>1), func(id int64, data []byte) error {
if count >= limit {
return fmt.Errorf("达到限制") // 停止遍历
}
record := &DataRecord{}
if err := record.FromBytes(data); err != nil {
return nil // 跳过损坏的记录
}
if record.Status == status {
records = append(records, record)
count++
}
return nil
})
if err != nil && err.Error() != "达到限制" {
return nil, fmt.Errorf("查询组 [%s] 失败: %w", group, err)
}
}
return records, nil
}
// GetRecordsByGroup 按组获取记录(支持分页)
func (pdb *PipelineDB) GetRecordsByGroup(group string, pageReq *PageRequest) (*PageResponse, error) {
// 参数验证
if pageReq.Page < 1 {
pageReq.Page = 1
}
if pageReq.PageSize < 1 {
pageReq.PageSize = 10 // 默认每页10条
}
if pageReq.PageSize > 1000 {
pageReq.PageSize = 1000 // 最大每页1000条
}
var allRecords []*DataRecord
// 收集所有记录
err := pdb.rangeQuery(group, 0, int64(^uint64(0)>>1), func(id int64, data []byte) error {
record := &DataRecord{}
if err := record.FromBytes(data); err != nil {
return nil // 跳过损坏的记录
}
allRecords = append(allRecords, record)
return nil
})
if err != nil {
return nil, fmt.Errorf("查询记录失败: %w", err)
}
// 排序记录
pdb.sortRecords(allRecords, pageReq.SortField, pageReq.SortOrder)
// 计算分页信息
totalCount := len(allRecords)
totalPages := (totalCount + pageReq.PageSize - 1) / pageReq.PageSize
// 计算当前页的起始和结束索引
startIndex := (pageReq.Page - 1) * pageReq.PageSize
endIndex := startIndex + pageReq.PageSize
// 边界检查
if startIndex >= totalCount {
return &PageResponse{
Records: []*DataRecord{},
Page: pageReq.Page,
PageSize: pageReq.PageSize,
TotalCount: totalCount,
TotalPages: totalPages,
HasNext: false,
HasPrevious: pageReq.Page > 1,
}, nil
}
if endIndex > totalCount {
endIndex = totalCount
}
// 提取当前页的记录
pageRecords := allRecords[startIndex:endIndex]
// 构建响应
response := &PageResponse{
Records: pageRecords,
Page: pageReq.Page,
PageSize: pageReq.PageSize,
TotalCount: totalCount,
TotalPages: totalPages,
HasNext: pageReq.Page < totalPages,
HasPrevious: pageReq.Page > 1,
}
return response, nil
}
// SortOrder 排序方向
type SortOrder string
const (
SortAsc SortOrder = "asc" // 升序
SortDesc SortOrder = "desc" // 降序
)
// SortField 排序字段
type SortField string
const (
SortByID SortField = "id" // 按ID排序
SortByCreatedAt SortField = "created_at" // 按创建时间排序
SortByUpdatedAt SortField = "updated_at" // 按更新时间排序
SortByStatus SortField = "status" // 按状态排序
)
// PageRequest 分页请求参数
type PageRequest struct {
Page int `json:"page"` // 页码从1开始
PageSize int `json:"page_size"` // 每页大小
SortField SortField `json:"sort_field"` // 排序字段
SortOrder SortOrder `json:"sort_order"` // 排序方向
}
// PageResponse 分页响应结果
type PageResponse struct {
Records []*DataRecord `json:"records"` // 当前页记录
Page int `json:"page"` // 当前页码
PageSize int `json:"page_size"` // 每页大小
TotalCount int `json:"total_count"` // 总记录数
TotalPages int `json:"total_pages"` // 总页数
HasNext bool `json:"has_next"` // 是否有下一页
HasPrevious bool `json:"has_previous"` // 是否有上一页
}
// GetRecordsByGroupAndStatus 按组和状态获取记录(支持分页)
func (pdb *PipelineDB) GetRecordsByGroupAndStatus(group string, status DataStatus, pageReq *PageRequest) (*PageResponse, error) {
// 参数验证
if pageReq.Page < 1 {
pageReq.Page = 1
}
if pageReq.PageSize < 1 {
pageReq.PageSize = 10 // 默认每页10条
}
if pageReq.PageSize > 1000 {
pageReq.PageSize = 1000 // 最大每页1000条
}
var allRecords []*DataRecord
// 首先收集所有匹配的记录
err := pdb.rangeQuery(group, 0, int64(^uint64(0)>>1), func(id int64, data []byte) error {
record := &DataRecord{}
if err := record.FromBytes(data); err != nil {
return nil // 跳过损坏的记录
}
if record.Status == status {
allRecords = append(allRecords, record)
}
return nil
})
if err != nil {
return nil, fmt.Errorf("查询记录失败: %w", err)
}
// 排序记录
pdb.sortRecords(allRecords, pageReq.SortField, pageReq.SortOrder)
// 计算分页信息
totalCount := len(allRecords)
totalPages := (totalCount + pageReq.PageSize - 1) / pageReq.PageSize
// 计算当前页的起始和结束索引
startIndex := (pageReq.Page - 1) * pageReq.PageSize
endIndex := startIndex + pageReq.PageSize
// 边界检查
if startIndex >= totalCount {
// 页码超出范围,返回空结果
return &PageResponse{
Records: []*DataRecord{},
Page: pageReq.Page,
PageSize: pageReq.PageSize,
TotalCount: totalCount,
TotalPages: totalPages,
HasNext: false,
HasPrevious: pageReq.Page > 1,
}, nil
}
if endIndex > totalCount {
endIndex = totalCount
}
// 提取当前页的记录
pageRecords := allRecords[startIndex:endIndex]
// 构建响应
response := &PageResponse{
Records: pageRecords,
Page: pageReq.Page,
PageSize: pageReq.PageSize,
TotalCount: totalCount,
TotalPages: totalPages,
HasNext: pageReq.Page < totalPages,
HasPrevious: pageReq.Page > 1,
}
return response, nil
}
// GetStats 获取统计信息(高性能版本)
func (pdb *PipelineDB) GetStats() (*Stats, error) {
stats := &Stats{
Timestamp: time.Now(),
GroupStats: make(map[string]*GroupStats),
}
// 从 GroupManager 快速获取统计缓存O(1)复杂度)
groupStatsCache := pdb.groupManager.GetFastStats()
// 统计各状态的记录数
var totalRecords, hotRecords, warmRecords, coldRecords int
// 转换缓存数据为 Stats 格式
for group, cache := range groupStatsCache {
stats.GroupStats[group] = &GroupStats{
Group: group,
TotalRecords: cache.TotalRecords,
HotRecords: cache.HotRecords,
WarmRecords: cache.WarmRecords,
ColdRecords: cache.ColdRecords,
}
// 累计总体统计
totalRecords += cache.TotalRecords
hotRecords += cache.HotRecords
warmRecords += cache.WarmRecords
coldRecords += cache.ColdRecords
}
// 设置总体统计
stats.TotalRecords = totalRecords
stats.HotRecords = hotRecords
stats.WarmRecords = warmRecords
stats.ColdRecords = coldRecords
// 获取数据库统计
hits, misses, hitRate := pdb.cache.Stats()
stats.CacheHits = hits
stats.CacheMisses = misses
stats.CacheHitRate = hitRate
stats.TotalPages = int(pdb.header.TotalPages)
stats.FreePages = pdb.freePageMgr.FreeCount()
stats.FileSizeBytes = int64(pdb.header.TotalPages) * PageSize
return stats, nil
}
// ValidateStats 验证统计信息(通过遍历实际数据)
// 用于数据校验和调试,性能较慢但结果准确
func (pdb *PipelineDB) ValidateStats() (*Stats, error) {
stats := &Stats{
Timestamp: time.Now(),
GroupStats: make(map[string]*GroupStats),
}
// 统计各状态的记录数
statusCounts := make(map[DataStatus]int)
// 遍历所有组的表
groups := pdb.getAllGroups()
for _, group := range groups {
err := pdb.rangeQuery(group, 0, int64(^uint64(0)>>1), func(id int64, data []byte) error {
record := &DataRecord{}
if err := record.FromBytes(data); err != nil {
return nil // 跳过损坏的记录
}
// 更新总体统计
statusCounts[record.Status]++
// 更新组统计
if stats.GroupStats[record.Group] == nil {
stats.GroupStats[record.Group] = &GroupStats{
Group: record.Group,
}
}
stats.GroupStats[record.Group].TotalRecords++
// 更新组的状态计数
switch record.Status {
case StatusHot:
stats.GroupStats[record.Group].HotRecords++
case StatusWarm:
stats.GroupStats[record.Group].WarmRecords++
case StatusCold:
stats.GroupStats[record.Group].ColdRecords++
}
return nil
})
if err != nil {
return nil, fmt.Errorf("统计组 [%s] 失败: %w", group, err)
}
}
// 设置总体统计
stats.TotalRecords = statusCounts[StatusHot] + statusCounts[StatusWarm] + statusCounts[StatusCold]
stats.HotRecords = statusCounts[StatusHot]
stats.WarmRecords = statusCounts[StatusWarm]
stats.ColdRecords = statusCounts[StatusCold]
// 获取数据库统计
hits, misses, hitRate := pdb.cache.Stats()
stats.CacheHits = hits
stats.CacheMisses = misses
stats.CacheHitRate = hitRate
stats.TotalPages = int(pdb.header.TotalPages)
stats.FreePages = pdb.freePageMgr.FreeCount()
stats.FileSizeBytes = int64(pdb.header.TotalPages) * PageSize
return stats, nil
}
// PauseGroup 暂停组接收数据
func (pdb *PipelineDB) PauseGroup(group string) {
pdb.groupManager.PauseGroup(group)
}
// ResumeGroup 恢复组接收数据
func (pdb *PipelineDB) ResumeGroup(group string) {
pdb.groupManager.ResumeGroup(group)
}
// IsGroupPaused 检查组是否暂停
func (pdb *PipelineDB) IsGroupPaused(group string) bool {
return pdb.groupManager.IsPaused(group)
}
// GetPausedGroups 获取所有暂停的组
func (pdb *PipelineDB) GetPausedGroups() []string {
return pdb.groupManager.GetPausedGroups()
}
// IsGroupReadyForCleanup 检查组是否可以清理所有数据都是cold
func (pdb *PipelineDB) IsGroupReadyForCleanup(group string) (bool, error) {
// 检查组是否暂停
if !pdb.groupManager.IsPaused(group) {
return false, fmt.Errorf("组 [%s] 未暂停,不能清理", group)
}
// 检查是否有非-cold数据
hotPageReq := &PageRequest{Page: 1, PageSize: 1}
hotResponse, err := pdb.GetRecordsByGroupAndStatus(group, StatusHot, hotPageReq)
if err != nil {
return false, err
}
if len(hotResponse.Records) > 0 {
return false, nil // 还有热数据
}
warmPageReq := &PageRequest{Page: 1, PageSize: 1}
warmResponse, err := pdb.GetRecordsByGroupAndStatus(group, StatusWarm, warmPageReq)
if err != nil {
return false, err
}
warmRecords := warmResponse.Records
if len(warmRecords) > 0 {
return false, nil // 还有温数据
}
return true, nil // 所有数据都是cold可以清理
}
// sortRecords 对记录进行排序
func (pdb *PipelineDB) sortRecords(records []*DataRecord, sortField SortField, sortOrder SortOrder) {
if len(records) <= 1 {
return
}
// 设置默认排序
if sortField == "" {
sortField = SortByID
}
if sortOrder == "" {
sortOrder = SortAsc
}
sort.Slice(records, func(i, j int) bool {
var less bool
switch sortField {
case SortByID:
less = records[i].ID < records[j].ID
case SortByCreatedAt:
less = records[i].CreatedAt.Before(records[j].CreatedAt)
case SortByUpdatedAt:
less = records[i].UpdatedAt.Before(records[j].UpdatedAt)
case SortByStatus:
// 状态排序hot < warm < cold
statusOrder := map[DataStatus]int{
StatusHot: 1,
StatusWarm: 2,
StatusCold: 3,
}
less = statusOrder[records[i].Status] < statusOrder[records[j].Status]
default:
// 默认按ID排序
less = records[i].ID < records[j].ID
}
// 如果是降序,反转比较结果
if sortOrder == SortDesc {
less = !less
}
return less
})
}
// CleanupGroup 清理组数据
func (pdb *PipelineDB) CleanupGroup(group string) error {
// 检查是否可以清理
ready, err := pdb.IsGroupReadyForCleanup(group)
if err != nil {
return err
}
if !ready {
return fmt.Errorf("组 [%s] 还有未处理完成的数据,不能清理", group)
}
pdb.logger.Info("🧽 开始清理组数据", "group", group)
// 调用外部处理器的完成回调
if pdb.handler != nil {
// 检查是否已经有 OnComplete 在执行
if pdb.groupManager.IsOnCompleteExecuting(group) {
pdb.logger.Info("⏳ 组的 OnComplete 正在执行中,等待完成", "group", group)
// 等待当前执行完成
for pdb.groupManager.IsOnCompleteExecuting(group) {
time.Sleep(100 * time.Millisecond)
}
pdb.logger.Info("✅ 组的 OnComplete 执行完成,继续清理", "group", group)
} else {
// 标记为正在执行
pdb.groupManager.SetOnCompleteExecuting(group, true)
defer pdb.groupManager.SetOnCompleteExecuting(group, false)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
pdb.logger.Info("🎯 调用组完成回调", "group", group)
if err := pdb.handler.OnComplete(ctx, group); err != nil {
pdb.logger.Warn("⚠️ 组完成回调失败", "group", group, "error", err)
// 不阻止清理过程,只记录错误
} else {
pdb.logger.Info("✅ 组完成回调成功", "group", group)
}
}
}
// 获取组中所有记录
pageReq := &PageRequest{Page: 1, PageSize: 10000} // 假设最多10000条
response, err := pdb.GetRecordsByGroup(group, pageReq)
if err != nil {
return err
}
records := response.Records
// 删除所有记录
deletedCount := len(records)
// TODO: 实现实际的记录删除逻辑
pdb.logger.Info("📋 组中有记录待删除", "group", group, "count", deletedCount)
pdb.logger.Info("🧽 组清理完成", "group", group, "deleted_count", deletedCount)
return nil
}
// checkGroupCompletion 检查组是否完成处理并调用完成回调
func (pdb *PipelineDB) checkGroupCompletion(group string) {
// 只检查暂停的组
if !pdb.groupManager.IsPaused(group) {
return
}
// 检查是否已经有 OnComplete 在执行
if pdb.groupManager.IsOnCompleteExecuting(group) {
pdb.logger.Info("⏳ 组的 OnComplete 正在执行中,跳过重复调用", "group", group)
return
}
// 检查是否所有数据都已转为 cold
ready, err := pdb.IsGroupReadyForCleanup(group)
if err != nil {
pdb.logger.Warn("⚠️ 检查组完成状态失败", "group", group, "error", err)
return
}
if ready && pdb.handler != nil {
// 标记为正在执行
pdb.groupManager.SetOnCompleteExecuting(group, true)
// 异步执行完成回调,避免阻塞
go func() {
defer pdb.groupManager.SetOnCompleteExecuting(group, false) // 确保执行完成后清除标记
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
pdb.logger.Info("🎯 组所有数据已转为 cold调用完成回调", "group", group)
if err := pdb.handler.OnComplete(ctx, group); err != nil {
pdb.logger.Warn("⚠️ 组完成回调失败", "group", group, "error", err)
} else {
pdb.logger.Info("✅ 组完成回调成功", "group", group)
}
}()
}
}
// handleGroupDataEvents 处理组数据事件
func (pdb *PipelineDB) handleGroupDataEvents(groupEventCh <-chan GroupDataEvent) {
pdb.logger.Info("📡 启动组数据事件监听器")
for {
select {
case <-pdb.ctx.Done():
pdb.logger.Info("📡 组数据事件监听器停止")
return
case event, ok := <-groupEventCh:
if !ok {
pdb.logger.Info("📡 组数据事件通道已关闭")
return
}
pdb.processGroupDataEvent(event)
}
}
}
// processGroupDataEvent 处理单个组数据事件
func (pdb *PipelineDB) processGroupDataEvent(event GroupDataEvent) {
if event.Done {
// 组完成,自动暂停
pdb.logger.Info("🔚 组数据产出完成,自动暂停", "group", event.Group)
pdb.groupManager.PauseGroup(event.Group)
// 可选:触发组完成检查
go func() {
// 等待一小段时间让当前数据处理完成
time.Sleep(100 * time.Millisecond)
pdb.checkGroupCompletion(event.Group)
}()
} else if len(event.Data) > 0 {
// 有新数据,确保组未暂停
if pdb.groupManager.IsPaused(event.Group) {
pdb.logger.Info("▶️ 组有新数据产出,自动恢复", "group", event.Group)
pdb.groupManager.ResumeGroup(event.Group)
}
// 自动接收数据
_, err := pdb.AcceptData(event.Group, event.Data, "auto_from_event")
if err != nil {
pdb.logger.Warn("⚠️ 自动接收组数据失败", "group", event.Group, "error", err)
} else {
pdb.logger.Info("📥 自动接收组数据成功", "group", event.Group)
}
}
}
// getAllGroups 获取所有组名(通过索引管理器)
func (pdb *PipelineDB) getAllGroups() []string {
// 简化实现:通过索引管理器获取所有组名
var groups []string
for groupName := range pdb.indexMgr.indexes {
groups = append(groups, groupName)
}
return groups
}
// GetGroupCleanupStatus 获取组清理状态
func (pdb *PipelineDB) GetGroupCleanupStatus(group string) (*GroupCleanupStatus, error) {
status := &GroupCleanupStatus{
Group: group,
Paused: pdb.groupManager.IsPaused(group),
}
// 获取组统计
stats, err := pdb.GetStats()
if err != nil {
return nil, err
}
if groupStat, exists := stats.GroupStats[group]; exists {
status.TotalRecords = groupStat.TotalRecords
status.HotRecords = groupStat.HotRecords
status.WarmRecords = groupStat.WarmRecords
status.ColdRecords = groupStat.ColdRecords
status.ReadyForCleanup = (groupStat.HotRecords == 0 && groupStat.WarmRecords == 0 && status.Paused)
}
return status, nil
}
// Stats 统计信息
type Stats struct {
Timestamp time.Time `json:"timestamp"`
TotalRecords int `json:"total_records"`
HotRecords int `json:"hot_records"`
WarmRecords int `json:"warm_records"`
ColdRecords int `json:"cold_records"`
GroupStats map[string]*GroupStats `json:"group_stats"`
TotalPages int `json:"total_pages"`
FreePages int `json:"free_pages"`
CacheHits int64 `json:"cache_hits"`
CacheMisses int64 `json:"cache_misses"`
CacheHitRate float64 `json:"cache_hit_rate"`
FileSizeBytes int64 `json:"file_size_bytes"`
}
// GroupStats 组统计信息
type GroupStats struct {
Group string `json:"group"`
TotalRecords int `json:"total_records"`
HotRecords int `json:"hot_records"`
WarmRecords int `json:"warm_records"`
ColdRecords int `json:"cold_records"`
}
// GroupCleanupStatus 组清理状态
type GroupCleanupStatus struct {
Group string `json:"group"`
Paused bool `json:"paused"`
TotalRecords int `json:"total_records"`
HotRecords int `json:"hot_records"`
WarmRecords int `json:"warm_records"`
ColdRecords int `json:"cold_records"`
ReadyForCleanup bool `json:"ready_for_cleanup"`
}
// 内部方法
func (pdb *PipelineDB) initializeFile() error {
pdb.header = &Header{
Magic: 0x57A5DB,
PageSize: PageSize,
TotalPages: 2,
FreeHead: 0,
RootPage: 1,
}
// 写入文件头
buf := make([]byte, PageSize)
pdb.writeHeaderToBuffer(buf)
if _, err := pdb.file.Write(buf); err != nil {
return err
}
// 初始化根页
rootPage := make([]byte, PageSize)
binary.LittleEndian.PutUint16(rootPage[0:2], 0) // numSlots
binary.LittleEndian.PutUint16(rootPage[2:4], PageSize) // freeOff
binary.LittleEndian.PutUint16(rootPage[4:6], 0) // nextPage
if _, err := pdb.file.Write(rootPage); err != nil {
return err
}
return nil
}
func (pdb *PipelineDB) loadHeader() error {
buf := make([]byte, HdrSize)
if _, err := pdb.file.ReadAt(buf, 0); err != nil {
return err
}
pdb.header = &Header{
Magic: binary.LittleEndian.Uint32(buf[0:4]),
PageSize: binary.LittleEndian.Uint16(buf[4:6]),
TotalPages: binary.LittleEndian.Uint16(buf[6:8]),
FreeHead: binary.LittleEndian.Uint16(buf[8:10]),
RootPage: binary.LittleEndian.Uint16(buf[10:12]),
CounterPage: binary.LittleEndian.Uint16(buf[12:14]),
}
if pdb.header.Magic != 0x57A5DB || pdb.header.PageSize != PageSize {
return errors.New("invalid database file")
}
return nil
}
func (pdb *PipelineDB) saveHeader() error {
buf := make([]byte, PageSize)
pdb.writeHeaderToBuffer(buf)
_, err := pdb.file.WriteAt(buf, 0)
return err
}
func (pdb *PipelineDB) writeHeaderToBuffer(buf []byte) {
binary.LittleEndian.PutUint32(buf[0:4], pdb.header.Magic)
binary.LittleEndian.PutUint16(buf[4:6], pdb.header.PageSize)
binary.LittleEndian.PutUint16(buf[6:8], pdb.header.TotalPages)
binary.LittleEndian.PutUint16(buf[8:10], pdb.header.FreeHead)
binary.LittleEndian.PutUint16(buf[10:12], pdb.header.RootPage)
binary.LittleEndian.PutUint16(buf[12:14], pdb.header.CounterPage)
}
func (pdb *PipelineDB) getRowMutex(id int64) *sync.RWMutex {
pdb.mu.Lock()
defer pdb.mu.Unlock()
if mutex, exists := pdb.rowMutexes[id]; exists {
return mutex
}
mutex := &sync.RWMutex{}
pdb.rowMutexes[id] = mutex
return mutex
}
// runAutoPipeline 运行自动管道处理器
func (pdb *PipelineDB) runAutoPipeline(ctx context.Context) {
pdb.logger.Info("🔥 启动自动管道处理器")
// 创建两个独立的定时器
warmTicker := time.NewTicker(pdb.config.WarmInterval)
processTicker := time.NewTicker(pdb.config.ProcessInterval)
defer warmTicker.Stop()
defer processTicker.Stop()
for {
select {
case <-ctx.Done():
pdb.logger.Info("🔥 自动管道处理器已停止")
return
case <-warmTicker.C:
// 处理 hot -> warm
if err := pdb.processHotData(); err != nil {
pdb.logger.Error("❌ 预热处理错误", "error", err)
}
case <-processTicker.C:
// 处理 warm -> cold
if err := pdb.processWarmData(); err != nil {
pdb.logger.Error("❌ 温数据处理错误", "error", err)
}
}
}
}
// processHotData 处理热数据
func (pdb *PipelineDB) processHotData() error {
// 获取热数据
hotRecords, err := pdb.GetRecordsByStatus(StatusHot, pdb.config.BatchSize)
if err != nil {
return err
}
if len(hotRecords) == 0 {
return nil // 没有热数据需要处理
}
pdb.logger.Info("🔥 开始预热热数据", "count", len(hotRecords))
processed := 0
for _, record := range hotRecords {
if err := pdb.warmupRecord(record); err != nil {
pdb.logger.Error("❌ 预热记录失败", "id", record.ID, "error", err)
continue
}
processed++
}
pdb.logger.Info("🔥 预热完成", "processed", processed, "total", len(hotRecords))
return nil
}
// warmupRecord 预热单条记录
func (pdb *PipelineDB) warmupRecord(record *DataRecord) error {
// 执行预热逻辑
if err := pdb.performWarmup(record); err != nil {
return err
}
// 更新状态为 warm
record.Status = StatusWarm
// 保存更新
if err := pdb.updateRecord(record); err != nil {
return err
}
pdb.logger.Info("🔥 记录已预热: hot -> warm", "id", record.ID)
return nil
}
// performWarmup 执行实际的预热操作
func (pdb *PipelineDB) performWarmup(record *DataRecord) error {
// 调用外部处理器的预热方法
if pdb.handler != nil {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
processedData, err := pdb.handler.WillWarm(ctx, record.Group, record.Data)
if err != nil {
return err
}
// 更新处理后的数据
if len(processedData) > 0 {
record.Data = processedData
}
}
// 添加预热元数据
if record.Metadata == "" {
record.Metadata = "warmed_up"
} else {
record.Metadata += ",warmed_up"
}
return nil
}
// processWarmData 处理温数据
func (pdb *PipelineDB) processWarmData() error {
// 获取 warm 数据
warmRecords, err := pdb.GetRecordsByStatus(StatusWarm, pdb.config.BatchSize)
if err != nil {
return err
}
if len(warmRecords) == 0 {
return nil // 没有温数据需要处理
}
pdb.logger.Info("⚙️ 开始处理温数据", "count", len(warmRecords))
processed := 0
for _, record := range warmRecords {
if err := pdb.cooldownRecord(record); err != nil {
pdb.logger.Error("❌ 处理记录失败", "id", record.ID, "error", err)
continue
}
processed++
}
pdb.logger.Info("⚙️ 处理完成", "processed", processed, "total", len(warmRecords))
return nil
}
// cooldownRecord 处理单条记录 (warm -> cold)
func (pdb *PipelineDB) cooldownRecord(record *DataRecord) error {
// 自动处理器处理 warm 数据
pdb.logger.Info("⚙️ 自动处理器处理记录", "id", record.ID)
// 调用外部处理器
if err := pdb.performCooldown(record); err != nil {
return err
}
// 更新状态为 cold 并保存回存储
record.Status = StatusCold
if err := pdb.updateRecord(record); err != nil {
return err
}
pdb.logger.Info("⚙️ 记录处理完成: warm -> cold", "id", record.ID)
return nil
}
// performCooldown 调用外部处理器
func (pdb *PipelineDB) performCooldown(record *DataRecord) error {
pdb.logger.Info("🔗 调用外部处理器处理记录", "id", record.ID)
// 创建超时上下文
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// 调用外部处理器
processedData, err := pdb.handler.WillCold(ctx, record.Group, record.Data)
if err != nil {
pdb.logger.Error("❌ 外部处理器处理失败", "error", err)
return err
}
// 更新处理后的数据
record.Data = processedData
// 成功处理
pdb.logger.Info("✅ 外部处理器成功处理记录", "id", record.ID)
// 检查组是否完成处理
pdb.checkGroupCompletion(record.Group)
return nil
}
// rebuildIndexes 重建所有索引(用于数据库重启后恢复)
func (pdb *PipelineDB) rebuildIndexes() error {
// 如果没有根页面或总页数为0跳过重建
if pdb.header.RootPage == 0 || pdb.header.TotalPages <= 1 {
pdb.logger.Info("🔄 数据库为空,跳过索引重建")
return nil
}
pdb.logger.Info("🔄 重建索引...")
// 遍历所有数据页重建索引
pdb.logger.Info("📊 开始扫描页面", "totalPages", pdb.header.TotalPages, "rootPage", pdb.header.RootPage)
recordCount := 0
// 从根页面开始扫描页链
for pageNo := pdb.header.RootPage; pageNo != 0 && pageNo < pdb.header.TotalPages; {
// 读取页面
page, err := pdb.readPage(pageNo)
if err != nil {
pdb.logger.Info("⚠️ 跳过无法读取的页面", "pageNo", pageNo, "error", err)
break // 页链断裂,停止扫描
}
// 检查是否是数据页
if len(page) < 8 {
pdb.logger.Info("⚠️ 跳过太小的页面", "pageNo", pageNo, "size", len(page))
continue
}
// 解析页面头部
numSlots := binary.LittleEndian.Uint16(page[0:2]) // 正确的槽数量
freeOff := binary.LittleEndian.Uint16(page[2:4])
nextPage := binary.LittleEndian.Uint16(page[4:6])
// 只在有数据时记录详细日志
if numSlots > 0 {
pdb.logger.Info("📄 扫描页面", "pageNo", pageNo, "numSlots", numSlots, "freeOff", freeOff, "nextPage", nextPage)
}
if numSlots == 0 {
pageNo = nextPage
continue
}
// 遍历页面中的所有记录(使用槽数组)
for slotNo := uint16(0); slotNo < numSlots && slotNo < 100; slotNo++ { // 限制最大槽数,避免无效数据
// 读取槽偏移槽数组从偏移6开始
slotOffset := 6 + slotNo*2
if slotOffset+2 > uint16(len(page)) {
pdb.logger.Info("⚠️ 槽偏移超出页面", "slotNo", slotNo, "slotOffset", slotOffset, "pageSize", len(page))
break
}
recordOffset := binary.LittleEndian.Uint16(page[slotOffset : slotOffset+2])
// 读取记录
if recordOffset+8 > uint16(len(page)) {
continue
}
// 读取ID
id := binary.LittleEndian.Uint64(page[recordOffset : recordOffset+8])
// 读取数据长度(变长编码)
dataLen, n := binary.Uvarint(page[recordOffset+8:])
if n <= 0 {
continue
}
// 读取数据
dataStart := recordOffset + 8 + uint16(n)
dataEnd := dataStart + uint16(dataLen)
if dataEnd > uint16(len(page)) {
continue
}
// 解析JSON记录获取组名
var record DataRecord
if err := record.FromBytes(page[dataStart:dataEnd]); err != nil {
continue
}
// 添加到索引
idx := pdb.indexMgr.GetOrCreateIndex(record.Group)
idx.Insert(int64(id), pageNo, slotNo)
recordCount++
}
// 移动到下一个页面
nextPageNo := Page(page).nextPage()
if nextPageNo == 0 {
break // 页链结束
}
pageNo = nextPageNo
}
pdb.logger.Info("✅ 索引重建完成", "recordCount", recordCount)
return nil
}