// Package pipelinedb - storage.go // // 存储引擎模块:实现了基于页面的存储系统 // // 核心功能: // - 页面管理:分配、释放、读写页面 // - 记录存储:插入、读取、更新、删除记录 // - 索引支持:与索引系统集成,提供快速数据访问 // - 缓存机制:页面缓存提高访问性能 // - 链式存储:支持页面链接,处理大量数据 // // 页面布局: // [0-2] numSlots - 槽位数量 // [2-4] freeOff - 空闲空间偏移 // [4-6] nextPage - 下一页编号 // [6...] slotArray - 槽位数组,每个槽位2字节 // [...] records - 记录数据区域(从页面末尾向前增长) // // 记录格式: // [0-8] recordID - 记录ID(8字节) // [8-9+] dataLength - 数据长度(变长编码) // [9+...] data - 实际数据内容 package pipelinedb import ( "encoding/binary" "errors" ) // Page 页面类型 // // 页面是存储系统的基本单位,大小固定为4096字节 // 每个页面包含页面头、槽位数组和记录数据区域 // 页面采用槽位目录结构,支持变长记录存储 type Page []byte // ==================== 页面头部访问方法 ==================== // numSlots 获取页面中的槽位数量 // // 返回值: // - uint16: 当前页面的槽位数量 // // 槽位数量表示页面中存储的记录数量,每个记录占用一个槽位 func (p Page) numSlots() uint16 { return binary.LittleEndian.Uint16(p[0:2]) } // setNumSlots 设置页面中的槽位数量 // // 参数: // - n: 要设置的槽位数量 // // 用于在插入或删除记录时更新槽位计数 func (p Page) setNumSlots(n uint16) { binary.LittleEndian.PutUint16(p[0:2], n) } // freeOff 获取空闲空间的起始偏移 // // 返回值: // - uint16: 空闲空间的起始位置 // // 空闲空间从页面末尾向前增长,freeOff指向下一个可用的位置 func (p Page) freeOff() uint16 { return binary.LittleEndian.Uint16(p[2:4]) } // setFreeOff 设置空闲空间的起始偏移 // // 参数: // - off: 新的空闲空间偏移位置 // // 在插入记录后需要更新空闲空间指针 func (p Page) setFreeOff(off uint16) { binary.LittleEndian.PutUint16(p[2:4], off) } // nextPage 获取链接的下一个页面编号 // // 返回值: // - uint16: 下一个页面的编号,0表示没有下一页 // // 支持页面链接,用于存储超出单页容量的数据 func (p Page) nextPage() uint16 { return binary.LittleEndian.Uint16(p[4:6]) } // setNextPage 设置链接的下一个页面编号 // // 参数: // - n: 下一个页面的编号 // // 用于建立页面链接关系 func (p Page) setNextPage(n uint16) { binary.LittleEndian.PutUint16(p[4:6], n) } // slotArray 获取页面的槽位数组 // // 返回值: // - []uint16: 槽位数组,每个元素是记录在页面中的偏移量 // // 槽位数组存储每个记录在页面中的位置,支持快速定位记录 // 槽位值为0表示该槽位对应的记录已被删除 func (p Page) slotArray() []uint16 { n := p.numSlots() arr := make([]uint16, n) for i := uint16(0); i < n; i++ { arr[i] = binary.LittleEndian.Uint16(p[6+i*2:]) } return arr } // setSlot 设置指定槽位的记录偏移量 // // 参数: // - idx: 槽位索引 // - off: 记录在页面中的偏移量,0表示删除 // // 用于在插入记录时设置槽位指向记录位置,或在删除时标记槽位为空 func (p Page) setSlot(idx uint16, off uint16) { binary.LittleEndian.PutUint16(p[6+idx*2:], off) } // ==================== 页面I/O操作方法 ==================== // readPageDirect 直接从磁盘读取页面,绕过缓存 // // 参数: // - pageNo: 页面编号 // // 返回值: // - []byte: 页面数据 // - error: 读取错误 // // 用于需要绕过缓存直接访问磁盘的场景,如初始化或特殊操作 func (pdb *PipelineDB) readPageDirect(pageNo uint16) ([]byte, error) { buf := make([]byte, PageSize) _, err := pdb.file.ReadAt(buf, int64(pageNo)*PageSize) return buf, err } // readPage 读取页面,优先从缓存获取 // // 参数: // - pageNo: 页面编号 // // 返回值: // - Page: 页面对象 // - error: 读取错误 // // 执行流程: // 1. 首先尝试从页面缓存中获取 // 2. 缓存未命中时从磁盘读取 // 3. 将读取的页面放入缓存以提高后续访问性能 func (pdb *PipelineDB) readPage(pageNo uint16) (Page, error) { // 尝试从缓存获取 if p, found := pdb.cache.Get(pageNo); found { return Page(p), nil } // 从磁盘读取 p, err := pdb.readPageDirect(pageNo) if err != nil { return nil, err } // 放入缓存 pdb.cache.Put(pageNo, p, false) return Page(p), nil } // writePage 写入页面到缓存和磁盘 // // 参数: // - pageNo: 页面编号 // - p: 页面数据 // // 返回值: // - error: 写入错误 // // 执行流程: // 1. 将页面写入缓存,标记为脏页 // 2. 立即同步写入磁盘,确保数据持久性 func (pdb *PipelineDB) writePage(pageNo uint16, p Page) error { // 写入缓存 pdb.cache.Put(pageNo, []byte(p), true) // 立即写入磁盘 _, err := pdb.file.WriteAt(p, int64(pageNo)*PageSize) return err } // writePageDirect 直接写入页面到磁盘,绕过缓存 // // 参数: // - pageNo: 页面编号 // - p: 页面数据 // // 返回值: // - error: 写入错误 // // 用于需要绕过缓存直接写入磁盘的场景 func (pdb *PipelineDB) writePageDirect(pageNo uint16, p []byte) error { _, err := pdb.file.WriteAt(p, int64(pageNo)*PageSize) return err } // ==================== 页面管理方法 ==================== // allocPage 分配一个新的页面 // // 返回值: // - uint16: 分配的页面编号 // - error: 分配错误 // // 执行流程: // 1. 首先尝试从空闲页面管理器中分配已释放的页面 // 2. 如果没有空闲页面,则分配新的页面 // 3. 初始化页面头部信息 // 4. 更新数据库头部的总页面数 // // 线程安全:使用互斥锁保护并发访问 func (pdb *PipelineDB) allocPage() (uint16, error) { pdb.mu.Lock() defer pdb.mu.Unlock() // 尝试从空闲页分配 if pageNo, found := pdb.freePageMgr.AllocPage(); found { // 初始化页面 p := make(Page, PageSize) p.setNumSlots(0) p.setFreeOff(PageSize) p.setNextPage(0) if err := pdb.writePage(pageNo, p); err != nil { return 0, err } return pageNo, nil } // 分配新页面 pageNo := pdb.header.TotalPages p := make(Page, PageSize) p.setNumSlots(0) p.setFreeOff(PageSize) p.setNextPage(0) if err := pdb.writePage(pageNo, p); err != nil { return 0, err } // 更新头部 pdb.header.TotalPages++ if err := pdb.saveHeader(); err != nil { return 0, err } return pageNo, nil } // freePage 释放页面,将其标记为可重用 // // 参数: // - pageNo: 要释放的页面编号 // // 返回值: // - error: 释放错误 // // 执行流程: // 1. 将页面添加到空闲页面管理器 // 2. 从缓存中移除该页面 // // 线程安全:使用互斥锁保护并发访问 func (pdb *PipelineDB) freePage(pageNo uint16) error { pdb.mu.Lock() defer pdb.mu.Unlock() pdb.freePageMgr.FreePage(pageNo) pdb.cache.Invalidate(pageNo) return nil } // ==================== 记录操作方法 ==================== // insertToChain 在页面链中插入记录 // // 参数: // - rootPage: 页面链的根页面编号 // - id: 记录ID // - data: 记录数据 // // 返回值: // - uint16: 插入的页面编号 // - uint16: 插入的槽位编号 // - error: 插入错误 // // 执行流程: // 1. 从根页面开始尝试插入记录 // 2. 如果当前页面空间不足,沿着页面链查找下一个页面 // 3. 如果到达链尾仍无空间,分配新页面并链接到链尾 // 4. 在找到的页面中插入记录 // // 支持动态扩展:当数据量超过单页容量时自动扩展页面链 func (pdb *PipelineDB) insertToChain(rootPage uint16, id int64, data []byte) (uint16, uint16, error) { pageNo := rootPage for { // 尝试插入当前页 slotNo, err := pdb.insertToPage(pageNo, id, data) if err == nil { return pageNo, slotNo, nil } if err.Error() != "page full" { return 0, 0, err } // 页满:检查是否有后继页 p, err := pdb.readPage(pageNo) if err != nil { return 0, 0, err } nextPage := p.nextPage() if nextPage != 0 { pageNo = nextPage continue } // 分配新页并链接 newPage, err := pdb.allocPage() if err != nil { return 0, 0, err } p.setNextPage(newPage) if err := pdb.writePage(pageNo, p); err != nil { return 0, 0, err } pageNo = newPage } } func (pdb *PipelineDB) insertToPage(pageNo uint16, id int64, data []byte) (uint16, error) { pdb.mu.Lock() defer pdb.mu.Unlock() p, err := pdb.readPage(pageNo) if err != nil { return 0, err } // 计算所需空间 recSize := 8 + 1 + len(data) if len(data) >= 128 { recSize++ } freeOff := p.freeOff() requiredSpace := uint16(recSize) slotSpace := 6 + 2*(p.numSlots()+1) if freeOff < requiredSpace || freeOff-requiredSpace < slotSpace { return 0, errors.New("page full") } newFree := freeOff - requiredSpace // 写入记录 off := newFree binary.LittleEndian.PutUint64(p[off:], uint64(id)) off += 8 n := binary.PutUvarint(p[off:], uint64(len(data))) off += uint16(n) copy(p[off:], data) // 更新槽数组 numSlots := p.numSlots() p.setSlot(numSlots, newFree) p.setNumSlots(numSlots + 1) p.setFreeOff(newFree) if err := pdb.writePage(pageNo, p); err != nil { return 0, err } return numSlots, nil } func (pdb *PipelineDB) readRecord(pageNo uint16, slotNo uint16, expectedID int64) ([]byte, error) { p, err := pdb.readPage(pageNo) if err != nil { return nil, err } slots := p.slotArray() if int(slotNo) >= len(slots) { return nil, errors.New("invalid slot index") } off := slots[slotNo] if off == 0 { return nil, errors.New("record deleted") } // 验证记录ID recID := int64(binary.LittleEndian.Uint64(p[off:])) if recID != expectedID { return nil, errors.New("index inconsistent") } // 读取payload pldOff := int(off) + 8 length, n := binary.Uvarint(p[pldOff:]) if n <= 0 { return nil, errors.New("bad varint") } pldOff += n if pldOff+int(length) > PageSize { return nil, errors.New("payload out of bounds") } return p[pldOff : pldOff+int(length)], nil } func (pdb *PipelineDB) updateInPlace(pageNo uint16, slotNo uint16, id int64, data []byte) error { p, err := pdb.readPage(pageNo) if err != nil { return err } slots := p.slotArray() if int(slotNo) >= len(slots) { return errors.New("invalid slot index") } off := slots[slotNo] if off == 0 { return errors.New("record deleted") } // 验证记录ID recID := int64(binary.LittleEndian.Uint64(p[off:])) if recID != id { return errors.New("index inconsistent") } // 检查原payload长度 pldOff := int(off) + 8 oldLength, n := binary.Uvarint(p[pldOff:]) if n <= 0 { return errors.New("bad varint in existing record") } // 如果长度相同,原地更新 if int(oldLength) == len(data) { pldStart := pldOff + n copy(p[pldStart:pldStart+len(data)], data) return pdb.writePage(pageNo, p) } return errors.New("cannot update in place") } func (pdb *PipelineDB) deleteRecord(pageNo uint16, slotNo uint16, id int64, idx *TableIndex) error { pdb.mu.Lock() defer pdb.mu.Unlock() p, err := pdb.readPage(pageNo) if err != nil { return err } numSlots := p.numSlots() if slotNo >= numSlots { return errors.New("slot index out of bounds") } // 标记槽位为删除 p.setSlot(slotNo, 0) // 如果是最后一个槽位,压缩槽数组 if slotNo == numSlots-1 { newSlots := numSlots slots := p.slotArray() for newSlots > 0 && slots[newSlots-1] == 0 { newSlots-- } p.setNumSlots(newSlots) } // 写回页面 if err := pdb.writePage(pageNo, p); err != nil { return err } // 从索引中删除(如果提供了索引) if idx != nil { idx.Delete(id) } return nil } // 高级数据库操作 func (pdb *PipelineDB) insert(group string, id int64, data []byte) error { if len(data) > MaxRecSize { return errors.New("record too large") } // 获取组索引 idx := pdb.indexMgr.GetOrCreateIndex(group) // 检查是否已存在 if _, _, exists := idx.Get(id); exists { return errors.New("record already exists") } // 行锁 mutex := pdb.getRowMutex(id) mutex.Lock() defer mutex.Unlock() // 插入到页链 pageNo, slotNo, err := pdb.insertToChain(pdb.header.RootPage, id, data) if err != nil { return err } // 更新索引 idx.Insert(id, pageNo, slotNo) return nil } func (pdb *PipelineDB) get(group string, id int64) ([]byte, error) { idx, exists := pdb.indexMgr.GetIndex(group) if !exists { return nil, errors.New("group not found") } // 使用索引定位 pageNo, slotNo, found := idx.Get(id) if !found { return nil, errors.New("record not found") } // 读取记录 return pdb.readRecord(pageNo, slotNo, id) } func (pdb *PipelineDB) update(group string, id int64, data []byte) error { if len(data) > MaxRecSize { return errors.New("record too large") } idx, exists := pdb.indexMgr.GetIndex(group) if !exists { return errors.New("group not found") } pageNo, slotNo, found := idx.Get(id) if !found { return errors.New("record not found") } // 行锁 mutex := pdb.getRowMutex(id) mutex.Lock() defer mutex.Unlock() // 尝试原地更新 if err := pdb.updateInPlace(pageNo, slotNo, id, data); err == nil { return nil } // 删除后重新插入 if err := pdb.deleteRecord(pageNo, slotNo, id, idx); err != nil { return err } newPageNo, newSlotNo, err := pdb.insertToChain(pdb.header.RootPage, id, data) if err != nil { return err } idx.Insert(id, newPageNo, newSlotNo) return nil } func (pdb *PipelineDB) rangeQuery(group string, startID, endID int64, visitor func(id int64, data []byte) error) error { idx, exists := pdb.indexMgr.GetIndex(group) if !exists { return errors.New("group not found") } idx.Range(startID, endID, func(id int64, pageNo, slotNo uint16) bool { data, err := pdb.readRecord(pageNo, slotNo, id) if err != nil { return false } if err := visitor(id, data); err != nil { return false } return true }) return nil }