Files
pipelinedb/pipeline_db.go

1491 lines
42 KiB
Go
Raw Normal View History

2025-09-30 15:05:56 +08:00
// Package pipelinedb provides an integrated pipeline database system
//
// # Pipeline Database V4 是一个集成了数据库存储和业务管道处理的一体化解决方案
//
// 核心特性:
// - 基于页面的存储引擎:高效的数据存储和检索
// - 分组数据管理:支持按业务组织数据,独立管理和统计
// - 三级数据状态Hot-> Warm-> Cold的自动流转
// - 外部处理器集成:支持自定义业务逻辑处理
// - 高并发支持:线程安全的索引和存储操作
// - 页面缓存:提高数据访问性能
// - 持久化存储:数据安全持久保存
//
// 数据流转模型:
// 1. 数据接收新数据以Hot状态进入系统
// 2. 预热处理Hot数据经过预热处理转为Warm状态
// 3. 冷却处理Warm数据经过冷却处理转为Cold状态
// 4. 完成回调:组内所有数据处理完成后触发回调
//
// 使用场景:
// - 数据管道处理ETL、数据清洗、数据转换
// - 业务流程管理:订单处理、用户行为分析
// - 实时数据处理:日志分析、监控数据处理
// - 批量数据处理:数据导入、数据同步
package pipelinedb
import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"log/slog"
"os"
"sort"
"sync"
"time"
)
// ==================== 系统常量定义 ====================
const (
PageSize = 4096 // 页面大小4KB与操作系统页面大小对齐
HdrSize = 14 // 文件头大小:包含所有头部字段的字节数
MaxRecSize = 3000 // 最大记录大小:限制单条记录的最大字节数
)
// ==================== 数据结构定义 ====================
// Header 数据库文件头结构
//
// 文件头存储在文件的开始位置,包含数据库的元数据信息
// 用于数据库启动时的初始化和状态恢复
type Header struct {
Magic uint32 // 魔数:用于文件格式验证
PageSize uint16 // 页面大小固定为4096字节
TotalPages uint16 // 总页面数:当前数据库文件包含的页面数量
FreeHead uint16 // 空闲页链头:空闲页面管理的起始页面
RootPage uint16 // 根页面:数据存储的根页面编号
CounterPage uint16 // 计数器页面用于生成唯一ID的计数器页面
}
// DataStatus 数据状态枚举
//
// 定义数据在管道中的处理状态,支持三级状态流转
// 状态转换Hot -> Warm -> Cold
type DataStatus string
const (
StatusHot DataStatus = "hot" // 热数据:刚接收的新数据,等待预热处理
StatusWarm DataStatus = "warm" // 温数据:已预热的数据,等待冷却处理
StatusCold DataStatus = "cold" // 冷数据:已完成处理的数据,可以归档或清理
)
// DataRecord 数据记录结构
//
// 表示系统中的一条数据记录,包含完整的数据信息和元数据
// 支持JSON序列化便于存储和传输
type DataRecord struct {
ID int64 `json:"id"` // 记录唯一标识符
Group string `json:"group"` // 所属数据组名称
Status DataStatus `json:"status"` // 当前数据状态
Data []byte `json:"data"` // 实际数据内容
CreatedAt time.Time `json:"created_at"` // 创建时间戳
UpdatedAt time.Time `json:"updated_at"` // 最后更新时间戳
Metadata string `json:"metadata,omitempty"` // 可选的元数据信息
}
// GroupDataEvent 组数据事件
//
// 用于在组数据处理过程中传递事件信息
// 支持事件驱动的数据处理模式
type GroupDataEvent struct {
Group string // 组名:事件所属的数据组
Data []byte // 数据内容:事件携带的数据
Done bool // 完成标志:标识该组是否已完成所有数据处理
}
// ==================== 记录序列化方法 ====================
// ToBytes 将数据记录序列化为字节数组
//
// 返回值:
// - []byte: 序列化后的字节数组
// - error: 序列化错误
//
// 使用JSON格式进行序列化便于跨平台兼容和调试
func (dr *DataRecord) ToBytes() ([]byte, error) {
return json.Marshal(dr)
}
// FromBytes 从字节数组反序列化数据记录
//
// 参数:
// - data: 序列化的字节数组
//
// 返回值:
// - error: 反序列化错误
//
// 从JSON格式的字节数组中恢复数据记录对象
func (dr *DataRecord) FromBytes(data []byte) error {
return json.Unmarshal(data, dr)
}
// ==================== 核心数据库结构 ====================
// PipelineDB 管道数据库主结构体
//
// 集成了存储引擎、索引管理、缓存系统、管道处理等所有核心功能
// 提供线程安全的数据库操作接口
type PipelineDB struct {
// ========== 存储引擎组件 ==========
file *os.File // 数据库文件句柄
header *Header // 文件头信息
cache *PageCache // 页面缓存系统
freePageMgr *FreePageManager // 空闲页面管理器
indexMgr *IndexManager // 索引管理器
rowMutexes map[int64]*sync.RWMutex // 行级锁映射
mu sync.RWMutex // 数据库级读写锁
// ========== 配置和处理器 ==========
config *Config // 数据库配置
handler Handler // 数据处理器接口
logger *slog.Logger // 结构化日志器
// ========== 并发控制 ==========
ctx context.Context // 上下文控制
cancel context.CancelFunc // 取消函数
wg sync.WaitGroup // 等待组,用于优雅关闭
// ========== 业务组件 ==========
groupManager *GroupManager // 组管理器
}
// ==================== 配置结构定义 ====================
// Config 数据库统一配置结构
//
// 包含存储引擎配置和管道处理配置
// 支持JSON序列化便于配置文件管理
type Config struct {
// ========== 存储引擎配置 ==========
CacheSize int `json:"cache_size"` // 页缓存大小:缓存的页面数量
SyncWrites bool `json:"sync_writes"` // 同步写入:是否立即同步到磁盘
CreateIfMiss bool `json:"create_if_miss"` // 自动创建:文件不存在时是否自动创建
// ========== 管道处理配置 ==========
WarmInterval time.Duration `json:"warm_interval"` // 预热间隔Hot->Warm状态转换的时间间隔
ProcessInterval time.Duration `json:"process_interval"` // 处理间隔Warm->Cold状态转换的时间间隔
BatchSize int `json:"batch_size"` // 批处理大小:每次处理的记录数量
EnableMetrics bool `json:"enable_metrics"` // 启用指标:是否收集性能指标
}
// DefaultConfig 返回默认配置
//
// 返回值:
// - *Config: 包含合理默认值的配置对象
//
// 默认配置适用于大多数使用场景,可以根据实际需求进行调整
func DefaultConfig() *Config {
return &Config{
CacheSize: 256, // 256页缓存约1MB内存
SyncWrites: false, // 异步写入,提高性能
CreateIfMiss: true, // 自动创建数据库文件
WarmInterval: 5 * time.Second, // 5秒预热间隔
ProcessInterval: 10 * time.Second, // 10秒处理间隔
BatchSize: 100, // 每批处理100条记录
EnableMetrics: true, // 启用性能指标收集
}
}
// ==================== 数据处理器接口 ====================
// Handler 数据处理器接口
//
// 定义了数据在状态流转过程中的处理回调
// 用户可以实现此接口来定制业务逻辑处理
type Handler interface {
// WillWarm 预热处理回调
//
// 参数:
// - ctx: 上下文,用于超时控制和取消操作
// - group: 数据组名称
// - data: 待处理的数据
//
// 返回值:
// - []byte: 处理后的数据
// - error: 处理错误
//
// 在数据从Hot状态转换为Warm状态时调用
// 可以在此阶段进行数据验证、转换、清洗等操作
WillWarm(ctx context.Context, group string, data []byte) ([]byte, error)
// WillCold 冷却处理回调
//
// 参数:
// - ctx: 上下文,用于超时控制和取消操作
// - group: 数据组名称
// - data: 待处理的数据
//
// 返回值:
// - []byte: 处理后的数据
// - error: 处理错误
//
// 在数据从Warm状态转换为Cold状态时调用
// 可以在此阶段进行数据压缩、归档、发送等操作
WillCold(ctx context.Context, group string, data []byte) ([]byte, error)
// OnComplete 组完成回调
//
// 参数:
// - ctx: 上下文,用于超时控制和取消操作
// - group: 完成处理的数据组名称
//
// 返回值:
// - error: 处理错误
//
// 在组内所有数据都完成处理后调用
// 可以在此阶段进行清理、通知、统计等操作
OnComplete(ctx context.Context, group string) error
}
// ==================== 数据库选项结构 ====================
// Options 数据库打开选项
//
// 包含打开数据库所需的所有配置参数
// 使用选项模式,提供灵活的配置方式
type Options struct {
Filename string // 数据库文件路径
Config *Config // 数据库配置nil时使用默认配置
Handler Handler // 数据处理器,可选
Logger *slog.Logger // 日志器nil时使用默认日志器
}
// ==================== 数据库核心方法 ====================
// Open 打开或创建管道数据库
//
// 参数:
// - opts: 数据库打开选项,包含文件路径、配置、处理器等
//
// 返回值:
// - *PipelineDB: 数据库实例
// - error: 打开错误
//
// 执行流程:
// 1. 验证和设置默认配置
// 2. 打开或创建数据库文件
// 3. 初始化存储引擎组件(缓存、索引、页面管理器等)
// 4. 加载或创建文件头信息
// 5. 初始化组管理器和管道处理器
// 6. 启动后台处理协程
//
// 线程安全:返回的数据库实例支持并发访问
func Open(opts Options) (*PipelineDB, error) {
// 设置默认配置
if opts.Config == nil {
opts.Config = DefaultConfig()
}
// 设置默认 logger
logger := opts.Logger
if logger == nil {
logger = slog.Default()
}
pdb := &PipelineDB{
config: opts.Config,
handler: opts.Handler,
logger: logger,
rowMutexes: make(map[int64]*sync.RWMutex),
cache: NewPageCache(opts.Config.CacheSize),
freePageMgr: NewFreePageManager(),
indexMgr: NewIndexManager(),
}
// 创建上下文
pdb.ctx, pdb.cancel = context.WithCancel(context.Background())
// 打开文件
var err error
if opts.Config.CreateIfMiss {
pdb.file, err = os.OpenFile(opts.Filename, os.O_RDWR|os.O_CREATE, 0644)
} else {
pdb.file, err = os.OpenFile(opts.Filename, os.O_RDWR, 0644)
}
if err != nil {
return nil, err
}
// 检查文件是否为空
stat, err := pdb.file.Stat()
if err != nil {
return nil, err
}
if stat.Size() == 0 {
// 初始化新文件
if err := pdb.initializeFile(); err != nil {
return nil, err
}
} else {
// 加载现有文件
if err := pdb.loadHeader(); err != nil {
return nil, err
}
// 加载空闲页信息
if err := pdb.freePageMgr.LoadFromHeader(pdb.header.FreeHead, pdb.readPageDirect); err != nil {
return nil, err
}
// 重建索引
if err := pdb.rebuildIndexes(); err != nil {
return nil, err
}
}
// 创建组管理器
pdb.groupManager = NewGroupManager(pdb)
return pdb, nil
}
// Start 启动管道数据库
func (pdb *PipelineDB) Start(groupEventCh <-chan GroupDataEvent) error {
pdb.logger.Info("🚀 启动管道数据库...")
// 启动自动管道处理器
pdb.wg.Add(1)
go func() {
defer pdb.wg.Done()
pdb.runAutoPipeline(pdb.ctx)
}()
// 启动组数据事件监听器
if groupEventCh != nil {
pdb.wg.Add(1)
go func() {
defer pdb.wg.Done()
pdb.handleGroupDataEvents(groupEventCh)
}()
}
pdb.logger.Info("✅ 管道数据库启动完成")
return nil
}
// Stop 停止管道数据库
func (pdb *PipelineDB) Stop() error {
pdb.logger.Info("🛑 停止管道数据库...")
pdb.cancel()
pdb.wg.Wait()
// 刷新缓存
if err := pdb.cache.Flush(pdb.writePageDirect); err != nil {
return err
}
// 保存空闲页信息
freeHead, err := pdb.freePageMgr.SaveToHeader(pdb.writePageDirect)
if err != nil {
return err
}
// 更新文件头
pdb.header.FreeHead = freeHead
if err := pdb.saveHeader(); err != nil {
return err
}
if err := pdb.file.Close(); err != nil {
return err
}
pdb.logger.Info("✅ 管道数据库已停止")
return nil
}
// AcceptData Producer 接收数据 (业务入口)
func (pdb *PipelineDB) AcceptData(group string, data []byte, metadata string) (int64, error) {
// 检查组是否被暂停
if pdb.groupManager.IsPaused(group) {
return 0, fmt.Errorf("组 [%s] 已暂停接收数据", group)
}
// 生成自增ID
id := pdb.groupManager.GetNextID(group)
record := &DataRecord{
ID: id,
Group: group,
Status: StatusHot,
Data: data,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
Metadata: metadata,
}
// 序列化记录
recordBytes, err := record.ToBytes()
if err != nil {
return 0, fmt.Errorf("序列化记录失败: %w", err)
}
// 直接使用组名作为表名
err = pdb.insert(group, id, recordBytes)
if err != nil {
return 0, fmt.Errorf("存储数据失败: %w", err)
}
// 更新统计缓存
pdb.groupManager.IncrementStats(group, StatusHot)
pdb.logger.Info("📥 接收数据",
"group", group,
"id", id,
"size", len(data),
"status", StatusHot)
return id, nil
}
// GetRecord 获取记录
func (pdb *PipelineDB) GetRecord(group string, id int64) (*DataRecord, error) {
data, err := pdb.get(group, id)
if err != nil {
return nil, err
}
record := &DataRecord{}
if err := record.FromBytes(data); err != nil {
return nil, err
}
return record, nil
}
// updateRecord 内部更新记录(不对外暴露)
func (pdb *PipelineDB) updateRecord(record *DataRecord) error {
record.UpdatedAt = time.Now()
recordBytes, err := record.ToBytes()
if err != nil {
return fmt.Errorf("序列化记录失败: %w", err)
}
return pdb.update(record.Group, record.ID, recordBytes)
}
// GetRecordsByStatus 按状态获取记录(跨所有组)
func (pdb *PipelineDB) GetRecordsByStatus(status DataStatus, limit int) ([]*DataRecord, error) {
var records []*DataRecord
count := 0
// 遍历所有组的表
groups := pdb.getAllGroups()
for _, group := range groups {
if count >= limit {
break
}
// 查询当前组的记录
err := pdb.rangeQuery(group, 0, int64(^uint64(0)>>1), func(id int64, data []byte) error {
if count >= limit {
return fmt.Errorf("达到限制") // 停止遍历
}
record := &DataRecord{}
if err := record.FromBytes(data); err != nil {
return nil // 跳过损坏的记录
}
if record.Status == status {
records = append(records, record)
count++
}
return nil
})
if err != nil && err.Error() != "达到限制" {
return nil, fmt.Errorf("查询组 [%s] 失败: %w", group, err)
}
}
return records, nil
}
// GetRecordsByGroup 按组获取记录(支持分页)
func (pdb *PipelineDB) GetRecordsByGroup(group string, pageReq *PageRequest) (*PageResponse, error) {
// 参数验证
if pageReq.Page < 1 {
pageReq.Page = 1
}
if pageReq.PageSize < 1 {
pageReq.PageSize = 10 // 默认每页10条
}
if pageReq.PageSize > 1000 {
pageReq.PageSize = 1000 // 最大每页1000条
}
var allRecords []*DataRecord
// 收集所有记录
err := pdb.rangeQuery(group, 0, int64(^uint64(0)>>1), func(id int64, data []byte) error {
record := &DataRecord{}
if err := record.FromBytes(data); err != nil {
return nil // 跳过损坏的记录
}
allRecords = append(allRecords, record)
return nil
})
if err != nil {
return nil, fmt.Errorf("查询记录失败: %w", err)
}
// 排序记录
pdb.sortRecords(allRecords, pageReq.SortField, pageReq.SortOrder)
// 计算分页信息
totalCount := len(allRecords)
totalPages := (totalCount + pageReq.PageSize - 1) / pageReq.PageSize
// 计算当前页的起始和结束索引
startIndex := (pageReq.Page - 1) * pageReq.PageSize
endIndex := startIndex + pageReq.PageSize
// 边界检查
if startIndex >= totalCount {
return &PageResponse{
Records: []*DataRecord{},
Page: pageReq.Page,
PageSize: pageReq.PageSize,
TotalCount: totalCount,
TotalPages: totalPages,
HasNext: false,
HasPrevious: pageReq.Page > 1,
}, nil
}
if endIndex > totalCount {
endIndex = totalCount
}
// 提取当前页的记录
pageRecords := allRecords[startIndex:endIndex]
// 构建响应
response := &PageResponse{
Records: pageRecords,
Page: pageReq.Page,
PageSize: pageReq.PageSize,
TotalCount: totalCount,
TotalPages: totalPages,
HasNext: pageReq.Page < totalPages,
HasPrevious: pageReq.Page > 1,
}
return response, nil
}
// SortOrder 排序方向
type SortOrder string
const (
SortAsc SortOrder = "asc" // 升序
SortDesc SortOrder = "desc" // 降序
)
// SortField 排序字段
type SortField string
const (
SortByID SortField = "id" // 按ID排序
SortByCreatedAt SortField = "created_at" // 按创建时间排序
SortByUpdatedAt SortField = "updated_at" // 按更新时间排序
SortByStatus SortField = "status" // 按状态排序
)
// PageRequest 分页请求参数
type PageRequest struct {
Page int `json:"page"` // 页码从1开始
PageSize int `json:"page_size"` // 每页大小
SortField SortField `json:"sort_field"` // 排序字段
SortOrder SortOrder `json:"sort_order"` // 排序方向
}
// PageResponse 分页响应结果
type PageResponse struct {
Records []*DataRecord `json:"records"` // 当前页记录
Page int `json:"page"` // 当前页码
PageSize int `json:"page_size"` // 每页大小
TotalCount int `json:"total_count"` // 总记录数
TotalPages int `json:"total_pages"` // 总页数
HasNext bool `json:"has_next"` // 是否有下一页
HasPrevious bool `json:"has_previous"` // 是否有上一页
}
// GetRecordsByGroupAndStatus 按组和状态获取记录(支持分页)
func (pdb *PipelineDB) GetRecordsByGroupAndStatus(group string, status DataStatus, pageReq *PageRequest) (*PageResponse, error) {
// 参数验证
if pageReq.Page < 1 {
pageReq.Page = 1
}
if pageReq.PageSize < 1 {
pageReq.PageSize = 10 // 默认每页10条
}
if pageReq.PageSize > 1000 {
pageReq.PageSize = 1000 // 最大每页1000条
}
var allRecords []*DataRecord
// 首先收集所有匹配的记录
err := pdb.rangeQuery(group, 0, int64(^uint64(0)>>1), func(id int64, data []byte) error {
record := &DataRecord{}
if err := record.FromBytes(data); err != nil {
return nil // 跳过损坏的记录
}
if record.Status == status {
allRecords = append(allRecords, record)
}
return nil
})
if err != nil {
return nil, fmt.Errorf("查询记录失败: %w", err)
}
// 排序记录
pdb.sortRecords(allRecords, pageReq.SortField, pageReq.SortOrder)
// 计算分页信息
totalCount := len(allRecords)
totalPages := (totalCount + pageReq.PageSize - 1) / pageReq.PageSize
// 计算当前页的起始和结束索引
startIndex := (pageReq.Page - 1) * pageReq.PageSize
endIndex := startIndex + pageReq.PageSize
// 边界检查
if startIndex >= totalCount {
// 页码超出范围,返回空结果
return &PageResponse{
Records: []*DataRecord{},
Page: pageReq.Page,
PageSize: pageReq.PageSize,
TotalCount: totalCount,
TotalPages: totalPages,
HasNext: false,
HasPrevious: pageReq.Page > 1,
}, nil
}
if endIndex > totalCount {
endIndex = totalCount
}
// 提取当前页的记录
pageRecords := allRecords[startIndex:endIndex]
// 构建响应
response := &PageResponse{
Records: pageRecords,
Page: pageReq.Page,
PageSize: pageReq.PageSize,
TotalCount: totalCount,
TotalPages: totalPages,
HasNext: pageReq.Page < totalPages,
HasPrevious: pageReq.Page > 1,
}
return response, nil
}
// GetStats 获取统计信息(高性能版本)
func (pdb *PipelineDB) GetStats() (*Stats, error) {
stats := &Stats{
Timestamp: time.Now(),
GroupStats: make(map[string]*GroupStats),
}
// 从 GroupManager 快速获取统计缓存O(1)复杂度)
groupStatsCache := pdb.groupManager.GetFastStats()
// 统计各状态的记录数
var totalRecords, hotRecords, warmRecords, coldRecords int
// 转换缓存数据为 Stats 格式
for group, cache := range groupStatsCache {
stats.GroupStats[group] = &GroupStats{
Group: group,
TotalRecords: cache.TotalRecords,
HotRecords: cache.HotRecords,
WarmRecords: cache.WarmRecords,
ColdRecords: cache.ColdRecords,
}
// 累计总体统计
totalRecords += cache.TotalRecords
hotRecords += cache.HotRecords
warmRecords += cache.WarmRecords
coldRecords += cache.ColdRecords
}
// 设置总体统计
stats.TotalRecords = totalRecords
stats.HotRecords = hotRecords
stats.WarmRecords = warmRecords
stats.ColdRecords = coldRecords
// 获取数据库统计
hits, misses, hitRate := pdb.cache.Stats()
stats.CacheHits = hits
stats.CacheMisses = misses
stats.CacheHitRate = hitRate
stats.TotalPages = int(pdb.header.TotalPages)
stats.FreePages = pdb.freePageMgr.FreeCount()
stats.FileSizeBytes = int64(pdb.header.TotalPages) * PageSize
return stats, nil
}
// ValidateStats 验证统计信息(通过遍历实际数据)
// 用于数据校验和调试,性能较慢但结果准确
func (pdb *PipelineDB) ValidateStats() (*Stats, error) {
stats := &Stats{
Timestamp: time.Now(),
GroupStats: make(map[string]*GroupStats),
}
// 统计各状态的记录数
statusCounts := make(map[DataStatus]int)
// 遍历所有组的表
groups := pdb.getAllGroups()
for _, group := range groups {
err := pdb.rangeQuery(group, 0, int64(^uint64(0)>>1), func(id int64, data []byte) error {
record := &DataRecord{}
if err := record.FromBytes(data); err != nil {
return nil // 跳过损坏的记录
}
// 更新总体统计
statusCounts[record.Status]++
// 更新组统计
if stats.GroupStats[record.Group] == nil {
stats.GroupStats[record.Group] = &GroupStats{
Group: record.Group,
}
}
stats.GroupStats[record.Group].TotalRecords++
// 更新组的状态计数
switch record.Status {
case StatusHot:
stats.GroupStats[record.Group].HotRecords++
case StatusWarm:
stats.GroupStats[record.Group].WarmRecords++
case StatusCold:
stats.GroupStats[record.Group].ColdRecords++
}
return nil
})
if err != nil {
return nil, fmt.Errorf("统计组 [%s] 失败: %w", group, err)
}
}
// 设置总体统计
stats.TotalRecords = statusCounts[StatusHot] + statusCounts[StatusWarm] + statusCounts[StatusCold]
stats.HotRecords = statusCounts[StatusHot]
stats.WarmRecords = statusCounts[StatusWarm]
stats.ColdRecords = statusCounts[StatusCold]
// 获取数据库统计
hits, misses, hitRate := pdb.cache.Stats()
stats.CacheHits = hits
stats.CacheMisses = misses
stats.CacheHitRate = hitRate
stats.TotalPages = int(pdb.header.TotalPages)
stats.FreePages = pdb.freePageMgr.FreeCount()
stats.FileSizeBytes = int64(pdb.header.TotalPages) * PageSize
return stats, nil
}
// PauseGroup 暂停组接收数据
func (pdb *PipelineDB) PauseGroup(group string) {
pdb.groupManager.PauseGroup(group)
}
// ResumeGroup 恢复组接收数据
func (pdb *PipelineDB) ResumeGroup(group string) {
pdb.groupManager.ResumeGroup(group)
}
// IsGroupPaused 检查组是否暂停
func (pdb *PipelineDB) IsGroupPaused(group string) bool {
return pdb.groupManager.IsPaused(group)
}
// GetPausedGroups 获取所有暂停的组
func (pdb *PipelineDB) GetPausedGroups() []string {
return pdb.groupManager.GetPausedGroups()
}
// IsGroupReadyForCleanup 检查组是否可以清理所有数据都是cold
func (pdb *PipelineDB) IsGroupReadyForCleanup(group string) (bool, error) {
// 检查组是否暂停
if !pdb.groupManager.IsPaused(group) {
return false, fmt.Errorf("组 [%s] 未暂停,不能清理", group)
}
// 检查是否有非-cold数据
hotPageReq := &PageRequest{Page: 1, PageSize: 1}
hotResponse, err := pdb.GetRecordsByGroupAndStatus(group, StatusHot, hotPageReq)
if err != nil {
return false, err
}
if len(hotResponse.Records) > 0 {
return false, nil // 还有热数据
}
warmPageReq := &PageRequest{Page: 1, PageSize: 1}
warmResponse, err := pdb.GetRecordsByGroupAndStatus(group, StatusWarm, warmPageReq)
if err != nil {
return false, err
}
warmRecords := warmResponse.Records
if len(warmRecords) > 0 {
return false, nil // 还有温数据
}
return true, nil // 所有数据都是cold可以清理
}
// sortRecords 对记录进行排序
func (pdb *PipelineDB) sortRecords(records []*DataRecord, sortField SortField, sortOrder SortOrder) {
if len(records) <= 1 {
return
}
// 设置默认排序
if sortField == "" {
sortField = SortByID
}
if sortOrder == "" {
sortOrder = SortAsc
}
sort.Slice(records, func(i, j int) bool {
var less bool
switch sortField {
case SortByID:
less = records[i].ID < records[j].ID
case SortByCreatedAt:
less = records[i].CreatedAt.Before(records[j].CreatedAt)
case SortByUpdatedAt:
less = records[i].UpdatedAt.Before(records[j].UpdatedAt)
case SortByStatus:
// 状态排序hot < warm < cold
statusOrder := map[DataStatus]int{
StatusHot: 1,
StatusWarm: 2,
StatusCold: 3,
}
less = statusOrder[records[i].Status] < statusOrder[records[j].Status]
default:
// 默认按ID排序
less = records[i].ID < records[j].ID
}
// 如果是降序,反转比较结果
if sortOrder == SortDesc {
less = !less
}
return less
})
}
// CleanupGroup 清理组数据
func (pdb *PipelineDB) CleanupGroup(group string) error {
// 检查是否可以清理
ready, err := pdb.IsGroupReadyForCleanup(group)
if err != nil {
return err
}
if !ready {
return fmt.Errorf("组 [%s] 还有未处理完成的数据,不能清理", group)
}
pdb.logger.Info("🧽 开始清理组数据", "group", group)
// 调用外部处理器的完成回调
if pdb.handler != nil {
// 检查是否已经有 OnComplete 在执行
if pdb.groupManager.IsOnCompleteExecuting(group) {
pdb.logger.Info("⏳ 组的 OnComplete 正在执行中,等待完成", "group", group)
// 等待当前执行完成
for pdb.groupManager.IsOnCompleteExecuting(group) {
time.Sleep(100 * time.Millisecond)
}
pdb.logger.Info("✅ 组的 OnComplete 执行完成,继续清理", "group", group)
} else {
// 标记为正在执行
pdb.groupManager.SetOnCompleteExecuting(group, true)
defer pdb.groupManager.SetOnCompleteExecuting(group, false)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
pdb.logger.Info("🎯 调用组完成回调", "group", group)
if err := pdb.handler.OnComplete(ctx, group); err != nil {
pdb.logger.Warn("⚠️ 组完成回调失败", "group", group, "error", err)
// 不阻止清理过程,只记录错误
} else {
pdb.logger.Info("✅ 组完成回调成功", "group", group)
}
}
}
// 获取组中所有记录
pageReq := &PageRequest{Page: 1, PageSize: 10000} // 假设最多10000条
response, err := pdb.GetRecordsByGroup(group, pageReq)
if err != nil {
return err
}
records := response.Records
// 删除所有记录
deletedCount := len(records)
// TODO: 实现实际的记录删除逻辑
pdb.logger.Info("📋 组中有记录待删除", "group", group, "count", deletedCount)
pdb.logger.Info("🧽 组清理完成", "group", group, "deleted_count", deletedCount)
return nil
}
// checkGroupCompletion 检查组是否完成处理并调用完成回调
func (pdb *PipelineDB) checkGroupCompletion(group string) {
// 只检查暂停的组
if !pdb.groupManager.IsPaused(group) {
return
}
// 检查是否已经有 OnComplete 在执行
if pdb.groupManager.IsOnCompleteExecuting(group) {
pdb.logger.Info("⏳ 组的 OnComplete 正在执行中,跳过重复调用", "group", group)
return
}
// 检查是否所有数据都已转为 cold
ready, err := pdb.IsGroupReadyForCleanup(group)
if err != nil {
pdb.logger.Warn("⚠️ 检查组完成状态失败", "group", group, "error", err)
return
}
if ready && pdb.handler != nil {
// 标记为正在执行
pdb.groupManager.SetOnCompleteExecuting(group, true)
// 异步执行完成回调,避免阻塞
go func() {
defer pdb.groupManager.SetOnCompleteExecuting(group, false) // 确保执行完成后清除标记
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
pdb.logger.Info("🎯 组所有数据已转为 cold调用完成回调", "group", group)
if err := pdb.handler.OnComplete(ctx, group); err != nil {
pdb.logger.Warn("⚠️ 组完成回调失败", "group", group, "error", err)
} else {
pdb.logger.Info("✅ 组完成回调成功", "group", group)
}
}()
}
}
// handleGroupDataEvents 处理组数据事件
func (pdb *PipelineDB) handleGroupDataEvents(groupEventCh <-chan GroupDataEvent) {
pdb.logger.Info("📡 启动组数据事件监听器")
for {
select {
case <-pdb.ctx.Done():
pdb.logger.Info("📡 组数据事件监听器停止")
return
case event, ok := <-groupEventCh:
if !ok {
pdb.logger.Info("📡 组数据事件通道已关闭")
return
}
pdb.processGroupDataEvent(event)
}
}
}
// processGroupDataEvent 处理单个组数据事件
func (pdb *PipelineDB) processGroupDataEvent(event GroupDataEvent) {
if event.Done {
// 组完成,自动暂停
pdb.logger.Info("🔚 组数据产出完成,自动暂停", "group", event.Group)
pdb.groupManager.PauseGroup(event.Group)
// 可选:触发组完成检查
go func() {
// 等待一小段时间让当前数据处理完成
time.Sleep(100 * time.Millisecond)
pdb.checkGroupCompletion(event.Group)
}()
} else if len(event.Data) > 0 {
// 有新数据,确保组未暂停
if pdb.groupManager.IsPaused(event.Group) {
pdb.logger.Info("▶️ 组有新数据产出,自动恢复", "group", event.Group)
pdb.groupManager.ResumeGroup(event.Group)
}
// 自动接收数据
_, err := pdb.AcceptData(event.Group, event.Data, "auto_from_event")
if err != nil {
pdb.logger.Warn("⚠️ 自动接收组数据失败", "group", event.Group, "error", err)
} else {
pdb.logger.Info("📥 自动接收组数据成功", "group", event.Group)
}
}
}
// getAllGroups 获取所有组名(通过索引管理器)
func (pdb *PipelineDB) getAllGroups() []string {
// 简化实现:通过索引管理器获取所有组名
var groups []string
for groupName := range pdb.indexMgr.indexes {
groups = append(groups, groupName)
}
return groups
}
// GetGroupCleanupStatus 获取组清理状态
func (pdb *PipelineDB) GetGroupCleanupStatus(group string) (*GroupCleanupStatus, error) {
status := &GroupCleanupStatus{
Group: group,
Paused: pdb.groupManager.IsPaused(group),
}
// 获取组统计
stats, err := pdb.GetStats()
if err != nil {
return nil, err
}
if groupStat, exists := stats.GroupStats[group]; exists {
status.TotalRecords = groupStat.TotalRecords
status.HotRecords = groupStat.HotRecords
status.WarmRecords = groupStat.WarmRecords
status.ColdRecords = groupStat.ColdRecords
status.ReadyForCleanup = (groupStat.HotRecords == 0 && groupStat.WarmRecords == 0 && status.Paused)
}
return status, nil
}
// Stats 统计信息
type Stats struct {
Timestamp time.Time `json:"timestamp"`
TotalRecords int `json:"total_records"`
HotRecords int `json:"hot_records"`
WarmRecords int `json:"warm_records"`
ColdRecords int `json:"cold_records"`
GroupStats map[string]*GroupStats `json:"group_stats"`
TotalPages int `json:"total_pages"`
FreePages int `json:"free_pages"`
CacheHits int64 `json:"cache_hits"`
CacheMisses int64 `json:"cache_misses"`
CacheHitRate float64 `json:"cache_hit_rate"`
FileSizeBytes int64 `json:"file_size_bytes"`
}
// GroupStats 组统计信息
type GroupStats struct {
Group string `json:"group"`
TotalRecords int `json:"total_records"`
HotRecords int `json:"hot_records"`
WarmRecords int `json:"warm_records"`
ColdRecords int `json:"cold_records"`
}
// GroupCleanupStatus 组清理状态
type GroupCleanupStatus struct {
Group string `json:"group"`
Paused bool `json:"paused"`
TotalRecords int `json:"total_records"`
HotRecords int `json:"hot_records"`
WarmRecords int `json:"warm_records"`
ColdRecords int `json:"cold_records"`
ReadyForCleanup bool `json:"ready_for_cleanup"`
}
// 内部方法
func (pdb *PipelineDB) initializeFile() error {
pdb.header = &Header{
Magic: 0x57A5DB,
PageSize: PageSize,
TotalPages: 2,
FreeHead: 0,
RootPage: 1,
}
// 写入文件头
buf := make([]byte, PageSize)
pdb.writeHeaderToBuffer(buf)
if _, err := pdb.file.Write(buf); err != nil {
return err
}
// 初始化根页
rootPage := make([]byte, PageSize)
binary.LittleEndian.PutUint16(rootPage[0:2], 0) // numSlots
binary.LittleEndian.PutUint16(rootPage[2:4], PageSize) // freeOff
binary.LittleEndian.PutUint16(rootPage[4:6], 0) // nextPage
if _, err := pdb.file.Write(rootPage); err != nil {
return err
}
return nil
}
func (pdb *PipelineDB) loadHeader() error {
buf := make([]byte, HdrSize)
if _, err := pdb.file.ReadAt(buf, 0); err != nil {
return err
}
pdb.header = &Header{
Magic: binary.LittleEndian.Uint32(buf[0:4]),
PageSize: binary.LittleEndian.Uint16(buf[4:6]),
TotalPages: binary.LittleEndian.Uint16(buf[6:8]),
FreeHead: binary.LittleEndian.Uint16(buf[8:10]),
RootPage: binary.LittleEndian.Uint16(buf[10:12]),
CounterPage: binary.LittleEndian.Uint16(buf[12:14]),
}
if pdb.header.Magic != 0x57A5DB || pdb.header.PageSize != PageSize {
return errors.New("invalid database file")
}
return nil
}
func (pdb *PipelineDB) saveHeader() error {
buf := make([]byte, PageSize)
pdb.writeHeaderToBuffer(buf)
_, err := pdb.file.WriteAt(buf, 0)
return err
}
func (pdb *PipelineDB) writeHeaderToBuffer(buf []byte) {
binary.LittleEndian.PutUint32(buf[0:4], pdb.header.Magic)
binary.LittleEndian.PutUint16(buf[4:6], pdb.header.PageSize)
binary.LittleEndian.PutUint16(buf[6:8], pdb.header.TotalPages)
binary.LittleEndian.PutUint16(buf[8:10], pdb.header.FreeHead)
binary.LittleEndian.PutUint16(buf[10:12], pdb.header.RootPage)
binary.LittleEndian.PutUint16(buf[12:14], pdb.header.CounterPage)
}
func (pdb *PipelineDB) getRowMutex(id int64) *sync.RWMutex {
pdb.mu.Lock()
defer pdb.mu.Unlock()
if mutex, exists := pdb.rowMutexes[id]; exists {
return mutex
}
mutex := &sync.RWMutex{}
pdb.rowMutexes[id] = mutex
return mutex
}
// runAutoPipeline 运行自动管道处理器
func (pdb *PipelineDB) runAutoPipeline(ctx context.Context) {
pdb.logger.Info("🔥 启动自动管道处理器")
// 创建两个独立的定时器
warmTicker := time.NewTicker(pdb.config.WarmInterval)
processTicker := time.NewTicker(pdb.config.ProcessInterval)
defer warmTicker.Stop()
defer processTicker.Stop()
for {
select {
case <-ctx.Done():
pdb.logger.Info("🔥 自动管道处理器已停止")
return
case <-warmTicker.C:
// 处理 hot -> warm
if err := pdb.processHotData(); err != nil {
pdb.logger.Error("❌ 预热处理错误", "error", err)
}
case <-processTicker.C:
// 处理 warm -> cold
if err := pdb.processWarmData(); err != nil {
pdb.logger.Error("❌ 温数据处理错误", "error", err)
}
}
}
}
// processHotData 处理热数据
func (pdb *PipelineDB) processHotData() error {
// 获取热数据
hotRecords, err := pdb.GetRecordsByStatus(StatusHot, pdb.config.BatchSize)
if err != nil {
return err
}
if len(hotRecords) == 0 {
return nil // 没有热数据需要处理
}
pdb.logger.Info("🔥 开始预热热数据", "count", len(hotRecords))
processed := 0
for _, record := range hotRecords {
if err := pdb.warmupRecord(record); err != nil {
pdb.logger.Error("❌ 预热记录失败", "id", record.ID, "error", err)
continue
}
processed++
}
pdb.logger.Info("🔥 预热完成", "processed", processed, "total", len(hotRecords))
return nil
}
// warmupRecord 预热单条记录
func (pdb *PipelineDB) warmupRecord(record *DataRecord) error {
// 执行预热逻辑
if err := pdb.performWarmup(record); err != nil {
return err
}
// 更新状态为 warm
record.Status = StatusWarm
// 保存更新
if err := pdb.updateRecord(record); err != nil {
return err
}
pdb.logger.Info("🔥 记录已预热: hot -> warm", "id", record.ID)
return nil
}
// performWarmup 执行实际的预热操作
func (pdb *PipelineDB) performWarmup(record *DataRecord) error {
// 调用外部处理器的预热方法
if pdb.handler != nil {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
processedData, err := pdb.handler.WillWarm(ctx, record.Group, record.Data)
if err != nil {
return err
}
// 更新处理后的数据
if len(processedData) > 0 {
record.Data = processedData
}
}
// 添加预热元数据
if record.Metadata == "" {
record.Metadata = "warmed_up"
} else {
record.Metadata += ",warmed_up"
}
return nil
}
// processWarmData 处理温数据
func (pdb *PipelineDB) processWarmData() error {
// 获取 warm 数据
warmRecords, err := pdb.GetRecordsByStatus(StatusWarm, pdb.config.BatchSize)
if err != nil {
return err
}
if len(warmRecords) == 0 {
return nil // 没有温数据需要处理
}
pdb.logger.Info("⚙️ 开始处理温数据", "count", len(warmRecords))
processed := 0
for _, record := range warmRecords {
if err := pdb.cooldownRecord(record); err != nil {
pdb.logger.Error("❌ 处理记录失败", "id", record.ID, "error", err)
continue
}
processed++
}
pdb.logger.Info("⚙️ 处理完成", "processed", processed, "total", len(warmRecords))
return nil
}
// cooldownRecord 处理单条记录 (warm -> cold)
func (pdb *PipelineDB) cooldownRecord(record *DataRecord) error {
// 自动处理器处理 warm 数据
pdb.logger.Info("⚙️ 自动处理器处理记录", "id", record.ID)
// 调用外部处理器
if err := pdb.performCooldown(record); err != nil {
return err
}
// 更新状态为 cold 并保存回存储
record.Status = StatusCold
if err := pdb.updateRecord(record); err != nil {
return err
}
pdb.logger.Info("⚙️ 记录处理完成: warm -> cold", "id", record.ID)
return nil
}
// performCooldown 调用外部处理器
func (pdb *PipelineDB) performCooldown(record *DataRecord) error {
pdb.logger.Info("🔗 调用外部处理器处理记录", "id", record.ID)
// 创建超时上下文
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// 调用外部处理器
processedData, err := pdb.handler.WillCold(ctx, record.Group, record.Data)
if err != nil {
pdb.logger.Error("❌ 外部处理器处理失败", "error", err)
return err
}
// 更新处理后的数据
record.Data = processedData
// 成功处理
pdb.logger.Info("✅ 外部处理器成功处理记录", "id", record.ID)
// 检查组是否完成处理
pdb.checkGroupCompletion(record.Group)
return nil
}
// rebuildIndexes 重建所有索引(用于数据库重启后恢复)
func (pdb *PipelineDB) rebuildIndexes() error {
// 如果没有根页面或总页数为0跳过重建
if pdb.header.RootPage == 0 || pdb.header.TotalPages <= 1 {
pdb.logger.Info("🔄 数据库为空,跳过索引重建")
return nil
}
2025-09-30 15:05:56 +08:00
pdb.logger.Info("🔄 重建索引...")
// 遍历所有数据页重建索引
pdb.logger.Info("📊 开始扫描页面", "totalPages", pdb.header.TotalPages, "rootPage", pdb.header.RootPage)
recordCount := 0
2025-09-30 15:05:56 +08:00
// 从根页面开始扫描页链
for pageNo := pdb.header.RootPage; pageNo != 0 && pageNo < pdb.header.TotalPages; {
// 读取页面
page, err := pdb.readPage(pageNo)
if err != nil {
pdb.logger.Info("⚠️ 跳过无法读取的页面", "pageNo", pageNo, "error", err)
break // 页链断裂,停止扫描
}
// 检查是否是数据页
if len(page) < 8 {
pdb.logger.Info("⚠️ 跳过太小的页面", "pageNo", pageNo, "size", len(page))
continue
}
// 解析页面头部
numSlots := binary.LittleEndian.Uint16(page[0:2]) // 正确的槽数量
freeOff := binary.LittleEndian.Uint16(page[2:4])
nextPage := binary.LittleEndian.Uint16(page[4:6])
// 只在有数据时记录详细日志
if numSlots > 0 {
pdb.logger.Info("📄 扫描页面", "pageNo", pageNo, "numSlots", numSlots, "freeOff", freeOff, "nextPage", nextPage)
}
2025-09-30 15:05:56 +08:00
if numSlots == 0 {
pageNo = nextPage
2025-09-30 15:05:56 +08:00
continue
}
// 遍历页面中的所有记录(使用槽数组)
for slotNo := uint16(0); slotNo < numSlots && slotNo < 100; slotNo++ { // 限制最大槽数,避免无效数据
// 读取槽偏移槽数组从偏移6开始
slotOffset := 6 + slotNo*2
if slotOffset+2 > uint16(len(page)) {
pdb.logger.Info("⚠️ 槽偏移超出页面", "slotNo", slotNo, "slotOffset", slotOffset, "pageSize", len(page))
break
}
recordOffset := binary.LittleEndian.Uint16(page[slotOffset : slotOffset+2])
// 读取记录
if recordOffset+8 > uint16(len(page)) {
continue
}
// 读取ID
id := binary.LittleEndian.Uint64(page[recordOffset : recordOffset+8])
// 读取数据长度(变长编码)
dataLen, n := binary.Uvarint(page[recordOffset+8:])
if n <= 0 {
continue
}
// 读取数据
dataStart := recordOffset + 8 + uint16(n)
dataEnd := dataStart + uint16(dataLen)
if dataEnd > uint16(len(page)) {
continue
}
// 解析JSON记录获取组名
var record DataRecord
if err := record.FromBytes(page[dataStart:dataEnd]); err != nil {
continue
}
// 添加到索引
idx := pdb.indexMgr.GetOrCreateIndex(record.Group)
idx.Insert(int64(id), pageNo, slotNo)
recordCount++
2025-09-30 15:05:56 +08:00
}
// 移动到下一个页面
nextPageNo := Page(page).nextPage()
if nextPageNo == 0 {
break // 页链结束
}
pageNo = nextPageNo
}
pdb.logger.Info("✅ 索引重建完成", "recordCount", recordCount)
2025-09-30 15:05:56 +08:00
return nil
}