Files
pipelinedb/index_test.go
2025-09-30 15:05:56 +08:00

712 lines
16 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package pipelinedb
import (
"encoding/binary"
"errors"
"os"
"sort"
"testing"
)
// TestIndexEntry exercises IndexEntry.Less on entries with different and
// with identical IDs.
func TestIndexEntry(t *testing.T) {
	low := IndexEntry{ID: 100, PageNo: 5, SlotNo: 10}
	high := IndexEntry{ID: 200, PageNo: 8, SlotNo: 15}
	// Shares its ID with low but points at a different page/slot.
	sameID := IndexEntry{ID: 100, PageNo: 6, SlotNo: 12}

	// The smaller ID must sort first, and only in that direction.
	if lowFirst := low.Less(high); !lowFirst {
		t.Error("entry1 should be less than entry2")
	}
	if highFirst := high.Less(low); highFirst {
		t.Error("entry2 should not be less than entry1")
	}

	// Two entries with the same ID: neither may sort before the other.
	switch {
	case low.Less(sameID), sameID.Less(low):
		t.Error("entries with same ID should be equal")
	}
}
// TestNewGroupIndex checks the state of a freshly constructed group index:
// non-nil, carrying the requested name, empty, and with its tree allocated.
func TestNewGroupIndex(t *testing.T) {
	const groupName = "test_group"

	idx := NewGroupIndex(groupName)
	if idx == nil {
		t.Fatal("NewGroupIndex returned nil")
	}

	if got := idx.name; got != groupName {
		t.Errorf("index name = %s, want %s", got, groupName)
	}
	if n := idx.Count(); n != 0 {
		t.Errorf("initial count = %d, want 0", n)
	}
	if treeMissing := idx.tree == nil; treeMissing {
		t.Error("B+Tree not initialized")
	}
}
// TestTableIndexInsert inserts several distinct IDs and verifies that the
// count matches and that every ID resolves to the location it was stored with.
func TestTableIndexInsert(t *testing.T) {
	type location struct {
		id     int64
		pageNo uint16
		slotNo uint16
	}
	fixtures := []location{
		{100, 1, 5},
		{200, 2, 10},
		{150, 1, 8},
		{300, 3, 2},
	}

	idx := NewGroupIndex("test")
	for i := range fixtures {
		f := fixtures[i]
		idx.Insert(f.id, f.pageNo, f.slotNo)
	}

	// The index size must equal the number of inserted entries.
	if got, want := idx.Count(), len(fixtures); got != want {
		t.Errorf("count after inserts = %d, want %d", got, want)
	}

	// Every inserted entry must be retrievable with its original location.
	for _, want := range fixtures {
		gotPage, gotSlot, ok := idx.Get(want.id)
		if !ok {
			t.Errorf("entry with ID %d not found", want.id)
			continue
		}
		if gotPage == want.pageNo && gotSlot == want.slotNo {
			continue
		}
		t.Errorf("entry %d: got (%d, %d), want (%d, %d)",
			want.id, gotPage, gotSlot, want.pageNo, want.slotNo)
	}
}
// TestTableIndexInsertUpdate verifies that inserting an ID that is already
// present replaces its location instead of adding a second entry.
func TestTableIndexInsertUpdate(t *testing.T) {
	const id int64 = 100
	idx := NewGroupIndex("test")

	// Initial insert.
	idx.Insert(id, 1, 5)
	if page, slot, ok := idx.Get(id); !(ok && page == 1 && slot == 5) {
		t.Errorf("first insert: got (%d, %d, %t), want (1, 5, true)", page, slot, ok)
	}

	// Re-insert the same ID at a new location.
	idx.Insert(id, 2, 10)
	if page, slot, ok := idx.Get(id); !(ok && page == 2 && slot == 10) {
		t.Errorf("after update: got (%d, %d, %t), want (2, 10, true)", page, slot, ok)
	}

	// The update must not have grown the index.
	if n := idx.Count(); n != 1 {
		t.Errorf("count after update = %d, want 1", n)
	}
}
// TestTableIndexGet covers lookups of both a present and an absent ID.
func TestTableIndexGet(t *testing.T) {
	idx := NewGroupIndex("test")

	// Seed the index: ID -> (page, slot).
	seed := [][3]int{{100, 1, 5}, {200, 2, 10}, {300, 3, 15}}
	for _, s := range seed {
		idx.Insert(int64(s[0]), uint16(s[1]), uint16(s[2]))
	}

	// A present ID returns its stored location.
	if page, slot, ok := idx.Get(200); !(ok && page == 2 && slot == 10) {
		t.Errorf("Get(200) = (%d, %d, %t), want (2, 10, true)", page, slot, ok)
	}

	// An absent ID returns zero values and found == false.
	if page, slot, ok := idx.Get(999); ok || page != 0 || slot != 0 {
		t.Errorf("Get(999) = (%d, %d, %t), want (0, 0, false)", page, slot, ok)
	}
}
// TestTableIndexDelete verifies deletion of a present ID, deletion of an
// absent ID, and that unrelated entries survive both operations.
func TestTableIndexDelete(t *testing.T) {
	idx := NewGroupIndex("test")

	// Seed four entries.
	for i, id := range []int64{100, 200, 300, 400} {
		idx.Insert(id, uint16(i+1), uint16(i*5))
	}
	// Exactly one entry is expected to be removed by this test.
	wantCount := idx.Count() - 1

	// Deleting a present entry reports success ...
	if ok := idx.Delete(200); !ok {
		t.Error("Delete(200) should return true")
	}
	// ... makes it unreachable ...
	if _, _, found := idx.Get(200); found {
		t.Error("deleted entry should not be found")
	}
	// ... and shrinks the index by one.
	if got := idx.Count(); got != wantCount {
		t.Errorf("count after delete = %d, want %d", got, wantCount)
	}

	// Deleting an absent entry reports failure and leaves the size alone.
	if idx.Delete(999) {
		t.Error("Delete(999) should return false")
	}
	if got := idx.Count(); got != wantCount {
		t.Errorf("count after deleting non-existent = %d, want %d", got, wantCount)
	}

	// The remaining entries must still be present.
	survivors := [...]int64{100, 300, 400}
	for _, id := range survivors {
		if _, _, found := idx.Get(id); !found {
			t.Errorf("entry %d should still exist", id)
		}
	}
}
// TestTableIndexRange verifies range scans and early termination.
//
// Entries are inserted out of ID order. The test expects Range(200, 400) to
// visit IDs 200, 300 and 400 in ascending order, and expects the scan to stop
// once the callback returns false.
func TestTableIndexRange(t *testing.T) {
	idx := NewGroupIndex("test")

	// Insert the fixtures deliberately out of ID order.
	testData := []struct {
		id     int64
		pageNo uint16
		slotNo uint16
	}{
		{300, 3, 15},
		{100, 1, 5},
		{500, 5, 25},
		{200, 2, 10},
		{400, 4, 20},
	}
	for _, data := range testData {
		idx.Insert(data.id, data.pageNo, data.slotNo)
	}

	// Range query over [200, 400].
	var results []int64
	idx.Range(200, 400, func(id int64, pageNo, slotNo uint16) bool {
		results = append(results, id)
		return true // keep iterating
	})

	// Results must come back in ascending ID order.
	expectedIDs := []int64{200, 300, 400}
	if len(results) != len(expectedIDs) {
		t.Errorf("range query returned %d results, want %d", len(results), len(expectedIDs))
		t.Errorf("actual results: %v", results)
	}
	for i, expected := range expectedIDs {
		if i >= len(results) {
			// Report the gap instead of indexing past the end of results,
			// which would panic and hide the real failure.
			t.Errorf("result[%d] is missing, want %d", i, expected)
			continue
		}
		if results[i] != expected {
			t.Errorf("result[%d] = %d, want %d", i, results[i], expected)
		}
	}

	// Range query that stops early: the callback returns false on its
	// second invocation.
	results = nil
	count := 0
	idx.Range(100, 500, func(id int64, pageNo, slotNo uint16) bool {
		results = append(results, id)
		count++
		return count < 2 // process only the first two entries
	})
	// Fatal rather than Error: the check below indexes results[0] and
	// results[1], so it must not run on a slice of the wrong length.
	if len(results) != 2 {
		t.Fatalf("early termination returned %d results, want 2", len(results))
	}
	if results[0] != 100 || results[1] != 200 {
		t.Errorf("early termination results = %v, want [100, 200]", results)
	}
}
// TestTableIndexRangeEmpty verifies that a range containing no stored IDs
// never invokes the callback.
func TestTableIndexRangeEmpty(t *testing.T) {
	idx := NewGroupIndex("test")

	// Two entries on either side of the queried interval.
	idx.Insert(100, 1, 5)
	idx.Insert(300, 3, 15)

	// Scan [150, 250], which lies strictly between the two stored IDs.
	var visited []int64
	collect := func(id int64, pageNo, slotNo uint16) bool {
		visited = append(visited, id)
		return true
	}
	idx.Range(150, 250, collect)

	if n := len(visited); n != 0 {
		t.Errorf("empty range query returned %d results, want 0", n)
	}
}
// TestTableIndexClear verifies that Clear empties the index and that the
// index remains usable afterwards.
func TestTableIndexClear(t *testing.T) {
	idx := NewGroupIndex("test")

	// Populate with IDs 1..10.
	for id := int64(1); id <= 10; id++ {
		idx.Insert(id, uint16(id), uint16(id*2))
	}
	if n := idx.Count(); n != 10 {
		t.Errorf("count before clear = %d, want 10", n)
	}

	idx.Clear()

	// After Clear the index is empty and old entries are gone.
	if n := idx.Count(); n != 0 {
		t.Errorf("count after clear = %d, want 0", n)
	}
	if _, _, found := idx.Get(5); found {
		t.Error("data should not exist after clear")
	}

	// The cleared index must accept new entries.
	idx.Insert(100, 1, 5)
	if n := idx.Count(); n != 1 {
		t.Errorf("count after re-insert = %d, want 1", n)
	}
}
// TestNewIndexManager checks that a new index manager is non-nil, has its
// index map allocated, and reports no statistics.
func TestNewIndexManager(t *testing.T) {
	im := NewIndexManager()
	if im == nil {
		t.Fatal("NewIndexManager returned nil")
	}

	if mapMissing := im.indexes == nil; mapMissing {
		t.Error("indexes map not initialized")
	}

	// A fresh manager holds no indexes, so its stats must be empty.
	if n := len(im.GetStats()); n != 0 {
		t.Errorf("initial stats length = %d, want 0", n)
	}
}
// TestIndexManagerGetOrCreateIndex verifies that the first call creates an
// index for the group and that a second call returns the same instance.
func TestIndexManagerGetOrCreateIndex(t *testing.T) {
	const groupName = "test_group"
	im := NewIndexManager()

	// First call: a new index is created.
	first := im.GetOrCreateIndex(groupName)
	if first == nil {
		t.Fatal("GetOrCreateIndex returned nil")
	}
	if got := first.name; got != groupName {
		t.Errorf("index name = %s, want %s", got, groupName)
	}

	// Second call: the very same index comes back.
	if second := im.GetOrCreateIndex(groupName); second != first {
		t.Error("GetOrCreateIndex should return the same index instance")
	}

	// Stats must list exactly this one, still empty, index.
	stats := im.GetStats()
	if n := len(stats); n != 1 {
		t.Errorf("stats length = %d, want 1", n)
	}
	if n := stats[groupName]; n != 0 {
		t.Errorf("stats[%s] = %d, want 0", groupName, n)
	}
}
// TestIndexManagerGetIndex verifies GetIndex for a group before and after
// its index has been created.
func TestIndexManagerGetIndex(t *testing.T) {
	const groupName = "test_group"
	im := NewIndexManager()

	// Before creation: expect (nil, false).
	if idx, exists := im.GetIndex(groupName); !(idx == nil && !exists) {
		t.Error("GetIndex should return (nil, false) for non-existent index")
	}

	// After creation: expect the instance GetOrCreateIndex handed out.
	want := im.GetOrCreateIndex(groupName)
	if got, exists := im.GetIndex(groupName); !(exists && got == want) {
		t.Error("GetIndex should return the created index")
	}
}
// TestIndexManagerDropIndex verifies that dropping an index removes it from
// the manager and its stats, and that dropping an unknown group is harmless.
func TestIndexManagerDropIndex(t *testing.T) {
	const groupName = "test_group"
	im := NewIndexManager()

	// Create an index holding two entries.
	idx := im.GetOrCreateIndex(groupName)
	idx.Insert(100, 1, 5)
	idx.Insert(200, 2, 10)

	// Both the index and the manager's stats must report two entries.
	if n := idx.Count(); n != 2 {
		t.Errorf("index count = %d, want 2", n)
	}
	if n := im.GetStats()[groupName]; n != 2 {
		t.Errorf("stats[%s] = %d, want 2", groupName, n)
	}

	im.DropIndex(groupName)

	// The index is no longer reachable ...
	if _, exists := im.GetIndex(groupName); exists {
		t.Error("index should not exist after drop")
	}
	// ... and no longer shows up in the stats.
	if n := len(im.GetStats()); n != 0 {
		t.Errorf("stats length after drop = %d, want 0", n)
	}

	// Dropping a group that was never created must be safe.
	im.DropIndex("non_existent")
}
// TestIndexManagerGetStats fills several group indexes with different
// numbers of entries and checks that GetStats reports each count.
func TestIndexManagerGetStats(t *testing.T) {
	im := NewIndexManager()

	// Group name -> number of entries to insert.
	wantCounts := map[string]int{
		"group1": 5,
		"group2": 10,
		"group3": 3,
	}
	for name, n := range wantCounts {
		idx := im.GetOrCreateIndex(name)
		for i := 0; i != n; i++ {
			idx.Insert(int64(i), uint16(i), uint16(i*2))
		}
	}

	stats := im.GetStats()

	// One stats entry per group, each with the inserted count.
	if got, want := len(stats), len(wantCounts); got != want {
		t.Errorf("stats length = %d, want %d", got, want)
	}
	for name, want := range wantCounts {
		if got := stats[name]; got != want {
			t.Errorf("stats[%s] = %d, want %d", name, got, want)
		}
	}
}
// MockPipelineDB is an in-memory stand-in for a database, intended for
// exercising RebuildIndex. It stores pages in a map keyed by page number.
type MockPipelineDB struct {
	pages map[uint16]*MockPage
}

