Files
srdb/index.go

612 lines
14 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package srdb
import (
"encoding/binary"
"encoding/json"
"fmt"
"maps"
"os"
"path/filepath"
"sync"
"time"
)
// IndexMetadata 索引元数据
type IndexMetadata struct {
Version int64 // 索引版本号
MaxSeq int64 // 索引包含的最大 seq
MinSeq int64 // 索引包含的最小 seq
RowCount int64 // 索引包含的行数
CreatedAt int64 // 创建时间
UpdatedAt int64 // 更新时间
}
// SecondaryIndex 二级索引
type SecondaryIndex struct {
name string // 索引名称
field string // 字段名
fieldType FieldType // 字段类型
file *os.File // 索引文件
btreeReader *IndexBTreeReader // B+Tree 读取器
valueToSeq map[string][]int64 // 值 → seq 列表 (构建时使用)
metadata IndexMetadata // 元数据
mu sync.RWMutex
ready bool // 索引是否就绪
useBTree bool // 是否使用 B+Tree 存储(新格式)
}
// NewSecondaryIndex 创建二级索引
func NewSecondaryIndex(dir, field string, fieldType FieldType) (*SecondaryIndex, error) {
indexPath := filepath.Join(dir, fmt.Sprintf("idx_%s.sst", field))
file, err := os.OpenFile(indexPath, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, err
}
return &SecondaryIndex{
name: field,
field: field,
fieldType: fieldType,
file: file,
valueToSeq: make(map[string][]int64),
ready: false,
}, nil
}
// Add 添加索引条目(增量更新元数据)
func (idx *SecondaryIndex) Add(value any, seq int64) error {
idx.mu.Lock()
defer idx.mu.Unlock()
// 将值转换为字符串作为 key
key := fmt.Sprintf("%v", value)
idx.valueToSeq[key] = append(idx.valueToSeq[key], seq)
// 增量更新元数据 O(1)
if idx.metadata.MinSeq == 0 || seq < idx.metadata.MinSeq {
idx.metadata.MinSeq = seq
}
if seq > idx.metadata.MaxSeq {
idx.metadata.MaxSeq = seq
}
idx.metadata.RowCount++
idx.metadata.UpdatedAt = time.Now().UnixNano()
// 首次添加时设置 CreatedAt
if idx.metadata.CreatedAt == 0 {
idx.metadata.CreatedAt = time.Now().UnixNano()
}
return nil
}
// Build 构建索引并持久化B+Tree 格式)
func (idx *SecondaryIndex) Build() error {
idx.mu.Lock()
defer idx.mu.Unlock()
// 元数据已在 Add 时增量更新,这里只更新版本号
idx.metadata.Version++
idx.metadata.UpdatedAt = time.Now().UnixNano()
// Truncate 文件
err := idx.file.Truncate(0)
if err != nil {
return err
}
// 使用 B+Tree 写入器
writer := NewIndexBTreeWriter(idx.file, idx.metadata)
// 写入内存中的所有条目
// 注意:这假设 valueToSeq 包含所有数据(包括从磁盘加载的)
// 对于增量更新场景Get() 会合并内存和磁盘的结果
for value, seqs := range idx.valueToSeq {
writer.Add(value, seqs)
}
// 构建并写入
err = writer.Build()
if err != nil {
return fmt.Errorf("failed to build btree index: %w", err)
}
// 关闭旧的 btreeReader
if idx.btreeReader != nil {
idx.btreeReader.Close()
}
// 重新加载 btreeReader读取刚写入的数据
reader, err := NewIndexBTreeReader(idx.file)
if err != nil {
return fmt.Errorf("failed to reload btree reader: %w", err)
}
idx.btreeReader = reader
idx.useBTree = true
idx.ready = true
// 不清空 valueToSeq保留所有数据在内存中
// 这样下次 Build() 时可以写入完整数据
// Get() 方法会合并内存和磁盘的结果(去重)
return nil
}
// load 从磁盘加载索引(支持 B+Tree 和 JSON 格式)
func (idx *SecondaryIndex) load() error {
// 获取文件大小
stat, err := idx.file.Stat()
if err != nil {
return err
}
if stat.Size() == 0 {
// 空文件,索引不存在
return nil
}
// 读取文件头,判断格式
headerData := make([]byte, min(int(stat.Size()), IndexHeaderSize))
_, err = idx.file.ReadAt(headerData, 0)
if err != nil {
return err
}
// 检查是否为 B+Tree 格式
if len(headerData) >= 4 {
magic := binary.LittleEndian.Uint32(headerData[0:4])
if magic == IndexMagic {
// B+Tree 格式
return idx.loadBTree()
}
}
// 回退到 JSON 格式(向后兼容)
return idx.loadJSON()
}
// loadBTree 加载 B+Tree 格式的索引
func (idx *SecondaryIndex) loadBTree() error {
reader, err := NewIndexBTreeReader(idx.file)
if err != nil {
return fmt.Errorf("failed to create btree reader: %w", err)
}
idx.btreeReader = reader
idx.metadata = reader.GetMetadata()
idx.useBTree = true
idx.ready = true
return nil
}
// loadJSON 加载 JSON 格式的索引(向后兼容)
func (idx *SecondaryIndex) loadJSON() error {
stat, err := idx.file.Stat()
if err != nil {
return err
}
// 读取文件内容
data := make([]byte, stat.Size())
_, err = idx.file.ReadAt(data, 0)
if err != nil {
return err
}
// 加载 JSON 格式
var indexData struct {
Metadata IndexMetadata `json:"metadata"`
ValueToSeq map[string][]int64 `json:"data"`
}
err = json.Unmarshal(data, &indexData)
if err != nil {
return fmt.Errorf("failed to unmarshal index data: %w", err)
}
if indexData.ValueToSeq == nil {
return fmt.Errorf("invalid index data: missing data field")
}
idx.metadata = indexData.Metadata
idx.valueToSeq = indexData.ValueToSeq
idx.useBTree = false
idx.ready = true
return nil
}
// Get 查询索引(优先查内存,然后查磁盘,合并结果)
func (idx *SecondaryIndex) Get(value any) ([]int64, error) {
idx.mu.RLock()
defer idx.mu.RUnlock()
if !idx.ready {
return nil, fmt.Errorf("index not ready")
}
key := fmt.Sprintf("%v", value)
// 收集所有匹配的 seqs需要去重
seqMap := make(map[int64]bool)
// 1. 先从内存 map 读取(包含最新的未持久化数据)
if memSeqs, exists := idx.valueToSeq[key]; exists {
for _, seq := range memSeqs {
seqMap[seq] = true
}
}
// 2. 如果使用 B+Tree从 B+Tree 读取(持久化的数据)
if idx.useBTree && idx.btreeReader != nil {
diskSeqs, err := idx.btreeReader.Get(key)
if err == nil && diskSeqs != nil {
for _, seq := range diskSeqs {
seqMap[seq] = true
}
}
}
// 3. 合并结果
if len(seqMap) == 0 {
return nil, nil
}
result := make([]int64, 0, len(seqMap))
for seq := range seqMap {
result = append(result, seq)
}
return result, nil
}
// IsReady 索引是否就绪
func (idx *SecondaryIndex) IsReady() bool {
idx.mu.RLock()
defer idx.mu.RUnlock()
return idx.ready
}
// GetMetadata 获取元数据
func (idx *SecondaryIndex) GetMetadata() IndexMetadata {
idx.mu.RLock()
defer idx.mu.RUnlock()
return idx.metadata
}
// ForEach 升序迭代所有索引条目
// callback 返回 false 时停止迭代,支持提前终止
// 注意只能迭代已持久化的数据B+Tree不包括内存中未持久化的数据
func (idx *SecondaryIndex) ForEach(callback IndexEntryCallback) error {
idx.mu.RLock()
defer idx.mu.RUnlock()
if !idx.ready {
return fmt.Errorf("index not ready")
}
// 只支持 B+Tree 格式的索引
if !idx.useBTree || idx.btreeReader == nil {
return fmt.Errorf("ForEach only supports B+Tree format indexes")
}
idx.btreeReader.ForEach(callback)
return nil
}
// ForEachDesc 降序迭代所有索引条目
// callback 返回 false 时停止迭代,支持提前终止
// 注意只能迭代已持久化的数据B+Tree不包括内存中未持久化的数据
func (idx *SecondaryIndex) ForEachDesc(callback IndexEntryCallback) error {
idx.mu.RLock()
defer idx.mu.RUnlock()
if !idx.ready {
return fmt.Errorf("index not ready")
}
// 只支持 B+Tree 格式的索引
if !idx.useBTree || idx.btreeReader == nil {
return fmt.Errorf("ForEachDesc only supports B+Tree format indexes")
}
idx.btreeReader.ForEachDesc(callback)
return nil
}
// NeedsUpdate 检查是否需要更新
func (idx *SecondaryIndex) NeedsUpdate(currentMaxSeq int64) bool {
idx.mu.RLock()
defer idx.mu.RUnlock()
return idx.metadata.MaxSeq < currentMaxSeq
}
// IncrementalUpdate 增量更新索引
func (idx *SecondaryIndex) IncrementalUpdate(getData func(int64) (map[string]any, error), fromSeq, toSeq int64) error {
idx.mu.Lock()
addedCount := int64(0)
// 遍历缺失的 seq 范围
for seq := fromSeq; seq <= toSeq; seq++ {
// 获取数据
data, err := getData(seq)
if err != nil {
continue // 跳过错误的数据
}
// 提取字段值
value, exists := data[idx.field]
if !exists {
continue
}
// 添加到索引
key := fmt.Sprintf("%v", value)
idx.valueToSeq[key] = append(idx.valueToSeq[key], seq)
// 更新元数据
if idx.metadata.MinSeq == 0 || seq < idx.metadata.MinSeq {
idx.metadata.MinSeq = seq
}
if seq > idx.metadata.MaxSeq {
idx.metadata.MaxSeq = seq
}
addedCount++
}
idx.metadata.RowCount += addedCount
idx.metadata.UpdatedAt = time.Now().UnixNano()
// 释放锁,然后调用 BuildBuild 会重新获取锁)
idx.mu.Unlock()
// 保存更新后的索引
return idx.Build()
}
// Close 关闭索引
func (idx *SecondaryIndex) Close() error {
// 关闭 B+Tree reader
if idx.btreeReader != nil {
idx.btreeReader.Close()
}
// 关闭文件
if idx.file != nil {
return idx.file.Close()
}
return nil
}
// IndexManager 索引管理器
type IndexManager struct {
dir string
schema *Schema
indexes map[string]*SecondaryIndex // field → index
mu sync.RWMutex
}
// NewIndexManager 创建索引管理器
func NewIndexManager(dir string, schema *Schema) *IndexManager {
mgr := &IndexManager{
dir: dir,
schema: schema,
indexes: make(map[string]*SecondaryIndex),
}
// 自动加载已存在的索引
mgr.loadExistingIndexes()
return mgr
}
// loadExistingIndexes 加载已存在的索引文件
func (m *IndexManager) loadExistingIndexes() error {
// 确保目录存在
if _, err := os.Stat(m.dir); os.IsNotExist(err) {
return nil // 目录不存在,跳过
}
// 查找所有索引文件
pattern := filepath.Join(m.dir, "idx_*.sst")
files, err := filepath.Glob(pattern)
if err != nil {
return nil // 忽略错误,继续
}
for _, filePath := range files {
// 从文件名提取字段名
// idx_name.sst -> name
filename := filepath.Base(filePath)
if len(filename) < 8 { // "idx_" (4) + ".sst" (4)
continue
}
field := filename[4 : len(filename)-4] // 去掉 "idx_" 和 ".sst"
// 检查字段是否在 Schema 中
fieldDef, err := m.schema.GetField(field)
if err != nil {
continue // 跳过不在 Schema 中的索引
}
// 打开索引文件
file, err := os.OpenFile(filePath, os.O_RDWR, 0644)
if err != nil {
continue
}
// 创建索引对象
idx := &SecondaryIndex{
name: field,
field: field,
fieldType: fieldDef.Type,
file: file,
valueToSeq: make(map[string][]int64),
ready: false,
}
// 加载索引数据
err = idx.load()
if err != nil {
file.Close()
continue
}
m.indexes[field] = idx
}
return nil
}
// CreateIndex 创建索引
func (m *IndexManager) CreateIndex(field string) error {
m.mu.Lock()
defer m.mu.Unlock()
// 检查字段是否存在
fieldDef, err := m.schema.GetField(field)
if err != nil {
return err
}
// 检查是否已存在
if _, exists := m.indexes[field]; exists {
return fmt.Errorf("index on field %s already exists", field)
}
// 创建索引
idx, err := NewSecondaryIndex(m.dir, field, fieldDef.Type)
if err != nil {
return err
}
m.indexes[field] = idx
return nil
}
// DropIndex 删除索引
func (m *IndexManager) DropIndex(field string) error {
m.mu.Lock()
defer m.mu.Unlock()
idx, exists := m.indexes[field]
if !exists {
return fmt.Errorf("index on field %s does not exist", field)
}
// 获取文件路径
indexPath := filepath.Join(m.dir, fmt.Sprintf("idx_%s.sst", field))
// 关闭索引
idx.Close()
// 删除索引文件
os.Remove(indexPath)
// 从内存中删除
delete(m.indexes, field)
return nil
}
// GetIndex 获取索引
func (m *IndexManager) GetIndex(field string) (*SecondaryIndex, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
idx, exists := m.indexes[field]
return idx, exists
}
// AddToIndexes 添加到所有索引
func (m *IndexManager) AddToIndexes(data map[string]any, seq int64) error {
m.mu.RLock()
defer m.mu.RUnlock()
for field, idx := range m.indexes {
if value, exists := data[field]; exists {
err := idx.Add(value, seq)
if err != nil {
return err
}
}
}
return nil
}
// BuildAll 构建所有索引
func (m *IndexManager) BuildAll() error {
m.mu.RLock()
defer m.mu.RUnlock()
for _, idx := range m.indexes {
err := idx.Build()
if err != nil {
return err
}
}
return nil
}
// ListIndexes 列出所有索引
func (m *IndexManager) ListIndexes() []string {
m.mu.RLock()
defer m.mu.RUnlock()
fields := make([]string, 0, len(m.indexes))
for field := range m.indexes {
fields = append(fields, field)
}
return fields
}
// VerifyAndRepair 验证并修复所有索引
func (m *IndexManager) VerifyAndRepair(currentMaxSeq int64, getData func(int64) (map[string]any, error)) error {
m.mu.RLock()
indexes := make(map[string]*SecondaryIndex)
maps.Copy(indexes, m.indexes)
m.mu.RUnlock()
for field, idx := range indexes {
// 检查是否需要更新
if idx.NeedsUpdate(currentMaxSeq) {
metadata := idx.GetMetadata()
fromSeq := metadata.MaxSeq + 1
toSeq := currentMaxSeq
// 增量更新
err := idx.IncrementalUpdate(getData, fromSeq, toSeq)
if err != nil {
return fmt.Errorf("failed to update index %s: %v", field, err)
}
}
}
return nil
}
// GetIndexMetadata 获取所有索引的元数据
func (m *IndexManager) GetIndexMetadata() map[string]IndexMetadata {
m.mu.RLock()
defer m.mu.RUnlock()
metadata := make(map[string]IndexMetadata)
for field, idx := range m.indexes {
metadata[field] = idx.GetMetadata()
}
return metadata
}
// Close 关闭所有索引
func (m *IndexManager) Close() error {
m.mu.Lock()
defer m.mu.Unlock()
for _, idx := range m.indexes {
idx.Close()
}
return nil
}