srdb/compaction_test.go
bourdon 77087d36c6 feat: enhance the Schema system and add new examples
- Extend Schema to support more data types (Duration, URL, JSON, etc.)
- Optimize SSTable encode/decode performance
- Add several new example programs:
  - all_types: demonstrates all supported data types
  - new_types: demonstrates use of the newly added types
  - struct_tags: demonstrates struct tag support
  - time_duration: examples of time and duration handling
- Improve test cases and documentation
- Improve code structure and error handling
2025-10-10 02:57:36 +08:00

package srdb

import (
	"fmt"
	"os"
	"path/filepath"
	"testing"
	"time"
)
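// TestCompactionBasic verifies the basic compaction flow: SST files are
// registered in the VersionSet, L0 accumulates five files, and the Picker
// reports that compaction is needed and produces an L0 task.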
func TestCompactionBasic(t *testing.T) {
	// Create temporary directories.
	tmpDir := t.TempDir()
	sstDir := filepath.Join(tmpDir, "sst")
	manifestDir := tmpDir
	err := os.MkdirAll(sstDir, 0755)
	if err != nil {
		t.Fatal(err)
	}
	// Create the Schema.
	schema, err := NewSchema("test", []Field{
		{Name: "value", Type: Int64},
	})
	if err != nil {
		t.Fatal(err)
	}
	// Create the VersionSet.
	versionSet, err := NewVersionSet(manifestDir)
	if err != nil {
		t.Fatal(err)
	}
	defer versionSet.Close()
	// Create the SST manager.
	sstMgr, err := NewSSTableManager(sstDir)
	if err != nil {
		t.Fatal(err)
	}
	defer sstMgr.Close()
	// Set the Schema.
	sstMgr.SetSchema(schema)
	// Create test data.
	rows1 := make([]*SSTableRow, 100)
	for i := range 100 {
		rows1[i] = &SSTableRow{
			Seq:  int64(i),
			Time: 1000,
			Data: map[string]any{"value": i},
		}
	}
	// Create the first SST file.
	reader1, err := sstMgr.CreateSST(1, rows1)
	if err != nil {
		t.Fatal(err)
	}
	// Register it in the current Version.
	edit1 := NewVersionEdit()
	edit1.AddFile(&FileMetadata{
		FileNumber: 1,
		Level:      0,
		FileSize:   1024,
		MinKey:     0,
		MaxKey:     99,
		RowCount:   100,
	})
	nextFileNum := int64(2)
	edit1.SetNextFileNumber(nextFileNum)
	err = versionSet.LogAndApply(edit1)
	if err != nil {
		t.Fatal(err)
	}
	// Verify the Version.
	version := versionSet.GetCurrent()
	if version.GetLevelFileCount(0) != 1 {
		t.Errorf("Expected 1 file in L0, got %d", version.GetLevelFileCount(0))
	}
	// Create the compaction manager.
	compactionMgr := NewCompactionManager(sstDir, versionSet, sstMgr)
	compactionMgr.SetSchema(schema)
	// Create more files to trigger compaction.
	for i := 1; i < 5; i++ {
		rows := make([]*SSTableRow, 50)
		for j := range 50 {
			rows[j] = &SSTableRow{
				Seq:  int64(i*100 + j),
				Time: int64(1000 + i),
				Data: map[string]any{"value": i*100 + j},
			}
		}
		_, err := sstMgr.CreateSST(int64(i+1), rows)
		if err != nil {
			t.Fatal(err)
		}
		edit := NewVersionEdit()
		edit.AddFile(&FileMetadata{
			FileNumber: int64(i + 1),
			Level:      0,
			FileSize:   512,
			MinKey:     int64(i * 100),
			MaxKey:     int64(i*100 + 49),
			RowCount:   50,
		})
		nextFileNum := int64(i + 2)
		edit.SetNextFileNumber(nextFileNum)
		err = versionSet.LogAndApply(edit)
		if err != nil {
			t.Fatal(err)
		}
	}
	// Verify that L0 now holds 5 files.
	version = versionSet.GetCurrent()
	if version.GetLevelFileCount(0) != 5 {
		t.Errorf("Expected 5 files in L0, got %d", version.GetLevelFileCount(0))
	}
	// Check whether compaction is needed.
	picker := compactionMgr.GetPicker()
	if !picker.ShouldCompact(version) {
		t.Error("Expected compaction to be needed")
	}
	// Fetch the compaction tasks.
	tasks := picker.PickCompaction(version)
	if len(tasks) == 0 {
		t.Fatal("Expected compaction task")
	}
	task := tasks[0] // Take the first (highest-priority) task.
	if task.Level != 0 {
		t.Errorf("Expected L0 compaction, got L%d", task.Level)
	}
	// Note: an L0 compaction task sets OutputLevel to 0 as a suggestion only;
	// the actual level is decided by determineLevel based on the merged file size.
	if task.OutputLevel != 0 {
		t.Errorf("Expected output to L0 (suggested), got L%d", task.OutputLevel)
	}
	t.Logf("Found %d compaction tasks", len(tasks))
	t.Logf("First task: L%d -> L%d, %d files (determineLevel will decide actual level)", task.Level, task.OutputLevel, len(task.InputFiles))
	// Clean up.
	reader1.Close()
}
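// TestPickerLevelScore verifies that the Picker scores a level as its total
// size divided by the size limit of the next level.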
func TestPickerLevelScore(t *testing.T) {
	// Create a temporary directory.
	tmpDir := t.TempDir()
	manifestDir := tmpDir
	// Create the VersionSet.
	versionSet, err := NewVersionSet(manifestDir)
	if err != nil {
		t.Fatal(err)
	}
	defer versionSet.Close()
	// Create the Picker.
	picker := NewPicker()
	// Add a few files to L0.
	edit := NewVersionEdit()
	for i := range 3 {
		edit.AddFile(&FileMetadata{
			FileNumber: int64(i + 1),
			Level:      0,
			FileSize:   1024 * 1024, // 1MB
			MinKey:     int64(i * 100),
			MaxKey:     int64((i+1)*100 - 1),
			RowCount:   100,
		})
	}
	nextFileNum := int64(4)
	edit.SetNextFileNumber(nextFileNum)
	err = versionSet.LogAndApply(edit)
	if err != nil {
		t.Fatal(err)
	}
	version := versionSet.GetCurrent()
	// Compute the L0 score.
	score := picker.GetLevelScore(version, 0)
	// L0 holds 3 files of 1MB each, 3MB in total.
	// The next level (L1) is limited to 256MB,
	// so the score should be 3MB / 256MB = 0.01171875.
	totalSize := int64(3 * 1024 * 1024) // 3MB
	expectedScore := float64(totalSize) / float64(level1SizeLimit)
	t.Logf("L0 score: %.4f (files: %d, total: %d bytes, next level limit: %d)",
		score, version.GetLevelFileCount(0), totalSize, level1SizeLimit)
	if score != expectedScore {
		t.Errorf("Expected L0 score %.4f, got %.4f", expectedScore, score)
	}
}
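// TestCompactionMerge verifies that compacting two L0 files with overlapping
// keys deletes both inputs and writes the merged output to L1.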
func TestCompactionMerge(t *testing.T) {
	// Create temporary directories.
	tmpDir := t.TempDir()
	sstDir := filepath.Join(tmpDir, "sst")
	manifestDir := tmpDir
	err := os.MkdirAll(sstDir, 0755)
	if err != nil {
		t.Fatal(err)
	}
	// Create the Schema.
	schema, err := NewSchema("test", []Field{
		{Name: "value", Type: String},
	})
	if err != nil {
		t.Fatal(err)
	}
	// Create the VersionSet.
	versionSet, err := NewVersionSet(manifestDir)
	if err != nil {
		t.Fatal(err)
	}
	defer versionSet.Close()
	// Create the SST manager.
	sstMgr, err := NewSSTableManager(sstDir)
	if err != nil {
		t.Fatal(err)
	}
	defer sstMgr.Close()
	// Set the Schema.
	sstMgr.SetSchema(schema)
	// Create two SST files with overlapping keys.
	rows1 := []*SSTableRow{
		{Seq: 1, Time: 1000, Data: map[string]any{"value": "old"}},
		{Seq: 2, Time: 1000, Data: map[string]any{"value": "old"}},
	}
	rows2 := []*SSTableRow{
		{Seq: 1, Time: 2000, Data: map[string]any{"value": "new"}}, // update of seq 1
		{Seq: 3, Time: 2000, Data: map[string]any{"value": "new"}},
	}
	reader1, err := sstMgr.CreateSST(1, rows1)
	if err != nil {
		t.Fatal(err)
	}
	defer reader1.Close()
	reader2, err := sstMgr.CreateSST(2, rows2)
	if err != nil {
		t.Fatal(err)
	}
	defer reader2.Close()
	// Register both files in the Version.
	edit := NewVersionEdit()
	edit.AddFile(&FileMetadata{
		FileNumber: 1,
		Level:      0,
		FileSize:   512,
		MinKey:     1,
		MaxKey:     2,
		RowCount:   2,
	})
	edit.AddFile(&FileMetadata{
		FileNumber: 2,
		Level:      0,
		FileSize:   512,
		MinKey:     1,
		MaxKey:     3,
		RowCount:   2,
	})
	nextFileNum := int64(3)
	edit.SetNextFileNumber(nextFileNum)
	err = versionSet.LogAndApply(edit)
	if err != nil {
		t.Fatal(err)
	}
	// Create the Compactor.
	compactor := NewCompactor(sstDir, versionSet)
	compactor.SetSchema(schema)
	// Build the compaction task.
	version := versionSet.GetCurrent()
	task := &CompactionTask{
		Level:       0,
		InputFiles:  version.GetLevel(0),
		OutputLevel: 1,
	}
	// Run the compaction.
	resultEdit, err := compactor.DoCompaction(task, version)
	if err != nil {
		t.Fatal(err)
	}
	// Verify the result.
	if len(resultEdit.DeletedFiles) != 2 {
		t.Errorf("Expected 2 deleted files, got %d", len(resultEdit.DeletedFiles))
	}
	if len(resultEdit.AddedFiles) == 0 {
		t.Error("Expected at least 1 new file")
	}
	t.Logf("Compaction result: deleted %d files, added %d files", len(resultEdit.DeletedFiles), len(resultEdit.AddedFiles))
	// Verify the new files landed in L1.
	for _, file := range resultEdit.AddedFiles {
		if file.Level != 1 {
			t.Errorf("Expected new file in L1, got L%d", file.Level)
		}
		t.Logf("New file: %d, L%d, rows: %d, key range: [%d, %d]",
			file.FileNumber, file.Level, file.RowCount, file.MinKey, file.MaxKey)
	}
}
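// BenchmarkCompaction measures DoCompaction over five L0 files of 1000 rows each.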
func BenchmarkCompaction(b *testing.B) {
	// Create temporary directories.
	tmpDir := b.TempDir()
	sstDir := filepath.Join(tmpDir, "sst")
	manifestDir := tmpDir
	err := os.MkdirAll(sstDir, 0755)
	if err != nil {
		b.Fatal(err)
	}
	// Create the Schema.
	schema, err := NewSchema("test", []Field{
		{Name: "value", Type: String},
	})
	if err != nil {
		b.Fatal(err)
	}
	// Create the VersionSet.
	versionSet, err := NewVersionSet(manifestDir)
	if err != nil {
		b.Fatal(err)
	}
	defer versionSet.Close()
	// Create the SST manager.
	sstMgr, err := NewSSTableManager(sstDir)
	if err != nil {
		b.Fatal(err)
	}
	defer sstMgr.Close()
	// Set the Schema.
	sstMgr.SetSchema(schema)
	// Create test data.
	const numFiles = 5
	const rowsPerFile = 1000
	for i := range numFiles {
		rows := make([]*SSTableRow, rowsPerFile)
		for j := range rowsPerFile {
			rows[j] = &SSTableRow{
				Seq:  int64(i*rowsPerFile + j),
				Time: int64(1000 + i),
				Data: map[string]any{
					"value": fmt.Sprintf("data-%d-%d", i, j),
				},
			}
		}
		reader, err := sstMgr.CreateSST(int64(i+1), rows)
		if err != nil {
			b.Fatal(err)
		}
		reader.Close()
		edit := NewVersionEdit()
		edit.AddFile(&FileMetadata{
			FileNumber: int64(i + 1),
			Level:      0,
			FileSize:   10240,
			MinKey:     int64(i * rowsPerFile),
			MaxKey:     int64((i+1)*rowsPerFile - 1),
			RowCount:   rowsPerFile,
		})
		nextFileNum := int64(i + 2)
		edit.SetNextFileNumber(nextFileNum)
		err = versionSet.LogAndApply(edit)
		if err != nil {
			b.Fatal(err)
		}
	}
	// Create the Compactor.
	compactor := NewCompactor(sstDir, versionSet)
	compactor.SetSchema(schema)
	version := versionSet.GetCurrent()
	task := &CompactionTask{
		Level:       0,
		InputFiles:  version.GetLevel(0),
		OutputLevel: 1,
	}
	for b.Loop() {
		_, err := compactor.DoCompaction(task, version)
		if err != nil {
			b.Fatal(err)
		}
	}
}
// TestCompactionQueryOrder verifies that query results stay ordered after compaction.
func TestCompactionQueryOrder(t *testing.T) {
	// Create a temporary directory.
	tmpDir := t.TempDir()
	// Create a Schema with several fields to inflate row size.
	schema, err := NewSchema("test", []Field{
		{Name: "id", Type: Int64},
		{Name: "name", Type: String},
		{Name: "data", Type: String},
		{Name: "timestamp", Type: Int64},
	})
	if err != nil {
		t.Fatal(err)
	}
	// Open the Table (a small MemTable forces frequent flushes).
	table, err := OpenTable(&TableOptions{
		Dir:          tmpDir,
		MemTableSize: 2 * 1024 * 1024, // 2MB MemTable
		Name:         schema.Name,
		Fields:       schema.Fields,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer table.Close()
	t.Logf("Inserting 4000 rows...")
	// Insert 4000 rows whose payload sizes range from 2KB up to ~1MB.
	for i := range 4000 {
		// Build a deterministic payload between 2KB and ~1MB.
		dataSize := 2*1024 + (i % (1024*1024 - 2*1024)) // 2KB ~ 1MB
		largeData := make([]byte, dataSize)
		for j := range largeData {
			largeData[j] = byte('A' + (j % 26))
		}
		err := table.Insert(map[string]any{
			"id":        int64(i),
			"name":      fmt.Sprintf("user_%d", i),
			"data":      string(largeData),
			"timestamp": int64(1000000 + i),
		})
		if err != nil {
			t.Fatal(err)
		}
		if (i+1)%500 == 0 {
			t.Logf("Inserted %d rows", i+1)
		}
	}
	t.Logf("Insert finished, waiting for background compaction...")
	// Give the background compaction a chance to run.
	// It checks every 10 seconds, so wait at least 12 seconds.
	time.Sleep(12 * time.Second)
	t.Logf("Querying all rows...")
	// Query all rows.
	rows, err := table.Query().Rows()
	if err != nil {
		t.Fatal(err)
	}
	defer rows.Close()
	// Verify ordering and data integrity.
	var lastSeq int64 = 0
	count := 0
	expectedIDs := make(map[int64]bool) // Used to verify that every ID is present.
	for rows.Next() {
		row := rows.Row()
		data := row.Data()
		currentSeq := data["_seq"].(int64)
		// Verify ordering.
		if currentSeq <= lastSeq {
			t.Errorf("Query results NOT in order: got seq %d after seq %d", currentSeq, lastSeq)
		}
		// Verify data integrity.
		id, ok := data["id"].(int64)
		if !ok {
			// Fall back to other numeric types.
			if idFloat, ok2 := data["id"].(float64); ok2 {
				id = int64(idFloat)
				expectedIDs[id] = true
			} else {
				t.Errorf("Seq %d: missing or invalid id field, actual type: %T, value: %v",
					currentSeq, data["id"], data["id"])
			}
		} else {
			expectedIDs[id] = true
		}
		// Verify the name field.
		name, ok := data["name"].(string)
		if !ok || name != fmt.Sprintf("user_%d", id) {
			t.Errorf("Seq %d: invalid name field, expected 'user_%d', got '%v'", currentSeq, id, name)
		}
		// Verify the data field exists and is non-empty.
		dataStr, ok := data["data"].(string)
		if !ok || len(dataStr) < 2*1024 {
			t.Errorf("Seq %d: invalid data field size", currentSeq)
		}
		lastSeq = currentSeq
		count++
	}
	if count != 4000 {
		t.Errorf("Expected 4000 rows, got %d", count)
	}
	// Verify that every ID is present.
	for i := range int64(4000) {
		if !expectedIDs[i] {
			t.Errorf("Missing ID: %d", i)
		}
	}
	t.Logf("✓ Query returned %d rows in correct order (seq 1→%d)", count, lastSeq)
	t.Logf("✓ All data integrity checks passed")
	// Print compaction statistics.
	stats := table.GetCompactionManager().GetLevelStats()
	t.Logf("Compaction stats:")
	for _, levelStat := range stats {
		level := levelStat.Level
		fileCount := levelStat.FileCount
		totalSize := levelStat.TotalSize
		if fileCount > 0 {
			t.Logf(" L%d: %d files, %.2f MB", level, fileCount, float64(totalSize)/(1024*1024))
		}
	}
}
// TestPickerStageRotation verifies the Picker's stage-rotation mechanism.
func TestPickerStageRotation(t *testing.T) {
	// Create a temporary directory.
	tmpDir := t.TempDir()
	manifestDir := tmpDir
	// Create the VersionSet.
	versionSet, err := NewVersionSet(manifestDir)
	if err != nil {
		t.Fatal(err)
	}
	defer versionSet.Close()
	// Create the Picker.
	picker := NewPicker()
	// The initial stage should be L0.
	if stage := picker.GetCurrentStage(); stage != 0 {
		t.Errorf("Initial stage should be 0 (L0), got %d", stage)
	}
	// Add L0 files (to trigger L0 compaction).
	edit := NewVersionEdit()
	for i := 0; i < 10; i++ {
		edit.AddFile(&FileMetadata{
			FileNumber: int64(i + 1),
			Level:      0,
			FileSize:   10 * 1024 * 1024, // 10MB each
			MinKey:     int64(i * 100),
			MaxKey:     int64((i+1)*100 - 1),
			RowCount:   100,
		})
	}
	edit.SetNextFileNumber(11)
	err = versionSet.LogAndApply(edit)
	if err != nil {
		t.Fatal(err)
	}
	version := versionSet.GetCurrent()
	// The 1st call should return L0 tasks and then advance to L1.
	t.Log("=== 1st call to PickCompaction ===")
	tasks1 := picker.PickCompaction(version)
	if len(tasks1) == 0 {
		t.Error("Expected L0 tasks on first call")
	}
	for _, task := range tasks1 {
		if task.Level != 0 {
			t.Errorf("Expected L0 task, got L%d", task.Level)
		}
	}
	if stage := picker.GetCurrentStage(); stage != 1 {
		t.Errorf("After L0 tasks, stage should be 1 (L1), got %d", stage)
	}
	t.Logf("✓ Returned %d L0 tasks, stage advanced to L1", len(tasks1))
	// The 2nd call should try Stage 1 (L0-upgrade; there are no large files).
	t.Log("=== 2nd call to PickCompaction ===")
	tasks2 := picker.PickCompaction(version)
	if len(tasks2) == 0 {
		t.Log("✓ Stage 1 (L0-upgrade) has no tasks")
	}
	// By now the stage should have cycled (tried Stage 1→2→3→0...).
	if stage := picker.GetCurrentStage(); stage >= 0 {
		t.Logf("After trying, current stage is %d", stage)
	}
	// Now add L1 files.
	edit2 := NewVersionEdit()
	for i := 0; i < 20; i++ {
		edit2.AddFile(&FileMetadata{
			FileNumber: int64(100 + i + 1),
			Level:      1,
			FileSize:   20 * 1024 * 1024, // 20MB each
			MinKey:     int64(i * 200),
			MaxKey:     int64((i+1)*200 - 1),
			RowCount:   200,
		})
	}
	edit2.SetNextFileNumber(121)
	err = versionSet.LogAndApply(edit2)
	if err != nil {
		t.Fatal(err)
	}
	version2 := versionSet.GetCurrent()
	// Reaching Stage 2 (L1-upgrade) may take several calls, because
	// Stage 1 (L0-upgrade) and Stage 0 (L0-merge) come first.
	t.Log("=== Calling PickCompaction repeatedly until L1 tasks appear ===")
	var tasks3 []*CompactionTask
	for i := 0; i < 8; i++ { // At most two full rounds (4 stages × 2).
		tasks3 = picker.PickCompaction(version2)
		if len(tasks3) > 0 && tasks3[0].Level == 1 {
			t.Logf("✓ Found %d L1 tasks after %d attempts", len(tasks3), i+1)
			break
		}
	}
	if len(tasks3) == 0 || tasks3[0].Level != 1 {
		t.Error("Expected to find L1 tasks within 8 attempts")
	}
	t.Log("=== Stage rotation test passed ===")
}
// TestPickerStageWithMultipleLevels verifies stage rotation when several levels have pending tasks at once.
func TestPickerStageWithMultipleLevels(t *testing.T) {
	tmpDir := t.TempDir()
	manifestDir := tmpDir
	versionSet, err := NewVersionSet(manifestDir)
	if err != nil {
		t.Fatal(err)
	}
	defer versionSet.Close()
	picker := NewPicker()
	// Add L0, L1, and L2 files in a single edit.
	edit := NewVersionEdit()
	// Small L0 files: 5 files × 10MB = 50MB (should trigger Stage 0: L0-merge).
	for i := 0; i < 5; i++ {
		edit.AddFile(&FileMetadata{
			FileNumber: int64(i + 1),
			Level:      0,
			FileSize:   10 * 1024 * 1024,
			MinKey:     int64(i * 100),
			MaxKey:     int64((i+1)*100 - 1),
			RowCount:   100,
		})
	}
	// Large L0 files: 5 files × 40MB = 200MB (should trigger Stage 1: L0-upgrade).
	for i := 0; i < 5; i++ {
		edit.AddFile(&FileMetadata{
			FileNumber: int64(10 + i + 1),
			Level:      0,
			FileSize:   40 * 1024 * 1024,
			MinKey:     int64((i + 5) * 100),
			MaxKey:     int64((i+6)*100 - 1),
			RowCount:   100,
		})
	}
	// L1: 20 files × 20MB = 400MB (should trigger Stage 2: L1-upgrade, 256MB threshold).
	for i := 0; i < 20; i++ {
		edit.AddFile(&FileMetadata{
			FileNumber: int64(100 + i + 1),
			Level:      1,
			FileSize:   20 * 1024 * 1024,
			MinKey:     int64(i * 200),
			MaxKey:     int64((i+1)*200 - 1),
			RowCount:   200,
		})
	}
	// L2: 10 files × 150MB = 1500MB (should trigger Stage 3: L2-upgrade, 1GB threshold).
	for i := 0; i < 10; i++ {
		edit.AddFile(&FileMetadata{
			FileNumber: int64(200 + i + 1),
			Level:      2,
			FileSize:   150 * 1024 * 1024,
			MinKey:     int64(i * 300),
			MaxKey:     int64((i+1)*300 - 1),
			RowCount:   300,
		})
	}
	edit.SetNextFileNumber(301)
	err = versionSet.LogAndApply(edit)
	if err != nil {
		t.Fatal(err)
	}
	version := versionSet.GetCurrent()
	// Verify the stages run in order: Stage 0→1→2→3→0→1→2→3.
	expectedStages := []struct {
		stage int
		name  string
		level int
	}{
		{0, "L0-merge", 0},
		{1, "L0-upgrade", 0},
		{2, "L1-upgrade", 1},
		{3, "L2-upgrade", 2},
		{0, "L0-merge", 0},
		{1, "L0-upgrade", 0},
		{2, "L1-upgrade", 1},
		{3, "L2-upgrade", 2},
	}
	for i, expected := range expectedStages {
		t.Logf("=== Call %d to PickCompaction (expecting Stage %d: %s) ===", i+1, expected.stage, expected.name)
		tasks := picker.PickCompaction(version)
		if len(tasks) == 0 {
			t.Errorf("Call %d: Expected tasks from Stage %d (%s), got no tasks", i+1, expected.stage, expected.name)
			continue
		}
		actualLevel := tasks[0].Level
		if actualLevel != expected.level {
			t.Errorf("Call %d: Expected L%d tasks, got L%d tasks", i+1, expected.level, actualLevel)
		} else {
			t.Logf("✓ Call %d: Got %d tasks from L%d (Stage %d: %s) as expected",
				i+1, len(tasks), actualLevel, expected.stage, expected.name)
		}
	}
	t.Log("=== Multi-level stage rotation test passed ===")
}
// TestPickL0MergeContinuity verifies the continuity of L0 merge tasks.
func TestPickL0MergeContinuity(t *testing.T) {
	tmpDir := t.TempDir()
	manifestDir := tmpDir
	versionSet, err := NewVersionSet(manifestDir)
	if err != nil {
		t.Fatal(err)
	}
	defer versionSet.Close()
	picker := NewPicker()
	// Create files of mixed sizes: small-large-small-small.
	// This is the scenario that used to trigger the bug.
	edit := NewVersionEdit()
	// File 1: 29MB (small).
	edit.AddFile(&FileMetadata{
		FileNumber: 1,
		Level:      0,
		FileSize:   29 * 1024 * 1024,
		MinKey:     1,
		MaxKey:     100,
		RowCount:   100,
	})
	// File 2: 36MB (large).
	edit.AddFile(&FileMetadata{
		FileNumber: 2,
		Level:      0,
		FileSize:   36 * 1024 * 1024,
		MinKey:     101,
		MaxKey:     200,
		RowCount:   100,
	})
	// File 3: 8MB (small).
	edit.AddFile(&FileMetadata{
		FileNumber: 3,
		Level:      0,
		FileSize:   8 * 1024 * 1024,
		MinKey:     201,
		MaxKey:     300,
		RowCount:   100,
	})
	// File 4: 15MB (small).
	edit.AddFile(&FileMetadata{
		FileNumber: 4,
		Level:      0,
		FileSize:   15 * 1024 * 1024,
		MinKey:     301,
		MaxKey:     400,
		RowCount:   100,
	})
	edit.SetNextFileNumber(5)
	err = versionSet.LogAndApply(edit)
	if err != nil {
		t.Fatal(err)
	}
	version := versionSet.GetCurrent()
	// Test Stage 0: L0 merge tasks.
	t.Log("=== Testing Stage 0: L0 merge ===")
	tasks := picker.pickL0MergeTasks(version)
	if len(tasks) == 0 {
		t.Fatal("Expected L0 merge tasks")
	}
	t.Logf("Found %d merge tasks", len(tasks))
	// Expect exactly one task containing files 3 and 4:
	// file 1 is a lone small file and is not merged (merging requires len > 1),
	// file 2 is large and is skipped,
	// files 3+4 are two consecutive small files and should be merged.
	if len(tasks) != 1 {
		t.Errorf("Expected 1 task, got %d", len(tasks))
		for i, task := range tasks {
			t.Logf("Task %d: %d files", i+1, len(task.InputFiles))
			for _, f := range task.InputFiles {
				t.Logf(" - File %d", f.FileNumber)
			}
		}
	}
	task1 := tasks[0]
	if len(task1.InputFiles) != 2 {
		t.Errorf("Task 1: expected 2 files, got %d", len(task1.InputFiles))
	}
	if task1.InputFiles[0].FileNumber != 3 || task1.InputFiles[1].FileNumber != 4 {
		t.Errorf("Task 1: expected files 3,4, got %d,%d",
			task1.InputFiles[0].FileNumber, task1.InputFiles[1].FileNumber)
	}
	t.Logf("✓ Merge task: file 3 + file 4 (two consecutive small files)")
	t.Logf("✓ File 1 (lone small file) is not merged; it is left for the upgrade stage")
	// Verify seq-range continuity.
	// Task 1: seq 201-400 (files 3+4).
	// File 1 (seq 1-100, lone small file) is left for the upgrade stage.
	// File 2 (seq 101-200, large file) is left for Stage 1.
	if task1.InputFiles[0].MinKey != 201 || task1.InputFiles[1].MaxKey != 400 {
		t.Errorf("Task 1 seq range incorrect: [%d, %d]",
			task1.InputFiles[0].MinKey, task1.InputFiles[1].MaxKey)
	}
	t.Logf("✓ Seq range correct: task 1 [201-400]")
	// Test Stage 1: L0 upgrade tasks.
	t.Log("=== Testing Stage 1: L0 upgrade ===")
	upgradeTasks := picker.pickL0UpgradeTasks(version)
	if len(upgradeTasks) == 0 {
		t.Fatal("Expected L0 upgrade tasks")
	}
	// Expect one task centered on file 2 (the large file) plus the surrounding small files:
	// from file 2, collect file 1 to the left, then files 3 and 4 to the right.
	// Total: file 1 (29MB) + file 2 (36MB) + file 3 (8MB) + file 4 (15MB) = 88MB.
	if len(upgradeTasks) != 1 {
		t.Errorf("Expected 1 upgrade task, got %d", len(upgradeTasks))
	}
	upgradeTask := upgradeTasks[0]
	// The task should contain all 4 files.
	if len(upgradeTask.InputFiles) != 4 {
		t.Errorf("Upgrade task: expected 4 files, got %d", len(upgradeTask.InputFiles))
		for i, f := range upgradeTask.InputFiles {
			t.Logf(" File %d: %d", i+1, f.FileNumber)
		}
	}
	// Verify the file order: 1, 2, 3, 4.
	expectedFiles := []int64{1, 2, 3, 4}
	for i, expected := range expectedFiles {
		if upgradeTask.InputFiles[i].FileNumber != expected {
			t.Errorf("Upgrade task file %d: expected %d, got %d",
				i, expected, upgradeTask.InputFiles[i].FileNumber)
		}
	}
	if upgradeTask.OutputLevel != 1 {
		t.Errorf("Upgrade task: expected OutputLevel 1, got %d", upgradeTask.OutputLevel)
	}
	t.Logf("✓ Upgrade task: files 1+2+3+4 (centered on the large file, with surrounding small files) → L1")
	t.Log("=== Continuity test passed ===")
}
// TestPickL0UpgradeContinuity verifies the continuity of L0 upgrade tasks.
func TestPickL0UpgradeContinuity(t *testing.T) {
	tmpDir := t.TempDir()
	manifestDir := tmpDir
	versionSet, err := NewVersionSet(manifestDir)
	if err != nil {
		t.Fatal(err)
	}
	defer versionSet.Close()
	picker := NewPicker()
	// Create files of mixed sizes: large-small-large-large.
	edit := NewVersionEdit()
	// File 1: 40MB (large).
	edit.AddFile(&FileMetadata{
		FileNumber: 1,
		Level:      0,
		FileSize:   40 * 1024 * 1024,
		MinKey:     1,
		MaxKey:     100,
		RowCount:   100,
	})
	// File 2: 20MB (small).
	edit.AddFile(&FileMetadata{
		FileNumber: 2,
		Level:      0,
		FileSize:   20 * 1024 * 1024,
		MinKey:     101,
		MaxKey:     200,
		RowCount:   100,
	})
	// File 3: 50MB (large).
	edit.AddFile(&FileMetadata{
		FileNumber: 3,
		Level:      0,
		FileSize:   50 * 1024 * 1024,
		MinKey:     201,
		MaxKey:     300,
		RowCount:   100,
	})
	// File 4: 45MB (large).
	edit.AddFile(&FileMetadata{
		FileNumber: 4,
		Level:      0,
		FileSize:   45 * 1024 * 1024,
		MinKey:     301,
		MaxKey:     400,
		RowCount:   100,
	})
	edit.SetNextFileNumber(5)
	err = versionSet.LogAndApply(edit)
	if err != nil {
		t.Fatal(err)
	}
	version := versionSet.GetCurrent()
	// Test the L0 upgrade tasks.
	t.Log("=== Testing L0 upgrade task continuity ===")
	tasks := picker.pickL0UpgradeTasks(version)
	if len(tasks) == 0 {
		t.Fatal("Expected L0 upgrade tasks")
	}
	t.Logf("Found %d upgrade tasks", len(tasks))
	// Task 1 should contain all 4 files (a large file acts as the anchor and
	// collects its neighbors):
	// file 1 (large anchor) → nothing to the left → collects files 2+3+4 to the right.
	// Total size: 40+20+50+45 = 155MB < 256MB, within the L1 limit.
	task1 := tasks[0]
	expectedFileCount := 4
	if len(task1.InputFiles) != expectedFileCount {
		t.Errorf("Task 1: expected %d files, got %d", expectedFileCount, len(task1.InputFiles))
		for i, f := range task1.InputFiles {
			t.Logf(" File %d: %d", i+1, f.FileNumber)
		}
	}
	// Verify the file order: 1, 2, 3, 4.
	expectedFiles := []int64{1, 2, 3, 4}
	for i, expected := range expectedFiles {
		if i >= len(task1.InputFiles) {
			break
		}
		if task1.InputFiles[i].FileNumber != expected {
			t.Errorf("Task 1 file %d: expected %d, got %d",
				i, expected, task1.InputFiles[i].FileNumber)
		}
	}
	t.Logf("✓ Task 1: files 1+2+3+4 (anchored on a large file with its neighbors, 155MB total < 256MB)")
	// There should be exactly one task, since all files were collected.
	if len(tasks) != 1 {
		t.Errorf("Expected 1 task (all files collected), got %d", len(tasks))
		for i, task := range tasks {
			t.Logf("Task %d: %d files", i+1, len(task.InputFiles))
			for _, f := range task.InputFiles {
				t.Logf(" - File %d", f.FileNumber)
			}
		}
	}
	// Verify that every task's OutputLevel is 1.
	for i, task := range tasks {
		if task.OutputLevel != 1 {
			t.Errorf("Task %d: expected OutputLevel 1, got %d", i+1, task.OutputLevel)
		}
	}
	t.Logf("✓ All tasks upgrade to L1")
	t.Log("=== Upgrade task continuity test passed ===")
}