Files
srdb/engine.go
bourdon 9175b98202 添加 Clean 和 Destroy 功能
主要改动:
- Engine: 添加 Clean() 和 Destroy() 方法
- Table: 添加 Clean() 和 Destroy() 方法(不持有 Database 引用)
- Database: 添加 Clean()、CleanTable()、DestroyTable()、Destroy() 方法
- 自动 flush: 添加长时间无写入自动 flush 策略(默认 30 秒)
- WebUI 优化: 优化分页查询性能

新增功能:
- Clean(): 清除数据但保留结构,Engine/Table/Database 仍可用
- Destroy(): 销毁并删除所有文件,对象不可用
- CleanTable(name): 清除指定表的数据
- DestroyTable(name): 销毁指定表并从 Database 中删除
- 自动 flush 监控: 后台定期检查,空闲时自动持久化

代码优化:
- Engine.Close(): 支持 Destroy 后调用,不会 panic
- 二级索引持久化: 在 flush 时自动持久化索引
- WebUI 分页: 预构建字段类型 map,减少 Schema 查询
- 职责分离: Table 不再持有 Database 引用

测试覆盖:
- engine_clean_test.go: Engine Clean/Destroy 测试
- table_clean_test.go: Table Clean/Destroy 测试
- database_clean_test.go: Database Clean/Destroy 测试
- database_table_ops_test.go: Database CleanTable/DestroyTable 测试
2025-10-09 01:03:22 +08:00

