// Package pipelinedb provides an integrated pipeline database system // // # Pipeline Database V4 是一个集成了数据库存储和业务管道处理的一体化解决方案 // // 核心特性: // - 基于页面的存储引擎:高效的数据存储和检索 // - 分组数据管理:支持按业务组织数据,独立管理和统计 // - 三级数据状态:Hot(热)-> Warm(温)-> Cold(冷)的自动流转 // - 外部处理器集成:支持自定义业务逻辑处理 // - 高并发支持:线程安全的索引和存储操作 // - 页面缓存:提高数据访问性能 // - 持久化存储:数据安全持久保存 // // 数据流转模型: // 1. 数据接收:新数据以Hot状态进入系统 // 2. 预热处理:Hot数据经过预热处理转为Warm状态 // 3. 冷却处理:Warm数据经过冷却处理转为Cold状态 // 4. 完成回调:组内所有数据处理完成后触发回调 // // 使用场景: // - 数据管道处理:ETL、数据清洗、数据转换 // - 业务流程管理:订单处理、用户行为分析 // - 实时数据处理:日志分析、监控数据处理 // - 批量数据处理:数据导入、数据同步 package pipelinedb import ( "context" "encoding/binary" "encoding/json" "errors" "fmt" "log/slog" "os" "sort" "sync" "time" ) // ==================== 系统常量定义 ==================== const ( PageSize = 4096 // 页面大小:4KB,与操作系统页面大小对齐 HdrSize = 14 // 文件头大小:包含所有头部字段的字节数 MaxRecSize = 3000 // 最大记录大小:限制单条记录的最大字节数 ) // ==================== 数据结构定义 ==================== // Header 数据库文件头结构 // // 文件头存储在文件的开始位置,包含数据库的元数据信息 // 用于数据库启动时的初始化和状态恢复 type Header struct { Magic uint32 // 魔数:用于文件格式验证 PageSize uint16 // 页面大小:固定为4096字节 TotalPages uint16 // 总页面数:当前数据库文件包含的页面数量 FreeHead uint16 // 空闲页链头:空闲页面管理的起始页面 RootPage uint16 // 根页面:数据存储的根页面编号 CounterPage uint16 // 计数器页面:用于生成唯一ID的计数器页面 } // DataStatus 数据状态枚举 // // 定义数据在管道中的处理状态,支持三级状态流转 // 状态转换:Hot -> Warm -> Cold type DataStatus string const ( StatusHot DataStatus = "hot" // 热数据:刚接收的新数据,等待预热处理 StatusWarm DataStatus = "warm" // 温数据:已预热的数据,等待冷却处理 StatusCold DataStatus = "cold" // 冷数据:已完成处理的数据,可以归档或清理 ) // DataRecord 数据记录结构 // // 表示系统中的一条数据记录,包含完整的数据信息和元数据 // 支持JSON序列化,便于存储和传输 type DataRecord struct { ID int64 `json:"id"` // 记录唯一标识符 Group string `json:"group"` // 所属数据组名称 Status DataStatus `json:"status"` // 当前数据状态 Data []byte `json:"data"` // 实际数据内容 CreatedAt time.Time `json:"created_at"` // 创建时间戳 UpdatedAt time.Time `json:"updated_at"` // 最后更新时间戳 Metadata string `json:"metadata,omitempty"` // 可选的元数据信息 } // GroupDataEvent 组数据事件 // // 用于在组数据处理过程中传递事件信息 // 支持事件驱动的数据处理模式 type GroupDataEvent struct { Group string // 组名:事件所属的数据组 Data []byte // 数据内容:事件携带的数据 Done bool // 完成标志:标识该组是否已完成所有数据处理 } // ==================== 记录序列化方法 ==================== // ToBytes 将数据记录序列化为字节数组 // // 返回值: // - []byte: 序列化后的字节数组 // - error: 序列化错误 // // 使用JSON格式进行序列化,便于跨平台兼容和调试 func (dr *DataRecord) ToBytes() ([]byte, error) { return json.Marshal(dr) } // FromBytes 从字节数组反序列化数据记录 // // 参数: // - data: 序列化的字节数组 // // 返回值: // - error: 反序列化错误 // // 从JSON格式的字节数组中恢复数据记录对象 func (dr *DataRecord) FromBytes(data []byte) error { return json.Unmarshal(data, dr) } // ==================== 核心数据库结构 ==================== // PipelineDB 管道数据库主结构体 // // 集成了存储引擎、索引管理、缓存系统、管道处理等所有核心功能 // 提供线程安全的数据库操作接口 type PipelineDB struct { // ========== 存储引擎组件 ========== file *os.File // 数据库文件句柄 header *Header // 文件头信息 cache *PageCache // 页面缓存系统 freePageMgr *FreePageManager // 空闲页面管理器 indexMgr *IndexManager // 索引管理器 rowMutexes map[int64]*sync.RWMutex // 行级锁映射 mu sync.RWMutex // 数据库级读写锁 // ========== 配置和处理器 ========== config *Config // 数据库配置 handler Handler // 数据处理器接口 logger *slog.Logger // 结构化日志器 // ========== 并发控制 ========== ctx context.Context // 上下文控制 cancel context.CancelFunc // 取消函数 wg sync.WaitGroup // 等待组,用于优雅关闭 // ========== 业务组件 ========== groupManager *GroupManager // 组管理器 } // ==================== 配置结构定义 ==================== // Config 数据库统一配置结构 // // 包含存储引擎配置和管道处理配置 // 支持JSON序列化,便于配置文件管理 type Config struct { // ========== 存储引擎配置 ========== CacheSize int `json:"cache_size"` // 页缓存大小:缓存的页面数量 SyncWrites bool `json:"sync_writes"` // 同步写入:是否立即同步到磁盘 CreateIfMiss bool `json:"create_if_miss"` // 自动创建:文件不存在时是否自动创建 // ========== 管道处理配置 ========== WarmInterval time.Duration `json:"warm_interval"` // 预热间隔:Hot->Warm状态转换的时间间隔 ProcessInterval time.Duration `json:"process_interval"` // 处理间隔:Warm->Cold状态转换的时间间隔 BatchSize int `json:"batch_size"` // 批处理大小:每次处理的记录数量 EnableMetrics bool `json:"enable_metrics"` // 启用指标:是否收集性能指标 } // DefaultConfig 返回默认配置 // // 返回值: // - *Config: 包含合理默认值的配置对象 // // 默认配置适用于大多数使用场景,可以根据实际需求进行调整 func DefaultConfig() *Config { return &Config{ CacheSize: 256, // 256页缓存,约1MB内存 SyncWrites: false, // 异步写入,提高性能 CreateIfMiss: true, // 自动创建数据库文件 WarmInterval: 5 * time.Second, // 5秒预热间隔 ProcessInterval: 10 * time.Second, // 10秒处理间隔 BatchSize: 100, // 每批处理100条记录 EnableMetrics: true, // 启用性能指标收集 } } // ==================== 数据处理器接口 ==================== // Handler 数据处理器接口 // // 定义了数据在状态流转过程中的处理回调 // 用户可以实现此接口来定制业务逻辑处理 type Handler interface { // WillWarm 预热处理回调 // // 参数: // - ctx: 上下文,用于超时控制和取消操作 // - group: 数据组名称 // - data: 待处理的数据 // // 返回值: // - []byte: 处理后的数据 // - error: 处理错误 // // 在数据从Hot状态转换为Warm状态时调用 // 可以在此阶段进行数据验证、转换、清洗等操作 WillWarm(ctx context.Context, group string, data []byte) ([]byte, error) // WillCold 冷却处理回调 // // 参数: // - ctx: 上下文,用于超时控制和取消操作 // - group: 数据组名称 // - data: 待处理的数据 // // 返回值: // - []byte: 处理后的数据 // - error: 处理错误 // // 在数据从Warm状态转换为Cold状态时调用 // 可以在此阶段进行数据压缩、归档、发送等操作 WillCold(ctx context.Context, group string, data []byte) ([]byte, error) // OnComplete 组完成回调 // // 参数: // - ctx: 上下文,用于超时控制和取消操作 // - group: 完成处理的数据组名称 // // 返回值: // - error: 处理错误 // // 在组内所有数据都完成处理后调用 // 可以在此阶段进行清理、通知、统计等操作 OnComplete(ctx context.Context, group string) error } // ==================== 数据库选项结构 ==================== // Options 数据库打开选项 // // 包含打开数据库所需的所有配置参数 // 使用选项模式,提供灵活的配置方式 type Options struct { Filename string // 数据库文件路径 Config *Config // 数据库配置,nil时使用默认配置 Handler Handler // 数据处理器,可选 Logger *slog.Logger // 日志器,nil时使用默认日志器 } // ==================== 数据库核心方法 ==================== // Open 打开或创建管道数据库 // // 参数: // - opts: 数据库打开选项,包含文件路径、配置、处理器等 // // 返回值: // - *PipelineDB: 数据库实例 // - error: 打开错误 // // 执行流程: // 1. 验证和设置默认配置 // 2. 打开或创建数据库文件 // 3. 初始化存储引擎组件(缓存、索引、页面管理器等) // 4. 加载或创建文件头信息 // 5. 初始化组管理器和管道处理器 // 6. 启动后台处理协程 // // 线程安全:返回的数据库实例支持并发访问 func Open(opts Options) (*PipelineDB, error) { // 设置默认配置 if opts.Config == nil { opts.Config = DefaultConfig() } // 设置默认 logger logger := opts.Logger if logger == nil { logger = slog.Default() } pdb := &PipelineDB{ config: opts.Config, handler: opts.Handler, logger: logger, rowMutexes: make(map[int64]*sync.RWMutex), cache: NewPageCache(opts.Config.CacheSize), freePageMgr: NewFreePageManager(), indexMgr: NewIndexManager(), } // 创建上下文 pdb.ctx, pdb.cancel = context.WithCancel(context.Background()) // 打开文件 var err error if opts.Config.CreateIfMiss { pdb.file, err = os.OpenFile(opts.Filename, os.O_RDWR|os.O_CREATE, 0644) } else { pdb.file, err = os.OpenFile(opts.Filename, os.O_RDWR, 0644) } if err != nil { return nil, err } // 检查文件是否为空 stat, err := pdb.file.Stat() if err != nil { return nil, err } if stat.Size() == 0 { // 初始化新文件 if err := pdb.initializeFile(); err != nil { return nil, err } } else { // 加载现有文件 if err := pdb.loadHeader(); err != nil { return nil, err } // 加载空闲页信息 if err := pdb.freePageMgr.LoadFromHeader(pdb.header.FreeHead, pdb.readPageDirect); err != nil { return nil, err } // 重建索引 if err := pdb.rebuildIndexes(); err != nil { return nil, err } } // 创建组管理器 pdb.groupManager = NewGroupManager(pdb) return pdb, nil } // Start 启动管道数据库 func (pdb *PipelineDB) Start(groupEventCh <-chan GroupDataEvent) error { pdb.logger.Info("🚀 启动管道数据库...") // 启动自动管道处理器 pdb.wg.Add(1) go func() { defer pdb.wg.Done() pdb.runAutoPipeline(pdb.ctx) }() // 启动组数据事件监听器 if groupEventCh != nil { pdb.wg.Add(1) go func() { defer pdb.wg.Done() pdb.handleGroupDataEvents(groupEventCh) }() } pdb.logger.Info("✅ 管道数据库启动完成") return nil } // Stop 停止管道数据库 func (pdb *PipelineDB) Stop() error { pdb.logger.Info("🛑 停止管道数据库...") pdb.cancel() pdb.wg.Wait() // 刷新缓存 if err := pdb.cache.Flush(pdb.writePageDirect); err != nil { return err } // 保存空闲页信息 freeHead, err := pdb.freePageMgr.SaveToHeader(pdb.writePageDirect) if err != nil { return err } // 更新文件头 pdb.header.FreeHead = freeHead if err := pdb.saveHeader(); err != nil { return err } if err := pdb.file.Close(); err != nil { return err } pdb.logger.Info("✅ 管道数据库已停止") return nil } // AcceptData Producer 接收数据 (业务入口) func (pdb *PipelineDB) AcceptData(group string, data []byte, metadata string) (int64, error) { // 检查组是否被暂停 if pdb.groupManager.IsPaused(group) { return 0, fmt.Errorf("组 [%s] 已暂停接收数据", group) } // 生成自增ID id := pdb.groupManager.GetNextID(group) record := &DataRecord{ ID: id, Group: group, Status: StatusHot, Data: data, CreatedAt: time.Now(), UpdatedAt: time.Now(), Metadata: metadata, } // 序列化记录 recordBytes, err := record.ToBytes() if err != nil { return 0, fmt.Errorf("序列化记录失败: %w", err) } // 直接使用组名作为表名 err = pdb.insert(group, id, recordBytes) if err != nil { return 0, fmt.Errorf("存储数据失败: %w", err) } // 更新统计缓存 pdb.groupManager.IncrementStats(group, StatusHot) pdb.logger.Info("📥 接收数据", "group", group, "id", id, "size", len(data), "status", StatusHot) return id, nil } // GetRecord 获取记录 func (pdb *PipelineDB) GetRecord(group string, id int64) (*DataRecord, error) { data, err := pdb.get(group, id) if err != nil { return nil, err } record := &DataRecord{} if err := record.FromBytes(data); err != nil { return nil, err } return record, nil } // updateRecord 内部更新记录(不对外暴露) func (pdb *PipelineDB) updateRecord(record *DataRecord) error { record.UpdatedAt = time.Now() recordBytes, err := record.ToBytes() if err != nil { return fmt.Errorf("序列化记录失败: %w", err) } return pdb.update(record.Group, record.ID, recordBytes) } // GetRecordsByStatus 按状态获取记录(跨所有组) func (pdb *PipelineDB) GetRecordsByStatus(status DataStatus, limit int) ([]*DataRecord, error) { var records []*DataRecord count := 0 // 遍历所有组的表 groups := pdb.getAllGroups() for _, group := range groups { if count >= limit { break } // 查询当前组的记录 err := pdb.rangeQuery(group, 0, int64(^uint64(0)>>1), func(id int64, data []byte) error { if count >= limit { return fmt.Errorf("达到限制") // 停止遍历 } record := &DataRecord{} if err := record.FromBytes(data); err != nil { return nil // 跳过损坏的记录 } if record.Status == status { records = append(records, record) count++ } return nil }) if err != nil && err.Error() != "达到限制" { return nil, fmt.Errorf("查询组 [%s] 失败: %w", group, err) } } return records, nil } // GetRecordsByGroup 按组获取记录(支持分页) func (pdb *PipelineDB) GetRecordsByGroup(group string, pageReq *PageRequest) (*PageResponse, error) { // 参数验证 if pageReq.Page < 1 { pageReq.Page = 1 } if pageReq.PageSize < 1 { pageReq.PageSize = 10 // 默认每页10条 } if pageReq.PageSize > 1000 { pageReq.PageSize = 1000 // 最大每页1000条 } var allRecords []*DataRecord // 收集所有记录 err := pdb.rangeQuery(group, 0, int64(^uint64(0)>>1), func(id int64, data []byte) error { record := &DataRecord{} if err := record.FromBytes(data); err != nil { return nil // 跳过损坏的记录 } allRecords = append(allRecords, record) return nil }) if err != nil { return nil, fmt.Errorf("查询记录失败: %w", err) } // 排序记录 pdb.sortRecords(allRecords, pageReq.SortField, pageReq.SortOrder) // 计算分页信息 totalCount := len(allRecords) totalPages := (totalCount + pageReq.PageSize - 1) / pageReq.PageSize // 计算当前页的起始和结束索引 startIndex := (pageReq.Page - 1) * pageReq.PageSize endIndex := startIndex + pageReq.PageSize // 边界检查 if startIndex >= totalCount { return &PageResponse{ Records: []*DataRecord{}, Page: pageReq.Page, PageSize: pageReq.PageSize, TotalCount: totalCount, TotalPages: totalPages, HasNext: false, HasPrevious: pageReq.Page > 1, }, nil } if endIndex > totalCount { endIndex = totalCount } // 提取当前页的记录 pageRecords := allRecords[startIndex:endIndex] // 构建响应 response := &PageResponse{ Records: pageRecords, Page: pageReq.Page, PageSize: pageReq.PageSize, TotalCount: totalCount, TotalPages: totalPages, HasNext: pageReq.Page < totalPages, HasPrevious: pageReq.Page > 1, } return response, nil } // SortOrder 排序方向 type SortOrder string const ( SortAsc SortOrder = "asc" // 升序 SortDesc SortOrder = "desc" // 降序 ) // SortField 排序字段 type SortField string const ( SortByID SortField = "id" // 按ID排序 SortByCreatedAt SortField = "created_at" // 按创建时间排序 SortByUpdatedAt SortField = "updated_at" // 按更新时间排序 SortByStatus SortField = "status" // 按状态排序 ) // PageRequest 分页请求参数 type PageRequest struct { Page int `json:"page"` // 页码,从1开始 PageSize int `json:"page_size"` // 每页大小 SortField SortField `json:"sort_field"` // 排序字段 SortOrder SortOrder `json:"sort_order"` // 排序方向 } // PageResponse 分页响应结果 type PageResponse struct { Records []*DataRecord `json:"records"` // 当前页记录 Page int `json:"page"` // 当前页码 PageSize int `json:"page_size"` // 每页大小 TotalCount int `json:"total_count"` // 总记录数 TotalPages int `json:"total_pages"` // 总页数 HasNext bool `json:"has_next"` // 是否有下一页 HasPrevious bool `json:"has_previous"` // 是否有上一页 } // GetRecordsByGroupAndStatus 按组和状态获取记录(支持分页) func (pdb *PipelineDB) GetRecordsByGroupAndStatus(group string, status DataStatus, pageReq *PageRequest) (*PageResponse, error) { // 参数验证 if pageReq.Page < 1 { pageReq.Page = 1 } if pageReq.PageSize < 1 { pageReq.PageSize = 10 // 默认每页10条 } if pageReq.PageSize > 1000 { pageReq.PageSize = 1000 // 最大每页1000条 } var allRecords []*DataRecord // 首先收集所有匹配的记录 err := pdb.rangeQuery(group, 0, int64(^uint64(0)>>1), func(id int64, data []byte) error { record := &DataRecord{} if err := record.FromBytes(data); err != nil { return nil // 跳过损坏的记录 } if record.Status == status { allRecords = append(allRecords, record) } return nil }) if err != nil { return nil, fmt.Errorf("查询记录失败: %w", err) } // 排序记录 pdb.sortRecords(allRecords, pageReq.SortField, pageReq.SortOrder) // 计算分页信息 totalCount := len(allRecords) totalPages := (totalCount + pageReq.PageSize - 1) / pageReq.PageSize // 计算当前页的起始和结束索引 startIndex := (pageReq.Page - 1) * pageReq.PageSize endIndex := startIndex + pageReq.PageSize // 边界检查 if startIndex >= totalCount { // 页码超出范围,返回空结果 return &PageResponse{ Records: []*DataRecord{}, Page: pageReq.Page, PageSize: pageReq.PageSize, TotalCount: totalCount, TotalPages: totalPages, HasNext: false, HasPrevious: pageReq.Page > 1, }, nil } if endIndex > totalCount { endIndex = totalCount } // 提取当前页的记录 pageRecords := allRecords[startIndex:endIndex] // 构建响应 response := &PageResponse{ Records: pageRecords, Page: pageReq.Page, PageSize: pageReq.PageSize, TotalCount: totalCount, TotalPages: totalPages, HasNext: pageReq.Page < totalPages, HasPrevious: pageReq.Page > 1, } return response, nil } // GetStats 获取统计信息(高性能版本) func (pdb *PipelineDB) GetStats() (*Stats, error) { stats := &Stats{ Timestamp: time.Now(), GroupStats: make(map[string]*GroupStats), } // 从 GroupManager 快速获取统计缓存(O(1)复杂度) groupStatsCache := pdb.groupManager.GetFastStats() // 统计各状态的记录数 var totalRecords, hotRecords, warmRecords, coldRecords int // 转换缓存数据为 Stats 格式 for group, cache := range groupStatsCache { stats.GroupStats[group] = &GroupStats{ Group: group, TotalRecords: cache.TotalRecords, HotRecords: cache.HotRecords, WarmRecords: cache.WarmRecords, ColdRecords: cache.ColdRecords, } // 累计总体统计 totalRecords += cache.TotalRecords hotRecords += cache.HotRecords warmRecords += cache.WarmRecords coldRecords += cache.ColdRecords } // 设置总体统计 stats.TotalRecords = totalRecords stats.HotRecords = hotRecords stats.WarmRecords = warmRecords stats.ColdRecords = coldRecords // 获取数据库统计 hits, misses, hitRate := pdb.cache.Stats() stats.CacheHits = hits stats.CacheMisses = misses stats.CacheHitRate = hitRate stats.TotalPages = int(pdb.header.TotalPages) stats.FreePages = pdb.freePageMgr.FreeCount() stats.FileSizeBytes = int64(pdb.header.TotalPages) * PageSize return stats, nil } // ValidateStats 验证统计信息(通过遍历实际数据) // 用于数据校验和调试,性能较慢但结果准确 func (pdb *PipelineDB) ValidateStats() (*Stats, error) { stats := &Stats{ Timestamp: time.Now(), GroupStats: make(map[string]*GroupStats), } // 统计各状态的记录数 statusCounts := make(map[DataStatus]int) // 遍历所有组的表 groups := pdb.getAllGroups() for _, group := range groups { err := pdb.rangeQuery(group, 0, int64(^uint64(0)>>1), func(id int64, data []byte) error { record := &DataRecord{} if err := record.FromBytes(data); err != nil { return nil // 跳过损坏的记录 } // 更新总体统计 statusCounts[record.Status]++ // 更新组统计 if stats.GroupStats[record.Group] == nil { stats.GroupStats[record.Group] = &GroupStats{ Group: record.Group, } } stats.GroupStats[record.Group].TotalRecords++ // 更新组的状态计数 switch record.Status { case StatusHot: stats.GroupStats[record.Group].HotRecords++ case StatusWarm: stats.GroupStats[record.Group].WarmRecords++ case StatusCold: stats.GroupStats[record.Group].ColdRecords++ } return nil }) if err != nil { return nil, fmt.Errorf("统计组 [%s] 失败: %w", group, err) } } // 设置总体统计 stats.TotalRecords = statusCounts[StatusHot] + statusCounts[StatusWarm] + statusCounts[StatusCold] stats.HotRecords = statusCounts[StatusHot] stats.WarmRecords = statusCounts[StatusWarm] stats.ColdRecords = statusCounts[StatusCold] // 获取数据库统计 hits, misses, hitRate := pdb.cache.Stats() stats.CacheHits = hits stats.CacheMisses = misses stats.CacheHitRate = hitRate stats.TotalPages = int(pdb.header.TotalPages) stats.FreePages = pdb.freePageMgr.FreeCount() stats.FileSizeBytes = int64(pdb.header.TotalPages) * PageSize return stats, nil } // PauseGroup 暂停组接收数据 func (pdb *PipelineDB) PauseGroup(group string) { pdb.groupManager.PauseGroup(group) } // ResumeGroup 恢复组接收数据 func (pdb *PipelineDB) ResumeGroup(group string) { pdb.groupManager.ResumeGroup(group) } // IsGroupPaused 检查组是否暂停 func (pdb *PipelineDB) IsGroupPaused(group string) bool { return pdb.groupManager.IsPaused(group) } // GetPausedGroups 获取所有暂停的组 func (pdb *PipelineDB) GetPausedGroups() []string { return pdb.groupManager.GetPausedGroups() } // IsGroupReadyForCleanup 检查组是否可以清理(所有数据都是cold) func (pdb *PipelineDB) IsGroupReadyForCleanup(group string) (bool, error) { // 检查组是否暂停 if !pdb.groupManager.IsPaused(group) { return false, fmt.Errorf("组 [%s] 未暂停,不能清理", group) } // 检查是否有非-cold数据 hotPageReq := &PageRequest{Page: 1, PageSize: 1} hotResponse, err := pdb.GetRecordsByGroupAndStatus(group, StatusHot, hotPageReq) if err != nil { return false, err } if len(hotResponse.Records) > 0 { return false, nil // 还有热数据 } warmPageReq := &PageRequest{Page: 1, PageSize: 1} warmResponse, err := pdb.GetRecordsByGroupAndStatus(group, StatusWarm, warmPageReq) if err != nil { return false, err } warmRecords := warmResponse.Records if len(warmRecords) > 0 { return false, nil // 还有温数据 } return true, nil // 所有数据都是cold,可以清理 } // sortRecords 对记录进行排序 func (pdb *PipelineDB) sortRecords(records []*DataRecord, sortField SortField, sortOrder SortOrder) { if len(records) <= 1 { return } // 设置默认排序 if sortField == "" { sortField = SortByID } if sortOrder == "" { sortOrder = SortAsc } sort.Slice(records, func(i, j int) bool { var less bool switch sortField { case SortByID: less = records[i].ID < records[j].ID case SortByCreatedAt: less = records[i].CreatedAt.Before(records[j].CreatedAt) case SortByUpdatedAt: less = records[i].UpdatedAt.Before(records[j].UpdatedAt) case SortByStatus: // 状态排序:hot < warm < cold statusOrder := map[DataStatus]int{ StatusHot: 1, StatusWarm: 2, StatusCold: 3, } less = statusOrder[records[i].Status] < statusOrder[records[j].Status] default: // 默认按ID排序 less = records[i].ID < records[j].ID } // 如果是降序,反转比较结果 if sortOrder == SortDesc { less = !less } return less }) } // CleanupGroup 清理组数据 func (pdb *PipelineDB) CleanupGroup(group string) error { // 检查是否可以清理 ready, err := pdb.IsGroupReadyForCleanup(group) if err != nil { return err } if !ready { return fmt.Errorf("组 [%s] 还有未处理完成的数据,不能清理", group) } pdb.logger.Info("🧽 开始清理组数据", "group", group) // 调用外部处理器的完成回调 if pdb.handler != nil { // 检查是否已经有 OnComplete 在执行 if pdb.groupManager.IsOnCompleteExecuting(group) { pdb.logger.Info("⏳ 组的 OnComplete 正在执行中,等待完成", "group", group) // 等待当前执行完成 for pdb.groupManager.IsOnCompleteExecuting(group) { time.Sleep(100 * time.Millisecond) } pdb.logger.Info("✅ 组的 OnComplete 执行完成,继续清理", "group", group) } else { // 标记为正在执行 pdb.groupManager.SetOnCompleteExecuting(group, true) defer pdb.groupManager.SetOnCompleteExecuting(group, false) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() pdb.logger.Info("🎯 调用组完成回调", "group", group) if err := pdb.handler.OnComplete(ctx, group); err != nil { pdb.logger.Warn("⚠️ 组完成回调失败", "group", group, "error", err) // 不阻止清理过程,只记录错误 } else { pdb.logger.Info("✅ 组完成回调成功", "group", group) } } } // 获取组中所有记录 pageReq := &PageRequest{Page: 1, PageSize: 10000} // 假设最多10000条 response, err := pdb.GetRecordsByGroup(group, pageReq) if err != nil { return err } records := response.Records // 删除所有记录 deletedCount := len(records) // TODO: 实现实际的记录删除逻辑 pdb.logger.Info("📋 组中有记录待删除", "group", group, "count", deletedCount) pdb.logger.Info("🧽 组清理完成", "group", group, "deleted_count", deletedCount) return nil } // checkGroupCompletion 检查组是否完成处理并调用完成回调 func (pdb *PipelineDB) checkGroupCompletion(group string) { // 只检查暂停的组 if !pdb.groupManager.IsPaused(group) { return } // 检查是否已经有 OnComplete 在执行 if pdb.groupManager.IsOnCompleteExecuting(group) { pdb.logger.Info("⏳ 组的 OnComplete 正在执行中,跳过重复调用", "group", group) return } // 检查是否所有数据都已转为 cold ready, err := pdb.IsGroupReadyForCleanup(group) if err != nil { pdb.logger.Warn("⚠️ 检查组完成状态失败", "group", group, "error", err) return } if ready && pdb.handler != nil { // 标记为正在执行 pdb.groupManager.SetOnCompleteExecuting(group, true) // 异步执行完成回调,避免阻塞 go func() { defer pdb.groupManager.SetOnCompleteExecuting(group, false) // 确保执行完成后清除标记 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() pdb.logger.Info("🎯 组所有数据已转为 cold,调用完成回调", "group", group) if err := pdb.handler.OnComplete(ctx, group); err != nil { pdb.logger.Warn("⚠️ 组完成回调失败", "group", group, "error", err) } else { pdb.logger.Info("✅ 组完成回调成功", "group", group) } }() } } // handleGroupDataEvents 处理组数据事件 func (pdb *PipelineDB) handleGroupDataEvents(groupEventCh <-chan GroupDataEvent) { pdb.logger.Info("📡 启动组数据事件监听器") for { select { case <-pdb.ctx.Done(): pdb.logger.Info("📡 组数据事件监听器停止") return case event, ok := <-groupEventCh: if !ok { pdb.logger.Info("📡 组数据事件通道已关闭") return } pdb.processGroupDataEvent(event) } } } // processGroupDataEvent 处理单个组数据事件 func (pdb *PipelineDB) processGroupDataEvent(event GroupDataEvent) { if event.Done { // 组完成,自动暂停 pdb.logger.Info("🔚 组数据产出完成,自动暂停", "group", event.Group) pdb.groupManager.PauseGroup(event.Group) // 可选:触发组完成检查 go func() { // 等待一小段时间让当前数据处理完成 time.Sleep(100 * time.Millisecond) pdb.checkGroupCompletion(event.Group) }() } else if len(event.Data) > 0 { // 有新数据,确保组未暂停 if pdb.groupManager.IsPaused(event.Group) { pdb.logger.Info("▶️ 组有新数据产出,自动恢复", "group", event.Group) pdb.groupManager.ResumeGroup(event.Group) } // 自动接收数据 _, err := pdb.AcceptData(event.Group, event.Data, "auto_from_event") if err != nil { pdb.logger.Warn("⚠️ 自动接收组数据失败", "group", event.Group, "error", err) } else { pdb.logger.Info("📥 自动接收组数据成功", "group", event.Group) } } } // getAllGroups 获取所有组名(通过索引管理器) func (pdb *PipelineDB) getAllGroups() []string { // 简化实现:通过索引管理器获取所有组名 var groups []string for groupName := range pdb.indexMgr.indexes { groups = append(groups, groupName) } return groups } // GetGroupCleanupStatus 获取组清理状态 func (pdb *PipelineDB) GetGroupCleanupStatus(group string) (*GroupCleanupStatus, error) { status := &GroupCleanupStatus{ Group: group, Paused: pdb.groupManager.IsPaused(group), } // 获取组统计 stats, err := pdb.GetStats() if err != nil { return nil, err } if groupStat, exists := stats.GroupStats[group]; exists { status.TotalRecords = groupStat.TotalRecords status.HotRecords = groupStat.HotRecords status.WarmRecords = groupStat.WarmRecords status.ColdRecords = groupStat.ColdRecords status.ReadyForCleanup = (groupStat.HotRecords == 0 && groupStat.WarmRecords == 0 && status.Paused) } return status, nil } // Stats 统计信息 type Stats struct { Timestamp time.Time `json:"timestamp"` TotalRecords int `json:"total_records"` HotRecords int `json:"hot_records"` WarmRecords int `json:"warm_records"` ColdRecords int `json:"cold_records"` GroupStats map[string]*GroupStats `json:"group_stats"` TotalPages int `json:"total_pages"` FreePages int `json:"free_pages"` CacheHits int64 `json:"cache_hits"` CacheMisses int64 `json:"cache_misses"` CacheHitRate float64 `json:"cache_hit_rate"` FileSizeBytes int64 `json:"file_size_bytes"` } // GroupStats 组统计信息 type GroupStats struct { Group string `json:"group"` TotalRecords int `json:"total_records"` HotRecords int `json:"hot_records"` WarmRecords int `json:"warm_records"` ColdRecords int `json:"cold_records"` } // GroupCleanupStatus 组清理状态 type GroupCleanupStatus struct { Group string `json:"group"` Paused bool `json:"paused"` TotalRecords int `json:"total_records"` HotRecords int `json:"hot_records"` WarmRecords int `json:"warm_records"` ColdRecords int `json:"cold_records"` ReadyForCleanup bool `json:"ready_for_cleanup"` } // 内部方法 func (pdb *PipelineDB) initializeFile() error { pdb.header = &Header{ Magic: 0x57A5DB, PageSize: PageSize, TotalPages: 2, FreeHead: 0, RootPage: 1, } // 写入文件头 buf := make([]byte, PageSize) pdb.writeHeaderToBuffer(buf) if _, err := pdb.file.Write(buf); err != nil { return err } // 初始化根页 rootPage := make([]byte, PageSize) binary.LittleEndian.PutUint16(rootPage[0:2], 0) // numSlots binary.LittleEndian.PutUint16(rootPage[2:4], PageSize) // freeOff binary.LittleEndian.PutUint16(rootPage[4:6], 0) // nextPage if _, err := pdb.file.Write(rootPage); err != nil { return err } return nil } func (pdb *PipelineDB) loadHeader() error { buf := make([]byte, HdrSize) if _, err := pdb.file.ReadAt(buf, 0); err != nil { return err } pdb.header = &Header{ Magic: binary.LittleEndian.Uint32(buf[0:4]), PageSize: binary.LittleEndian.Uint16(buf[4:6]), TotalPages: binary.LittleEndian.Uint16(buf[6:8]), FreeHead: binary.LittleEndian.Uint16(buf[8:10]), RootPage: binary.LittleEndian.Uint16(buf[10:12]), CounterPage: binary.LittleEndian.Uint16(buf[12:14]), } if pdb.header.Magic != 0x57A5DB || pdb.header.PageSize != PageSize { return errors.New("invalid database file") } return nil } func (pdb *PipelineDB) saveHeader() error { buf := make([]byte, PageSize) pdb.writeHeaderToBuffer(buf) _, err := pdb.file.WriteAt(buf, 0) return err } func (pdb *PipelineDB) writeHeaderToBuffer(buf []byte) { binary.LittleEndian.PutUint32(buf[0:4], pdb.header.Magic) binary.LittleEndian.PutUint16(buf[4:6], pdb.header.PageSize) binary.LittleEndian.PutUint16(buf[6:8], pdb.header.TotalPages) binary.LittleEndian.PutUint16(buf[8:10], pdb.header.FreeHead) binary.LittleEndian.PutUint16(buf[10:12], pdb.header.RootPage) binary.LittleEndian.PutUint16(buf[12:14], pdb.header.CounterPage) } func (pdb *PipelineDB) getRowMutex(id int64) *sync.RWMutex { pdb.mu.Lock() defer pdb.mu.Unlock() if mutex, exists := pdb.rowMutexes[id]; exists { return mutex } mutex := &sync.RWMutex{} pdb.rowMutexes[id] = mutex return mutex } // runAutoPipeline 运行自动管道处理器 func (pdb *PipelineDB) runAutoPipeline(ctx context.Context) { pdb.logger.Info("🔥 启动自动管道处理器") // 创建两个独立的定时器 warmTicker := time.NewTicker(pdb.config.WarmInterval) processTicker := time.NewTicker(pdb.config.ProcessInterval) defer warmTicker.Stop() defer processTicker.Stop() for { select { case <-ctx.Done(): pdb.logger.Info("🔥 自动管道处理器已停止") return case <-warmTicker.C: // 处理 hot -> warm if err := pdb.processHotData(); err != nil { pdb.logger.Error("❌ 预热处理错误", "error", err) } case <-processTicker.C: // 处理 warm -> cold if err := pdb.processWarmData(); err != nil { pdb.logger.Error("❌ 温数据处理错误", "error", err) } } } } // processHotData 处理热数据 func (pdb *PipelineDB) processHotData() error { // 获取热数据 hotRecords, err := pdb.GetRecordsByStatus(StatusHot, pdb.config.BatchSize) if err != nil { return err } if len(hotRecords) == 0 { return nil // 没有热数据需要处理 } pdb.logger.Info("🔥 开始预热热数据", "count", len(hotRecords)) processed := 0 for _, record := range hotRecords { if err := pdb.warmupRecord(record); err != nil { pdb.logger.Error("❌ 预热记录失败", "id", record.ID, "error", err) continue } processed++ } pdb.logger.Info("🔥 预热完成", "processed", processed, "total", len(hotRecords)) return nil } // warmupRecord 预热单条记录 func (pdb *PipelineDB) warmupRecord(record *DataRecord) error { // 执行预热逻辑 if err := pdb.performWarmup(record); err != nil { return err } // 更新状态为 warm record.Status = StatusWarm // 保存更新 if err := pdb.updateRecord(record); err != nil { return err } pdb.logger.Info("🔥 记录已预热: hot -> warm", "id", record.ID) return nil } // performWarmup 执行实际的预热操作 func (pdb *PipelineDB) performWarmup(record *DataRecord) error { // 调用外部处理器的预热方法 if pdb.handler != nil { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() processedData, err := pdb.handler.WillWarm(ctx, record.Group, record.Data) if err != nil { return err } // 更新处理后的数据 if len(processedData) > 0 { record.Data = processedData } } // 添加预热元数据 if record.Metadata == "" { record.Metadata = "warmed_up" } else { record.Metadata += ",warmed_up" } return nil } // processWarmData 处理温数据 func (pdb *PipelineDB) processWarmData() error { // 获取 warm 数据 warmRecords, err := pdb.GetRecordsByStatus(StatusWarm, pdb.config.BatchSize) if err != nil { return err } if len(warmRecords) == 0 { return nil // 没有温数据需要处理 } pdb.logger.Info("⚙️ 开始处理温数据", "count", len(warmRecords)) processed := 0 for _, record := range warmRecords { if err := pdb.cooldownRecord(record); err != nil { pdb.logger.Error("❌ 处理记录失败", "id", record.ID, "error", err) continue } processed++ } pdb.logger.Info("⚙️ 处理完成", "processed", processed, "total", len(warmRecords)) return nil } // cooldownRecord 处理单条记录 (warm -> cold) func (pdb *PipelineDB) cooldownRecord(record *DataRecord) error { // 自动处理器处理 warm 数据 pdb.logger.Info("⚙️ 自动处理器处理记录", "id", record.ID) // 调用外部处理器 if err := pdb.performCooldown(record); err != nil { return err } // 更新状态为 cold 并保存回存储 record.Status = StatusCold if err := pdb.updateRecord(record); err != nil { return err } pdb.logger.Info("⚙️ 记录处理完成: warm -> cold", "id", record.ID) return nil } // performCooldown 调用外部处理器 func (pdb *PipelineDB) performCooldown(record *DataRecord) error { pdb.logger.Info("🔗 调用外部处理器处理记录", "id", record.ID) // 创建超时上下文 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() // 调用外部处理器 processedData, err := pdb.handler.WillCold(ctx, record.Group, record.Data) if err != nil { pdb.logger.Error("❌ 外部处理器处理失败", "error", err) return err } // 更新处理后的数据 record.Data = processedData // 成功处理 pdb.logger.Info("✅ 外部处理器成功处理记录", "id", record.ID) // 检查组是否完成处理 pdb.checkGroupCompletion(record.Group) return nil } // rebuildIndexes 重建所有索引(用于数据库重启后恢复) func (pdb *PipelineDB) rebuildIndexes() error { // 如果没有根页面或总页数为0,跳过重建 if pdb.header.RootPage == 0 || pdb.header.TotalPages <= 1 { pdb.logger.Info("🔄 数据库为空,跳过索引重建") return nil } pdb.logger.Info("🔄 重建索引...") // 遍历所有数据页重建索引 pdb.logger.Info("📊 开始扫描页面", "totalPages", pdb.header.TotalPages, "rootPage", pdb.header.RootPage) recordCount := 0 // 从根页面开始扫描页链 for pageNo := pdb.header.RootPage; pageNo != 0 && pageNo < pdb.header.TotalPages; { // 读取页面 page, err := pdb.readPage(pageNo) if err != nil { pdb.logger.Info("⚠️ 跳过无法读取的页面", "pageNo", pageNo, "error", err) break // 页链断裂,停止扫描 } // 检查是否是数据页 if len(page) < 8 { pdb.logger.Info("⚠️ 跳过太小的页面", "pageNo", pageNo, "size", len(page)) continue } // 解析页面头部 numSlots := binary.LittleEndian.Uint16(page[0:2]) // 正确的槽数量 freeOff := binary.LittleEndian.Uint16(page[2:4]) nextPage := binary.LittleEndian.Uint16(page[4:6]) // 只在有数据时记录详细日志 if numSlots > 0 { pdb.logger.Info("📄 扫描页面", "pageNo", pageNo, "numSlots", numSlots, "freeOff", freeOff, "nextPage", nextPage) } if numSlots == 0 { pageNo = nextPage continue } // 遍历页面中的所有记录(使用槽数组) for slotNo := uint16(0); slotNo < numSlots && slotNo < 100; slotNo++ { // 限制最大槽数,避免无效数据 // 读取槽偏移(槽数组从偏移6开始) slotOffset := 6 + slotNo*2 if slotOffset+2 > uint16(len(page)) { pdb.logger.Info("⚠️ 槽偏移超出页面", "slotNo", slotNo, "slotOffset", slotOffset, "pageSize", len(page)) break } recordOffset := binary.LittleEndian.Uint16(page[slotOffset : slotOffset+2]) // 读取记录 if recordOffset+8 > uint16(len(page)) { continue } // 读取ID id := binary.LittleEndian.Uint64(page[recordOffset : recordOffset+8]) // 读取数据长度(变长编码) dataLen, n := binary.Uvarint(page[recordOffset+8:]) if n <= 0 { continue } // 读取数据 dataStart := recordOffset + 8 + uint16(n) dataEnd := dataStart + uint16(dataLen) if dataEnd > uint16(len(page)) { continue } // 解析JSON记录获取组名 var record DataRecord if err := record.FromBytes(page[dataStart:dataEnd]); err != nil { continue } // 添加到索引 idx := pdb.indexMgr.GetOrCreateIndex(record.Group) idx.Insert(int64(id), pageNo, slotNo) recordCount++ } // 移动到下一个页面 nextPageNo := Page(page).nextPage() if nextPageNo == 0 { break // 页链结束 } pageNo = nextPageNo } pdb.logger.Info("✅ 索引重建完成", "recordCount", recordCount) return nil }