package srdb
import (
"crypto/rand"
"fmt"
"os"
"slices"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
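// TestTable covers the basic flow: open a table with a tiny MemTable,
// insert 100 rows, read each row back by seq, and check the stats.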
func TestTable(t *testing.T) {
// 1. Create the engine
dir := "test_db"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
schema, err := NewSchema("test", []Field{
{Name: "name", Type: String, Indexed: false, Comment: "用户名"},
{Name: "age", Type: Int64, Indexed: false, Comment: "年龄"},
})
if err != nil {
t.Fatal(err)
}
table, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 1024, // 1 KB, small enough to trigger Flush easily
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
t.Fatal(err)
}
defer table.Close()
// 2. Insert data
for i := 1; i <= 100; i++ {
data := map[string]any{
"name": fmt.Sprintf("user_%d", i),
"age": 20 + i%50,
}
err := table.Insert(data)
if err != nil {
t.Fatal(err)
}
}
// Wait for Flush and Compaction to finish
time.Sleep(1 * time.Second)
t.Logf("Inserted 100 rows")
// 3. Read the data back
for i := int64(1); i <= 100; i++ {
row, err := table.Get(i)
if err != nil {
t.Errorf("Failed to get key %d: %v", i, err)
continue
}
if row.Seq != i {
t.Errorf("Key %d: expected Seq=%d, got %d", i, i, row.Seq)
}
}
// 4. Check the stats
stats := table.Stats()
t.Logf("Stats: MemTable=%d rows, SST=%d files, Total=%d rows",
stats.MemTableCount, stats.SSTCount, stats.TotalRows)
if stats.TotalRows != 100 {
t.Errorf("Expected 100 total rows, got %d", stats.TotalRows)
}
t.Log("All tests passed!")
}
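// TestTableRecover verifies that rows that never left the MemTable
// survive a close/reopen cycle.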
func TestTableRecover(t *testing.T) {
dir := "test_recover"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
schema, err := NewSchema("test", []Field{
{Name: "value", Type: Int64, Indexed: false, Comment: "值"},
})
if err != nil {
t.Fatal(err)
}
// 1. Create the engine and insert data
table, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 10 * 1024 * 1024, // 10 MB, so Flush is never triggered
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
t.Fatal(err)
}
for i := 1; i <= 50; i++ {
data := map[string]any{
"value": i,
}
table.Insert(data)
}
t.Log("Inserted 50 rows")
// 2. Close the engine (simulating a shutdown before a crash)
table.Close()
// 3. Reopen the engine (recover)
table2, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 10 * 1024 * 1024,
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
t.Fatal(err)
}
defer table2.Close()
// 4. Verify the data
for i := int64(1); i <= 50; i++ {
row, err := table2.Get(i)
if err != nil {
t.Errorf("Failed to get key %d after recover: %v", i, err)
}
if row.Seq != i {
t.Errorf("Key %d: expected Seq=%d, got %d", i, i, row.Seq)
}
}
stats := table2.Stats()
if stats.TotalRows != 50 {
t.Errorf("Expected 50 rows after recover, got %d", stats.TotalRows)
}
t.Log("Recover test passed!")
}
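// TestTableFlush inserts enough rows to overflow a 1 KB MemTable, then checks
// that at least one SST file was written and every row is still readable.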
func TestTableFlush(t *testing.T) {
dir := "test_flush"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
schema, err := NewSchema("test", []Field{
{Name: "data", Type: String, Indexed: false, Comment: "数据"},
})
if err != nil {
t.Fatal(err)
}
table, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 1024, // 1 KB
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Insert enough data to trigger a Flush
for i := 1; i <= 200; i++ {
data := map[string]any{
"data": fmt.Sprintf("value_%d", i),
}
table.Insert(data)
}
// Wait for the Flush
time.Sleep(500 * time.Millisecond)
stats := table.Stats()
t.Logf("After flush: MemTable=%d, SST=%d, Total=%d",
stats.MemTableCount, stats.SSTCount, stats.TotalRows)
if stats.SSTCount == 0 {
t.Error("Expected at least 1 SST file after flush")
}
// Verify every row is still readable
for i := int64(1); i <= 200; i++ {
_, err := table.Get(i)
if err != nil {
t.Errorf("Failed to get key %d after flush: %v", i, err)
}
}
t.Log("Flush test passed!")
}
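// BenchmarkTableInsert measures single-row insert throughput with a MemTable
// large enough that no Flush is triggered.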
func BenchmarkTableInsert(b *testing.B) {
dir := "bench_insert"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
schema, err := NewSchema("test", []Field{
{Name: "value", Type: Int64, Indexed: false, Comment: "值"},
})
if err != nil {
b.Fatal(err)
}
table, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 100 * 1024 * 1024, // 100 MB
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
b.Fatal(err)
}
defer table.Close()
data := map[string]any{
"value": 123,
}
for b.Loop() {
table.Insert(data)
}
}
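// BenchmarkTableGet measures point-lookup throughput over 10,000 pre-inserted rows.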
func BenchmarkTableGet(b *testing.B) {
dir := "bench_get"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
schema, err := NewSchema("test", []Field{
{Name: "value", Type: Int64, Indexed: false, Comment: "值"},
})
if err != nil {
b.Fatal(err)
}
table, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 100 * 1024 * 1024,
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
b.Fatal(err)
}
defer table.Close()
// Pre-populate with data
for i := 1; i <= 10000; i++ {
data := map[string]any{
"value": i,
}
table.Insert(data)
}
for i := 0; b.Loop(); i++ {
key := int64(i%10000 + 1)
table.Get(key)
}
}
// TestHighConcurrencyWrite tests highly concurrent writes (2KB-5MB payloads)
func TestHighConcurrencyWrite(t *testing.T) {
tmpDir := t.TempDir()
// Note: This test uses []byte payload - we create a minimal schema
// Schema validation accepts []byte as it gets JSON-marshaled
schema, err := NewSchema("test", []Field{
{Name: "worker_id", Type: Int64, Indexed: false, Comment: "Worker ID"},
})
if err != nil {
t.Fatal(err)
}
opts := &TableOptions{
Dir: tmpDir,
MemTableSize: 64 * 1024 * 1024, // 64MB
Name: schema.Name,
Fields: schema.Fields,
}
table, err := OpenTable(opts)
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Test configuration
const (
numGoroutines = 50 // 50 concurrent writers
rowsPerWorker = 100 // each worker inserts 100 rows
minDataSize = 2 * 1024 // 2KB
maxDataSize = 5 * 1024 * 1024 // 5MB
)
var (
totalInserted atomic.Int64
totalErrors atomic.Int64
wg sync.WaitGroup
)
startTime := time.Now()
// Spawn the concurrent writer goroutines
for i := range numGoroutines {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for j := range rowsPerWorker {
// Vary the payload size per row (note: since j < rowsPerWorker, the modulo
// keeps sizes at minDataSize + j, i.e. near the 2KB floor)
dataSize := minDataSize + (j % (maxDataSize - minDataSize))
largeData := make([]byte, dataSize)
rand.Read(largeData)
data := map[string]any{
"worker_id": workerID,
"row_index": j,
"data_size": dataSize,
"payload": largeData,
"timestamp": time.Now().Unix(),
}
err := table.Insert(data)
if err != nil {
totalErrors.Add(1)
t.Logf("Worker %d, Row %d: Insert failed: %v", workerID, j, err)
} else {
totalInserted.Add(1)
}
// Report progress every 10 rows
if j > 0 && j%10 == 0 {
t.Logf("Worker %d: inserted %d rows", workerID, j)
}
}
}(i)
}
// Wait for all writes to finish
wg.Wait()
duration := time.Since(startTime)
// Collect the results
inserted := totalInserted.Load()
errors := totalErrors.Load()
expected := int64(numGoroutines * rowsPerWorker)
t.Logf("\n=== 高并发写入测试结果 ===")
t.Logf("并发数: %d", numGoroutines)
t.Logf("预期插入: %d 行", expected)
t.Logf("成功插入: %d 行", inserted)
t.Logf("失败: %d 行", errors)
t.Logf("耗时: %v", duration)
t.Logf("吞吐量: %.2f 行/秒", float64(inserted)/duration.Seconds())
// Verify
if errors > 0 {
t.Errorf("%d writes failed", errors)
}
if inserted != expected {
t.Errorf("expected %d rows inserted, got %d", expected, inserted)
}
// Wait for Flush to complete
time.Sleep(2 * time.Second)
// Verify data integrity
stats := table.Stats()
t.Logf("\nTable state:")
t.Logf(" Total rows: %d", stats.TotalRows)
t.Logf(" SST files: %d", stats.SSTCount)
t.Logf(" MemTable rows: %d", stats.MemTableCount)
if stats.TotalRows < inserted {
t.Errorf("data loss: expected at least %d rows, got %d", inserted, stats.TotalRows)
}
}
// TestConcurrentReadWrite tests a mixed concurrent read/write workload
func TestConcurrentReadWrite(t *testing.T) {
tmpDir := t.TempDir()
// Note: This test uses []byte data - we create a minimal schema
schema, err := NewSchema("test", []Field{
{Name: "writer_id", Type: Int64, Indexed: false, Comment: "Writer ID"},
})
if err != nil {
t.Fatal(err)
}
opts := &TableOptions{
Dir: tmpDir,
MemTableSize: 32 * 1024 * 1024, // 32MB
Name: schema.Name,
Fields: schema.Fields,
}
table, err := OpenTable(opts)
if err != nil {
t.Fatal(err)
}
defer table.Close()
const (
numWriters = 20
numReaders = 30
duration = 10 * time.Second
dataSize = 10 * 1024 // 10KB
)
var (
writeCount atomic.Int64
readCount atomic.Int64
readErrors atomic.Int64
wg sync.WaitGroup
stopCh = make(chan struct{})
)
// Start the writer goroutines
for i := range numWriters {
wg.Add(1)
go func(writerID int) {
defer wg.Done()
for {
select {
case <-stopCh:
return
default:
data := make([]byte, dataSize)
rand.Read(data)
payload := map[string]any{
"writer_id": writerID,
"data": data,
"timestamp": time.Now().UnixNano(),
}
err := table.Insert(payload)
if err == nil {
writeCount.Add(1)
}
time.Sleep(10 * time.Millisecond)
}
}
}(i)
}
// Start the reader goroutines
for i := range numReaders {
wg.Add(1)
go func(readerID int) {
defer wg.Done()
for {
select {
case <-stopCh:
return
default:
// Each reader repeatedly fetches a seq derived from its reader ID
seq := int64(readerID*100 + 1)
_, err := table.Get(seq)
if err == nil {
readCount.Add(1)
} else {
readErrors.Add(1)
}
time.Sleep(5 * time.Millisecond)
}
}
}(i)
}
// Run for the configured duration
time.Sleep(duration)
close(stopCh)
wg.Wait()
// Collect the results
writes := writeCount.Load()
reads := readCount.Load()
errors := readErrors.Load()
t.Logf("\n=== 并发读写测试结果 ===")
t.Logf("测试时长: %v", duration)
t.Logf("写入次数: %d (%.2f 次/秒)", writes, float64(writes)/duration.Seconds())
t.Logf("读取次数: %d (%.2f 次/秒)", reads, float64(reads)/duration.Seconds())
t.Logf("读取失败: %d", errors)
stats := table.Stats()
t.Logf("\nTable 状态:")
t.Logf(" 总行数: %d", stats.TotalRows)
t.Logf(" SST 文件数: %d", stats.SSTCount)
}
// TestPowerFailureRecovery tests recovery after a power failure (simulated crash)
func TestPowerFailureRecovery(t *testing.T) {
tmpDir := t.TempDir()
// Phase 1: write data, then simulate a crash
t.Log("=== Phase 1: write data ===")
// Note: This test uses []byte data - we create a minimal schema
schema, err := NewSchema("test", []Field{
{Name: "batch", Type: Int64, Indexed: false, Comment: "Batch number"},
})
if err != nil {
t.Fatal(err)
}
opts := &TableOptions{
Dir: tmpDir,
MemTableSize: 4 * 1024 * 1024, // 4MB
Name: schema.Name,
Fields: schema.Fields,
}
table, err := OpenTable(opts)
if err != nil {
t.Fatal(err)
}
const (
numBatches = 10
rowsPerBatch = 50
dataSize = 50 * 1024 // 50KB
)
insertedSeqs := make([]int64, 0, numBatches*rowsPerBatch)
for batch := range numBatches {
for i := range rowsPerBatch {
data := make([]byte, dataSize)
rand.Read(data)
payload := map[string]any{
"batch": batch,
"index": i,
"data": data,
"timestamp": time.Now().Unix(),
}
err := table.Insert(payload)
if err != nil {
t.Fatalf("Insert failed: %v", err)
}
seq := table.seq.Load()
insertedSeqs = append(insertedSeqs, seq)
}
// Trigger a Flush every third batch
if batch%3 == 0 {
table.switchMemTable()
time.Sleep(100 * time.Millisecond)
}
t.Logf("批次 %d: 插入 %d 行", batch, rowsPerBatch)
}
totalInserted := len(insertedSeqs)
t.Logf("总共插入: %d 行", totalInserted)
// 获取崩溃前的状态
statsBefore := table.Stats()
t.Logf("崩溃前状态: 总行数=%d, SST文件=%d, MemTable行数=%d",
statsBefore.TotalRows, statsBefore.SSTCount, statsBefore.MemTableCount)
// 模拟崩溃:直接关闭(不等待 Flush 完成)
t.Log("\n=== 模拟断电崩溃 ===")
table.Close()
// Phase 2: recover and verify the data
t.Log("\n=== Phase 2: recover data ===")
tableRecovered, err := OpenTable(opts)
if err != nil {
t.Fatalf("恢复失败: %v", err)
}
defer tableRecovered.Close()
// Wait for recovery to settle
time.Sleep(500 * time.Millisecond)
statsAfter := tableRecovered.Stats()
t.Logf("Post-recovery state: totalRows=%d, sstFiles=%d, memTableRows=%d",
statsAfter.TotalRows, statsAfter.SSTCount, statsAfter.MemTableCount)
// Phase 3: verify data integrity
t.Log("\n=== Phase 3: verify data integrity ===")
recovered := 0
missing := 0
corrupted := 0
for i, seq := range insertedSeqs {
row, err := tableRecovered.Get(seq)
if err != nil {
missing++
if i < len(insertedSeqs)/2 {
// The first half should already be flushed and must not be lost
t.Logf("Warning: Seq %d lost (should have been persisted)", seq)
}
continue
}
// Verify the row
if row.Seq != seq {
corrupted++
t.Errorf("Data corruption: expected Seq=%d, got %d", seq, row.Seq)
continue
}
recovered++
}
recoveryRate := float64(recovered) / float64(totalInserted) * 100
t.Logf("\n=== 恢复结果 ===")
t.Logf("插入总数: %d", totalInserted)
t.Logf("成功恢复: %d (%.2f%%)", recovered, recoveryRate)
t.Logf("丢失: %d", missing)
t.Logf("损坏: %d", corrupted)
// 验证:至少应该恢复已经 Flush 的数据
if corrupted > 0 {
t.Errorf("发现 %d 条损坏数据", corrupted)
}
// 至少应该恢复 50% 的数据(已 Flush 的部分)
if recoveryRate < 50 {
t.Errorf("恢复率过低: %.2f%% (预期至少 50%%)", recoveryRate)
}
t.Logf("\n断电恢复测试通过")
}
// TestCrashDuringCompaction tests crashing while a Compaction is in progress
func TestCrashDuringCompaction(t *testing.T) {
tmpDir := t.TempDir()
// Note: This test uses []byte data - we create a minimal schema
schema, err := NewSchema("test", []Field{
{Name: "index", Type: Int64, Indexed: false, Comment: "Index"},
})
if err != nil {
t.Fatal(err)
}
opts := &TableOptions{
Dir: tmpDir,
MemTableSize: 1024, // tiny, so Flush triggers quickly
Name: schema.Name,
Fields: schema.Fields,
}
table, err := OpenTable(opts)
if err != nil {
t.Fatal(err)
}
// Insert a lot of data to trigger multiple Flushes
t.Log("=== Insert data to trigger Compaction ===")
const numRows = 500
dataSize := 5 * 1024 // 5KB
for i := range numRows {
data := make([]byte, dataSize)
rand.Read(data)
payload := map[string]any{
"index": i,
"data": data,
}
err := table.Insert(payload)
if err != nil {
t.Fatal(err)
}
if i%50 == 0 {
t.Logf("已插入 %d 行", i)
}
}
// Wait for some Flushes to complete
time.Sleep(500 * time.Millisecond)
version := table.versionSet.GetCurrent()
l0Count := version.GetLevelFileCount(0)
t.Logf("L0 文件数: %d", l0Count)
// Simulate a crash during Compaction
if l0Count >= 4 {
t.Log("Triggering Compaction...")
go func() {
table.compactionManager.TriggerCompaction()
}()
// Give the Compaction time to start
time.Sleep(100 * time.Millisecond)
t.Log("=== Simulate a crash during Compaction ===")
}
// Close immediately (simulating the crash)
table.Close()
// Recover
t.Log("\n=== Recover the database ===")
tableRecovered, err := OpenTable(opts)
if err != nil {
t.Fatalf("恢复失败: %v", err)
}
defer tableRecovered.Close()
// Verify data integrity
stats := tableRecovered.Stats()
t.Logf("After recovery: totalRows=%d, sstFiles=%d", stats.TotalRows, stats.SSTCount)
// Spot-check the first 100 rows
t.Log("\n=== Verify data ===")
verified := 0
for i := 1; i <= 100; i++ {
seq := int64(i)
_, err := tableRecovered.Get(seq)
if err == nil {
verified++
}
}
t.Logf("验证前 100 行: %d 行可读", verified)
if verified < 50 {
t.Errorf("数据恢复不足: 只有 %d/100 行可读", verified)
}
t.Log("Compaction 崩溃恢复测试通过!")
}
// TestLargeDataIntegrity tests integrity of large payloads (2KB-5MB)
func TestLargeDataIntegrity(t *testing.T) {
tmpDir := t.TempDir()
// Note: This test uses []byte data - we create a minimal schema
schema, err := NewSchema("test", []Field{
{Name: "size", Type: Int64, Indexed: false, Comment: "Size"},
})
if err != nil {
t.Fatal(err)
}
opts := &TableOptions{
Dir: tmpDir,
MemTableSize: 64 * 1024 * 1024, // 64MB
Name: schema.Name,
Fields: schema.Fields,
}
table, err := OpenTable(opts)
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Payload sizes to test
testSizes := []int{
2 * 1024, // 2KB
10 * 1024, // 10KB
100 * 1024, // 100KB
1 * 1024 * 1024, // 1MB
5 * 1024 * 1024, // 5MB
}
t.Log("=== 插入不同大小的数据 ===")
insertedSeqs := make([]int64, 0)
for _, size := range testSizes {
// Insert 3 rows of each size
for i := range 3 {
data := make([]byte, size)
rand.Read(data)
payload := map[string]any{
"size": size,
"index": i,
"data": data,
}
err := table.Insert(payload)
if err != nil {
t.Fatalf("插入失败 (size=%d, index=%d): %v", size, i, err)
}
seq := table.seq.Load()
insertedSeqs = append(insertedSeqs, seq)
t.Logf("插入: Seq=%d, Size=%d KB", seq, size/1024)
}
}
totalInserted := len(insertedSeqs)
t.Logf("总共插入: %d 行", totalInserted)
// 等待 Flush
time.Sleep(2 * time.Second)
// 验证数据可读性
t.Log("\n=== 验证数据可读性 ===")
successCount := 0
for i, seq := range insertedSeqs {
row, err := table.Get(seq)
if err != nil {
t.Errorf("读取失败 (Seq=%d): %v", seq, err)
continue
}
// Verify the payload fields exist
if _, exists := row.Data["data"]; !exists {
t.Errorf("Seq=%d: missing 'data' field", seq)
continue
}
if _, exists := row.Data["size"]; !exists {
t.Errorf("Seq=%d: size 字段不存在", seq)
continue
}
successCount++
if i < 5 || i >= totalInserted-5 {
// Only log the first and last 5 rows
t.Logf("✓ Seq=%d verified", seq)
}
}
successRate := float64(successCount) / float64(totalInserted) * 100
stats := table.Stats()
t.Logf("\n=== 测试结果 ===")
t.Logf("插入总数: %d", totalInserted)
t.Logf("成功读取: %d (%.2f%%)", successCount, successRate)
t.Logf("总行数: %d", stats.TotalRows)
t.Logf("SST 文件数: %d", stats.SSTCount)
if successCount != totalInserted {
t.Errorf("数据丢失: %d/%d", totalInserted-successCount, totalInserted)
}
t.Log("\n大数据完整性测试通过")
}
// BenchmarkConcurrentWrites measures concurrent write throughput
func BenchmarkConcurrentWrites(b *testing.B) {
tmpDir := b.TempDir()
// Note: This benchmark uses []byte data - we create a minimal schema
schema, err := NewSchema("test", []Field{
{Name: "timestamp", Type: Int64, Indexed: false, Comment: "Timestamp"},
})
if err != nil {
b.Fatal(err)
}
opts := &TableOptions{
Dir: tmpDir,
MemTableSize: 64 * 1024 * 1024,
Name: schema.Name,
Fields: schema.Fields,
}
table, err := OpenTable(opts)
if err != nil {
b.Fatal(err)
}
defer table.Close()
const (
numWorkers = 10
dataSize = 10 * 1024 // 10KB
)
data := make([]byte, dataSize)
rand.Read(data)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
payload := map[string]any{
"data": data,
"timestamp": time.Now().UnixNano(),
}
err := table.Insert(payload)
if err != nil {
b.Error(err)
}
}
})
b.StopTimer()
stats := table.Stats()
b.Logf("总行数: %d, SST 文件数: %d", stats.TotalRows, stats.SSTCount)
}
// TestTableWithCompaction tests the Table's Compaction behavior
func TestTableWithCompaction(t *testing.T) {
// Create a temp directory
tmpDir := t.TempDir()
schema, err := NewSchema("test", []Field{
{Name: "batch", Type: Int64, Indexed: false, Comment: "批次"},
{Name: "index", Type: Int64, Indexed: false, Comment: "索引"},
{Name: "value", Type: String, Indexed: false, Comment: "值"},
})
if err != nil {
t.Fatal(err)
}
// Open the Table
opts := &TableOptions{
Dir: tmpDir,
MemTableSize: 1024, // small MemTable so Flush triggers quickly
Name: schema.Name,
Fields: schema.Fields,
}
table, err := OpenTable(opts)
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Insert plenty of data to trigger multiple Flushes
const numBatches = 10
const rowsPerBatch = 100
for batch := range numBatches {
for i := range rowsPerBatch {
data := map[string]any{
"batch": batch,
"index": i,
"value": fmt.Sprintf("data-%d-%d", batch, i),
}
err := table.Insert(data)
if err != nil {
t.Fatalf("Insert failed: %v", err)
}
}
// Force a Flush
err = table.switchMemTable()
if err != nil {
t.Fatalf("Switch MemTable failed: %v", err)
}
// Wait for the Flush to complete
time.Sleep(100 * time.Millisecond)
}
// Wait until all immutable MemTables have been flushed
for table.memtableManager.GetImmutableCount() > 0 {
time.Sleep(100 * time.Millisecond)
}
// Check the Version state
version := table.versionSet.GetCurrent()
l0Count := version.GetLevelFileCount(0)
t.Logf("L0 files: %d", l0Count)
if l0Count == 0 {
t.Error("Expected some files in L0")
}
// Fetch per-level statistics
levelStats := table.compactionManager.GetLevelStats()
for _, stat := range levelStats {
level := stat.Level
fileCount := stat.FileCount
totalSize := stat.TotalSize
score := stat.Score
if fileCount > 0 {
t.Logf("L%d: %d files, %d bytes, score: %.2f", level, fileCount, totalSize, score)
}
}
// Manually trigger a Compaction
if l0Count >= 4 {
t.Log("Triggering manual compaction...")
err = table.compactionManager.TriggerCompaction()
if err != nil {
t.Logf("Compaction: %v", err)
} else {
t.Log("Compaction completed")
// Check the state after the Compaction
version = table.versionSet.GetCurrent()
newL0Count := version.GetLevelFileCount(0)
l1Count := version.GetLevelFileCount(1)
t.Logf("After compaction - L0: %d files, L1: %d files", newL0Count, l1Count)
if newL0Count >= l0Count {
t.Error("Expected L0 file count to decrease after compaction")
}
if l1Count == 0 {
t.Error("Expected some files in L1 after compaction")
}
}
}
// Verify data integrity
stats := table.Stats()
t.Logf("Table stats: %d rows, %d SST files", stats.TotalRows, stats.SSTCount)
// Read back a sample of rows to verify
for batch := range 3 {
for i := range 10 {
seq := int64(batch*rowsPerBatch + i + 1)
row, err := table.Get(seq)
if err != nil {
t.Errorf("Get(%d) failed: %v", seq, err)
continue
}
if row.Data["batch"].(int64) != int64(batch) {
t.Errorf("Expected batch %d, got %v", batch, row.Data["batch"])
}
}
}
}
// TestTableCompactionMerge tests that Compaction merges SSTables correctly
func TestTableCompactionMerge(t *testing.T) {
tmpDir := t.TempDir()
schema, err := NewSchema("test", []Field{
{Name: "batch", Type: Int64, Indexed: false, Comment: "批次"},
{Name: "index", Type: Int64, Indexed: false, Comment: "索引"},
{Name: "value", Type: String, Indexed: false, Comment: "值"},
})
if err != nil {
t.Fatal(err)
}
opts := &TableOptions{
Dir: tmpDir,
MemTableSize: 512, // very small MemTable
Name: schema.Name,
Fields: schema.Fields,
}
table, err := OpenTable(opts)
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Insert data (append-only mode)
const numBatches = 5
const rowsPerBatch = 50
totalRows := 0
for batch := range numBatches {
for i := range rowsPerBatch {
data := map[string]any{
"batch": batch,
"index": i,
"value": fmt.Sprintf("v%d-%d", batch, i),
}
err := table.Insert(data)
if err != nil {
t.Fatal(err)
}
totalRows++
}
// Flush after each batch
err = table.switchMemTable()
if err != nil {
t.Fatal(err)
}
time.Sleep(50 * time.Millisecond)
}
// Wait for all Flushes to complete
for table.memtableManager.GetImmutableCount() > 0 {
time.Sleep(100 * time.Millisecond)
}
// Record the file count before Compaction
version := table.versionSet.GetCurrent()
beforeL0 := version.GetLevelFileCount(0)
t.Logf("Before compaction: L0 has %d files", beforeL0)
// Trigger a Compaction
if beforeL0 >= 4 {
err = table.compactionManager.TriggerCompaction()
if err != nil {
t.Logf("Compaction: %v", err)
} else {
version = table.versionSet.GetCurrent()
afterL0 := version.GetLevelFileCount(0)
afterL1 := version.GetLevelFileCount(1)
t.Logf("After compaction: L0 has %d files, L1 has %d files", afterL0, afterL1)
}
}
// Verify data integrity: check the first few records
for batch := range 2 {
for i := range 5 {
seq := int64(batch*rowsPerBatch + i + 1)
row, err := table.Get(seq)
if err != nil {
t.Errorf("Get(%d) failed: %v", seq, err)
continue
}
actualBatch := int(row.Data["batch"].(int64))
if actualBatch != batch {
t.Errorf("Seq %d: expected batch %d, got %d", seq, batch, actualBatch)
}
}
}
// Verify the total row count
stats := table.Stats()
if stats.TotalRows != int64(totalRows) {
t.Errorf("Expected %d total rows, got %d", totalRows, stats.TotalRows)
}
t.Logf("Data integrity verified: %d rows", totalRows)
}
// TestTableBackgroundCompaction tests automatic background Compaction
func TestTableBackgroundCompaction(t *testing.T) {
if testing.Short() {
t.Skip("Skipping background compaction test in short mode")
}
tmpDir := t.TempDir()
schema, err := NewSchema("test", []Field{
{Name: "batch", Type: Int64, Indexed: false, Comment: "批次"},
{Name: "index", Type: Int64, Indexed: false, Comment: "索引"},
})
if err != nil {
t.Fatal(err)
}
opts := &TableOptions{
Dir: tmpDir,
MemTableSize: 512,
Name: schema.Name,
Fields: schema.Fields,
}
table, err := OpenTable(opts)
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Insert data to trigger multiple Flushes
const numBatches = 8
const rowsPerBatch = 50
for batch := range numBatches {
for i := range rowsPerBatch {
data := map[string]any{
"batch": batch,
"index": i,
}
err := table.Insert(data)
if err != nil {
t.Fatal(err)
}
}
err = table.switchMemTable()
if err != nil {
t.Fatal(err)
}
time.Sleep(50 * time.Millisecond)
}
// Wait for Flushes to complete
for table.memtableManager.GetImmutableCount() > 0 {
time.Sleep(100 * time.Millisecond)
}
// Record the initial state
version := table.versionSet.GetCurrent()
initialL0 := version.GetLevelFileCount(0)
t.Logf("Initial L0 files: %d", initialL0)
// Wait for background Compaction (up to 30 seconds)
maxWait := 30 * time.Second
checkInterval := 2 * time.Second
waited := time.Duration(0)
for waited < maxWait {
time.Sleep(checkInterval)
waited += checkInterval
version = table.versionSet.GetCurrent()
currentL0 := version.GetLevelFileCount(0)
currentL1 := version.GetLevelFileCount(1)
t.Logf("After %v: L0=%d, L1=%d", waited, currentL0, currentL1)
// If L0 shrank or L1 gained files, a Compaction happened
if currentL0 < initialL0 || currentL1 > 0 {
t.Logf("Background compaction detected!")
// Fetch Compaction statistics
stats := table.compactionManager.GetStats()
t.Logf("Compaction stats: %v", stats)
return
}
}
t.Log("No background compaction detected within timeout (this is OK if L0 < 4 files)")
}
// BenchmarkTableWithCompaction measures insert throughput while Compaction runs
func BenchmarkTableWithCompaction(b *testing.B) {
tmpDir := b.TempDir()
schema, err := NewSchema("test", []Field{
{Name: "index", Type: Int64, Indexed: false, Comment: "索引"},
{Name: "value", Type: String, Indexed: false, Comment: "值"},
})
if err != nil {
b.Fatal(err)
}
opts := &TableOptions{
Dir: tmpDir,
MemTableSize: 64 * 1024, // 64KB
Name: schema.Name,
Fields: schema.Fields,
}
table, err := OpenTable(opts)
if err != nil {
b.Fatal(err)
}
defer table.Close()
for i := 0; b.Loop(); i++ {
data := map[string]any{
"index": i,
"value": fmt.Sprintf("benchmark-data-%d", i),
}
err := table.Insert(data)
if err != nil {
b.Fatal(err)
}
}
b.StopTimer()
// Wait for all Flushes to complete
for table.memtableManager.GetImmutableCount() > 0 {
time.Sleep(10 * time.Millisecond)
}
// Report final statistics
version := table.versionSet.GetCurrent()
b.Logf("Final state: L0=%d files, L1=%d files, Total=%d files",
version.GetLevelFileCount(0),
version.GetLevelFileCount(1),
version.GetFileCount())
}
// TestTableSchemaRecover tests Schema recovery across a close/reopen cycle
func TestTableSchemaRecover(t *testing.T) {
dir := "test_schema_recover"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
// Create the Schema
s, err := NewSchema("users", []Field{
{Name: "name", Type: String, Indexed: false, Comment: "Username"},
{Name: "age", Type: Int64, Indexed: false, Comment: "Age"},
{Name: "email", Type: String, Indexed: false, Comment: "Email"},
})
if err != nil {
t.Fatal(err)
}
// 1. Create the engine and insert data (with a Schema)
table, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 10 * 1024 * 1024, // 10 MB, so Flush is never triggered
Name: s.Name, Fields: s.Fields,
})
if err != nil {
t.Fatal(err)
}
// Insert data that conforms to the Schema
for i := 1; i <= 50; i++ {
data := map[string]any{
"name": fmt.Sprintf("user_%d", i),
"age": 20 + i%50,
"email": fmt.Sprintf("user%d@example.com", i),
}
err := table.Insert(data)
if err != nil {
t.Fatalf("Failed to insert valid data: %v", err)
}
}
t.Log("Inserted 50 rows with schema")
// 2. Close the engine
table.Close()
// 3. Reopen the engine (with the Schema); recovery should succeed
table2, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 10 * 1024 * 1024,
Name: s.Name, Fields: s.Fields,
})
if err != nil {
t.Fatalf("Failed to recover with schema: %v", err)
}
// Verify the data
row, err := table2.Get(1)
if err != nil {
t.Fatalf("Failed to get row after recovery: %v", err)
}
if row.Seq != 1 {
t.Errorf("Expected seq=1, got %d", row.Seq)
}
// Verify the fields
if row.Data["name"] == nil {
t.Error("Missing field 'name'")
}
if row.Data["age"] == nil {
t.Error("Missing field 'age'")
}
table2.Close()
t.Log("Schema recovery test passed!")
}
// TestTableSchemaRecoverInvalid tests that recovery fails when the WAL contains rows that violate the new Schema
func TestTableSchemaRecoverInvalid(t *testing.T) {
dir := "test_schema_recover_invalid"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
schema, err := NewSchema("test", []Field{
{Name: "name", Type: String, Indexed: false, Comment: "用户名"},
{Name: "age", Type: String, Indexed: false, Comment: "年龄字符串"},
})
if err != nil {
t.Fatal(err)
}
// 1. First, insert data under a Schema whose age field is a String
table, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 10 * 1024 * 1024, // large, to ensure Flush never triggers
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
t.Fatal(err)
}
// Insert data that will violate the later Schema
for i := 1; i <= 10; i++ {
data := map[string]any{
"name": fmt.Sprintf("user_%d", i),
"age": "invalid_age", // 这是字符串,但后续 Schema 要求 int64
}
err := table.Insert(data)
if err != nil {
t.Fatalf("Failed to insert data: %v", err)
}
}
// 2. Stop background tasks without flushing (simulate a crash)
if table.compactionManager != nil {
table.compactionManager.Stop()
}
// Release resources directly, without flushing the MemTable
if table.walManager != nil {
table.walManager.Close()
}
if table.versionSet != nil {
table.versionSet.Close()
}
if table.sstManager != nil {
table.sstManager.Close()
}
// 3. Create a Schema whose age field requires int64
s, err := NewSchema("users", []Field{
{Name: "name", Type: String, Indexed: false, Comment: "Username"},
{Name: "age", Type: Int64, Indexed: false, Comment: "Age"},
})
if err != nil {
t.Fatal(err)
}
// 4. Opening the engine with this Schema should fail
table2, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 10 * 1024 * 1024,
Name: s.Name, Fields: s.Fields,
})
if err == nil {
table2.Close()
t.Fatal("Expected recovery to fail with invalid schema, but it succeeded")
}
// The error should stem from schema validation during recovery
t.Logf("Got expected error: %v", err)
t.Log("Invalid schema recovery test passed!")
}
// TestTableAutoRecoverSchema tests automatic Schema recovery from disk
func TestTableAutoRecoverSchema(t *testing.T) {
dir := "test_auto_recover_schema"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
// Create the Schema
s, err := NewSchema("users", []Field{
{Name: "name", Type: String, Indexed: false, Comment: "Username"},
{Name: "age", Type: Int64, Indexed: false, Comment: "Age"},
})
if err != nil {
t.Fatal(err)
}
// 1. Create the engine with a Schema (it gets persisted to disk)
table1, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 10 * 1024 * 1024,
Name: s.Name, Fields: s.Fields,
})
if err != nil {
t.Fatal(err)
}
// Insert data
for i := 1; i <= 10; i++ {
data := map[string]any{
"name": fmt.Sprintf("user_%d", i),
"age": 20 + i,
}
err := table1.Insert(data)
if err != nil {
t.Fatalf("Failed to insert: %v", err)
}
}
table1.Close()
// 2. Reopen the engine without a Schema; it should be recovered from disk automatically
table2, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 10 * 1024 * 1024,
// No Schema provided
})
if err != nil {
t.Fatalf("Failed to open without schema: %v", err)
}
// Verify the Schema was recovered
recoveredSchema := table2.GetSchema()
if recoveredSchema == nil {
t.Fatal("Expected schema to be recovered, but got nil")
}
if recoveredSchema.Name != "users" {
t.Errorf("Expected schema name 'users', got '%s'", recoveredSchema.Name)
}
if len(recoveredSchema.Fields) != 2 {
t.Errorf("Expected 2 fields, got %d", len(recoveredSchema.Fields))
}
// Verify the data
row, err := table2.Get(1)
if err != nil {
t.Fatalf("Failed to get row: %v", err)
}
if row.Data["name"] != "user_1" {
t.Errorf("Expected name='user_1', got '%v'", row.Data["name"])
}
// Insert new data (must conform to the recovered Schema)
err = table2.Insert(map[string]any{
"name": "new_user",
"age": 30,
})
if err != nil {
t.Fatalf("Failed to insert with recovered schema: %v", err)
}
// Insert data that violates the Schema (should fail)
err = table2.Insert(map[string]any{
"name": "bad_user",
"age": "invalid", // 类型错误
})
if err == nil {
t.Fatal("Expected insert to fail with invalid type, but it succeeded")
}
table2.Close()
t.Log("Auto recover schema test passed!")
}
// TestTableSchemaTamperDetection tests that tampering with the schema file is detected
func TestTableSchemaTamperDetection(t *testing.T) {
dir := "test_schema_tamper"
os.RemoveAll(dir)
defer os.RemoveAll(dir)
// Create the Schema
s, err := NewSchema("users", []Field{
{Name: "name", Type: String, Indexed: false, Comment: "Username"},
{Name: "age", Type: Int64, Indexed: false, Comment: "Age"},
})
if err != nil {
t.Fatal(err)
}
// 1. Create the engine and persist the Schema
table1, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 10 * 1024 * 1024,
Name: s.Name, Fields: s.Fields,
})
if err != nil {
t.Fatal(err)
}
table1.Close()
// 2. Tamper with schema.json: change a field without updating the checksum
schemaPath := fmt.Sprintf("%s/schema.json", dir)
schemaData, err := os.ReadFile(schemaPath)
if err != nil {
t.Fatal(err)
}
// 将 "age" 的注释从 "年龄" 改为 "AGE"(简单篡改)
tamperedData := strings.Replace(string(schemaData), "年龄", "AGE", 1)
err = os.WriteFile(schemaPath, []byte(tamperedData), 0644)
if err != nil {
t.Fatal(err)
}
// 3. Opening the engine should now detect the tampering
table2, err := OpenTable(&TableOptions{
Dir: dir,
MemTableSize: 10 * 1024 * 1024,
})
if err == nil {
table2.Close()
t.Fatal("Expected to detect schema tampering, but open succeeded")
}
// 验证错误信息包含 "checksum mismatch"
errMsg := err.Error()
if !strings.Contains(errMsg, "checksum mismatch") && !strings.Contains(errMsg, "tampered") {
t.Errorf("Expected error about checksum mismatch or tampering, got: %v", err)
}
t.Logf("Detected tampering as expected: %v", err)
t.Log("Schema tamper detection test passed!")
}
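// TestTableClean verifies that Table.Clean removes all rows while leaving the
// table usable for new inserts.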
func TestTableClean(t *testing.T) {
dir := "./test_table_clean_data"
defer os.RemoveAll(dir)
// 1. Create the database and table
db, err := Open(dir)
if err != nil {
t.Fatal(err)
}
defer db.Close()
schema, err := NewSchema("users", []Field{
{Name: "id", Type: Int64, Indexed: true, Comment: "ID"},
{Name: "name", Type: String, Indexed: false, Comment: "Name"},
})
if err != nil {
t.Fatal(err)
}
table, err := db.CreateTable("users", schema)
if err != nil {
t.Fatal(err)
}
// 2. Insert data
for i := range 100 {
err := table.Insert(map[string]any{
"id": int64(i),
"name": fmt.Sprintf("user%d", i), // avoid string(rune(i)), which yields non-printable characters
})
if err != nil {
t.Fatal(err)
}
}
// 3. Verify data exists
stats := table.Stats()
t.Logf("Before Clean: %d rows", stats.TotalRows)
if stats.TotalRows == 0 {
t.Error("Expected data in table")
}
// 4. Clean the data
err = table.Clean()
if err != nil {
t.Fatal(err)
}
// 5. Verify the data was cleared
stats = table.Stats()
t.Logf("After Clean: %d rows", stats.TotalRows)
if stats.TotalRows != 0 {
t.Errorf("Expected 0 rows after clean, got %d", stats.TotalRows)
}
// 6. Verify the table is still usable
err = table.Insert(map[string]any{
"id": int64(100),
"name": "new_user",
})
if err != nil {
t.Fatal(err)
}
stats = table.Stats()
if stats.TotalRows != 1 {
t.Errorf("Expected 1 row after insert, got %d", stats.TotalRows)
}
}
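// TestTableDestroy verifies that Table.Destroy deletes the table's files on
// disk while the entry remains in the Database metadata.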
func TestTableDestroy(t *testing.T) {
dir := "./test_table_destroy_data"
defer os.RemoveAll(dir)
// 1. Create the database and table
db, err := Open(dir)
if err != nil {
t.Fatal(err)
}
defer db.Close()
schema, err := NewSchema("test", []Field{
{Name: "id", Type: Int64, Indexed: false, Comment: "ID"},
})
if err != nil {
t.Fatal(err)
}
table, err := db.CreateTable("test", schema)
if err != nil {
t.Fatal(err)
}
// 2. Insert data
for i := range 50 {
table.Insert(map[string]any{"id": int64(i)})
}
// 3. Verify data exists
stats := table.Stats()
t.Logf("Before Destroy: %d rows", stats.TotalRows)
if stats.TotalRows == 0 {
t.Error("Expected data in table")
}
// 4. Capture the table directory path
tableDir := table.dir
// 5. Destroy the table
err = table.Destroy()
if err != nil {
t.Fatal(err)
}
// 6. Verify the table directory was deleted
if _, err := os.Stat(tableDir); !os.IsNotExist(err) {
t.Error("Table directory should be deleted")
}
// 7. Note: Table.Destroy() only deletes files; it does not remove the table from the Database.
// The table remains in the Database metadata even though its files are gone.
tables := db.ListTables()
found := slices.Contains(tables, "test")
if !found {
t.Error("Table should still be in database metadata (use Database.DestroyTable to remove from metadata)")
}
}
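// TestTableCleanWithIndex verifies that Table.Clean also clears index data and
// that indexes can be recreated afterwards.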
func TestTableCleanWithIndex(t *testing.T) {
dir := "./test_table_clean_index_data"
defer os.RemoveAll(dir)
// 1. Create the database and table
db, err := Open(dir)
if err != nil {
t.Fatal(err)
}
defer db.Close()
schema, err := NewSchema("users", []Field{
{Name: "id", Type: Int64, Indexed: true, Comment: "ID"},
{Name: "email", Type: String, Indexed: true, Comment: "Email"},
{Name: "name", Type: String, Indexed: false, Comment: "Name"},
})
if err != nil {
t.Fatal(err)
}
table, err := db.CreateTable("users", schema)
if err != nil {
t.Fatal(err)
}
// 2. Create indexes
err = table.CreateIndex("id")
if err != nil {
t.Fatal(err)
}
err = table.CreateIndex("email")
if err != nil {
t.Fatal(err)
}
// 3. Insert data
for i := range 50 {
table.Insert(map[string]any{
"id": int64(i),
"email": fmt.Sprintf("user%d@example.com", i),
"name": fmt.Sprintf("User %d", i),
})
}
// 4. Verify the indexes exist
indexes := table.ListIndexes()
if len(indexes) != 2 {
t.Errorf("Expected 2 indexes, got %d", len(indexes))
}
// 5. Clean the data
err = table.Clean()
if err != nil {
t.Fatal(err)
}
// 6. Verify the data was cleared
stats := table.Stats()
if stats.TotalRows != 0 {
t.Errorf("Expected 0 rows after clean, got %d", stats.TotalRows)
}
// 7. Check whether the indexes were cleared (Clean removes index data)
indexes = table.ListIndexes()
if len(indexes) != 0 {
t.Logf("Note: %d indexes remain after Clean", len(indexes))
}
// 8. Recreate the indexes
table.CreateIndex("id")
table.CreateIndex("email")
// 9. Verify inserts still work
err = table.Insert(map[string]any{
"id": int64(100),
"email": "new@example.com",
"name": "New User",
})
if err != nil {
t.Fatal(err)
}
stats = table.Stats()
if stats.TotalRows != 1 {
t.Errorf("Expected 1 row, got %d", stats.TotalRows)
}
}
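// TestTableCleanAndQuery verifies that queries return no rows after Clean and
// that newly inserted rows are queryable again.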
func TestTableCleanAndQuery(t *testing.T) {
dir := "./test_table_clean_query_data"
defer os.RemoveAll(dir)
// 1. Create the database and table
db, err := Open(dir)
if err != nil {
t.Fatal(err)
}
defer db.Close()
schema, err := NewSchema("test", []Field{
{Name: "id", Type: Int64, Indexed: false, Comment: "ID"},
{Name: "status", Type: String, Indexed: false, Comment: "Status"},
})
if err != nil {
t.Fatal(err)
}
table, err := db.CreateTable("test", schema)
if err != nil {
t.Fatal(err)
}
// 2. Insert data
for i := range 30 {
table.Insert(map[string]any{
"id": int64(i),
"status": "active",
})
}
// 3. Query the data
rows, err := table.Query().Eq("status", "active").Rows()
if err != nil {
t.Fatal(err)
}
count := 0
for rows.Next() {
count++
}
rows.Close()
t.Logf("Before Clean: found %d rows", count)
if count != 30 {
t.Errorf("Expected 30 rows, got %d", count)
}
// 4. Clean the data
err = table.Clean()
if err != nil {
t.Fatal(err)
}
// 5. Query again
rows, err = table.Query().Eq("status", "active").Rows()
if err != nil {
t.Fatal(err)
}
count = 0
for rows.Next() {
count++
}
rows.Close()
t.Logf("After Clean: found %d rows", count)
if count != 0 {
t.Errorf("Expected 0 rows after clean, got %d", count)
}
// 6. Insert new data and query again
table.Insert(map[string]any{
"id": int64(100),
"status": "active",
})
rows, err = table.Query().Eq("status", "active").Rows()
if err != nil {
t.Fatal(err)
}
count = 0
for rows.Next() {
count++
}
rows.Close()
if count != 1 {
t.Errorf("Expected 1 row, got %d", count)
}
}
// TestInsertMap tests inserting a map[string]any
func TestInsertMap(t *testing.T) {
tmpDir, _ := os.MkdirTemp("", "TestInsertMap")
defer os.RemoveAll(tmpDir)
schema, err := NewSchema("users", []Field{
{Name: "name", Type: String},
{Name: "age", Type: Int64},
})
if err != nil {
t.Fatal(err)
}
table, err := OpenTable(&TableOptions{
Dir: tmpDir,
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Insert a single map
err = table.Insert(map[string]any{
"name": "Alice",
"age": int64(25),
})
if err != nil {
t.Fatalf("Insert map failed: %v", err)
}
// Verify
row, err := table.Get(1)
if err != nil {
t.Fatalf("Get failed: %v", err)
}
if row.Data["name"] != "Alice" {
t.Errorf("Expected name=Alice, got %v", row.Data["name"])
}
t.Log("✓ Insert map test passed")
}
// TestInsertMapSlice tests inserting a []map[string]any
func TestInsertMapSlice(t *testing.T) {
tmpDir, _ := os.MkdirTemp("", "TestInsertMapSlice")
defer os.RemoveAll(tmpDir)
schema, err := NewSchema("users", []Field{
{Name: "name", Type: String},
{Name: "age", Type: Int64},
})
if err != nil {
t.Fatal(err)
}
table, err := OpenTable(&TableOptions{
Dir: tmpDir,
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Batch-insert maps
err = table.Insert([]map[string]any{
{"name": "Alice", "age": int64(25)},
{"name": "Bob", "age": int64(30)},
{"name": "Charlie", "age": int64(35)},
})
if err != nil {
t.Fatalf("Insert map slice failed: %v", err)
}
// Verify
row1, _ := table.Get(1)
row2, _ := table.Get(2)
row3, _ := table.Get(3)
if row1.Data["name"] != "Alice" || row2.Data["name"] != "Bob" || row3.Data["name"] != "Charlie" {
t.Errorf("Data mismatch")
}
t.Log("✓ Insert map slice test passed (3 rows)")
}
// TestInsertStruct tests inserting a single struct
func TestInsertStruct(t *testing.T) {
tmpDir, _ := os.MkdirTemp("", "TestInsertStruct")
defer os.RemoveAll(tmpDir)
type User struct {
Name string `srdb:"name"`
Age int64 `srdb:"age"`
Email string `srdb:"email"`
}
schema, err := NewSchema("users", []Field{
{Name: "name", Type: String},
{Name: "age", Type: Int64},
{Name: "email", Type: String},
})
if err != nil {
t.Fatal(err)
}
table, err := OpenTable(&TableOptions{
Dir: tmpDir,
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Insert a single struct
user := User{
Name: "Alice",
Age: 25,
Email: "alice@example.com",
}
err = table.Insert(user)
if err != nil {
t.Fatalf("Insert struct failed: %v", err)
}
// Verify
row, err := table.Get(1)
if err != nil {
t.Fatalf("Get failed: %v", err)
}
if row.Data["name"] != "Alice" {
t.Errorf("Expected name=Alice, got %v", row.Data["name"])
}
if row.Data["email"] != "alice@example.com" {
t.Errorf("Expected email=alice@example.com, got %v", row.Data["email"])
}
t.Log("✓ Insert struct test passed")
}
// TestInsertStructPointer tests inserting a pointer to a struct
func TestInsertStructPointer(t *testing.T) {
tmpDir, _ := os.MkdirTemp("", "TestInsertStructPointer")
defer os.RemoveAll(tmpDir)
type User struct {
Name string `srdb:"name"`
Age int64 `srdb:"age"`
Email string `srdb:"email"`
}
schema, err := NewSchema("users", []Field{
{Name: "name", Type: String},
{Name: "age", Type: Int64},
{Name: "email", Type: String},
})
if err != nil {
t.Fatal(err)
}
table, err := OpenTable(&TableOptions{
Dir: tmpDir,
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Insert a struct pointer
user := &User{
Name: "Bob",
Age: 30,
Email: "bob@example.com",
}
err = table.Insert(user)
if err != nil {
t.Fatalf("Insert struct pointer failed: %v", err)
}
// Verify
row, err := table.Get(1)
if err != nil {
t.Fatalf("Get failed: %v", err)
}
if row.Data["name"] != "Bob" {
t.Errorf("Expected name=Bob, got %v", row.Data["name"])
}
t.Log("✓ Insert struct pointer test passed")
}
// TestInsertStructSlice tests inserting a slice of structs
func TestInsertStructSlice(t *testing.T) {
tmpDir, _ := os.MkdirTemp("", "TestInsertStructSlice")
defer os.RemoveAll(tmpDir)
type User struct {
Name string `srdb:"name"`
Age int64 `srdb:"age"`
}
schema, err := NewSchema("users", []Field{
{Name: "name", Type: String},
{Name: "age", Type: Int64},
})
if err != nil {
t.Fatal(err)
}
table, err := OpenTable(&TableOptions{
Dir: tmpDir,
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Batch-insert a slice of structs
users := []User{
{Name: "Alice", Age: 25},
{Name: "Bob", Age: 30},
{Name: "Charlie", Age: 35},
}
err = table.Insert(users)
if err != nil {
t.Fatalf("Insert struct slice failed: %v", err)
}
// Verify
row1, _ := table.Get(1)
row2, _ := table.Get(2)
row3, _ := table.Get(3)
if row1.Data["name"] != "Alice" || row2.Data["name"] != "Bob" || row3.Data["name"] != "Charlie" {
t.Errorf("Data mismatch")
}
t.Log("✓ Insert struct slice test passed (3 rows)")
}
// TestInsertStructPointerSlice tests inserting a slice of struct pointers
func TestInsertStructPointerSlice(t *testing.T) {
tmpDir, _ := os.MkdirTemp("", "TestInsertStructPointerSlice")
defer os.RemoveAll(tmpDir)
type User struct {
Name string `srdb:"name"`
Age int64 `srdb:"age"`
}
schema, err := NewSchema("users", []Field{
{Name: "name", Type: String},
{Name: "age", Type: Int64},
})
if err != nil {
t.Fatal(err)
}
table, err := OpenTable(&TableOptions{
Dir: tmpDir,
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Batch-insert a slice of struct pointers
users := []*User{
{Name: "Alice", Age: 25},
{Name: "Bob", Age: 30},
nil, // nil pointers should be skipped
{Name: "Charlie", Age: 35},
}
err = table.Insert(users)
if err != nil {
t.Fatalf("Insert struct pointer slice failed: %v", err)
}
// Verify (only 3 rows expected; the nil was skipped)
row1, _ := table.Get(1)
row2, _ := table.Get(2)
row3, _ := table.Get(3)
if row1.Data["name"] != "Alice" || row2.Data["name"] != "Bob" || row3.Data["name"] != "Charlie" {
t.Errorf("Data mismatch")
}
t.Log("✓ Insert struct pointer slice test passed (3 rows, nil skipped)")
}
// TestInsertWithSnakeCase tests automatic snake_case conversion of struct field names
func TestInsertWithSnakeCase(t *testing.T) {
tmpDir, _ := os.MkdirTemp("", "TestInsertWithSnakeCase")
defer os.RemoveAll(tmpDir)
type User struct {
UserName string `srdb:";comment:Username"` // no name in the tag; should auto-convert to user_name
EmailAddress string // no tag; should auto-convert to email_address
IsActive bool // should auto-convert to is_active
}
schema, err := NewSchema("users", []Field{
{Name: "user_name", Type: String, Comment: "Username"},
{Name: "email_address", Type: String},
{Name: "is_active", Type: Bool},
})
if err != nil {
t.Fatal(err)
}
table, err := OpenTable(&TableOptions{
Dir: tmpDir,
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Insert the struct
user := User{
UserName: "Alice",
EmailAddress: "alice@example.com",
IsActive: true,
}
err = table.Insert(user)
if err != nil {
t.Fatalf("Insert failed: %v", err)
}
// Verify the field names were converted correctly
row, err := table.Get(1)
if err != nil {
t.Fatalf("Get failed: %v", err)
}
if row.Data["user_name"] != "Alice" {
t.Errorf("Expected user_name=Alice, got %v", row.Data["user_name"])
}
if row.Data["email_address"] != "alice@example.com" {
t.Errorf("Expected email_address=alice@example.com, got %v", row.Data["email_address"])
}
if row.Data["is_active"] != true {
t.Errorf("Expected is_active=true, got %v", row.Data["is_active"])
}
t.Log("✓ Insert with snake_case test passed")
}
// TestInsertInvalidType tests inserting unsupported types
func TestInsertInvalidType(t *testing.T) {
tmpDir, _ := os.MkdirTemp("", "TestInsertInvalidType")
defer os.RemoveAll(tmpDir)
schema, err := NewSchema("users", []Field{
{Name: "name", Type: String},
})
if err != nil {
t.Fatal(err)
}
table, err := OpenTable(&TableOptions{
Dir: tmpDir,
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Attempt to insert unsupported types
err = table.Insert(123) // a bare int
if err == nil {
t.Errorf("Expected error for invalid type, got nil")
}
err = table.Insert("string") // string 类型
if err == nil {
t.Errorf("Expected error for invalid type, got nil")
}
err = table.Insert(nil) // nil
if err == nil {
t.Errorf("Expected error for nil, got nil")
}
t.Log("✓ Insert invalid type test passed")
}
// TestInsertEmptySlice tests inserting an empty slice
func TestInsertEmptySlice(t *testing.T) {
tmpDir, _ := os.MkdirTemp("", "TestInsertEmptySlice")
defer os.RemoveAll(tmpDir)
schema, err := NewSchema("users", []Field{
{Name: "name", Type: String},
})
if err != nil {
t.Fatal(err)
}
table, err := OpenTable(&TableOptions{
Dir: tmpDir,
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Insert an empty slice
err = table.Insert([]map[string]any{})
if err != nil {
t.Errorf("Expected nil error for empty slice, got %v", err)
}
// Verify no rows were written
_, err = table.Get(1)
if err == nil {
t.Errorf("Expected error for non-existent row")
}
t.Log("✓ Insert empty slice test passed")
}
// TestBatchInsertPerformance tests batch-inserting a large slice in one call
func TestBatchInsertPerformance(t *testing.T) {
tmpDir, _ := os.MkdirTemp("", "TestBatchInsertPerformance")
defer os.RemoveAll(tmpDir)
schema, err := NewSchema("users", []Field{
{Name: "name", Type: String},
{Name: "age", Type: Int64},
})
if err != nil {
t.Fatal(err)
}
table, err := OpenTable(&TableOptions{
Dir: tmpDir,
Name: schema.Name,
Fields: schema.Fields,
})
if err != nil {
t.Fatal(err)
}
defer table.Close()
// Prepare 1000 rows
batchSize := 1000
data := make([]map[string]any, batchSize)
for i := 0; i < batchSize; i++ {
data[i] = map[string]any{
"name": fmt.Sprintf("User%d", i), // avoid string(rune(i)), which yields non-printable characters
"age": int64(20 + i%50),
}
}
// Batch insert
err = table.Insert(data)
if err != nil {
t.Fatalf("Batch insert failed: %v", err)
}
// Verify the row count
row, err := table.Get(int64(batchSize))
if err != nil {
t.Fatalf("Get last row failed: %v", err)
}
if row.Seq != int64(batchSize) {
t.Errorf("Expected seq=%d, got %d", batchSize, row.Seq)
}
t.Logf("✓ Batch insert performance test passed (%d rows)", batchSize)
}