文档:更新 DESIGN.md,使用英文注释和调整项目结构说明
This commit is contained in:
241
index.go
241
index.go
@@ -4,6 +4,7 @@ import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"maps"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
@@ -22,16 +23,16 @@ type IndexMetadata struct {
|
||||
|
||||
// SecondaryIndex 二级索引
|
||||
type SecondaryIndex struct {
|
||||
name string // 索引名称
|
||||
field string // 字段名
|
||||
fieldType FieldType // 字段类型
|
||||
file *os.File // 索引文件
|
||||
builder *BTreeBuilder // B+Tree 构建器
|
||||
reader *BTreeReader // B+Tree 读取器
|
||||
valueToSeq map[string][]int64 // 值 → seq 列表 (构建时使用)
|
||||
metadata IndexMetadata // 元数据
|
||||
mu sync.RWMutex
|
||||
ready bool // 索引是否就绪
|
||||
name string // 索引名称
|
||||
field string // 字段名
|
||||
fieldType FieldType // 字段类型
|
||||
file *os.File // 索引文件
|
||||
btreeReader *IndexBTreeReader // B+Tree 读取器
|
||||
valueToSeq map[string][]int64 // 值 → seq 列表 (构建时使用)
|
||||
metadata IndexMetadata // 元数据
|
||||
mu sync.RWMutex
|
||||
ready bool // 索引是否就绪
|
||||
useBTree bool // 是否使用 B+Tree 存储(新格式)
|
||||
}
|
||||
|
||||
// NewSecondaryIndex 创建二级索引
|
||||
@@ -52,7 +53,7 @@ func NewSecondaryIndex(dir, field string, fieldType FieldType) (*SecondaryIndex,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Add 添加索引条目
|
||||
// Add 添加索引条目(增量更新元数据)
|
||||
func (idx *SecondaryIndex) Add(value any, seq int64) error {
|
||||
idx.mu.Lock()
|
||||
defer idx.mu.Unlock()
|
||||
@@ -61,97 +62,59 @@ func (idx *SecondaryIndex) Add(value any, seq int64) error {
|
||||
key := fmt.Sprintf("%v", value)
|
||||
idx.valueToSeq[key] = append(idx.valueToSeq[key], seq)
|
||||
|
||||
// 增量更新元数据 O(1)
|
||||
if idx.metadata.MinSeq == 0 || seq < idx.metadata.MinSeq {
|
||||
idx.metadata.MinSeq = seq
|
||||
}
|
||||
if seq > idx.metadata.MaxSeq {
|
||||
idx.metadata.MaxSeq = seq
|
||||
}
|
||||
idx.metadata.RowCount++
|
||||
idx.metadata.UpdatedAt = time.Now().UnixNano()
|
||||
|
||||
// 首次添加时设置 CreatedAt
|
||||
if idx.metadata.CreatedAt == 0 {
|
||||
idx.metadata.CreatedAt = time.Now().UnixNano()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Build 构建索引并持久化
|
||||
// Build 构建索引并持久化(B+Tree 格式)
|
||||
func (idx *SecondaryIndex) Build() error {
|
||||
idx.mu.Lock()
|
||||
defer idx.mu.Unlock()
|
||||
|
||||
// 持久化索引数据到 JSON 文件
|
||||
return idx.save()
|
||||
}
|
||||
|
||||
// save 保存索引到磁盘
|
||||
func (idx *SecondaryIndex) save() error {
|
||||
// 更新元数据
|
||||
idx.updateMetadata()
|
||||
|
||||
// 创建包含元数据的数据结构
|
||||
indexData := struct {
|
||||
Metadata IndexMetadata `json:"metadata"`
|
||||
ValueToSeq map[string][]int64 `json:"data"`
|
||||
}{
|
||||
Metadata: idx.metadata,
|
||||
ValueToSeq: idx.valueToSeq,
|
||||
}
|
||||
|
||||
// 序列化索引数据
|
||||
data, err := json.Marshal(indexData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// 元数据已在 Add 时增量更新,这里只更新版本号
|
||||
idx.metadata.Version++
|
||||
idx.metadata.UpdatedAt = time.Now().UnixNano()
|
||||
|
||||
// Truncate 文件
|
||||
err = idx.file.Truncate(0)
|
||||
err := idx.file.Truncate(0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 写入文件
|
||||
_, err = idx.file.Seek(0, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
// 使用 B+Tree 写入器
|
||||
writer := NewIndexBTreeWriter(idx.file, idx.metadata)
|
||||
|
||||
// 添加所有条目
|
||||
for value, seqs := range idx.valueToSeq {
|
||||
writer.Add(value, seqs)
|
||||
}
|
||||
|
||||
_, err = idx.file.Write(data)
|
||||
// 构建并写入
|
||||
err = writer.Build()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Sync 到磁盘
|
||||
err = idx.file.Sync()
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("failed to build btree index: %w", err)
|
||||
}
|
||||
|
||||
idx.useBTree = true
|
||||
idx.ready = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateMetadata 更新元数据
|
||||
func (idx *SecondaryIndex) updateMetadata() {
|
||||
now := time.Now().UnixNano()
|
||||
|
||||
if idx.metadata.CreatedAt == 0 {
|
||||
idx.metadata.CreatedAt = now
|
||||
}
|
||||
idx.metadata.UpdatedAt = now
|
||||
idx.metadata.Version++
|
||||
|
||||
// 计算 MinSeq, MaxSeq, RowCount
|
||||
var minSeq, maxSeq int64 = -1, -1
|
||||
rowCount := int64(0)
|
||||
|
||||
for _, seqs := range idx.valueToSeq {
|
||||
for _, seq := range seqs {
|
||||
if minSeq == -1 || seq < minSeq {
|
||||
minSeq = seq
|
||||
}
|
||||
if maxSeq == -1 || seq > maxSeq {
|
||||
maxSeq = seq
|
||||
}
|
||||
rowCount++
|
||||
}
|
||||
}
|
||||
|
||||
idx.metadata.MinSeq = minSeq
|
||||
idx.metadata.MaxSeq = maxSeq
|
||||
idx.metadata.RowCount = rowCount
|
||||
}
|
||||
|
||||
// load 从磁盘加载索引
|
||||
// load 从磁盘加载索引(支持 B+Tree 和 JSON 格式)
|
||||
func (idx *SecondaryIndex) load() error {
|
||||
// 获取文件大小
|
||||
stat, err := idx.file.Stat()
|
||||
@@ -164,6 +127,47 @@ func (idx *SecondaryIndex) load() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 读取文件头,判断格式
|
||||
headerData := make([]byte, min(int(stat.Size()), IndexHeaderSize))
|
||||
_, err = idx.file.ReadAt(headerData, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 检查是否为 B+Tree 格式
|
||||
if len(headerData) >= 4 {
|
||||
magic := binary.LittleEndian.Uint32(headerData[0:4])
|
||||
if magic == IndexMagic {
|
||||
// B+Tree 格式
|
||||
return idx.loadBTree()
|
||||
}
|
||||
}
|
||||
|
||||
// 回退到 JSON 格式(向后兼容)
|
||||
return idx.loadJSON()
|
||||
}
|
||||
|
||||
// loadBTree 加载 B+Tree 格式的索引
|
||||
func (idx *SecondaryIndex) loadBTree() error {
|
||||
reader, err := NewIndexBTreeReader(idx.file)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create btree reader: %w", err)
|
||||
}
|
||||
|
||||
idx.btreeReader = reader
|
||||
idx.metadata = reader.GetMetadata()
|
||||
idx.useBTree = true
|
||||
idx.ready = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadJSON 加载 JSON 格式的索引(向后兼容)
|
||||
func (idx *SecondaryIndex) loadJSON() error {
|
||||
stat, err := idx.file.Stat()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 读取文件内容
|
||||
data := make([]byte, stat.Size())
|
||||
_, err = idx.file.ReadAt(data, 0)
|
||||
@@ -171,27 +175,24 @@ func (idx *SecondaryIndex) load() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// 尝试加载新格式(带元数据)
|
||||
// 加载 JSON 格式
|
||||
var indexData struct {
|
||||
Metadata IndexMetadata `json:"metadata"`
|
||||
ValueToSeq map[string][]int64 `json:"data"`
|
||||
}
|
||||
|
||||
err = json.Unmarshal(data, &indexData)
|
||||
if err == nil && indexData.ValueToSeq != nil {
|
||||
// 新格式
|
||||
idx.metadata = indexData.Metadata
|
||||
idx.valueToSeq = indexData.ValueToSeq
|
||||
} else {
|
||||
// 旧格式(兼容性)
|
||||
err = json.Unmarshal(data, &idx.valueToSeq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// 初始化元数据
|
||||
idx.updateMetadata()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to unmarshal index data: %w", err)
|
||||
}
|
||||
|
||||
if indexData.ValueToSeq == nil {
|
||||
return fmt.Errorf("invalid index data: missing data field")
|
||||
}
|
||||
|
||||
idx.metadata = indexData.Metadata
|
||||
idx.valueToSeq = indexData.ValueToSeq
|
||||
idx.useBTree = false
|
||||
idx.ready = true
|
||||
return nil
|
||||
}
|
||||
@@ -206,6 +207,13 @@ func (idx *SecondaryIndex) Get(value any) ([]int64, error) {
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("%v", value)
|
||||
|
||||
// 如果使用 B+Tree,从 B+Tree 读取
|
||||
if idx.useBTree && idx.btreeReader != nil {
|
||||
return idx.btreeReader.Get(key)
|
||||
}
|
||||
|
||||
// 否则从内存 map 读取
|
||||
seqs, exists := idx.valueToSeq[key]
|
||||
if !exists {
|
||||
return nil, nil
|
||||
@@ -238,8 +246,8 @@ func (idx *SecondaryIndex) NeedsUpdate(currentMaxSeq int64) bool {
|
||||
// IncrementalUpdate 增量更新索引
|
||||
func (idx *SecondaryIndex) IncrementalUpdate(getData func(int64) (map[string]any, error), fromSeq, toSeq int64) error {
|
||||
idx.mu.Lock()
|
||||
defer idx.mu.Unlock()
|
||||
|
||||
addedCount := int64(0)
|
||||
// 遍历缺失的 seq 范围
|
||||
for seq := fromSeq; seq <= toSeq; seq++ {
|
||||
// 获取数据
|
||||
@@ -257,39 +265,40 @@ func (idx *SecondaryIndex) IncrementalUpdate(getData func(int64) (map[string]any
|
||||
// 添加到索引
|
||||
key := fmt.Sprintf("%v", value)
|
||||
idx.valueToSeq[key] = append(idx.valueToSeq[key], seq)
|
||||
|
||||
// 更新元数据
|
||||
if idx.metadata.MinSeq == 0 || seq < idx.metadata.MinSeq {
|
||||
idx.metadata.MinSeq = seq
|
||||
}
|
||||
if seq > idx.metadata.MaxSeq {
|
||||
idx.metadata.MaxSeq = seq
|
||||
}
|
||||
addedCount++
|
||||
}
|
||||
|
||||
idx.metadata.RowCount += addedCount
|
||||
idx.metadata.UpdatedAt = time.Now().UnixNano()
|
||||
|
||||
// 释放锁,然后调用 Build(Build 会重新获取锁)
|
||||
idx.mu.Unlock()
|
||||
|
||||
// 保存更新后的索引
|
||||
return idx.save()
|
||||
return idx.Build()
|
||||
}
|
||||
|
||||
// Close 关闭索引
|
||||
func (idx *SecondaryIndex) Close() error {
|
||||
// 关闭 B+Tree reader
|
||||
if idx.btreeReader != nil {
|
||||
idx.btreeReader.Close()
|
||||
}
|
||||
// 关闭文件
|
||||
if idx.file != nil {
|
||||
return idx.file.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// encodeSeqList 编码 seq 列表
|
||||
func encodeSeqList(seqs []int64) []byte {
|
||||
buf := make([]byte, 8*len(seqs))
|
||||
for i, seq := range seqs {
|
||||
binary.LittleEndian.PutUint64(buf[i*8:], uint64(seq))
|
||||
}
|
||||
return buf
|
||||
}
|
||||
|
||||
// decodeSeqList 解码 seq 列表
|
||||
func decodeSeqList(data []byte) []int64 {
|
||||
count := len(data) / 8
|
||||
seqs := make([]int64, count)
|
||||
for i := range count {
|
||||
seqs[i] = int64(binary.LittleEndian.Uint64(data[i*8:]))
|
||||
}
|
||||
return seqs
|
||||
}
|
||||
|
||||
// IndexManager 索引管理器
|
||||
type IndexManager struct {
|
||||
dir string
|
||||
@@ -478,9 +487,7 @@ func (m *IndexManager) ListIndexes() []string {
|
||||
func (m *IndexManager) VerifyAndRepair(currentMaxSeq int64, getData func(int64) (map[string]any, error)) error {
|
||||
m.mu.RLock()
|
||||
indexes := make(map[string]*SecondaryIndex)
|
||||
for k, v := range m.indexes {
|
||||
indexes[k] = v
|
||||
}
|
||||
maps.Copy(indexes, m.indexes)
|
||||
m.mu.RUnlock()
|
||||
|
||||
for field, idx := range indexes {
|
||||
|
||||
Reference in New Issue
Block a user