重构代码结构并添加完整功能
主要改动: - 重构目录结构:合并子目录到根目录,简化项目结构 - 添加完整的查询 API:支持复杂条件查询、字段选择、游标模式 - 实现 LSM-Tree Compaction:7层结构、Score-based策略、后台异步合并 - 添加 Web UI:基于 Lit 的现代化管理界面,支持数据浏览和 Manifest 查看 - 完善文档:添加 README.md 和 examples/webui/README.md 新增功能: - Query Builder:链式查询 API,支持 Eq/Lt/Gt/In/Between/Contains 等操作符 - Web UI 组件:srdb-app、srdb-table-list、srdb-data-view、srdb-manifest-view 等 - 列选择持久化:自动保存到 localStorage - 刷新按钮:一键刷新当前视图 - 主题切换:深色/浅色主题支持 代码优化: - 使用 Go 1.24 新特性:range 7、min()、maps.Copy()、slices.Sort() - 统一组件命名:所有 Web Components 使用 srdb-* 前缀 - CSS 优化:提取共享样式,减少重复代码 - 清理遗留代码:删除未使用的方法和样式
This commit is contained in:
131
engine.go
131
engine.go
@@ -9,12 +9,6 @@ import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"code.tczkiot.com/srdb/compaction"
|
||||
"code.tczkiot.com/srdb/manifest"
|
||||
"code.tczkiot.com/srdb/memtable"
|
||||
"code.tczkiot.com/srdb/sst"
|
||||
"code.tczkiot.com/srdb/wal"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -26,11 +20,11 @@ type Engine struct {
|
||||
dir string
|
||||
schema *Schema
|
||||
indexManager *IndexManager
|
||||
walManager *wal.Manager // WAL 管理器
|
||||
sstManager *sst.Manager // SST 管理器
|
||||
memtableManager *memtable.Manager // MemTable 管理器
|
||||
versionSet *manifest.VersionSet // MANIFEST 管理器
|
||||
compactionManager *compaction.Manager // Compaction 管理器
|
||||
walManager *WALManager // WAL 管理器
|
||||
sstManager *SSTableManager // SST 管理器
|
||||
memtableManager *MemTableManager // MemTable 管理器
|
||||
versionSet *VersionSet // MANIFEST 管理器
|
||||
compactionManager *CompactionManager // Compaction 管理器
|
||||
seq atomic.Int64
|
||||
mu sync.RWMutex
|
||||
flushMu sync.Mutex
|
||||
@@ -125,17 +119,22 @@ func OpenEngine(opts *EngineOptions) (*Engine, error) {
|
||||
}
|
||||
|
||||
// 创建 SST Manager
|
||||
sstMgr, err := sst.NewManager(sstDir)
|
||||
sstMgr, err := NewSSTableManager(sstDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 设置 Schema(用于优化编解码)
|
||||
if sch != nil {
|
||||
sstMgr.SetSchema(sch)
|
||||
}
|
||||
|
||||
// 创建 MemTable Manager
|
||||
memMgr := memtable.NewManager(opts.MemTableSize)
|
||||
memMgr := NewMemTableManager(opts.MemTableSize)
|
||||
|
||||
// 创建/恢复 MANIFEST
|
||||
manifestDir := opts.Dir
|
||||
versionSet, err := manifest.NewVersionSet(manifestDir)
|
||||
versionSet, err := NewVersionSet(manifestDir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create version set: %w", err)
|
||||
}
|
||||
@@ -158,7 +157,7 @@ func OpenEngine(opts *EngineOptions) (*Engine, error) {
|
||||
}
|
||||
|
||||
// 恢复完成后,创建 WAL Manager 用于后续写入
|
||||
walMgr, err := wal.NewManager(walDir)
|
||||
walMgr, err := NewWALManager(walDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -166,7 +165,12 @@ func OpenEngine(opts *EngineOptions) (*Engine, error) {
|
||||
engine.memtableManager.SetActiveWAL(walMgr.GetCurrentNumber())
|
||||
|
||||
// 创建 Compaction Manager
|
||||
engine.compactionManager = compaction.NewManager(sstDir, versionSet)
|
||||
engine.compactionManager = NewCompactionManager(sstDir, versionSet, sstMgr)
|
||||
|
||||
// 设置 Schema(如果有)
|
||||
if sch != nil {
|
||||
engine.compactionManager.SetSchema(sch)
|
||||
}
|
||||
|
||||
// 启动时清理孤儿文件(崩溃恢复后的清理)
|
||||
engine.compactionManager.CleanupOrphanFiles()
|
||||
@@ -195,7 +199,7 @@ func (e *Engine) Insert(data map[string]any) error {
|
||||
seq := e.seq.Add(1)
|
||||
|
||||
// 2. 添加系统字段
|
||||
row := &sst.Row{
|
||||
row := &SSTableRow{
|
||||
Seq: seq,
|
||||
Time: time.Now().UnixNano(),
|
||||
Data: data,
|
||||
@@ -208,8 +212,8 @@ func (e *Engine) Insert(data map[string]any) error {
|
||||
}
|
||||
|
||||
// 4. 写入 WAL
|
||||
entry := &wal.Entry{
|
||||
Type: wal.EntryTypePut,
|
||||
entry := &WALEntry{
|
||||
Type: WALEntryTypePut,
|
||||
Seq: seq,
|
||||
Data: rowData,
|
||||
}
|
||||
@@ -235,11 +239,11 @@ func (e *Engine) Insert(data map[string]any) error {
|
||||
}
|
||||
|
||||
// Get 查询数据
|
||||
func (e *Engine) Get(seq int64) (*sst.Row, error) {
|
||||
func (e *Engine) Get(seq int64) (*SSTableRow, error) {
|
||||
// 1. 先查 MemTable Manager (Active + Immutables)
|
||||
data, found := e.memtableManager.Get(seq)
|
||||
if found {
|
||||
var row sst.Row
|
||||
var row SSTableRow
|
||||
err := json.Unmarshal(data, &row)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -251,6 +255,35 @@ func (e *Engine) Get(seq int64) (*sst.Row, error) {
|
||||
return e.sstManager.Get(seq)
|
||||
}
|
||||
|
||||
// GetPartial 按需查询数据(只读取指定字段)
|
||||
func (e *Engine) GetPartial(seq int64, fields []string) (*SSTableRow, error) {
|
||||
// 1. 先查 MemTable Manager (Active + Immutables)
|
||||
data, found := e.memtableManager.Get(seq)
|
||||
if found {
|
||||
var row SSTableRow
|
||||
err := json.Unmarshal(data, &row)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// MemTable 中的数据已经完全解析,需要手动过滤字段
|
||||
if len(fields) > 0 {
|
||||
filteredData := make(map[string]any)
|
||||
for _, field := range fields {
|
||||
if val, ok := row.Data[field]; ok {
|
||||
filteredData[field] = val
|
||||
}
|
||||
}
|
||||
row.Data = filteredData
|
||||
}
|
||||
|
||||
return &row, nil
|
||||
}
|
||||
|
||||
// 2. 查询 SST 文件(按需解码)
|
||||
return e.sstManager.GetPartial(seq, fields)
|
||||
}
|
||||
|
||||
// switchMemTable 切换 MemTable
|
||||
func (e *Engine) switchMemTable() error {
|
||||
e.flushMu.Lock()
|
||||
@@ -273,12 +306,12 @@ func (e *Engine) switchMemTable() error {
|
||||
}
|
||||
|
||||
// flushImmutable 将 Immutable MemTable 刷新到 SST
|
||||
func (e *Engine) flushImmutable(imm *memtable.ImmutableMemTable, walNumber int64) error {
|
||||
func (e *Engine) flushImmutable(imm *ImmutableMemTable, walNumber int64) error {
|
||||
// 1. 收集所有行
|
||||
var rows []*sst.Row
|
||||
iter := imm.MemTable.NewIterator()
|
||||
var rows []*SSTableRow
|
||||
iter := imm.NewIterator()
|
||||
for iter.Next() {
|
||||
var row sst.Row
|
||||
var row SSTableRow
|
||||
err := json.Unmarshal(iter.Value(), &row)
|
||||
if err == nil {
|
||||
rows = append(rows, &row)
|
||||
@@ -311,7 +344,7 @@ func (e *Engine) flushImmutable(imm *memtable.ImmutableMemTable, walNumber int64
|
||||
return fmt.Errorf("stat sst file: %w", err)
|
||||
}
|
||||
|
||||
fileMeta := &manifest.FileMetadata{
|
||||
fileMeta := &FileMetadata{
|
||||
FileNumber: fileNumber,
|
||||
Level: 0, // Flush 到 L0
|
||||
FileSize: fileInfo.Size(),
|
||||
@@ -321,11 +354,12 @@ func (e *Engine) flushImmutable(imm *memtable.ImmutableMemTable, walNumber int64
|
||||
}
|
||||
|
||||
// 5. 更新 MANIFEST
|
||||
edit := manifest.NewVersionEdit()
|
||||
edit := NewVersionEdit()
|
||||
edit.AddFile(fileMeta)
|
||||
|
||||
// 持久化当前的文件编号计数器(关键修复:防止重启后文件编号重用)
|
||||
edit.SetNextFileNumber(e.versionSet.GetNextFileNumber())
|
||||
// 使用 fileNumber + 1 确保并发安全,避免竞态条件
|
||||
edit.SetNextFileNumber(fileNumber + 1)
|
||||
|
||||
err = e.versionSet.LogAndApply(edit)
|
||||
if err != nil {
|
||||
@@ -338,9 +372,9 @@ func (e *Engine) flushImmutable(imm *memtable.ImmutableMemTable, walNumber int64
|
||||
// 7. 从 Immutable 列表中移除
|
||||
e.memtableManager.RemoveImmutable(imm)
|
||||
|
||||
// 8. 触发 Compaction 检查(非阻塞)
|
||||
// Flush 后 L0 增加了新文件,可能需要立即触发 compaction
|
||||
e.compactionManager.MaybeCompact()
|
||||
// 8. Compaction 由后台线程负责,不在 flush 路径中触发
|
||||
// 避免同步 compaction 导致刚创建的文件立即被删除
|
||||
// e.compactionManager.MaybeCompact()
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -364,7 +398,7 @@ func (e *Engine) recover() error {
|
||||
|
||||
// 依次读取每个 WAL
|
||||
for _, walPath := range walFiles {
|
||||
reader, err := wal.NewReader(walPath)
|
||||
reader, err := NewWALReader(walPath)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
@@ -380,7 +414,7 @@ func (e *Engine) recover() error {
|
||||
for _, entry := range entries {
|
||||
// 如果定义了 Schema,验证数据
|
||||
if e.schema != nil {
|
||||
var row sst.Row
|
||||
var row SSTableRow
|
||||
if err := json.Unmarshal(entry.Data, &row); err != nil {
|
||||
return fmt.Errorf("failed to unmarshal row during recovery (seq=%d): %w", entry.Seq, err)
|
||||
}
|
||||
@@ -445,8 +479,8 @@ func (e *Engine) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stats 统计信息
|
||||
type Stats struct {
|
||||
// TableStats 统计信息
|
||||
type TableStats struct {
|
||||
MemTableSize int64
|
||||
MemTableCount int
|
||||
SSTCount int
|
||||
@@ -454,22 +488,22 @@ type Stats struct {
|
||||
}
|
||||
|
||||
// GetVersionSet 获取 VersionSet(用于高级操作)
|
||||
func (e *Engine) GetVersionSet() *manifest.VersionSet {
|
||||
func (e *Engine) GetVersionSet() *VersionSet {
|
||||
return e.versionSet
|
||||
}
|
||||
|
||||
// GetCompactionManager 获取 Compaction Manager(用于高级操作)
|
||||
func (e *Engine) GetCompactionManager() *compaction.Manager {
|
||||
func (e *Engine) GetCompactionManager() *CompactionManager {
|
||||
return e.compactionManager
|
||||
}
|
||||
|
||||
// GetMemtableManager 获取 Memtable Manager
|
||||
func (e *Engine) GetMemtableManager() *memtable.Manager {
|
||||
func (e *Engine) GetMemtableManager() *MemTableManager {
|
||||
return e.memtableManager
|
||||
}
|
||||
|
||||
// GetSSTManager 获取 SST Manager
|
||||
func (e *Engine) GetSSTManager() *sst.Manager {
|
||||
func (e *Engine) GetSSTManager() *SSTableManager {
|
||||
return e.sstManager
|
||||
}
|
||||
|
||||
@@ -484,11 +518,11 @@ func (e *Engine) GetSchema() *Schema {
|
||||
}
|
||||
|
||||
// Stats 获取统计信息
|
||||
func (e *Engine) Stats() *Stats {
|
||||
func (e *Engine) Stats() *TableStats {
|
||||
memStats := e.memtableManager.GetStats()
|
||||
sstStats := e.sstManager.GetStats()
|
||||
|
||||
stats := &Stats{
|
||||
stats := &TableStats{
|
||||
MemTableSize: memStats.TotalSize,
|
||||
MemTableCount: memStats.TotalCount,
|
||||
SSTCount: sstStats.FileCount,
|
||||
@@ -552,9 +586,9 @@ func (e *Engine) Query() *QueryBuilder {
|
||||
}
|
||||
|
||||
// scanAllWithBuilder 使用 QueryBuilder 全表扫描
|
||||
func (e *Engine) scanAllWithBuilder(qb *QueryBuilder) ([]*sst.Row, error) {
|
||||
func (e *Engine) scanAllWithBuilder(qb *QueryBuilder) ([]*SSTableRow, error) {
|
||||
// 使用 map 去重(同一个 seq 只保留一次)
|
||||
rowMap := make(map[int64]*sst.Row)
|
||||
rowMap := make(map[int64]*SSTableRow)
|
||||
|
||||
// 扫描 Active MemTable
|
||||
iter := e.memtableManager.NewIterator()
|
||||
@@ -569,7 +603,7 @@ func (e *Engine) scanAllWithBuilder(qb *QueryBuilder) ([]*sst.Row, error) {
|
||||
// 扫描 Immutable MemTables
|
||||
immutables := e.memtableManager.GetImmutables()
|
||||
for _, imm := range immutables {
|
||||
iter := imm.MemTable.NewIterator()
|
||||
iter := imm.NewIterator()
|
||||
for iter.Next() {
|
||||
seq := iter.Key()
|
||||
if _, exists := rowMap[seq]; !exists {
|
||||
@@ -595,12 +629,17 @@ func (e *Engine) scanAllWithBuilder(qb *QueryBuilder) ([]*sst.Row, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// 转换为数组
|
||||
results := make([]*sst.Row, 0, len(rowMap))
|
||||
// 转换为数组并按 Seq 排序
|
||||
results := make([]*SSTableRow, 0, len(rowMap))
|
||||
for _, row := range rowMap {
|
||||
results = append(results, row)
|
||||
}
|
||||
|
||||
// 按 Seq 排序(保证查询结果有序)
|
||||
sort.Slice(results, func(i, j int) bool {
|
||||
return results[i].Seq < results[j].Seq
|
||||
})
|
||||
|
||||
return results, nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user