Some checks failed
CI Pipeline / 测试和基准测试 (1.24.x) (push) Failing after 4s
CI Pipeline / 测试和基准测试 (1.25.x) (push) Failing after 4s
CI Pipeline / 构建验证 (amd64, darwin) (push) Has been skipped
CI Pipeline / 构建验证 (amd64, linux) (push) Has been skipped
CI Pipeline / 构建验证 (amd64, windows) (push) Has been skipped
CI Pipeline / 构建验证 (arm64, darwin) (push) Has been skipped
CI Pipeline / 构建验证 (arm64, linux) (push) Has been skipped
CI Pipeline / 安全扫描 (push) Failing after 4s
CI Pipeline / 代码质量检查 (push) Failing after 4s
CI Pipeline / 性能回归测试 (push) Has been skipped
CI Pipeline / 通知 (push) Failing after 1s
- 更新 examples/auto-processing/main.go 中的示例代码 - 在 .gitignore 中添加 examples/test-empty/ 目录忽略规则
Pipeline Database
一个集成了数据库存储和业务管道处理的一体化解决方案,专为数据管道处理场景设计。
🚀 核心特性
- 基于页面的存储引擎:高效的数据存储和检索,4KB页面对齐
- 分组数据管理:支持按业务组织数据,独立管理和统计
- 三级数据状态:Hot(热)→ Warm(温)→ Cold(冷)的自动流转
- 自定义处理器:支持用户实现 Handler 接口定制业务逻辑
- 高并发支持:线程安全的索引和存储操作
- 页面缓存:内置缓存系统提高数据访问性能
- 持久化存储:数据安全持久保存到磁盘
📊 数据流转模型
flowchart LR
A[新数据接收] --> B[Hot<br/>热数据]
B -->|预热处理<br/>Handler.WillWarm| C[Warm<br/>温数据]
C -->|冷却处理<br/>Handler.WillCold| D[Cold<br/>冷数据]
style B fill:#ff9999
style C fill:#ffcc99
style D fill:#99ccff
🛠️ 快速开始
安装
go get code.tczkiot.com/wlw/pipelinedb
基础使用
package main
import (
"context"
"log"
"code.tczkiot.com/wlw/pipelinedb"
)
// 实现自定义处理器
type MyHandler struct{}
func (h *MyHandler) WillWarm(ctx context.Context, group string, data []byte) ([]byte, error) {
// 预热处理逻辑:数据验证、转换等
return append([]byte("processed_"), data...), nil
}
func (h *MyHandler) WillCold(ctx context.Context, group string, data []byte) ([]byte, error) {
// 冷却处理逻辑:数据压缩、归档等
return append(data, []byte("_archived")...), nil
}
func (h *MyHandler) OnComplete(ctx context.Context, group string) error {
// 组完成回调:清理、通知等
log.Printf("Group %s processing completed", group)
return nil
}
func main() {
// 打开数据库
pdb, err := pipelinedb.Open(pipelinedb.Options{
Filename: "my_pipeline.db",
Handler: &MyHandler{},
Config: pipelinedb.DefaultConfig(),
})
if err != nil {
log.Fatal(err)
}
defer pdb.Stop()
// 启动管道处理
pdb.Start()
// 接收数据
id, err := pdb.AcceptData("user_events", []byte("user clicked button"), "metadata")
if err != nil {
log.Fatal(err)
}
log.Printf("Data accepted with ID: %d", id)
// 查询数据
pageReq := &pipelinedb.PageRequest{Page: 1, PageSize: 10}
response, err := pdb.GetRecordsByGroup("user_events", pageReq)
if err != nil {
log.Fatal(err)
}
for _, record := range response.Records {
log.Printf("Record ID: %d, Status: %s, Data: %s",
record.ID, record.Status, string(record.Data))
}
}
📚 API 文档
核心接口
Handler 接口
type Handler interface {
// 预热处理回调:Hot -> Warm
WillWarm(ctx context.Context, group string, data []byte) ([]byte, error)
// 冷却处理回调:Warm -> Cold
WillCold(ctx context.Context, group string, data []byte) ([]byte, error)
// 组完成回调:所有数据处理完成
OnComplete(ctx context.Context, group string) error
}
主要方法
// 打开数据库
func Open(opts Options) (*PipelineDB, error)
// 接收数据
func (pdb *PipelineDB) AcceptData(group string, data []byte, metadata string) (int64, error)
// 按组查询数据(分页)
func (pdb *PipelineDB) GetRecordsByGroup(group string, req *PageRequest) (*PageResponse, error)
// 获取统计信息
func (pdb *PipelineDB) GetStats() *DatabaseStats
// 启动管道处理
func (pdb *PipelineDB) Start()
// 停止数据库
func (pdb *PipelineDB) Stop()
配置选项
type Config struct {
CacheSize int // 页缓存大小(页数)
SyncWrites bool // 是否同步写入
CreateIfMiss bool // 文件不存在时自动创建
WarmInterval time.Duration // 预热间隔
ProcessInterval time.Duration // 处理间隔
BatchSize int // 批处理大小
EnableMetrics bool // 启用性能指标
}
🎯 使用场景
数据管道处理
- ETL 流程:数据提取、转换、加载
- 数据清洗:数据验证、格式化、去重
- 数据转换:格式转换、数据映射
业务流程管理
- 订单处理:订单创建 → 支付确认 → 发货处理
- 用户行为分析:事件收集 → 数据处理 → 报告生成
- 审批流程:申请提交 → 审核处理 → 结果通知
实时数据处理
- 日志分析:日志收集 → 解析处理 → 存储归档
- 监控数据:指标收集 → 聚合计算 → 告警处理
- 消息队列:消息接收 → 业务处理 → 确认回复
📁 项目结构
pipelinedb/
├── README.md # 项目说明
├── go.mod # Go 模块定义
├── pipeline_db.go # 核心数据库实现
├── storage.go # 存储引擎
├── cache.go # 页面缓存
├── index.go # 索引管理
├── group_manager.go # 组管理器
├── counter.go # ID 计数器
├── examples/ # 使用示例
│ ├── basic-usage/ # 基础使用示例
│ ├── group-management/ # 组管理示例
│ ├── external-handler/ # 自定义处理器示例
│ ├── data-analytics/ # 数据分析示例
│ ├── concurrent-processing/ # 并发处理示例
│ └── high-concurrency/ # 高并发压力测试
└── *_test.go # 测试文件
🧪 运行示例
项目提供了多个完整的使用示例:
# 基础使用示例
cd examples/basic-usage && go run main.go
# 组管理示例
cd examples/group-management && go run main.go
# 自定义处理器示例
cd examples/external-handler && go run main.go
# 数据分析示例
cd examples/data-analytics && go run main.go
# 并发处理示例
cd examples/concurrent-processing && go run main.go
# 高并发压力测试
cd examples/high-concurrency && go run main.go
🔧 开发和测试
运行测试
# 运行所有测试
go test -v
# 运行特定测试
go test -run TestPipelineDB -v
# 运行基准测试
go test -bench=. -benchmem
# 生成测试覆盖率报告
go test -cover -coverprofile=coverage.out
go tool cover -html=coverage.out
性能基准
在现代硬件上的典型性能表现:
- 写入性能:~50,000 QPS
- 读取性能:~100,000 QPS
- 内存使用:~1MB(256页缓存)
- 磁盘使用:高效的页面存储,支持TB级数据
🏗️ 架构设计
存储层
- 页面管理:4KB 页面,支持链式扩展
- 槽位目录:变长记录存储,空间高效利用
- 空闲页管理:智能页面回收和重用
索引层
- B+树索引:快速数据定位和范围查询
- 分组索引:独立的组级别索引管理
- 内存索引:热数据索引常驻内存
缓存层
- LRU 缓存:最近最少使用页面淘汰策略
- 写回缓存:批量写入提高性能
- 预读机制:顺序访问优化
并发控制
- 行级锁:细粒度锁定,提高并发性能
- 读写锁:读操作并发,写操作独占
- 无锁设计:关键路径避免锁竞争
🤝 贡献指南
欢迎提交 Issue 和 Pull Request!
- Fork 项目
- 创建特性分支:
git checkout -b feature/amazing-feature - 提交更改:
git commit -m 'Add amazing feature' - 推送分支:
git push origin feature/amazing-feature - 提交 Pull Request
📄 许可证
本项目采用 MIT 许可证。详见 LICENSE 文件。
🙏 致谢
感谢所有为这个项目做出贡献的开发者!
Pipeline Database - 让数据管道处理更简单、更高效! 🚀
Description
Languages
Go
96.6%
Makefile
3.4%