Files
pipelinedb/README.md
2025-09-30 15:05:56 +08:00

296 lines
8.1 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Pipeline Database
[![Go Version](https://img.shields.io/badge/Go-1.21+-00ADD8?style=flat&logo=go)](https://golang.org/)
[![License](https://img.shields.io/badge/License-MIT-blue.svg)](LICENSE)
[![Build Status](https://img.shields.io/badge/Build-Passing-brightgreen.svg)](https://code.tczkiot.com/wlw/pipelinedb)
一个集成了数据库存储和业务管道处理的一体化解决方案,专为数据管道处理场景设计。
## 🚀 核心特性
- **基于页面的存储引擎**高效的数据存储和检索4KB页面对齐
- **分组数据管理**:支持按业务组织数据,独立管理和统计
- **三级数据状态**Hot→ Warm→ Cold的自动流转
- **自定义处理器**:支持用户实现 Handler 接口定制业务逻辑
- **高并发支持**:线程安全的索引和存储操作
- **页面缓存**:内置缓存系统提高数据访问性能
- **持久化存储**:数据安全持久保存到磁盘
## 📊 数据流转模型
```mermaid
flowchart LR
A[新数据接收] --> B[Hot<br/>热数据]
B -->|预热处理<br/>Handler.WillWarm| C[Warm<br/>温数据]
C -->|冷却处理<br/>Handler.WillCold| D[Cold<br/>冷数据]
style B fill:#ff9999
style C fill:#ffcc99
style D fill:#99ccff
```
## 🛠️ 快速开始
### 安装
```bash
go get code.tczkiot.com/wlw/pipelinedb
```
### 基础使用
```go
package main
import (
"context"
"log"
"code.tczkiot.com/wlw/pipelinedb"
)
// 实现自定义处理器
type MyHandler struct{}
func (h *MyHandler) WillWarm(ctx context.Context, group string, data []byte) ([]byte, error) {
// 预热处理逻辑:数据验证、转换等
return append([]byte("processed_"), data...), nil
}
func (h *MyHandler) WillCold(ctx context.Context, group string, data []byte) ([]byte, error) {
// 冷却处理逻辑:数据压缩、归档等
return append(data, []byte("_archived")...), nil
}
func (h *MyHandler) OnComplete(ctx context.Context, group string) error {
// 组完成回调:清理、通知等
log.Printf("Group %s processing completed", group)
return nil
}
func main() {
// 打开数据库
pdb, err := pipelinedb.Open(pipelinedb.Options{
Filename: "my_pipeline.db",
Handler: &MyHandler{},
Config: pipelinedb.DefaultConfig(),
})
if err != nil {
log.Fatal(err)
}
defer pdb.Stop()
// 启动管道处理
pdb.Start()
// 接收数据
id, err := pdb.AcceptData("user_events", []byte("user clicked button"), "metadata")
if err != nil {
log.Fatal(err)
}
log.Printf("Data accepted with ID: %d", id)
// 查询数据
pageReq := &pipelinedb.PageRequest{Page: 1, PageSize: 10}
response, err := pdb.GetRecordsByGroup("user_events", pageReq)
if err != nil {
log.Fatal(err)
}
for _, record := range response.Records {
log.Printf("Record ID: %d, Status: %s, Data: %s",
record.ID, record.Status, string(record.Data))
}
}
```
## 📚 API 文档
### 核心接口
#### Handler 接口
```go
type Handler interface {
// 预热处理回调Hot -> Warm
WillWarm(ctx context.Context, group string, data []byte) ([]byte, error)
// 冷却处理回调Warm -> Cold
WillCold(ctx context.Context, group string, data []byte) ([]byte, error)
// 组完成回调:所有数据处理完成
OnComplete(ctx context.Context, group string) error
}
```
#### 主要方法
```go
// 打开数据库
func Open(opts Options) (*PipelineDB, error)
// 接收数据
func (pdb *PipelineDB) AcceptData(group string, data []byte, metadata string) (int64, error)
// 按组查询数据(分页)
func (pdb *PipelineDB) GetRecordsByGroup(group string, req *PageRequest) (*PageResponse, error)
// 获取统计信息
func (pdb *PipelineDB) GetStats() *DatabaseStats
// 启动管道处理
func (pdb *PipelineDB) Start()
// 停止数据库
func (pdb *PipelineDB) Stop()
```
### 配置选项
```go
type Config struct {
CacheSize int // 页缓存大小(页数)
SyncWrites bool // 是否同步写入
CreateIfMiss bool // 文件不存在时自动创建
WarmInterval time.Duration // 预热间隔
ProcessInterval time.Duration // 处理间隔
BatchSize int // 批处理大小
EnableMetrics bool // 启用性能指标
}
```
## 🎯 使用场景
### 数据管道处理
- **ETL 流程**:数据提取、转换、加载
- **数据清洗**:数据验证、格式化、去重
- **数据转换**:格式转换、数据映射
### 业务流程管理
- **订单处理**:订单创建 → 支付确认 → 发货处理
- **用户行为分析**:事件收集 → 数据处理 → 报告生成
- **审批流程**:申请提交 → 审核处理 → 结果通知
### 实时数据处理
- **日志分析**:日志收集 → 解析处理 → 存储归档
- **监控数据**:指标收集 → 聚合计算 → 告警处理
- **消息队列**:消息接收 → 业务处理 → 确认回复
## 📁 项目结构
```
pipelinedb/
├── README.md # 项目说明
├── go.mod # Go 模块定义
├── pipeline_db.go # 核心数据库实现
├── storage.go # 存储引擎
├── cache.go # 页面缓存
├── index.go # 索引管理
├── group_manager.go # 组管理器
├── counter.go # ID 计数器
├── examples/ # 使用示例
│ ├── basic-usage/ # 基础使用示例
│ ├── group-management/ # 组管理示例
│ ├── external-handler/ # 自定义处理器示例
│ ├── data-analytics/ # 数据分析示例
│ ├── concurrent-processing/ # 并发处理示例
│ └── high-concurrency/ # 高并发压力测试
└── *_test.go # 测试文件
```
## 🧪 运行示例
项目提供了多个完整的使用示例:
```bash
# 基础使用示例
cd examples/basic-usage && go run main.go
# 组管理示例
cd examples/group-management && go run main.go
# 自定义处理器示例
cd examples/external-handler && go run main.go
# 数据分析示例
cd examples/data-analytics && go run main.go
# 并发处理示例
cd examples/concurrent-processing && go run main.go
# 高并发压力测试
cd examples/high-concurrency && go run main.go
```
## 🔧 开发和测试
### 运行测试
```bash
# 运行所有测试
go test -v
# 运行特定测试
go test -run TestPipelineDB -v
# 运行基准测试
go test -bench=. -benchmem
# 生成测试覆盖率报告
go test -cover -coverprofile=coverage.out
go tool cover -html=coverage.out
```
### 性能基准
在现代硬件上的典型性能表现:
- **写入性能**~50,000 QPS
- **读取性能**~100,000 QPS
- **内存使用**~1MB256页缓存
- **磁盘使用**高效的页面存储支持TB级数据
## 🏗️ 架构设计
### 存储层
- **页面管理**4KB 页面,支持链式扩展
- **槽位目录**:变长记录存储,空间高效利用
- **空闲页管理**:智能页面回收和重用
### 索引层
- **B+树索引**:快速数据定位和范围查询
- **分组索引**:独立的组级别索引管理
- **内存索引**:热数据索引常驻内存
### 缓存层
- **LRU 缓存**:最近最少使用页面淘汰策略
- **写回缓存**:批量写入提高性能
- **预读机制**:顺序访问优化
### 并发控制
- **行级锁**:细粒度锁定,提高并发性能
- **读写锁**:读操作并发,写操作独占
- **无锁设计**:关键路径避免锁竞争
## 🤝 贡献指南
欢迎提交 Issue 和 Pull Request
1. Fork 项目
2. 创建特性分支:`git checkout -b feature/amazing-feature`
3. 提交更改:`git commit -m 'Add amazing feature'`
4. 推送分支:`git push origin feature/amazing-feature`
5. 提交 Pull Request
## 📄 许可证
本项目采用 MIT 许可证。详见 [LICENSE](LICENSE) 文件。
## 🙏 致谢
感谢所有为这个项目做出贡献的开发者!
---
**Pipeline Database** - 让数据管道处理更简单、更高效! 🚀