// MockPage is a single in-memory page: a raw data buffer, the number of the
// next page, and the slot array.
type MockPage struct {
	data       []byte
	nextPageNo uint16
	slots      []uint16
}

// slotArray returns the page's slot array (the underlying slice, not a copy).
func (mp *MockPage) slotArray() []uint16 {
	return mp.slots
}

// nextPage returns the page number stored as this page's successor.
func (mp *MockPage) nextPage() uint16 {
	return mp.nextPageNo
}

// NewMockPipelineDB returns a mock database with an empty, allocated page map.
func NewMockPipelineDB() *MockPipelineDB {
	mock := new(MockPipelineDB)
	mock.pages = map[uint16]*MockPage{}
	return mock
}

// readPage returns the page registered under pageNo, or an error when no
// such page exists.
func (m *MockPipelineDB) readPage(pageNo uint16) (*MockPage, error) {
	page, ok := m.pages[pageNo]
	if !ok {
		return nil, errors.New("page not found")
	}
	return page, nil
}
// addPage builds a page of PageSize bytes and registers it under pageNo,
// replacing any page already stored there.
//
// For each record, the ID is written as a little-endian uint64 at the
// record's byte offset in the page data, and that offset is stored as the
// record's slot. nextPage becomes the page's successor number.
func (db *MockPipelineDB) addPage(pageNo uint16, nextPage uint16, records []struct {
	id     int64
	offset uint16
}) {
	buf := make([]byte, PageSize)
	offsets := make([]uint16, 0, len(records))
	for _, rec := range records {
		// Encode the record ID at its offset inside the page buffer.
		binary.LittleEndian.PutUint64(buf[rec.offset:], uint64(rec.id))
		offsets = append(offsets, rec.offset)
	}

	db.pages[pageNo] = &MockPage{
		data:       buf,
		nextPageNo: nextPage,
		slots:      offsets,
	}
}
// TestRebuildIndex 测试索引重建
func TestRebuildIndex(t *testing.T) {
// 创建一个临时的PipelineDB实例用于测试
tmpFile, err := os.CreateTemp("", "rebuild_test_*.db")
if err != nil {
t.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()
config := &Config{CacheSize: 10}
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
t.Fatalf("Open failed: %v", err)
}
defer pdb.Stop()
// 先添加一些测试数据
testData := [][]byte{
[]byte("test data 1"),
[]byte("test data 2"),
[]byte("test data 3"),
}
var recordIDs []int64
for _, data := range testData {
recordID, err := pdb.AcceptData("test_group", data, `{"test": true}`)
if err != nil {
t.Fatalf("AcceptData failed: %v", err)
}
recordIDs = append(recordIDs, recordID)
}
// 重建索引
idx, err := pdb.RebuildIndex("test_group", 1)
if err != nil {
t.Errorf("RebuildIndex returned error: %v", err)
}
if idx == nil {
t.Fatal("RebuildIndex returned nil index")
}
// 验证索引包含所有记录
if idx.Count() != len(recordIDs) {
t.Errorf("rebuilt index count = %d, want %d", idx.Count(), len(recordIDs))
}
// 验证每个记录都能在索引中找到
for _, recordID := range recordIDs {
_, _, found := idx.Get(recordID)
if !found {
t.Errorf("record %d not found in rebuilt index", recordID)
}
}
}
// TestIndexLargeDataset runs insert, lookup, range and delete operations
// against an index holding 10000 entries.
func TestIndexLargeDataset(t *testing.T) {
	const numRecords = 10000
	idx := NewGroupIndex("large_test")

	// Bulk insert IDs 1..numRecords.
	for id := int64(1); id <= numRecords; id++ {
		idx.Insert(id, uint16(id%1000), uint16(id%100))
	}
	if n := idx.Count(); n != numRecords {
		t.Errorf("count = %d, want %d", n, numRecords)
	}

	// Spot-check a fixed sample of IDs, including both ends.
	for _, id := range [...]int64{1, 100, 1000, 5000, 9999, 10000} {
		if _, _, found := idx.Get(id); !found {
			t.Errorf("record %d not found", id)
		}
	}

	// Range scan over [5000, 5010]; only the number of hits is checked.
	var hits []int64
	idx.Range(5000, 5010, func(id int64, pageNo, slotNo uint16) bool {
		hits = append(hits, id)
		return true
	})
	wantHits := []int64{5000, 5001, 5002, 5003, 5004, 5005, 5006, 5007, 5008, 5009, 5010}
	if got, want := len(hits), len(wantHits); got != want {
		t.Errorf("range query returned %d results, want %d", got, want)
	}

	// Delete IDs 1000..2000, both ends included: 1001 records.
	for id := int64(1000); id <= 2000; id++ {
		idx.Delete(id)
	}
	expectedCount := numRecords - 1001
	if n := idx.Count(); n != expectedCount {
		t.Errorf("count after deletion = %d, want %d", n, expectedCount)
	}
}
// TestIndexOrdering verifies that a range scan returns IDs in ascending
// order regardless of the order in which they were inserted.
func TestIndexOrdering(t *testing.T) {
	idx := NewGroupIndex("order_test")

	// Insert the IDs out of order.
	ids := []int64{500, 100, 300, 200, 400, 600, 150, 350}
	for _, id := range ids {
		idx.Insert(id, uint16(id/100), uint16(id%100))
	}

	// Scan a range wide enough to cover every inserted ID.
	var results []int64
	idx.Range(0, 1000, func(id int64, pageNo, slotNo uint16) bool {
		results = append(results, id)
		return true
	})

	// The scan output must already be sorted.
	if !sort.SliceIsSorted(results, func(i, j int) bool {
		return results[i] < results[j]
	}) {
		t.Errorf("range query results are not sorted: %v", results)
	}

	// Every inserted ID must be present: compare against the sorted input.
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	if len(results) != len(ids) {
		t.Errorf("result count = %d, want %d", len(results), len(ids))
	}
	for i, expected := range ids {
		if i >= len(results) {
			// Report the gap instead of indexing past the end of results,
			// which would panic and hide the real failure.
			t.Errorf("result[%d] is missing, want %d", i, expected)
			continue
		}
		if results[i] != expected {
			t.Errorf("result[%d] = %d, want %d", i, results[i], expected)
		}
	}
}
// BenchmarkTableIndexInsert measures inserting sequential IDs into a single
// group index.
func BenchmarkTableIndexInsert(b *testing.B) {
	idx := NewGroupIndex("bench_test")

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		page, slot := uint16(n%1000), uint16(n%100)
		idx.Insert(int64(n), page, slot)
	}
}
// BenchmarkTableIndexGet measures point lookups against an index pre-filled
// with 100000 entries.
func BenchmarkTableIndexGet(b *testing.B) {
	const prefill = 100000
	idx := NewGroupIndex("bench_test")

	// Populate before the timer starts so only lookups are measured.
	for n := 0; n < prefill; n++ {
		idx.Insert(int64(n), uint16(n%1000), uint16(n%100))
	}

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		key := int64(n % prefill)
		idx.Get(key)
	}
}
// BenchmarkTableIndexRange measures range scans of width 100 against an
// index pre-filled with 100000 entries.
func BenchmarkTableIndexRange(b *testing.B) {
	const prefill = 100000
	idx := NewGroupIndex("bench_test")

	// Populate before the timer starts so only scans are measured.
	for n := 0; n < prefill; n++ {
		idx.Insert(int64(n), uint16(n%1000), uint16(n%100))
	}

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		lo := int64(n % 90000)
		hi := lo + 100
		idx.Range(lo, hi, func(id int64, pageNo, slotNo uint16) bool {
			// Visit every entry in the window; nothing is collected.
			return true
		})
	}
}
// BenchmarkIndexManagerOperations measures index lookup-or-create plus an
// insert, cycling through 26 group names ("group_A" .. "group_Z"), with a
// GetStats call on every 1000th iteration.
func BenchmarkIndexManagerOperations(b *testing.B) {
	im := NewIndexManager()

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		letter := rune('A' + n%26)
		name := "group_" + string(letter)

		im.GetOrCreateIndex(name).Insert(int64(n), uint16(n%1000), uint16(n%100))

		if n%1000 != 0 {
			continue
		}
		im.GetStats()
	}
}