# Pipeline Database
[](https://golang.org/)
[](LICENSE)
[](https://code.tczkiot.com/wlw/pipelinedb)
一个集成了数据库存储和业务管道处理的一体化解决方案,专为数据管道处理场景设计。
## 🚀 核心特性
- **基于页面的存储引擎**:高效的数据存储和检索,4KB页面对齐
- **分组数据管理**:支持按业务组织数据,独立管理和统计
- **三级数据状态**:Hot(热)→ Warm(温)→ Cold(冷)的自动流转
- **自定义处理器**:支持用户实现 Handler 接口定制业务逻辑
- **高并发支持**:线程安全的索引和存储操作
- **页面缓存**:内置缓存系统提高数据访问性能
- **持久化存储**:数据安全持久保存到磁盘
## 📊 数据流转模型
```mermaid
flowchart LR
A[新数据接收] --> B[Hot
热数据]
B -->|预热处理
Handler.WillWarm| C[Warm
温数据]
C -->|冷却处理
Handler.WillCold| D[Cold
冷数据]
style B fill:#ff9999
style C fill:#ffcc99
style D fill:#99ccff
```
## 🛠️ 快速开始
### 安装
```bash
go get code.tczkiot.com/wlw/pipelinedb
```
### 基础使用
```go
package main
import (
"context"
"log"
"code.tczkiot.com/wlw/pipelinedb"
)
// 实现自定义处理器
type MyHandler struct{}
func (h *MyHandler) WillWarm(ctx context.Context, group string, data []byte) ([]byte, error) {
// 预热处理逻辑:数据验证、转换等
return append([]byte("processed_"), data...), nil
}
func (h *MyHandler) WillCold(ctx context.Context, group string, data []byte) ([]byte, error) {
// 冷却处理逻辑:数据压缩、归档等
return append(data, []byte("_archived")...), nil
}
func (h *MyHandler) OnComplete(ctx context.Context, group string) error {
// 组完成回调:清理、通知等
log.Printf("Group %s processing completed", group)
return nil
}
func main() {
// 打开数据库
pdb, err := pipelinedb.Open(pipelinedb.Options{
Filename: "my_pipeline.db",
Handler: &MyHandler{},
Config: pipelinedb.DefaultConfig(),
})
if err != nil {
log.Fatal(err)
}
defer pdb.Stop()
// 启动管道处理
pdb.Start()
// 接收数据
id, err := pdb.AcceptData("user_events", []byte("user clicked button"), "metadata")
if err != nil {
log.Fatal(err)
}
log.Printf("Data accepted with ID: %d", id)
// 查询数据
pageReq := &pipelinedb.PageRequest{Page: 1, PageSize: 10}
response, err := pdb.GetRecordsByGroup("user_events", pageReq)
if err != nil {
log.Fatal(err)
}
for _, record := range response.Records {
log.Printf("Record ID: %d, Status: %s, Data: %s",
record.ID, record.Status, string(record.Data))
}
}
```
## 📚 API 文档
### 核心接口
#### Handler 接口
```go
type Handler interface {
// 预热处理回调:Hot -> Warm
WillWarm(ctx context.Context, group string, data []byte) ([]byte, error)
// 冷却处理回调:Warm -> Cold
WillCold(ctx context.Context, group string, data []byte) ([]byte, error)
// 组完成回调:所有数据处理完成
OnComplete(ctx context.Context, group string) error
}
```
#### 主要方法
```go
// 打开数据库
func Open(opts Options) (*PipelineDB, error)
// 接收数据
func (pdb *PipelineDB) AcceptData(group string, data []byte, metadata string) (int64, error)
// 按组查询数据(分页)
func (pdb *PipelineDB) GetRecordsByGroup(group string, req *PageRequest) (*PageResponse, error)
// 获取统计信息
func (pdb *PipelineDB) GetStats() *DatabaseStats
// 启动管道处理
func (pdb *PipelineDB) Start()
// 停止数据库
func (pdb *PipelineDB) Stop()
```
### 配置选项
```go
type Config struct {
CacheSize int // 页缓存大小(页数)
SyncWrites bool // 是否同步写入
CreateIfMiss bool // 文件不存在时自动创建
WarmInterval time.Duration // 预热间隔
ProcessInterval time.Duration // 处理间隔
BatchSize int // 批处理大小
EnableMetrics bool // 启用性能指标
}
```
## 🎯 使用场景
### 数据管道处理
- **ETL 流程**:数据提取、转换、加载
- **数据清洗**:数据验证、格式化、去重
- **数据转换**:格式转换、数据映射
### 业务流程管理
- **订单处理**:订单创建 → 支付确认 → 发货处理
- **用户行为分析**:事件收集 → 数据处理 → 报告生成
- **审批流程**:申请提交 → 审核处理 → 结果通知
### 实时数据处理
- **日志分析**:日志收集 → 解析处理 → 存储归档
- **监控数据**:指标收集 → 聚合计算 → 告警处理
- **消息队列**:消息接收 → 业务处理 → 确认回复
## 📁 项目结构
```
pipelinedb/
├── README.md # 项目说明
├── go.mod # Go 模块定义
├── pipeline_db.go # 核心数据库实现
├── storage.go # 存储引擎
├── cache.go # 页面缓存
├── index.go # 索引管理
├── group_manager.go # 组管理器
├── counter.go # ID 计数器
├── examples/ # 使用示例
│ ├── basic-usage/ # 基础使用示例
│ ├── group-management/ # 组管理示例
│ ├── external-handler/ # 自定义处理器示例
│ ├── data-analytics/ # 数据分析示例
│ ├── concurrent-processing/ # 并发处理示例
│ └── high-concurrency/ # 高并发压力测试
└── *_test.go # 测试文件
```
## 🧪 运行示例
项目提供了多个完整的使用示例:
```bash
# 基础使用示例
cd examples/basic-usage && go run main.go
# 组管理示例
cd examples/group-management && go run main.go
# 自定义处理器示例
cd examples/external-handler && go run main.go
# 数据分析示例
cd examples/data-analytics && go run main.go
# 并发处理示例
cd examples/concurrent-processing && go run main.go
# 高并发压力测试
cd examples/high-concurrency && go run main.go
```
## 🔧 开发和测试
### 运行测试
```bash
# 运行所有测试
go test -v
# 运行特定测试
go test -run TestPipelineDB -v
# 运行基准测试
go test -bench=. -benchmem
# 生成测试覆盖率报告
go test -cover -coverprofile=coverage.out
go tool cover -html=coverage.out
```
### 性能基准
在现代硬件上的典型性能表现:
- **写入性能**:~50,000 QPS
- **读取性能**:~100,000 QPS
- **内存使用**:~1MB(256页缓存)
- **磁盘使用**:高效的页面存储,支持TB级数据
## 🏗️ 架构设计
### 存储层
- **页面管理**:4KB 页面,支持链式扩展
- **槽位目录**:变长记录存储,空间高效利用
- **空闲页管理**:智能页面回收和重用
### 索引层
- **B+树索引**:快速数据定位和范围查询
- **分组索引**:独立的组级别索引管理
- **内存索引**:热数据索引常驻内存
### 缓存层
- **LRU 缓存**:最近最少使用页面淘汰策略
- **写回缓存**:批量写入提高性能
- **预读机制**:顺序访问优化
### 并发控制
- **行级锁**:细粒度锁定,提高并发性能
- **读写锁**:读操作并发,写操作独占
- **无锁设计**:关键路径避免锁竞争
## 🤝 贡献指南
欢迎提交 Issue 和 Pull Request!
1. Fork 项目
2. 创建特性分支:`git checkout -b feature/amazing-feature`
3. 提交更改:`git commit -m 'Add amazing feature'`
4. 推送分支:`git push origin feature/amazing-feature`
5. 提交 Pull Request
## 📄 许可证
本项目采用 MIT 许可证。详见 [LICENSE](LICENSE) 文件。
## 🙏 致谢
感谢所有为这个项目做出贡献的开发者!
---
**Pipeline Database** - 让数据管道处理更简单、更高效! 🚀