Pipeline Database db6721300a
Some checks failed
CI Pipeline / 测试和基准测试 (1.24.x) (push) Failing after 4s
CI Pipeline / 测试和基准测试 (1.25.x) (push) Failing after 4s
CI Pipeline / 构建验证 (amd64, darwin) (push) Has been skipped
CI Pipeline / 构建验证 (amd64, linux) (push) Has been skipped
CI Pipeline / 构建验证 (amd64, windows) (push) Has been skipped
CI Pipeline / 构建验证 (arm64, darwin) (push) Has been skipped
CI Pipeline / 构建验证 (arm64, linux) (push) Has been skipped
CI Pipeline / 安全扫描 (push) Failing after 4s
CI Pipeline / 代码质量检查 (push) Failing after 4s
CI Pipeline / 性能回归测试 (push) Has been skipped
CI Pipeline / 通知 (push) Failing after 1s
更新自动处理示例和忽略测试目录
- 更新 examples/auto-processing/main.go 中的示例代码
- 在 .gitignore 中添加 examples/test-empty/ 目录忽略规则
2025-09-30 18:40:19 +08:00
2025-09-30 15:05:56 +08:00
2025-09-30 15:05:56 +08:00
2025-09-30 15:05:56 +08:00
2025-09-30 15:05:56 +08:00
2025-09-30 15:05:56 +08:00
2025-09-30 17:48:28 +08:00
2025-09-30 15:05:56 +08:00
2025-09-30 15:05:56 +08:00

Pipeline Database

Go Version License Build Status

一个集成了数据库存储和业务管道处理的一体化解决方案,专为数据管道处理场景设计。

🚀 核心特性

  • 基于页面的存储引擎高效的数据存储和检索4KB页面对齐
  • 分组数据管理:支持按业务组织数据,独立管理和统计
  • 三级数据状态Hot→ Warm→ Cold的自动流转
  • 自定义处理器:支持用户实现 Handler 接口定制业务逻辑
  • 高并发支持:线程安全的索引和存储操作
  • 页面缓存:内置缓存系统提高数据访问性能
  • 持久化存储:数据安全持久保存到磁盘

📊 数据流转模型

flowchart LR

    A[新数据接收] --> B[Hot<br/>热数据]
    B -->|预热处理<br/>Handler.WillWarm| C[Warm<br/>温数据]
    C -->|冷却处理<br/>Handler.WillCold| D[Cold<br/>冷数据]
    
    style B fill:#ff9999
    style C fill:#ffcc99
    style D fill:#99ccff

🛠️ 快速开始

安装

go get code.tczkiot.com/wlw/pipelinedb

基础使用

package main

import (
    "context"
    "log"
    "code.tczkiot.com/wlw/pipelinedb"
)

// 实现自定义处理器
type MyHandler struct{}

func (h *MyHandler) WillWarm(ctx context.Context, group string, data []byte) ([]byte, error) {
    // 预热处理逻辑:数据验证、转换等
    return append([]byte("processed_"), data...), nil
}

func (h *MyHandler) WillCold(ctx context.Context, group string, data []byte) ([]byte, error) {
    // 冷却处理逻辑:数据压缩、归档等
    return append(data, []byte("_archived")...), nil
}

func (h *MyHandler) OnComplete(ctx context.Context, group string) error {
    // 组完成回调:清理、通知等
    log.Printf("Group %s processing completed", group)
    return nil
}

func main() {
    // 打开数据库
    pdb, err := pipelinedb.Open(pipelinedb.Options{
        Filename: "my_pipeline.db",
        Handler:  &MyHandler{},
        Config:   pipelinedb.DefaultConfig(),
    })
    if err != nil {
        log.Fatal(err)
    }
    defer pdb.Stop()

    // 启动管道处理
    pdb.Start()

    // 接收数据
    id, err := pdb.AcceptData("user_events", []byte("user clicked button"), "metadata")
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Data accepted with ID: %d", id)

    // 查询数据
    pageReq := &pipelinedb.PageRequest{Page: 1, PageSize: 10}
    response, err := pdb.GetRecordsByGroup("user_events", pageReq)
    if err != nil {
        log.Fatal(err)
    }
    
    for _, record := range response.Records {
        log.Printf("Record ID: %d, Status: %s, Data: %s", 
            record.ID, record.Status, string(record.Data))
    }
}

📚 API 文档

核心接口

Handler 接口

type Handler interface {
    // 预热处理回调Hot -> Warm
    WillWarm(ctx context.Context, group string, data []byte) ([]byte, error)
    
    // 冷却处理回调Warm -> Cold  
    WillCold(ctx context.Context, group string, data []byte) ([]byte, error)
    
    // 组完成回调:所有数据处理完成
    OnComplete(ctx context.Context, group string) error
}

