Files
pipelinedb/storage.go

601 lines
14 KiB
Go
Raw Permalink Normal View History

2025-09-30 15:05:56 +08:00
// Package pipelinedb - storage.go
//
// 存储引擎模块:实现了基于页面的存储系统
//
// 核心功能:
// - 页面管理:分配、释放、读写页面
// - 记录存储:插入、读取、更新、删除记录
// - 索引支持:与索引系统集成,提供快速数据访问
// - 缓存机制:页面缓存提高访问性能
// - 链式存储:支持页面链接,处理大量数据
//
// 页面布局:
// [0-2] numSlots - 槽位数量
// [2-4] freeOff - 空闲空间偏移
// [4-6] nextPage - 下一页编号
// [6...] slotArray - 槽位数组每个槽位2字节
// [...] records - 记录数据区域(从页面末尾向前增长)
//
// 记录格式:
// [0-8] recordID - 记录ID8字节
// [8-9+] dataLength - 数据长度(变长编码)
// [9+...] data - 实际数据内容
package pipelinedb
import (
"encoding/binary"
"errors"
)
// Page 页面类型
//
// 页面是存储系统的基本单位大小固定为4096字节
// 每个页面包含页面头、槽位数组和记录数据区域
// 页面采用槽位目录结构,支持变长记录存储
type Page []byte
// ==================== 页面头部访问方法 ====================
// numSlots 获取页面中的槽位数量
//
// 返回值:
// - uint16: 当前页面的槽位数量
//
// 槽位数量表示页面中存储的记录数量,每个记录占用一个槽位
func (p Page) numSlots() uint16 { return binary.LittleEndian.Uint16(p[0:2]) }
// setNumSlots 设置页面中的槽位数量
//
// 参数:
// - n: 要设置的槽位数量
//
// 用于在插入或删除记录时更新槽位计数
func (p Page) setNumSlots(n uint16) { binary.LittleEndian.PutUint16(p[0:2], n) }
// freeOff 获取空闲空间的起始偏移
//
// 返回值:
// - uint16: 空闲空间的起始位置
//
// 空闲空间从页面末尾向前增长freeOff指向下一个可用的位置
func (p Page) freeOff() uint16 { return binary.LittleEndian.Uint16(p[2:4]) }
// setFreeOff 设置空闲空间的起始偏移
//
// 参数:
// - off: 新的空闲空间偏移位置
//
// 在插入记录后需要更新空闲空间指针
func (p Page) setFreeOff(off uint16) { binary.LittleEndian.PutUint16(p[2:4], off) }
// nextPage 获取链接的下一个页面编号
//
// 返回值:
// - uint16: 下一个页面的编号0表示没有下一页
//
// 支持页面链接,用于存储超出单页容量的数据
func (p Page) nextPage() uint16 { return binary.LittleEndian.Uint16(p[4:6]) }
// setNextPage 设置链接的下一个页面编号
//
// 参数:
// - n: 下一个页面的编号
//
// 用于建立页面链接关系
func (p Page) setNextPage(n uint16) { binary.LittleEndian.PutUint16(p[4:6], n) }
// slotArray 获取页面的槽位数组
//
// 返回值:
// - []uint16: 槽位数组,每个元素是记录在页面中的偏移量
//
// 槽位数组存储每个记录在页面中的位置,支持快速定位记录
// 槽位值为0表示该槽位对应的记录已被删除
func (p Page) slotArray() []uint16 {
n := p.numSlots()
arr := make([]uint16, n)
for i := uint16(0); i < n; i++ {
arr[i] = binary.LittleEndian.Uint16(p[6+i*2:])
}
return arr
}
// setSlot 设置指定槽位的记录偏移量
//
// 参数:
// - idx: 槽位索引
// - off: 记录在页面中的偏移量0表示删除
//
// 用于在插入记录时设置槽位指向记录位置,或在删除时标记槽位为空
func (p Page) setSlot(idx uint16, off uint16) {
binary.LittleEndian.PutUint16(p[6+idx*2:], off)
}
// ==================== 页面I/O操作方法 ====================
// readPageDirect 直接从磁盘读取页面,绕过缓存
//
// 参数:
// - pageNo: 页面编号
//
// 返回值:
// - []byte: 页面数据
// - error: 读取错误
//
// 用于需要绕过缓存直接访问磁盘的场景,如初始化或特殊操作
func (pdb *PipelineDB) readPageDirect(pageNo uint16) ([]byte, error) {
buf := make([]byte, PageSize)
_, err := pdb.file.ReadAt(buf, int64(pageNo)*PageSize)
return buf, err
}
// readPage 读取页面,优先从缓存获取
//
// 参数:
// - pageNo: 页面编号
//
// 返回值:
// - Page: 页面对象
// - error: 读取错误
//
// 执行流程:
// 1. 首先尝试从页面缓存中获取
// 2. 缓存未命中时从磁盘读取
// 3. 将读取的页面放入缓存以提高后续访问性能
func (pdb *PipelineDB) readPage(pageNo uint16) (Page, error) {
// 尝试从缓存获取
if p, found := pdb.cache.Get(pageNo); found {
return Page(p), nil
}
// 从磁盘读取
p, err := pdb.readPageDirect(pageNo)
if err != nil {
return nil, err
}
// 放入缓存
pdb.cache.Put(pageNo, p, false)
return Page(p), nil
}
// writePage 写入页面到缓存和磁盘
//
// 参数:
// - pageNo: 页面编号
// - p: 页面数据
//
// 返回值:
// - error: 写入错误
//
// 执行流程:
// 1. 将页面写入缓存,标记为脏页
// 2. 立即同步写入磁盘,确保数据持久性
func (pdb *PipelineDB) writePage(pageNo uint16, p Page) error {
// 写入缓存
pdb.cache.Put(pageNo, []byte(p), true)
// 立即写入磁盘
_, err := pdb.file.WriteAt(p, int64(pageNo)*PageSize)
return err
}
// writePageDirect 直接写入页面到磁盘,绕过缓存
//
// 参数:
// - pageNo: 页面编号
// - p: 页面数据
//
// 返回值:
// - error: 写入错误
//
// 用于需要绕过缓存直接写入磁盘的场景
func (pdb *PipelineDB) writePageDirect(pageNo uint16, p []byte) error {
_, err := pdb.file.WriteAt(p, int64(pageNo)*PageSize)
return err
}
// ==================== 页面管理方法 ====================
// allocPage 分配一个新的页面
//
// 返回值:
// - uint16: 分配的页面编号
// - error: 分配错误
//
// 执行流程:
// 1. 首先尝试从空闲页面管理器中分配已释放的页面
// 2. 如果没有空闲页面,则分配新的页面
// 3. 初始化页面头部信息
// 4. 更新数据库头部的总页面数
//
// 线程安全:使用互斥锁保护并发访问
func (pdb *PipelineDB) allocPage() (uint16, error) {
pdb.mu.Lock()
defer pdb.mu.Unlock()
// 尝试从空闲页分配
if pageNo, found := pdb.freePageMgr.AllocPage(); found {
// 初始化页面
p := make(Page, PageSize)
p.setNumSlots(0)
p.setFreeOff(PageSize)
p.setNextPage(0)
if err := pdb.writePage(pageNo, p); err != nil {
return 0, err
}
return pageNo, nil
}
// 分配新页面
pageNo := pdb.header.TotalPages
p := make(Page, PageSize)
p.setNumSlots(0)
p.setFreeOff(PageSize)
p.setNextPage(0)
if err := pdb.writePage(pageNo, p); err != nil {
return 0, err
}
// 更新头部
pdb.header.TotalPages++
if err := pdb.saveHeader(); err != nil {
return 0, err
}
return pageNo, nil
}
// freePage 释放页面,将其标记为可重用
//
// 参数:
// - pageNo: 要释放的页面编号
//
// 返回值:
// - error: 释放错误
//
// 执行流程:
// 1. 将页面添加到空闲页面管理器
// 2. 从缓存中移除该页面
//
// 线程安全:使用互斥锁保护并发访问
func (pdb *PipelineDB) freePage(pageNo uint16) error {
pdb.mu.Lock()
defer pdb.mu.Unlock()
pdb.freePageMgr.FreePage(pageNo)
pdb.cache.Invalidate(pageNo)
return nil
}
// ==================== 记录操作方法 ====================
// insertToChain 在页面链中插入记录
//
// 参数:
// - rootPage: 页面链的根页面编号
// - id: 记录ID
// - data: 记录数据
//
// 返回值:
// - uint16: 插入的页面编号
// - uint16: 插入的槽位编号
// - error: 插入错误
//
// 执行流程:
// 1. 从根页面开始尝试插入记录
// 2. 如果当前页面空间不足,沿着页面链查找下一个页面
// 3. 如果到达链尾仍无空间,分配新页面并链接到链尾
// 4. 在找到的页面中插入记录
//
// 支持动态扩展:当数据量超过单页容量时自动扩展页面链
func (pdb *PipelineDB) insertToChain(rootPage uint16, id int64, data []byte) (uint16, uint16, error) {
pageNo := rootPage
for {
// 尝试插入当前页
slotNo, err := pdb.insertToPage(pageNo, id, data)
if err == nil {
return pageNo, slotNo, nil
}
if err.Error() != "page full" {
return 0, 0, err
}
// 页满:检查是否有后继页
p, err := pdb.readPage(pageNo)
if err != nil {
return 0, 0, err
}
nextPage := p.nextPage()
if nextPage != 0 {
pageNo = nextPage
continue
}
// 分配新页并链接
newPage, err := pdb.allocPage()
if err != nil {
return 0, 0, err
}
p.setNextPage(newPage)
if err := pdb.writePage(pageNo, p); err != nil {
return 0, 0, err
}
pageNo = newPage
}
}
func (pdb *PipelineDB) insertToPage(pageNo uint16, id int64, data []byte) (uint16, error) {
pdb.mu.Lock()
defer pdb.mu.Unlock()
p, err := pdb.readPage(pageNo)
if err != nil {
return 0, err
}
// 计算所需空间
recSize := 8 + 1 + len(data)
if len(data) >= 128 {
recSize++
}
freeOff := p.freeOff()
requiredSpace := uint16(recSize)
slotSpace := 6 + 2*(p.numSlots()+1)
if freeOff < requiredSpace || freeOff-requiredSpace < slotSpace {
return 0, errors.New("page full")
}
newFree := freeOff - requiredSpace
// 写入记录
off := newFree
binary.LittleEndian.PutUint64(p[off:], uint64(id))
off += 8
n := binary.PutUvarint(p[off:], uint64(len(data)))
off += uint16(n)
copy(p[off:], data)
// 更新槽数组
numSlots := p.numSlots()
p.setSlot(numSlots, newFree)
p.setNumSlots(numSlots + 1)
p.setFreeOff(newFree)
if err := pdb.writePage(pageNo, p); err != nil {
return 0, err
}
return numSlots, nil
}
func (pdb *PipelineDB) readRecord(pageNo uint16, slotNo uint16, expectedID int64) ([]byte, error) {
p, err := pdb.readPage(pageNo)
if err != nil {
return nil, err
}
slots := p.slotArray()
if int(slotNo) >= len(slots) {
return nil, errors.New("invalid slot index")
}
off := slots[slotNo]
if off == 0 {
return nil, errors.New("record deleted")
}
// 验证记录ID
recID := int64(binary.LittleEndian.Uint64(p[off:]))
if recID != expectedID {
return nil, errors.New("index inconsistent")
}
// 读取payload
pldOff := int(off) + 8
length, n := binary.Uvarint(p[pldOff:])
if n <= 0 {
return nil, errors.New("bad varint")
}
pldOff += n
if pldOff+int(length) > PageSize {
return nil, errors.New("payload out of bounds")
}
return p[pldOff : pldOff+int(length)], nil
}
func (pdb *PipelineDB) updateInPlace(pageNo uint16, slotNo uint16, id int64, data []byte) error {
p, err := pdb.readPage(pageNo)
if err != nil {
return err
}
slots := p.slotArray()
if int(slotNo) >= len(slots) {
return errors.New("invalid slot index")
}
off := slots[slotNo]
if off == 0 {
return errors.New("record deleted")
}
// 验证记录ID
recID := int64(binary.LittleEndian.Uint64(p[off:]))
if recID != id {
return errors.New("index inconsistent")
}
// 检查原payload长度
pldOff := int(off) + 8
oldLength, n := binary.Uvarint(p[pldOff:])
if n <= 0 {
return errors.New("bad varint in existing record")
}
// 如果长度相同,原地更新
if int(oldLength) == len(data) {
pldStart := pldOff + n
copy(p[pldStart:pldStart+len(data)], data)
return pdb.writePage(pageNo, p)
}
return errors.New("cannot update in place")
}
func (pdb *PipelineDB) deleteRecord(pageNo uint16, slotNo uint16, id int64, idx *TableIndex) error {
pdb.mu.Lock()
defer pdb.mu.Unlock()
p, err := pdb.readPage(pageNo)
if err != nil {
return err
}
numSlots := p.numSlots()
if slotNo >= numSlots {
return errors.New("slot index out of bounds")
}
// 标记槽位为删除
p.setSlot(slotNo, 0)
// 如果是最后一个槽位,压缩槽数组
if slotNo == numSlots-1 {
newSlots := numSlots
slots := p.slotArray()
for newSlots > 0 && slots[newSlots-1] == 0 {
newSlots--
}
p.setNumSlots(newSlots)
}
// 写回页面
if err := pdb.writePage(pageNo, p); err != nil {
return err
}
// 从索引中删除(如果提供了索引)
if idx != nil {
idx.Delete(id)
}
return nil
}
// 高级数据库操作
func (pdb *PipelineDB) insert(group string, id int64, data []byte) error {
if len(data) > MaxRecSize {
return errors.New("record too large")
}
// 获取组索引
idx := pdb.indexMgr.GetOrCreateIndex(group)
// 检查是否已存在
if _, _, exists := idx.Get(id); exists {
return errors.New("record already exists")
}
// 行锁
mutex := pdb.getRowMutex(id)
mutex.Lock()
defer mutex.Unlock()
// 插入到页链
pageNo, slotNo, err := pdb.insertToChain(pdb.header.RootPage, id, data)
if err != nil {
return err
}
// 更新索引
idx.Insert(id, pageNo, slotNo)
return nil
}
func (pdb *PipelineDB) get(group string, id int64) ([]byte, error) {
idx, exists := pdb.indexMgr.GetIndex(group)
if !exists {
return nil, errors.New("group not found")
}
// 使用索引定位
pageNo, slotNo, found := idx.Get(id)
if !found {
return nil, errors.New("record not found")
}
// 读取记录
return pdb.readRecord(pageNo, slotNo, id)
}
func (pdb *PipelineDB) update(group string, id int64, data []byte) error {
if len(data) > MaxRecSize {
return errors.New("record too large")
}
idx, exists := pdb.indexMgr.GetIndex(group)
if !exists {
return errors.New("group not found")
}
pageNo, slotNo, found := idx.Get(id)
if !found {
return errors.New("record not found")
}
// 行锁
mutex := pdb.getRowMutex(id)
mutex.Lock()
defer mutex.Unlock()
// 尝试原地更新
if err := pdb.updateInPlace(pageNo, slotNo, id, data); err == nil {
return nil
}
// 删除后重新插入
if err := pdb.deleteRecord(pageNo, slotNo, id, idx); err != nil {
return err
}
newPageNo, newSlotNo, err := pdb.insertToChain(pdb.header.RootPage, id, data)
if err != nil {
return err
}
idx.Insert(id, newPageNo, newSlotNo)
return nil
}
func (pdb *PipelineDB) rangeQuery(group string, startID, endID int64, visitor func(id int64, data []byte) error) error {
idx, exists := pdb.indexMgr.GetIndex(group)
if !exists {
return errors.New("group not found")
}
idx.Range(startID, endID, func(id int64, pageNo, slotNo uint16) bool {
data, err := pdb.readRecord(pageNo, slotNo, id)
if err != nil {
return false
}
if err := visitor(id, data); err != nil {
return false
}
return true
})
return nil
}