Files
pipelinedb/storage_test.go
2025-09-30 15:05:56 +08:00

710 lines
16 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package pipelinedb
import (
"bytes"
"errors"
"os"
"sync"
"testing"
)
// TestPageBasicOperations 测试页面基本操作
func TestPageBasicOperations(t *testing.T) {
p := make(Page, PageSize)
// 测试槽数量操作
p.setNumSlots(5)
if p.numSlots() != 5 {
t.Errorf("numSlots() = %d, want 5", p.numSlots())
}
// 测试空闲偏移操作
p.setFreeOff(1000)
if p.freeOff() != 1000 {
t.Errorf("freeOff() = %d, want 1000", p.freeOff())
}
// 测试下一页操作
p.setNextPage(42)
if p.nextPage() != 42 {
t.Errorf("nextPage() = %d, want 42", p.nextPage())
}
// 测试槽位操作
p.setSlot(0, 100)
p.setSlot(1, 200)
p.setSlot(2, 300)
slots := p.slotArray()
expected := []uint16{100, 200, 300, 0, 0}
for i, exp := range expected {
if slots[i] != exp {
t.Errorf("slot[%d] = %d, want %d", i, slots[i], exp)
}
}
}
// MockFileForStorage 用于测试存储的模拟文件
type MockFileForStorage struct {
data []byte
offset int64
mu sync.Mutex
}
func NewMockFileForStorage(size int) *MockFileForStorage {
return &MockFileForStorage{
data: make([]byte, size),
}
}
func (f *MockFileForStorage) ReadAt(p []byte, off int64) (n int, err error) {
f.mu.Lock()
defer f.mu.Unlock()
if off >= int64(len(f.data)) {
return 0, errors.New("EOF")
}
n = copy(p, f.data[off:])
return n, nil
}
func (f *MockFileForStorage) WriteAt(p []byte, off int64) (n int, err error) {
f.mu.Lock()
defer f.mu.Unlock()
// 扩展数据如果需要
needed := int(off) + len(p)
if needed > len(f.data) {
newData := make([]byte, needed)
copy(newData, f.data)
f.data = newData
}
n = copy(f.data[off:], p)
return n, nil
}
func (f *MockFileForStorage) Close() error {
return nil
}
func (f *MockFileForStorage) Sync() error {
return nil
}
// TestPipelineDBReadWritePage 测试页面读写操作
func TestPipelineDBReadWritePage(t *testing.T) {
tmpFile, err := os.CreateTemp("", "test_page_rw_*.db")
if err != nil {
t.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()
config := &Config{CacheSize: 10}
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
t.Fatalf("Open failed: %v", err)
}
defer pdb.Stop()
// 分配一个新页面
pageNo, err := pdb.allocPage()
if err != nil {
t.Fatalf("allocPage failed: %v", err)
}
// 创建测试页面数据
testPage := make(Page, PageSize)
testPage.setNumSlots(2)
testPage.setFreeOff(PageSize - 100)
testPage.setNextPage(123)
// 写入页面
err = pdb.writePage(pageNo, testPage)
if err != nil {
t.Fatalf("writePage failed: %v", err)
}
// 读取页面
readPage, err := pdb.readPage(pageNo)
if err != nil {
t.Fatalf("readPage failed: %v", err)
}
// 验证页面内容
if readPage.numSlots() != 2 {
t.Errorf("numSlots = %d, want 2", readPage.numSlots())
}
if readPage.freeOff() != PageSize-100 {
t.Errorf("freeOff = %d, want %d", readPage.freeOff(), PageSize-100)
}
if readPage.nextPage() != 123 {
t.Errorf("nextPage = %d, want 123", readPage.nextPage())
}
}
// TestPipelineDBAllocFreePage 测试页面分配和释放
func TestPipelineDBAllocFreePage(t *testing.T) {
tmpFile, err := os.CreateTemp("", "test_alloc_free_*.db")
if err != nil {
t.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()
config := &Config{CacheSize: 10}
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
t.Fatalf("Open failed: %v", err)
}
defer pdb.Stop()
// 分配多个页面
var allocatedPages []uint16
for i := 0; i < 5; i++ {
pageNo, err := pdb.allocPage()
if err != nil {
t.Fatalf("allocPage[%d] failed: %v", i, err)
}
allocatedPages = append(allocatedPages, pageNo)
}
// 验证页面号是连续的(新数据库)
for i, pageNo := range allocatedPages {
expected := uint16(i + 2) // 页面0是头部页面1是计数器
if pageNo != expected {
t.Errorf("allocated page[%d] = %d, want %d", i, pageNo, expected)
}
}
// 释放一些页面
pdb.freePage(allocatedPages[1])
pdb.freePage(allocatedPages[3])
// 再次分配页面,应该重用释放的页面
pageNo1, err := pdb.allocPage()
if err != nil {
t.Fatalf("allocPage after free failed: %v", err)
}
pageNo2, err := pdb.allocPage()
if err != nil {
t.Fatalf("allocPage after free failed: %v", err)
}
// 验证重用了释放的页面LIFO顺序
if pageNo1 != allocatedPages[3] || pageNo2 != allocatedPages[1] {
t.Errorf("reused pages = [%d, %d], want [%d, %d]",
pageNo1, pageNo2, allocatedPages[3], allocatedPages[1])
}
}
// TestInsertToPage 测试页面内记录插入
func TestInsertToPage(t *testing.T) {
tmpFile, err := os.CreateTemp("", "test_insert_page_*.db")
if err != nil {
t.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()
config := &Config{CacheSize: 10}
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
t.Fatalf("Open failed: %v", err)
}
defer pdb.Stop()
// 分配一个页面
pageNo, err := pdb.allocPage()
if err != nil {
t.Fatalf("allocPage failed: %v", err)
}
// 插入多条记录
testData := []struct {
id int64
data []byte
}{
{1, []byte("record1")},
{2, []byte("record2")},
{3, []byte("record3")},
}
for i, test := range testData {
slotNo, err := pdb.insertToPage(pageNo, test.id, test.data)
if err != nil {
t.Fatalf("insertToPage(%d) failed: %v", test.id, err)
}
// 槽号从0开始第一条记录应该在槽0
expectedSlot := uint16(i)
if slotNo != expectedSlot {
t.Errorf("insertToPage(%d) returned slot %d, want %d", test.id, slotNo, expectedSlot)
}
}
// 验证页面状态
page, err := pdb.readPage(pageNo)
if err != nil {
t.Fatalf("readPage failed: %v", err)
}
if page.numSlots() != uint16(len(testData)) {
t.Errorf("numSlots = %d, want %d", page.numSlots(), len(testData))
}
}
// TestInsertToPageFull 测试页面满时的处理
func TestInsertToPageFull(t *testing.T) {
tmpFile, err := os.CreateTemp("", "test_page_full_*.db")
if err != nil {
t.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()
config := &Config{CacheSize: 10}
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
t.Fatalf("Open failed: %v", err)
}
defer pdb.Stop()
// 分配一个页面
pageNo, err := pdb.allocPage()
if err != nil {
t.Fatalf("allocPage failed: %v", err)
}
// 插入大量记录直到页面满
mediumData := make([]byte, 400) // 400字节数据
for i := 0; i < len(mediumData); i++ {
mediumData[i] = byte(i % 256)
}
var insertCount int
for i := int64(1); i <= 100; i++ {
_, err := pdb.insertToPage(pageNo, i, mediumData)
if err != nil {
// 页面满了,这是预期的
t.Logf("Page full after inserting %d records", insertCount)
// 验证确实是因为页面满而失败
if err.Error() != "page full" {
t.Errorf("expected 'page full' error, got: %v", err)
}
// 测试通过,页面确实满了
return
}
insertCount++
}
// 如果循环结束还没有遇到页面满的情况,说明测试有问题
t.Errorf("should have encountered page full condition, but inserted %d records", insertCount)
}
// TestReadRecord 测试记录读取
func TestReadRecord(t *testing.T) {
tmpFile, err := os.CreateTemp("", "test_read_record_*.db")
if err != nil {
t.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()
config := &Config{CacheSize: 10}
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
t.Fatalf("Open failed: %v", err)
}
defer pdb.Stop()
// 分配页面并插入记录
pageNo, err := pdb.allocPage()
if err != nil {
t.Fatalf("allocPage failed: %v", err)
}
testID := int64(42)
testData := []byte("test record data")
slotNo, err := pdb.insertToPage(pageNo, testID, testData)
if err != nil {
t.Fatalf("insertToPage failed: %v", err)
}
// 读取记录
readData, err := pdb.readRecord(pageNo, slotNo, testID)
if err != nil {
t.Fatalf("readRecord failed: %v", err)
}
// 验证数据
if string(readData) != string(testData) {
t.Errorf("readRecord data = %q, want %q", string(readData), string(testData))
}
// 测试读取不存在的记录
_, err = pdb.readRecord(pageNo, slotNo, testID+1) // 错误的ID
if err == nil {
t.Error("readRecord should fail with wrong ID")
}
}
// TestUpdateInPlace 测试原地更新
func TestUpdateInPlace(t *testing.T) {
tmpFile, err := os.CreateTemp("", "test_update_*.db")
if err != nil {
t.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()
config := &Config{CacheSize: 10}
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
t.Fatalf("Open failed: %v", err)
}
defer pdb.Stop()
// 分配页面并插入记录
pageNo, err := pdb.allocPage()
if err != nil {
t.Fatalf("allocPage failed: %v", err)
}
testID := int64(100)
originalData := []byte("original data")
slotNo, err := pdb.insertToPage(pageNo, testID, originalData)
if err != nil {
t.Fatalf("insertToPage failed: %v", err)
}
// 测试相同大小的更新(应该成功)
newData := []byte("updated data") // 相同长度
err = pdb.updateInPlace(pageNo, slotNo, testID, newData)
if err != nil {
t.Fatalf("updateInPlace with same size failed: %v", err)
}
// 验证更新后的数据
readData, err := pdb.readRecord(pageNo, slotNo, testID)
if err != nil {
t.Fatalf("readRecord after update failed: %v", err)
}
if string(readData) != string(newData) {
t.Errorf("updated data = %q, want %q", string(readData), string(newData))
}
// 测试更大数据的更新(应该失败)
largerData := []byte("this is much larger data that should not fit")
err = pdb.updateInPlace(pageNo, slotNo, testID, largerData)
if err == nil {
t.Error("updateInPlace with larger data should fail")
}
}
// TestDeleteRecord 测试记录删除
func TestDeleteRecord(t *testing.T) {
tmpFile, err := os.CreateTemp("", "test_delete_*.db")
if err != nil {
t.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()
config := &Config{CacheSize: 10}
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
t.Fatalf("Open failed: %v", err)
}
defer pdb.Stop()
// 分配页面并插入记录
pageNo, err := pdb.allocPage()
if err != nil {
t.Fatalf("allocPage failed: %v", err)
}
testID := int64(200)
testData := []byte("record to delete")
slotNo, err := pdb.insertToPage(pageNo, testID, testData)
if err != nil {
t.Fatalf("insertToPage failed: %v", err)
}
// 验证记录存在
_, err = pdb.readRecord(pageNo, slotNo, testID)
if err != nil {
t.Fatalf("readRecord before delete failed: %v", err)
}
// 删除记录传入nil索引简化测试
err = pdb.deleteRecord(pageNo, slotNo, testID, nil)
if err != nil {
t.Fatalf("deleteRecord failed: %v", err)
}
// 验证读取已删除的记录会失败
_, err = pdb.readRecord(pageNo, slotNo, testID)
if err == nil {
t.Error("readRecord should fail for deleted record")
}
}
// TestInsertToChain 测试链式插入
func TestInsertToChain(t *testing.T) {
tmpFile, err := os.CreateTemp("", "test_chain_*.db")
if err != nil {
t.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
tmpFile.Close()
config := &Config{CacheSize: 10}
pdb, err := Open(Options{
Filename: tmpFile.Name(),
Config: config,
})
if err != nil {
t.Fatalf("Open failed: %v", err)
}
defer pdb.Stop()
// 分配根页面
rootPage, err := pdb.allocPage()
if err != nil {
t.Fatalf("allocPage failed: %v", err)
}
// 插入大量记录,触发链式插入
largeData := make([]byte, 500) // 500字节数据
for i := 0; i < len(largeData); i++ {
largeData[i] = byte(i % 256)
}
var insertedIDs []int64
for i := int64(1); i <= 20; i++ {
pageNo, _, err := pdb.insertToChain(rootPage, i, largeData)
if err != nil {
t.Fatalf("insertToChain(%d) failed: %v", i, err)
}
if pageNo == 0 {
t.Errorf("insertToChain(%d) returned invalid page 0", i)
}
insertedIDs = append(insertedIDs, i)
}
// 验证至少插入了一些记录
if len(insertedIDs) == 0 {
t.Error("should be able to insert at least some records")
}
// 验证可能创建了多个页面(链式结构)
// 这里我们只验证功能正常,不验证具体的页面数量
t.Logf("Successfully inserted %d records using chain insertion", len(insertedIDs))
}
// TestHighLevelOperations 测试高级数据库操作
func TestHighLevelOperations(t *testing.T) {
// 创建临时文件
tmpFile, err := os.CreateTemp("", "test_storage_*.db")
if err != nil {
t.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
defer tmpFile.Close()
// 创建数据库实例
pdb := &PipelineDB{
file: tmpFile,
cache: NewPageCache(10),
freePageMgr: NewFreePageManager(),
indexMgr: NewIndexManager(),
rowMutexes: make(map[int64]*sync.RWMutex),
header: &Header{
TotalPages: 1,
RootPage: 1,
},
}
// 初始化根页面
p := make(Page, PageSize)
p.setNumSlots(0)
p.setFreeOff(PageSize)
p.setNextPage(0)
pdb.writePage(1, p)
group := "test_group"
testData := []byte("high level test data")
// 测试插入
err = pdb.insert(group, 300, testData)
if err != nil {
t.Errorf("insert failed: %v", err)
}
// 测试获取
data, err := pdb.get(group, 300)
if err != nil {
t.Errorf("get failed: %v", err)
}
if !bytes.Equal(data, testData) {
t.Errorf("get data = %s, want %s", string(data), string(testData))
}
// 测试更新
newData := []byte("updated high level data")
err = pdb.update(group, 300, newData)
if err != nil {
t.Errorf("update failed: %v", err)
}
// 验证更新
data, err = pdb.get(group, 300)
if err != nil {
t.Errorf("get after update failed: %v", err)
}
if !bytes.Equal(data, newData) {
t.Errorf("updated data = %s, want %s", string(data), string(newData))
}
// 测试重复插入
err = pdb.insert(group, 300, testData)
if err == nil || err.Error() != "record already exists" {
t.Errorf("expected 'record already exists' error, got %v", err)
}
// 测试不存在的记录
_, err = pdb.get(group, 999)
if err == nil || err.Error() != "record not found" {
t.Errorf("expected 'record not found' error, got %v", err)
}
// 测试不存在的组
_, err = pdb.get("non_existent_group", 300)
if err == nil || err.Error() != "group not found" {
t.Errorf("expected 'group not found' error, got %v", err)
}
}
// TestRangeQuery 测试范围查询
func TestRangeQuery(t *testing.T) {
tmpFile, err := os.CreateTemp("", "test_range_*.db")
if err != nil {
t.Fatalf("failed to create temp file: %v", err)
}
defer os.Remove(tmpFile.Name())
defer tmpFile.Close()
pdb := &PipelineDB{
file: tmpFile,
cache: NewPageCache(10),
freePageMgr: NewFreePageManager(),
indexMgr: NewIndexManager(),
rowMutexes: make(map[int64]*sync.RWMutex),
header: &Header{
TotalPages: 1,
RootPage: 1,
},
}
// 初始化根页面
p := make(Page, PageSize)
p.setNumSlots(0)
p.setFreeOff(PageSize)
pdb.writePage(1, p)
group := "range_test_group"
// 插入测试数据
testRecords := map[int64]string{
100: "record_100",
200: "record_200",
300: "record_300",
400: "record_400",
500: "record_500",
}
for id, data := range testRecords {
err := pdb.insert(group, id, []byte(data))
if err != nil {
t.Errorf("insert failed for ID %d: %v", id, err)
}
}
// 执行范围查询
var results []struct {
id int64
data string
}
err = pdb.rangeQuery(group, 200, 400, func(id int64, data []byte) error {
results = append(results, struct {
id int64
data string
}{id, string(data)})
return nil
})
if err != nil {
t.Errorf("rangeQuery failed: %v", err)
}
// 验证结果
expectedResults := []struct {
id int64
data string
}{
{200, "record_200"},
{300, "record_300"},
{400, "record_400"},
}
if len(results) != len(expectedResults) {
t.Errorf("result count = %d, want %d", len(results), len(expectedResults))
}
for i, expected := range expectedResults {
if i >= len(results) {
t.Errorf("missing result %d", i)
continue
}
if results[i].id != expected.id || results[i].data != expected.data {
t.Errorf("result[%d] = {%d, %s}, want {%d, %s}",
i, results[i].id, results[i].data, expected.id, expected.data)
}
}
}