# Pipeline Database [![Go Version](https://img.shields.io/badge/Go-1.21+-00ADD8?style=flat&logo=go)](https://golang.org/) [![License](https://img.shields.io/badge/License-MIT-blue.svg)](LICENSE) [![Build Status](https://img.shields.io/badge/Build-Passing-brightgreen.svg)](https://code.tczkiot.com/wlw/pipelinedb) 一个集成了数据库存储和业务管道处理的一体化解决方案,专为数据管道处理场景设计。 ## 🚀 核心特性 - **基于页面的存储引擎**:高效的数据存储和检索,4KB页面对齐 - **分组数据管理**:支持按业务组织数据,独立管理和统计 - **三级数据状态**:Hot(热)→ Warm(温)→ Cold(冷)的自动流转 - **自定义处理器**:支持用户实现 Handler 接口定制业务逻辑 - **高并发支持**:线程安全的索引和存储操作 - **页面缓存**:内置缓存系统提高数据访问性能 - **持久化存储**:数据安全持久保存到磁盘 ## 📊 数据流转模型 ```mermaid flowchart LR A[新数据接收] --> B[Hot
热数据] B -->|预热处理
Handler.WillWarm| C[Warm
温数据] C -->|冷却处理
Handler.WillCold| D[Cold
冷数据] style B fill:#ff9999 style C fill:#ffcc99 style D fill:#99ccff ``` ## 🛠️ 快速开始 ### 安装 ```bash go get code.tczkiot.com/wlw/pipelinedb ``` ### 基础使用 ```go package main import ( "context" "log" "code.tczkiot.com/wlw/pipelinedb" ) // 实现自定义处理器 type MyHandler struct{} func (h *MyHandler) WillWarm(ctx context.Context, group string, data []byte) ([]byte, error) { // 预热处理逻辑:数据验证、转换等 return append([]byte("processed_"), data...), nil } func (h *MyHandler) WillCold(ctx context.Context, group string, data []byte) ([]byte, error) { // 冷却处理逻辑:数据压缩、归档等 return append(data, []byte("_archived")...), nil } func (h *MyHandler) OnComplete(ctx context.Context, group string) error { // 组完成回调:清理、通知等 log.Printf("Group %s processing completed", group) return nil } func main() { // 打开数据库 pdb, err := pipelinedb.Open(pipelinedb.Options{ Filename: "my_pipeline.db", Handler: &MyHandler{}, Config: pipelinedb.DefaultConfig(), }) if err != nil { log.Fatal(err) } defer pdb.Stop() // 启动管道处理 pdb.Start() // 接收数据 id, err := pdb.AcceptData("user_events", []byte("user clicked button"), "metadata") if err != nil { log.Fatal(err) } log.Printf("Data accepted with ID: %d", id) // 查询数据 pageReq := &pipelinedb.PageRequest{Page: 1, PageSize: 10} response, err := pdb.GetRecordsByGroup("user_events", pageReq) if err != nil { log.Fatal(err) } for _, record := range response.Records { log.Printf("Record ID: %d, Status: %s, Data: %s", record.ID, record.Status, string(record.Data)) } } ``` ## 📚 API 文档 ### 核心接口 #### Handler 接口 ```go type Handler interface { // 预热处理回调:Hot -> Warm WillWarm(ctx context.Context, group string, data []byte) ([]byte, error) // 冷却处理回调:Warm -> Cold WillCold(ctx context.Context, group string, data []byte) ([]byte, error) // 组完成回调:所有数据处理完成 OnComplete(ctx context.Context, group string) error } ``` #### 主要方法 ```go // 打开数据库 func Open(opts Options) (*PipelineDB, error) // 接收数据 func (pdb *PipelineDB) AcceptData(group string, data []byte, metadata string) (int64, error) // 按组查询数据(分页) func (pdb *PipelineDB) GetRecordsByGroup(group string, req *PageRequest) (*PageResponse, error) // 获取统计信息 func (pdb *PipelineDB) GetStats() *DatabaseStats // 启动管道处理 func (pdb *PipelineDB) Start() // 停止数据库 func (pdb *PipelineDB) Stop() ``` ### 配置选项 ```go type Config struct { CacheSize int // 页缓存大小(页数) SyncWrites bool // 是否同步写入 CreateIfMiss bool // 文件不存在时自动创建 WarmInterval time.Duration // 预热间隔 ProcessInterval time.Duration // 处理间隔 BatchSize int // 批处理大小 EnableMetrics bool // 启用性能指标 } ``` ## 🎯 使用场景 ### 数据管道处理 - **ETL 流程**:数据提取、转换、加载 - **数据清洗**:数据验证、格式化、去重 - **数据转换**:格式转换、数据映射 ### 业务流程管理 - **订单处理**:订单创建 → 支付确认 → 发货处理 - **用户行为分析**:事件收集 → 数据处理 → 报告生成 - **审批流程**:申请提交 → 审核处理 → 结果通知 ### 实时数据处理 - **日志分析**:日志收集 → 解析处理 → 存储归档 - **监控数据**:指标收集 → 聚合计算 → 告警处理 - **消息队列**:消息接收 → 业务处理 → 确认回复 ## 📁 项目结构 ``` pipelinedb/ ├── README.md # 项目说明 ├── go.mod # Go 模块定义 ├── pipeline_db.go # 核心数据库实现 ├── storage.go # 存储引擎 ├── cache.go # 页面缓存 ├── index.go # 索引管理 ├── group_manager.go # 组管理器 ├── counter.go # ID 计数器 ├── examples/ # 使用示例 │ ├── basic-usage/ # 基础使用示例 │ ├── group-management/ # 组管理示例 │ ├── external-handler/ # 自定义处理器示例 │ ├── data-analytics/ # 数据分析示例 │ ├── concurrent-processing/ # 并发处理示例 │ └── high-concurrency/ # 高并发压力测试 └── *_test.go # 测试文件 ``` ## 🧪 运行示例 项目提供了多个完整的使用示例: ```bash # 基础使用示例 cd examples/basic-usage && go run main.go # 组管理示例 cd examples/group-management && go run main.go # 自定义处理器示例 cd examples/external-handler && go run main.go # 数据分析示例 cd examples/data-analytics && go run main.go # 并发处理示例 cd examples/concurrent-processing && go run main.go # 高并发压力测试 cd examples/high-concurrency && go run main.go ``` ## 🔧 开发和测试 ### 运行测试 ```bash # 运行所有测试 go test -v # 运行特定测试 go test -run TestPipelineDB -v # 运行基准测试 go test -bench=. -benchmem # 生成测试覆盖率报告 go test -cover -coverprofile=coverage.out go tool cover -html=coverage.out ``` ### 性能基准 在现代硬件上的典型性能表现: - **写入性能**:~50,000 QPS - **读取性能**:~100,000 QPS - **内存使用**:~1MB(256页缓存) - **磁盘使用**:高效的页面存储,支持TB级数据 ## 🏗️ 架构设计 ### 存储层 - **页面管理**:4KB 页面,支持链式扩展 - **槽位目录**:变长记录存储,空间高效利用 - **空闲页管理**:智能页面回收和重用 ### 索引层 - **B+树索引**:快速数据定位和范围查询 - **分组索引**:独立的组级别索引管理 - **内存索引**:热数据索引常驻内存 ### 缓存层 - **LRU 缓存**:最近最少使用页面淘汰策略 - **写回缓存**:批量写入提高性能 - **预读机制**:顺序访问优化 ### 并发控制 - **行级锁**:细粒度锁定,提高并发性能 - **读写锁**:读操作并发,写操作独占 - **无锁设计**:关键路径避免锁竞争 ## 🤝 贡献指南 欢迎提交 Issue 和 Pull Request! 1. Fork 项目 2. 创建特性分支:`git checkout -b feature/amazing-feature` 3. 提交更改:`git commit -m 'Add amazing feature'` 4. 推送分支:`git push origin feature/amazing-feature` 5. 提交 Pull Request ## 📄 许可证 本项目采用 MIT 许可证。详见 [LICENSE](LICENSE) 文件。 ## 🙏 致谢 感谢所有为这个项目做出贡献的开发者! --- **Pipeline Database** - 让数据管道处理更简单、更高效! 🚀