815 lines
19 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package srdb
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"sync"
"sync/atomic"
"time"
)
const (
DefaultMemTableSize = 64 * 1024 * 1024 // 64 MB
DefaultAutoFlushTimeout = 30 * time.Second // 30 秒无写入自动 flush
)
// Engine 存储引擎
type Engine struct {
dir string
schema *Schema
indexManager *IndexManager
walManager *WALManager // WAL 管理器
sstManager *SSTableManager // SST 管理器
memtableManager *MemTableManager // MemTable 管理器
versionSet *VersionSet // MANIFEST 管理器
compactionManager *CompactionManager // Compaction 管理器
seq atomic.Int64
flushMu sync.Mutex
// 自动 flush 相关
autoFlushTimeout time.Duration
lastWriteTime atomic.Int64 // 最后写入时间UnixNano
stopAutoFlush chan struct{}
}
// EngineOptions 配置选项
type EngineOptions struct {
Dir string
MemTableSize int64
Schema *Schema // 可选的 Schema 定义
AutoFlushTimeout time.Duration // 自动 flush 超时时间0 表示禁用
}
// OpenEngine 打开数据库
func OpenEngine(opts *EngineOptions) (*Engine, error) {
if opts.MemTableSize == 0 {
opts.MemTableSize = DefaultMemTableSize
}
// 创建主目录
err := os.MkdirAll(opts.Dir, 0755)
if err != nil {
return nil, err
}
// 创建子目录
walDir := filepath.Join(opts.Dir, "wal")
sstDir := filepath.Join(opts.Dir, "sst")
idxDir := filepath.Join(opts.Dir, "index")
err = os.MkdirAll(walDir, 0755)
if err != nil {
return nil, err
}
err = os.MkdirAll(sstDir, 0755)
if err != nil {
return nil, err
}
err = os.MkdirAll(idxDir, 0755)
if err != nil {
return nil, err
}
// 尝试从磁盘恢复 Schema如果 Options 中没有提供)
var sch *Schema
if opts.Schema != nil {
// 使用提供的 Schema
sch = opts.Schema
// 保存到磁盘(带校验和)
schemaPath := filepath.Join(opts.Dir, "schema.json")
schemaFile, err := NewSchemaFile(sch)
if err != nil {
return nil, fmt.Errorf("create schema file: %w", err)
}
schemaData, err := json.MarshalIndent(schemaFile, "", " ")
if err != nil {
return nil, fmt.Errorf("marshal schema: %w", err)
}
err = os.WriteFile(schemaPath, schemaData, 0644)
if err != nil {
return nil, fmt.Errorf("write schema: %w", err)
}
} else {
// 尝试从磁盘恢复
schemaPath := filepath.Join(opts.Dir, "schema.json")
schemaData, err := os.ReadFile(schemaPath)
if err == nil {
// 文件存在,尝试解析
schemaFile := &SchemaFile{}
err = json.Unmarshal(schemaData, schemaFile)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal schema from %s: %w", schemaPath, err)
}
// 验证校验和
err = schemaFile.Verify()
if err != nil {
return nil, fmt.Errorf("failed to verify schema from %s: %w", schemaPath, err)
}
sch = schemaFile.Schema
} else if !os.IsNotExist(err) {
// 其他读取错误
return nil, fmt.Errorf("failed to read schema file %s: %w", schemaPath, err)
}
// 如果文件不存在sch 保持为 nil可选
}
// 创建索引管理器
var indexMgr *IndexManager
if sch != nil {
indexMgr = NewIndexManager(idxDir, sch)
}
// 创建 SST Manager
sstMgr, err := NewSSTableManager(sstDir)
if err != nil {
return nil, err
}
// 设置 Schema用于优化编解码
if sch != nil {
sstMgr.SetSchema(sch)
}
// 创建 MemTable Manager
memMgr := NewMemTableManager(opts.MemTableSize)
// 创建/恢复 MANIFEST
manifestDir := opts.Dir
versionSet, err := NewVersionSet(manifestDir)
if err != nil {
return nil, fmt.Errorf("create version set: %w", err)
}
// 创建 Engine暂时不设置 WAL Manager
engine := &Engine{
dir: opts.Dir,
schema: sch,
indexManager: indexMgr,
walManager: nil, // 先不设置,恢复后再创建
sstManager: sstMgr,
memtableManager: memMgr,
versionSet: versionSet,
}
// 先恢复数据(包括从 WAL 恢复)
err = engine.recover()
if err != nil {
return nil, err
}
// 恢复完成后,创建 WAL Manager 用于后续写入
walMgr, err := NewWALManager(walDir)
if err != nil {
return nil, err
}
engine.walManager = walMgr
engine.memtableManager.SetActiveWAL(walMgr.GetCurrentNumber())
// 创建 Compaction Manager
engine.compactionManager = NewCompactionManager(sstDir, versionSet, sstMgr)
// 设置 Schema如果有
if sch != nil {
engine.compactionManager.SetSchema(sch)
}
// 启动时清理孤儿文件(崩溃恢复后的清理)
engine.compactionManager.CleanupOrphanFiles()
// 启动后台 Compaction 和垃圾回收
engine.compactionManager.Start()
// 验证并修复索引
if engine.indexManager != nil {
engine.verifyAndRepairIndexes()
}
// 设置自动 flush 超时时间
if opts.AutoFlushTimeout > 0 {
engine.autoFlushTimeout = opts.AutoFlushTimeout
} else {
engine.autoFlushTimeout = DefaultAutoFlushTimeout
}
engine.stopAutoFlush = make(chan struct{})
engine.lastWriteTime.Store(time.Now().UnixNano())
// 启动自动 flush 监控
go engine.autoFlushMonitor()
return engine, nil
}
// Insert 插入数据
func (e *Engine) Insert(data map[string]any) error {
// 1. 验证 Schema (如果定义了)
if e.schema != nil {
if err := e.schema.Validate(data); err != nil {
return fmt.Errorf("schema validation failed: %v", err)
}
}
// 2. 生成 _seq
seq := e.seq.Add(1)
// 2. 添加系统字段
row := &SSTableRow{
Seq: seq,
Time: time.Now().UnixNano(),
Data: data,
}
// 3. 序列化
rowData, err := json.Marshal(row)
if err != nil {
return err
}
// 4. 写入 WAL
entry := &WALEntry{
Type: WALEntryTypePut,
Seq: seq,
Data: rowData,
}
err = e.walManager.Append(entry)
if err != nil {
return err
}
// 5. 写入 MemTable Manager
e.memtableManager.Put(seq, rowData)
// 6. 添加到索引
if e.indexManager != nil {
e.indexManager.AddToIndexes(data, seq)
}
// 7. 更新最后写入时间
e.lastWriteTime.Store(time.Now().UnixNano())
// 8. 检查是否需要切换 MemTable
if e.memtableManager.ShouldSwitch() {
go e.switchMemTable()
}
return nil
}
// Get 查询数据
func (e *Engine) Get(seq int64) (*SSTableRow, error) {
// 1. 先查 MemTable Manager (Active + Immutables)
data, found := e.memtableManager.Get(seq)
if found {
var row SSTableRow
err := json.Unmarshal(data, &row)
if err != nil {
return nil, err
}
return &row, nil
}
// 2. 查询 SST 文件
return e.sstManager.Get(seq)
}
// GetPartial 按需查询数据(只读取指定字段)
func (e *Engine) GetPartial(seq int64, fields []string) (*SSTableRow, error) {
// 1. 先查 MemTable Manager (Active + Immutables)
data, found := e.memtableManager.Get(seq)
if found {
var row SSTableRow
err := json.Unmarshal(data, &row)
if err != nil {
return nil, err
}
// MemTable 中的数据已经完全解析,需要手动过滤字段
if len(fields) > 0 {
filteredData := make(map[string]any)
for _, field := range fields {
if val, ok := row.Data[field]; ok {
filteredData[field] = val
}
}
row.Data = filteredData
}
return &row, nil
}
// 2. 查询 SST 文件(按需解码)
return e.sstManager.GetPartial(seq, fields)
}
// switchMemTable 切换 MemTable
func (e *Engine) switchMemTable() error {
e.flushMu.Lock()
defer e.flushMu.Unlock()
// 1. 切换到新的 WAL
oldWALNumber, err := e.walManager.Rotate()
if err != nil {
return err
}
newWALNumber := e.walManager.GetCurrentNumber()
// 2. 切换 MemTable (Active → Immutable)
_, immutable := e.memtableManager.Switch(newWALNumber)
// 3. 异步 Flush Immutable
go e.flushImmutable(immutable, oldWALNumber)
return nil
}
// flushImmutable 将 Immutable MemTable 刷新到 SST
func (e *Engine) flushImmutable(imm *ImmutableMemTable, walNumber int64) error {
// 1. 收集所有行
var rows []*SSTableRow
iter := imm.NewIterator()
for iter.Next() {
var row SSTableRow
err := json.Unmarshal(iter.Value(), &row)
if err == nil {
rows = append(rows, &row)
}
}
if len(rows) == 0 {
// 没有数据,直接清理
e.walManager.Delete(walNumber)
e.memtableManager.RemoveImmutable(imm)
return nil
}
// 2. 从 VersionSet 分配文件编号
fileNumber := e.versionSet.AllocateFileNumber()
// 3. 创建 SST 文件到 L0
reader, err := e.sstManager.CreateSST(fileNumber, rows)
if err != nil {
return err
}
// 4. 创建 FileMetadata
header := reader.GetHeader()
// 获取文件大小
sstPath := reader.GetPath()
fileInfo, err := os.Stat(sstPath)
if err != nil {
return fmt.Errorf("stat sst file: %w", err)
}
fileMeta := &FileMetadata{
FileNumber: fileNumber,
Level: 0, // Flush 到 L0
FileSize: fileInfo.Size(),
MinKey: header.MinKey,
MaxKey: header.MaxKey,
RowCount: header.RowCount,
}
// 5. 更新 MANIFEST
edit := NewVersionEdit()
edit.AddFile(fileMeta)
// 持久化当前的文件编号计数器(关键修复:防止重启后文件编号重用)
// 使用 fileNumber + 1 确保并发安全,避免竞态条件
edit.SetNextFileNumber(fileNumber + 1)
err = e.versionSet.LogAndApply(edit)
if err != nil {
return fmt.Errorf("log and apply version edit: %w", err)
}
// 6. 删除对应的 WAL
e.walManager.Delete(walNumber)
// 7. 从 Immutable 列表中移除
e.memtableManager.RemoveImmutable(imm)
// 8. 持久化索引(防止崩溃丢失索引数据)
if e.indexManager != nil {
e.indexManager.BuildAll()
}
// 9. Compaction 由后台线程负责,不在 flush 路径中触发
// 避免同步 compaction 导致刚创建的文件立即被删除
// e.compactionManager.MaybeCompact()
return nil
}
// recover 恢复数据
func (e *Engine) recover() error {
// 1. 恢复 SST 文件SST Manager 已经在 NewManager 中恢复了)
// 只需要获取最大 seq
maxSeq := e.sstManager.GetMaxSeq()
if maxSeq > e.seq.Load() {
e.seq.Store(maxSeq)
}
// 2. 恢复所有 WAL 文件到 MemTable Manager
walDir := filepath.Join(e.dir, "wal")
pattern := filepath.Join(walDir, "*.wal")
walFiles, err := filepath.Glob(pattern)
if err == nil && len(walFiles) > 0 {
// 按文件名排序
sort.Strings(walFiles)
// 依次读取每个 WAL
for _, walPath := range walFiles {
reader, err := NewWALReader(walPath)
if err != nil {
continue
}
entries, err := reader.Read()
reader.Close()
if err != nil {
continue
}
// 重放 WAL 到 Active MemTable
for _, entry := range entries {
// 如果定义了 Schema验证数据
if e.schema != nil {
var row SSTableRow
if err := json.Unmarshal(entry.Data, &row); err != nil {
return fmt.Errorf("failed to unmarshal row during recovery (seq=%d): %w", entry.Seq, err)
}
// 验证 Schema
if err := e.schema.Validate(row.Data); err != nil {
return fmt.Errorf("schema validation failed during recovery (seq=%d): %w", entry.Seq, err)
}
}
e.memtableManager.Put(entry.Seq, entry.Data)
if entry.Seq > e.seq.Load() {
e.seq.Store(entry.Seq)
}
}
}
}
return nil
}
// autoFlushMonitor 自动 flush 监控
func (e *Engine) autoFlushMonitor() {
ticker := time.NewTicker(e.autoFlushTimeout / 2) // 每半个超时时间检查一次
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 检查是否超时
lastWrite := time.Unix(0, e.lastWriteTime.Load())
if time.Since(lastWrite) >= e.autoFlushTimeout {
// 检查 MemTable 是否有数据
active := e.memtableManager.GetActive()
if active != nil && active.Size() > 0 {
// 触发 flush
e.Flush()
}
}
case <-e.stopAutoFlush:
return
}
}
}
// Flush 手动刷新 Active MemTable 到磁盘
func (e *Engine) Flush() error {
// 检查 Active MemTable 是否有数据
active := e.memtableManager.GetActive()
if active == nil || active.Size() == 0 {
return nil // 没有数据,无需 flush
}
// 强制切换 MemTableswitchMemTable 内部有锁)
return e.switchMemTable()
}
// Close 关闭引擎
func (e *Engine) Close() error {
// 1. 停止自动 flush 监控(如果还在运行)
if e.stopAutoFlush != nil {
select {
case <-e.stopAutoFlush:
// 已经关闭,跳过
default:
close(e.stopAutoFlush)
}
}
// 2. 停止 Compaction Manager
if e.compactionManager != nil {
e.compactionManager.Stop()
}
// 3. 刷新 Active MemTable确保所有数据都写入磁盘
// 检查 memtableManager 是否存在(可能已被 Destroy
if e.memtableManager != nil {
e.Flush()
}
// 3. 关闭 WAL Manager
if e.walManager != nil {
e.walManager.Close()
}
// 4. 等待所有 Immutable Flush 完成
// TODO: 添加更优雅的等待机制
if e.memtableManager != nil {
for e.memtableManager.GetImmutableCount() > 0 {
time.Sleep(100 * time.Millisecond)
}
}
// 5. 保存所有索引
if e.indexManager != nil {
e.indexManager.BuildAll()
e.indexManager.Close()
}
// 6. 关闭 VersionSet
if e.versionSet != nil {
e.versionSet.Close()
}
// 7. 关闭 WAL Manager
if e.walManager != nil {
e.walManager.Close()
}
// 6. 关闭 SST Manager
if e.sstManager != nil {
e.sstManager.Close()
}
return nil
}
// Clean 清除所有数据(保留 Engine 可用)
func (e *Engine) Clean() error {
e.flushMu.Lock()
defer e.flushMu.Unlock()
// 0. 停止自动 flush 监控(临时)
if e.stopAutoFlush != nil {
close(e.stopAutoFlush)
}
// 1. 停止 Compaction Manager
if e.compactionManager != nil {
e.compactionManager.Stop()
}
// 2. 等待所有 Immutable Flush 完成
for e.memtableManager.GetImmutableCount() > 0 {
time.Sleep(100 * time.Millisecond)
}
// 3. 清空 MemTable
e.memtableManager = NewMemTableManager(DefaultMemTableSize)
// 2. 删除所有 WAL 文件
if e.walManager != nil {
e.walManager.Close()
walDir := filepath.Join(e.dir, "wal")
os.RemoveAll(walDir)
os.MkdirAll(walDir, 0755)
// 重新创建 WAL Manager
walMgr, err := NewWALManager(walDir)
if err != nil {
return fmt.Errorf("recreate wal manager: %w", err)
}
e.walManager = walMgr
e.memtableManager.SetActiveWAL(walMgr.GetCurrentNumber())
}
// 3. 删除所有 SST 文件
if e.sstManager != nil {
e.sstManager.Close()
sstDir := filepath.Join(e.dir, "sst")
os.RemoveAll(sstDir)
os.MkdirAll(sstDir, 0755)
// 重新创建 SST Manager
sstMgr, err := NewSSTableManager(sstDir)
if err != nil {
return fmt.Errorf("recreate sst manager: %w", err)
}
e.sstManager = sstMgr
}
// 4. 删除所有索引文件
if e.indexManager != nil {
e.indexManager.Close()
indexFiles, _ := filepath.Glob(filepath.Join(e.dir, "idx_*.sst"))
for _, f := range indexFiles {
os.Remove(f)
}
// 重新创建 Index Manager
if e.schema != nil {
e.indexManager = NewIndexManager(e.dir, e.schema)
}
}
// 5. 重置 MANIFEST
if e.versionSet != nil {
e.versionSet.Close()
manifestDir := e.dir
os.Remove(filepath.Join(manifestDir, "MANIFEST"))
os.Remove(filepath.Join(manifestDir, "CURRENT"))
// 重新创建 VersionSet
versionSet, err := NewVersionSet(manifestDir)
if err != nil {
return fmt.Errorf("recreate version set: %w", err)
}
e.versionSet = versionSet
}
// 6. 重新创建 Compaction Manager
sstDir := filepath.Join(e.dir, "sst")
e.compactionManager = NewCompactionManager(sstDir, e.versionSet, e.sstManager)
if e.schema != nil {
e.compactionManager.SetSchema(e.schema)
}
e.compactionManager.Start()
// 7. 重置序列号
e.seq.Store(0)
// 8. 更新最后写入时间
e.lastWriteTime.Store(time.Now().UnixNano())
// 9. 重启自动 flush 监控
e.stopAutoFlush = make(chan struct{})
go e.autoFlushMonitor()
return nil
}
// Destroy 销毁 Engine 并删除所有数据文件
func (e *Engine) Destroy() error {
// 1. 先关闭 Engine
if err := e.Close(); err != nil {
return fmt.Errorf("close engine: %w", err)
}
// 2. 删除整个数据目录
if err := os.RemoveAll(e.dir); err != nil {
return fmt.Errorf("remove data directory: %w", err)
}
// 3. 标记 Engine 为不可用(将所有管理器设为 nil
e.walManager = nil
e.sstManager = nil
e.memtableManager = nil
e.versionSet = nil
e.compactionManager = nil
e.indexManager = nil
return nil
}
// TableStats 统计信息
type TableStats struct {
MemTableSize int64
MemTableCount int
SSTCount int
TotalRows int64
}
// GetVersionSet 获取 VersionSet用于高级操作
func (e *Engine) GetVersionSet() *VersionSet {
return e.versionSet
}
// GetCompactionManager 获取 Compaction Manager用于高级操作
func (e *Engine) GetCompactionManager() *CompactionManager {
return e.compactionManager
}
// GetMemtableManager 获取 Memtable Manager
func (e *Engine) GetMemtableManager() *MemTableManager {
return e.memtableManager
}
// GetSSTManager 获取 SST Manager
func (e *Engine) GetSSTManager() *SSTableManager {
return e.sstManager
}
// GetMaxSeq 获取当前最大的 seq 号
func (e *Engine) GetMaxSeq() int64 {
return e.seq.Load() - 1 // seq 是下一个要分配的,所以最大的是 seq - 1
}
// GetSchema 获取 Schema
func (e *Engine) GetSchema() *Schema {
return e.schema
}
// Stats 获取统计信息
func (e *Engine) Stats() *TableStats {
memStats := e.memtableManager.GetStats()
sstStats := e.sstManager.GetStats()
stats := &TableStats{
MemTableSize: memStats.TotalSize,
MemTableCount: memStats.TotalCount,
SSTCount: sstStats.FileCount,
}
// 计算总行数
stats.TotalRows = int64(memStats.TotalCount)
readers := e.sstManager.GetReaders()
for _, reader := range readers {
header := reader.GetHeader()
stats.TotalRows += header.RowCount
}
return stats
}
// CreateIndex 创建索引
func (e *Engine) CreateIndex(field string) error {
if e.indexManager == nil {
return fmt.Errorf("no schema defined, cannot create index")
}
return e.indexManager.CreateIndex(field)
}
// DropIndex 删除索引
func (e *Engine) DropIndex(field string) error {
if e.indexManager == nil {
return fmt.Errorf("no schema defined, cannot drop index")
}
return e.indexManager.DropIndex(field)
}
// ListIndexes 列出所有索引
func (e *Engine) ListIndexes() []string {
if e.indexManager == nil {
return nil
}
return e.indexManager.ListIndexes()
}
// GetIndexMetadata 获取索引元数据
func (e *Engine) GetIndexMetadata() map[string]IndexMetadata {
if e.indexManager == nil {
return nil
}
return e.indexManager.GetIndexMetadata()
}
// RepairIndexes 手动修复索引
func (e *Engine) RepairIndexes() error {
return e.verifyAndRepairIndexes()
}
// Query 创建查询构建器
func (e *Engine) Query() *QueryBuilder {
return newQueryBuilder(e)
}
// verifyAndRepairIndexes 验证并修复索引
func (e *Engine) verifyAndRepairIndexes() error {
if e.indexManager == nil {
return nil
}
// 获取当前最大 seq
currentMaxSeq := e.seq.Load()
// 创建 getData 函数
getData := func(seq int64) (map[string]any, error) {
row, err := e.Get(seq)
if err != nil {
return nil, err
}
return row.Data, nil
}
// 验证并修复
return e.indexManager.VerifyAndRepair(currentMaxSeq, getData)
}