601 lines
14 KiB
Go
601 lines
14 KiB
Go
// Package pipelinedb - storage.go
|
||
//
|
||
// 存储引擎模块:实现了基于页面的存储系统
|
||
//
|
||
// 核心功能:
|
||
// - 页面管理:分配、释放、读写页面
|
||
// - 记录存储:插入、读取、更新、删除记录
|
||
// - 索引支持:与索引系统集成,提供快速数据访问
|
||
// - 缓存机制:页面缓存提高访问性能
|
||
// - 链式存储:支持页面链接,处理大量数据
|
||
//
|
||
// 页面布局:
|
||
// [0-2] numSlots - 槽位数量
|
||
// [2-4] freeOff - 空闲空间偏移
|
||
// [4-6] nextPage - 下一页编号
|
||
// [6...] slotArray - 槽位数组,每个槽位2字节
|
||
// [...] records - 记录数据区域(从页面末尾向前增长)
|
||
//
|
||
// 记录格式:
|
||
// [0-8] recordID - 记录ID(8字节)
|
||
// [8-9+] dataLength - 数据长度(变长编码)
|
||
// [9+...] data - 实际数据内容
|
||
package pipelinedb
|
||
|
||
import (
|
||
"encoding/binary"
|
||
"errors"
|
||
)
|
||
|
||
// Page 页面类型
|
||
//
|
||
// 页面是存储系统的基本单位,大小固定为4096字节
|
||
// 每个页面包含页面头、槽位数组和记录数据区域
|
||
// 页面采用槽位目录结构,支持变长记录存储
|
||
type Page []byte
|
||
|
||
// ==================== 页面头部访问方法 ====================
|
||
|
||
// numSlots 获取页面中的槽位数量
|
||
//
|
||
// 返回值:
|
||
// - uint16: 当前页面的槽位数量
|
||
//
|
||
// 槽位数量表示页面中存储的记录数量,每个记录占用一个槽位
|
||
func (p Page) numSlots() uint16 { return binary.LittleEndian.Uint16(p[0:2]) }
|
||
|
||
// setNumSlots 设置页面中的槽位数量
|
||
//
|
||
// 参数:
|
||
// - n: 要设置的槽位数量
|
||
//
|
||
// 用于在插入或删除记录时更新槽位计数
|
||
func (p Page) setNumSlots(n uint16) { binary.LittleEndian.PutUint16(p[0:2], n) }
|
||
|
||
// freeOff 获取空闲空间的起始偏移
|
||
//
|
||
// 返回值:
|
||
// - uint16: 空闲空间的起始位置
|
||
//
|
||
// 空闲空间从页面末尾向前增长,freeOff指向下一个可用的位置
|
||
func (p Page) freeOff() uint16 { return binary.LittleEndian.Uint16(p[2:4]) }
|
||
|
||
// setFreeOff 设置空闲空间的起始偏移
|
||
//
|
||
// 参数:
|
||
// - off: 新的空闲空间偏移位置
|
||
//
|
||
// 在插入记录后需要更新空闲空间指针
|
||
func (p Page) setFreeOff(off uint16) { binary.LittleEndian.PutUint16(p[2:4], off) }
|
||
|
||
// nextPage 获取链接的下一个页面编号
|
||
//
|
||
// 返回值:
|
||
// - uint16: 下一个页面的编号,0表示没有下一页
|
||
//
|
||
// 支持页面链接,用于存储超出单页容量的数据
|
||
func (p Page) nextPage() uint16 { return binary.LittleEndian.Uint16(p[4:6]) }
|
||
|
||
// setNextPage 设置链接的下一个页面编号
|
||
//
|
||
// 参数:
|
||
// - n: 下一个页面的编号
|
||
//
|
||
// 用于建立页面链接关系
|
||
func (p Page) setNextPage(n uint16) { binary.LittleEndian.PutUint16(p[4:6], n) }
|
||
|
||
// slotArray 获取页面的槽位数组
|
||
//
|
||
// 返回值:
|
||
// - []uint16: 槽位数组,每个元素是记录在页面中的偏移量
|
||
//
|
||
// 槽位数组存储每个记录在页面中的位置,支持快速定位记录
|
||
// 槽位值为0表示该槽位对应的记录已被删除
|
||
func (p Page) slotArray() []uint16 {
|
||
n := p.numSlots()
|
||
arr := make([]uint16, n)
|
||
for i := uint16(0); i < n; i++ {
|
||
arr[i] = binary.LittleEndian.Uint16(p[6+i*2:])
|
||
}
|
||
return arr
|
||
}
|
||
|
||
// setSlot 设置指定槽位的记录偏移量
|
||
//
|
||
// 参数:
|
||
// - idx: 槽位索引
|
||
// - off: 记录在页面中的偏移量,0表示删除
|
||
//
|
||
// 用于在插入记录时设置槽位指向记录位置,或在删除时标记槽位为空
|
||
func (p Page) setSlot(idx uint16, off uint16) {
|
||
binary.LittleEndian.PutUint16(p[6+idx*2:], off)
|
||
}
|
||
|
||
// ==================== 页面I/O操作方法 ====================
|
||
|
||
// readPageDirect 直接从磁盘读取页面,绕过缓存
|
||
//
|
||
// 参数:
|
||
// - pageNo: 页面编号
|
||
//
|
||
// 返回值:
|
||
// - []byte: 页面数据
|
||
// - error: 读取错误
|
||
//
|
||
// 用于需要绕过缓存直接访问磁盘的场景,如初始化或特殊操作
|
||
func (pdb *PipelineDB) readPageDirect(pageNo uint16) ([]byte, error) {
|
||
buf := make([]byte, PageSize)
|
||
_, err := pdb.file.ReadAt(buf, int64(pageNo)*PageSize)
|
||
return buf, err
|
||
}
|
||
|
||
// readPage 读取页面,优先从缓存获取
|
||
//
|
||
// 参数:
|
||
// - pageNo: 页面编号
|
||
//
|
||
// 返回值:
|
||
// - Page: 页面对象
|
||
// - error: 读取错误
|
||
//
|
||
// 执行流程:
|
||
// 1. 首先尝试从页面缓存中获取
|
||
// 2. 缓存未命中时从磁盘读取
|
||
// 3. 将读取的页面放入缓存以提高后续访问性能
|
||
func (pdb *PipelineDB) readPage(pageNo uint16) (Page, error) {
|
||
// 尝试从缓存获取
|
||
if p, found := pdb.cache.Get(pageNo); found {
|
||
return Page(p), nil
|
||
}
|
||
|
||
// 从磁盘读取
|
||
p, err := pdb.readPageDirect(pageNo)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 放入缓存
|
||
pdb.cache.Put(pageNo, p, false)
|
||
return Page(p), nil
|
||
}
|
||
|
||
// writePage 写入页面到缓存和磁盘
|
||
//
|
||
// 参数:
|
||
// - pageNo: 页面编号
|
||
// - p: 页面数据
|
||
//
|
||
// 返回值:
|
||
// - error: 写入错误
|
||
//
|
||
// 执行流程:
|
||
// 1. 将页面写入缓存,标记为脏页
|
||
// 2. 立即同步写入磁盘,确保数据持久性
|
||
func (pdb *PipelineDB) writePage(pageNo uint16, p Page) error {
|
||
// 写入缓存
|
||
pdb.cache.Put(pageNo, []byte(p), true)
|
||
|
||
// 立即写入磁盘
|
||
_, err := pdb.file.WriteAt(p, int64(pageNo)*PageSize)
|
||
return err
|
||
}
|
||
|
||
// writePageDirect 直接写入页面到磁盘,绕过缓存
|
||
//
|
||
// 参数:
|
||
// - pageNo: 页面编号
|
||
// - p: 页面数据
|
||
//
|
||
// 返回值:
|
||
// - error: 写入错误
|
||
//
|
||
// 用于需要绕过缓存直接写入磁盘的场景
|
||
func (pdb *PipelineDB) writePageDirect(pageNo uint16, p []byte) error {
|
||
_, err := pdb.file.WriteAt(p, int64(pageNo)*PageSize)
|
||
return err
|
||
}
|
||
|
||
// ==================== 页面管理方法 ====================
|
||
|
||
// allocPage 分配一个新的页面
|
||
//
|
||
// 返回值:
|
||
// - uint16: 分配的页面编号
|
||
// - error: 分配错误
|
||
//
|
||
// 执行流程:
|
||
// 1. 首先尝试从空闲页面管理器中分配已释放的页面
|
||
// 2. 如果没有空闲页面,则分配新的页面
|
||
// 3. 初始化页面头部信息
|
||
// 4. 更新数据库头部的总页面数
|
||
//
|
||
// 线程安全:使用互斥锁保护并发访问
|
||
func (pdb *PipelineDB) allocPage() (uint16, error) {
|
||
pdb.mu.Lock()
|
||
defer pdb.mu.Unlock()
|
||
|
||
// 尝试从空闲页分配
|
||
if pageNo, found := pdb.freePageMgr.AllocPage(); found {
|
||
// 初始化页面
|
||
p := make(Page, PageSize)
|
||
p.setNumSlots(0)
|
||
p.setFreeOff(PageSize)
|
||
p.setNextPage(0)
|
||
if err := pdb.writePage(pageNo, p); err != nil {
|
||
return 0, err
|
||
}
|
||
return pageNo, nil
|
||
}
|
||
|
||
// 分配新页面
|
||
pageNo := pdb.header.TotalPages
|
||
p := make(Page, PageSize)
|
||
p.setNumSlots(0)
|
||
p.setFreeOff(PageSize)
|
||
p.setNextPage(0)
|
||
if err := pdb.writePage(pageNo, p); err != nil {
|
||
return 0, err
|
||
}
|
||
|
||
// 更新头部
|
||
pdb.header.TotalPages++
|
||
if err := pdb.saveHeader(); err != nil {
|
||
return 0, err
|
||
}
|
||
|
||
return pageNo, nil
|
||
}
|
||
|
||
// freePage 释放页面,将其标记为可重用
|
||
//
|
||
// 参数:
|
||
// - pageNo: 要释放的页面编号
|
||
//
|
||
// 返回值:
|
||
// - error: 释放错误
|
||
//
|
||
// 执行流程:
|
||
// 1. 将页面添加到空闲页面管理器
|
||
// 2. 从缓存中移除该页面
|
||
//
|
||
// 线程安全:使用互斥锁保护并发访问
|
||
func (pdb *PipelineDB) freePage(pageNo uint16) error {
|
||
pdb.mu.Lock()
|
||
defer pdb.mu.Unlock()
|
||
|
||
pdb.freePageMgr.FreePage(pageNo)
|
||
pdb.cache.Invalidate(pageNo)
|
||
return nil
|
||
}
|
||
|
||
// ==================== 记录操作方法 ====================
|
||
|
||
// insertToChain 在页面链中插入记录
|
||
//
|
||
// 参数:
|
||
// - rootPage: 页面链的根页面编号
|
||
// - id: 记录ID
|
||
// - data: 记录数据
|
||
//
|
||
// 返回值:
|
||
// - uint16: 插入的页面编号
|
||
// - uint16: 插入的槽位编号
|
||
// - error: 插入错误
|
||
//
|
||
// 执行流程:
|
||
// 1. 从根页面开始尝试插入记录
|
||
// 2. 如果当前页面空间不足,沿着页面链查找下一个页面
|
||
// 3. 如果到达链尾仍无空间,分配新页面并链接到链尾
|
||
// 4. 在找到的页面中插入记录
|
||
//
|
||
// 支持动态扩展:当数据量超过单页容量时自动扩展页面链
|
||
func (pdb *PipelineDB) insertToChain(rootPage uint16, id int64, data []byte) (uint16, uint16, error) {
|
||
pageNo := rootPage
|
||
for {
|
||
// 尝试插入当前页
|
||
slotNo, err := pdb.insertToPage(pageNo, id, data)
|
||
if err == nil {
|
||
return pageNo, slotNo, nil
|
||
}
|
||
if err.Error() != "page full" {
|
||
return 0, 0, err
|
||
}
|
||
|
||
// 页满:检查是否有后继页
|
||
p, err := pdb.readPage(pageNo)
|
||
if err != nil {
|
||
return 0, 0, err
|
||
}
|
||
|
||
nextPage := p.nextPage()
|
||
if nextPage != 0 {
|
||
pageNo = nextPage
|
||
continue
|
||
}
|
||
|
||
// 分配新页并链接
|
||
newPage, err := pdb.allocPage()
|
||
if err != nil {
|
||
return 0, 0, err
|
||
}
|
||
|
||
p.setNextPage(newPage)
|
||
if err := pdb.writePage(pageNo, p); err != nil {
|
||
return 0, 0, err
|
||
}
|
||
|
||
pageNo = newPage
|
||
}
|
||
}
|
||
|
||
func (pdb *PipelineDB) insertToPage(pageNo uint16, id int64, data []byte) (uint16, error) {
|
||
pdb.mu.Lock()
|
||
defer pdb.mu.Unlock()
|
||
|
||
p, err := pdb.readPage(pageNo)
|
||
if err != nil {
|
||
return 0, err
|
||
}
|
||
|
||
// 计算所需空间
|
||
recSize := 8 + 1 + len(data)
|
||
if len(data) >= 128 {
|
||
recSize++
|
||
}
|
||
|
||
freeOff := p.freeOff()
|
||
requiredSpace := uint16(recSize)
|
||
slotSpace := 6 + 2*(p.numSlots()+1)
|
||
|
||
if freeOff < requiredSpace || freeOff-requiredSpace < slotSpace {
|
||
return 0, errors.New("page full")
|
||
}
|
||
|
||
newFree := freeOff - requiredSpace
|
||
|
||
// 写入记录
|
||
off := newFree
|
||
binary.LittleEndian.PutUint64(p[off:], uint64(id))
|
||
off += 8
|
||
|
||
n := binary.PutUvarint(p[off:], uint64(len(data)))
|
||
off += uint16(n)
|
||
|
||
copy(p[off:], data)
|
||
|
||
// 更新槽数组
|
||
numSlots := p.numSlots()
|
||
p.setSlot(numSlots, newFree)
|
||
p.setNumSlots(numSlots + 1)
|
||
p.setFreeOff(newFree)
|
||
|
||
if err := pdb.writePage(pageNo, p); err != nil {
|
||
return 0, err
|
||
}
|
||
|
||
return numSlots, nil
|
||
}
|
||
|
||
func (pdb *PipelineDB) readRecord(pageNo uint16, slotNo uint16, expectedID int64) ([]byte, error) {
|
||
p, err := pdb.readPage(pageNo)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
slots := p.slotArray()
|
||
if int(slotNo) >= len(slots) {
|
||
return nil, errors.New("invalid slot index")
|
||
}
|
||
|
||
off := slots[slotNo]
|
||
if off == 0 {
|
||
return nil, errors.New("record deleted")
|
||
}
|
||
|
||
// 验证记录ID
|
||
recID := int64(binary.LittleEndian.Uint64(p[off:]))
|
||
if recID != expectedID {
|
||
return nil, errors.New("index inconsistent")
|
||
}
|
||
|
||
// 读取payload
|
||
pldOff := int(off) + 8
|
||
length, n := binary.Uvarint(p[pldOff:])
|
||
if n <= 0 {
|
||
return nil, errors.New("bad varint")
|
||
}
|
||
|
||
pldOff += n
|
||
if pldOff+int(length) > PageSize {
|
||
return nil, errors.New("payload out of bounds")
|
||
}
|
||
|
||
return p[pldOff : pldOff+int(length)], nil
|
||
}
|
||
|
||
func (pdb *PipelineDB) updateInPlace(pageNo uint16, slotNo uint16, id int64, data []byte) error {
|
||
p, err := pdb.readPage(pageNo)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
slots := p.slotArray()
|
||
if int(slotNo) >= len(slots) {
|
||
return errors.New("invalid slot index")
|
||
}
|
||
|
||
off := slots[slotNo]
|
||
if off == 0 {
|
||
return errors.New("record deleted")
|
||
}
|
||
|
||
// 验证记录ID
|
||
recID := int64(binary.LittleEndian.Uint64(p[off:]))
|
||
if recID != id {
|
||
return errors.New("index inconsistent")
|
||
}
|
||
|
||
// 检查原payload长度
|
||
pldOff := int(off) + 8
|
||
oldLength, n := binary.Uvarint(p[pldOff:])
|
||
if n <= 0 {
|
||
return errors.New("bad varint in existing record")
|
||
}
|
||
|
||
// 如果长度相同,原地更新
|
||
if int(oldLength) == len(data) {
|
||
pldStart := pldOff + n
|
||
copy(p[pldStart:pldStart+len(data)], data)
|
||
return pdb.writePage(pageNo, p)
|
||
}
|
||
|
||
return errors.New("cannot update in place")
|
||
}
|
||
|
||
func (pdb *PipelineDB) deleteRecord(pageNo uint16, slotNo uint16, id int64, idx *TableIndex) error {
|
||
pdb.mu.Lock()
|
||
defer pdb.mu.Unlock()
|
||
|
||
p, err := pdb.readPage(pageNo)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
numSlots := p.numSlots()
|
||
if slotNo >= numSlots {
|
||
return errors.New("slot index out of bounds")
|
||
}
|
||
|
||
// 标记槽位为删除
|
||
p.setSlot(slotNo, 0)
|
||
|
||
// 如果是最后一个槽位,压缩槽数组
|
||
if slotNo == numSlots-1 {
|
||
newSlots := numSlots
|
||
slots := p.slotArray()
|
||
for newSlots > 0 && slots[newSlots-1] == 0 {
|
||
newSlots--
|
||
}
|
||
p.setNumSlots(newSlots)
|
||
}
|
||
|
||
// 写回页面
|
||
if err := pdb.writePage(pageNo, p); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 从索引中删除(如果提供了索引)
|
||
if idx != nil {
|
||
idx.Delete(id)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// 高级数据库操作
|
||
|
||
func (pdb *PipelineDB) insert(group string, id int64, data []byte) error {
|
||
if len(data) > MaxRecSize {
|
||
return errors.New("record too large")
|
||
}
|
||
|
||
// 获取组索引
|
||
idx := pdb.indexMgr.GetOrCreateIndex(group)
|
||
|
||
// 检查是否已存在
|
||
if _, _, exists := idx.Get(id); exists {
|
||
return errors.New("record already exists")
|
||
}
|
||
|
||
// 行锁
|
||
mutex := pdb.getRowMutex(id)
|
||
mutex.Lock()
|
||
defer mutex.Unlock()
|
||
|
||
// 插入到页链
|
||
pageNo, slotNo, err := pdb.insertToChain(pdb.header.RootPage, id, data)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 更新索引
|
||
idx.Insert(id, pageNo, slotNo)
|
||
return nil
|
||
}
|
||
|
||
func (pdb *PipelineDB) get(group string, id int64) ([]byte, error) {
|
||
idx, exists := pdb.indexMgr.GetIndex(group)
|
||
if !exists {
|
||
return nil, errors.New("group not found")
|
||
}
|
||
|
||
// 使用索引定位
|
||
pageNo, slotNo, found := idx.Get(id)
|
||
if !found {
|
||
return nil, errors.New("record not found")
|
||
}
|
||
|
||
// 读取记录
|
||
return pdb.readRecord(pageNo, slotNo, id)
|
||
}
|
||
|
||
func (pdb *PipelineDB) update(group string, id int64, data []byte) error {
|
||
if len(data) > MaxRecSize {
|
||
return errors.New("record too large")
|
||
}
|
||
|
||
idx, exists := pdb.indexMgr.GetIndex(group)
|
||
if !exists {
|
||
return errors.New("group not found")
|
||
}
|
||
|
||
pageNo, slotNo, found := idx.Get(id)
|
||
if !found {
|
||
return errors.New("record not found")
|
||
}
|
||
|
||
// 行锁
|
||
mutex := pdb.getRowMutex(id)
|
||
mutex.Lock()
|
||
defer mutex.Unlock()
|
||
|
||
// 尝试原地更新
|
||
if err := pdb.updateInPlace(pageNo, slotNo, id, data); err == nil {
|
||
return nil
|
||
}
|
||
|
||
// 删除后重新插入
|
||
if err := pdb.deleteRecord(pageNo, slotNo, id, idx); err != nil {
|
||
return err
|
||
}
|
||
|
||
newPageNo, newSlotNo, err := pdb.insertToChain(pdb.header.RootPage, id, data)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
idx.Insert(id, newPageNo, newSlotNo)
|
||
return nil
|
||
}
|
||
|
||
func (pdb *PipelineDB) rangeQuery(group string, startID, endID int64, visitor func(id int64, data []byte) error) error {
|
||
idx, exists := pdb.indexMgr.GetIndex(group)
|
||
if !exists {
|
||
return errors.New("group not found")
|
||
}
|
||
|
||
idx.Range(startID, endID, func(id int64, pageNo, slotNo uint16) bool {
|
||
data, err := pdb.readRecord(pageNo, slotNo, id)
|
||
if err != nil {
|
||
return false
|
||
}
|
||
|
||
if err := visitor(id, data); err != nil {
|
||
return false
|
||
}
|
||
|
||
return true
|
||
})
|
||
|
||
return nil
|
||
}
|