From 3148bf226dff30cb472b82bde5a7baae61da23cf Mon Sep 17 00:00:00 2001 From: bourdon Date: Fri, 10 Oct 2025 20:26:18 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=EF=BC=9A=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E7=BB=93=E6=9E=84=E5=8C=96=E6=97=A5=E5=BF=97=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 引入 slog 替代 fmt.Printf 日志输出 - Database 和 Table 支持自定义 Logger - 优化错误和警告信息的结构化记录 - 改进 Compaction 配置传递机制 - 完善数据库和表的初始化流程 --- compaction.go | 312 +++++++++++++++++++++++++++++++++++++++----------- database.go | 188 +++++++++++++++++++++++++++--- table.go | 15 ++- 3 files changed, 433 insertions(+), 82 deletions(-) diff --git a/compaction.go b/compaction.go index 7089a43..c43ba13 100644 --- a/compaction.go +++ b/compaction.go @@ -2,6 +2,8 @@ package srdb import ( "fmt" + "io" + "log/slog" "os" "path/filepath" "sort" @@ -62,18 +64,52 @@ type CompactionTask struct { type Picker struct { mu sync.Mutex currentStage int // 当前阶段:0=L0合并, 1=L0升级, 2=L1升级, 3=L2升级 + + // 层级大小限制(可配置) + level0SizeLimit int64 + level1SizeLimit int64 + level2SizeLimit int64 + level3SizeLimit int64 } -// NewPicker 创建新的 Compaction Picker +// NewPicker 创建新的 Compaction Picker(使用默认值) func NewPicker() *Picker { return &Picker{ - currentStage: 0, // 从 L0 合并开始 + currentStage: 0, // 从 L0 合并开始 + level0SizeLimit: level0SizeLimit, + level1SizeLimit: level1SizeLimit, + level2SizeLimit: level2SizeLimit, + level3SizeLimit: level3SizeLimit, } } -// getLevelSizeLimit 获取层级大小限制(调用包级私有函数) +// UpdateLevelLimits 更新层级大小限制(由 CompactionManager 调用) +func (p *Picker) UpdateLevelLimits(l0, l1, l2, l3 int64) { + p.mu.Lock() + defer p.mu.Unlock() + p.level0SizeLimit = l0 + p.level1SizeLimit = l1 + p.level2SizeLimit = l2 + p.level3SizeLimit = l3 +} + +// getLevelSizeLimit 获取层级大小限制(从配置读取) func (p *Picker) getLevelSizeLimit(level int) int64 { - return getLevelSizeLimit(level) + p.mu.Lock() + defer p.mu.Unlock() + + switch level { + case 0: + return p.level0SizeLimit + case 1: + return p.level1SizeLimit + case 2: + return p.level2SizeLimit + case 3: + return p.level3SizeLimit + default: + return p.level3SizeLimit + } } // PickCompaction 选择需要 Compaction 的任务(按阶段返回,阶段内并发执行) @@ -102,43 +138,27 @@ func (p *Picker) PickCompaction(version *Version) []*CompactionTask { defer p.mu.Unlock() var tasks []*CompactionTask - var stageName string // 根据当前阶段选择任务 switch p.currentStage { case 0: // Stage 0: L0 合并任务 tasks = p.pickL0MergeTasks(version) - stageName = "L0-merge" case 1: // Stage 1: L0 升级任务 tasks = p.pickL0UpgradeTasks(version) - stageName = "L0-upgrade" case 2: // Stage 2: L1 升级任务 tasks = p.pickLevelCompaction(version, 1) - stageName = "L1-upgrade" case 3: // Stage 3: L2 升级任务 tasks = p.pickLevelCompaction(version, 2) - stageName = "L2-upgrade" } - // 保存当前阶段索引 - currentStage := p.currentStage - - // 推进到下一阶段(无论是否有任务),这里是否巧妙地 + // 推进到下一阶段(无论是否有任务),这里巧妙地 // 使用了取模运算来保证阶段递增与阶段重置。 p.currentStage = (p.currentStage + 1) % 4 - if len(tasks) > 0 { - fmt.Printf("[Picker] Stage %d (%s): found %d tasks, next stage will be %d\n", - currentStage, stageName, len(tasks), p.currentStage) - } else { - fmt.Printf("[Picker] Stage %d (%s): no tasks, next stage will be %d\n", - currentStage, stageName, p.currentStage) - } - return tasks } @@ -451,7 +471,8 @@ type Compactor struct { picker *Picker versionSet *VersionSet schema *Schema - mu sync.RWMutex // 只保护 schema 字段的读写 + logger *slog.Logger + mu sync.RWMutex // 只保护 schema 和 logger 字段的读写 } // NewCompactor 创建新的 Compactor @@ -460,6 +481,7 @@ func NewCompactor(sstDir string, versionSet *VersionSet) *Compactor { sstDir: sstDir, picker: NewPicker(), versionSet: versionSet, + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), // 默认丢弃日志 } } @@ -470,6 +492,13 @@ func (c *Compactor) SetSchema(schema *Schema) { c.schema = schema } +// SetLogger 设置 Logger +func (c *Compactor) SetLogger(logger *slog.Logger) { + c.mu.Lock() + defer c.mu.Unlock() + c.logger = logger +} + // GetPicker 获取 Picker func (c *Compactor) GetPicker() *Picker { return c.picker @@ -482,6 +511,11 @@ func (c *Compactor) DoCompaction(task *CompactionTask, version *Version) (*Versi return nil, fmt.Errorf("compaction task is nil") } + // 获取 logger + c.mu.RLock() + logger := c.logger + c.mu.RUnlock() + // 0. 验证输入文件是否存在(防止并发 compaction 导致的竞态) existingInputFiles := make([]*FileMetadata, 0, len(task.InputFiles)) for _, file := range task.InputFiles { @@ -489,13 +523,14 @@ func (c *Compactor) DoCompaction(task *CompactionTask, version *Version) (*Versi if _, err := os.Stat(sstPath); err == nil { existingInputFiles = append(existingInputFiles, file) } else { - fmt.Printf("[Compaction] Warning: input file %06d.sst not found, skipping from task\n", file.FileNumber) + logger.Warn("[Compaction] Input file not found, skipping", + "file_number", file.FileNumber) } } // 如果所有输入文件都不存在,直接返回(无需 compaction) if len(existingInputFiles) == 0 { - fmt.Printf("[Compaction] All input files missing, compaction skipped\n") + logger.Warn("[Compaction] All input files missing, compaction skipped") return nil, nil // 返回 nil 表示不需要应用任何 VersionEdit } @@ -519,7 +554,8 @@ func (c *Compactor) DoCompaction(task *CompactionTask, version *Version) (*Versi existingOutputFiles = append(existingOutputFiles, file) } else { // 输出层级的文件不存在,记录并在 VersionEdit 中删除它 - fmt.Printf("[Compaction] Warning: overlapping output file %06d.sst missing, will remove from MANIFEST\n", file.FileNumber) + logger.Warn("[Compaction] Overlapping output file missing, will remove from MANIFEST", + "file_number", file.FileNumber) missingOutputFiles = append(missingOutputFiles, file) } } @@ -558,7 +594,8 @@ func (c *Compactor) DoCompaction(task *CompactionTask, version *Version) (*Versi // 删除缺失的输出层级文件(清理 MANIFEST 中的过期引用) for _, file := range missingOutputFiles { edit.DeleteFile(file.FileNumber) - fmt.Printf("[Compaction] Removing missing file %06d.sst from MANIFEST\n", file.FileNumber) + logger.Info("[Compaction] Removing missing file from MANIFEST", + "file_number", file.FileNumber) } // 添加新文件,并跟踪最大文件编号 @@ -874,6 +911,19 @@ type CompactionManager struct { sstManager *SSTableManager // 添加 sstManager 引用,用于同步删除 readers sstDir string + // 配置(从 Database Options 传递) + configMu sync.RWMutex + logger *slog.Logger + level0SizeLimit int64 + level1SizeLimit int64 + level2SizeLimit int64 + level3SizeLimit int64 + compactionInterval time.Duration + gcInterval time.Duration + gcFileMinAge time.Duration + disableCompaction bool + disableGC bool + // 控制后台 Compaction stopCh chan struct{} wg sync.WaitGroup @@ -891,17 +941,55 @@ type CompactionManager struct { totalOrphansFound int64 } -// NewCompactionManager 创建新的 Compaction Manager +// NewCompactionManager 创建新的 Compaction Manager(使用默认配置) func NewCompactionManager(sstDir string, versionSet *VersionSet, sstManager *SSTableManager) *CompactionManager { return &CompactionManager{ - compactor: NewCompactor(sstDir, versionSet), - versionSet: versionSet, - sstManager: sstManager, - sstDir: sstDir, - stopCh: make(chan struct{}), + compactor: NewCompactor(sstDir, versionSet), + versionSet: versionSet, + sstManager: sstManager, + sstDir: sstDir, + stopCh: make(chan struct{}), + // 默认 logger:丢弃日志(将在 ApplyConfig 中设置为 Database.options.Logger) + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + // 使用硬编码常量作为默认值(向后兼容) + level0SizeLimit: level0SizeLimit, + level1SizeLimit: level1SizeLimit, + level2SizeLimit: level2SizeLimit, + level3SizeLimit: level3SizeLimit, + compactionInterval: 10 * time.Second, + gcInterval: 5 * time.Minute, + gcFileMinAge: 1 * time.Minute, + disableCompaction: false, + disableGC: false, } } +// ApplyConfig 应用数据库级配置(从 Database Options) +func (m *CompactionManager) ApplyConfig(opts *Options) { + m.configMu.Lock() + defer m.configMu.Unlock() + + m.logger = opts.Logger + m.level0SizeLimit = opts.Level0SizeLimit + m.level1SizeLimit = opts.Level1SizeLimit + m.level2SizeLimit = opts.Level2SizeLimit + m.level3SizeLimit = opts.Level3SizeLimit + m.compactionInterval = opts.CompactionInterval + m.gcInterval = opts.GCInterval + m.gcFileMinAge = opts.GCFileMinAge + m.disableCompaction = opts.DisableAutoCompaction + m.disableGC = opts.DisableGC + + // 同时更新 compactor 的 picker 和 logger + m.compactor.picker.UpdateLevelLimits( + m.level0SizeLimit, + m.level1SizeLimit, + m.level2SizeLimit, + m.level3SizeLimit, + ) + m.compactor.SetLogger(opts.Logger) +} + // GetPicker 获取 Compaction Picker func (m *CompactionManager) GetPicker() *Picker { return m.compactor.GetPicker() @@ -929,7 +1017,17 @@ func (m *CompactionManager) Stop() { func (m *CompactionManager) backgroundCompaction() { defer m.wg.Done() - ticker := time.NewTicker(10 * time.Second) // 每 10 秒检查一次 + // 使用配置的间隔时间 + m.configMu.RLock() + interval := m.compactionInterval + disabled := m.disableCompaction + m.configMu.RUnlock() + + if disabled { + return // 禁用自动 Compaction,直接退出 + } + + ticker := time.NewTicker(interval) defer ticker.Stop() for { @@ -937,6 +1035,22 @@ func (m *CompactionManager) backgroundCompaction() { case <-m.stopCh: return case <-ticker.C: + // 检查配置是否被更新 + m.configMu.RLock() + newInterval := m.compactionInterval + disabled := m.disableCompaction + m.configMu.RUnlock() + + if disabled { + return // 运行中被禁用,退出 + } + + // 如果间隔时间改变,重新创建 ticker + if newInterval != interval { + interval = newInterval + ticker.Reset(interval) + } + m.maybeCompact() } } @@ -987,7 +1101,9 @@ func (m *CompactionManager) doCompact() { } totalStagesExecuted++ - fmt.Printf("[Compaction] Found %d tasks in stage %d to execute concurrently\n", len(tasks), stage) + m.logger.Info("[Compaction] Found tasks to execute concurrently", + "stage", stage, + "task_count", len(tasks)) // 并发执行同一阶段的所有任务 var wg sync.WaitGroup @@ -999,8 +1115,10 @@ func (m *CompactionManager) doCompact() { firstFile := task.InputFiles[0].FileNumber m.mu.Lock() if m.lastFailedFile == firstFile && m.consecutiveFails >= 3 { - fmt.Printf("[Compaction] Skipping L%d file %d (failed %d times)\n", - task.Level, firstFile, m.consecutiveFails) + m.logger.Warn("[Compaction] Skipping file (failed multiple times)", + "level", task.Level, + "file_number", firstFile, + "consecutive_fails", m.consecutiveFails) m.consecutiveFails = 0 m.lastFailedFile = 0 m.mu.Unlock() @@ -1020,12 +1138,17 @@ func (m *CompactionManager) doCompact() { } // 执行 Compaction - fmt.Printf("[Compaction] Starting: L%d -> L%d, files: %d\n", - task.Level, task.OutputLevel, len(task.InputFiles)) + m.logger.Info("[Compaction] Starting", + "source_level", task.Level, + "target_level", task.OutputLevel, + "file_count", len(task.InputFiles)) err := m.DoCompactionWithVersion(task, currentVersion) if err != nil { - fmt.Printf("[Compaction] Failed L%d -> L%d: %v\n", task.Level, task.OutputLevel, err) + m.logger.Error("[Compaction] Failed", + "source_level", task.Level, + "target_level", task.OutputLevel, + "error", err) // 记录失败信息 if len(task.InputFiles) > 0 { @@ -1040,7 +1163,9 @@ func (m *CompactionManager) doCompact() { m.mu.Unlock() } } else { - fmt.Printf("[Compaction] Completed: L%d -> L%d\n", task.Level, task.OutputLevel) + m.logger.Info("[Compaction] Completed", + "source_level", task.Level, + "target_level", task.OutputLevel) // 清除失败计数 m.mu.Lock() @@ -1057,7 +1182,10 @@ func (m *CompactionManager) doCompact() { // 等待当前阶段的所有任务完成 wg.Wait() - fmt.Printf("[Compaction] Stage %d completed: %d/%d tasks succeeded\n", stage, successCount.Load(), len(tasks)) + m.logger.Info("[Compaction] Stage completed", + "stage", stage, + "succeeded", successCount.Load(), + "total", len(tasks)) } // 如果所有阶段都没有任务,输出诊断信息 @@ -1080,7 +1208,7 @@ func (m *CompactionManager) printCompactionStats(version *Version, picker *Picke } m.lastCompactionTime = time.Now() - fmt.Println("[Compaction] Status check:") + m.logger.Info("[Compaction] Status check") for level := range NumLevels { files := version.GetLevel(level) if len(files) == 0 { @@ -1093,8 +1221,11 @@ func (m *CompactionManager) printCompactionStats(version *Version, picker *Picke } score := picker.GetLevelScore(version, level) - fmt.Printf(" L%d: %d files, %.2f MB, score: %.2f\n", - level, len(files), float64(totalSize)/(1024*1024), score) + m.logger.Info("[Compaction] Level status", + "level", level, + "file_count", len(files), + "size_mb", float64(totalSize)/(1024*1024), + "score", score) } } @@ -1112,7 +1243,7 @@ func (m *CompactionManager) DoCompactionWithVersion(task *CompactionTask, versio // 如果 edit 为 nil,说明所有文件都已经不存在,无需应用变更 if edit == nil { - fmt.Printf("[Compaction] No changes needed (files already removed)\n") + m.logger.Info("[Compaction] No changes needed (files already removed)") return nil } @@ -1120,7 +1251,8 @@ func (m *CompactionManager) DoCompactionWithVersion(task *CompactionTask, versio err = m.versionSet.LogAndApply(edit) if err != nil { // LogAndApply 失败,清理已写入的新 SST 文件(防止孤儿文件) - fmt.Printf("[Compaction] LogAndApply failed, cleaning up new files: %v\n", err) + m.logger.Error("[Compaction] LogAndApply failed, cleaning up new files", + "error", err) m.cleanupNewFiles(edit) return fmt.Errorf("apply version edit: %w", err) } @@ -1132,7 +1264,9 @@ func (m *CompactionManager) DoCompactionWithVersion(task *CompactionTask, versio sstPath := filepath.Join(m.sstDir, fmt.Sprintf("%06d.sst", file.FileNumber)) reader, err := NewSSTableReader(sstPath) if err != nil { - fmt.Printf("[Compaction] Warning: failed to open new file %06d.sst: %v\n", file.FileNumber, err) + m.logger.Warn("[Compaction] Failed to open new file", + "file_number", file.FileNumber, + "error", err) continue } // 设置 Schema @@ -1176,16 +1310,20 @@ func (m *CompactionManager) cleanupNewFiles(edit *VersionEdit) { return } - fmt.Printf("[Compaction] Cleaning up %d new files after LogAndApply failure\n", len(edit.AddedFiles)) + m.logger.Warn("[Compaction] Cleaning up new files after LogAndApply failure", + "file_count", len(edit.AddedFiles)) // 删除新创建的文件 for _, file := range edit.AddedFiles { sstPath := filepath.Join(m.sstDir, fmt.Sprintf("%06d.sst", file.FileNumber)) err := os.Remove(sstPath) if err != nil { - fmt.Printf("[Compaction] Failed to cleanup new file %06d.sst: %v\n", file.FileNumber, err) + m.logger.Warn("[Compaction] Failed to cleanup new file", + "file_number", file.FileNumber, + "error", err) } else { - fmt.Printf("[Compaction] Cleaned up new file %06d.sst\n", file.FileNumber) + m.logger.Info("[Compaction] Cleaned up new file", + "file_number", file.FileNumber) } } } @@ -1193,11 +1331,12 @@ func (m *CompactionManager) cleanupNewFiles(edit *VersionEdit) { // deleteObsoleteFiles 删除废弃的 SST 文件 func (m *CompactionManager) deleteObsoleteFiles(edit *VersionEdit) { if edit == nil { - fmt.Printf("[Compaction] deleteObsoleteFiles: edit is nil\n") + m.logger.Warn("[Compaction] deleteObsoleteFiles: edit is nil") return } - fmt.Printf("[Compaction] deleteObsoleteFiles: %d files to delete\n", len(edit.DeletedFiles)) + m.logger.Info("[Compaction] Deleting obsolete files", + "file_count", len(edit.DeletedFiles)) // 删除被标记为删除的文件 for _, fileNum := range edit.DeletedFiles { @@ -1205,7 +1344,9 @@ func (m *CompactionManager) deleteObsoleteFiles(edit *VersionEdit) { if m.sstManager != nil { err := m.sstManager.RemoveReader(fileNum) if err != nil { - fmt.Printf("[Compaction] Failed to remove reader for %06d.sst: %v\n", fileNum, err) + m.logger.Warn("[Compaction] Failed to remove reader", + "file_number", fileNum, + "error", err) } } @@ -1215,9 +1356,12 @@ func (m *CompactionManager) deleteObsoleteFiles(edit *VersionEdit) { if err != nil { // 删除失败只记录日志,不影响 compaction 流程 // 后台垃圾回收器会重试 - fmt.Printf("[Compaction] Failed to delete obsolete file %06d.sst: %v\n", fileNum, err) + m.logger.Warn("[Compaction] Failed to delete obsolete file", + "file_number", fileNum, + "error", err) } else { - fmt.Printf("[Compaction] Deleted obsolete file %06d.sst\n", fileNum) + m.logger.Info("[Compaction] Deleted obsolete file", + "file_number", fileNum) } } } @@ -1295,7 +1439,17 @@ func (m *CompactionManager) GetLevelStats() []LevelStats { func (m *CompactionManager) backgroundGarbageCollection() { defer m.wg.Done() - ticker := time.NewTicker(5 * time.Minute) // 每 5 分钟检查一次 + // 使用配置的间隔时间 + m.configMu.RLock() + interval := m.gcInterval + disabled := m.disableGC + m.configMu.RUnlock() + + if disabled { + return // 禁用垃圾回收,直接退出 + } + + ticker := time.NewTicker(interval) defer ticker.Stop() for { @@ -1303,6 +1457,22 @@ func (m *CompactionManager) backgroundGarbageCollection() { case <-m.stopCh: return case <-ticker.C: + // 检查配置是否被更新 + m.configMu.RLock() + newInterval := m.gcInterval + disabled := m.disableGC + m.configMu.RUnlock() + + if disabled { + return // 运行中被禁用,退出 + } + + // 如果间隔时间改变,重新创建 ticker + if newInterval != interval { + interval = newInterval + ticker.Reset(interval) + } + m.collectOrphanFiles() } } @@ -1328,7 +1498,7 @@ func (m *CompactionManager) collectOrphanFiles() { pattern := filepath.Join(m.sstDir, "*.sst") sstFiles, err := filepath.Glob(pattern) if err != nil { - fmt.Printf("[GC] Failed to scan SST directory: %v\n", err) + m.logger.Error("[GC] Failed to scan SST directory", "error", err) return } @@ -1345,23 +1515,32 @@ func (m *CompactionManager) collectOrphanFiles() { // 检查是否是活跃文件 if !activeFiles[fileNum] { // 检查文件修改时间,避免删除正在 flush 的文件 - // 如果文件在最近 1 分钟内创建/修改,跳过(可能正在 LogAndApply) + // 使用配置的文件最小年龄(默认 1 分钟,可能正在 LogAndApply) + m.configMu.RLock() + minAge := m.gcFileMinAge + m.configMu.RUnlock() + fileInfo, err := os.Stat(sstPath) if err != nil { continue } - if time.Since(fileInfo.ModTime()) < 1*time.Minute { - fmt.Printf("[GC] Skipping recently modified file %06d.sst (age: %v)\n", - fileNum, time.Since(fileInfo.ModTime())) + if time.Since(fileInfo.ModTime()) < minAge { + m.logger.Info("[GC] Skipping recently modified file", + "file_number", fileNum, + "age", time.Since(fileInfo.ModTime()), + "min_age", minAge) continue } // 这是孤儿文件,删除它 err = os.Remove(sstPath) if err != nil { - fmt.Printf("[GC] Failed to delete orphan file %06d.sst: %v\n", fileNum, err) + m.logger.Warn("[GC] Failed to delete orphan file", + "file_number", fileNum, + "error", err) } else { - fmt.Printf("[GC] Deleted orphan file %06d.sst\n", fileNum) + m.logger.Info("[GC] Deleted orphan file", + "file_number", fileNum) orphanCount++ } } @@ -1371,15 +1550,18 @@ func (m *CompactionManager) collectOrphanFiles() { m.mu.Lock() m.lastGCTime = time.Now() m.totalOrphansFound += int64(orphanCount) + totalOrphans := m.totalOrphansFound m.mu.Unlock() if orphanCount > 0 { - fmt.Printf("[GC] Completed: cleaned up %d orphan files (total: %d)\n", orphanCount, m.totalOrphansFound) + m.logger.Info("[GC] Completed", + "cleaned_up", orphanCount, + "total_orphans", totalOrphans) } } // CleanupOrphanFiles 手动触发孤儿文件清理(可在启动时调用) func (m *CompactionManager) CleanupOrphanFiles() { - fmt.Println("[GC] Manual cleanup triggered") + m.logger.Info("[GC] Manual cleanup triggered") m.collectOrphanFiles() } diff --git a/database.go b/database.go index 384e1dc..cc78d44 100644 --- a/database.go +++ b/database.go @@ -3,6 +3,8 @@ package srdb import ( "encoding/json" "fmt" + "io" + "log/slog" "maps" "os" "path/filepath" @@ -21,6 +23,9 @@ type Database struct { // 元数据 metadata *Metadata + // 配置选项 + options *Options + // 锁 mu sync.RWMutex } @@ -38,17 +43,144 @@ type TableInfo struct { CreatedAt int64 `json:"created_at"` } -// Open 打开数据库 +// Options 数据库配置选项 +type Options struct { + // ========== 基础配置 ========== + Dir string // 数据库目录(必需) + Logger *slog.Logger // 日志器(可选,nil 表示不输出日志) + + // ========== MemTable 配置 ========== + MemTableSize int64 // MemTable 大小限制(字节),默认 64MB + AutoFlushTimeout time.Duration // 自动 flush 超时时间,默认 30s,0 表示禁用 + + // ========== Compaction 配置 ========== + // 层级大小限制 + Level0SizeLimit int64 // L0 层大小限制,默认 64MB + Level1SizeLimit int64 // L1 层大小限制,默认 256MB + Level2SizeLimit int64 // L2 层大小限制,默认 512MB + Level3SizeLimit int64 // L3 层大小限制,默认 1GB + + // 后台任务间隔 + CompactionInterval time.Duration // Compaction 检查间隔,默认 10s + GCInterval time.Duration // 垃圾回收检查间隔,默认 5min + + // ========== 高级配置(可选)========== + DisableAutoCompaction bool // 禁用自动 Compaction,默认 false + DisableGC bool // 禁用垃圾回收,默认 false + GCFileMinAge time.Duration // GC 文件最小年龄,默认 1min +} + +// DefaultOptions 返回默认配置 +func DefaultOptions(dir string) *Options { + return &Options{ + Dir: dir, + Logger: nil, // 默认不输出日志 + MemTableSize: 64 * 1024 * 1024, // 64MB + AutoFlushTimeout: 30 * time.Second, // 30s + Level0SizeLimit: 64 * 1024 * 1024, // 64MB + Level1SizeLimit: 256 * 1024 * 1024, // 256MB + Level2SizeLimit: 512 * 1024 * 1024, // 512MB + Level3SizeLimit: 1024 * 1024 * 1024, // 1GB + CompactionInterval: 10 * time.Second, // 10s + GCInterval: 5 * time.Minute, // 5min + DisableAutoCompaction: false, + DisableGC: false, + GCFileMinAge: 1 * time.Minute, // 1min + } +} + +// fillDefaults 填充未设置的默认值(修改传入的 opts) +func (opts *Options) fillDefaults() { + // Logger:如果为 nil,创建一个丢弃所有日志的 logger + if opts.Logger == nil { + opts.Logger = slog.New(slog.NewTextHandler(io.Discard, nil)) + } + if opts.MemTableSize == 0 { + opts.MemTableSize = 64 * 1024 * 1024 // 64MB + } + if opts.AutoFlushTimeout == 0 { + opts.AutoFlushTimeout = 30 * time.Second // 30s + } + if opts.Level0SizeLimit == 0 { + opts.Level0SizeLimit = 64 * 1024 * 1024 // 64MB + } + if opts.Level1SizeLimit == 0 { + opts.Level1SizeLimit = 256 * 1024 * 1024 // 256MB + } + if opts.Level2SizeLimit == 0 { + opts.Level2SizeLimit = 512 * 1024 * 1024 // 512MB + } + if opts.Level3SizeLimit == 0 { + opts.Level3SizeLimit = 1024 * 1024 * 1024 // 1GB + } + if opts.CompactionInterval == 0 { + opts.CompactionInterval = 10 * time.Second // 10s + } + if opts.GCInterval == 0 { + opts.GCInterval = 5 * time.Minute // 5min + } + if opts.GCFileMinAge == 0 { + opts.GCFileMinAge = 1 * time.Minute // 1min + } +} + +// Validate 验证配置的有效性 +func (opts *Options) Validate() error { + if opts.Dir == "" { + return NewErrorf(ErrCodeInvalidParam, "database directory cannot be empty") + } + if opts.MemTableSize < 1*1024*1024 { + return NewErrorf(ErrCodeInvalidParam, "MemTableSize must be at least 1MB, got %d", opts.MemTableSize) + } + if opts.Level0SizeLimit < 1*1024*1024 { + return NewErrorf(ErrCodeInvalidParam, "Level0SizeLimit must be at least 1MB, got %d", opts.Level0SizeLimit) + } + if opts.Level1SizeLimit < opts.Level0SizeLimit { + return NewErrorf(ErrCodeInvalidParam, "Level1SizeLimit (%d) must be >= Level0SizeLimit (%d)", opts.Level1SizeLimit, opts.Level0SizeLimit) + } + if opts.Level2SizeLimit < opts.Level1SizeLimit { + return NewErrorf(ErrCodeInvalidParam, "Level2SizeLimit (%d) must be >= Level1SizeLimit (%d)", opts.Level2SizeLimit, opts.Level1SizeLimit) + } + if opts.Level3SizeLimit < opts.Level2SizeLimit { + return NewErrorf(ErrCodeInvalidParam, "Level3SizeLimit (%d) must be >= Level2SizeLimit (%d)", opts.Level3SizeLimit, opts.Level2SizeLimit) + } + if opts.CompactionInterval < 1*time.Second { + return NewErrorf(ErrCodeInvalidParam, "CompactionInterval must be at least 1s, got %v", opts.CompactionInterval) + } + if opts.GCInterval < 1*time.Minute { + return NewErrorf(ErrCodeInvalidParam, "GCInterval must be at least 1min, got %v", opts.GCInterval) + } + if opts.GCFileMinAge < 0 { + return NewErrorf(ErrCodeInvalidParam, "GCFileMinAge cannot be negative, got %v", opts.GCFileMinAge) + } + return nil +} + +// Open 打开数据库(向后兼容,使用默认配置) func Open(dir string) (*Database, error) { + return OpenWithOptions(DefaultOptions(dir)) +} + +// OpenWithOptions 使用指定配置打开数据库 +func OpenWithOptions(opts *Options) (*Database, error) { + // 填充默认值 + opts.fillDefaults() + + // 验证配置 + if err := opts.Validate(); err != nil { + return nil, err + } + // 创建目录 - err := os.MkdirAll(dir, 0755) + err := os.MkdirAll(opts.Dir, 0755) if err != nil { return nil, err } db := &Database{ - dir: dir, - tables: make(map[string]*Table), + dir: opts.Dir, + tables: make(map[string]*Table), + options: opts, } // 加载元数据 @@ -111,24 +243,39 @@ func (db *Database) recoverTables() error { for _, tableInfo := range db.metadata.Tables { tableDir := filepath.Join(db.dir, tableInfo.Name) table, err := OpenTable(&TableOptions{ - Dir: tableDir, - MemTableSize: DefaultMemTableSize, + Dir: tableDir, + MemTableSize: db.options.MemTableSize, + AutoFlushTimeout: db.options.AutoFlushTimeout, }) if err != nil { // 记录失败的表,但继续恢复其他表 failedTables = append(failedTables, tableInfo.Name) - fmt.Printf("[WARNING] Failed to open table %s: %v\n", tableInfo.Name, err) - fmt.Printf("[WARNING] Table %s will be skipped. You may need to drop and recreate it.\n", tableInfo.Name) + db.options.Logger.Warn("[Database] Failed to open table", + "table", tableInfo.Name, + "error", err) + db.options.Logger.Warn("[Database] Table will be skipped. You may need to drop and recreate it.", + "table", tableInfo.Name) continue } + + // 设置 Logger + table.SetLogger(db.options.Logger) + + // 将数据库级 Compaction 配置应用到表的 CompactionManager + if table.compactionManager != nil { + table.compactionManager.ApplyConfig(db.options) + } + db.tables[tableInfo.Name] = table } // 如果有失败的表,输出汇总信息 if len(failedTables) > 0 { - fmt.Printf("[WARNING] %d table(s) failed to recover: %v\n", len(failedTables), failedTables) - fmt.Printf("[WARNING] To fix: Delete the corrupted table directory and restart.\n") - fmt.Printf("[WARNING] Example: rm -rf %s/\n", db.dir) + db.options.Logger.Warn("[Database] Failed to recover tables", + "failed_count", len(failedTables), + "failed_tables", failedTables) + db.options.Logger.Warn("[Database] To fix: Delete the corrupted table directory and restart", + "example", fmt.Sprintf("rm -rf %s/", db.dir)) } return nil @@ -151,18 +298,27 @@ func (db *Database) CreateTable(name string, schema *Schema) (*Table, error) { return nil, err } - // 创建表 + // 创建表(传递数据库级配置) table, err := OpenTable(&TableOptions{ - Dir: tableDir, - MemTableSize: DefaultMemTableSize, - Name: schema.Name, - Fields: schema.Fields, + Dir: tableDir, + MemTableSize: db.options.MemTableSize, + AutoFlushTimeout: db.options.AutoFlushTimeout, + Name: schema.Name, + Fields: schema.Fields, }) if err != nil { os.RemoveAll(tableDir) return nil, err } + // 设置 Logger + table.SetLogger(db.options.Logger) + + // 将数据库级 Compaction 配置应用到表的 CompactionManager + if table.compactionManager != nil { + table.compactionManager.ApplyConfig(db.options) + } + // 添加到 tables map db.tables[name] = table diff --git a/table.go b/table.go index e5ffea2..f305e88 100644 --- a/table.go +++ b/table.go @@ -3,6 +3,8 @@ package srdb import ( "encoding/json" "fmt" + "io" + "log/slog" "os" "path/filepath" "reflect" @@ -28,6 +30,7 @@ type Table struct { memtableManager *MemTableManager // MemTable 管理器 versionSet *VersionSet // MANIFEST 管理器 compactionManager *CompactionManager // Compaction 管理器 + logger *slog.Logger // 日志器 seq atomic.Int64 flushMu sync.Mutex @@ -142,7 +145,11 @@ func OpenTable(opts *TableOptions) (*Table, error) { err := indexMgr.CreateIndex(field.Name) if err != nil { // 索引创建失败,记录警告但不阻塞表创建 - fmt.Fprintf(os.Stderr, "[WARNING] Failed to create index for field %s: %v\n", field.Name, err) + // 此时使用临时 logger(Table 还未完全创建) + tmpLogger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + tmpLogger.Warn("[Table] Failed to create index for field", + "field", field.Name, + "error", err) } } } @@ -176,6 +183,7 @@ func OpenTable(opts *TableOptions) (*Table, error) { sstManager: sstMgr, memtableManager: memMgr, versionSet: versionSet, + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), // 默认丢弃日志 } // 先恢复数据(包括从 WAL 恢复) @@ -446,6 +454,11 @@ func (t *Table) insertSingle(data map[string]any) error { return nil } +// SetLogger 设置 logger(由 Database 调用) +func (t *Table) SetLogger(logger *slog.Logger) { + t.logger = logger +} + // Get 查询数据 func (t *Table) Get(seq int64) (*SSTableRow, error) { // 1. 先查 MemTable Manager (Active + Immutables)