diff --git a/compaction.go b/compaction.go index 4c2647d..7089a43 100644 --- a/compaction.go +++ b/compaction.go @@ -6,9 +6,51 @@ import ( "path/filepath" "sort" "sync" + "sync/atomic" "time" ) +// Compaction 层级大小限制(Append-Only 优化设计) +// +// 设计理念: +// - 触发阈值 = 目标文件大小(不 split) +// - 每个层级累积到阈值后,直接合并成一个对应大小的文件 +// - 适用于 Append-Only 场景:没有更新/删除,不需要增量合并 +// +// 触发规则: +// - L0 累积到 64MB → 合并成 1 个 64MB 文件,升级到 L1 +// - L1 累积到 256MB → 合并成 1 个 256MB 文件,升级到 L2 +// - L2 累积到 512MB → 合并成 1 个 512MB 文件,升级到 L3 +// - L3 累积到 1GB → 保持在 L3(最后一层) +// +// 层级设计(Append-Only 优化): +// - L0:64MB (MemTable flush,小文件合并) +// - L1:256MB (L0 升级,减少文件数) +// - L2:512MB (L1 升级,温数据) +// - L3:1GB (L2 升级,冷数据,最后一层) +const ( + level0SizeLimit = 64 * 1024 * 1024 // 64MB + level1SizeLimit = 256 * 1024 * 1024 // 256MB + level2SizeLimit = 512 * 1024 * 1024 // 512MB + level3SizeLimit = 1024 * 1024 * 1024 // 1GB +) + +// getLevelSizeLimit 获取层级大小限制(私有函数,供 Picker 和 Compactor 共用) +func getLevelSizeLimit(level int) int64 { + switch level { + case 0: + return level0SizeLimit + case 1: + return level1SizeLimit + case 2: + return level2SizeLimit + case 3: + return level3SizeLimit + default: + return level3SizeLimit + } +} + // CompactionTask 表示一个 Compaction 任务 type CompactionTask struct { Level int // 源层级 @@ -18,119 +60,278 @@ type CompactionTask struct { // Picker 负责选择需要 Compaction 的文件 type Picker struct { - // Level 大小限制 (字节) - levelSizeLimits [NumLevels]int64 - - // Level 文件数量限制 - levelFileLimits [NumLevels]int + mu sync.Mutex + currentStage int // 当前阶段:0=L0合并, 1=L0升级, 2=L1升级, 3=L2升级 } // NewPicker 创建新的 Compaction Picker func NewPicker() *Picker { - p := &Picker{} - - // 设置每层的大小限制 (指数增长) - // L0: 10MB, L1: 100MB, L2: 1GB, L3: 10GB, L4: 100GB, L5: 1TB, L6: 无限制 - p.levelSizeLimits[0] = 10 * 1024 * 1024 // 10MB - p.levelSizeLimits[1] = 100 * 1024 * 1024 // 100MB - p.levelSizeLimits[2] = 1024 * 1024 * 1024 // 1GB - p.levelSizeLimits[3] = 10 * 1024 * 1024 * 1024 // 10GB - p.levelSizeLimits[4] = 100 * 1024 * 1024 * 1024 // 100GB - p.levelSizeLimits[5] = 1024 * 1024 * 1024 * 1024 // 1TB - p.levelSizeLimits[6] = 0 // 无限制 - - // 设置每层的文件数量限制 - // L0 特殊处理:文件数量限制为 4 (当有4个或更多文件时触发 compaction) - p.levelFileLimits[0] = 4 - // L1-L6: 不限制文件数量,只限制总大小 - for i := 1; i < NumLevels; i++ { - p.levelFileLimits[i] = 0 // 0 表示不限制 + return &Picker{ + currentStage: 0, // 从 L0 合并开始 } - - return p } -// PickCompaction 选择需要 Compaction 的任务(支持多任务并发) -// 返回空切片表示当前不需要 Compaction +// getLevelSizeLimit 获取层级大小限制(调用包级私有函数) +func (p *Picker) getLevelSizeLimit(level int) int64 { + return getLevelSizeLimit(level) +} + +// PickCompaction 选择需要 Compaction 的任务(按阶段返回,阶段内并发执行) +// 返回空切片表示当前阶段不需要 Compaction +// +// 执行策略(4 阶段串行 + 阶段内并发): +// 1. Stage 0 - L0 合并:小文件合并,减少 L0 文件数(可能保持在 L0) +// 2. Stage 1 - L0 升级:大文件或已合并文件升级到 L1 +// 3. Stage 2 - L1 升级:L1 文件升级到 L2 +// 4. Stage 3 - L2 升级:L2 文件升级到 L3 +// +// 阶段控制: +// - 使用 currentStage 跟踪当前应该执行哪个阶段(0-3) +// - 每次调用返回当前阶段的任务,然后自动推进到下一阶段 +// - 调用者应该循环调用此方法以确保所有阶段都被尝试 +// +// 为什么阶段内可以并发? +// - 同一阶段的任务处理不同的文件批次(按 seq 连续划分) +// - 这些批次的文件不重叠,可以安全并发 +// +// 为什么阶段间要串行? +// - L0 执行后可能产生新文件,影响下一阶段的任务计算 +// - 必须基于最新的 version 重新计算下一阶段的任务 func (p *Picker) PickCompaction(version *Version) []*CompactionTask { - tasks := make([]*CompactionTask, 0) + p.mu.Lock() + defer p.mu.Unlock() - // 1. 检查 L0 (基于文件数量) - if task := p.pickL0Compaction(version); task != nil { - tasks = append(tasks, task) + var tasks []*CompactionTask + var stageName string + + // 根据当前阶段选择任务 + switch p.currentStage { + case 0: + // Stage 0: L0 合并任务 + tasks = p.pickL0MergeTasks(version) + stageName = "L0-merge" + case 1: + // Stage 1: L0 升级任务 + tasks = p.pickL0UpgradeTasks(version) + stageName = "L0-upgrade" + case 2: + // Stage 2: L1 升级任务 + tasks = p.pickLevelCompaction(version, 1) + stageName = "L1-upgrade" + case 3: + // Stage 3: L2 升级任务 + tasks = p.pickLevelCompaction(version, 2) + stageName = "L2-upgrade" } - // 2. 检查 L1-L5 (基于大小) - for level := 1; level < NumLevels-1; level++ { - if task := p.pickLevelCompaction(version, level); task != nil { - tasks = append(tasks, task) - } - } + // 保存当前阶段索引 + currentStage := p.currentStage - // 3. 按优先级排序(score 越高越优先) - if len(tasks) > 1 { - p.sortTasksByPriority(tasks, version) + // 推进到下一阶段(无论是否有任务),这里是否巧妙地 + // 使用了取模运算来保证阶段递增与阶段重置。 + p.currentStage = (p.currentStage + 1) % 4 + + if len(tasks) > 0 { + fmt.Printf("[Picker] Stage %d (%s): found %d tasks, next stage will be %d\n", + currentStage, stageName, len(tasks), p.currentStage) + } else { + fmt.Printf("[Picker] Stage %d (%s): no tasks, next stage will be %d\n", + currentStage, stageName, p.currentStage) } return tasks } -// sortTasksByPriority 按优先级对任务排序(score 从高到低) -func (p *Picker) sortTasksByPriority(tasks []*CompactionTask, version *Version) { - // 简单的冒泡排序(任务数量通常很少,< 7) - for i := 0; i < len(tasks)-1; i++ { - for j := i + 1; j < len(tasks); j++ { - scoreI := p.GetLevelScore(version, tasks[i].Level) - scoreJ := p.GetLevelScore(version, tasks[j].Level) - if scoreJ > scoreI { - tasks[i], tasks[j] = tasks[j], tasks[i] +// pickL0MergeTasks 选择 L0 的合并任务(Stage 0) +// +// 策略: +// - 按 seq 顺序遍历,只合并连续的小文件块 +// - 遇到大文件(≥ 32MB)时停止当前批次,创建任务 +// - 累积到 64MB 时创建一个合并任务 +// - OutputLevel=0,让 determineLevel 决定是否保持在 L0 +// +// 为什么必须连续? +// - 不能跳过中间文件,否则会导致 seq 范围不连续 +// - 例如:[文件1: seq 1-100, 29MB] [文件2: seq 101-200, 36MB] [文件3: seq 201-300, 8MB] +// - 不能合并文件1和文件3,否则新文件 seq 范围是 [1-100, 201-300],缺失 101-200 +// +// 目的: +// - 减少 L0 文件数(防止读放大) +// - 小文件在 L0 内部合并,大文件留给 Stage 1 处理 +func (p *Picker) pickL0MergeTasks(version *Version) []*CompactionTask { + files := version.GetLevel(0) + if len(files) == 0 { + return nil + } + + // 按 MinKey 排序,确保处理连续的 seq + sort.Slice(files, func(i, j int) bool { + return files[i].MinKey < files[j].MinKey + }) + + const smallFileThreshold = 32 * 1024 * 1024 // 32MB + + tasks := make([]*CompactionTask, 0) + var currentBatch []*FileMetadata + var currentSize int64 + + for _, file := range files { + // 如果是大文件,停止当前批次 + if file.FileSize >= smallFileThreshold { + // 如果当前批次有多个小文件(> 1),创建合并任务 + if len(currentBatch) > 1 { + tasks = append(tasks, &CompactionTask{ + Level: 0, + InputFiles: currentBatch, + OutputLevel: 0, + }) } + // 重置批次(单个小文件不合并,留给升级阶段) + currentBatch = nil + currentSize = 0 + // 跳过大文件,留给 Stage 1 处理 + continue + } + + // 小文件:加入当前批次 + currentBatch = append(currentBatch, file) + currentSize += file.FileSize + + // 累积到 64MB 时创建合并任务 + if currentSize >= level0SizeLimit { + tasks = append(tasks, &CompactionTask{ + Level: 0, + InputFiles: currentBatch, + OutputLevel: 0, // 建议 L0,determineLevel 会根据大小决定 + }) + + currentBatch = nil + currentSize = 0 } } + + // 剩余的小文件:只有 > 1 个才创建合并任务 + if len(currentBatch) > 1 { + tasks = append(tasks, &CompactionTask{ + Level: 0, + InputFiles: currentBatch, + OutputLevel: 0, + }) + } + + return tasks } -// pickL0Compaction 选择 L0 的 Compaction 任务 -// L0 特殊:文件可能有重叠的 key range,需要全部合并 -func (p *Picker) pickL0Compaction(version *Version) *CompactionTask { - l0Files := version.GetLevel(0) - if len(l0Files) == 0 { +// pickL0UpgradeTasks 选择 L0 的升级任务(Stage 1) +// +// 策略: +// - 以大文件(≥ 32MB)为中心,搭配前后的文件一起升级 +// - 找到大文件后,向左右扩展收集文件,直到累积到 256MB(L1 限制) +// - 这样可以把大文件周围的小文件也一起带走,更高效地清理 L0 +// - OutputLevel=1,强制升级到 L1+ +// +// 为什么要搭配周围文件? +// - 大文件是升级的主体,周围的小文件(包括单个小文件)可以顺便带走 +// - 避免小文件留在 L0 成为孤立文件 +// - 例如:[小20MB] [大40MB] [小15MB] → 一起升级 → L1 +// +// 目的: +// - 将成熟的大文件推到 L1 +// - 顺便清理周围的小文件,为 L0 腾出空间 +func (p *Picker) pickL0UpgradeTasks(version *Version) []*CompactionTask { + files := version.GetLevel(0) + if len(files) == 0 { return nil } - // 计算 L0 总大小 - totalSize := int64(0) - for _, file := range l0Files { - totalSize += file.FileSize + // 按 MinKey 排序,确保处理连续的 seq + sort.Slice(files, func(i, j int) bool { + return files[i].MinKey < files[j].MinKey + }) + + const largeFileThreshold = 32 * 1024 * 1024 // 32MB + + tasks := make([]*CompactionTask, 0) + processed := make(map[int64]bool) // 跟踪已处理的文件 + + // 遍历文件,找到大文件作为起点 + for i, file := range files { + // 跳过已处理的文件 + if processed[file.FileNumber] { + continue + } + + // 如果不是大文件,跳过(小文件等待被大文件带走) + if file.FileSize < largeFileThreshold { + continue + } + + // 找到大文件,以它为中心,向左右扩展收集文件 + var batch []*FileMetadata + var batchSize int64 + + // 向左收集:找到连续的未处理文件 + left := i - 1 + var leftFiles []*FileMetadata + for left >= 0 && !processed[files[left].FileNumber] { + leftFiles = append([]*FileMetadata{files[left]}, leftFiles...) // 前插 + left-- + } + + // 加入左边的文件 + for _, f := range leftFiles { + batch = append(batch, f) + batchSize += f.FileSize + processed[f.FileNumber] = true + } + + // 加入中心的大文件 + batch = append(batch, file) + batchSize += file.FileSize + processed[file.FileNumber] = true + + // 向右收集:继续收集直到达到 256MB 或遇到已处理文件 + right := i + 1 + for right < len(files) && !processed[files[right].FileNumber] { + // 检查是否超过限制 + if batchSize+files[right].FileSize > level1SizeLimit { + break + } + batch = append(batch, files[right]) + batchSize += files[right].FileSize + processed[files[right].FileNumber] = true + right++ + } + + // 创建升级任务 + if len(batch) > 0 { + tasks = append(tasks, &CompactionTask{ + Level: 0, + InputFiles: batch, + OutputLevel: 1, // 升级到 L1+ + }) + } } - // 检查是否需要 Compaction(同时考虑文件数量和总大小) - // 1. 文件数量超过限制(避免读放大:每次读取需要检查太多文件) - // 2. 总大小超过限制(避免 L0 占用过多空间) - needCompaction := false - if p.levelFileLimits[0] > 0 && len(l0Files) >= p.levelFileLimits[0] { - needCompaction = true - } - if p.levelSizeLimits[0] > 0 && totalSize >= p.levelSizeLimits[0] { - needCompaction = true - } - - if !needCompaction { - return nil - } - - // L0 → L1 Compaction - // 选择所有 L0 文件(因为 key range 可能重叠) - return &CompactionTask{ - Level: 0, - InputFiles: l0Files, - OutputLevel: 1, - } + return tasks } -// pickLevelCompaction 选择 L1-L5 的 Compaction 任务 -// L1+ 的文件 key range 不重叠,可以选择多个不重叠的文件 -func (p *Picker) pickLevelCompaction(version *Version, level int) *CompactionTask { - if level < 1 || level >= NumLevels-1 { +// pickLevelCompaction 选择指定层级的 Compaction 任务(用于 L1、L2,返回所有任务可并发执行) +// +// 触发规则: +// - 按 seq 顺序遍历当前层级的文件 +// - 累积连续文件的大小 +// - 当累积大小 >= 当前层级的大小限制时,创建一个 compaction 任务 +// - 重置累积,继续处理剩余文件 +// - 返回所有任务(可并发执行) +// +// 示例:L1 有 5 个文件 [50MB, 60MB, 70MB, 80MB, 90MB] +// - L1 的限制是 256MB +// - 文件1+2+3+4 = 260MB >= 256MB → 创建任务(4个文件 → L2) +// - 文件5 = 90MB < 256MB → 不创建任务(未达到升级条件) +// - 返回 1 个任务 +func (p *Picker) pickLevelCompaction(version *Version, level int) []*CompactionTask { + if level < 0 || level >= NumLevels-1 { return nil } @@ -139,91 +340,76 @@ func (p *Picker) pickLevelCompaction(version *Version, level int) *CompactionTas return nil } - // 计算当前层级的总大小 - totalSize := int64(0) - for _, file := range files { - totalSize += file.FileSize - } + // 按 MinKey 排序,确保处理连续的 seq + sort.Slice(files, func(i, j int) bool { + return files[i].MinKey < files[j].MinKey + }) - // 检查是否超过大小限制 - if totalSize < p.levelSizeLimits[level] { - return nil - } + tasks := make([]*CompactionTask, 0) + currentLevelLimit := p.getLevelSizeLimit(level) - // 改进策略:根据层级压力动态调整选择策略 - // 1. 计算当前层级的压力(超过限制的倍数) - pressure := float64(totalSize) / float64(p.levelSizeLimits[level]) - - // 2. 根据压力确定目标大小和文件数量限制 - targetSize := p.getTargetCompactionSize(level + 1) - maxFiles := 10 // 默认最多 10 个文件 - - if pressure >= 10.0 { - // 压力极高(超过 10 倍):选择更多文件,增大目标 - maxFiles = 100 - targetSize *= 5 - fmt.Printf("[Compaction] L%d pressure: %.1fx (CRITICAL) - selecting up to %d files, target: %s\n", - level, pressure, maxFiles, formatBytes(targetSize)) - } else if pressure >= 5.0 { - // 压力很高(超过 5 倍) - maxFiles = 50 - targetSize *= 3 - fmt.Printf("[Compaction] L%d pressure: %.1fx (HIGH) - selecting up to %d files, target: %s\n", - level, pressure, maxFiles, formatBytes(targetSize)) - } else if pressure >= 2.0 { - // 压力较高(超过 2 倍) - maxFiles = 20 - targetSize *= 2 - fmt.Printf("[Compaction] L%d pressure: %.1fx (ELEVATED) - selecting up to %d files, target: %s\n", - level, pressure, maxFiles, formatBytes(targetSize)) - } - - // 选择文件,直到累计大小接近目标 - selectedFiles := make([]*FileMetadata, 0) - currentSize := int64(0) + // 遍历文件,累积大小,当达到当前层级的大小限制时创建任务 + var currentBatch []*FileMetadata + var currentSize int64 for _, file := range files { - selectedFiles = append(selectedFiles, file) + currentBatch = append(currentBatch, file) currentSize += file.FileSize - // 如果已经达到目标大小,停止选择 - if currentSize >= targetSize { - break - } + // 如果当前批次的大小达到当前层级的限制,创建 compaction 任务 + if currentSize >= currentLevelLimit { + tasks = append(tasks, &CompactionTask{ + Level: level, + InputFiles: currentBatch, + OutputLevel: level + 1, + }) - // 达到文件数量限制 - if len(selectedFiles) >= maxFiles { - break + // 重置批次 + currentBatch = nil + currentSize = 0 } } - return &CompactionTask{ - Level: level, - InputFiles: selectedFiles, - OutputLevel: level + 1, - } + // 不处理剩余文件(未达到大小限制的文件不升级) + // 等待更多文件累积后再升级 + + return tasks } -// getTargetCompactionSize 根据层级返回建议的 compaction 大小 -func (p *Picker) getTargetCompactionSize(level int) int64 { - switch level { - case 0: - return 2 * 1024 * 1024 // 2MB - case 1: - return 10 * 1024 * 1024 // 10MB - case 2: - return 50 * 1024 * 1024 // 50MB - case 3: - return 100 * 1024 * 1024 // 100MB - default: // L4+ - return 200 * 1024 * 1024 // 200MB - } +// GetCurrentStage 获取当前阶段(用于测试和调试) +func (p *Picker) GetCurrentStage() int { + p.mu.Lock() + defer p.mu.Unlock() + return p.currentStage } -// ShouldCompact 判断是否需要 Compaction +// ShouldCompact 判断是否需要 Compaction(只检查,不推进阶段) func (p *Picker) ShouldCompact(version *Version) bool { - tasks := p.PickCompaction(version) - return len(tasks) > 0 + // 检查所有阶段,不推进 currentStage + p.mu.Lock() + defer p.mu.Unlock() + + // 检查 Stage 0: L0 合并 + if len(p.pickL0MergeTasks(version)) > 0 { + return true + } + + // 检查 Stage 1: L0 升级 + if len(p.pickL0UpgradeTasks(version)) > 0 { + return true + } + + // 检查 Stage 2: L1 升级 + if len(p.pickLevelCompaction(version, 1)) > 0 { + return true + } + + // 检查 Stage 3: L2 升级 + if len(p.pickLevelCompaction(version, 2)) > 0 { + return true + } + + return false } // GetLevelScore 获取每层的 Compaction 得分 (用于优先级排序) @@ -233,58 +419,30 @@ func (p *Picker) GetLevelScore(version *Version, level int) float64 { return 0 } - files := version.GetLevel(level) - - // L0 同时考虑文件数量和总大小,取较大值作为得分 - if level == 0 { - scoreByCount := float64(0) - scoreBySize := float64(0) - - if p.levelFileLimits[0] > 0 { - scoreByCount = float64(len(files)) / float64(p.levelFileLimits[0]) - } - - if p.levelSizeLimits[0] > 0 { - totalSize := int64(0) - for _, file := range files { - totalSize += file.FileSize - } - scoreBySize = float64(totalSize) / float64(p.levelSizeLimits[0]) - } - - // 返回两者中的较大值(哪个维度更紧迫) - if scoreByCount > scoreBySize { - return scoreByCount - } - return scoreBySize - } - - // L1+ 基于总大小 - if p.levelSizeLimits[level] == 0 { + // L3 是最后一层,不需要 compaction + if level == NumLevels-1 { return 0 } + files := version.GetLevel(level) + if len(files) == 0 { + return 0 + } + + // 计算总大小 totalSize := int64(0) for _, file := range files { totalSize += file.FileSize } - return float64(totalSize) / float64(p.levelSizeLimits[level]) -} + // 使用下一级的大小限制来计算得分 + // 这样可以反映出该层级需要向上合并的紧迫程度 + nextLevelLimit := p.getLevelSizeLimit(level + 1) + if nextLevelLimit == 0 { + return 0 + } -// formatBytes 格式化字节大小显示 -func formatBytes(bytes int64) string { - const unit = 1024 - if bytes < unit { - return fmt.Sprintf("%d B", bytes) - } - div, exp := int64(unit), 0 - for n := bytes / unit; n >= unit; n /= unit { - div *= unit - exp++ - } - units := []string{"KB", "MB", "GB", "TB"} - return fmt.Sprintf("%.2f %s", float64(bytes)/float64(div), units[exp]) + return float64(totalSize) / float64(nextLevelLimit) } // Compactor 负责执行 Compaction @@ -293,7 +451,7 @@ type Compactor struct { picker *Picker versionSet *VersionSet schema *Schema - mu sync.Mutex + mu sync.RWMutex // 只保护 schema 字段的读写 } // NewCompactor 创建新的 Compactor @@ -320,9 +478,6 @@ func (c *Compactor) GetPicker() *Picker { // DoCompaction 执行一次 Compaction // 返回: VersionEdit (记录变更), error func (c *Compactor) DoCompaction(task *CompactionTask, version *Version) (*VersionEdit, error) { - c.mu.Lock() - defer c.mu.Unlock() - if task == nil { return nil, fmt.Errorf("compaction task is nil") } @@ -382,7 +537,8 @@ func (c *Compactor) DoCompaction(task *CompactionTask, version *Version) (*Versi // 计算平均行大小(基于输入文件的 FileMetadata) avgRowSize := c.calculateAvgRowSize(existingInputFiles, existingOutputFiles) - // 4. 写入新的 SST 文件到输出层级 + // 4. 写入新的 SST 文件 + // 传入输出层级,L0合并时根据文件大小动态决定,升级任务强制使用OutputLevel newFiles, err := c.writeOutputFiles(mergedRows, task.OutputLevel, avgRowSize) if err != nil { return nil, fmt.Errorf("write output files: %w", err) @@ -440,8 +596,11 @@ func (c *Compactor) readInputFiles(files []*FileMetadata) ([]*SSTableRow, error) } // 设置 Schema(如果可用) - if c.schema != nil { - reader.SetSchema(c.schema) + c.mu.RLock() + schema := c.schema + c.mu.RUnlock() + if schema != nil { + reader.SetSchema(schema) } // 获取文件中实际存在的所有 key(不能用 MinKey-MaxKey 范围遍历,因为 key 可能是稀疏的) @@ -548,75 +707,88 @@ func (c *Compactor) calculateAvgRowSize(inputFiles []*FileMetadata, outputFiles return totalSize / totalRows } -// writeOutputFiles 将合并后的行写入新的 SST 文件 +// writeOutputFiles 将合并后的行写入新的 SST 文件(Append-Only 优化:不 split) +// +// 设计理念: +// - Append-Only 场景:没有重叠数据,不需要增量合并 +// - 直接将所有行写入一个文件,不进行分割 +// - 简化逻辑,提高性能,减少文件数量 +// +// 为什么不 split? +// - 触发阈值已经控制了文件大小(64MB/256MB/512MB/1GB) +// - 没有必要累积到大阈值后再分割成小文件 +// - mmap 可以高效处理大文件(按需加载 4KB 页面) func (c *Compactor) writeOutputFiles(rows []*SSTableRow, level int, avgRowSize int64) ([]*FileMetadata, error) { if len(rows) == 0 { return nil, nil } - // 根据层级动态调整文件大小目标 - // L0: 2MB (快速 flush,小文件) - // L1: 10MB - // L2: 50MB - // L3: 100MB - // L4+: 200MB - targetFileSize := c.getTargetFileSize(level) - - // 应用安全系数:由于压缩率、索引开销等因素,估算值可能不准确 - // 使用 80% 的目标大小作为分割点,避免实际文件超出目标过多 - targetFileSize = targetFileSize * 80 / 100 - - var newFiles []*FileMetadata - var currentRows []*SSTableRow - var currentSize int64 - - for _, row := range rows { - // 使用平均行大小估算(基于输入文件的统计信息) - rowSize := avgRowSize - - // 如果当前文件大小超过目标,写入文件 - if currentSize > 0 && currentSize+rowSize > targetFileSize { - file, err := c.writeFile(currentRows, level) - if err != nil { - return nil, err - } - newFiles = append(newFiles, file) - - // 重置 - currentRows = nil - currentSize = 0 - } - - currentRows = append(currentRows, row) - currentSize += rowSize + // Append-Only 优化:不分割,直接写成一个文件 + file, err := c.writeFile(rows, level) + if err != nil { + return nil, err } - // 写入最后一个文件 - if len(currentRows) > 0 { - file, err := c.writeFile(currentRows, level) - if err != nil { - return nil, err - } - newFiles = append(newFiles, file) - } - - return newFiles, nil + return []*FileMetadata{file}, nil } -// getTargetFileSize 根据层级返回目标文件大小 +// getTargetFileSize 根据层级返回目标文件大小(Append-Only 优化) +// +// 设计理念: +// - 目标文件大小 = 层级大小限制(不 split) +// - 每个层级合并后产生 1 个对应大小的文件 +// - 适用于 Append-Only 场景:没有重叠数据,不需要增量合并 +// +// 层级文件大小: +// - L0: 64MB (MemTable flush 后的小文件合并) +// - L1: 256MB (L0 升级) +// - L2: 512MB (L1 升级) +// - L3: 1GB (L2 升级,最后一层) func (c *Compactor) getTargetFileSize(level int) int64 { - switch level { - case 0: - return 2 * 1024 * 1024 // 2MB - case 1: - return 10 * 1024 * 1024 // 10MB - case 2: - return 50 * 1024 * 1024 // 50MB - case 3: - return 100 * 1024 * 1024 // 100MB - default: // L4+ - return 200 * 1024 * 1024 // 200MB + return getLevelSizeLimit(level) +} + +// determineLevel 根据文件大小和源层级决定文件应该放在哪一层 +// +// 判断逻辑: +// - 如果文件大小 <= 源层级的目标文件大小 × 1.2,保持在源层级 +// - 否则,向上查找合适的层级,直到文件大小 <= 该层级的目标文件大小 × 1.2 +// - 如果都不适合,放在最后一层(L3) +// +// 1.2 倍容差的考量: +// - 目标文件大小是理想值(L0: 8MB, L1: 32MB, L2: 128MB, L3: 512MB) +// - 实际写入时可能略微超出(写入最后几行后超出) +// - 20% 容差避免文件因稍微超出而被强制升级 +// +// 示例: +// - 10MB 的文件,源层级 L0(目标 8MB × 1.2 = 9.6MB) +// → 10MB > 9.6MB,升级到 L1 +// - 30MB 的文件,源层级 L1(目标 32MB × 1.2 = 38.4MB) +// → 30MB <= 38.4MB,保持在 L1 +func (c *Compactor) determineLevel(fileSize int64, sourceLevel int) int { + // 检查文件是否适合源层级 + // 使用目标文件大小的 1.2 倍作为阈值(允许一定的溢出) + sourceTargetSize := c.getTargetFileSize(sourceLevel) + threshold := sourceTargetSize * 120 / 100 + + if fileSize <= threshold { + // 文件大小适合源层级,保持在源层级 + return sourceLevel } + + // 否则,找到合适的更高层级 + // 从源层级的下一层开始查找 + for level := sourceLevel + 1; level < NumLevels; level++ { + targetSize := c.getTargetFileSize(level) + threshold := targetSize * 120 / 100 + + if fileSize <= threshold { + return level + } + } + + // 如果都不适合,放在最后一层 + return NumLevels - 1 } // writeFile 写入单个 SST 文件 @@ -633,7 +805,10 @@ func (c *Compactor) writeFile(rows []*SSTableRow, level int) (*FileMetadata, err defer file.Close() // 使用 Compactor 的 Schema 创建 writer - writer := NewSSTableWriter(file, c.schema) + c.mu.RLock() + schema := c.schema + c.mu.RUnlock() + writer := NewSSTableWriter(file, schema) // 注意:这个方法只负责创建文件,不负责注册到 SSTableManager // 注册工作由 CompactionManager 在 VersionEdit apply 后完成 @@ -660,10 +835,15 @@ func (c *Compactor) writeFile(rows []*SSTableRow, level int) (*FileMetadata, err return nil, err } + // 根据实际文件大小决定最终层级 + // level 参数是建议的输出层级(通常是 sourceLevel + 1) + // 但我们根据文件大小重新决定,如果文件足够小可能保持在源层级 + actualLevel := c.determineLevel(fileInfo.Size(), level) + // 创建 FileMetadata metadata := &FileMetadata{ FileNumber: fileNumber, - Level: level, + Level: actualLevel, FileSize: fileInfo.Size(), MinKey: rows[0].Seq, MaxKey: rows[len(rows)-1].Seq, @@ -673,6 +853,20 @@ func (c *Compactor) writeFile(rows []*SSTableRow, level int) (*FileMetadata, err return metadata, nil } +// CompactionStats Compaction 统计信息 +type CompactionStats struct { + TotalCompactions int64 `json:"total_compactions"` // 总 compaction 次数 + LastCompactionTime time.Time `json:"last_compaction_time"` // 最后一次 compaction 时间 +} + +// LevelStats 层级统计信息 +type LevelStats struct { + Level int `json:"level"` // 层级编号 (0-3) + FileCount int `json:"file_count"` // 文件数量 + TotalSize int64 `json:"total_size"` // 总大小(字节) + Score float64 `json:"score"` // Compaction 得分 +} + // CompactionManager 管理 Compaction 流程 type CompactionManager struct { compactor *Compactor @@ -769,82 +963,110 @@ func (m *CompactionManager) maybeCompact() { } // doCompact 实际执行 compaction 的逻辑(必须在持有 compactionMu 时调用) -// 支持并发执行多个层级的 compaction +// 阶段串行 + 阶段内并发: +// - 循环执行 4 个阶段(Stage 0 → 1 → 2 → 3) +// - 同一阶段的任务并发执行(L0 的多个批次、L1 的多个批次等) +// - 不同阶段串行执行(执行完一个阶段后,基于新 version 再执行下一阶段) func (m *CompactionManager) doCompact() { - // 获取当前版本 - version := m.versionSet.GetCurrent() - if version == nil { - return - } - - // 获取所有需要 Compaction 的任务(已按优先级排序) picker := m.compactor.GetPicker() - tasks := picker.PickCompaction(version) - if len(tasks) == 0 { - // 输出诊断信息 - m.printCompactionStats(version, picker) - return - } + totalStagesExecuted := 0 - fmt.Printf("[Compaction] Found %d tasks to execute\n", len(tasks)) - - // 并发执行所有任务 - successCount := 0 - for _, task := range tasks { - // 检查是否是上次失败的文件(防止无限重试) - if len(task.InputFiles) > 0 { - firstFile := task.InputFiles[0].FileNumber - m.mu.Lock() - if m.lastFailedFile == firstFile && m.consecutiveFails >= 3 { - fmt.Printf("[Compaction] Skipping L%d file %d (failed %d times)\n", - task.Level, firstFile, m.consecutiveFails) - m.consecutiveFails = 0 - m.lastFailedFile = 0 - m.mu.Unlock() - continue - } - m.mu.Unlock() + // 循环执行 4 个阶段 + for stage := 0; stage < 4; stage++ { + // 获取当前版本(每个阶段都重新获取,因为上一阶段可能修改了文件结构) + version := m.versionSet.GetCurrent() + if version == nil { + return } - // 获取最新版本(每个任务执行前) - currentVersion := m.versionSet.GetCurrent() - if currentVersion == nil { + // 获取当前阶段的任务 + tasks := picker.PickCompaction(version) + if len(tasks) == 0 { + // 当前阶段没有任务,继续下一阶段 continue } - // 执行 Compaction - fmt.Printf("[Compaction] Starting: L%d -> L%d, files: %d\n", - task.Level, task.OutputLevel, len(task.InputFiles)) + totalStagesExecuted++ + fmt.Printf("[Compaction] Found %d tasks in stage %d to execute concurrently\n", len(tasks), stage) - err := m.DoCompactionWithVersion(task, currentVersion) - if err != nil { - fmt.Printf("[Compaction] Failed L%d -> L%d: %v\n", task.Level, task.OutputLevel, err) + // 并发执行同一阶段的所有任务 + var wg sync.WaitGroup + var successCount atomic.Int64 - // 记录失败信息 + for _, task := range tasks { + // 检查是否是上次失败的文件(防止无限重试) if len(task.InputFiles) > 0 { firstFile := task.InputFiles[0].FileNumber m.mu.Lock() - if m.lastFailedFile == firstFile { - m.consecutiveFails++ - } else { - m.lastFailedFile = firstFile - m.consecutiveFails = 1 + if m.lastFailedFile == firstFile && m.consecutiveFails >= 3 { + fmt.Printf("[Compaction] Skipping L%d file %d (failed %d times)\n", + task.Level, firstFile, m.consecutiveFails) + m.consecutiveFails = 0 + m.lastFailedFile = 0 + m.mu.Unlock() + continue } m.mu.Unlock() } - } else { - fmt.Printf("[Compaction] Completed: L%d -> L%d\n", task.Level, task.OutputLevel) - successCount++ - // 清除失败计数 - m.mu.Lock() - m.consecutiveFails = 0 - m.lastFailedFile = 0 - m.mu.Unlock() + wg.Add(1) + go func(task *CompactionTask) { + defer wg.Done() + + // 获取最新版本(每个任务执行前) + currentVersion := m.versionSet.GetCurrent() + if currentVersion == nil { + return + } + + // 执行 Compaction + fmt.Printf("[Compaction] Starting: L%d -> L%d, files: %d\n", + task.Level, task.OutputLevel, len(task.InputFiles)) + + err := m.DoCompactionWithVersion(task, currentVersion) + if err != nil { + fmt.Printf("[Compaction] Failed L%d -> L%d: %v\n", task.Level, task.OutputLevel, err) + + // 记录失败信息 + if len(task.InputFiles) > 0 { + firstFile := task.InputFiles[0].FileNumber + m.mu.Lock() + if m.lastFailedFile == firstFile { + m.consecutiveFails++ + } else { + m.lastFailedFile = firstFile + m.consecutiveFails = 1 + } + m.mu.Unlock() + } + } else { + fmt.Printf("[Compaction] Completed: L%d -> L%d\n", task.Level, task.OutputLevel) + + // 清除失败计数 + m.mu.Lock() + m.consecutiveFails = 0 + m.lastFailedFile = 0 + m.mu.Unlock() + + // 更新成功计数(使用原子操作) + successCount.Add(1) + } + }(task) } + + // 等待当前阶段的所有任务完成 + wg.Wait() + + fmt.Printf("[Compaction] Stage %d completed: %d/%d tasks succeeded\n", stage, successCount.Load(), len(tasks)) } - fmt.Printf("[Compaction] Batch completed: %d/%d tasks succeeded\n", successCount, len(tasks)) + // 如果所有阶段都没有任务,输出诊断信息 + if totalStagesExecuted == 0 { + version := m.versionSet.GetCurrent() + if version != nil { + m.printCompactionStats(version, picker) + } + } } // printCompactionStats 输出 Compaction 统计信息(每分钟一次) @@ -859,7 +1081,7 @@ func (m *CompactionManager) printCompactionStats(version *Version, picker *Picke m.lastCompactionTime = time.Now() fmt.Println("[Compaction] Status check:") - for level := range 7 { + for level := range NumLevels { files := version.GetLevel(level) if len(files) == 0 { continue @@ -914,8 +1136,11 @@ func (m *CompactionManager) DoCompactionWithVersion(task *CompactionTask, versio continue } // 设置 Schema - if m.compactor.schema != nil { - reader.SetSchema(m.compactor.schema) + m.compactor.mu.RLock() + schema := m.compactor.schema + m.compactor.mu.RUnlock() + if schema != nil { + reader.SetSchema(schema) } // 添加到 SSTableManager m.sstManager.AddReader(reader) @@ -997,24 +1222,30 @@ func (m *CompactionManager) deleteObsoleteFiles(edit *VersionEdit) { } } -// TriggerCompaction 手动触发一次 Compaction(所有需要的层级) +// TriggerCompaction 手动触发一次 Compaction(遍历所有阶段) func (m *CompactionManager) TriggerCompaction() error { - version := m.versionSet.GetCurrent() - if version == nil { - return fmt.Errorf("no current version") - } - picker := m.compactor.GetPicker() - tasks := picker.PickCompaction(version) - if len(tasks) == 0 { - return nil // 不需要 Compaction - } - // 依次执行所有任务 - for _, task := range tasks { - currentVersion := m.versionSet.GetCurrent() - if err := m.DoCompactionWithVersion(task, currentVersion); err != nil { - return err + // 循环执行 4 个阶段 + for range 4 { + version := m.versionSet.GetCurrent() + if version == nil { + return fmt.Errorf("no current version") + } + + // 获取当前阶段的任务 + tasks := picker.PickCompaction(version) + if len(tasks) == 0 { + // 当前阶段没有任务,继续下一阶段 + continue + } + + // 串行执行当前阶段的所有任务 + for _, task := range tasks { + currentVersion := m.versionSet.GetCurrent() + if err := m.DoCompactionWithVersion(task, currentVersion); err != nil { + return err + } } } @@ -1022,25 +1253,25 @@ func (m *CompactionManager) TriggerCompaction() error { } // GetStats 获取 Compaction 统计信息 -func (m *CompactionManager) GetStats() map[string]any { +func (m *CompactionManager) GetStats() *CompactionStats { m.mu.RLock() defer m.mu.RUnlock() - return map[string]any{ - "total_compactions": m.totalCompactions, - "last_compaction_time": m.lastCompactionTime, + return &CompactionStats{ + TotalCompactions: m.totalCompactions, + LastCompactionTime: m.lastCompactionTime, } } // GetLevelStats 获取每层的统计信息 -func (m *CompactionManager) GetLevelStats() []map[string]any { +func (m *CompactionManager) GetLevelStats() []LevelStats { version := m.versionSet.GetCurrent() if version == nil { return nil } picker := m.compactor.GetPicker() - stats := make([]map[string]any, NumLevels) + stats := make([]LevelStats, NumLevels) for level := range NumLevels { files := version.GetLevel(level) @@ -1049,11 +1280,11 @@ func (m *CompactionManager) GetLevelStats() []map[string]any { totalSize += file.FileSize } - stats[level] = map[string]any{ - "level": level, - "file_count": len(files), - "total_size": totalSize, - "score": picker.GetLevelScore(version, level), + stats[level] = LevelStats{ + Level: level, + FileCount: len(files), + TotalSize: totalSize, + Score: picker.GetLevelScore(version, level), } } diff --git a/compaction_continuity_test.go b/compaction_continuity_test.go new file mode 100644 index 0000000..1ef7c3d --- /dev/null +++ b/compaction_continuity_test.go @@ -0,0 +1,276 @@ +package srdb + +import ( + "testing" +) + +// TestPickL0MergeContinuity 测试 L0 合并任务的连续性 +func TestPickL0MergeContinuity(t *testing.T) { + tmpDir := t.TempDir() + manifestDir := tmpDir + + versionSet, err := NewVersionSet(manifestDir) + if err != nil { + t.Fatal(err) + } + defer versionSet.Close() + + picker := NewPicker() + + // 创建混合大小的文件:小-大-小-小 + // 这是触发 bug 的场景 + edit := NewVersionEdit() + + // 文件1: 29MB (小文件) + edit.AddFile(&FileMetadata{ + FileNumber: 1, + Level: 0, + FileSize: 29 * 1024 * 1024, + MinKey: 1, + MaxKey: 100, + RowCount: 100, + }) + + // 文件2: 36MB (大文件) + edit.AddFile(&FileMetadata{ + FileNumber: 2, + Level: 0, + FileSize: 36 * 1024 * 1024, + MinKey: 101, + MaxKey: 200, + RowCount: 100, + }) + + // 文件3: 8MB (小文件) + edit.AddFile(&FileMetadata{ + FileNumber: 3, + Level: 0, + FileSize: 8 * 1024 * 1024, + MinKey: 201, + MaxKey: 300, + RowCount: 100, + }) + + // 文件4: 15MB (小文件) + edit.AddFile(&FileMetadata{ + FileNumber: 4, + Level: 0, + FileSize: 15 * 1024 * 1024, + MinKey: 301, + MaxKey: 400, + RowCount: 100, + }) + + edit.SetNextFileNumber(5) + err = versionSet.LogAndApply(edit) + if err != nil { + t.Fatal(err) + } + + version := versionSet.GetCurrent() + + // 测试 Stage 0: L0 合并任务 + t.Log("=== 测试 Stage 0: L0 合并 ===") + tasks := picker.pickL0MergeTasks(version) + + if len(tasks) == 0 { + t.Fatal("Expected L0 merge tasks") + } + + t.Logf("找到 %d 个合并任务", len(tasks)) + + // 验证任务:应该只有1个任务,包含文件3和文件4 + // 文件1是单个小文件,不合并(len > 1 才合并) + // 文件2是大文件,跳过 + // 文件3+文件4是连续的2个小文件,应该合并 + if len(tasks) != 1 { + t.Errorf("Expected 1 task, got %d", len(tasks)) + for i, task := range tasks { + t.Logf("Task %d: %d files", i+1, len(task.InputFiles)) + for _, f := range task.InputFiles { + t.Logf(" - File %d", f.FileNumber) + } + } + } + task1 := tasks[0] + if len(task1.InputFiles) != 2 { + t.Errorf("Task 1: expected 2 files, got %d", len(task1.InputFiles)) + } + if task1.InputFiles[0].FileNumber != 3 || task1.InputFiles[1].FileNumber != 4 { + t.Errorf("Task 1: expected files 3,4, got %d,%d", + task1.InputFiles[0].FileNumber, task1.InputFiles[1].FileNumber) + } + t.Logf("✓ 合并任务: 文件3+文件4 (连续的2个小文件)") + t.Logf("✓ 文件1 (单个小文件) 不合并,留给升级阶段") + + // 验证 seq 范围连续性 + // 任务1: seq 201-400 (文件3+文件4) + // 文件1(seq 1-100, 单个小文件)留给升级阶段 + // 文件2(seq 101-200, 大文件)留给 Stage 1 + if task1.InputFiles[0].MinKey != 201 || task1.InputFiles[1].MaxKey != 400 { + t.Errorf("Task 1 seq range incorrect: [%d, %d]", + task1.InputFiles[0].MinKey, task1.InputFiles[1].MaxKey) + } + t.Logf("✓ Seq 范围正确:任务1 [201-400]") + + // 测试 Stage 1: L0 升级任务 + t.Log("=== 测试 Stage 1: L0 升级 ===") + upgradeTasks := picker.pickL0UpgradeTasks(version) + + if len(upgradeTasks) == 0 { + t.Fatal("Expected L0 upgrade tasks") + } + + // 应该有1个任务:以文件2(大文件)为中心,搭配周围的小文件 + // 文件2向左收集文件1,向右收集文件3和文件4 + // 总共:文件1 (29MB) + 文件2 (36MB) + 文件3 (8MB) + 文件4 (15MB) = 88MB + if len(upgradeTasks) != 1 { + t.Errorf("Expected 1 upgrade task, got %d", len(upgradeTasks)) + } + upgradeTask := upgradeTasks[0] + + // 应该包含所有4个文件 + if len(upgradeTask.InputFiles) != 4 { + t.Errorf("Upgrade task: expected 4 files, got %d", len(upgradeTask.InputFiles)) + for i, f := range upgradeTask.InputFiles { + t.Logf(" File %d: %d", i+1, f.FileNumber) + } + } + + // 验证文件顺序:1, 2, 3, 4 + expectedFiles := []int64{1, 2, 3, 4} + for i, expected := range expectedFiles { + if upgradeTask.InputFiles[i].FileNumber != expected { + t.Errorf("Upgrade task file %d: expected %d, got %d", + i, expected, upgradeTask.InputFiles[i].FileNumber) + } + } + + if upgradeTask.OutputLevel != 1 { + t.Errorf("Upgrade task: expected OutputLevel 1, got %d", upgradeTask.OutputLevel) + } + t.Logf("✓ 升级任务: 文件1+文件2+文件3+文件4 (以大文件为中心,搭配周围小文件) → L1") + + t.Log("=== 连续性测试通过 ===") +} + +// TestPickL0UpgradeContinuity 测试 L0 升级任务的连续性 +func TestPickL0UpgradeContinuity(t *testing.T) { + tmpDir := t.TempDir() + manifestDir := tmpDir + + versionSet, err := NewVersionSet(manifestDir) + if err != nil { + t.Fatal(err) + } + defer versionSet.Close() + + picker := NewPicker() + + // 创建混合大小的文件:大-小-大-大 + edit := NewVersionEdit() + + // 文件1: 40MB (大文件) + edit.AddFile(&FileMetadata{ + FileNumber: 1, + Level: 0, + FileSize: 40 * 1024 * 1024, + MinKey: 1, + MaxKey: 100, + RowCount: 100, + }) + + // 文件2: 20MB (小文件) + edit.AddFile(&FileMetadata{ + FileNumber: 2, + Level: 0, + FileSize: 20 * 1024 * 1024, + MinKey: 101, + MaxKey: 200, + RowCount: 100, + }) + + // 文件3: 50MB (大文件) + edit.AddFile(&FileMetadata{ + FileNumber: 3, + Level: 0, + FileSize: 50 * 1024 * 1024, + MinKey: 201, + MaxKey: 300, + RowCount: 100, + }) + + // 文件4: 45MB (大文件) + edit.AddFile(&FileMetadata{ + FileNumber: 4, + Level: 0, + FileSize: 45 * 1024 * 1024, + MinKey: 301, + MaxKey: 400, + RowCount: 100, + }) + + edit.SetNextFileNumber(5) + err = versionSet.LogAndApply(edit) + if err != nil { + t.Fatal(err) + } + + version := versionSet.GetCurrent() + + // 测试 L0 升级任务 + t.Log("=== 测试 L0 升级任务连续性 ===") + tasks := picker.pickL0UpgradeTasks(version) + + if len(tasks) == 0 { + t.Fatal("Expected L0 upgrade tasks") + } + + t.Logf("找到 %d 个升级任务", len(tasks)) + + // 验证任务1:应该包含所有4个文件(以大文件为锚点,搭配周围文件) + // 文件1(大文件)作为锚点 → 向左无文件 → 向右收集文件2(小)+文件3(大)+文件4(大) + // 总大小:40+20+50+45 = 155MB < 256MB,符合 L1 限制 + task1 := tasks[0] + expectedFileCount := 4 + if len(task1.InputFiles) != expectedFileCount { + t.Errorf("Task 1: expected %d files, got %d", expectedFileCount, len(task1.InputFiles)) + for i, f := range task1.InputFiles { + t.Logf(" File %d: %d", i+1, f.FileNumber) + } + } + + // 验证文件顺序:1, 2, 3, 4 + expectedFiles := []int64{1, 2, 3, 4} + for i, expected := range expectedFiles { + if i >= len(task1.InputFiles) { + break + } + if task1.InputFiles[i].FileNumber != expected { + t.Errorf("Task 1 file %d: expected %d, got %d", + i, expected, task1.InputFiles[i].FileNumber) + } + } + t.Logf("✓ Task 1: 文件1+文件2+文件3+文件4 (以大文件为锚点,搭配周围文件,总155MB < 256MB)") + + // 只应该有1个任务(所有文件都被收集了) + if len(tasks) != 1 { + t.Errorf("Expected 1 task (all files collected), got %d", len(tasks)) + for i, task := range tasks { + t.Logf("Task %d: %d files", i+1, len(task.InputFiles)) + for _, f := range task.InputFiles { + t.Logf(" - File %d", f.FileNumber) + } + } + } + + // 验证所有任务的 OutputLevel 都是 1 + for i, task := range tasks { + if task.OutputLevel != 1 { + t.Errorf("Task %d: expected OutputLevel 1, got %d", i+1, task.OutputLevel) + } + } + t.Logf("✓ 所有任务都升级到 L1") + + t.Log("=== 升级任务连续性测试通过 ===") +} diff --git a/compaction_stage_test.go b/compaction_stage_test.go new file mode 100644 index 0000000..f85b945 --- /dev/null +++ b/compaction_stage_test.go @@ -0,0 +1,220 @@ +package srdb + +import ( + "testing" +) + +// TestPickerStageRotation 测试 Picker 的阶段轮换机制 +func TestPickerStageRotation(t *testing.T) { + // 创建临时目录 + tmpDir := t.TempDir() + manifestDir := tmpDir + + // 创建 VersionSet + versionSet, err := NewVersionSet(manifestDir) + if err != nil { + t.Fatal(err) + } + defer versionSet.Close() + + // 创建 Picker + picker := NewPicker() + + // 初始阶段应该是 L0 + if stage := picker.GetCurrentStage(); stage != 0 { + t.Errorf("Initial stage should be 0 (L0), got %d", stage) + } + + // 添加 L0 文件(触发 L0 compaction) + edit := NewVersionEdit() + for i := 0; i < 10; i++ { + edit.AddFile(&FileMetadata{ + FileNumber: int64(i + 1), + Level: 0, + FileSize: 10 * 1024 * 1024, // 10MB each + MinKey: int64(i * 100), + MaxKey: int64((i+1)*100 - 1), + RowCount: 100, + }) + } + edit.SetNextFileNumber(11) + err = versionSet.LogAndApply(edit) + if err != nil { + t.Fatal(err) + } + + version := versionSet.GetCurrent() + + // 第1次调用:应该返回 L0 任务,然后推进到 L1 + t.Log("=== 第1次调用 PickCompaction ===") + tasks1 := picker.PickCompaction(version) + if len(tasks1) == 0 { + t.Error("Expected L0 tasks on first call") + } + for _, task := range tasks1 { + if task.Level != 0 { + t.Errorf("Expected L0 task, got L%d", task.Level) + } + } + if stage := picker.GetCurrentStage(); stage != 1 { + t.Errorf("After L0 tasks, stage should be 1 (L1), got %d", stage) + } + t.Logf("✓ Returned %d L0 tasks, stage advanced to L1", len(tasks1)) + + // 第2次调用:应该尝试 Stage 1 (L0-upgrade,没有大文件) + t.Log("=== 第2次调用 PickCompaction ===") + tasks2 := picker.PickCompaction(version) + if len(tasks2) == 0 { + t.Log("✓ Stage 1 (L0-upgrade) has no tasks") + } + // 此时 stage 应该已经循环(尝试了 Stage 1→2→3→0...) + if stage := picker.GetCurrentStage(); stage >= 0 { + t.Logf("After trying, current stage is %d", stage) + } + + // 现在添加 L1 文件 + edit2 := NewVersionEdit() + for i := 0; i < 20; i++ { + edit2.AddFile(&FileMetadata{ + FileNumber: int64(100 + i + 1), + Level: 1, + FileSize: 20 * 1024 * 1024, // 20MB each + MinKey: int64(i * 200), + MaxKey: int64((i+1)*200 - 1), + RowCount: 200, + }) + } + edit2.SetNextFileNumber(121) + err = versionSet.LogAndApply(edit2) + if err != nil { + t.Fatal(err) + } + + version2 := versionSet.GetCurrent() + + // 现在可能需要多次调用才能到达 Stage 2 (L1-upgrade) + // 因为要经过 Stage 1 (L0-upgrade) 和 Stage 0 (L0-merge) + t.Log("=== 多次调用 PickCompaction 直到找到 L1 任务 ===") + var tasks3 []*CompactionTask + for i := 0; i < 8; i++ { // 最多尝试两轮(4个阶段×2) + tasks3 = picker.PickCompaction(version2) + if len(tasks3) > 0 && tasks3[0].Level == 1 { + t.Logf("✓ Found %d L1 tasks after %d attempts", len(tasks3), i+1) + break + } + } + if len(tasks3) == 0 || tasks3[0].Level != 1 { + t.Error("Expected to find L1 tasks within 8 attempts") + } + + t.Log("=== Stage rotation test passed ===") +} + +// TestPickerStageWithMultipleLevels 测试多层级同时有任务时的阶段轮换 +func TestPickerStageWithMultipleLevels(t *testing.T) { + tmpDir := t.TempDir() + manifestDir := tmpDir + + versionSet, err := NewVersionSet(manifestDir) + if err != nil { + t.Fatal(err) + } + defer versionSet.Close() + + picker := NewPicker() + + // 同时添加 L0、L1、L2 文件 + edit := NewVersionEdit() + + // L0 小文件: 5 files × 10MB = 50MB (应该触发 Stage 0: L0-merge) + for i := 0; i < 5; i++ { + edit.AddFile(&FileMetadata{ + FileNumber: int64(i + 1), + Level: 0, + FileSize: 10 * 1024 * 1024, + MinKey: int64(i * 100), + MaxKey: int64((i+1)*100 - 1), + RowCount: 100, + }) + } + + // L0 大文件: 5 files × 40MB = 200MB (应该触发 Stage 1: L0-upgrade) + for i := 0; i < 5; i++ { + edit.AddFile(&FileMetadata{ + FileNumber: int64(10 + i + 1), + Level: 0, + FileSize: 40 * 1024 * 1024, + MinKey: int64((i+5) * 100), + MaxKey: int64((i+6)*100 - 1), + RowCount: 100, + }) + } + + // L1: 20 files × 20MB = 400MB (应该触发 Stage 2: L1-upgrade,256MB阈值) + for i := 0; i < 20; i++ { + edit.AddFile(&FileMetadata{ + FileNumber: int64(100 + i + 1), + Level: 1, + FileSize: 20 * 1024 * 1024, + MinKey: int64(i * 200), + MaxKey: int64((i+1)*200 - 1), + RowCount: 200, + }) + } + + // L2: 10 files × 150MB = 1500MB (应该触发 Stage 3: L2-upgrade,1GB阈值) + for i := 0; i < 10; i++ { + edit.AddFile(&FileMetadata{ + FileNumber: int64(200 + i + 1), + Level: 2, + FileSize: 150 * 1024 * 1024, + MinKey: int64(i * 300), + MaxKey: int64((i+1)*300 - 1), + RowCount: 300, + }) + } + + edit.SetNextFileNumber(301) + err = versionSet.LogAndApply(edit) + if err != nil { + t.Fatal(err) + } + + version := versionSet.GetCurrent() + + // 验证阶段按顺序执行:Stage 0→1→2→3→0→1→2→3 + expectedStages := []struct { + stage int + name string + level int + }{ + {0, "L0-merge", 0}, + {1, "L0-upgrade", 0}, + {2, "L1-upgrade", 1}, + {3, "L2-upgrade", 2}, + {0, "L0-merge", 0}, + {1, "L0-upgrade", 0}, + {2, "L1-upgrade", 1}, + {3, "L2-upgrade", 2}, + } + + for i, expected := range expectedStages { + t.Logf("=== 第%d次调用 PickCompaction (期望 Stage %d: %s) ===", i+1, expected.stage, expected.name) + tasks := picker.PickCompaction(version) + + if len(tasks) == 0 { + t.Errorf("Call %d: Expected tasks from Stage %d (%s), got no tasks", i+1, expected.stage, expected.name) + continue + } + + actualLevel := tasks[0].Level + if actualLevel != expected.level { + t.Errorf("Call %d: Expected L%d tasks, got L%d tasks", i+1, expected.level, actualLevel) + } else { + t.Logf("✓ Call %d: Got %d tasks from L%d (Stage %d: %s) as expected", + i+1, len(tasks), actualLevel, expected.stage, expected.name) + } + } + + t.Log("=== Multi-level stage rotation test passed ===") +} diff --git a/compaction_test.go b/compaction_test.go index aa9950a..7a8e14d 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -143,12 +143,14 @@ func TestCompactionBasic(t *testing.T) { t.Errorf("Expected L0 compaction, got L%d", task.Level) } - if task.OutputLevel != 1 { - t.Errorf("Expected output to L1, got L%d", task.OutputLevel) + // 注意:L0 compaction 任务的 OutputLevel 设为 0(建议层级) + // 实际层级由 determineLevel 根据合并后的文件大小决定 + if task.OutputLevel != 0 { + t.Errorf("Expected output to L0 (suggested), got L%d", task.OutputLevel) } t.Logf("Found %d compaction tasks", len(tasks)) - t.Logf("First task: L%d -> L%d, %d files", task.Level, task.OutputLevel, len(task.InputFiles)) + t.Logf("First task: L%d -> L%d, %d files (determineLevel will decide actual level)", task.Level, task.OutputLevel, len(task.InputFiles)) // 清理 reader1.Close() @@ -193,12 +195,18 @@ func TestPickerLevelScore(t *testing.T) { // 计算 L0 的得分 score := picker.GetLevelScore(version, 0) - t.Logf("L0 score: %.2f (files: %d, limit: %d)", score, version.GetLevelFileCount(0), picker.levelFileLimits[0]) - // L0 有 3 个文件,限制是 4,得分应该是 0.75 - expectedScore := 3.0 / 4.0 + // L0 有 3 个文件,每个 1MB,总共 3MB + // 下一级(L1)的限制是 256MB + // 得分应该是 3MB / 256MB = 0.01171875 + totalSize := int64(3 * 1024 * 1024) // 3MB + expectedScore := float64(totalSize) / float64(level1SizeLimit) + + t.Logf("L0 score: %.4f (files: %d, total: %d bytes, next level limit: %d)", + score, version.GetLevelFileCount(0), totalSize, level1SizeLimit) + if score != expectedScore { - t.Errorf("Expected L0 score %.2f, got %.2f", expectedScore, score) + t.Errorf("Expected L0 score %.4f, got %.4f", expectedScore, score) } } @@ -544,9 +552,9 @@ func TestCompactionQueryOrder(t *testing.T) { stats := table.GetCompactionManager().GetLevelStats() t.Logf("Compaction 统计:") for _, levelStat := range stats { - level := levelStat["level"].(int) - fileCount := levelStat["file_count"].(int) - totalSize := levelStat["total_size"].(int64) + level := levelStat.Level + fileCount := levelStat.FileCount + totalSize := levelStat.TotalSize if fileCount > 0 { t.Logf(" L%d: %d 个文件, %.2f MB", level, fileCount, float64(totalSize)/(1024*1024)) } diff --git a/table_test.go b/table_test.go index b15bf64..a078dd4 100644 --- a/table_test.go +++ b/table_test.go @@ -948,10 +948,10 @@ func TestTableWithCompaction(t *testing.T) { // 获取 Level 统计信息 levelStats := table.compactionManager.GetLevelStats() for _, stat := range levelStats { - level := stat["level"].(int) - fileCount := stat["file_count"].(int) - totalSize := stat["total_size"].(int64) - score := stat["score"].(float64) + level := stat.Level + fileCount := stat.FileCount + totalSize := stat.TotalSize + score := stat.Score if fileCount > 0 { t.Logf("L%d: %d files, %d bytes, score: %.2f", level, fileCount, totalSize, score) diff --git a/version.go b/version.go index 3e1297f..d8971bc 100644 --- a/version.go +++ b/version.go @@ -16,7 +16,7 @@ import ( // FileMetadata SST 文件元数据 type FileMetadata struct { FileNumber int64 // 文件编号 - Level int // 所在层级 (0-6) + Level int // 所在层级 (0-3) FileSize int64 // 文件大小 MinKey int64 // 最小 key MaxKey int64 // 最大 key @@ -24,12 +24,12 @@ type FileMetadata struct { } const ( - NumLevels = 7 // L0-L6 + NumLevels = 4 // L0-L3 ) // Version 数据库的一个版本快照 type Version struct { - // 分层存储 SST 文件 (L0-L6) + // 分层存储 SST 文件 (L0-L3) Levels [NumLevels][]*FileMetadata // 下一个文件编号 diff --git a/webui/static/index.html b/webui/static/index.html index aadeb8a..acf1c1c 100644 --- a/webui/static/index.html +++ b/webui/static/index.html @@ -11,6 +11,17 @@ rel="stylesheet" /> + + + @@ -20,6 +31,6 @@ - + diff --git a/webui/static/js/common/api.js b/webui/static/js/common/api.js new file mode 100644 index 0000000..56050c8 --- /dev/null +++ b/webui/static/js/common/api.js @@ -0,0 +1,174 @@ +/** + * API 请求管理模块 + * 统一管理所有后端接口请求 + */ + +const API_BASE = '/api'; + +/** + * 通用请求处理函数 + * @param {string} url - 请求 URL + * @param {RequestInit} options - fetch 选项 + * @returns {Promise} + */ +async function request(url, options = {}) { + try { + const response = await fetch(url, { + headers: { + 'Content-Type': 'application/json', + ...options.headers, + }, + ...options, + }); + + if (!response.ok) { + const error = new Error(`HTTP ${response.status}: ${response.statusText}`); + error.status = response.status; + error.response = response; + throw error; + } + + return await response.json(); + } catch (error) { + console.error('API request failed:', url, error); + throw error; + } +} + +/** + * 表相关 API + */ +export const tableAPI = { + /** + * 获取所有表列表 + * @returns {Promise} + */ + async list() { + return request(`${API_BASE}/tables`); + }, + + /** + * 获取表的 Schema + * @param {string} tableName - 表名 + * @returns {Promise} + */ + async getSchema(tableName) { + return request(`${API_BASE}/tables/${tableName}/schema`); + }, + + /** + * 获取表数据(分页) + * @param {string} tableName - 表名 + * @param {Object} params - 查询参数 + * @param {number} params.page - 页码 + * @param {number} params.pageSize - 每页大小 + * @param {string} params.select - 选择的列(逗号分隔) + * @returns {Promise} + */ + async getData(tableName, { page = 1, pageSize = 20, select = '' } = {}) { + const params = new URLSearchParams({ + page: page.toString(), + pageSize: pageSize.toString(), + }); + + if (select) { + params.append('select', select); + } + + return request(`${API_BASE}/tables/${tableName}/data?${params}`); + }, + + /** + * 获取单行数据详情 + * @param {string} tableName - 表名 + * @param {number} seq - 序列号 + * @returns {Promise} + */ + async getRow(tableName, seq) { + return request(`${API_BASE}/tables/${tableName}/data/${seq}`); + }, + + /** + * 获取表的 Manifest 信息 + * @param {string} tableName - 表名 + * @returns {Promise} + */ + async getManifest(tableName) { + return request(`${API_BASE}/tables/${tableName}/manifest`); + }, + + /** + * 插入数据 + * @param {string} tableName - 表名 + * @param {Object} data - 数据对象 + * @returns {Promise} + */ + async insert(tableName, data) { + return request(`${API_BASE}/tables/${tableName}/data`, { + method: 'POST', + body: JSON.stringify(data), + }); + }, + + /** + * 批量插入数据 + * @param {string} tableName - 表名 + * @param {Array} data - 数据数组 + * @returns {Promise} + */ + async batchInsert(tableName, data) { + return request(`${API_BASE}/tables/${tableName}/data/batch`, { + method: 'POST', + body: JSON.stringify(data), + }); + }, + + /** + * 删除表 + * @param {string} tableName - 表名 + * @returns {Promise} + */ + async delete(tableName) { + return request(`${API_BASE}/tables/${tableName}`, { + method: 'DELETE', + }); + }, + + /** + * 获取表统计信息 + * @param {string} tableName - 表名 + * @returns {Promise} + */ + async getStats(tableName) { + return request(`${API_BASE}/tables/${tableName}/stats`); + }, +}; + +/** + * 数据库相关 API + */ +export const databaseAPI = { + /** + * 获取数据库信息 + * @returns {Promise} + */ + async getInfo() { + return request(`${API_BASE}/database/info`); + }, + + /** + * 获取数据库统计信息 + * @returns {Promise} + */ + async getStats() { + return request(`${API_BASE}/database/stats`); + }, +}; + +/** + * 导出默认 API 对象 + */ +export default { + table: tableAPI, + database: databaseAPI, +}; diff --git a/webui/static/js/styles/shared-styles.js b/webui/static/js/common/shared-styles.js similarity index 96% rename from webui/static/js/styles/shared-styles.js rename to webui/static/js/common/shared-styles.js index dff0178..a0f5395 100644 --- a/webui/static/js/styles/shared-styles.js +++ b/webui/static/js/common/shared-styles.js @@ -1,4 +1,4 @@ -import { css } from 'https://cdn.jsdelivr.net/gh/lit/dist@3/core/lit-core.min.js'; +import { css } from 'lit'; // 共享的基础样式 export const sharedStyles = css` diff --git a/webui/static/js/components/app.js b/webui/static/js/components/app.js index d0981de..addb0e4 100644 --- a/webui/static/js/components/app.js +++ b/webui/static/js/components/app.js @@ -1,5 +1,5 @@ -import { LitElement, html, css } from 'https://cdn.jsdelivr.net/gh/lit/dist@3/core/lit-core.min.js'; -import { sharedStyles, cssVariables } from '../styles/shared-styles.js'; +import { LitElement, html, css } from 'lit'; +import { sharedStyles, cssVariables } from '~/common/shared-styles.js'; export class AppContainer extends LitElement { static properties = { diff --git a/webui/static/js/components/badge.js b/webui/static/js/components/badge.js index 82e1a16..c34781c 100644 --- a/webui/static/js/components/badge.js +++ b/webui/static/js/components/badge.js @@ -1,5 +1,5 @@ -import { LitElement, html, css } from 'https://cdn.jsdelivr.net/gh/lit/dist@3/core/lit-core.min.js'; -import { sharedStyles, cssVariables } from '../styles/shared-styles.js'; +import { LitElement, html, css } from 'lit'; +import { sharedStyles, cssVariables } from '~/common/shared-styles.js'; export class Badge extends LitElement { static properties = { diff --git a/webui/static/js/components/data-view.js b/webui/static/js/components/data-view.js index 0f6d9c5..bb3e103 100644 --- a/webui/static/js/components/data-view.js +++ b/webui/static/js/components/data-view.js @@ -1,5 +1,5 @@ -import { LitElement, html, css } from 'https://cdn.jsdelivr.net/gh/lit/dist@3/core/lit-core.min.js'; -import { sharedStyles, cssVariables } from '../styles/shared-styles.js'; +import { LitElement, html, css } from 'lit'; +import { sharedStyles, cssVariables } from '~/common/shared-styles.js'; export class DataView extends LitElement { static properties = { diff --git a/webui/static/js/components/field-icon.js b/webui/static/js/components/field-icon.js index 4e5c986..e369663 100644 --- a/webui/static/js/components/field-icon.js +++ b/webui/static/js/components/field-icon.js @@ -1,10 +1,9 @@ -import { LitElement, html, css, svg } from 'https://cdn.jsdelivr.net/gh/lit/dist@3/core/lit-core.min.js'; +import { LitElement, html, css } from 'lit'; export class FieldIcon extends LitElement { static properties = { indexed: { type: Boolean } }; - static styles = css` :host { display: inline-flex; diff --git a/webui/static/js/components/manifest-view.js b/webui/static/js/components/manifest-view.js index eec7a41..f3c88fa 100644 --- a/webui/static/js/components/manifest-view.js +++ b/webui/static/js/components/manifest-view.js @@ -1,5 +1,5 @@ -import { LitElement, html, css } from 'https://cdn.jsdelivr.net/gh/lit/dist@3/core/lit-core.min.js'; -import { sharedStyles, cssVariables } from '../styles/shared-styles.js'; +import { LitElement, html, css } from 'lit'; +import { sharedStyles, cssVariables } from '~/common/shared-styles.js'; export class ManifestView extends LitElement { static properties = { diff --git a/webui/static/js/components/modal-dialog.js b/webui/static/js/components/modal-dialog.js index 26660d2..6e19f65 100644 --- a/webui/static/js/components/modal-dialog.js +++ b/webui/static/js/components/modal-dialog.js @@ -1,5 +1,5 @@ -import { LitElement, html, css } from 'https://cdn.jsdelivr.net/gh/lit/dist@3/core/lit-core.min.js'; -import { sharedStyles, cssVariables } from '../styles/shared-styles.js'; +import { LitElement, html, css } from 'lit'; +import { sharedStyles, cssVariables } from '~/common/shared-styles.js'; export class ModalDialog extends LitElement { static properties = { diff --git a/webui/static/js/components/page-header.js b/webui/static/js/components/page-header.js index 67da256..0364812 100644 --- a/webui/static/js/components/page-header.js +++ b/webui/static/js/components/page-header.js @@ -1,5 +1,5 @@ -import { LitElement, html, css } from 'https://cdn.jsdelivr.net/gh/lit/dist@3/core/lit-core.min.js'; -import { sharedStyles, cssVariables } from '../styles/shared-styles.js'; +import { LitElement, html, css } from 'lit'; +import { sharedStyles, cssVariables } from '~/common/shared-styles.js'; export class PageHeader extends LitElement { static properties = { @@ -247,11 +247,11 @@ export class PageHeader extends LitElement { > Data -