// srdb/table_test.go

package srdb
import (
"crypto/rand"
"fmt"
"os"
"slices"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
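// TestTable exercises the basic lifecycle: open a table, insert 100 rows, read them back by seq, and check the stats.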
func TestTable(t *testing.T) {
// 1. Create the table
dir := "test_db"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
schema := NewSchema("test", []Field{
{Name: "name", Type: FieldTypeString, Indexed: false, Comment: "用户名"},
{Name: "age", Type: FieldTypeInt64, Indexed: false, Comment: "年龄"},
})
table, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 1024, // 1 KB, small enough to trigger flushes quickly
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
t.Fatal(err)
}
defer table.Close()
// 2. Insert data
for i := 1; i <= 100; i++ {
data := map[string]any{
"name": fmt.Sprintf("user_%d", i),
"age": 20 + i%50,
}
err := table.Insert(data)
if err != nil {
t.Fatal(err)
}
}
// Wait for flush and compaction to finish
time.Sleep(1 * time.Second)
t.Logf("Inserted 100 rows")
// 3. Read the data back
for i := int64(1); i <= 100; i++ {
row, err := table.Get(i)
if err != nil {
t.Errorf("Failed to get key %d: %v", i, err)
continue
}
if row.Seq != i {
t.Errorf("Key %d: expected Seq=%d, got %d", i, i, row.Seq)
}
}
// 4. Check the stats
stats := table.Stats()
t.Logf("Stats: MemTable=%d rows, SST=%d files, Total=%d rows",
stats.MemTableCount, stats.SSTCount, stats.TotalRows)
if stats.TotalRows != 100 {
t.Errorf("Expected 100 total rows, got %d", stats.TotalRows)
}
t.Log("All tests passed!")
}
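// TestTableRecover verifies that rows held only in the WAL/MemTable survive a close and reopen.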
func TestTableRecover(t *testing.T) {
dir := "test_recover"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
schema := NewSchema("test", []Field{
{Name: "value", Type: FieldTypeInt64, Indexed: false, Comment: "值"},
})
// 1. Create the table and insert data
table, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 10 * 1024 * 1024, // 10 MB, large enough that no flush is triggered
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
t.Fatal(err)
}
for i := 1; i <= 50; i++ {
data := map[string]any{
"value": i,
}
table.Insert(data)
}
t.Log("Inserted 50 rows")
// 2. Close the table (as before a simulated crash)
table.Close()
// 3. Reopen the table (recovery)
table2, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 10 * 1024 * 1024,
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
t.Fatal(err)
}
defer table2.Close()
// 4. Verify the data
for i := int64(1); i <= 50; i++ {
row, err := table2.Get(i)
if err != nil {
t.Errorf("Failed to get key %d after recover: %v", i, err)
continue
}
if row.Seq != i {
t.Errorf("Key %d: expected Seq=%d, got %d", i, i, row.Seq)
}
}
stats := table2.Stats()
if stats.TotalRows != 50 {
t.Errorf("Expected 50 rows after recover, got %d", stats.TotalRows)
}
t.Log("Recover test passed!")
}
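// TestTableFlush verifies that inserts beyond the MemTable capacity produce SST files and remain readable.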
func TestTableFlush(t *testing.T) {
dir := "test_flush"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
schema := NewSchema("test", []Field{
{Name: "data", Type: FieldTypeString, Indexed: false, Comment: "数据"},
})
table, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 1024, // 1 KB
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Insert enough data to trigger a flush
for i := 1; i <= 200; i++ {
data := map[string]any{
"data": fmt.Sprintf("value_%d", i),
}
table.Insert(data)
}
// Wait for the flush
time.Sleep(500 * time.Millisecond)
stats := table.Stats()
t.Logf("After flush: MemTable=%d, SST=%d, Total=%d",
stats.MemTableCount, stats.SSTCount, stats.TotalRows)
if stats.SSTCount == 0 {
t.Error("Expected at least 1 SST file after flush")
}
// 验证所有数据都能查到
for i := int64(1); i <= 200; i++ {
_, err := table.Get(i)
if err != nil {
t.Errorf("Failed to get key %d after flush: %v", i, err)
}
}
t.Log("Flush test passed!")
}
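// BenchmarkTableInsert measures single-goroutine insert throughput with a MemTable large enough to avoid flushes.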
func BenchmarkTableInsert(b *testing.B) {
dir := "bench_insert"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
schema := NewSchema("test", []Field{
{Name: "value", Type: FieldTypeInt64, Indexed: false, Comment: "值"},
})
table, _ := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 100 * 1024 * 1024, // 100 MB
Name: schema.Name,
Fields: schema.Fields,
})
defer table.Close()
data := map[string]any{
"value": 123,
}
for b.Loop() {
table.Insert(data)
}
}
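// BenchmarkTableGet measures point-read throughput against 10,000 preloaded rows.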
func BenchmarkTableGet(b *testing.B) {
dir := "bench_get"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
schema := NewSchema("test", []Field{
{Name: "value", Type: FieldTypeInt64, Indexed: false, Comment: "值"},
})
table, _ := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 100 * 1024 * 1024,
Name: schema.Name,
Fields: schema.Fields,
})
defer table.Close()
// Preload rows to read back
for i := 1; i <= 10000; i++ {
data := map[string]any{
"value": i,
}
table.Insert(data)
}
for i := 0; b.Loop(); i++ {
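// Cycle through the preloaded keys round-robin.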
key := int64(i%10000 + 1)
table.Get(key)
}
}
// TestHighConcurrencyWrite exercises many concurrent writers with payloads sized between 2 KB and 5 MB.
func TestHighConcurrencyWrite(t *testing.T) {
tmpDir := t.TempDir()
// Note: This test uses []byte payload - we create a minimal schema
// Schema validation accepts []byte as it gets JSON-marshaled
schema := NewSchema("test", []Field{
{Name: "worker_id", Type: FieldTypeInt64, Indexed: false, Comment: "Worker ID"},
})
opts := &TableOptions{
Dir: tmpDir,
MemTableSize: 64 * 1024 * 1024, // 64MB
Name: schema.Name,
Fields: schema.Fields,
}
table, err := OpenTable(opts)
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Test configuration
const (
numGoroutines = 50 // concurrent writers
rowsPerWorker = 100 // rows written by each worker
minDataSize = 2 * 1024 // 2KB
maxDataSize = 5 * 1024 * 1024 // 5MB
)
var (
totalInserted atomic.Int64
totalErrors atomic.Int64
wg sync.WaitGroup
)
startTime := time.Now()
// Start the concurrent writer goroutines
for i := range numGoroutines {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for j := range rowsPerWorker {
// Derive a payload size in [minDataSize, maxDataSize) from the row index (deterministic per row)
dataSize := minDataSize + (j % (maxDataSize - minDataSize))
largeData := make([]byte, dataSize)
rand.Read(largeData)
data := map[string]any{
"worker_id": workerID,
"row_index": j,
"data_size": dataSize,
"payload": largeData,
"timestamp": time.Now().Unix(),
}
err := table.Insert(data)
if err != nil {
totalErrors.Add(1)
t.Logf("Worker %d, Row %d: Insert failed: %v", workerID, j, err)
} else {
totalInserted.Add(1)
}
// Report progress every 10 rows
if j > 0 && j%10 == 0 {
t.Logf("Worker %d: inserted %d rows", workerID, j)
}
}
}(i)
}
// Wait for all writers to finish
wg.Wait()
duration := time.Since(startTime)
// Tally the results
inserted := totalInserted.Load()
errors := totalErrors.Load()
expected := int64(numGoroutines * rowsPerWorker)
t.Logf("\n=== 高并发写入测试结果 ===")
t.Logf("并发数: %d", numGoroutines)
t.Logf("预期插入: %d 行", expected)
t.Logf("成功插入: %d 行", inserted)
t.Logf("失败: %d 行", errors)
t.Logf("耗时: %v", duration)
t.Logf("吞吐量: %.2f 行/秒", float64(inserted)/duration.Seconds())
// Verify
if errors > 0 {
t.Errorf("%d inserts failed", errors)
}
if inserted != expected {
t.Errorf("expected %d rows inserted, got %d", expected, inserted)
}
// Wait for flushes to complete
time.Sleep(2 * time.Second)
// Verify data integrity
stats := table.Stats()
t.Logf("\nTable state:")
t.Logf(" Total rows: %d", stats.TotalRows)
t.Logf(" SST files: %d", stats.SSTCount)
t.Logf(" MemTable rows: %d", stats.MemTableCount)
if stats.TotalRows < inserted {
t.Errorf("data loss: expected at least %d rows, got %d", inserted, stats.TotalRows)
}
}
// TestConcurrentReadWrite runs mixed concurrent readers and writers for a fixed duration.
func TestConcurrentReadWrite(t *testing.T) {
tmpDir := t.TempDir()
// Note: This test uses []byte data - we create a minimal schema
schema := NewSchema("test", []Field{
{Name: "writer_id", Type: FieldTypeInt64, Indexed: false, Comment: "Writer ID"},
})
opts := &TableOptions{
Dir: tmpDir,
MemTableSize: 32 * 1024 * 1024, // 32MB
Name: schema.Name,
Fields: schema.Fields,
}
table, err := OpenTable(opts)
if err != nil {
t.Fatal(err)
}
defer table.Close()
const (
numWriters = 20
numReaders = 30
duration = 10 * time.Second
dataSize = 10 * 1024 // 10KB
)
var (
writeCount atomic.Int64
readCount atomic.Int64
readErrors atomic.Int64
wg sync.WaitGroup
stopCh = make(chan struct{})
)
// Start the writer goroutines
for i := range numWriters {
wg.Add(1)
go func(writerID int) {
defer wg.Done()
for {
select {
case <-stopCh:
return
default:
data := make([]byte, dataSize)
rand.Read(data)
payload := map[string]any{
"writer_id": writerID,
"data": data,
"timestamp": time.Now().UnixNano(),
}
err := table.Insert(payload)
if err == nil {
writeCount.Add(1)
}
time.Sleep(10 * time.Millisecond)
}
}
}(i)
}
// Start the reader goroutines
for i := range numReaders {
wg.Add(1)
go func(readerID int) {
defer wg.Done()
for {
select {
case <-stopCh:
return
default:
// Each reader repeatedly fetches a fixed seq derived from its ID
seq := int64(readerID*100 + 1)
_, err := table.Get(seq)
if err == nil {
readCount.Add(1)
} else {
readErrors.Add(1)
}
time.Sleep(5 * time.Millisecond)
}
}
}(i)
}
// Run for the configured duration
time.Sleep(duration)
close(stopCh)
wg.Wait()
// Tally the results
writes := writeCount.Load()
reads := readCount.Load()
errors := readErrors.Load()
t.Logf("\n=== 并发读写测试结果 ===")
t.Logf("测试时长: %v", duration)
t.Logf("写入次数: %d (%.2f 次/秒)", writes, float64(writes)/duration.Seconds())
t.Logf("读取次数: %d (%.2f 次/秒)", reads, float64(reads)/duration.Seconds())
t.Logf("读取失败: %d", errors)
stats := table.Stats()
t.Logf("\nTable 状态:")
t.Logf(" 总行数: %d", stats.TotalRows)
t.Logf(" SST 文件数: %d", stats.SSTCount)
}
// TestPowerFailureRecovery simulates a power-loss crash and verifies recovery.
func TestPowerFailureRecovery(t *testing.T) {
tmpDir := t.TempDir()
// Phase 1: write data, then simulate a crash
t.Log("=== Phase 1: write data ===")
// Note: This test uses []byte data - we create a minimal schema
schema := NewSchema("test", []Field{
{Name: "batch", Type: FieldTypeInt64, Indexed: false, Comment: "Batch number"},
})
opts := &TableOptions{
Dir: tmpDir,
MemTableSize: 4 * 1024 * 1024, // 4MB
Name: schema.Name,
Fields: schema.Fields,
}
table, err := OpenTable(opts)
if err != nil {
t.Fatal(err)
}
const (
numBatches = 10
rowsPerBatch = 50
dataSize = 50 * 1024 // 50KB
)
insertedSeqs := make([]int64, 0, numBatches*rowsPerBatch)
for batch := range numBatches {
for i := range rowsPerBatch {
data := make([]byte, dataSize)
rand.Read(data)
payload := map[string]any{
"batch": batch,
"index": i,
"data": data,
"timestamp": time.Now().Unix(),
}
err := table.Insert(payload)
if err != nil {
t.Fatalf("Insert failed: %v", err)
}
seq := table.seq.Load()
insertedSeqs = append(insertedSeqs, seq)
}
// Force a flush every third batch
if batch%3 == 0 {
table.switchMemTable()
time.Sleep(100 * time.Millisecond)
}
t.Logf("批次 %d: 插入 %d 行", batch, rowsPerBatch)
}
totalInserted := len(insertedSeqs)
t.Logf("Total inserted: %d rows", totalInserted)
// Capture the state before the crash
statsBefore := table.Stats()
t.Logf("Pre-crash state: total rows=%d, SST files=%d, MemTable rows=%d",
statsBefore.TotalRows, statsBefore.SSTCount, statsBefore.MemTableCount)
// Simulate the crash: close immediately without waiting for flushes to finish
t.Log("\n=== Simulating power-loss crash ===")
table.Close()
// Phase 2: recover and verify the data
t.Log("\n=== Phase 2: recover data ===")
tableRecovered, err := OpenTable(opts)
if err != nil {
t.Fatalf("恢复失败: %v", err)
}
defer tableRecovered.Close()
// Give recovery time to settle
time.Sleep(500 * time.Millisecond)
statsAfter := tableRecovered.Stats()
t.Logf("Post-recovery state: total rows=%d, SST files=%d, MemTable rows=%d",
statsAfter.TotalRows, statsAfter.SSTCount, statsAfter.MemTableCount)
// Verify data integrity
t.Log("\n=== Phase 3: verify data integrity ===")
recovered := 0
missing := 0
corrupted := 0
for i, seq := range insertedSeqs {
row, err := tableRecovered.Get(seq)
if err != nil {
missing++
if i < len(insertedSeqs)/2 {
// 前半部分应该已经 Flush不应该丢失
t.Logf("警告: Seq %d 丢失(应该已持久化)", seq)
}
continue
}
// Verify the row
if row.Seq != seq {
corrupted++
t.Errorf("数据损坏: 预期 Seq=%d, 实际=%d", seq, row.Seq)
continue
}
recovered++
}
recoveryRate := float64(recovered) / float64(totalInserted) * 100
t.Logf("\n=== 恢复结果 ===")
t.Logf("插入总数: %d", totalInserted)
t.Logf("成功恢复: %d (%.2f%%)", recovered, recoveryRate)
t.Logf("丢失: %d", missing)
t.Logf("损坏: %d", corrupted)
// At minimum, everything that was flushed must be intact
if corrupted > 0 {
t.Errorf("found %d corrupted rows", corrupted)
}
// At least 50% of the data (the flushed portion) should be recoverable
if recoveryRate < 50 {
t.Errorf("recovery rate too low: %.2f%% (expected at least 50%%)", recoveryRate)
}
t.Logf("\n断电恢复测试通过")
}
// TestCrashDuringCompaction simulates a crash while a compaction is in flight.
func TestCrashDuringCompaction(t *testing.T) {
tmpDir := t.TempDir()
// Note: This test uses []byte data - we create a minimal schema
schema := NewSchema("test", []Field{
{Name: "index", Type: FieldTypeInt64, Indexed: false, Comment: "Index"},
})
opts := &TableOptions{
Dir: tmpDir,
MemTableSize: 1024, // tiny, so flushes trigger quickly
Name: schema.Name,
Fields: schema.Fields,
}
table, err := OpenTable(opts)
if err != nil {
t.Fatal(err)
}
// Insert enough data to trigger multiple flushes
t.Log("=== Inserting data to trigger compaction ===")
const numRows = 500
dataSize := 5 * 1024 // 5KB
for i := range numRows {
data := make([]byte, dataSize)
rand.Read(data)
payload := map[string]any{
"index": i,
"data": data,
}
err := table.Insert(payload)
if err != nil {
t.Fatal(err)
}
if i%50 == 0 {
t.Logf("inserted %d rows", i)
}
}
// Let some flushes complete
time.Sleep(500 * time.Millisecond)
version := table.versionSet.GetCurrent()
l0Count := version.GetLevelFileCount(0)
t.Logf("L0 文件数: %d", l0Count)
// 模拟在 Compaction 期间崩溃
if l0Count >= 4 {
t.Log("触发 Compaction...")
go func() {
table.compactionManager.TriggerCompaction()
}()
// Give the compaction time to start
time.Sleep(100 * time.Millisecond)
t.Log("=== Simulating crash during compaction ===")
}
// Close immediately (simulated crash)
table.Close()
// Recover
t.Log("\n=== Recovering the database ===")
tableRecovered, err := OpenTable(opts)
if err != nil {
t.Fatalf("恢复失败: %v", err)
}
defer tableRecovered.Close()
// Verify data integrity
stats := tableRecovered.Stats()
t.Logf("After recovery: total rows=%d, SST files=%d", stats.TotalRows, stats.SSTCount)
// Spot-check the first 100 rows
t.Log("\n=== Verifying data ===")
verified := 0
for i := 1; i <= 100; i++ {
seq := int64(i)
_, err := tableRecovered.Get(seq)
if err == nil {
verified++
}
}
t.Logf("验证前 100 行: %d 行可读", verified)
if verified < 50 {
t.Errorf("数据恢复不足: 只有 %d/100 行可读", verified)
}
t.Log("Compaction 崩溃恢复测试通过!")
}
// TestLargeDataIntegrity verifies payload integrity across sizes from 2 KB to 5 MB.
func TestLargeDataIntegrity(t *testing.T) {
tmpDir := t.TempDir()
// Note: This test uses []byte data - we create a minimal schema
schema := NewSchema("test", []Field{
{Name: "size", Type: FieldTypeInt64, Indexed: false, Comment: "Size"},
})
opts := &TableOptions{
Dir: tmpDir,
MemTableSize: 64 * 1024 * 1024, // 64MB
Name: schema.Name,
Fields: schema.Fields,
}
table, err := OpenTable(opts)
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Payload sizes under test
testSizes := []int{
2 * 1024, // 2KB
10 * 1024, // 10KB
100 * 1024, // 100KB
1 * 1024 * 1024, // 1MB
5 * 1024 * 1024, // 5MB
}
t.Log("=== 插入不同大小的数据 ===")
insertedSeqs := make([]int64, 0)
for _, size := range testSizes {
// 每种大小插入 3 行
for i := range 3 {
data := make([]byte, size)
rand.Read(data)
payload := map[string]any{
"size": size,
"index": i,
"data": data,
}
err := table.Insert(payload)
if err != nil {
t.Fatalf("插入失败 (size=%d, index=%d): %v", size, i, err)
}
seq := table.seq.Load()
insertedSeqs = append(insertedSeqs, seq)
t.Logf("插入: Seq=%d, Size=%d KB", seq, size/1024)
}
}
totalInserted := len(insertedSeqs)
t.Logf("Total inserted: %d rows", totalInserted)
// Wait for flushes
time.Sleep(2 * time.Second)
// Verify every row is readable
t.Log("\n=== Verifying readability ===")
successCount := 0
for i, seq := range insertedSeqs {
row, err := table.Get(seq)
if err != nil {
t.Errorf("读取失败 (Seq=%d): %v", seq, err)
continue
}
// Verify the expected fields exist
if _, exists := row.Data["data"]; !exists {
t.Errorf("Seq=%d: missing 'data' field", seq)
continue
}
if _, exists := row.Data["size"]; !exists {
t.Errorf("Seq=%d: size 字段不存在", seq)
continue
}
successCount++
if i < 5 || i >= totalInserted-5 {
// 只打印前5行和后5行
t.Logf("✓ Seq=%d 验证通过", seq)
}
}
successRate := float64(successCount) / float64(totalInserted) * 100
stats := table.Stats()
t.Logf("\n=== 测试结果 ===")
t.Logf("插入总数: %d", totalInserted)
t.Logf("成功读取: %d (%.2f%%)", successCount, successRate)
t.Logf("总行数: %d", stats.TotalRows)
t.Logf("SST 文件数: %d", stats.SSTCount)
if successCount != totalInserted {
t.Errorf("数据丢失: %d/%d", totalInserted-successCount, totalInserted)
}
t.Log("\n大数据完整性测试通过")
}
// BenchmarkConcurrentWrites measures parallel insert throughput with 10 KB payloads.
func BenchmarkConcurrentWrites(b *testing.B) {
tmpDir := b.TempDir()
// Note: This benchmark uses []byte data - we create a minimal schema
schema := NewSchema("test", []Field{
{Name: "timestamp", Type: FieldTypeInt64, Indexed: false, Comment: "Timestamp"},
})
opts := &TableOptions{
Dir: tmpDir,
MemTableSize: 64 * 1024 * 1024,
Name: schema.Name,
Fields: schema.Fields,
}
table, err := OpenTable(opts)
if err != nil {
b.Fatal(err)
}
defer table.Close()
const (
numWorkers = 10
dataSize = 10 * 1024 // 10KB
)
data := make([]byte, dataSize)
rand.Read(data)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
payload := map[string]any{
"data": data,
"timestamp": time.Now().UnixNano(),
}
err := table.Insert(payload)
if err != nil {
b.Error(err)
}
}
})
b.StopTimer()
stats := table.Stats()
b.Logf("总行数: %d, SST 文件数: %d", stats.TotalRows, stats.SSTCount)
}
// TestTableWithCompaction exercises flushes plus a manual compaction on a Table.
func TestTableWithCompaction(t *testing.T) {
// Create a temp directory
tmpDir := t.TempDir()
schema := NewSchema("test", []Field{
{Name: "batch", Type: FieldTypeInt64, Indexed: false, Comment: "批次"},
{Name: "index", Type: FieldTypeInt64, Indexed: false, Comment: "索引"},
{Name: "value", Type: FieldTypeString, Indexed: false, Comment: "值"},
})
// Open the Table
opts := &TableOptions{
Dir: tmpDir,
MemTableSize: 1024, // small MemTable so flushes trigger quickly
Name: schema.Name,
Fields: schema.Fields,
}
table, err := OpenTable(opts)
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Insert enough data to trigger multiple flushes
const numBatches = 10
const rowsPerBatch = 100
for batch := range numBatches {
for i := range rowsPerBatch {
data := map[string]any{
"batch": batch,
"index": i,
"value": fmt.Sprintf("data-%d-%d", batch, i),
}
err := table.Insert(data)
if err != nil {
t.Fatalf("Insert failed: %v", err)
}
}
// Force a flush
err = table.switchMemTable()
if err != nil {
t.Fatalf("Switch MemTable failed: %v", err)
}
// Wait for the flush to finish
time.Sleep(100 * time.Millisecond)
}
// Wait until every immutable MemTable has been flushed
for table.memtableManager.GetImmutableCount() > 0 {
time.Sleep(100 * time.Millisecond)
}
// Inspect the current Version
version := table.versionSet.GetCurrent()
l0Count := version.GetLevelFileCount(0)
t.Logf("L0 files: %d", l0Count)
if l0Count == 0 {
t.Error("Expected some files in L0")
}
// Per-level statistics
levelStats := table.compactionManager.GetLevelStats()
for _, stat := range levelStats {
level := stat.Level
fileCount := stat.FileCount
totalSize := stat.TotalSize
score := stat.Score
if fileCount > 0 {
t.Logf("L%d: %d files, %d bytes, score: %.2f", level, fileCount, totalSize, score)
}
}
// Manually trigger a compaction
if l0Count >= 4 {
t.Log("Triggering manual compaction...")
err = table.compactionManager.TriggerCompaction()
if err != nil {
t.Logf("Compaction: %v", err)
} else {
t.Log("Compaction completed")
// Check the state after compaction
version = table.versionSet.GetCurrent()
newL0Count := version.GetLevelFileCount(0)
l1Count := version.GetLevelFileCount(1)
t.Logf("After compaction - L0: %d files, L1: %d files", newL0Count, l1Count)
if newL0Count >= l0Count {
t.Error("Expected L0 file count to decrease after compaction")
}
if l1Count == 0 {
t.Error("Expected some files in L1 after compaction")
}
}
}
// Verify data integrity
stats := table.Stats()
t.Logf("Table stats: %d rows, %d SST files", stats.TotalRows, stats.SSTCount)
// Read back a sample of rows
for batch := range 3 {
for i := range 10 {
seq := int64(batch*rowsPerBatch + i + 1)
row, err := table.Get(seq)
if err != nil {
t.Errorf("Get(%d) failed: %v", seq, err)
continue
}
if row.Data["batch"].(int64) != int64(batch) {
t.Errorf("Expected batch %d, got %v", batch, row.Data["batch"])
}
}
}
}
// TestTableCompactionMerge verifies that compaction merges L0 files without losing rows.
func TestTableCompactionMerge(t *testing.T) {
tmpDir := t.TempDir()
schema := NewSchema("test", []Field{
{Name: "batch", Type: FieldTypeInt64, Indexed: false, Comment: "批次"},
{Name: "index", Type: FieldTypeInt64, Indexed: false, Comment: "索引"},
{Name: "value", Type: FieldTypeString, Indexed: false, Comment: "值"},
})
opts := &TableOptions{
Dir: tmpDir,
MemTableSize: 512, // very small MemTable
Name: schema.Name,
Fields: schema.Fields,
}
table, err := OpenTable(opts)
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Insert data (append-only mode)
const numBatches = 5
const rowsPerBatch = 50
totalRows := 0
for batch := range numBatches {
for i := range rowsPerBatch {
data := map[string]any{
"batch": batch,
"index": i,
"value": fmt.Sprintf("v%d-%d", batch, i),
}
err := table.Insert(data)
if err != nil {
t.Fatal(err)
}
totalRows++
}
// Flush after each batch
err = table.switchMemTable()
if err != nil {
t.Fatal(err)
}
time.Sleep(50 * time.Millisecond)
}
// Wait for all flushes to finish
for table.memtableManager.GetImmutableCount() > 0 {
time.Sleep(100 * time.Millisecond)
}
// Record the file count before compaction
version := table.versionSet.GetCurrent()
beforeL0 := version.GetLevelFileCount(0)
t.Logf("Before compaction: L0 has %d files", beforeL0)
// Trigger a compaction
if beforeL0 >= 4 {
err = table.compactionManager.TriggerCompaction()
if err != nil {
t.Logf("Compaction: %v", err)
} else {
version = table.versionSet.GetCurrent()
afterL0 := version.GetLevelFileCount(0)
afterL1 := version.GetLevelFileCount(1)
t.Logf("After compaction: L0 has %d files, L1 has %d files", afterL0, afterL1)
}
}
// Verify data integrity on the first few rows
for batch := range 2 {
for i := range 5 {
seq := int64(batch*rowsPerBatch + i + 1)
row, err := table.Get(seq)
if err != nil {
t.Errorf("Get(%d) failed: %v", seq, err)
continue
}
actualBatch := int(row.Data["batch"].(int64))
if actualBatch != batch {
t.Errorf("Seq %d: expected batch %d, got %d", seq, batch, actualBatch)
}
}
}
// Verify the total row count
stats := table.Stats()
if stats.TotalRows != int64(totalRows) {
t.Errorf("Expected %d total rows, got %d", totalRows, stats.TotalRows)
}
t.Logf("Data integrity verified: %d rows", totalRows)
}
// TestTableBackgroundCompaction waits for the background compactor to run on its own.
func TestTableBackgroundCompaction(t *testing.T) {
if testing.Short() {
t.Skip("Skipping background compaction test in short mode")
}
tmpDir := t.TempDir()
schema := NewSchema("test", []Field{
{Name: "batch", Type: FieldTypeInt64, Indexed: false, Comment: "批次"},
{Name: "index", Type: FieldTypeInt64, Indexed: false, Comment: "索引"},
})
opts := &TableOptions{
Dir: tmpDir,
MemTableSize: 512,
Name: schema.Name,
Fields: schema.Fields,
}
table, err := OpenTable(opts)
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Insert data to trigger multiple flushes
const numBatches = 8
const rowsPerBatch = 50
for batch := range numBatches {
for i := range rowsPerBatch {
data := map[string]any{
"batch": batch,
"index": i,
}
err := table.Insert(data)
if err != nil {
t.Fatal(err)
}
}
err = table.switchMemTable()
if err != nil {
t.Fatal(err)
}
time.Sleep(50 * time.Millisecond)
}
// Wait for flushes to finish
for table.memtableManager.GetImmutableCount() > 0 {
time.Sleep(100 * time.Millisecond)
}
// Record the initial state
version := table.versionSet.GetCurrent()
initialL0 := version.GetLevelFileCount(0)
t.Logf("Initial L0 files: %d", initialL0)
// Wait for a background compaction (up to 30 seconds)
maxWait := 30 * time.Second
checkInterval := 2 * time.Second
waited := time.Duration(0)
for waited < maxWait {
time.Sleep(checkInterval)
waited += checkInterval
version = table.versionSet.GetCurrent()
currentL0 := version.GetLevelFileCount(0)
currentL1 := version.GetLevelFileCount(1)
t.Logf("After %v: L0=%d, L1=%d", waited, currentL0, currentL1)
// Fewer L0 files, or any files in L1, means a compaction ran
if currentL0 < initialL0 || currentL1 > 0 {
t.Logf("Background compaction detected!")
// Compaction statistics
stats := table.compactionManager.GetStats()
t.Logf("Compaction stats: %v", stats)
return
}
}
t.Log("No background compaction detected within timeout (this is OK if L0 < 4 files)")
}
// BenchmarkTableWithCompaction measures insert throughput while flushes and compactions run in the background.
func BenchmarkTableWithCompaction(b *testing.B) {
tmpDir := b.TempDir()
schema := NewSchema("test", []Field{
{Name: "index", Type: FieldTypeInt64, Indexed: false, Comment: "索引"},
{Name: "value", Type: FieldTypeString, Indexed: false, Comment: "值"},
})
opts := &TableOptions{
Dir: tmpDir,
MemTableSize: 64 * 1024, // 64KB
Name: schema.Name,
Fields: schema.Fields,
}
table, err := OpenTable(opts)
if err != nil {
b.Fatal(err)
}
defer table.Close()
for i := 0; b.Loop(); i++ {
data := map[string]any{
"index": i,
"value": fmt.Sprintf("benchmark-data-%d", i),
}
err := table.Insert(data)
if err != nil {
b.Fatal(err)
}
}
b.StopTimer()
// Wait for all flushes to finish
for table.memtableManager.GetImmutableCount() > 0 {
time.Sleep(10 * time.Millisecond)
}
// Report final statistics
version := table.versionSet.GetCurrent()
b.Logf("Final state: L0=%d files, L1=%d files, Total=%d files",
version.GetLevelFileCount(0),
version.GetLevelFileCount(1),
version.GetFileCount())
}
// TestTableSchemaRecover verifies that schema and data survive a close/reopen cycle.
func TestTableSchemaRecover(t *testing.T) {
dir := "test_schema_recover"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
// Create the schema
s := NewSchema("users", []Field{
{Name: "name", Type: FieldTypeString, Indexed: false, Comment: "Username"},
{Name: "age", Type: FieldTypeInt64, Indexed: false, Comment: "Age"},
{Name: "email", Type: FieldTypeString, Indexed: false, Comment: "Email"},
})
// 1. Create the table with a schema and insert data
table, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 10 * 1024 * 1024, // 10 MB, so no flush is triggered
Name: s.Name, Fields: s.Fields,
})
if err != nil {
t.Fatal(err)
}
// Insert rows that conform to the schema
for i := 1; i <= 50; i++ {
data := map[string]any{
"name": fmt.Sprintf("user_%d", i),
"age": 20 + i%50,
"email": fmt.Sprintf("user%d@example.com", i),
}
err := table.Insert(data)
if err != nil {
t.Fatalf("Failed to insert valid data: %v", err)
}
}
t.Log("Inserted 50 rows with schema")
// 2. Close the table
table.Close()
// 3. Reopen with the same schema; recovery should succeed
table2, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 10 * 1024 * 1024,
Name: s.Name, Fields: s.Fields,
})
if err != nil {
t.Fatalf("Failed to recover with schema: %v", err)
}
// Verify the data
row, err := table2.Get(1)
if err != nil {
t.Fatalf("Failed to get row after recovery: %v", err)
}
if row.Seq != 1 {
t.Errorf("Expected seq=1, got %d", row.Seq)
}
// Verify the fields
if row.Data["name"] == nil {
t.Error("Missing field 'name'")
}
if row.Data["age"] == nil {
t.Error("Missing field 'age'")
}
table2.Close()
t.Log("Schema recovery test passed!")
}
// TestTableSchemaRecoverInvalid verifies that recovery fails when the WAL holds rows that violate the schema supplied on reopen.
func TestTableSchemaRecoverInvalid(t *testing.T) {
dir := "test_schema_recover_invalid"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
schema := NewSchema("test", []Field{
{Name: "name", Type: FieldTypeString, Indexed: false, Comment: "用户名"},
{Name: "age", Type: FieldTypeString, Indexed: false, Comment: "年龄字符串"},
})
// 1. Insert data under a schema whose age field is a string
table, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 10 * 1024 * 1024, // large, so no flush is triggered
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
t.Fatal(err)
}
// Insert rows that will violate the schema used later
for i := 1; i <= 10; i++ {
data := map[string]any{
"name": fmt.Sprintf("user_%d", i),
"age": "invalid_age", // 这是字符串,但后续 Schema 要求 int64
}
err := table.Insert(data)
if err != nil {
t.Fatalf("Failed to insert data: %v", err)
}
}
// 2. Stop background tasks without flushing (simulate a crash)
if table.compactionManager != nil {
table.compactionManager.Stop()
}
// Close resources directly, without flushing the MemTable
if table.walManager != nil {
table.walManager.Close()
}
if table.versionSet != nil {
table.versionSet.Close()
}
if table.sstManager != nil {
table.sstManager.Close()
}
// 3. Build a schema whose age field requires int64
s := NewSchema("users", []Field{
{Name: "name", Type: FieldTypeString, Indexed: false, Comment: "用户名"},
{Name: "age", Type: FieldTypeInt64, Indexed: false, Comment: "年龄"},
})
// 4. Reopening with this schema should fail
table2, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 10 * 1024 * 1024,
Name: s.Name, Fields: s.Fields,
})
if err == nil {
table2.Close()
t.Fatal("Expected recovery to fail with invalid schema, but it succeeded")
}
// 验证错误信息包含 "schema validation failed"
if err != nil {
t.Logf("Got expected error: %v", err)
}
t.Log("Invalid schema recovery test passed!")
}
// TestTableAutoRecoverSchema verifies that the schema is loaded from disk when none is supplied.
func TestTableAutoRecoverSchema(t *testing.T) {
dir := "test_auto_recover_schema"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
// Create the schema
s := NewSchema("users", []Field{
{Name: "name", Type: FieldTypeString, Indexed: false, Comment: "Username"},
{Name: "age", Type: FieldTypeInt64, Indexed: false, Comment: "Age"},
})
// 1. Create the table with a schema (it is persisted to disk)
table1, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 10 * 1024 * 1024,
Name: s.Name, Fields: s.Fields,
})
if err != nil {
t.Fatal(err)
}
// Insert data
for i := 1; i <= 10; i++ {
data := map[string]any{
"name": fmt.Sprintf("user_%d", i),
"age": 20 + i,
}
err := table1.Insert(data)
if err != nil {
t.Fatalf("Failed to insert: %v", err)
}
}
table1.Close()
// 2. Reopen without a schema; it should be recovered from disk automatically
table2, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 10 * 1024 * 1024,
// no Name/Fields provided
})
if err != nil {
t.Fatalf("Failed to open without schema: %v", err)
}
// Verify the schema was recovered
recoveredSchema := table2.GetSchema()
if recoveredSchema == nil {
t.Fatal("Expected schema to be recovered, but got nil")
}
if recoveredSchema.Name != "users" {
t.Errorf("Expected schema name 'users', got '%s'", recoveredSchema.Name)
}
if len(recoveredSchema.Fields) != 2 {
t.Errorf("Expected 2 fields, got %d", len(recoveredSchema.Fields))
}
// Verify the data
row, err := table2.Get(1)
if err != nil {
t.Fatalf("Failed to get row: %v", err)
}
if row.Data["name"] != "user_1" {
t.Errorf("Expected name='user_1', got '%v'", row.Data["name"])
}
// Inserting a row that conforms to the recovered schema should work
err = table2.Insert(map[string]any{
"name": "new_user",
"age": 30,
})
if err != nil {
t.Fatalf("Failed to insert with recovered schema: %v", err)
}
// Inserting a row that violates the schema should fail
err = table2.Insert(map[string]any{
"name": "bad_user",
"age": "invalid", // 类型错误
})
if err == nil {
t.Fatal("Expected insert to fail with invalid type, but it succeeded")
}
table2.Close()
t.Log("Auto recover schema test passed!")
}
// TestTableSchemaTamperDetection verifies that a tampered schema.json fails its checksum check.
func TestTableSchemaTamperDetection(t *testing.T) {
dir := "test_schema_tamper"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
// Create the schema
s := NewSchema("users", []Field{
{Name: "name", Type: FieldTypeString, Indexed: false, Comment: "Username"},
{Name: "age", Type: FieldTypeInt64, Indexed: false, Comment: "Age"},
})
// 1. Create the table and persist the schema
table1, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 10 * 1024 * 1024,
Name: s.Name, Fields: s.Fields,
})
if err != nil {
t.Fatal(err)
}
table1.Close()
// 2. Tamper with schema.json: modify a field without updating the checksum
schemaPath := fmt.Sprintf("%s/schema.json", dir)
schemaData, err := os.ReadFile(schemaPath)
if err != nil {
t.Fatal(err)
}
// 将 "age" 的注释从 "年龄" 改为 "AGE"(简单篡改)
tamperedData := strings.Replace(string(schemaData), "年龄", "AGE", 1)
err = os.WriteFile(schemaPath, []byte(tamperedData), 0644)
if err != nil {
t.Fatal(err)
}
// 3. Opening the table should now detect the tampering
table2, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 10 * 1024 * 1024,
})
if err == nil {
table2.Close()
t.Fatal("Expected to detect schema tampering, but open succeeded")
}
// 验证错误信息包含 "checksum mismatch"
errMsg := err.Error()
if !strings.Contains(errMsg, "checksum mismatch") && !strings.Contains(errMsg, "tampered") {
t.Errorf("Expected error about checksum mismatch or tampering, got: %v", err)
}
t.Logf("Detected tampering as expected: %v", err)
t.Log("Schema tamper detection test passed!")
}
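// TestTableClean verifies that Clean removes all rows while leaving the table usable for new inserts.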
func TestTableClean(t *testing.T) {
dir := "./test_table_clean_data"
defer os.RemoveAll(dir)
// 1. Create the database and table
db, err := Open(dir)
if err != nil {
t.Fatal(err)
}
defer db.Close()
schema := NewSchema("users", []Field{
{Name: "id", Type: FieldTypeInt64, Indexed: true, Comment: "ID"},
{Name: "name", Type: FieldTypeString, Indexed: false, Comment: "Name"},
})
table, err := db.CreateTable("users", schema)
if err != nil {
t.Fatal(err)
}
// 2. Insert data
for i := range 100 {
err := table.Insert(map[string]any{
"id": int64(i),
"name": fmt.Sprintf("user%d", i),
})
if err != nil {
t.Fatal(err)
}
}
// 3. Verify the data exists
stats := table.Stats()
t.Logf("Before Clean: %d rows", stats.TotalRows)
if stats.TotalRows == 0 {
t.Error("Expected data in table")
}
// 4. Clean the table
err = table.Clean()
if err != nil {
t.Fatal(err)
}
// 5. Verify the data is gone
stats = table.Stats()
t.Logf("After Clean: %d rows", stats.TotalRows)
if stats.TotalRows != 0 {
t.Errorf("Expected 0 rows after clean, got %d", stats.TotalRows)
}
// 6. Verify the table is still usable
err = table.Insert(map[string]any{
"id": int64(100),
"name": "new_user",
})
if err != nil {
t.Fatal(err)
}
stats = table.Stats()
if stats.TotalRows != 1 {
t.Errorf("Expected 1 row after insert, got %d", stats.TotalRows)
}
}
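// TestTableDestroy verifies that Destroy deletes the table's files on disk while the Database metadata keeps the entry.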
func TestTableDestroy(t *testing.T) {
dir := "./test_table_destroy_data"
defer os.RemoveAll(dir)
// 1. Create the database and table
db, err := Open(dir)
if err != nil {
t.Fatal(err)
}
defer db.Close()
schema := NewSchema("test", []Field{
{Name: "id", Type: FieldTypeInt64, Indexed: false, Comment: "ID"},
})
table, err := db.CreateTable("test", schema)
if err != nil {
t.Fatal(err)
}
// 2. Insert data
for i := range 50 {
table.Insert(map[string]any{"id": int64(i)})
}
// 3. Verify the data exists
stats := table.Stats()
t.Logf("Before Destroy: %d rows", stats.TotalRows)
if stats.TotalRows == 0 {
t.Error("Expected data in table")
}
// 4. Record the table's directory path
tableDir := table.dir
// 5. Destroy the table
err = table.Destroy()
if err != nil {
t.Fatal(err)
}
// 6. Verify the table directory was deleted
if _, err := os.Stat(tableDir); !os.IsNotExist(err) {
t.Error("Table directory should be deleted")
}
// 7. Note: Table.Destroy() only deletes the files; it does not remove the table
// from the Database metadata, so the table still appears in ListTables.
tables := db.ListTables()
found := slices.Contains(tables, "test")
if !found {
t.Error("Table should still be in database metadata (use Database.DestroyTable to remove from metadata)")
}
}
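// TestTableCleanWithIndex verifies that Clean also clears index data and that indexes can be recreated afterwards.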
func TestTableCleanWithIndex(t *testing.T) {
dir := "./test_table_clean_index_data"
defer os.RemoveAll(dir)
// 1. Create the database and table
db, err := Open(dir)
if err != nil {
t.Fatal(err)
}
defer db.Close()
schema := NewSchema("users", []Field{
{Name: "id", Type: FieldTypeInt64, Indexed: true, Comment: "ID"},
{Name: "email", Type: FieldTypeString, Indexed: true, Comment: "Email"},
{Name: "name", Type: FieldTypeString, Indexed: false, Comment: "Name"},
})
table, err := db.CreateTable("users", schema)
if err != nil {
t.Fatal(err)
}
// 2. Create the indexes
err = table.CreateIndex("id")
if err != nil {
t.Fatal(err)
}
err = table.CreateIndex("email")
if err != nil {
t.Fatal(err)
}
// 3. Insert data
for i := range 50 {
table.Insert(map[string]any{
"id": int64(i),
"email": fmt.Sprintf("user%d@example.com", i),
"name": fmt.Sprintf("User %d", i),
})
}
// 4. Verify the indexes exist
indexes := table.ListIndexes()
if len(indexes) != 2 {
t.Errorf("Expected 2 indexes, got %d", len(indexes))
}
// 5. Clean the table
err = table.Clean()
if err != nil {
t.Fatal(err)
}
// 6. Verify the data is gone
stats := table.Stats()
if stats.TotalRows != 0 {
t.Errorf("Expected 0 rows after clean, got %d", stats.TotalRows)
}
// 7. Verify the index data was cleared (Clean removes index data)
indexes = table.ListIndexes()
if len(indexes) != 0 {
t.Logf("Note: %d indexes still listed after Clean", len(indexes))
}
// 8. Recreate the indexes
table.CreateIndex("id")
table.CreateIndex("email")
// 9. Verify inserts still work
err = table.Insert(map[string]any{
"id": int64(100),
"email": "new@example.com",
"name": "New User",
})
if err != nil {
t.Fatal(err)
}
stats = table.Stats()
if stats.TotalRows != 1 {
t.Errorf("Expected 1 row, got %d", stats.TotalRows)
}
}
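// TestTableCleanAndQuery verifies that queries return no rows after Clean and see rows inserted afterwards.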
func TestTableCleanAndQuery(t *testing.T) {
dir := "./test_table_clean_query_data"
defer os.RemoveAll(dir)
// 1. Create the database and table
db, err := Open(dir)
if err != nil {
t.Fatal(err)
}
defer db.Close()
schema := NewSchema("test", []Field{
{Name: "id", Type: FieldTypeInt64, Indexed: false, Comment: "ID"},
{Name: "status", Type: FieldTypeString, Indexed: false, Comment: "Status"},
})
table, err := db.CreateTable("test", schema)
if err != nil {
t.Fatal(err)
}
// 2. Insert data
for i := range 30 {
table.Insert(map[string]any{
"id": int64(i),
"status": "active",
})
}
// 3. Query the data
rows, err := table.Query().Eq("status", "active").Rows()
if err != nil {
t.Fatal(err)
}
count := 0
for rows.Next() {
count++
}
rows.Close()
t.Logf("Before Clean: found %d rows", count)
if count != 30 {
t.Errorf("Expected 30 rows, got %d", count)
}
// 4. Clean the table
err = table.Clean()
if err != nil {
t.Fatal(err)
}
// 5. Query again
rows, err = table.Query().Eq("status", "active").Rows()
if err != nil {
t.Fatal(err)
}
count = 0
for rows.Next() {
count++
}
rows.Close()
t.Logf("After Clean: found %d rows", count)
if count != 0 {
t.Errorf("Expected 0 rows after clean, got %d", count)
}
// 6. Insert new data and query once more
table.Insert(map[string]any{
"id": int64(100),
"status": "active",
})
rows, err = table.Query().Eq("status", "active").Rows()
if err != nil {
t.Fatal(err)
}
count = 0
for rows.Next() {
count++
}
rows.Close()
if count != 1 {
t.Errorf("Expected 1 row, got %d", count)
}
}