296 lines
8.1 KiB
Markdown
296 lines
8.1 KiB
Markdown
|
|
# Pipeline Database
|
|||
|
|
|
|||
|
|
[](https://golang.org/)
|
|||
|
|
[](LICENSE)
|
|||
|
|
[](https://code.tczkiot.com/wlw/pipelinedb)
|
|||
|
|
|
|||
|
|
一个集成了数据库存储和业务管道处理的一体化解决方案,专为数据管道处理场景设计。
|
|||
|
|
|
|||
|
|
## 🚀 核心特性
|
|||
|
|
|
|||
|
|
- **基于页面的存储引擎**:高效的数据存储和检索,4KB页面对齐
|
|||
|
|
- **分组数据管理**:支持按业务组织数据,独立管理和统计
|
|||
|
|
- **三级数据状态**:Hot(热)→ Warm(温)→ Cold(冷)的自动流转
|
|||
|
|
- **自定义处理器**:支持用户实现 Handler 接口定制业务逻辑
|
|||
|
|
- **高并发支持**:线程安全的索引和存储操作
|
|||
|
|
- **页面缓存**:内置缓存系统提高数据访问性能
|
|||
|
|
- **持久化存储**:数据安全持久保存到磁盘
|
|||
|
|
|
|||
|
|
## 📊 数据流转模型
|
|||
|
|
|
|||
|
|
```mermaid
|
|||
|
|
flowchart LR
|
|||
|
|
|
|||
|
|
A[新数据接收] --> B[Hot<br/>热数据]
|
|||
|
|
B -->|预热处理<br/>Handler.WillWarm| C[Warm<br/>温数据]
|
|||
|
|
C -->|冷却处理<br/>Handler.WillCold| D[Cold<br/>冷数据]
|
|||
|
|
|
|||
|
|
style B fill:#ff9999
|
|||
|
|
style C fill:#ffcc99
|
|||
|
|
style D fill:#99ccff
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 🛠️ 快速开始
|
|||
|
|
|
|||
|
|
### 安装
|
|||
|
|
|
|||
|
|
```bash
|
|||
|
|
go get code.tczkiot.com/wlw/pipelinedb
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 基础使用
|
|||
|
|
|
|||
|
|
```go
|
|||
|
|
package main
|
|||
|
|
|
|||
|
|
import (
|
|||
|
|
"context"
|
|||
|
|
"log"
|
|||
|
|
"code.tczkiot.com/wlw/pipelinedb"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
// 实现自定义处理器
|
|||
|
|
type MyHandler struct{}
|
|||
|
|
|
|||
|
|
func (h *MyHandler) WillWarm(ctx context.Context, group string, data []byte) ([]byte, error) {
|
|||
|
|
// 预热处理逻辑:数据验证、转换等
|
|||
|
|
return append([]byte("processed_"), data...), nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (h *MyHandler) WillCold(ctx context.Context, group string, data []byte) ([]byte, error) {
|
|||
|
|
// 冷却处理逻辑:数据压缩、归档等
|
|||
|
|
return append(data, []byte("_archived")...), nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (h *MyHandler) OnComplete(ctx context.Context, group string) error {
|
|||
|
|
// 组完成回调:清理、通知等
|
|||
|
|
log.Printf("Group %s processing completed", group)
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func main() {
|
|||
|
|
// 打开数据库
|
|||
|
|
pdb, err := pipelinedb.Open(pipelinedb.Options{
|
|||
|
|
Filename: "my_pipeline.db",
|
|||
|
|
Handler: &MyHandler{},
|
|||
|
|
Config: pipelinedb.DefaultConfig(),
|
|||
|
|
})
|
|||
|
|
if err != nil {
|
|||
|
|
log.Fatal(err)
|
|||
|
|
}
|
|||
|
|
defer pdb.Stop()
|
|||
|
|
|
|||
|
|
// 启动管道处理
|
|||
|
|
pdb.Start()
|
|||
|
|
|
|||
|
|
// 接收数据
|
|||
|
|
id, err := pdb.AcceptData("user_events", []byte("user clicked button"), "metadata")
|
|||
|
|
if err != nil {
|
|||
|
|
log.Fatal(err)
|
|||
|
|
}
|
|||
|
|
log.Printf("Data accepted with ID: %d", id)
|
|||
|
|
|
|||
|
|
// 查询数据
|
|||
|
|
pageReq := &pipelinedb.PageRequest{Page: 1, PageSize: 10}
|
|||
|
|
response, err := pdb.GetRecordsByGroup("user_events", pageReq)
|
|||
|
|
if err != nil {
|
|||
|
|
log.Fatal(err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
for _, record := range response.Records {
|
|||
|
|
log.Printf("Record ID: %d, Status: %s, Data: %s",
|
|||
|
|
record.ID, record.Status, string(record.Data))
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 📚 API 文档
|
|||
|
|
|
|||
|
|
### 核心接口
|
|||
|
|
|
|||
|
|
#### Handler 接口
|
|||
|
|
|
|||
|
|
```go
|
|||
|
|
type Handler interface {
|
|||
|
|
// 预热处理回调:Hot -> Warm
|
|||
|
|
WillWarm(ctx context.Context, group string, data []byte) ([]byte, error)
|
|||
|
|
|
|||
|
|
// 冷却处理回调:Warm -> Cold
|
|||
|
|
WillCold(ctx context.Context, group string, data []byte) ([]byte, error)
|
|||
|
|
|
|||
|
|
// 组完成回调:所有数据处理完成
|
|||
|
|
OnComplete(ctx context.Context, group string) error
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
#### 主要方法
|
|||
|
|
|
|||
|
|
```go
|
|||
|
|
// 打开数据库
|
|||
|
|
func Open(opts Options) (*PipelineDB, error)
|
|||
|
|
|
|||
|
|
// 接收数据
|
|||
|
|
func (pdb *PipelineDB) AcceptData(group string, data []byte, metadata string) (int64, error)
|
|||
|
|
|
|||
|
|
// 按组查询数据(分页)
|
|||
|
|
func (pdb *PipelineDB) GetRecordsByGroup(group string, req *PageRequest) (*PageResponse, error)
|
|||
|
|
|
|||
|
|
// 获取统计信息
|
|||
|
|
func (pdb *PipelineDB) GetStats() *DatabaseStats
|
|||
|
|
|
|||
|
|
// 启动管道处理
|
|||
|
|
func (pdb *PipelineDB) Start()
|
|||
|
|
|
|||
|
|
// 停止数据库
|
|||
|
|
func (pdb *PipelineDB) Stop()
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 配置选项
|
|||
|
|
|
|||
|
|
```go
|
|||
|
|
type Config struct {
|
|||
|
|
CacheSize int // 页缓存大小(页数)
|
|||
|
|
SyncWrites bool // 是否同步写入
|
|||
|
|
CreateIfMiss bool // 文件不存在时自动创建
|
|||
|
|
WarmInterval time.Duration // 预热间隔
|
|||
|
|
ProcessInterval time.Duration // 处理间隔
|
|||
|
|
BatchSize int // 批处理大小
|
|||
|
|
EnableMetrics bool // 启用性能指标
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 🎯 使用场景
|
|||
|
|
|
|||
|
|
### 数据管道处理
|
|||
|
|
- **ETL 流程**:数据提取、转换、加载
|
|||
|
|
- **数据清洗**:数据验证、格式化、去重
|
|||
|
|
- **数据转换**:格式转换、数据映射
|
|||
|
|
|
|||
|
|
### 业务流程管理
|
|||
|
|
- **订单处理**:订单创建 → 支付确认 → 发货处理
|
|||
|
|
- **用户行为分析**:事件收集 → 数据处理 → 报告生成
|
|||
|
|
- **审批流程**:申请提交 → 审核处理 → 结果通知
|
|||
|
|
|
|||
|
|
### 实时数据处理
|
|||
|
|
- **日志分析**:日志收集 → 解析处理 → 存储归档
|
|||
|
|
- **监控数据**:指标收集 → 聚合计算 → 告警处理
|
|||
|
|
- **消息队列**:消息接收 → 业务处理 → 确认回复
|
|||
|
|
|
|||
|
|
## 📁 项目结构
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
pipelinedb/
|
|||
|
|
├── README.md # 项目说明
|
|||
|
|
├── go.mod # Go 模块定义
|
|||
|
|
├── pipeline_db.go # 核心数据库实现
|
|||
|
|
├── storage.go # 存储引擎
|
|||
|
|
├── cache.go # 页面缓存
|
|||
|
|
├── index.go # 索引管理
|
|||
|
|
├── group_manager.go # 组管理器
|
|||
|
|
├── counter.go # ID 计数器
|
|||
|
|
├── examples/ # 使用示例
|
|||
|
|
│ ├── basic-usage/ # 基础使用示例
|
|||
|
|
│ ├── group-management/ # 组管理示例
|
|||
|
|
│ ├── external-handler/ # 自定义处理器示例
|
|||
|
|
│ ├── data-analytics/ # 数据分析示例
|
|||
|
|
│ ├── concurrent-processing/ # 并发处理示例
|
|||
|
|
│ └── high-concurrency/ # 高并发压力测试
|
|||
|
|
└── *_test.go # 测试文件
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 🧪 运行示例
|
|||
|
|
|
|||
|
|
项目提供了多个完整的使用示例:
|
|||
|
|
|
|||
|
|
```bash
|
|||
|
|
# 基础使用示例
|
|||
|
|
cd examples/basic-usage && go run main.go
|
|||
|
|
|
|||
|
|
# 组管理示例
|
|||
|
|
cd examples/group-management && go run main.go
|
|||
|
|
|
|||
|
|
# 自定义处理器示例
|
|||
|
|
cd examples/external-handler && go run main.go
|
|||
|
|
|
|||
|
|
# 数据分析示例
|
|||
|
|
cd examples/data-analytics && go run main.go
|
|||
|
|
|
|||
|
|
# 并发处理示例
|
|||
|
|
cd examples/concurrent-processing && go run main.go
|
|||
|
|
|
|||
|
|
# 高并发压力测试
|
|||
|
|
cd examples/high-concurrency && go run main.go
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 🔧 开发和测试
|
|||
|
|
|
|||
|
|
### 运行测试
|
|||
|
|
|
|||
|
|
```bash
|
|||
|
|
# 运行所有测试
|
|||
|
|
go test -v
|
|||
|
|
|
|||
|
|
# 运行特定测试
|
|||
|
|
go test -run TestPipelineDB -v
|
|||
|
|
|
|||
|
|
# 运行基准测试
|
|||
|
|
go test -bench=. -benchmem
|
|||
|
|
|
|||
|
|
# 生成测试覆盖率报告
|
|||
|
|
go test -cover -coverprofile=coverage.out
|
|||
|
|
go tool cover -html=coverage.out
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 性能基准
|
|||
|
|
|
|||
|
|
在现代硬件上的典型性能表现:
|
|||
|
|
|
|||
|
|
- **写入性能**:~50,000 QPS
|
|||
|
|
- **读取性能**:~100,000 QPS
|
|||
|
|
- **内存使用**:~1MB(256页缓存)
|
|||
|
|
- **磁盘使用**:高效的页面存储,支持TB级数据
|
|||
|
|
|
|||
|
|
## 🏗️ 架构设计
|
|||
|
|
|
|||
|
|
### 存储层
|
|||
|
|
- **页面管理**:4KB 页面,支持链式扩展
|
|||
|
|
- **槽位目录**:变长记录存储,空间高效利用
|
|||
|
|
- **空闲页管理**:智能页面回收和重用
|
|||
|
|
|
|||
|
|
### 索引层
|
|||
|
|
- **B+树索引**:快速数据定位和范围查询
|
|||
|
|
- **分组索引**:独立的组级别索引管理
|
|||
|
|
- **内存索引**:热数据索引常驻内存
|
|||
|
|
|
|||
|
|
### 缓存层
|
|||
|
|
- **LRU 缓存**:最近最少使用页面淘汰策略
|
|||
|
|
- **写回缓存**:批量写入提高性能
|
|||
|
|
- **预读机制**:顺序访问优化
|
|||
|
|
|
|||
|
|
### 并发控制
|
|||
|
|
- **行级锁**:细粒度锁定,提高并发性能
|
|||
|
|
- **读写锁**:读操作并发,写操作独占
|
|||
|
|
- **无锁设计**:关键路径避免锁竞争
|
|||
|
|
|
|||
|
|
## 🤝 贡献指南
|
|||
|
|
|
|||
|
|
欢迎提交 Issue 和 Pull Request!
|
|||
|
|
|
|||
|
|
1. Fork 项目
|
|||
|
|
2. 创建特性分支:`git checkout -b feature/amazing-feature`
|
|||
|
|
3. 提交更改:`git commit -m 'Add amazing feature'`
|
|||
|
|
4. 推送分支:`git push origin feature/amazing-feature`
|
|||
|
|
5. 提交 Pull Request
|
|||
|
|
|
|||
|
|
## 📄 许可证
|
|||
|
|
|
|||
|
|
本项目采用 MIT 许可证。详见 [LICENSE](LICENSE) 文件。
|
|||
|
|
|
|||
|
|
## 🙏 致谢
|
|||
|
|
|
|||
|
|
感谢所有为这个项目做出贡献的开发者!
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
**Pipeline Database** - 让数据管道处理更简单、更高效! 🚀
|