添加 Clean 和 Destroy 功能

主要改动:
- Engine: 添加 Clean() 和 Destroy() 方法
- Table: 添加 Clean() 和 Destroy() 方法(不持有 Database 引用)
- Database: 添加 Clean()、CleanTable()、DestroyTable()、Destroy() 方法
- 自动 flush: 添加长时间无写入自动 flush 策略(默认 30 秒)
- WebUI 优化: 优化分页查询性能

新增功能:
- Clean(): 清除数据但保留结构,Engine/Table/Database 仍可用
- Destroy(): 销毁并删除所有文件,对象不可用
- CleanTable(name): 清除指定表的数据
- DestroyTable(name): 销毁指定表并从 Database 中删除
- 自动 flush 监控: 后台定期检查,空闲时自动持久化

代码优化:
- Engine.Close(): 支持 Destroy 后调用,不会 panic
- 二级索引持久化: 在 flush 时自动持久化索引
- WebUI 分页: 预构建字段类型 map,减少 Schema 查询
- 职责分离: Table 不再持有 Database 引用

测试覆盖:
- engine_clean_test.go: Engine Clean/Destroy 测试
- table_clean_test.go: Table Clean/Destroy 测试
- database_clean_test.go: Database Clean/Destroy 测试
- database_table_ops_test.go: Database CleanTable/DestroyTable 测试
This commit is contained in:
2025-10-09 01:03:22 +08:00
parent 23843493b8
commit 9175b98202
8 changed files with 1413 additions and 123 deletions

300
engine.go
View File