主要方法

// 打开数据库
func Open(opts Options) (*PipelineDB, error)

// 接收数据
func (pdb *PipelineDB) AcceptData(group string, data []byte, metadata string) (int64, error)

// 按组查询数据(分页)
func (pdb *PipelineDB) GetRecordsByGroup(group string, req *PageRequest) (*PageResponse, error)

// 获取统计信息
func (pdb *PipelineDB) GetStats() *DatabaseStats

// 启动管道处理
func (pdb *PipelineDB) Start()

// 停止数据库
func (pdb *PipelineDB) Stop()

配置选项

type Config struct {
    CacheSize       int           // 页缓存大小(页数)
    SyncWrites      bool          // 是否同步写入
    CreateIfMiss    bool          // 文件不存在时自动创建
    WarmInterval    time.Duration // 预热间隔
    ProcessInterval time.Duration // 处理间隔
    BatchSize       int           // 批处理大小
    EnableMetrics   bool          // 启用性能指标
}

🎯 使用场景

数据管道处理

  • ETL 流程:数据提取、转换、加载
  • 数据清洗:数据验证、格式化、去重
  • 数据转换:格式转换、数据映射

业务流程管理

  • 订单处理:订单创建 → 支付确认 → 发货处理
  • 用户行为分析:事件收集 → 数据处理 → 报告生成
  • 审批流程:申请提交 → 审核处理 → 结果通知

实时数据处理

  • 日志分析:日志收集 → 解析处理 → 存储归档
  • 监控数据:指标收集 → 聚合计算 → 告警处理
  • 消息队列:消息接收 → 业务处理 → 确认回复

📁 项目结构

pipelinedb/
├── README.md              # 项目说明
├── go.mod                 # Go 模块定义
├── pipeline_db.go         # 核心数据库实现
├── storage.go             # 存储引擎
├── cache.go               # 页面缓存
├── index.go               # 索引管理
├── group_manager.go       # 组管理器
├── counter.go             # ID 计数器
├── examples/              # 使用示例
│   ├── basic-usage/       # 基础使用示例
│   ├── group-management/  # 组管理示例
│   ├── external-handler/  # 自定义处理器示例
│   ├── data-analytics/    # 数据分析示例
│   ├── concurrent-processing/ # 并发处理示例
│   └── high-concurrency/  # 高并发压力测试
└── *_test.go              # 测试文件

🧪 运行示例

项目提供了多个完整的使用示例:

# 基础使用示例
cd examples/basic-usage && go run main.go

# 组管理示例
cd examples/group-management && go run main.go

# 自定义处理器示例
cd examples/external-handler && go run main.go

# 数据分析示例
cd examples/data-analytics && go run main.go

# 并发处理示例
cd examples/concurrent-processing && go run main.go

# 高并发压力测试
cd examples/high-concurrency && go run main.go

🔧 开发和测试

运行测试

# 运行所有测试
go test -v

# 运行特定测试
go test -run TestPipelineDB -v

# 运行基准测试
go test -bench=. -benchmem

# 生成测试覆盖率报告
go test -cover -coverprofile=coverage.out
go tool cover -html=coverage.out

性能基准

在现代硬件上的典型性能表现:

  • 写入性能~50,000 QPS
  • 读取性能~100,000 QPS
  • 内存使用~1MB256页缓存
  • 磁盘使用高效的页面存储支持TB级数据

🏗️ 架构设计

存储层

  • 页面管理4KB 页面,支持链式扩展
  • 槽位目录:变长记录存储,空间高效利用
  • 空闲页管理:智能页面回收和重用

索引层

  • B+树索引:快速数据定位和范围查询
  • 分组索引:独立的组级别索引管理
  • 内存索引:热数据索引常驻内存

缓存层

  • LRU 缓存:最近最少使用页面淘汰策略
  • 写回缓存:批量写入提高性能
  • 预读机制:顺序访问优化

并发控制

  • 行级锁:细粒度锁定,提高并发性能
  • 读写锁:读操作并发,写操作独占
  • 无锁设计:关键路径避免锁竞争

🤝 贡献指南

欢迎提交 Issue 和 Pull Request

  1. Fork 项目
  2. 创建特性分支:git checkout -b feature/amazing-feature
  3. 提交更改:git commit -m 'Add amazing feature'
  4. 推送分支:git push origin feature/amazing-feature
  5. 提交 Pull Request

📄 许可证

本项目采用 MIT 许可证。详见 LICENSE 文件。

🙏 致谢

感谢所有为这个项目做出贡献的开发者!


Pipeline Database - 让数据管道处理更简单、更高效! 🚀

Description
No description provided
Readme 170 KiB
Languages
Go 96.6%
Makefile 3.4%