Files
pipelinedb/storage.go
2025-09-30 15:05:56 +08:00

601 lines
14 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package pipelinedb - storage.go
//
// 存储引擎模块:实现了基于页面的存储系统
//
// 核心功能:
// - 页面管理:分配、释放、读写页面
// - 记录存储:插入、读取、更新、删除记录
// - 索引支持:与索引系统集成,提供快速数据访问
// - 缓存机制:页面缓存提高访问性能
// - 链式存储:支持页面链接,处理大量数据
//
// 页面布局:
// [0-2] numSlots - 槽位数量
// [2-4] freeOff - 空闲空间偏移
// [4-6] nextPage - 下一页编号
// [6...] slotArray - 槽位数组每个槽位2字节
// [...] records - 记录数据区域(从页面末尾向前增长)
//
// 记录格式:
// [0-8] recordID - 记录ID8字节
// [8-9+] dataLength - 数据长度(变长编码)
// [9+...] data - 实际数据内容
package pipelinedb
import (
"encoding/binary"
"errors"
)
// Page 页面类型
//
// 页面是存储系统的基本单位大小固定为4096字节
// 每个页面包含页面头、槽位数组和记录数据区域
// 页面采用槽位目录结构,支持变长记录存储
type Page []byte
// ==================== 页面头部访问方法 ====================
// numSlots 获取页面中的槽位数量
//
// 返回值:
// - uint16: 当前页面的槽位数量
//
// 槽位数量表示页面中存储的记录数量,每个记录占用一个槽位
func (p Page) numSlots() uint16 { return binary.LittleEndian.Uint16(p[0:2]) }
// setNumSlots 设置页面中的槽位数量
//
// 参数:
// - n: 要设置的槽位数量
//
// 用于在插入或删除记录时更新槽位计数
func (p Page) setNumSlots(n uint16) { binary.LittleEndian.PutUint16(p[0:2], n) }
// freeOff 获取空闲空间的起始偏移
//
// 返回值:
// - uint16: 空闲空间的起始位置
//
// 空闲空间从页面末尾向前增长freeOff指向下一个可用的位置
func (p Page) freeOff() uint16 { return binary.LittleEndian.Uint16(p[2:4]) }
// setFreeOff 设置空闲空间的起始偏移
//
// 参数:
// - off: 新的空闲空间偏移位置
//
// 在插入记录后需要更新空闲空间指针
func (p Page) setFreeOff(off uint16) { binary.LittleEndian.PutUint16(p[2:4], off) }
// nextPage 获取链接的下一个页面编号
//
// 返回值:
// - uint16: 下一个页面的编号0表示没有下一页
//
// 支持页面链接,用于存储超出单页容量的数据
func (p Page) nextPage() uint16 { return binary.LittleEndian.Uint16(p[4:6]) }
// setNextPage 设置链接的下一个页面编号
//
// 参数:
// - n: 下一个页面的编号
//
// 用于建立页面链接关系
func (p Page) setNextPage(n uint16) { binary.LittleEndian.PutUint16(p[4:6], n) }
// slotArray 获取页面的槽位数组
//
// 返回值:
// - []uint16: 槽位数组,每个元素是记录在页面中的偏移量
//
// 槽位数组存储每个记录在页面中的位置,支持快速定位记录
// 槽位值为0表示该槽位对应的记录已被删除
func (p Page) slotArray() []uint16 {
n := p.numSlots()
arr := make([]uint16, n)
for i := uint16(0); i < n; i++ {
arr[i] = binary.LittleEndian.Uint16(p[6+i*2:])
}
return arr
}
// setSlot 设置指定槽位的记录偏移量
//
// 参数:
// - idx: 槽位索引
// - off: 记录在页面中的偏移量0表示删除
//
// 用于在插入记录时设置槽位指向记录位置,或在删除时标记槽位为空
func (p Page) setSlot(idx uint16, off uint16) {
binary.LittleEndian.PutUint16(p[6+idx*2:], off)
}
// ==================== 页面I/O操作方法 ====================
// readPageDirect 直接从磁盘读取页面,绕过缓存
//
// 参数:
// - pageNo: 页面编号
//
// 返回值:
// - []byte: 页面数据
// - error: 读取错误
//
// 用于需要绕过缓存直接访问磁盘的场景,如初始化或特殊操作
func (pdb *PipelineDB) readPageDirect(pageNo uint16) ([]byte, error) {
buf := make([]byte, PageSize)
_, err := pdb.file.ReadAt(buf, int64(pageNo)*PageSize)
return buf, err
}
// readPage 读取页面,优先从缓存获取
//
// 参数:
// - pageNo: 页面编号
//
// 返回值:
// - Page: 页面对象
// - error: 读取错误
//
// 执行流程:
// 1. 首先尝试从页面缓存中获取
// 2. 缓存未命中时从磁盘读取
// 3. 将读取的页面放入缓存以提高后续访问性能
func (pdb *PipelineDB) readPage(pageNo uint16) (Page, error) {
// 尝试从缓存获取
if p, found := pdb.cache.Get(pageNo); found {
return Page(p), nil
}
// 从磁盘读取
p, err := pdb.readPageDirect(pageNo)
if err != nil {
return nil, err
}
// 放入缓存
pdb.cache.Put(pageNo, p, false)
return Page(p), nil
}
// writePage 写入页面到缓存和磁盘
//
// 参数:
// - pageNo: 页面编号
// - p: 页面数据
//
// 返回值:
// - error: 写入错误
//
// 执行流程:
// 1. 将页面写入缓存,标记为脏页
// 2. 立即同步写入磁盘,确保数据持久性
func (pdb *PipelineDB) writePage(pageNo uint16, p Page) error {
// 写入缓存
pdb.cache.Put(pageNo, []byte(p), true)
// 立即写入磁盘
_, err := pdb.file.WriteAt(p, int64(pageNo)*PageSize)
return err
}
// writePageDirect 直接写入页面到磁盘,绕过缓存
//
// 参数:
// - pageNo: 页面编号
// - p: 页面数据
//
// 返回值:
// - error: 写入错误
//
// 用于需要绕过缓存直接写入磁盘的场景
func (pdb *PipelineDB) writePageDirect(pageNo uint16, p []byte) error {
_, err := pdb.file.WriteAt(p, int64(pageNo)*PageSize)
return err
}
// ==================== 页面管理方法 ====================
// allocPage 分配一个新的页面
//
// 返回值:
// - uint16: 分配的页面编号
// - error: 分配错误
//
// 执行流程:
// 1. 首先尝试从空闲页面管理器中分配已释放的页面
// 2. 如果没有空闲页面,则分配新的页面
// 3. 初始化页面头部信息
// 4. 更新数据库头部的总页面数
//
// 线程安全:使用互斥锁保护并发访问
func (pdb *PipelineDB) allocPage() (uint16, error) {
pdb.mu.Lock()
defer pdb.mu.Unlock()
// 尝试从空闲页分配
if pageNo, found := pdb.freePageMgr.AllocPage(); found {
// 初始化页面
p := make(Page, PageSize)
p.setNumSlots(0)
p.setFreeOff(PageSize)
p.setNextPage(0)
if err := pdb.writePage(pageNo, p); err != nil {
return 0, err
}
return pageNo, nil
}
// 分配新页面
pageNo := pdb.header.TotalPages
p := make(Page, PageSize)
p.setNumSlots(0)
p.setFreeOff(PageSize)
p.setNextPage(0)
if err := pdb.writePage(pageNo, p); err != nil {
return 0, err
}
// 更新头部
pdb.header.TotalPages++
if err := pdb.saveHeader(); err != nil {
return 0, err
}
return pageNo, nil
}
// freePage 释放页面,将其标记为可重用
//
// 参数:
// - pageNo: 要释放的页面编号
//
// 返回值:
// - error: 释放错误
//
// 执行流程:
// 1. 将页面添加到空闲页面管理器
// 2. 从缓存中移除该页面
//
// 线程安全:使用互斥锁保护并发访问
func (pdb *PipelineDB) freePage(pageNo uint16) error {
pdb.mu.Lock()
defer pdb.mu.Unlock()
pdb.freePageMgr.FreePage(pageNo)
pdb.cache.Invalidate(pageNo)
return nil
}
// ==================== 记录操作方法 ====================
// insertToChain 在页面链中插入记录
//
// 参数:
// - rootPage: 页面链的根页面编号
// - id: 记录ID
// - data: 记录数据
//
// 返回值:
// - uint16: 插入的页面编号
// - uint16: 插入的槽位编号
// - error: 插入错误
//
// 执行流程:
// 1. 从根页面开始尝试插入记录
// 2. 如果当前页面空间不足,沿着页面链查找下一个页面
// 3. 如果到达链尾仍无空间,分配新页面并链接到链尾
// 4. 在找到的页面中插入记录
//
// 支持动态扩展:当数据量超过单页容量时自动扩展页面链
func (pdb *PipelineDB) insertToChain(rootPage uint16, id int64, data []byte) (uint16, uint16, error) {
pageNo := rootPage
for {
// 尝试插入当前页
slotNo, err := pdb.insertToPage(pageNo, id, data)
if err == nil {
return pageNo, slotNo, nil
}
if err.Error() != "page full" {
return 0, 0, err
}
// 页满:检查是否有后继页
p, err := pdb.readPage(pageNo)
if err != nil {
return 0, 0, err
}
nextPage := p.nextPage()
if nextPage != 0 {
pageNo = nextPage
continue
}
// 分配新页并链接
newPage, err := pdb.allocPage()
if err != nil {
return 0, 0, err
}
p.setNextPage(newPage)
if err := pdb.writePage(pageNo, p); err != nil {
return 0, 0, err
}
pageNo = newPage
}
}
func (pdb *PipelineDB) insertToPage(pageNo uint16, id int64, data []byte) (uint16, error) {
pdb.mu.Lock()
defer pdb.mu.Unlock()
p, err := pdb.readPage(pageNo)
if err != nil {
return 0, err
}
// 计算所需空间
recSize := 8 + 1 + len(data)
if len(data) >= 128 {
recSize++
}
freeOff := p.freeOff()
requiredSpace := uint16(recSize)
slotSpace := 6 + 2*(p.numSlots()+1)
if freeOff < requiredSpace || freeOff-requiredSpace < slotSpace {
return 0, errors.New("page full")
}
newFree := freeOff - requiredSpace
// 写入记录
off := newFree
binary.LittleEndian.PutUint64(p[off:], uint64(id))
off += 8
n := binary.PutUvarint(p[off:], uint64(len(data)))
off += uint16(n)
copy(p[off:], data)
// 更新槽数组
numSlots := p.numSlots()
p.setSlot(numSlots, newFree)
p.setNumSlots(numSlots + 1)
p.setFreeOff(newFree)
if err := pdb.writePage(pageNo, p); err != nil {
return 0, err
}
return numSlots, nil
}
func (pdb *PipelineDB) readRecord(pageNo uint16, slotNo uint16, expectedID int64) ([]byte, error) {
p, err := pdb.readPage(pageNo)
if err != nil {
return nil, err
}
slots := p.slotArray()
if int(slotNo) >= len(slots) {
return nil, errors.New("invalid slot index")
}
off := slots[slotNo]
if off == 0 {
return nil, errors.New("record deleted")
}
// 验证记录ID
recID := int64(binary.LittleEndian.Uint64(p[off:]))
if recID != expectedID {
return nil, errors.New("index inconsistent")
}
// 读取payload
pldOff := int(off) + 8
length, n := binary.Uvarint(p[pldOff:])
if n <= 0 {
return nil, errors.New("bad varint")
}
pldOff += n
if pldOff+int(length) > PageSize {
return nil, errors.New("payload out of bounds")
}
return p[pldOff : pldOff+int(length)], nil
}
func (pdb *PipelineDB) updateInPlace(pageNo uint16, slotNo uint16, id int64, data []byte) error {
p, err := pdb.readPage(pageNo)
if err != nil {
return err
}
slots := p.slotArray()
if int(slotNo) >= len(slots) {
return errors.New("invalid slot index")
}
off := slots[slotNo]
if off == 0 {
return errors.New("record deleted")
}
// 验证记录ID
recID := int64(binary.LittleEndian.Uint64(p[off:]))
if recID != id {
return errors.New("index inconsistent")
}
// 检查原payload长度
pldOff := int(off) + 8
oldLength, n := binary.Uvarint(p[pldOff:])
if n <= 0 {
return errors.New("bad varint in existing record")
}
// 如果长度相同,原地更新
if int(oldLength) == len(data) {
pldStart := pldOff + n
copy(p[pldStart:pldStart+len(data)], data)
return pdb.writePage(pageNo, p)
}
return errors.New("cannot update in place")
}
func (pdb *PipelineDB) deleteRecord(pageNo uint16, slotNo uint16, id int64, idx *TableIndex) error {
pdb.mu.Lock()
defer pdb.mu.Unlock()
p, err := pdb.readPage(pageNo)
if err != nil {
return err
}
numSlots := p.numSlots()
if slotNo >= numSlots {
return errors.New("slot index out of bounds")
}
// 标记槽位为删除
p.setSlot(slotNo, 0)
// 如果是最后一个槽位,压缩槽数组
if slotNo == numSlots-1 {
newSlots := numSlots
slots := p.slotArray()
for newSlots > 0 && slots[newSlots-1] == 0 {
newSlots--
}
p.setNumSlots(newSlots)
}
// 写回页面
if err := pdb.writePage(pageNo, p); err != nil {
return err
}
// 从索引中删除(如果提供了索引)
if idx != nil {
idx.Delete(id)
}
return nil
}
// 高级数据库操作
func (pdb *PipelineDB) insert(group string, id int64, data []byte) error {
if len(data) > MaxRecSize {
return errors.New("record too large")
}
// 获取组索引
idx := pdb.indexMgr.GetOrCreateIndex(group)
// 检查是否已存在
if _, _, exists := idx.Get(id); exists {
return errors.New("record already exists")
}
// 行锁
mutex := pdb.getRowMutex(id)
mutex.Lock()
defer mutex.Unlock()
// 插入到页链
pageNo, slotNo, err := pdb.insertToChain(pdb.header.RootPage, id, data)
if err != nil {
return err
}
// 更新索引
idx.Insert(id, pageNo, slotNo)
return nil
}
func (pdb *PipelineDB) get(group string, id int64) ([]byte, error) {
idx, exists := pdb.indexMgr.GetIndex(group)
if !exists {
return nil, errors.New("group not found")
}
// 使用索引定位
pageNo, slotNo, found := idx.Get(id)
if !found {
return nil, errors.New("record not found")
}
// 读取记录
return pdb.readRecord(pageNo, slotNo, id)
}
func (pdb *PipelineDB) update(group string, id int64, data []byte) error {
if len(data) > MaxRecSize {
return errors.New("record too large")
}
idx, exists := pdb.indexMgr.GetIndex(group)
if !exists {
return errors.New("group not found")
}
pageNo, slotNo, found := idx.Get(id)
if !found {
return errors.New("record not found")
}
// 行锁
mutex := pdb.getRowMutex(id)
mutex.Lock()
defer mutex.Unlock()
// 尝试原地更新
if err := pdb.updateInPlace(pageNo, slotNo, id, data); err == nil {
return nil
}
// 删除后重新插入
if err := pdb.deleteRecord(pageNo, slotNo, id, idx); err != nil {
return err
}
newPageNo, newSlotNo, err := pdb.insertToChain(pdb.header.RootPage, id, data)
if err != nil {
return err
}
idx.Insert(id, newPageNo, newSlotNo)
return nil
}
func (pdb *PipelineDB) rangeQuery(group string, startID, endID int64, visitor func(id int64, data []byte) error) error {
idx, exists := pdb.indexMgr.GetIndex(group)
if !exists {
return errors.New("group not found")
}
idx.Range(startID, endID, func(id int64, pageNo, slotNo uint16) bool {
data, err := pdb.readRecord(pageNo, slotNo, id)
if err != nil {
return false
}
if err := visitor(id, data); err != nil {
return false
}
return true
})
return nil
}