Files
srdb/compaction.go

1545 lines
43 KiB
Go
Raw Permalink Normal View History

package srdb
import (
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"sort"
"sync"
"sync/atomic"
"time"
)
// Compaction 层级大小限制Append-Only 优化设计)
//
// 设计理念:
// - 触发阈值 = 目标文件大小(不 split
// - 每个层级累积到阈值后,直接合并成一个对应大小的文件
// - 适用于 Append-Only 场景:没有更新/删除,不需要增量合并
//
// 触发规则:
// - L0 累积到 64MB → 合并成 1 个 64MB 文件,升级到 L1
// - L1 累积到 256MB → 合并成 1 个 256MB 文件,升级到 L2
// - L2 累积到 512MB → 合并成 1 个 512MB 文件,升级到 L3
// - L3 累积到 1GB → 保持在 L3最后一层
//
// 层级设计Append-Only 优化):
// - L064MB MemTable flush小文件合并
// - L1256MB L0 升级,减少文件数)
// - L2512MB L1 升级,温数据)
// - L31GB L2 升级,冷数据,最后一层)
const (
level0SizeLimit = 64 * 1024 * 1024 // 64MB
level1SizeLimit = 256 * 1024 * 1024 // 256MB
level2SizeLimit = 512 * 1024 * 1024 // 512MB
level3SizeLimit = 1024 * 1024 * 1024 // 1GB
)
// getLevelSizeLimit 获取层级大小限制(私有函数,供 Picker 和 Compactor 共用)
func getLevelSizeLimit(level int) int64 {
switch level {
case 0:
return level0SizeLimit
case 1:
return level1SizeLimit
case 2:
return level2SizeLimit
case 3:
return level3SizeLimit
default:
return level3SizeLimit
}
}
// CompactionTask 表示一个 Compaction 任务
type CompactionTask struct {
Level int // 源层级
InputFiles []*FileMetadata // 需要合并的输入文件
OutputLevel int // 输出层级
}
// Picker 负责选择需要 Compaction 的文件
type Picker struct {
mu sync.Mutex
currentStage int // 当前阶段0=L0合并, 1=L0升级, 2=L1升级, 3=L2升级
// 层级大小限制(可配置)
level0SizeLimit int64
level1SizeLimit int64
level2SizeLimit int64
level3SizeLimit int64
}
// NewPicker 创建新的 Compaction Picker使用默认值
func NewPicker() *Picker {
return &Picker{
currentStage: 0, // 从 L0 合并开始
level0SizeLimit: level0SizeLimit,
level1SizeLimit: level1SizeLimit,
level2SizeLimit: level2SizeLimit,
level3SizeLimit: level3SizeLimit,
}
}
// UpdateLevelLimits 更新层级大小限制(由 CompactionManager 调用)
func (p *Picker) UpdateLevelLimits(l0, l1, l2, l3 int64) {
p.mu.Lock()
defer p.mu.Unlock()
p.level0SizeLimit = l0
p.level1SizeLimit = l1
p.level2SizeLimit = l2
p.level3SizeLimit = l3
}
// getLevelSizeLimit 获取层级大小限制(从配置读取)
// 注意:层级配置在 UpdateLevelLimits 后不会改变,因此不需要加锁
func (p *Picker) getLevelSizeLimit(level int) int64 {
switch level {
case 0:
return p.level0SizeLimit
case 1:
return p.level1SizeLimit
case 2:
return p.level2SizeLimit
case 3:
return p.level3SizeLimit
default:
return p.level3SizeLimit
}
}
// PickCompaction 选择需要 Compaction 的任务(按阶段返回,阶段内并发执行)
// 返回空切片表示当前阶段不需要 Compaction
//
// 执行策略4 阶段串行 + 阶段内并发):
// 1. Stage 0 - L0 合并:小文件合并,减少 L0 文件数(可能保持在 L0
// 2. Stage 1 - L0 升级:大文件或已合并文件升级到 L1
// 3. Stage 2 - L1 升级L1 文件升级到 L2
// 4. Stage 3 - L2 升级L2 文件升级到 L3
//
// 阶段控制:
// - 使用 currentStage 跟踪当前应该执行哪个阶段0-3
// - 每次调用返回当前阶段的任务,然后自动推进到下一阶段
// - 调用者应该循环调用此方法以确保所有阶段都被尝试
//
// 为什么阶段内可以并发?
// - 同一阶段的任务处理不同的文件批次(按 seq 连续划分)
// - 这些批次的文件不重叠,可以安全并发
//
// 为什么阶段间要串行?
// - L0 执行后可能产生新文件,影响下一阶段的任务计算
// - 必须基于最新的 version 重新计算下一阶段的任务
func (p *Picker) PickCompaction(version *Version) []*CompactionTask {
p.mu.Lock()
defer p.mu.Unlock()
var tasks []*CompactionTask
// 根据当前阶段选择任务
switch p.currentStage {
case 0:
// Stage 0: L0 合并任务
tasks = p.pickL0MergeTasks(version)
case 1:
// Stage 1: L0 升级任务
tasks = p.pickL0UpgradeTasks(version)
case 2:
// Stage 2: L1 升级任务
tasks = p.pickLevelCompaction(version, 1)
case 3:
// Stage 3: L2 升级任务
tasks = p.pickLevelCompaction(version, 2)
}
// 推进到下一阶段(无论是否有任务),这里巧妙地
// 使用了取模运算来保证阶段递增与阶段重置。
p.currentStage = (p.currentStage + 1) % 4
return tasks
}
// pickL0MergeTasks 选择 L0 的合并任务Stage 0
//
// 策略:
// - 按 seq 顺序遍历,只合并连续的小文件块
// - 遇到大文件(≥ 32MB时停止当前批次创建任务
// - 累积到 64MB 时创建一个合并任务
// - OutputLevel=0让 determineLevel 决定是否保持在 L0
//
// 为什么必须连续?
// - 不能跳过中间文件,否则会导致 seq 范围不连续
// - 例如:[文件1: seq 1-100, 29MB] [文件2: seq 101-200, 36MB] [文件3: seq 201-300, 8MB]
// - 不能合并文件1和文件3否则新文件 seq 范围是 [1-100, 201-300],缺失 101-200
//
// 目的:
// - 减少 L0 文件数(防止读放大)
// - 小文件在 L0 内部合并,大文件留给 Stage 1 处理
func (p *Picker) pickL0MergeTasks(version *Version) []*CompactionTask {
files := version.GetLevel(0)
if len(files) == 0 {
return nil
}
// 按 MinKey 排序,确保处理连续的 seq
sort.Slice(files, func(i, j int) bool {
return files[i].MinKey < files[j].MinKey
})
const smallFileThreshold = 32 * 1024 * 1024 // 32MB
tasks := make([]*CompactionTask, 0)
var currentBatch []*FileMetadata
var currentSize int64
for _, file := range files {
// 如果是大文件,停止当前批次
if file.FileSize >= smallFileThreshold {
// 如果当前批次有多个小文件(> 1创建合并任务
if len(currentBatch) > 1 {
tasks = append(tasks, &CompactionTask{
Level: 0,
InputFiles: currentBatch,
OutputLevel: 0,
})
}
// 重置批次(单个小文件不合并,留给升级阶段)
currentBatch = nil
currentSize = 0
// 跳过大文件,留给 Stage 1 处理
continue
}
// 小文件:加入当前批次
currentBatch = append(currentBatch, file)
currentSize += file.FileSize
// 累积到 64MB 时创建合并任务
if currentSize >= level0SizeLimit {
tasks = append(tasks, &CompactionTask{
Level: 0,
InputFiles: currentBatch,
OutputLevel: 0, // 建议 L0determineLevel 会根据大小决定
})
currentBatch = nil
currentSize = 0
}
}
// 剩余的小文件:只有 > 1 个才创建合并任务
if len(currentBatch) > 1 {
tasks = append(tasks, &CompactionTask{
Level: 0,
InputFiles: currentBatch,
OutputLevel: 0,
})
}
return tasks
}
// pickL0UpgradeTasks 选择 L0 的升级任务Stage 1
//
// 策略:
// - 以大文件(≥ 32MB为中心搭配前后的文件一起升级
// - 找到大文件后,向左右扩展收集文件,直到累积到 256MBL1 限制)
// - 这样可以把大文件周围的小文件也一起带走,更高效地清理 L0
// - OutputLevel=1强制升级到 L1+
//
// 为什么要搭配周围文件?
// - 大文件是升级的主体,周围的小文件(包括单个小文件)可以顺便带走
// - 避免小文件留在 L0 成为孤立文件
// - 例如:[小20MB] [大40MB] [小15MB] → 一起升级 → L1
//
// 目的:
// - 将成熟的大文件推到 L1
// - 顺便清理周围的小文件,为 L0 腾出空间
func (p *Picker) pickL0UpgradeTasks(version *Version) []*CompactionTask {
files := version.GetLevel(0)
if len(files) == 0 {
return nil
}
// 按 MinKey 排序,确保处理连续的 seq
sort.Slice(files, func(i, j int) bool {
return files[i].MinKey < files[j].MinKey
})
const largeFileThreshold = 32 * 1024 * 1024 // 32MB
tasks := make([]*CompactionTask, 0)
processed := make(map[int64]bool) // 跟踪已处理的文件
// 遍历文件,找到大文件作为起点
for i, file := range files {
// 跳过已处理的文件
if processed[file.FileNumber] {
continue
}
// 如果不是大文件,跳过(小文件等待被大文件带走)
if file.FileSize < largeFileThreshold {
continue
}
// 找到大文件,以它为中心,向左右扩展收集文件
var batch []*FileMetadata
var batchSize int64
// 向左收集:找到连续的未处理文件
left := i - 1
var leftFiles []*FileMetadata
for left >= 0 && !processed[files[left].FileNumber] {
leftFiles = append([]*FileMetadata{files[left]}, leftFiles...) // 前插
left--
}
// 加入左边的文件
for _, f := range leftFiles {
batch = append(batch, f)
batchSize += f.FileSize
processed[f.FileNumber] = true
}
// 加入中心的大文件
batch = append(batch, file)
batchSize += file.FileSize
processed[file.FileNumber] = true
// 向右收集:继续收集直到达到 256MB 或遇到已处理文件
right := i + 1
for right < len(files) && !processed[files[right].FileNumber] {
// 检查是否超过限制
if batchSize+files[right].FileSize > level1SizeLimit {
break
}
batch = append(batch, files[right])
batchSize += files[right].FileSize
processed[files[right].FileNumber] = true
right++
}
// 创建升级任务
if len(batch) > 0 {
tasks = append(tasks, &CompactionTask{
Level: 0,
InputFiles: batch,
OutputLevel: 1, // 升级到 L1+
})
}
}
return tasks
}
// pickLevelCompaction 选择指定层级的 Compaction 任务(用于 L1、L2返回所有任务可并发执行
//
// 触发规则:
// - 按 seq 顺序遍历当前层级的文件
// - 累积连续文件的大小
// - 当累积大小 >= 当前层级的大小限制时,创建一个 compaction 任务
// - 重置累积,继续处理剩余文件
// - 返回所有任务(可并发执行)
//
// 示例L1 有 5 个文件 [50MB, 60MB, 70MB, 80MB, 90MB]
// - L1 的限制是 256MB
// - 文件1+2+3+4 = 260MB >= 256MB → 创建任务4个文件 → L2
// - 文件5 = 90MB < 256MB → 不创建任务(未达到升级条件)
// - 返回 1 个任务
func (p *Picker) pickLevelCompaction(version *Version, level int) []*CompactionTask {
if level < 0 || level >= NumLevels-1 {
return nil
}
files := version.GetLevel(level)
if len(files) == 0 {
return nil
}
// 按 MinKey 排序,确保处理连续的 seq
sort.Slice(files, func(i, j int) bool {
return files[i].MinKey < files[j].MinKey
})
tasks := make([]*CompactionTask, 0)
currentLevelLimit := p.getLevelSizeLimit(level)
// 遍历文件,累积大小,当达到当前层级的大小限制时创建任务
var currentBatch []*FileMetadata
var currentSize int64
for _, file := range files {
currentBatch = append(currentBatch, file)
currentSize += file.FileSize
// 如果当前批次的大小达到当前层级的限制,创建 compaction 任务
if currentSize >= currentLevelLimit {
tasks = append(tasks, &CompactionTask{
Level: level,
InputFiles: currentBatch,
OutputLevel: level + 1,
})
// 重置批次
currentBatch = nil
currentSize = 0
}
}
// 不处理剩余文件(未达到大小限制的文件不升级)
// 等待更多文件累积后再升级
return tasks
}
// GetCurrentStage 获取当前阶段(用于测试和调试)
func (p *Picker) GetCurrentStage() int {
p.mu.Lock()
defer p.mu.Unlock()
return p.currentStage
}
// ShouldCompact 判断是否需要 Compaction只检查不推进阶段
func (p *Picker) ShouldCompact(version *Version) bool {
// 检查所有阶段,不推进 currentStage
p.mu.Lock()
defer p.mu.Unlock()
// 检查 Stage 0: L0 合并
if len(p.pickL0MergeTasks(version)) > 0 {
return true
}
// 检查 Stage 1: L0 升级
if len(p.pickL0UpgradeTasks(version)) > 0 {
return true
}
// 检查 Stage 2: L1 升级
if len(p.pickLevelCompaction(version, 1)) > 0 {
return true
}
// 检查 Stage 3: L2 升级
if len(p.pickLevelCompaction(version, 2)) > 0 {
return true
}
return false
}
// GetLevelScore 获取每层的 Compaction 得分 (用于优先级排序)
// 得分越高,越需要 Compaction
func (p *Picker) GetLevelScore(version *Version, level int) float64 {
if level < 0 || level >= NumLevels {
return 0
}
// L3 是最后一层,不需要 compaction
if level == NumLevels-1 {
return 0
}
files := version.GetLevel(level)
if len(files) == 0 {
return 0
}
// 计算总大小
totalSize := int64(0)
for _, file := range files {
totalSize += file.FileSize
}
// 使用下一级的大小限制来计算得分
// 这样可以反映出该层级需要向上合并的紧迫程度
nextLevelLimit := p.getLevelSizeLimit(level + 1)
if nextLevelLimit == 0 {
return 0
}
return float64(totalSize) / float64(nextLevelLimit)
}
// Compactor 负责执行 Compaction
type Compactor struct {
sstDir string
picker *Picker
versionSet *VersionSet
schema *Schema
logger *slog.Logger
mu sync.RWMutex // 只保护 schema 和 logger 字段的读写
}
// NewCompactor 创建新的 Compactor
func NewCompactor(sstDir string, versionSet *VersionSet) *Compactor {
return &Compactor{
sstDir: sstDir,
picker: NewPicker(),
versionSet: versionSet,
logger: slog.New(slog.NewTextHandler(io.Discard, nil)), // 默认丢弃日志
}
}
// SetSchema 设置 Schema用于读取 SST 文件)
func (c *Compactor) SetSchema(schema *Schema) {
c.mu.Lock()
defer c.mu.Unlock()
c.schema = schema
}
// SetLogger 设置 Logger
func (c *Compactor) SetLogger(logger *slog.Logger) {
c.mu.Lock()
defer c.mu.Unlock()
c.logger = logger
}
// GetPicker 获取 Picker
func (c *Compactor) GetPicker() *Picker {
return c.picker
}
// DoCompaction 执行一次 Compaction
// 返回: VersionEdit (记录变更), error
func (c *Compactor) DoCompaction(task *CompactionTask, version *Version) (*VersionEdit, error) {
if task == nil {
return nil, fmt.Errorf("compaction task is nil")
}
// 获取 logger
c.mu.RLock()
logger := c.logger
c.mu.RUnlock()
// 0. 验证输入文件是否存在(防止并发 compaction 导致的竞态)
existingInputFiles := make([]*FileMetadata, 0, len(task.InputFiles))
for _, file := range task.InputFiles {
sstPath := filepath.Join(c.sstDir, fmt.Sprintf("%06d.sst", file.FileNumber))
if _, err := os.Stat(sstPath); err == nil {
existingInputFiles = append(existingInputFiles, file)
} else {
logger.Warn("[Compaction] Input file not found, skipping",
"file_number", file.FileNumber)
}
}
// 如果所有输入文件都不存在,直接返回(无需 compaction
if len(existingInputFiles) == 0 {
logger.Warn("[Compaction] All input files missing, compaction skipped")
return nil, nil // 返回 nil 表示不需要应用任何 VersionEdit
}
// 1. 读取输入文件的所有行
inputRows, err := c.readInputFiles(existingInputFiles)
if err != nil {
return nil, fmt.Errorf("read input files: %w", err)
}
// 2. 如果输出层级有文件,需要合并重叠的文件
outputFiles := c.getOverlappingFiles(version, task.OutputLevel, inputRows)
var existingOutputFiles []*FileMetadata
var missingOutputFiles []*FileMetadata
if len(outputFiles) > 0 {
// 验证输出文件是否存在
existingOutputFiles = make([]*FileMetadata, 0, len(outputFiles))
missingOutputFiles = make([]*FileMetadata, 0)
for _, file := range outputFiles {
sstPath := filepath.Join(c.sstDir, fmt.Sprintf("%06d.sst", file.FileNumber))
if _, err := os.Stat(sstPath); err == nil {
existingOutputFiles = append(existingOutputFiles, file)
} else {
// 输出层级的文件不存在,记录并在 VersionEdit 中删除它
logger.Warn("[Compaction] Overlapping output file missing, will remove from MANIFEST",
"file_number", file.FileNumber)
missingOutputFiles = append(missingOutputFiles, file)
}
}
outputRows, err := c.readInputFiles(existingOutputFiles)
if err != nil {
return nil, fmt.Errorf("read output files: %w", err)
}
inputRows = append(inputRows, outputRows...)
}
// 3. 合并和去重 (保留最新的记录)
mergedRows := c.mergeRows(inputRows)
// 计算平均行大小(基于输入文件的 FileMetadata
avgRowSize := c.calculateAvgRowSize(existingInputFiles, existingOutputFiles)
// 4. 写入新的 SST 文件
// 传入输出层级L0合并时根据文件大小动态决定升级任务强制使用OutputLevel
newFiles, err := c.writeOutputFiles(mergedRows, task.OutputLevel, avgRowSize)
if err != nil {
return nil, fmt.Errorf("write output files: %w", err)
}
// 5. 创建 VersionEdit
edit := NewVersionEdit()
// 删除实际存在且被处理的输入文件
for _, file := range existingInputFiles {
edit.DeleteFile(file.FileNumber)
}
// 删除实际存在且被处理的输出层级文件
for _, file := range existingOutputFiles {
edit.DeleteFile(file.FileNumber)
}
// 删除缺失的输出层级文件(清理 MANIFEST 中的过期引用)
for _, file := range missingOutputFiles {
edit.DeleteFile(file.FileNumber)
logger.Info("[Compaction] Removing missing file from MANIFEST",
"file_number", file.FileNumber)
}
// 添加新文件,并跟踪最大文件编号
var maxFileNumber int64
for _, file := range newFiles {
edit.AddFile(file)
if file.FileNumber > maxFileNumber {
maxFileNumber = file.FileNumber
}
}
// 持久化当前的文件编号计数器(关键修复:防止重启后文件编号重用)
// 使用最大文件编号 + 1 确保并发安全
if maxFileNumber > 0 {
edit.SetNextFileNumber(maxFileNumber + 1)
} else {
// 如果没有新文件,使用当前值
edit.SetNextFileNumber(c.versionSet.GetNextFileNumber())
}
return edit, nil
}
// readInputFiles 读取输入文件的所有行
// 注意:调用者必须确保传入的文件都存在,否则会返回错误
func (c *Compactor) readInputFiles(files []*FileMetadata) ([]*SSTableRow, error) {
var allRows []*SSTableRow
for _, file := range files {
sstPath := filepath.Join(c.sstDir, fmt.Sprintf("%06d.sst", file.FileNumber))
reader, err := NewSSTableReader(sstPath)
if err != nil {
return nil, fmt.Errorf("open sst %d: %w", file.FileNumber, err)
}
// 设置 Schema如果可用
c.mu.RLock()
schema := c.schema
c.mu.RUnlock()
if schema != nil {
reader.SetSchema(schema)
}
// 获取文件中实际存在的所有 key不能用 MinKey-MaxKey 范围遍历,因为 key 可能是稀疏的)
keys := reader.GetAllKeys()
for _, seq := range keys {
row, err := reader.Get(seq)
if err != nil {
// 这种情况理论上不应该发生key 来自索引),但为了安全还是处理一下
continue
}
allRows = append(allRows, row)
}
reader.Close()
}
return allRows, nil
}
// getOverlappingFiles 获取输出层级中与输入行重叠的文件
func (c *Compactor) getOverlappingFiles(version *Version, level int, rows []*SSTableRow) []*FileMetadata {
if len(rows) == 0 {
return nil
}
// 找到输入行的 key range
minKey := rows[0].Seq
maxKey := rows[0].Seq
for _, row := range rows {
if row.Seq < minKey {
minKey = row.Seq
}
if row.Seq > maxKey {
maxKey = row.Seq
}
}
// 找到输出层级中重叠的文件
var overlapping []*FileMetadata
levelFiles := version.GetLevel(level)
for _, file := range levelFiles {
// 检查 key range 是否重叠
if file.MaxKey >= minKey && file.MinKey <= maxKey {
overlapping = append(overlapping, file)
}
}
return overlapping
}
// mergeRows 合并行,去重并保留最新的记录
func (c *Compactor) mergeRows(rows []*SSTableRow) []*SSTableRow {
if len(rows) == 0 {
return rows
}
// 按 Seq 排序
sort.Slice(rows, func(i, j int) bool {
return rows[i].Seq < rows[j].Seq
})
// 去重:保留相同 Seq 的最新记录 (Timestamp 最大的)
merged := make([]*SSTableRow, 0, len(rows))
var lastRow *SSTableRow
for _, row := range rows {
if lastRow == nil || lastRow.Seq != row.Seq {
// 新的 Seq
merged = append(merged, row)
lastRow = row
} else {
// 相同 Seq保留 Time 更大的
if row.Time > lastRow.Time {
merged[len(merged)-1] = row
lastRow = row
}
}
}
return merged
}
// calculateAvgRowSize 基于输入文件的 FileMetadata 计算平均行大小
func (c *Compactor) calculateAvgRowSize(inputFiles []*FileMetadata, outputFiles []*FileMetadata) int64 {
var totalSize int64
var totalRows int64
// 统计输入文件
for _, file := range inputFiles {
totalSize += file.FileSize
totalRows += file.RowCount
}
// 统计输出文件
for _, file := range outputFiles {
totalSize += file.FileSize
totalRows += file.RowCount
}
// 计算平均值
if totalRows == 0 {
return 1024 // 默认 1KB
}
return totalSize / totalRows
}
// writeOutputFiles 将合并后的行写入新的 SST 文件Append-Only 优化:不 split
//
// 设计理念:
// - Append-Only 场景:没有重叠数据,不需要增量合并
// - 直接将所有行写入一个文件,不进行分割
// - 简化逻辑,提高性能,减少文件数量
//
// 为什么不 split
// - 触发阈值已经控制了文件大小64MB/256MB/512MB/1GB
// - 没有必要累积到大阈值后再分割成小文件
// - mmap 可以高效处理大文件(按需加载 4KB 页面)
func (c *Compactor) writeOutputFiles(rows []*SSTableRow, level int, avgRowSize int64) ([]*FileMetadata, error) {
if len(rows) == 0 {
return nil, nil
}
// Append-Only 优化:不分割,直接写成一个文件
file, err := c.writeFile(rows, level)
if err != nil {
return nil, err
}
return []*FileMetadata{file}, nil
}
// getTargetFileSize 根据层级返回目标文件大小Append-Only 优化)
//
// 设计理念:
// - 目标文件大小 = 层级大小限制(不 split
// - 每个层级合并后产生 1 个对应大小的文件
// - 适用于 Append-Only 场景:没有重叠数据,不需要增量合并
//
// 层级文件大小:
// - L0: 64MB MemTable flush 后的小文件合并)
// - L1: 256MB L0 升级)
// - L2: 512MB L1 升级)
// - L3: 1GB L2 升级,最后一层)
func (c *Compactor) getTargetFileSize(level int) int64 {
return getLevelSizeLimit(level)
}
// determineLevel 根据文件大小和源层级决定文件应该放在哪一层
//
// 判断逻辑:
// - 如果文件大小 <= 源层级的目标文件大小 × 1.2,保持在源层级
// - 否则,向上查找合适的层级,直到文件大小 <= 该层级的目标文件大小 × 1.2
// - 如果都不适合放在最后一层L3
//
// 1.2 倍容差的考量:
// - 目标文件大小是理想值L0: 8MB, L1: 32MB, L2: 128MB, L3: 512MB
// - 实际写入时可能略微超出(写入最后几行后超出)
// - 20% 容差避免文件因稍微超出而被强制升级
//
// 示例:
// - 10MB 的文件,源层级 L0目标 8MB × 1.2 = 9.6MB
// → 10MB > 9.6MB,升级到 L1
// - 30MB 的文件,源层级 L1目标 32MB × 1.2 = 38.4MB
// → 30MB <= 38.4MB,保持在 L1
func (c *Compactor) determineLevel(fileSize int64, sourceLevel int) int {
// 检查文件是否适合源层级
// 使用目标文件大小的 1.2 倍作为阈值(允许一定的溢出)
sourceTargetSize := c.getTargetFileSize(sourceLevel)
threshold := sourceTargetSize * 120 / 100
if fileSize <= threshold {
// 文件大小适合源层级,保持在源层级
return sourceLevel
}
// 否则,找到合适的更高层级
// 从源层级的下一层开始查找
for level := sourceLevel + 1; level < NumLevels; level++ {
targetSize := c.getTargetFileSize(level)
threshold := targetSize * 120 / 100
if fileSize <= threshold {
return level
}
}
// 如果都不适合,放在最后一层
return NumLevels - 1
}
// writeFile 写入单个 SST 文件
func (c *Compactor) writeFile(rows []*SSTableRow, level int) (*FileMetadata, error) {
// 从 VersionSet 分配新的文件编号
fileNumber := c.versionSet.AllocateFileNumber()
sstPath := filepath.Join(c.sstDir, fmt.Sprintf("%06d.sst", fileNumber))
// 创建文件
file, err := os.Create(sstPath)
if err != nil {
return nil, err
}
defer file.Close()
// 使用 Compactor 的 Schema 创建 writer
c.mu.RLock()
schema := c.schema
c.mu.RUnlock()
writer := NewSSTableWriter(file, schema)
// 注意:这个方法只负责创建文件,不负责注册到 SSTableManager
// 注册工作由 CompactionManager 在 VersionEdit apply 后完成
// 写入所有行
for _, row := range rows {
err = writer.Add(row)
if err != nil {
os.Remove(sstPath)
return nil, err
}
}
// 完成写入
err = writer.Finish()
if err != nil {
os.Remove(sstPath)
return nil, err
}
// 获取文件信息
fileInfo, err := file.Stat()
if err != nil {
return nil, err
}
// 根据实际文件大小决定最终层级
// level 参数是建议的输出层级(通常是 sourceLevel + 1
// 但我们根据文件大小重新决定,如果文件足够小可能保持在源层级
actualLevel := c.determineLevel(fileInfo.Size(), level)
// 创建 FileMetadata
metadata := &FileMetadata{
FileNumber: fileNumber,
Level: actualLevel,
FileSize: fileInfo.Size(),
MinKey: rows[0].Seq,
MaxKey: rows[len(rows)-1].Seq,
RowCount: int64(len(rows)),
}
return metadata, nil
}
// CompactionStats Compaction 统计信息
type CompactionStats struct {
TotalCompactions int64 `json:"total_compactions"` // 总 compaction 次数
LastCompactionTime time.Time `json:"last_compaction_time"` // 最后一次 compaction 时间
}
// LevelStats 层级统计信息
type LevelStats struct {
Level int `json:"level"` // 层级编号 (0-3)
FileCount int `json:"file_count"` // 文件数量
TotalSize int64 `json:"total_size"` // 总大小(字节)
Score float64 `json:"score"` // Compaction 得分
}
// CompactionManager 管理 Compaction 流程
type CompactionManager struct {
compactor *Compactor
versionSet *VersionSet
sstManager *SSTableManager // 添加 sstManager 引用,用于同步删除 readers
sstDir string
// 配置(从 Database Options 传递,启动后不可变)
configMu sync.Mutex // 仅用于 ApplyConfig防御性编程
logger *slog.Logger
level0SizeLimit int64
level1SizeLimit int64
level2SizeLimit int64
level3SizeLimit int64
compactionInterval time.Duration
gcInterval time.Duration
gcFileMinAge time.Duration
disableCompaction bool
disableGC bool
// 控制后台 Compaction
stopCh chan struct{}
wg sync.WaitGroup
// Compaction 并发控制
compactionMu sync.Mutex // 防止并发执行 compaction
// 统计信息
mu sync.RWMutex
totalCompactions int64
lastCompactionTime time.Time
lastFailedFile int64 // 最后失败的文件编号
consecutiveFails int // 连续失败次数
lastGCTime time.Time
totalOrphansFound int64
}
// NewCompactionManager 创建新的 Compaction Manager使用默认配置
func NewCompactionManager(sstDir string, versionSet *VersionSet, sstManager *SSTableManager) *CompactionManager {
return &CompactionManager{
compactor: NewCompactor(sstDir, versionSet),
versionSet: versionSet,
sstManager: sstManager,
sstDir: sstDir,
stopCh: make(chan struct{}),
// 默认 logger丢弃日志将在 ApplyConfig 中设置为 Database.options.Logger
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
// 使用硬编码常量作为默认值(向后兼容)
level0SizeLimit: level0SizeLimit,
level1SizeLimit: level1SizeLimit,
level2SizeLimit: level2SizeLimit,
level3SizeLimit: level3SizeLimit,
compactionInterval: 10 * time.Second,
gcInterval: 5 * time.Minute,
gcFileMinAge: 1 * time.Minute,
disableCompaction: false,
disableGC: false,
}
}
// ApplyConfig 应用数据库级配置(从 Database Options
func (m *CompactionManager) ApplyConfig(opts *Options) {
m.configMu.Lock()
defer m.configMu.Unlock()
m.logger = opts.Logger
m.level0SizeLimit = opts.Level0SizeLimit
m.level1SizeLimit = opts.Level1SizeLimit
m.level2SizeLimit = opts.Level2SizeLimit
m.level3SizeLimit = opts.Level3SizeLimit
m.compactionInterval = opts.CompactionInterval
m.gcInterval = opts.GCInterval
m.gcFileMinAge = opts.GCFileMinAge
m.disableCompaction = opts.DisableAutoCompaction
m.disableGC = opts.DisableGC
// 同时更新 compactor 的 picker 和 logger
m.compactor.picker.UpdateLevelLimits(
m.level0SizeLimit,
m.level1SizeLimit,
m.level2SizeLimit,
m.level3SizeLimit,
)
m.compactor.SetLogger(opts.Logger)
}
// GetPicker 获取 Compaction Picker
func (m *CompactionManager) GetPicker() *Picker {
return m.compactor.GetPicker()
}
// SetSchema 设置 Schema用于优化 SST 文件读写)
func (m *CompactionManager) SetSchema(schema *Schema) {
m.compactor.SetSchema(schema)
}
// Start 启动后台 Compaction 和垃圾回收
func (m *CompactionManager) Start() {
m.wg.Add(2)
go m.backgroundCompaction()
go m.backgroundGarbageCollection()
}
// Stop 停止后台 Compaction
func (m *CompactionManager) Stop() {
close(m.stopCh)
m.wg.Wait()
}
// backgroundCompaction 后台 Compaction 循环
func (m *CompactionManager) backgroundCompaction() {
defer m.wg.Done()
// Options 是不可变的,配置在启动后不会改变,直接读取即可
interval := m.compactionInterval
disabled := m.disableCompaction
if disabled {
return // 禁用自动 Compaction直接退出
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-m.stopCh:
return
case <-ticker.C:
m.maybeCompact()
}
}
}
// MaybeCompact 检查是否需要 Compaction 并执行(公开方法,供外部调用)
// 非阻塞:如果已有 compaction 在执行,直接返回
func (m *CompactionManager) MaybeCompact() {
// 尝试获取锁,如果已有 compaction 在执行,直接返回
if !m.compactionMu.TryLock() {
return
}
defer m.compactionMu.Unlock()
m.doCompact()
}
// maybeCompact 内部使用的阻塞版本(后台 goroutine 使用)
func (m *CompactionManager) maybeCompact() {
m.compactionMu.Lock()
defer m.compactionMu.Unlock()
m.doCompact()
}
// doCompact 实际执行 compaction 的逻辑(必须在持有 compactionMu 时调用)
// 阶段串行 + 阶段内并发:
// - 循环执行 4 个阶段Stage 0 → 1 → 2 → 3
// - 同一阶段的任务并发执行L0 的多个批次、L1 的多个批次等)
// - 不同阶段串行执行(执行完一个阶段后,基于新 version 再执行下一阶段)
func (m *CompactionManager) doCompact() {
picker := m.compactor.GetPicker()
totalStagesExecuted := 0
// 循环执行 4 个阶段
for stage := 0; stage < 4; stage++ {
// 获取当前版本(每个阶段都重新获取,因为上一阶段可能修改了文件结构)
version := m.versionSet.GetCurrent()
if version == nil {
return
}
// 获取当前阶段的任务
tasks := picker.PickCompaction(version)
if len(tasks) == 0 {
// 当前阶段没有任务,继续下一阶段
continue
}
totalStagesExecuted++
m.logger.Info("[Compaction] Found tasks to execute concurrently",
"stage", stage,
"task_count", len(tasks))
// 并发执行同一阶段的所有任务
var wg sync.WaitGroup
var successCount atomic.Int64
for _, task := range tasks {
// 检查是否是上次失败的文件(防止无限重试)
if len(task.InputFiles) > 0 {
firstFile := task.InputFiles[0].FileNumber
m.mu.Lock()
if m.lastFailedFile == firstFile && m.consecutiveFails >= 3 {
m.logger.Warn("[Compaction] Skipping file (failed multiple times)",
"level", task.Level,
"file_number", firstFile,
"consecutive_fails", m.consecutiveFails)
m.consecutiveFails = 0
m.lastFailedFile = 0
m.mu.Unlock()
continue
}
m.mu.Unlock()
}
wg.Add(1)
go func(task *CompactionTask) {
defer wg.Done()
// 获取最新版本(每个任务执行前)
currentVersion := m.versionSet.GetCurrent()
if currentVersion == nil {
return
}
// 执行 Compaction
m.logger.Info("[Compaction] Starting",
"source_level", task.Level,
"target_level", task.OutputLevel,
"file_count", len(task.InputFiles))
err := m.DoCompactionWithVersion(task, currentVersion)
if err != nil {
m.logger.Error("[Compaction] Failed",
"source_level", task.Level,
"target_level", task.OutputLevel,
"error", err)
// 记录失败信息
if len(task.InputFiles) > 0 {
firstFile := task.InputFiles[0].FileNumber
m.mu.Lock()
if m.lastFailedFile == firstFile {
m.consecutiveFails++
} else {
m.lastFailedFile = firstFile
m.consecutiveFails = 1
}
m.mu.Unlock()
}
} else {
m.logger.Info("[Compaction] Completed",
"source_level", task.Level,
"target_level", task.OutputLevel)
// 清除失败计数
m.mu.Lock()
m.consecutiveFails = 0
m.lastFailedFile = 0
m.mu.Unlock()
// 更新成功计数(使用原子操作)
successCount.Add(1)
}
}(task)
}
// 等待当前阶段的所有任务完成
wg.Wait()
m.logger.Info("[Compaction] Stage completed",
"stage", stage,
"succeeded", successCount.Load(),
"total", len(tasks))
}
// 如果所有阶段都没有任务,输出诊断信息
if totalStagesExecuted == 0 {
version := m.versionSet.GetCurrent()
if version != nil {
m.printCompactionStats(version, picker)
}
}
}
// printCompactionStats 输出 Compaction 统计信息(每分钟一次)
func (m *CompactionManager) printCompactionStats(version *Version, picker *Picker) {
m.mu.Lock()
defer m.mu.Unlock()
// 限制输出频率:每 60 秒输出一次
if time.Since(m.lastCompactionTime) < 60*time.Second {
return
}
m.lastCompactionTime = time.Now()
m.logger.Info("[Compaction] Status check")
for level := range NumLevels {
files := version.GetLevel(level)
if len(files) == 0 {
continue
}
totalSize := int64(0)
for _, f := range files {
totalSize += f.FileSize
}
score := picker.GetLevelScore(version, level)
m.logger.Info("[Compaction] Level status",
"level", level,
"file_count", len(files),
"size_mb", float64(totalSize)/(1024*1024),
"score", score)
}
}
// DoCompactionWithVersion 使用指定的版本执行 Compaction
func (m *CompactionManager) DoCompactionWithVersion(task *CompactionTask, version *Version) error {
if version == nil {
return fmt.Errorf("version is nil")
}
// 执行 Compaction使用传入的 version而不是重新获取
edit, err := m.compactor.DoCompaction(task, version)
if err != nil {
return fmt.Errorf("compaction failed: %w", err)
}
// 如果 edit 为 nil说明所有文件都已经不存在无需应用变更
if edit == nil {
m.logger.Info("[Compaction] No changes needed (files already removed)")
return nil
}
// 应用 VersionEdit
err = m.versionSet.LogAndApply(edit)
if err != nil {
// LogAndApply 失败,清理已写入的新 SST 文件(防止孤儿文件)
m.logger.Error("[Compaction] LogAndApply failed, cleaning up new files",
"error", err)
m.cleanupNewFiles(edit)
return fmt.Errorf("apply version edit: %w", err)
}
// LogAndApply 成功后,注册新创建的 SST 文件到 SSTableManager
// 这样查询才能读取到 compaction 创建的文件
if m.sstManager != nil {
for _, file := range edit.AddedFiles {
sstPath := filepath.Join(m.sstDir, fmt.Sprintf("%06d.sst", file.FileNumber))
reader, err := NewSSTableReader(sstPath)
if err != nil {
m.logger.Warn("[Compaction] Failed to open new file",
"file_number", file.FileNumber,
"error", err)
continue
}
// 设置 Schema
m.compactor.mu.RLock()
schema := m.compactor.schema
m.compactor.mu.RUnlock()
if schema != nil {
reader.SetSchema(schema)
}
// 添加到 SSTableManager
m.sstManager.AddReader(reader)
}
}
// LogAndApply 成功后,删除废弃的 SST 文件
m.deleteObsoleteFiles(edit)
// 更新统计信息
m.mu.Lock()
m.totalCompactions++
m.lastCompactionTime = time.Now()
m.mu.Unlock()
return nil
}
// DoCompaction 执行一次 Compaction兼容旧接口
func (m *CompactionManager) DoCompaction(task *CompactionTask) error {
// 获取当前版本
version := m.versionSet.GetCurrent()
if version == nil {
return fmt.Errorf("no current version")
}
return m.DoCompactionWithVersion(task, version)
}
// cleanupNewFiles 清理 LogAndApply 失败后的新文件(防止孤儿文件)
func (m *CompactionManager) cleanupNewFiles(edit *VersionEdit) {
if edit == nil {
return
}
m.logger.Warn("[Compaction] Cleaning up new files after LogAndApply failure",
"file_count", len(edit.AddedFiles))
// 删除新创建的文件
for _, file := range edit.AddedFiles {
sstPath := filepath.Join(m.sstDir, fmt.Sprintf("%06d.sst", file.FileNumber))
err := os.Remove(sstPath)
if err != nil {
m.logger.Warn("[Compaction] Failed to cleanup new file",
"file_number", file.FileNumber,
"error", err)
} else {
m.logger.Info("[Compaction] Cleaned up new file",
"file_number", file.FileNumber)
}
}
}
// deleteObsoleteFiles 删除废弃的 SST 文件
func (m *CompactionManager) deleteObsoleteFiles(edit *VersionEdit) {
if edit == nil {
m.logger.Warn("[Compaction] deleteObsoleteFiles: edit is nil")
return
}
m.logger.Info("[Compaction] Deleting obsolete files",
"file_count", len(edit.DeletedFiles))
// 删除被标记为删除的文件
for _, fileNum := range edit.DeletedFiles {
// 1. 从 SSTableManager 移除 reader如果 sstManager 可用)
if m.sstManager != nil {
err := m.sstManager.RemoveReader(fileNum)
if err != nil {
m.logger.Warn("[Compaction] Failed to remove reader",
"file_number", fileNum,
"error", err)
}
}
// 2. 删除物理文件
sstPath := filepath.Join(m.sstDir, fmt.Sprintf("%06d.sst", fileNum))
err := os.Remove(sstPath)
if err != nil {
// 删除失败只记录日志,不影响 compaction 流程
// 后台垃圾回收器会重试
m.logger.Warn("[Compaction] Failed to delete obsolete file",
"file_number", fileNum,
"error", err)
} else {
m.logger.Info("[Compaction] Deleted obsolete file",
"file_number", fileNum)
}
}
}
// TriggerCompaction 手动触发一次 Compaction遍历所有阶段
func (m *CompactionManager) TriggerCompaction() error {
picker := m.compactor.GetPicker()
// 循环执行 4 个阶段
for range 4 {
version := m.versionSet.GetCurrent()
if version == nil {
return fmt.Errorf("no current version")
}
// 获取当前阶段的任务
tasks := picker.PickCompaction(version)
if len(tasks) == 0 {
// 当前阶段没有任务,继续下一阶段
continue
}
// 串行执行当前阶段的所有任务
for _, task := range tasks {
currentVersion := m.versionSet.GetCurrent()
if err := m.DoCompactionWithVersion(task, currentVersion); err != nil {
return err
}
}
}
return nil
}
// GetStats 获取 Compaction 统计信息
func (m *CompactionManager) GetStats() *CompactionStats {
m.mu.RLock()
defer m.mu.RUnlock()
return &CompactionStats{
TotalCompactions: m.totalCompactions,
LastCompactionTime: m.lastCompactionTime,
}
}
// GetLevelStats 获取每层的统计信息
func (m *CompactionManager) GetLevelStats() []LevelStats {
version := m.versionSet.GetCurrent()
if version == nil {
return nil
}
picker := m.compactor.GetPicker()
stats := make([]LevelStats, NumLevels)
for level := range NumLevels {
files := version.GetLevel(level)
totalSize := int64(0)
for _, file := range files {
totalSize += file.FileSize
}
stats[level] = LevelStats{
Level: level,
FileCount: len(files),
TotalSize: totalSize,
Score: picker.GetLevelScore(version, level),
}
}
return stats
}
// GetLevelSizeLimit 获取指定层级的大小限制(公开方法,供 WebUI 等外部使用)
// 注意Options 是不可变的,配置在 ApplyConfig 后不会改变,因此不需要加锁
func (m *CompactionManager) GetLevelSizeLimit(level int) int64 {
switch level {
case 0:
return m.level0SizeLimit
case 1:
return m.level1SizeLimit
case 2:
return m.level2SizeLimit
case 3:
return m.level3SizeLimit
default:
return m.level3SizeLimit
}
}
// backgroundGarbageCollection 后台垃圾回收循环
func (m *CompactionManager) backgroundGarbageCollection() {
defer m.wg.Done()
// Options 是不可变的,配置在启动后不会改变,直接读取即可
interval := m.gcInterval
disabled := m.disableGC
if disabled {
return // 禁用垃圾回收,直接退出
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-m.stopCh:
return
case <-ticker.C:
m.collectOrphanFiles()
}
}
}
// collectOrphanFiles 收集并删除孤儿 SST 文件
func (m *CompactionManager) collectOrphanFiles() {
// 1. 获取当前版本中的所有活跃文件
version := m.versionSet.GetCurrent()
if version == nil {
return
}
activeFiles := make(map[int64]bool)
for level := range NumLevels {
files := version.GetLevel(level)
for _, file := range files {
activeFiles[file.FileNumber] = true
}
}
// 2. 扫描 SST 目录中的所有文件
pattern := filepath.Join(m.sstDir, "*.sst")
sstFiles, err := filepath.Glob(pattern)
if err != nil {
m.logger.Error("[GC] Failed to scan SST directory", "error", err)
return
}
// 3. 找出孤儿文件并删除
orphanCount := 0
for _, sstPath := range sstFiles {
// 提取文件编号
var fileNum int64
_, err := fmt.Sscanf(filepath.Base(sstPath), "%d.sst", &fileNum)
if err != nil {
continue
}
// 检查是否是活跃文件
if !activeFiles[fileNum] {
// 检查文件修改时间,避免删除正在 flush 的文件
// Options 是不可变的,直接读取配置即可
minAge := m.gcFileMinAge
fileInfo, err := os.Stat(sstPath)
if err != nil {
continue
}
if time.Since(fileInfo.ModTime()) < minAge {
m.logger.Info("[GC] Skipping recently modified file",
"file_number", fileNum,
"age", time.Since(fileInfo.ModTime()),
"min_age", minAge)
continue
}
// 这是孤儿文件,删除它
err = os.Remove(sstPath)
if err != nil {
m.logger.Warn("[GC] Failed to delete orphan file",
"file_number", fileNum,
"error", err)
} else {
m.logger.Info("[GC] Deleted orphan file",
"file_number", fileNum)
orphanCount++
}
}
}
// 4. 更新统计信息
m.mu.Lock()
m.lastGCTime = time.Now()
m.totalOrphansFound += int64(orphanCount)
totalOrphans := m.totalOrphansFound
m.mu.Unlock()
if orphanCount > 0 {
m.logger.Info("[GC] Completed",
"cleaned_up", orphanCount,
"total_orphans", totalOrphans)
}
}
// CleanupOrphanFiles 手动触发孤儿文件清理(可在启动时调用)
func (m *CompactionManager) CleanupOrphanFiles() {
m.logger.Info("[GC] Manual cleanup triggered")
m.collectOrphanFiles()
}