@@ -12,7 +12,8 @@ import (
)
const (
DefaultMemTableSize = 64 * 1024 * 1024 // 64 MB
DefaultMemTableSize = 64 * 1024 * 1024 // 64 MB
DefaultAutoFlushTimeout = 30 * time.Second // 30 秒无写入自动 flush
)
// Engine 存储引擎
@@ -26,15 +27,20 @@ type Engine struct {
versionSet *VersionSet // MANIFEST 管理器
compactionManager *CompactionManager // Compaction 管理器
seq atomic.Int64
mu sync.RWMutex
flushMu sync.Mutex
// 自动 flush 相关
autoFlushTimeout time.Duration
lastWriteTime atomic.Int64 // 最后写入时间UnixNano
stopAutoFlush chan struct{}
}
// EngineOptions 配置选项
type EngineOptions struct {
Dir string
MemTableSize int64
Schema *Schema // 可选的 Schema 定义
Dir string
MemTableSize int64
Schema *Schema // 可选的 Schema 定义
AutoFlushTimeout time.Duration // 自动 flush 超时时间0 表示禁用
}
// OpenEngine 打开数据库
@@ -183,6 +189,18 @@ func OpenEngine(opts *EngineOptions) (*Engine, error) {
engine.verifyAndRepairIndexes()
}
// 设置自动 flush 超时时间
if opts.AutoFlushTimeout > 0 {
engine.autoFlushTimeout = opts.AutoFlushTimeout
} else {
engine.autoFlushTimeout = DefaultAutoFlushTimeout
}
engine.stopAutoFlush = make(chan struct{})
engine.lastWriteTime.Store(time.Now().UnixNano())
// 启动自动 flush 监控
go engine.autoFlushMonitor()
return engine, nil
}
@@ -230,7 +248,10 @@ func (e *Engine) Insert(data map[string]any) error {
e.indexManager.AddToIndexes(data, seq)
}
// 7. 检查是否需要切换 MemTable
// 7. 更新最后写入时间
e.lastWriteTime.Store(time.Now().UnixNano())
// 8. 检查是否需要切换 MemTable
if e.memtableManager.ShouldSwitch() {
go e.switchMemTable()
}
@@ -372,7 +393,12 @@ func (e *Engine) flushImmutable(imm *ImmutableMemTable, walNumber int64) error {
// 7. 从 Immutable 列表中移除
e.memtableManager.RemoveImmutable(imm)
// 8. Compaction 由后台线程负责,不在 flush 路径中触发
// 8. 持久化索引(防止崩溃丢失索引数据)
if e.indexManager != nil {
e.indexManager.BuildAll()
}
// 9. Compaction 由后台线程负责,不在 flush 路径中触发
// 避免同步 compaction 导致刚创建的文件立即被删除
// e.compactionManager.MaybeCompact()
@@ -436,37 +462,90 @@ func (e *Engine) recover() error {
return nil
}
// autoFlushMonitor 自动 flush 监控
func (e *Engine) autoFlushMonitor() {
ticker := time.NewTicker(e.autoFlushTimeout / 2) // 每半个超时时间检查一次
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 检查是否超时
lastWrite := time.Unix(0, e.lastWriteTime.Load())
if time.Since(lastWrite) >= e.autoFlushTimeout {
// 检查 MemTable 是否有数据
active := e.memtableManager.GetActive()
if active != nil && active.Size() > 0 {
// 触发 flush
e.Flush()
}
}
case <-e.stopAutoFlush:
return
}
}
}
// Flush 手动刷新 Active MemTable 到磁盘
func (e *Engine) Flush() error {
// 检查 Active MemTable 是否有数据
active := e.memtableManager.GetActive()
if active == nil || active.Size() == 0 {
return nil // 没有数据,无需 flush
}
// 强制切换 MemTableswitchMemTable 内部有锁)
return e.switchMemTable()
}
// Close 关闭引擎
func (e *Engine) Close() error {
// 1. 停止后台 Compaction
// 1. 停止自动 flush 监控(如果还在运行)
if e.stopAutoFlush != nil {
select {
case <-e.stopAutoFlush:
// 已经关闭,跳过
default:
close(e.stopAutoFlush)
}
}
// 2. 停止 Compaction Manager
if e.compactionManager != nil {
e.compactionManager.Stop()
}
// 2. Flush Active MemTable
if e.memtableManager.GetActiveCount() > 0 {
// 切换并 Flush
e.switchMemTable()
// 3. 刷新 Active MemTable(确保所有数据都写入磁盘)
// 检查 memtableManager 是否存在(可能已被 Destroy
if e.memtableManager != nil {
e.Flush()
}
// 等待所有 Immutable Flush 完成
// 3. 关闭 WAL Manager
if e.walManager != nil {
e.walManager.Close()
}
// 4. 等待所有 Immutable Flush 完成
// TODO: 添加更优雅的等待机制
for e.memtableManager.GetImmutableCount() > 0 {
time.Sleep(100 * time.Millisecond)
if e.memtableManager != nil {
for e.memtableManager.GetImmutableCount() > 0 {
time.Sleep(100 * time.Millisecond)
}
}
// 3. 保存所有索引
// 5. 保存所有索引
if e.indexManager != nil {
e.indexManager.BuildAll()
e.indexManager.Close()
}
// 4. 关闭 VersionSet
// 6. 关闭 VersionSet
if e.versionSet != nil {
e.versionSet.Close()
}
// 5. 关闭 WAL Manager
// 7. 关闭 WAL Manager
if e.walManager != nil {
e.walManager.Close()
}
@@ -479,6 +558,133 @@ func (e *Engine) Close() error {
return nil
}
// Clean 清除所有数据(保留 Engine 可用)
func (e *Engine) Clean() error {
e.flushMu.Lock()
defer e.flushMu.Unlock()
// 0. 停止自动 flush 监控(临时)
if e.stopAutoFlush != nil {
close(e.stopAutoFlush)
}
// 1. 停止 Compaction Manager
if e.compactionManager != nil {
e.compactionManager.Stop()
}
// 2. 等待所有 Immutable Flush 完成
for e.memtableManager.GetImmutableCount() > 0 {
time.Sleep(100 * time.Millisecond)
}
// 3. 清空 MemTable
e.memtableManager = NewMemTableManager(DefaultMemTableSize)
// 2. 删除所有 WAL 文件
if e.walManager != nil {
e.walManager.Close()
walDir := filepath.Join(e.dir, "wal")
os.RemoveAll(walDir)
os.MkdirAll(walDir, 0755)
// 重新创建 WAL Manager
walMgr, err := NewWALManager(walDir)
if err != nil {
return fmt.Errorf("recreate wal manager: %w", err)
}
e.walManager = walMgr
e.memtableManager.SetActiveWAL(walMgr.GetCurrentNumber())
}
// 3. 删除所有 SST 文件
if e.sstManager != nil {
e.sstManager.Close()
sstDir := filepath.Join(e.dir, "sst")
os.RemoveAll(sstDir)
os.MkdirAll(sstDir, 0755)
// 重新创建 SST Manager
sstMgr, err := NewSSTableManager(sstDir)
if err != nil {
return fmt.Errorf("recreate sst manager: %w", err)
}
e.sstManager = sstMgr
}
// 4. 删除所有索引文件
if e.indexManager != nil {
e.indexManager.Close()
indexFiles, _ := filepath.Glob(filepath.Join(e.dir, "idx_*.sst"))
for _, f := range indexFiles {
os.Remove(f)
}
// 重新创建 Index Manager
if e.schema != nil {
e.indexManager = NewIndexManager(e.dir, e.schema)
}
}
// 5. 重置 MANIFEST
if e.versionSet != nil {
e.versionSet.Close()
manifestDir := e.dir
os.Remove(filepath.Join(manifestDir, "MANIFEST"))
os.Remove(filepath.Join(manifestDir, "CURRENT"))
// 重新创建 VersionSet
versionSet, err := NewVersionSet(manifestDir)
if err != nil {
return fmt.Errorf("recreate version set: %w", err)
}
e.versionSet = versionSet
}
// 6. 重新创建 Compaction Manager
sstDir := filepath.Join(e.dir, "sst")
e.compactionManager = NewCompactionManager(sstDir, e.versionSet, e.sstManager)
if e.schema != nil {
e.compactionManager.SetSchema(e.schema)
}
e.compactionManager.Start()
// 7. 重置序列号
e.seq.Store(0)
// 8. 更新最后写入时间
e.lastWriteTime.Store(time.Now().UnixNano())
// 9. 重启自动 flush 监控
e.stopAutoFlush = make(chan struct{})
go e.autoFlushMonitor()
return nil
}
// Destroy 销毁 Engine 并删除所有数据文件
func (e *Engine) Destroy() error {
// 1. 先关闭 Engine
if err := e.Close(); err != nil {
return fmt.Errorf("close engine: %w", err)
}
// 2. 删除整个数据目录
if err := os.RemoveAll(e.dir); err != nil {
return fmt.Errorf("remove data directory: %w", err)
}
// 3. 标记 Engine 为不可用(将所有管理器设为 nil
e.walManager = nil
e.sstManager = nil
e.memtableManager = nil
e.versionSet = nil
e.compactionManager = nil
e.indexManager = nil
return nil
}
// TableStats 统计信息
type TableStats struct {
MemTableSize int64
@@ -585,64 +791,6 @@ func (e *Engine) Query() *QueryBuilder {
return newQueryBuilder(e)
}
// scanAllWithBuilder 使用 QueryBuilder 全表扫描
func (e *Engine) scanAllWithBuilder(qb *QueryBuilder) ([]*SSTableRow, error) {
// 使用 map 去重(同一个 seq 只保留一次)
rowMap := make(map[int64]*SSTableRow)
// 扫描 Active MemTable
iter := e.memtableManager.NewIterator()
for iter.Next() {
seq := iter.Key()
row, err := e.Get(seq)
if err == nil && qb.Match(row.Data) {
rowMap[seq] = row
}
}
// 扫描 Immutable MemTables
immutables := e.memtableManager.GetImmutables()
for _, imm := range immutables {
iter := imm.NewIterator()
for iter.Next() {
seq := iter.Key()
if _, exists := rowMap[seq]; !exists {
row, err := e.Get(seq)
if err == nil && qb.Match(row.Data) {
rowMap[seq] = row
}
}
}
}
// 扫描 SST 文件
readers := e.sstManager.GetReaders()
for _, reader := range readers {
header := reader.GetHeader()
for seq := header.MinKey; seq <= header.MaxKey; seq++ {
if _, exists := rowMap[seq]; !exists {
row, err := reader.Get(seq)
if err == nil && qb.Match(row.Data) {
rowMap[seq] = row
}
}
}
}
// 转换为数组并按 Seq 排序
results := make([]*SSTableRow, 0, len(rowMap))
for _, row := range rowMap {
results = append(results, row)
}
// 按 Seq 排序(保证查询结果有序)
sort.Slice(results, func(i, j int) bool {
return results[i].Seq < results[j].Seq
})
return results, nil
}
// verifyAndRepairIndexes 验证并修复索引
func (e *Engine) verifyAndRepairIndexes() error {
if e.indexManager == nil {