package srdb import ( "crypto/rand" "fmt" "os" "slices" "strings" "sync" "sync/atomic" "testing" "time" ) func TestTable(t *testing.T) { // 1. 创建引擎 dir := "test_db" os.RemoveAll(dir) defer os.RemoveAll(dir) schema, err := NewSchema("test", []Field{ {Name: "name", Type: String, Indexed: false, Comment: "用户名"}, {Name: "age", Type: Int64, Indexed: false, Comment: "年龄"}, }) if err != nil { t.Fatal(err) } table, err := OpenTable(&TableOptions{ Dir: dir, MemTableSize: 1024, // 1 KB,方便触发 Flush Name: schema.Name, Fields: schema.Fields, }) if err != nil { t.Fatal(err) } defer table.Close() // 2. 插入数据 for i := 1; i <= 100; i++ { data := map[string]any{ "name": fmt.Sprintf("user_%d", i), "age": 20 + i%50, } err := table.Insert(data) if err != nil { t.Fatal(err) } } // 等待 Flush 和 Compaction 完成 time.Sleep(1 * time.Second) t.Logf("Inserted 100 rows") // 3. 查询数据 for i := int64(1); i <= 100; i++ { row, err := table.Get(i) if err != nil { t.Errorf("Failed to get key %d: %v", i, err) continue } if row.Seq != i { t.Errorf("Key %d: expected Seq=%d, got %d", i, i, row.Seq) } } // 4. 统计信息 stats := table.Stats() t.Logf("Stats: MemTable=%d rows, SST=%d files, Total=%d rows", stats.MemTableCount, stats.SSTCount, stats.TotalRows) if stats.TotalRows != 100 { t.Errorf("Expected 100 total rows, got %d", stats.TotalRows) } t.Log("All tests passed!") } func TestTableRecover(t *testing.T) { dir := "test_recover" os.RemoveAll(dir) defer os.RemoveAll(dir) schema, err := NewSchema("test", []Field{ {Name: "value", Type: Int64, Indexed: false, Comment: "值"}, }) if err != nil { t.Fatal(err) } // 1. 创建引擎并插入数据 table, err := OpenTable(&TableOptions{ Dir: dir, MemTableSize: 10 * 1024 * 1024, // 10 MB,不会触发 Flush Name: schema.Name, Fields: schema.Fields, }) if err != nil { t.Fatal(err) } for i := 1; i <= 50; i++ { data := map[string]any{ "value": i, } table.Insert(data) } t.Log("Inserted 50 rows") // 2. 关闭引擎 (模拟崩溃前) table.Close() // 3. 重新打开引擎 (恢复) table2, err := OpenTable(&TableOptions{ Dir: dir, MemTableSize: 10 * 1024 * 1024, Name: schema.Name, Fields: schema.Fields, }) if err != nil { t.Fatal(err) } defer table2.Close() // 4. 验证数据 for i := int64(1); i <= 50; i++ { row, err := table2.Get(i) if err != nil { t.Errorf("Failed to get key %d after recover: %v", i, err) } if row.Seq != i { t.Errorf("Key %d: expected Seq=%d, got %d", i, i, row.Seq) } } stats := table2.Stats() if stats.TotalRows != 50 { t.Errorf("Expected 50 rows after recover, got %d", stats.TotalRows) } t.Log("Recover test passed!") } func TestTableFlush(t *testing.T) { dir := "test_flush" os.RemoveAll(dir) defer os.RemoveAll(dir) schema, err := NewSchema("test", []Field{ {Name: "data", Type: String, Indexed: false, Comment: "数据"}, }) if err != nil { t.Fatal(err) } table, err := OpenTable(&TableOptions{ Dir: dir, MemTableSize: 1024, // 1 KB Name: schema.Name, Fields: schema.Fields, }) if err != nil { t.Fatal(err) } defer table.Close() // 插入足够多的数据触发 Flush for i := 1; i <= 200; i++ { data := map[string]any{ "data": fmt.Sprintf("value_%d", i), } table.Insert(data) } // 等待 Flush time.Sleep(500 * time.Millisecond) stats := table.Stats() t.Logf("After flush: MemTable=%d, SST=%d, Total=%d", stats.MemTableCount, stats.SSTCount, stats.TotalRows) if stats.SSTCount == 0 { t.Error("Expected at least 1 SST file after flush") } // 验证所有数据都能查到 for i := int64(1); i <= 200; i++ { _, err := table.Get(i) if err != nil { t.Errorf("Failed to get key %d after flush: %v", i, err) } } t.Log("Flush test passed!") } func BenchmarkTableInsert(b *testing.B) { dir := "bench_insert" os.RemoveAll(dir) defer os.RemoveAll(dir) schema, err := NewSchema("test", []Field{ {Name: "value", Type: Int64, Indexed: false, Comment: "值"}, }) if err != nil { b.Fatal(err) } table, _ := OpenTable(&TableOptions{ Dir: dir, MemTableSize: 100 * 1024 * 1024, // 100 MB Name: schema.Name, Fields: schema.Fields, }) defer table.Close() data := map[string]any{ "value": 123, } for b.Loop() { table.Insert(data) } } func BenchmarkTableGet(b *testing.B) { dir := "bench_get" os.RemoveAll(dir) defer os.RemoveAll(dir) schema, err := NewSchema("test", []Field{ {Name: "value", Type: Int64, Indexed: false, Comment: "值"}, }) if err != nil { b.Fatal(err) } table, _ := OpenTable(&TableOptions{ Dir: dir, MemTableSize: 100 * 1024 * 1024, Name: schema.Name, Fields: schema.Fields, }) defer table.Close() // 预先插入数据 for i := 1; i <= 10000; i++ { data := map[string]any{ "value": i, } table.Insert(data) } for i := 0; b.Loop(); i++ { key := int64(i%10000 + 1) table.Get(key) } } // TestHighConcurrencyWrite 测试高并发写入(2KB-5MB 数据) func TestHighConcurrencyWrite(t *testing.T) { tmpDir := t.TempDir() // Note: This test uses []byte payload - we create a minimal schema // Schema validation accepts []byte as it gets JSON-marshaled schema, err := NewSchema("test", []Field{ {Name: "worker_id", Type: Int64, Indexed: false, Comment: "Worker ID"}, }) if err != nil { t.Fatal(err) } opts := &TableOptions{ Dir: tmpDir, MemTableSize: 64 * 1024 * 1024, // 64MB Name: schema.Name, Fields: schema.Fields, } table, err := OpenTable(opts) if err != nil { t.Fatal(err) } defer table.Close() // 测试配置 const ( numGoroutines = 50 // 50 个并发写入 rowsPerWorker = 100 // 每个 worker 写入 100 行 minDataSize = 2 * 1024 // 2KB maxDataSize = 5 * 1024 * 1024 // 5MB ) var ( totalInserted atomic.Int64 totalErrors atomic.Int64 wg sync.WaitGroup ) startTime := time.Now() // 启动多个并发写入 goroutine for i := range numGoroutines { wg.Add(1) go func(workerID int) { defer wg.Done() for j := range rowsPerWorker { // 生成随机大小的数据 (2KB - 5MB) dataSize := minDataSize + (j % (maxDataSize - minDataSize)) largeData := make([]byte, dataSize) rand.Read(largeData) data := map[string]any{ "worker_id": workerID, "row_index": j, "data_size": dataSize, "payload": largeData, "timestamp": time.Now().Unix(), } err := table.Insert(data) if err != nil { totalErrors.Add(1) t.Logf("Worker %d, Row %d: Insert failed: %v", workerID, j, err) } else { totalInserted.Add(1) } // 每 10 行报告一次进度 if j > 0 && j%10 == 0 { t.Logf("Worker %d: 已插入 %d 行", workerID, j) } } }(i) } // 等待所有写入完成 wg.Wait() duration := time.Since(startTime) // 统计结果 inserted := totalInserted.Load() errors := totalErrors.Load() expected := int64(numGoroutines * rowsPerWorker) t.Logf("\n=== 高并发写入测试结果 ===") t.Logf("并发数: %d", numGoroutines) t.Logf("预期插入: %d 行", expected) t.Logf("成功插入: %d 行", inserted) t.Logf("失败: %d 行", errors) t.Logf("耗时: %v", duration) t.Logf("吞吐量: %.2f 行/秒", float64(inserted)/duration.Seconds()) // 验证 if errors > 0 { t.Errorf("有 %d 次写入失败", errors) } if inserted != expected { t.Errorf("预期插入 %d 行,实际插入 %d 行", expected, inserted) } // 等待 Flush 完成 time.Sleep(2 * time.Second) // 验证数据完整性 stats := table.Stats() t.Logf("\nTable 状态:") t.Logf(" 总行数: %d", stats.TotalRows) t.Logf(" SST 文件数: %d", stats.SSTCount) t.Logf(" MemTable 行数: %d", stats.MemTableCount) if stats.TotalRows < inserted { t.Errorf("数据丢失: 预期至少 %d 行,实际 %d 行", inserted, stats.TotalRows) } } // TestConcurrentReadWrite 测试并发读写混合 func TestConcurrentReadWrite(t *testing.T) { tmpDir := t.TempDir() // Note: This test uses []byte data - we create a minimal schema schema, err := NewSchema("test", []Field{ {Name: "writer_id", Type: Int64, Indexed: false, Comment: "Writer ID"}, }) if err != nil { t.Fatal(err) } opts := &TableOptions{ Dir: tmpDir, MemTableSize: 32 * 1024 * 1024, // 32MB Name: schema.Name, Fields: schema.Fields, } table, err := OpenTable(opts) if err != nil { t.Fatal(err) } defer table.Close() const ( numWriters = 20 numReaders = 30 duration = 10 * time.Second dataSize = 10 * 1024 // 10KB ) var ( writeCount atomic.Int64 readCount atomic.Int64 readErrors atomic.Int64 wg sync.WaitGroup stopCh = make(chan struct{}) ) // 启动写入 goroutines for i := range numWriters { wg.Add(1) go func(writerID int) { defer wg.Done() for { select { case <-stopCh: return default: data := make([]byte, dataSize) rand.Read(data) payload := map[string]any{ "writer_id": writerID, "data": data, "timestamp": time.Now().UnixNano(), } err := table.Insert(payload) if err == nil { writeCount.Add(1) } time.Sleep(10 * time.Millisecond) } } }(i) } // 启动读取 goroutines for i := range numReaders { wg.Add(1) go func(readerID int) { defer wg.Done() for { select { case <-stopCh: return default: // 随机读取 seq := int64(readerID*100 + 1) _, err := table.Get(seq) if err == nil { readCount.Add(1) } else { readErrors.Add(1) } time.Sleep(5 * time.Millisecond) } } }(i) } // 运行指定时间 time.Sleep(duration) close(stopCh) wg.Wait() // 统计结果 writes := writeCount.Load() reads := readCount.Load() errors := readErrors.Load() t.Logf("\n=== 并发读写测试结果 ===") t.Logf("测试时长: %v", duration) t.Logf("写入次数: %d (%.2f 次/秒)", writes, float64(writes)/duration.Seconds()) t.Logf("读取次数: %d (%.2f 次/秒)", reads, float64(reads)/duration.Seconds()) t.Logf("读取失败: %d", errors) stats := table.Stats() t.Logf("\nTable 状态:") t.Logf(" 总行数: %d", stats.TotalRows) t.Logf(" SST 文件数: %d", stats.SSTCount) } // TestPowerFailureRecovery 测试断电恢复(模拟崩溃) func TestPowerFailureRecovery(t *testing.T) { tmpDir := t.TempDir() // 第一阶段:写入数据并模拟崩溃 t.Log("=== 阶段 1: 写入数据 ===") // Note: This test uses []byte data - we create a minimal schema schema, err := NewSchema("test", []Field{ {Name: "batch", Type: Int64, Indexed: false, Comment: "Batch number"}, }) if err != nil { t.Fatal(err) } opts := &TableOptions{ Dir: tmpDir, MemTableSize: 4 * 1024 * 1024, // 4MB Name: schema.Name, Fields: schema.Fields, } table, err := OpenTable(opts) if err != nil { t.Fatal(err) } const ( numBatches = 10 rowsPerBatch = 50 dataSize = 50 * 1024 // 50KB ) insertedSeqs := make([]int64, 0, numBatches*rowsPerBatch) for batch := range numBatches { for i := range rowsPerBatch { data := make([]byte, dataSize) rand.Read(data) payload := map[string]any{ "batch": batch, "index": i, "data": data, "timestamp": time.Now().Unix(), } err := table.Insert(payload) if err != nil { t.Fatalf("Insert failed: %v", err) } seq := table.seq.Load() insertedSeqs = append(insertedSeqs, seq) } // 每批后触发 Flush if batch%3 == 0 { table.switchMemTable() time.Sleep(100 * time.Millisecond) } t.Logf("批次 %d: 插入 %d 行", batch, rowsPerBatch) } totalInserted := len(insertedSeqs) t.Logf("总共插入: %d 行", totalInserted) // 获取崩溃前的状态 statsBefore := table.Stats() t.Logf("崩溃前状态: 总行数=%d, SST文件=%d, MemTable行数=%d", statsBefore.TotalRows, statsBefore.SSTCount, statsBefore.MemTableCount) // 模拟崩溃:直接关闭(不等待 Flush 完成) t.Log("\n=== 模拟断电崩溃 ===") table.Close() // 第二阶段:恢复并验证数据 t.Log("\n=== 阶段 2: 恢复数据 ===") tableRecovered, err := OpenTable(opts) if err != nil { t.Fatalf("恢复失败: %v", err) } defer tableRecovered.Close() // 等待恢复完成 time.Sleep(500 * time.Millisecond) statsAfter := tableRecovered.Stats() t.Logf("恢复后状态: 总行数=%d, SST文件=%d, MemTable行数=%d", statsAfter.TotalRows, statsAfter.SSTCount, statsAfter.MemTableCount) // 验证数据完整性 t.Log("\n=== 阶段 3: 验证数据完整性 ===") recovered := 0 missing := 0 corrupted := 0 for i, seq := range insertedSeqs { row, err := tableRecovered.Get(seq) if err != nil { missing++ if i < len(insertedSeqs)/2 { // 前半部分应该已经 Flush,不应该丢失 t.Logf("警告: Seq %d 丢失(应该已持久化)", seq) } continue } // 验证数据 if row.Seq != seq { corrupted++ t.Errorf("数据损坏: 预期 Seq=%d, 实际=%d", seq, row.Seq) continue } recovered++ } recoveryRate := float64(recovered) / float64(totalInserted) * 100 t.Logf("\n=== 恢复结果 ===") t.Logf("插入总数: %d", totalInserted) t.Logf("成功恢复: %d (%.2f%%)", recovered, recoveryRate) t.Logf("丢失: %d", missing) t.Logf("损坏: %d", corrupted) // 验证:至少应该恢复已经 Flush 的数据 if corrupted > 0 { t.Errorf("发现 %d 条损坏数据", corrupted) } // 至少应该恢复 50% 的数据(已 Flush 的部分) if recoveryRate < 50 { t.Errorf("恢复率过低: %.2f%% (预期至少 50%%)", recoveryRate) } t.Logf("\n断电恢复测试通过!") } // TestCrashDuringCompaction 测试 Compaction 期间崩溃 func TestCrashDuringCompaction(t *testing.T) { tmpDir := t.TempDir() // Note: This test uses []byte data - we create a minimal schema schema, err := NewSchema("test", []Field{ {Name: "index", Type: Int64, Indexed: false, Comment: "Index"}, }) if err != nil { t.Fatal(err) } opts := &TableOptions{ Dir: tmpDir, MemTableSize: 1024, // 很小,快速触发 Flush Name: schema.Name, Fields: schema.Fields, } table, err := OpenTable(opts) if err != nil { t.Fatal(err) } // 插入大量数据触发多次 Flush t.Log("=== 插入数据触发 Compaction ===") const numRows = 500 dataSize := 5 * 1024 // 5KB for i := range numRows { data := make([]byte, dataSize) rand.Read(data) payload := map[string]any{ "index": i, "data": data, } err := table.Insert(payload) if err != nil { t.Fatal(err) } if i%50 == 0 { t.Logf("已插入 %d 行", i) } } // 等待一些 Flush 完成 time.Sleep(500 * time.Millisecond) version := table.versionSet.GetCurrent() l0Count := version.GetLevelFileCount(0) t.Logf("L0 文件数: %d", l0Count) // 模拟在 Compaction 期间崩溃 if l0Count >= 4 { t.Log("触发 Compaction...") go func() { table.compactionManager.TriggerCompaction() }() // 等待 Compaction 开始 time.Sleep(100 * time.Millisecond) t.Log("=== 模拟 Compaction 期间崩溃 ===") } // 直接关闭(模拟崩溃) table.Close() // 恢复 t.Log("\n=== 恢复数据库 ===") tableRecovered, err := OpenTable(opts) if err != nil { t.Fatalf("恢复失败: %v", err) } defer tableRecovered.Close() // 验证数据完整性 stats := tableRecovered.Stats() t.Logf("恢复后: 总行数=%d, SST文件=%d", stats.TotalRows, stats.SSTCount) // 随机验证一些数据 t.Log("\n=== 验证数据 ===") verified := 0 for i := 1; i <= 100; i++ { seq := int64(i) _, err := tableRecovered.Get(seq) if err == nil { verified++ } } t.Logf("验证前 100 行: %d 行可读", verified) if verified < 50 { t.Errorf("数据恢复不足: 只有 %d/100 行可读", verified) } t.Log("Compaction 崩溃恢复测试通过!") } // TestLargeDataIntegrity 测试大数据完整性(2KB-5MB 数据) func TestLargeDataIntegrity(t *testing.T) { tmpDir := t.TempDir() // Note: This test uses []byte data - we create a minimal schema schema, err := NewSchema("test", []Field{ {Name: "size", Type: Int64, Indexed: false, Comment: "Size"}, }) if err != nil { t.Fatal(err) } opts := &TableOptions{ Dir: tmpDir, MemTableSize: 64 * 1024 * 1024, // 64MB Name: schema.Name, Fields: schema.Fields, } table, err := OpenTable(opts) if err != nil { t.Fatal(err) } defer table.Close() // 测试不同大小的数据 testSizes := []int{ 2 * 1024, // 2KB 10 * 1024, // 10KB 100 * 1024, // 100KB 1 * 1024 * 1024, // 1MB 5 * 1024 * 1024, // 5MB } t.Log("=== 插入不同大小的数据 ===") insertedSeqs := make([]int64, 0) for _, size := range testSizes { // 每种大小插入 3 行 for i := range 3 { data := make([]byte, size) rand.Read(data) payload := map[string]any{ "size": size, "index": i, "data": data, } err := table.Insert(payload) if err != nil { t.Fatalf("插入失败 (size=%d, index=%d): %v", size, i, err) } seq := table.seq.Load() insertedSeqs = append(insertedSeqs, seq) t.Logf("插入: Seq=%d, Size=%d KB", seq, size/1024) } } totalInserted := len(insertedSeqs) t.Logf("总共插入: %d 行", totalInserted) // 等待 Flush time.Sleep(2 * time.Second) // 验证数据可读性 t.Log("\n=== 验证数据可读性 ===") successCount := 0 for i, seq := range insertedSeqs { row, err := table.Get(seq) if err != nil { t.Errorf("读取失败 (Seq=%d): %v", seq, err) continue } // 验证数据存在 if _, exists := row.Data["data"]; !exists { t.Errorf("Seq=%d: 数据字段不存在", seq) continue } if _, exists := row.Data["size"]; !exists { t.Errorf("Seq=%d: size 字段不存在", seq) continue } successCount++ if i < 5 || i >= totalInserted-5 { // 只打印前5行和后5行 t.Logf("✓ Seq=%d 验证通过", seq) } } successRate := float64(successCount) / float64(totalInserted) * 100 stats := table.Stats() t.Logf("\n=== 测试结果 ===") t.Logf("插入总数: %d", totalInserted) t.Logf("成功读取: %d (%.2f%%)", successCount, successRate) t.Logf("总行数: %d", stats.TotalRows) t.Logf("SST 文件数: %d", stats.SSTCount) if successCount != totalInserted { t.Errorf("数据丢失: %d/%d", totalInserted-successCount, totalInserted) } t.Log("\n大数据完整性测试通过!") } // BenchmarkConcurrentWrites 并发写入性能测试 func BenchmarkConcurrentWrites(b *testing.B) { tmpDir := b.TempDir() // Note: This benchmark uses []byte data - we create a minimal schema schema, err := NewSchema("test", []Field{ {Name: "timestamp", Type: Int64, Indexed: false, Comment: "Timestamp"}, }) if err != nil { b.Fatal(err) } opts := &TableOptions{ Dir: tmpDir, MemTableSize: 64 * 1024 * 1024, Name: schema.Name, Fields: schema.Fields, } table, err := OpenTable(opts) if err != nil { b.Fatal(err) } defer table.Close() const ( numWorkers = 10 dataSize = 10 * 1024 // 10KB ) data := make([]byte, dataSize) rand.Read(data) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { payload := map[string]any{ "data": data, "timestamp": time.Now().UnixNano(), } err := table.Insert(payload) if err != nil { b.Error(err) } } }) b.StopTimer() stats := table.Stats() b.Logf("总行数: %d, SST 文件数: %d", stats.TotalRows, stats.SSTCount) } // TestTableWithCompaction 测试 Table 的 Compaction 功能 func TestTableWithCompaction(t *testing.T) { // 创建临时目录 tmpDir := t.TempDir() schema, err := NewSchema("test", []Field{ {Name: "batch", Type: Int64, Indexed: false, Comment: "批次"}, {Name: "index", Type: Int64, Indexed: false, Comment: "索引"}, {Name: "value", Type: String, Indexed: false, Comment: "值"}, }) if err != nil { t.Fatal(err) } // 打开 Table opts := &TableOptions{ Dir: tmpDir, MemTableSize: 1024, // 小的 MemTable 以便快速触发 Flush Name: schema.Name, Fields: schema.Fields, } table, err := OpenTable(opts) if err != nil { t.Fatal(err) } defer table.Close() // 插入大量数据,触发多次 Flush const numBatches = 10 const rowsPerBatch = 100 for batch := range numBatches { for i := range rowsPerBatch { data := map[string]any{ "batch": batch, "index": i, "value": fmt.Sprintf("data-%d-%d", batch, i), } err := table.Insert(data) if err != nil { t.Fatalf("Insert failed: %v", err) } } // 强制 Flush err = table.switchMemTable() if err != nil { t.Fatalf("Switch MemTable failed: %v", err) } // 等待 Flush 完成 time.Sleep(100 * time.Millisecond) } // 等待所有 Immutable Flush 完成 for table.memtableManager.GetImmutableCount() > 0 { time.Sleep(100 * time.Millisecond) } // 检查 Version 状态 version := table.versionSet.GetCurrent() l0Count := version.GetLevelFileCount(0) t.Logf("L0 files: %d", l0Count) if l0Count == 0 { t.Error("Expected some files in L0") } // 获取 Level 统计信息 levelStats := table.compactionManager.GetLevelStats() for _, stat := range levelStats { level := stat.Level fileCount := stat.FileCount totalSize := stat.TotalSize score := stat.Score if fileCount > 0 { t.Logf("L%d: %d files, %d bytes, score: %.2f", level, fileCount, totalSize, score) } } // 手动触发 Compaction if l0Count >= 4 { t.Log("Triggering manual compaction...") err = table.compactionManager.TriggerCompaction() if err != nil { t.Logf("Compaction: %v", err) } else { t.Log("Compaction completed") // 检查 Compaction 后的状态 version = table.versionSet.GetCurrent() newL0Count := version.GetLevelFileCount(0) l1Count := version.GetLevelFileCount(1) t.Logf("After compaction - L0: %d files, L1: %d files", newL0Count, l1Count) if newL0Count >= l0Count { t.Error("Expected L0 file count to decrease after compaction") } if l1Count == 0 { t.Error("Expected some files in L1 after compaction") } } } // 验证数据完整性 stats := table.Stats() t.Logf("Table stats: %d rows, %d SST files", stats.TotalRows, stats.SSTCount) // 读取一些数据验证 for batch := range 3 { for i := range 10 { seq := int64(batch*rowsPerBatch + i + 1) row, err := table.Get(seq) if err != nil { t.Errorf("Get(%d) failed: %v", seq, err) continue } if row.Data["batch"].(int64) != int64(batch) { t.Errorf("Expected batch %d, got %v", batch, row.Data["batch"]) } } } } // TestTableCompactionMerge 测试 Compaction 的合并功能 func TestTableCompactionMerge(t *testing.T) { tmpDir := t.TempDir() schema, err := NewSchema("test", []Field{ {Name: "batch", Type: Int64, Indexed: false, Comment: "批次"}, {Name: "index", Type: Int64, Indexed: false, Comment: "索引"}, {Name: "value", Type: String, Indexed: false, Comment: "值"}, }) if err != nil { t.Fatal(err) } opts := &TableOptions{ Dir: tmpDir, MemTableSize: 512, // 很小的 MemTable Name: schema.Name, Fields: schema.Fields, } table, err := OpenTable(opts) if err != nil { t.Fatal(err) } defer table.Close() // 插入数据(Append-Only 模式) const numBatches = 5 const rowsPerBatch = 50 totalRows := 0 for batch := range numBatches { for i := range rowsPerBatch { data := map[string]any{ "batch": batch, "index": i, "value": fmt.Sprintf("v%d-%d", batch, i), } err := table.Insert(data) if err != nil { t.Fatal(err) } totalRows++ } // 每批后 Flush err = table.switchMemTable() if err != nil { t.Fatal(err) } time.Sleep(50 * time.Millisecond) } // 等待所有 Flush 完成 for table.memtableManager.GetImmutableCount() > 0 { time.Sleep(100 * time.Millisecond) } // 记录 Compaction 前的文件数 version := table.versionSet.GetCurrent() beforeL0 := version.GetLevelFileCount(0) t.Logf("Before compaction: L0 has %d files", beforeL0) // 触发 Compaction if beforeL0 >= 4 { err = table.compactionManager.TriggerCompaction() if err != nil { t.Logf("Compaction: %v", err) } else { version = table.versionSet.GetCurrent() afterL0 := version.GetLevelFileCount(0) afterL1 := version.GetLevelFileCount(1) t.Logf("After compaction: L0 has %d files, L1 has %d files", afterL0, afterL1) } } // 验证数据完整性 - 检查前几条记录 for batch := range 2 { for i := range 5 { seq := int64(batch*rowsPerBatch + i + 1) row, err := table.Get(seq) if err != nil { t.Errorf("Get(%d) failed: %v", seq, err) continue } actualBatch := int(row.Data["batch"].(int64)) if actualBatch != batch { t.Errorf("Seq %d: expected batch %d, got %d", seq, batch, actualBatch) } } } // 验证总行数 stats := table.Stats() if stats.TotalRows != int64(totalRows) { t.Errorf("Expected %d total rows, got %d", totalRows, stats.TotalRows) } t.Logf("Data integrity verified: %d rows", totalRows) } // TestTableBackgroundCompaction 测试后台自动 Compaction func TestTableBackgroundCompaction(t *testing.T) { if testing.Short() { t.Skip("Skipping background compaction test in short mode") } tmpDir := t.TempDir() schema, err := NewSchema("test", []Field{ {Name: "batch", Type: Int64, Indexed: false, Comment: "批次"}, {Name: "index", Type: Int64, Indexed: false, Comment: "索引"}, }) if err != nil { t.Fatal(err) } opts := &TableOptions{ Dir: tmpDir, MemTableSize: 512, Name: schema.Name, Fields: schema.Fields, } table, err := OpenTable(opts) if err != nil { t.Fatal(err) } defer table.Close() // 插入数据触发多次 Flush const numBatches = 8 const rowsPerBatch = 50 for batch := range numBatches { for i := range rowsPerBatch { data := map[string]any{ "batch": batch, "index": i, } err := table.Insert(data) if err != nil { t.Fatal(err) } } err = table.switchMemTable() if err != nil { t.Fatal(err) } time.Sleep(50 * time.Millisecond) } // 等待 Flush 完成 for table.memtableManager.GetImmutableCount() > 0 { time.Sleep(100 * time.Millisecond) } // 记录初始状态 version := table.versionSet.GetCurrent() initialL0 := version.GetLevelFileCount(0) t.Logf("Initial L0 files: %d", initialL0) // 等待后台 Compaction(最多等待 30 秒) maxWait := 30 * time.Second checkInterval := 2 * time.Second waited := time.Duration(0) for waited < maxWait { time.Sleep(checkInterval) waited += checkInterval version = table.versionSet.GetCurrent() currentL0 := version.GetLevelFileCount(0) currentL1 := version.GetLevelFileCount(1) t.Logf("After %v: L0=%d, L1=%d", waited, currentL0, currentL1) // 如果 L0 文件减少或 L1 有文件,说明 Compaction 发生了 if currentL0 < initialL0 || currentL1 > 0 { t.Logf("Background compaction detected!") // 获取 Compaction 统计 stats := table.compactionManager.GetStats() t.Logf("Compaction stats: %v", stats) return } } t.Log("No background compaction detected within timeout (this is OK if L0 < 4 files)") } // BenchmarkTableWithCompaction 性能测试 func BenchmarkTableWithCompaction(b *testing.B) { tmpDir := b.TempDir() schema, err := NewSchema("test", []Field{ {Name: "index", Type: Int64, Indexed: false, Comment: "索引"}, {Name: "value", Type: String, Indexed: false, Comment: "值"}, }) if err != nil { b.Fatal(err) } opts := &TableOptions{ Dir: tmpDir, MemTableSize: 64 * 1024, // 64KB Name: schema.Name, Fields: schema.Fields, } table, err := OpenTable(opts) if err != nil { b.Fatal(err) } defer table.Close() for i := 0; b.Loop(); i++ { data := map[string]any{ "index": i, "value": fmt.Sprintf("benchmark-data-%d", i), } err := table.Insert(data) if err != nil { b.Fatal(err) } } b.StopTimer() // 等待所有 Flush 完成 for table.memtableManager.GetImmutableCount() > 0 { time.Sleep(10 * time.Millisecond) } // 报告统计信息 version := table.versionSet.GetCurrent() b.Logf("Final state: L0=%d files, L1=%d files, Total=%d files", version.GetLevelFileCount(0), version.GetLevelFileCount(1), version.GetFileCount()) } // TestTableSchemaRecover 测试 Schema 恢复 func TestTableSchemaRecover(t *testing.T) { dir := "test_schema_recover" os.RemoveAll(dir) defer os.RemoveAll(dir) // 创建 Schema s, err := NewSchema("users", []Field{ {Name: "name", Type: String, Indexed: false, Comment: "用户名"}, {Name: "age", Type: Int64, Indexed: false, Comment: "年龄"}, {Name: "email", Type: String, Indexed: false, Comment: "邮箱"}, }) if err != nil { t.Fatal(err) } // 1. 创建引擎并插入数据(带 Schema) table, err := OpenTable(&TableOptions{ Dir: dir, MemTableSize: 10 * 1024 * 1024, // 10 MB,不会触发 Flush Name: s.Name, Fields: s.Fields, }) if err != nil { t.Fatal(err) } // 插入符合 Schema 的数据 for i := 1; i <= 50; i++ { data := map[string]any{ "name": fmt.Sprintf("user_%d", i), "age": 20 + i%50, "email": fmt.Sprintf("user%d@example.com", i), } err := table.Insert(data) if err != nil { t.Fatalf("Failed to insert valid data: %v", err) } } t.Log("Inserted 50 rows with schema") // 2. 关闭引擎 table.Close() // 3. 重新打开引擎(带 Schema,应该成功恢复) table2, err := OpenTable(&TableOptions{ Dir: dir, MemTableSize: 10 * 1024 * 1024, Name: s.Name, Fields: s.Fields, }) if err != nil { t.Fatalf("Failed to recover with schema: %v", err) } // 验证数据 row, err := table2.Get(1) if err != nil { t.Fatalf("Failed to get row after recovery: %v", err) } if row.Seq != 1 { t.Errorf("Expected seq=1, got %d", row.Seq) } // 验证字段 if row.Data["name"] == nil { t.Error("Missing field 'name'") } if row.Data["age"] == nil { t.Error("Missing field 'age'") } table2.Close() t.Log("Schema recovery test passed!") } // TestTableSchemaRecoverInvalid 测试当 WAL 中有不符合 Schema 的数据时恢复失败 func TestTableSchemaRecoverInvalid(t *testing.T) { dir := "test_schema_recover_invalid" os.RemoveAll(dir) defer os.RemoveAll(dir) schema, err := NewSchema("test", []Field{ {Name: "name", Type: String, Indexed: false, Comment: "用户名"}, {Name: "age", Type: String, Indexed: false, Comment: "年龄字符串"}, }) if err != nil { t.Fatal(err) } // 1. 先不带 Schema 插入一些数据 table, err := OpenTable(&TableOptions{ Dir: dir, MemTableSize: 10 * 1024 * 1024, // 大容量,确保不会触发 Flush Name: schema.Name, Fields: schema.Fields, }) if err != nil { t.Fatal(err) } // 插入一些不符合后续 Schema 的数据 for i := 1; i <= 10; i++ { data := map[string]any{ "name": fmt.Sprintf("user_%d", i), "age": "invalid_age", // 这是字符串,但后续 Schema 要求 int64 } err := table.Insert(data) if err != nil { t.Fatalf("Failed to insert data: %v", err) } } // 2. 停止后台任务但不 Flush(模拟崩溃) if table.compactionManager != nil { table.compactionManager.Stop() } // 直接关闭资源,但不 Flush MemTable if table.walManager != nil { table.walManager.Close() } if table.versionSet != nil { table.versionSet.Close() } if table.sstManager != nil { table.sstManager.Close() } // 3. 创建 Schema,age 字段要求 int64 s, err := NewSchema("users", []Field{ {Name: "name", Type: String, Indexed: false, Comment: "用户名"}, {Name: "age", Type: Int64, Indexed: false, Comment: "年龄"}, }) if err != nil { t.Fatal(err) } // 4. 尝试用 Schema 打开引擎,应该失败 table2, err := OpenTable(&TableOptions{ Dir: dir, MemTableSize: 10 * 1024 * 1024, Name: s.Name, Fields: s.Fields, }) if err == nil { table2.Close() t.Fatal("Expected recovery to fail with invalid schema, but it succeeded") } // 验证错误信息包含 "schema validation failed" if err != nil { t.Logf("Got expected error: %v", err) } t.Log("Invalid schema recovery test passed!") } // TestTableAutoRecoverSchema 测试自动从磁盘恢复 Schema func TestTableAutoRecoverSchema(t *testing.T) { dir := "test_auto_recover_schema" os.RemoveAll(dir) defer os.RemoveAll(dir) // 创建 Schema s, err := NewSchema("users", []Field{ {Name: "name", Type: String, Indexed: false, Comment: "用户名"}, {Name: "age", Type: Int64, Indexed: false, Comment: "年龄"}, }) if err != nil { t.Fatal(err) } // 1. 创建引擎并提供 Schema(会保存到磁盘) table1, err := OpenTable(&TableOptions{ Dir: dir, MemTableSize: 10 * 1024 * 1024, Name: s.Name, Fields: s.Fields, }) if err != nil { t.Fatal(err) } // 插入数据 for i := 1; i <= 10; i++ { data := map[string]any{ "name": fmt.Sprintf("user_%d", i), "age": 20 + i, } err := table1.Insert(data) if err != nil { t.Fatalf("Failed to insert: %v", err) } } table1.Close() // 2. 重新打开引擎,不提供 Schema(应该自动从磁盘恢复) table2, err := OpenTable(&TableOptions{ Dir: dir, MemTableSize: 10 * 1024 * 1024, // 不设置 Schema }) if err != nil { t.Fatalf("Failed to open without schema: %v", err) } // 验证 Schema 已恢复 recoveredSchema := table2.GetSchema() if recoveredSchema == nil { t.Fatal("Expected schema to be recovered, but got nil") } if recoveredSchema.Name != "users" { t.Errorf("Expected schema name 'users', got '%s'", recoveredSchema.Name) } if len(recoveredSchema.Fields) != 2 { t.Errorf("Expected 2 fields, got %d", len(recoveredSchema.Fields)) } // 验证数据 row, err := table2.Get(1) if err != nil { t.Fatalf("Failed to get row: %v", err) } if row.Data["name"] != "user_1" { t.Errorf("Expected name='user_1', got '%v'", row.Data["name"]) } // 尝试插入新数据(应该符合恢复的 Schema) err = table2.Insert(map[string]any{ "name": "new_user", "age": 30, }) if err != nil { t.Fatalf("Failed to insert with recovered schema: %v", err) } // 尝试插入不符合 Schema 的数据(应该失败) err = table2.Insert(map[string]any{ "name": "bad_user", "age": "invalid", // 类型错误 }) if err == nil { t.Fatal("Expected insert to fail with invalid type, but it succeeded") } table2.Close() t.Log("Auto recover schema test passed!") } // TestTableSchemaTamperDetection 测试篡改检测 func TestTableSchemaTamperDetection(t *testing.T) { dir := "test_schema_tamper" os.RemoveAll(dir) defer os.RemoveAll(dir) // 创建 Schema s, err := NewSchema("users", []Field{ {Name: "name", Type: String, Indexed: false, Comment: "用户名"}, {Name: "age", Type: Int64, Indexed: false, Comment: "年龄"}, }) if err != nil { t.Fatal(err) } // 1. 创建引擎并保存 Schema table1, err := OpenTable(&TableOptions{ Dir: dir, MemTableSize: 10 * 1024 * 1024, Name: s.Name, Fields: s.Fields, }) if err != nil { t.Fatal(err) } table1.Close() // 2. 篡改 schema.json(修改字段但不更新 checksum) schemaPath := fmt.Sprintf("%s/schema.json", dir) schemaData, err := os.ReadFile(schemaPath) if err != nil { t.Fatal(err) } // 将 "age" 的注释从 "年龄" 改为 "AGE"(简单篡改) tamperedData := strings.Replace(string(schemaData), "年龄", "AGE", 1) err = os.WriteFile(schemaPath, []byte(tamperedData), 0644) if err != nil { t.Fatal(err) } // 3. 尝试打开引擎,应该检测到篡改 table2, err := OpenTable(&TableOptions{ Dir: dir, MemTableSize: 10 * 1024 * 1024, }) if err == nil { table2.Close() t.Fatal("Expected to detect schema tampering, but open succeeded") } // 验证错误信息包含 "checksum mismatch" errMsg := err.Error() if !strings.Contains(errMsg, "checksum mismatch") && !strings.Contains(errMsg, "tampered") { t.Errorf("Expected error about checksum mismatch or tampering, got: %v", err) } t.Logf("Detected tampering as expected: %v", err) t.Log("Schema tamper detection test passed!") } func TestTableClean(t *testing.T) { dir := "./test_table_clean_data" defer os.RemoveAll(dir) // 1. 创建数据库和表 db, err := Open(dir) if err != nil { t.Fatal(err) } defer db.Close() schema, err := NewSchema("users", []Field{ {Name: "id", Type: Int64, Indexed: true, Comment: "ID"}, {Name: "name", Type: String, Indexed: false, Comment: "Name"}, }) if err != nil { t.Fatal(err) } table, err := db.CreateTable("users", schema) if err != nil { t.Fatal(err) } // 2. 插入数据 for i := range 100 { err := table.Insert(map[string]any{ "id": int64(i), "name": "user" + string(rune(i)), }) if err != nil { t.Fatal(err) } } // 3. 验证数据存在 stats := table.Stats() t.Logf("Before Clean: %d rows", stats.TotalRows) if stats.TotalRows == 0 { t.Error("Expected data in table") } // 4. 清除数据 err = table.Clean() if err != nil { t.Fatal(err) } // 5. 验证数据已清除 stats = table.Stats() t.Logf("After Clean: %d rows", stats.TotalRows) if stats.TotalRows != 0 { t.Errorf("Expected 0 rows after clean, got %d", stats.TotalRows) } // 6. 验证表仍然可用 err = table.Insert(map[string]any{ "id": int64(100), "name": "new_user", }) if err != nil { t.Fatal(err) } stats = table.Stats() if stats.TotalRows != 1 { t.Errorf("Expected 1 row after insert, got %d", stats.TotalRows) } } func TestTableDestroy(t *testing.T) { dir := "./test_table_destroy_data" defer os.RemoveAll(dir) // 1. 创建数据库和表 db, err := Open(dir) if err != nil { t.Fatal(err) } defer db.Close() schema, err := NewSchema("test", []Field{ {Name: "id", Type: Int64, Indexed: false, Comment: "ID"}, }) if err != nil { t.Fatal(err) } table, err := db.CreateTable("test", schema) if err != nil { t.Fatal(err) } // 2. 插入数据 for i := range 50 { table.Insert(map[string]any{"id": int64(i)}) } // 3. 验证数据存在 stats := table.Stats() t.Logf("Before Destroy: %d rows", stats.TotalRows) if stats.TotalRows == 0 { t.Error("Expected data in table") } // 4. 获取表目录路径 tableDir := table.dir // 5. 销毁表 err = table.Destroy() if err != nil { t.Fatal(err) } // 6. 验证表目录已删除 if _, err := os.Stat(tableDir); !os.IsNotExist(err) { t.Error("Table directory should be deleted") } // 7. 注意:Table.Destroy() 只删除文件,不从 Database 中删除 // 表仍然在 Database 的元数据中,但文件已被删除 tables := db.ListTables() found := slices.Contains(tables, "test") if !found { t.Error("Table should still be in database metadata (use Database.DestroyTable to remove from metadata)") } } func TestTableCleanWithIndex(t *testing.T) { dir := "./test_table_clean_index_data" defer os.RemoveAll(dir) // 1. 创建数据库和表 db, err := Open(dir) if err != nil { t.Fatal(err) } defer db.Close() schema, err := NewSchema("users", []Field{ {Name: "id", Type: Int64, Indexed: true, Comment: "ID"}, {Name: "email", Type: String, Indexed: true, Comment: "Email"}, {Name: "name", Type: String, Indexed: false, Comment: "Name"}, }) if err != nil { t.Fatal(err) } table, err := db.CreateTable("users", schema) if err != nil { t.Fatal(err) } // 2. 创建索引 err = table.CreateIndex("id") if err != nil { t.Fatal(err) } err = table.CreateIndex("email") if err != nil { t.Fatal(err) } // 3. 插入数据 for i := range 50 { table.Insert(map[string]any{ "id": int64(i), "email": "user" + string(rune(i)) + "@example.com", "name": "User " + string(rune(i)), }) } // 4. 验证索引存在 indexes := table.ListIndexes() if len(indexes) != 2 { t.Errorf("Expected 2 indexes, got %d", len(indexes)) } // 5. 清除数据 err = table.Clean() if err != nil { t.Fatal(err) } // 6. 验证数据已清除 stats := table.Stats() if stats.TotalRows != 0 { t.Errorf("Expected 0 rows after clean, got %d", stats.TotalRows) } // 7. 验证索引已被清除(Clean 会删除索引数据) indexes = table.ListIndexes() if len(indexes) != 0 { t.Logf("Note: Indexes were cleared (expected behavior), got %d", len(indexes)) } // 8. 重新创建索引 table.CreateIndex("id") table.CreateIndex("email") // 9. 验证可以继续插入数据 err = table.Insert(map[string]any{ "id": int64(100), "email": "new@example.com", "name": "New User", }) if err != nil { t.Fatal(err) } stats = table.Stats() if stats.TotalRows != 1 { t.Errorf("Expected 1 row, got %d", stats.TotalRows) } } func TestTableCleanAndQuery(t *testing.T) { dir := "./test_table_clean_query_data" defer os.RemoveAll(dir) // 1. 创建数据库和表 db, err := Open(dir) if err != nil { t.Fatal(err) } defer db.Close() schema, err := NewSchema("test", []Field{ {Name: "id", Type: Int64, Indexed: false, Comment: "ID"}, {Name: "status", Type: String, Indexed: false, Comment: "Status"}, }) if err != nil { t.Fatal(err) } table, err := db.CreateTable("test", schema) if err != nil { t.Fatal(err) } // 2. 插入数据 for i := range 30 { table.Insert(map[string]any{ "id": int64(i), "status": "active", }) } // 3. 查询数据 rows, err := table.Query().Eq("status", "active").Rows() if err != nil { t.Fatal(err) } count := 0 for rows.Next() { count++ } rows.Close() t.Logf("Before Clean: found %d rows", count) if count != 30 { t.Errorf("Expected 30 rows, got %d", count) } // 4. 清除数据 err = table.Clean() if err != nil { t.Fatal(err) } // 5. 再次查询 rows, err = table.Query().Eq("status", "active").Rows() if err != nil { t.Fatal(err) } count = 0 for rows.Next() { count++ } rows.Close() t.Logf("After Clean: found %d rows", count) if count != 0 { t.Errorf("Expected 0 rows after clean, got %d", count) } // 6. 插入新数据并查询 table.Insert(map[string]any{ "id": int64(100), "status": "active", }) rows, err = table.Query().Eq("status", "active").Rows() if err != nil { t.Fatal(err) } count = 0 for rows.Next() { count++ } rows.Close() if count != 1 { t.Errorf("Expected 1 row, got %d", count) } } // TestInsertMap 测试插入 map[string]any func TestInsertMap(t *testing.T) { tmpDir, _ := os.MkdirTemp("", "TestInsertMap") defer os.RemoveAll(tmpDir) schema, err := NewSchema("users", []Field{ {Name: "name", Type: String}, {Name: "age", Type: Int64}, }) if err != nil { t.Fatal(err) } table, err := OpenTable(&TableOptions{ Dir: tmpDir, Name: schema.Name, Fields: schema.Fields, }) if err != nil { t.Fatal(err) } defer table.Close() // 插入单个 map err = table.Insert(map[string]any{ "name": "Alice", "age": int64(25), }) if err != nil { t.Fatalf("Insert map failed: %v", err) } // 验证 row, err := table.Get(1) if err != nil { t.Fatalf("Get failed: %v", err) } if row.Data["name"] != "Alice" { t.Errorf("Expected name=Alice, got %v", row.Data["name"]) } t.Log("✓ Insert map test passed") } // TestInsertMapSlice 测试插入 []map[string]any func TestInsertMapSlice(t *testing.T) { tmpDir, _ := os.MkdirTemp("", "TestInsertMapSlice") defer os.RemoveAll(tmpDir) schema, err := NewSchema("users", []Field{ {Name: "name", Type: String}, {Name: "age", Type: Int64}, }) if err != nil { t.Fatal(err) } table, err := OpenTable(&TableOptions{ Dir: tmpDir, Name: schema.Name, Fields: schema.Fields, }) if err != nil { t.Fatal(err) } defer table.Close() // 批量插入 maps err = table.Insert([]map[string]any{ {"name": "Alice", "age": int64(25)}, {"name": "Bob", "age": int64(30)}, {"name": "Charlie", "age": int64(35)}, }) if err != nil { t.Fatalf("Insert map slice failed: %v", err) } // 验证 row1, _ := table.Get(1) row2, _ := table.Get(2) row3, _ := table.Get(3) if row1.Data["name"] != "Alice" || row2.Data["name"] != "Bob" || row3.Data["name"] != "Charlie" { t.Errorf("Data mismatch") } t.Log("✓ Insert map slice test passed (3 rows)") } // TestInsertStruct 测试插入单个结构体 func TestInsertStruct(t *testing.T) { tmpDir, _ := os.MkdirTemp("", "TestInsertStruct") defer os.RemoveAll(tmpDir) type User struct { Name string `srdb:"name"` Age int64 `srdb:"age"` Email string `srdb:"email"` } schema, err := NewSchema("users", []Field{ {Name: "name", Type: String}, {Name: "age", Type: Int64}, {Name: "email", Type: String}, }) if err != nil { t.Fatal(err) } table, err := OpenTable(&TableOptions{ Dir: tmpDir, Name: schema.Name, Fields: schema.Fields, }) if err != nil { t.Fatal(err) } defer table.Close() // 插入单个结构体 user := User{ Name: "Alice", Age: 25, Email: "alice@example.com", } err = table.Insert(user) if err != nil { t.Fatalf("Insert struct failed: %v", err) } // 验证 row, err := table.Get(1) if err != nil { t.Fatalf("Get failed: %v", err) } if row.Data["name"] != "Alice" { t.Errorf("Expected name=Alice, got %v", row.Data["name"]) } if row.Data["email"] != "alice@example.com" { t.Errorf("Expected email=alice@example.com, got %v", row.Data["email"]) } t.Log("✓ Insert struct test passed") } // TestInsertStructPointer 测试插入结构体指针 func TestInsertStructPointer(t *testing.T) { tmpDir, _ := os.MkdirTemp("", "TestInsertStructPointer") defer os.RemoveAll(tmpDir) type User struct { Name string `srdb:"name"` Age int64 `srdb:"age"` Email string `srdb:"email"` } schema, err := NewSchema("users", []Field{ {Name: "name", Type: String}, {Name: "age", Type: Int64}, {Name: "email", Type: String}, }) if err != nil { t.Fatal(err) } table, err := OpenTable(&TableOptions{ Dir: tmpDir, Name: schema.Name, Fields: schema.Fields, }) if err != nil { t.Fatal(err) } defer table.Close() // 插入结构体指针 user := &User{ Name: "Bob", Age: 30, Email: "bob@example.com", } err = table.Insert(user) if err != nil { t.Fatalf("Insert struct pointer failed: %v", err) } // 验证 row, err := table.Get(1) if err != nil { t.Fatalf("Get failed: %v", err) } if row.Data["name"] != "Bob" { t.Errorf("Expected name=Bob, got %v", row.Data["name"]) } t.Log("✓ Insert struct pointer test passed") } // TestInsertStructSlice 测试插入结构体切片 func TestInsertStructSlice(t *testing.T) { tmpDir, _ := os.MkdirTemp("", "TestInsertStructSlice") defer os.RemoveAll(tmpDir) type User struct { Name string `srdb:"name"` Age int64 `srdb:"age"` } schema, err := NewSchema("users", []Field{ {Name: "name", Type: String}, {Name: "age", Type: Int64}, }) if err != nil { t.Fatal(err) } table, err := OpenTable(&TableOptions{ Dir: tmpDir, Name: schema.Name, Fields: schema.Fields, }) if err != nil { t.Fatal(err) } defer table.Close() // 批量插入结构体切片 users := []User{ {Name: "Alice", Age: 25}, {Name: "Bob", Age: 30}, {Name: "Charlie", Age: 35}, } err = table.Insert(users) if err != nil { t.Fatalf("Insert struct slice failed: %v", err) } // 验证 row1, _ := table.Get(1) row2, _ := table.Get(2) row3, _ := table.Get(3) if row1.Data["name"] != "Alice" || row2.Data["name"] != "Bob" || row3.Data["name"] != "Charlie" { t.Errorf("Data mismatch") } t.Log("✓ Insert struct slice test passed (3 rows)") } // TestInsertStructPointerSlice 测试插入结构体指针切片 func TestInsertStructPointerSlice(t *testing.T) { tmpDir, _ := os.MkdirTemp("", "TestInsertStructPointerSlice") defer os.RemoveAll(tmpDir) type User struct { Name string `srdb:"name"` Age int64 `srdb:"age"` } schema, err := NewSchema("users", []Field{ {Name: "name", Type: String}, {Name: "age", Type: Int64}, }) if err != nil { t.Fatal(err) } table, err := OpenTable(&TableOptions{ Dir: tmpDir, Name: schema.Name, Fields: schema.Fields, }) if err != nil { t.Fatal(err) } defer table.Close() // 批量插入结构体指针切片 users := []*User{ {Name: "Alice", Age: 25}, {Name: "Bob", Age: 30}, nil, // 测试 nil 指针会被跳过 {Name: "Charlie", Age: 35}, } err = table.Insert(users) if err != nil { t.Fatalf("Insert struct pointer slice failed: %v", err) } // 验证(应该只有 3 条记录,nil 被跳过) row1, _ := table.Get(1) row2, _ := table.Get(2) row3, _ := table.Get(3) if row1.Data["name"] != "Alice" || row2.Data["name"] != "Bob" || row3.Data["name"] != "Charlie" { t.Errorf("Data mismatch") } t.Log("✓ Insert struct pointer slice test passed (3 rows, nil skipped)") } // TestInsertWithSnakeCase 测试结构体自动 snake_case 转换 func TestInsertWithSnakeCase(t *testing.T) { tmpDir, _ := os.MkdirTemp("", "TestInsertWithSnakeCase") defer os.RemoveAll(tmpDir) type User struct { UserName string `srdb:";comment:用户名"` // 没有指定字段名,应该自动转为 user_name EmailAddress string // 没有 tag,应该自动转为 email_address IsActive bool // 应该自动转为 is_active } schema, err := NewSchema("users", []Field{ {Name: "user_name", Type: String, Comment: "用户名"}, {Name: "email_address", Type: String}, {Name: "is_active", Type: Bool}, }) if err != nil { t.Fatal(err) } table, err := OpenTable(&TableOptions{ Dir: tmpDir, Name: schema.Name, Fields: schema.Fields, }) if err != nil { t.Fatal(err) } defer table.Close() // 插入结构体 user := User{ UserName: "Alice", EmailAddress: "alice@example.com", IsActive: true, } err = table.Insert(user) if err != nil { t.Fatalf("Insert failed: %v", err) } // 验证字段名是否正确转换 row, err := table.Get(1) if err != nil { t.Fatalf("Get failed: %v", err) } if row.Data["user_name"] != "Alice" { t.Errorf("Expected user_name=Alice, got %v", row.Data["user_name"]) } if row.Data["email_address"] != "alice@example.com" { t.Errorf("Expected email_address=alice@example.com, got %v", row.Data["email_address"]) } if row.Data["is_active"] != true { t.Errorf("Expected is_active=true, got %v", row.Data["is_active"]) } t.Log("✓ Insert with snake_case test passed") } // TestInsertInvalidType 测试插入不支持的类型 func TestInsertInvalidType(t *testing.T) { tmpDir, _ := os.MkdirTemp("", "TestInsertInvalidType") defer os.RemoveAll(tmpDir) schema, err := NewSchema("users", []Field{ {Name: "name", Type: String}, }) if err != nil { t.Fatal(err) } table, err := OpenTable(&TableOptions{ Dir: tmpDir, Name: schema.Name, Fields: schema.Fields, }) if err != nil { t.Fatal(err) } defer table.Close() // 尝试插入不支持的类型 err = table.Insert(123) // int 类型 if err == nil { t.Errorf("Expected error for invalid type, got nil") } err = table.Insert("string") // string 类型 if err == nil { t.Errorf("Expected error for invalid type, got nil") } err = table.Insert(nil) // nil if err == nil { t.Errorf("Expected error for nil, got nil") } t.Log("✓ Insert invalid type test passed") } // TestInsertEmptySlice 测试插入空切片 func TestInsertEmptySlice(t *testing.T) { tmpDir, _ := os.MkdirTemp("", "TestInsertEmptySlice") defer os.RemoveAll(tmpDir) schema, err := NewSchema("users", []Field{ {Name: "name", Type: String}, }) if err != nil { t.Fatal(err) } table, err := OpenTable(&TableOptions{ Dir: tmpDir, Name: schema.Name, Fields: schema.Fields, }) if err != nil { t.Fatal(err) } defer table.Close() // 插入空切片 err = table.Insert([]map[string]any{}) if err != nil { t.Errorf("Expected nil error for empty slice, got %v", err) } // 验证没有数据 _, err = table.Get(1) if err == nil { t.Errorf("Expected error for non-existent row") } t.Log("✓ Insert empty slice test passed") } // TestBatchInsertPerformance 测试批量插入性能 func TestBatchInsertPerformance(t *testing.T) { tmpDir, _ := os.MkdirTemp("", "TestBatchInsertPerformance") defer os.RemoveAll(tmpDir) schema, err := NewSchema("users", []Field{ {Name: "name", Type: String}, {Name: "age", Type: Int64}, }) if err != nil { t.Fatal(err) } table, err := OpenTable(&TableOptions{ Dir: tmpDir, Name: schema.Name, Fields: schema.Fields, }) if err != nil { t.Fatal(err) } defer table.Close() // 准备1000条数据 batchSize := 1000 data := make([]map[string]any, batchSize) for i := 0; i < batchSize; i++ { data[i] = map[string]any{ "name": "User" + string(rune(i)), "age": int64(20 + i%50), } } // 批量插入 err = table.Insert(data) if err != nil { t.Fatalf("Batch insert failed: %v", err) } // 验证数量 row, err := table.Get(int64(batchSize)) if err != nil { t.Fatalf("Get last row failed: %v", err) } if row.Seq != int64(batchSize) { t.Errorf("Expected seq=%d, got %d", batchSize, row.Seq) } t.Logf("✓ Batch insert performance test passed (%d rows)", batchSize) }