Files
pipelinedb/index_test.go

712 lines
16 KiB
Go
Raw Permalink Normal View History

2025-09-30 15:05:56 +08:00
package pipelinedb
import (
"encoding/binary"
"errors"
"os"
"sort"
"testing"
)
// TestIndexEntry checks the ordering contract of IndexEntry.Less: the test
// expects entries to compare by ID, with page/slot location playing no role.
func TestIndexEntry(t *testing.T) {
	var (
		low      = IndexEntry{ID: 100, PageNo: 5, SlotNo: 10}
		high     = IndexEntry{ID: 200, PageNo: 8, SlotNo: 15}
		lowAgain = IndexEntry{ID: 100, PageNo: 6, SlotNo: 12} // same ID as low, different location
	)

	// Strict ordering between distinct IDs.
	if ordered := low.Less(high); !ordered {
		t.Error("entry1 should be less than entry2")
	}
	if reversed := high.Less(low); reversed {
		t.Error("entry2 should not be less than entry1")
	}

	// Two entries sharing an ID must be neither less nor greater than each other.
	sameRank := !low.Less(lowAgain) && !lowAgain.Less(low)
	if !sameRank {
		t.Error("entries with same ID should be equal")
	}
}
// TestNewGroupIndex verifies the state of a freshly constructed group index:
// it is non-nil, carries the requested name, is empty, and has its tree set.
func TestNewGroupIndex(t *testing.T) {
	const wantName = "test_group"

	idx := NewGroupIndex(wantName)
	if idx == nil {
		// Nothing else can be checked without an index.
		t.Fatal("NewGroupIndex returned nil")
	}

	if got := idx.name; got != wantName {
		t.Errorf("index name = %s, want %s", got, wantName)
	}
	if n := idx.Count(); n != 0 {
		t.Errorf("initial count = %d, want 0", n)
	}
	if idx.tree == nil {
		t.Error("B+Tree not initialized")
	}
}
// TestTableIndexInsert inserts several entries (not in ID order) and checks
// that Count matches and that Get returns the location each was inserted with.
func TestTableIndexInsert(t *testing.T) {
	type location struct {
		id     int64
		pageNo uint16
		slotNo uint16
	}
	want := []location{
		{id: 100, pageNo: 1, slotNo: 5},
		{id: 200, pageNo: 2, slotNo: 10},
		{id: 150, pageNo: 1, slotNo: 8},
		{id: 300, pageNo: 3, slotNo: 2},
	}

	idx := NewGroupIndex("test")
	for i := range want {
		idx.Insert(want[i].id, want[i].pageNo, want[i].slotNo)
	}

	// The index must hold exactly one entry per inserted ID.
	if got := idx.Count(); got != len(want) {
		t.Errorf("count after inserts = %d, want %d", got, len(want))
	}

	// Every inserted entry must be retrievable with its original location.
	for _, w := range want {
		gotPage, gotSlot, ok := idx.Get(w.id)
		switch {
		case !ok:
			t.Errorf("entry with ID %d not found", w.id)
		case gotPage != w.pageNo || gotSlot != w.slotNo:
			t.Errorf("entry %d: got (%d, %d), want (%d, %d)",
				w.id, gotPage, gotSlot, w.pageNo, w.slotNo)
		}
	}
}
// TestTableIndexInsertUpdate checks that inserting an ID that is already
// present replaces its location instead of adding a second entry.
func TestTableIndexInsertUpdate(t *testing.T) {
	const key int64 = 100
	idx := NewGroupIndex("test")

	// First insert establishes the entry.
	idx.Insert(key, 1, 5)
	if page, slot, ok := idx.Get(key); !(ok && page == 1 && slot == 5) {
		t.Errorf("first insert: got (%d, %d, %t), want (1, 5, true)", page, slot, ok)
	}

	// Second insert with the same ID must overwrite the stored location.
	idx.Insert(key, 2, 10)
	if page, slot, ok := idx.Get(key); !(ok && page == 2 && slot == 10) {
		t.Errorf("after update: got (%d, %d, %t), want (2, 10, true)", page, slot, ok)
	}

	// The overwrite must not grow the index.
	if n := idx.Count(); n != 1 {
		t.Errorf("count after update = %d, want 1", n)
	}
}
// TestTableIndexGet checks lookups for both a present ID and an absent one;
// a miss is expected to yield zero values together with found == false.
func TestTableIndexGet(t *testing.T) {
	idx := NewGroupIndex("test")

	seed := [...]struct {
		id         int64
		page, slot uint16
	}{
		{100, 1, 5},
		{200, 2, 10},
		{300, 3, 15},
	}
	for _, s := range seed {
		idx.Insert(s.id, s.page, s.slot)
	}

	// Hit: the stored location comes back unchanged.
	if page, slot, ok := idx.Get(200); !(ok && page == 2 && slot == 10) {
		t.Errorf("Get(200) = (%d, %d, %t), want (2, 10, true)", page, slot, ok)
	}

	// Miss: everything returned must be the zero value.
	if page, slot, ok := idx.Get(999); !(!ok && page == 0 && slot == 0) {
		t.Errorf("Get(999) = (%d, %d, %t), want (0, 0, false)", page, slot, ok)
	}
}
// TestTableIndexDelete covers deleting an existing ID (reported as true,
// entry gone, count reduced by one) and deleting a missing ID (reported as
// false, count unchanged), then confirms the untouched entries survive.
func TestTableIndexDelete(t *testing.T) {
	idx := NewGroupIndex("test")

	for pos, id := range []int64{100, 200, 300, 400} {
		idx.Insert(id, uint16(pos+1), uint16(pos*5))
	}
	// Expected size once exactly one entry has been removed.
	wantCount := idx.Count() - 1

	// Deleting a present ID.
	if removed := idx.Delete(200); !removed {
		t.Error("Delete(200) should return true")
	}
	if _, _, stillThere := idx.Get(200); stillThere {
		t.Error("deleted entry should not be found")
	}
	if n := idx.Count(); n != wantCount {
		t.Errorf("count after delete = %d, want %d", n, wantCount)
	}

	// Deleting an absent ID must be a no-op.
	if removed := idx.Delete(999); removed {
		t.Error("Delete(999) should return false")
	}
	if n := idx.Count(); n != wantCount {
		t.Errorf("count after deleting non-existent = %d, want %d", n, wantCount)
	}

	// The remaining IDs must be unaffected by either delete.
	survivors := []int64{100, 300, 400}
	for i := range survivors {
		if _, _, ok := idx.Get(survivors[i]); !ok {
			t.Errorf("entry %d should still exist", survivors[i])
		}
	}
}
// TestTableIndexRange checks range scans over the index: the test expects
// Range(200, 400) to visit IDs 200, 300 and 400 in ascending order, and
// expects the scan to stop as soon as the callback returns false.
func TestTableIndexRange(t *testing.T) {
	idx := NewGroupIndex("test")

	// Insert out of ID order so that ascending output cannot be an artifact
	// of insertion order.
	testData := []struct {
		id     int64
		pageNo uint16
		slotNo uint16
	}{
		{300, 3, 15},
		{100, 1, 5},
		{500, 5, 25},
		{200, 2, 10},
		{400, 4, 20},
	}
	for _, data := range testData {
		idx.Insert(data.id, data.pageNo, data.slotNo)
	}

	// Full scan of the range 200..400.
	var results []int64
	idx.Range(200, 400, func(id int64, pageNo, slotNo uint16) bool {
		results = append(results, id)
		return true // keep iterating
	})

	expectedIDs := []int64{200, 300, 400}
	if len(results) != len(expectedIDs) {
		t.Errorf("range query returned %d results, want %d", len(results), len(expectedIDs))
		t.Errorf("actual results: %v", results)
	}
	for i, expected := range expectedIDs {
		// Guard the index: a short result must be reported as a failure,
		// not crash the test with an out-of-range panic.
		if i >= len(results) {
			t.Errorf("result[%d] missing, want %d", i, expected)
			continue
		}
		if results[i] != expected {
			t.Errorf("result[%d] = %d, want %d", i, results[i], expected)
		}
	}

	// Early termination: the callback asks to stop after two entries.
	results = nil
	count := 0
	idx.Range(100, 500, func(id int64, pageNo, slotNo uint16) bool {
		results = append(results, id)
		count++
		return count < 2 // process only the first two entries
	})
	if len(results) != 2 {
		t.Errorf("early termination returned %d results, want 2", len(results))
	}
	// Only inspect the first two elements when they actually exist.
	if len(results) >= 2 && (results[0] != 100 || results[1] != 200) {
		t.Errorf("early termination results = %v, want [100, 200]", results)
	}
}
// TestTableIndexRangeEmpty checks that a range falling entirely between two
// stored IDs never invokes the callback.
func TestTableIndexRangeEmpty(t *testing.T) {
	idx := NewGroupIndex("test")
	idx.Insert(100, 1, 5)
	idx.Insert(300, 3, 15)

	// 150..250 lies strictly between the two stored IDs.
	visited := 0
	idx.Range(150, 250, func(int64, uint16, uint16) bool {
		visited++
		return true
	})

	if visited != 0 {
		t.Errorf("empty range query returned %d results, want 0", visited)
	}
}
// TestTableIndexClear fills an index, clears it, and checks that it is empty
// afterwards yet still accepts new inserts.
func TestTableIndexClear(t *testing.T) {
	idx := NewGroupIndex("test")

	// Populate with IDs 1..10.
	for n := 1; n <= 10; n++ {
		idx.Insert(int64(n), uint16(n), uint16(2*n))
	}
	if got := idx.Count(); got != 10 {
		t.Errorf("count before clear = %d, want 10", got)
	}

	idx.Clear()

	// Both the size and individual lookups must reflect the wipe.
	if got := idx.Count(); got != 0 {
		t.Errorf("count after clear = %d, want 0", got)
	}
	if _, _, ok := idx.Get(5); ok {
		t.Error("data should not exist after clear")
	}

	// A cleared index must remain usable.
	idx.Insert(100, 1, 5)
	if got := idx.Count(); got != 1 {
		t.Errorf("count after re-insert = %d, want 1", got)
	}
}
// TestNewIndexManager verifies that a new manager is non-nil, has its index
// map allocated, and reports no statistics.
func TestNewIndexManager(t *testing.T) {
	im := NewIndexManager()
	switch {
	case im == nil:
		t.Fatal("NewIndexManager returned nil")
	case im.indexes == nil:
		t.Error("indexes map not initialized")
	}

	// A manager without any index must report empty stats.
	if n := len(im.GetStats()); n != 0 {
		t.Errorf("initial stats length = %d, want 0", n)
	}
}
// TestIndexManagerGetOrCreateIndex checks that the first call creates an
// index with the requested name, that a repeated call returns that same
// instance, and that the stats list the new (empty) index.
func TestIndexManagerGetOrCreateIndex(t *testing.T) {
	const group = "test_group"
	im := NewIndexManager()

	// First call: the index does not exist yet and must be created.
	created := im.GetOrCreateIndex(group)
	if created == nil {
		t.Fatal("GetOrCreateIndex returned nil")
	}
	if got := created.name; got != group {
		t.Errorf("index name = %s, want %s", got, group)
	}

	// Second call: must hand back the identical instance, not a copy.
	if again := im.GetOrCreateIndex(group); again != created {
		t.Error("GetOrCreateIndex should return the same index instance")
	}

	// Stats must contain exactly this one index with zero entries.
	stats := im.GetStats()
	if n := len(stats); n != 1 {
		t.Errorf("stats length = %d, want 1", n)
	}
	if n := stats[group]; n != 0 {
		t.Errorf("stats[%s] = %d, want 0", group, n)
	}
}
// TestIndexManagerGetIndex checks that GetIndex reports (nil, false) for an
// unknown group and returns the created instance once the group exists.
func TestIndexManagerGetIndex(t *testing.T) {
	const group = "test_group"
	im := NewIndexManager()

	// Lookup before creation must miss.
	if got, ok := im.GetIndex(group); ok || got != nil {
		t.Error("GetIndex should return (nil, false) for non-existent index")
	}

	want := im.GetOrCreateIndex(group)

	// Lookup after creation must return that very instance.
	if got, ok := im.GetIndex(group); !(ok && got == want) {
		t.Error("GetIndex should return the created index")
	}
}
// TestIndexManagerDropIndex checks that dropping an index removes it from
// both lookups and stats, and that dropping an unknown group does not fail.
func TestIndexManagerDropIndex(t *testing.T) {
	const group = "test_group"
	im := NewIndexManager()

	// Create an index holding two entries.
	idx := im.GetOrCreateIndex(group)
	idx.Insert(100, 1, 5)
	idx.Insert(200, 2, 10)

	// Sanity-check the starting state through both the index and the manager.
	if n := idx.Count(); n != 2 {
		t.Errorf("index count = %d, want 2", n)
	}
	if n := im.GetStats()[group]; n != 2 {
		t.Errorf("stats[%s] = %d, want 2", group, n)
	}

	im.DropIndex(group)

	// After the drop the group must be unknown to the manager.
	if _, ok := im.GetIndex(group); ok {
		t.Error("index should not exist after drop")
	}
	if n := len(im.GetStats()); n != 0 {
		t.Errorf("stats length after drop = %d, want 0", n)
	}

	// Dropping a group that never existed must be harmless.
	im.DropIndex("non_existent")
}
// TestIndexManagerGetStats creates several indexes of different sizes and
// checks that GetStats reports one entry per index with the right count.
func TestIndexManagerGetStats(t *testing.T) {
	// Group name -> number of records inserted into that group's index.
	wantCounts := map[string]int{
		"group1": 5,
		"group2": 10,
		"group3": 3,
	}

	im := NewIndexManager()
	for name, total := range wantCounts {
		idx := im.GetOrCreateIndex(name)
		for id := int64(0); id < int64(total); id++ {
			idx.Insert(id, uint16(id), uint16(id*2))
		}
	}

	stats := im.GetStats()
	if got, want := len(stats), len(wantCounts); got != want {
		t.Errorf("stats length = %d, want %d", got, want)
	}
	for name, want := range wantCounts {
		if got := stats[name]; got != want {
			t.Errorf("stats[%s] = %d, want %d", name, got, want)
		}
	}
}
// MockPipelineDB is an in-memory mock database intended for testing
// RebuildIndex. It stores pages in a map instead of a file.
// NOTE(review): TestRebuildIndex in this file opens a real database via Open
// and does not reference this type — confirm whether the mock is still needed.
type MockPipelineDB struct {
// pages maps a page number to its mock page; filled by addPage, read by readPage.
pages map[uint16]*MockPage
}
// MockPage is the in-memory page representation stored in MockPipelineDB.
type MockPage struct {
// data is the raw page buffer; addPage writes each record ID into it as a
// little-endian uint64 at the record's offset.
data []byte
// nextPageNo is the page number returned by nextPage.
// presumably the successor page in a chain — TODO confirm against the real page type.
nextPageNo uint16
// slots holds one offset into data per record, in the order passed to addPage.
slots []uint16
}
// slotArray returns the page's slots field: the record offsets that addPage
// stored. The slice is returned as-is, not copied.
func (p *MockPage) slotArray() []uint16 {
return p.slots
}
// nextPage returns the page number stored in the nextPageNo field.
func (p *MockPage) nextPage() uint16 {
return p.nextPageNo
}
// NewMockPipelineDB returns a mock database with an empty, ready-to-use
// page map.
func NewMockPipelineDB() *MockPipelineDB {
	db := new(MockPipelineDB)
	db.pages = map[uint16]*MockPage{}
	return db
}
// readPage looks up pageNo in the mock's page map. It returns the stored
// page, or a "page not found" error when no page was added under that number.
func (db *MockPipelineDB) readPage(pageNo uint16) (*MockPage, error) {
	page, ok := db.pages[pageNo]
	if !ok {
		return nil, errors.New("page not found")
	}
	return page, nil
}
// addPage builds a mock page and stores it under pageNo, replacing any page
// already registered with that number.
//
// The page gets a PageSize-byte buffer. For every record, its id is written
// into the buffer as a little-endian uint64 at record.offset, and that offset
// is appended to the page's slot list in input order. nextPage becomes the
// page's nextPageNo. Offsets are not validated: one that leaves fewer than
// eight bytes before the end of the buffer makes PutUint64 panic.
func (db *MockPipelineDB) addPage(pageNo uint16, nextPage uint16, records []struct {
	id     int64
	offset uint16
}) {
	buf := make([]byte, PageSize)
	offsets := make([]uint16, 0, len(records))

	for _, rec := range records {
		// Store the record ID at its slot offset inside the page buffer.
		binary.LittleEndian.PutUint64(buf[rec.offset:], uint64(rec.id))
		offsets = append(offsets, rec.offset)
	}

	db.pages[pageNo] = &MockPage{
		data:       buf,
		nextPageNo: nextPage,
		slots:      offsets,
	}
}
// TestRebuildIndex writes a few records into a real database backed by a
// temporary file, rebuilds the "test_group" index, and checks that the
// rebuilt index contains exactly the record IDs returned by AcceptData.
func TestRebuildIndex(t *testing.T) {
	// Back the database with a temporary file that is removed afterwards.
	tmpFile, err := os.CreateTemp("", "rebuild_test_*.db")
	if err != nil {
		t.Fatalf("failed to create temp file: %v", err)
	}
	defer os.Remove(tmpFile.Name()) // best-effort cleanup
	// Only the path is needed; release the handle before Open uses the file.
	if err := tmpFile.Close(); err != nil {
		t.Fatalf("failed to close temp file: %v", err)
	}

	config := &Config{CacheSize: 10}
	pdb, err := Open(Options{
		Filename: tmpFile.Name(),
		Config:   config,
	})
	if err != nil {
		t.Fatalf("Open failed: %v", err)
	}
	// Deferred calls run last-in-first-out, so the database is stopped
	// before the file is removed.
	defer pdb.Stop()

	// Seed the group with a few records and remember their IDs.
	testData := [][]byte{
		[]byte("test data 1"),
		[]byte("test data 2"),
		[]byte("test data 3"),
	}
	var recordIDs []int64
	for _, data := range testData {
		recordID, err := pdb.AcceptData("test_group", data, `{"test": true}`)
		if err != nil {
			t.Fatalf("AcceptData failed: %v", err)
		}
		recordIDs = append(recordIDs, recordID)
	}

	// Rebuild the index.
	// NOTE(review): the second argument (1) is presumably the group's first
	// page number — confirm against RebuildIndex.
	idx, err := pdb.RebuildIndex("test_group", 1)
	if err != nil {
		// The returned index is meaningless on error; stop here.
		t.Fatalf("RebuildIndex returned error: %v", err)
	}
	if idx == nil {
		t.Fatal("RebuildIndex returned nil index")
	}

	// The rebuilt index must contain every record, and nothing else.
	if idx.Count() != len(recordIDs) {
		t.Errorf("rebuilt index count = %d, want %d", idx.Count(), len(recordIDs))
	}
	for _, recordID := range recordIDs {
		_, _, found := idx.Get(recordID)
		if !found {
			t.Errorf("record %d not found in rebuilt index", recordID)
		}
	}
}
// TestIndexLargeDataset exercises the index with 10000 entries: bulk insert,
// spot-check lookups, a small range scan, and a bulk delete.
func TestIndexLargeDataset(t *testing.T) {
	idx := NewGroupIndex("large_test")
	const numRecords = 10000

	// Bulk insert IDs 1..numRecords.
	for i := int64(1); i <= numRecords; i++ {
		idx.Insert(i, uint16(i%1000), uint16(i%100))
	}
	if idx.Count() != numRecords {
		t.Errorf("count = %d, want %d", idx.Count(), numRecords)
	}

	// Spot-check a handful of IDs, including both ends of the ID space.
	testIDs := []int64{1, 100, 1000, 5000, 9999, 10000}
	for _, id := range testIDs {
		_, _, found := idx.Get(id)
		if !found {
			t.Errorf("record %d not found", id)
		}
	}

	// Range scan over 5000..5010; the test expects both ends to be included.
	var rangeResults []int64
	idx.Range(5000, 5010, func(id int64, pageNo, slotNo uint16) bool {
		rangeResults = append(rangeResults, id)
		return true
	})
	expectedRange := []int64{5000, 5001, 5002, 5003, 5004, 5005, 5006, 5007, 5008, 5009, 5010}
	if len(rangeResults) != len(expectedRange) {
		t.Errorf("range query returned %d results, want %d", len(rangeResults), len(expectedRange))
	}
	// Compare the IDs themselves, not just how many came back; stop at the
	// shorter slice because a length mismatch is already reported above.
	for i, want := range expectedRange {
		if i >= len(rangeResults) {
			break
		}
		if rangeResults[i] != want {
			t.Errorf("range result[%d] = %d, want %d", i, rangeResults[i], want)
		}
	}

	// Bulk delete IDs 1000..2000.
	for i := int64(1000); i <= 2000; i++ {
		idx.Delete(i)
	}
	expectedCount := numRecords - 1001 // 1000 through 2000 inclusive is 1001 records
	if idx.Count() != expectedCount {
		t.Errorf("count after deletion = %d, want %d", idx.Count(), expectedCount)
	}
}
// TestIndexOrdering inserts IDs in arbitrary order and checks that a range
// scan covering all of them yields every ID in ascending order.
func TestIndexOrdering(t *testing.T) {
	idx := NewGroupIndex("order_test")

	// Insert in shuffled order.
	ids := []int64{500, 100, 300, 200, 400, 600, 150, 350}
	for _, id := range ids {
		idx.Insert(id, uint16(id/100), uint16(id%100))
	}

	// Scan a range wide enough to cover every inserted ID.
	var results []int64
	idx.Range(0, 1000, func(id int64, pageNo, slotNo uint16) bool {
		results = append(results, id)
		return true
	})

	// The scan output must already be sorted.
	if !sort.SliceIsSorted(results, func(i, j int) bool {
		return results[i] < results[j]
	}) {
		t.Errorf("range query results are not sorted: %v", results)
	}

	// Every inserted ID must be present: compare against the sorted input.
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	if len(results) != len(ids) {
		t.Errorf("result count = %d, want %d", len(results), len(ids))
	}
	for i, expected := range ids {
		// Guard the index: a short result must be reported as a failure,
		// not crash the test with an out-of-range panic.
		if i >= len(results) {
			t.Errorf("result[%d] missing, want %d", i, expected)
			continue
		}
		if results[i] != expected {
			t.Errorf("result[%d] = %d, want %d", i, results[i], expected)
		}
	}
}
// BenchmarkTableIndexInsert measures inserting b.N entries with ascending IDs
// into a single index.
func BenchmarkTableIndexInsert(b *testing.B) {
	idx := NewGroupIndex("bench_test")

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		key, page, slot := int64(n), uint16(n%1000), uint16(n%100)
		idx.Insert(key, page, slot)
	}
}
// BenchmarkTableIndexGet measures point lookups against an index preloaded
// with 100000 entries; every looked-up ID exists.
func BenchmarkTableIndexGet(b *testing.B) {
	const preload = 100000

	idx := NewGroupIndex("bench_test")
	// Populate outside the timed region.
	for n := 0; n < preload; n++ {
		idx.Insert(int64(n), uint16(n%1000), uint16(n%100))
	}

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		key := int64(n % preload)
		idx.Get(key)
	}
}
// BenchmarkTableIndexRange measures range scans spanning 100 IDs (start to
// start+100) over an index preloaded with 100000 entries.
func BenchmarkTableIndexRange(b *testing.B) {
	const preload = 100000

	idx := NewGroupIndex("bench_test")
	// Populate outside the timed region.
	for n := 0; n < preload; n++ {
		idx.Insert(int64(n), uint16(n%1000), uint16(n%100))
	}

	// The visitor does no work; it only keeps the scan going.
	visitAll := func(int64, uint16, uint16) bool { return true }

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		// Keep the window inside the populated ID space.
		lo := int64(n % 90000)
		idx.Range(lo, lo+100, visitAll)
	}
}
// BenchmarkIndexManagerOperations measures a mixed manager workload: each
// iteration resolves one of 26 group indexes ("group_A".."group_Z"), inserts
// into it, and every 1000th iteration also collects stats.
func BenchmarkIndexManagerOperations(b *testing.B) {
	im := NewIndexManager()

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		// Cycle through the 26 group names.
		letter := rune('A' + n%26)
		idx := im.GetOrCreateIndex("group_" + string(letter))
		idx.Insert(int64(n), uint16(n%1000), uint16(n%100))

		if n%1000 == 0 {
			im.GetStats()
		}
	}
}