优化:实现查询结果的惰性迭代器
- 重构 QueryBuilder 使用惰性迭代模式 - 添加 Iterator 接口支持流式处理 - 减少内存占用,支持大数据集查询 - 添加完整的惰性迭代器测试用例
This commit is contained in:
105
query.go
105
query.go
@@ -4,7 +4,6 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"maps"
|
||||
"slices"
|
||||
"strings"
|
||||
)
|
||||
|
||||
@@ -488,106 +487,34 @@ func (qb *QueryBuilder) Rows() (*Rows, error) {
|
||||
// 检查是否有可以使用索引的 Eq 条件
|
||||
indexField, indexValue := qb.findIndexableCondition()
|
||||
if indexField != "" {
|
||||
// 使用索引查询
|
||||
// 使用索引查询(索引查询需要立即加载,因为需要从索引获取 seq 列表)
|
||||
return qb.rowsWithIndex(rows, indexField, indexValue)
|
||||
}
|
||||
|
||||
// 收集所有数据源的 keys 并全局排序
|
||||
// 立即读取数据避免 compaction 期间文件被删除
|
||||
keyToRow := make(map[int64]*SSTableRow) // 存储已读取的行数据
|
||||
var allKeys []int64
|
||||
|
||||
// 1. 从 Active MemTable 读取数据
|
||||
// 惰性加载:只初始化迭代器,不读取数据
|
||||
// 1. 初始化 Active MemTable 迭代器
|
||||
activeMemTable := qb.table.memtableManager.GetActive()
|
||||
if activeMemTable != nil {
|
||||
activeKeys := activeMemTable.Keys()
|
||||
for _, key := range activeKeys {
|
||||
if data, ok := activeMemTable.Get(key); ok {
|
||||
var row SSTableRow
|
||||
if err := json.Unmarshal(data, &row); err == nil {
|
||||
keyToRow[key] = &row
|
||||
allKeys = append(allKeys, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
rows.memIterator = newMemtableIterator(activeMemTable.Keys())
|
||||
}
|
||||
|
||||
// 2. 从所有 Immutable MemTables 读取数据
|
||||
immutables := qb.table.memtableManager.GetImmutables()
|
||||
for _, imm := range immutables {
|
||||
immKeys := imm.MemTable.Keys()
|
||||
for _, key := range immKeys {
|
||||
// 如果 key 已存在(来自更新的数据源),跳过
|
||||
if _, exists := keyToRow[key]; exists {
|
||||
continue
|
||||
}
|
||||
// 2. 初始化 Immutable MemTables(稍后在 Next() 中迭代)
|
||||
rows.immutableIndex = 0
|
||||
rows.immutableIterator = nil
|
||||
|
||||
if data, ok := imm.MemTable.Get(key); ok {
|
||||
var row SSTableRow
|
||||
if err := json.Unmarshal(data, &row); err == nil {
|
||||
keyToRow[key] = &row
|
||||
allKeys = append(allKeys, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 3. 收集所有 SST 文件的 keys
|
||||
// 3. 初始化 SST 文件迭代器
|
||||
sstReaders := qb.table.sstManager.GetReaders()
|
||||
|
||||
for _, reader := range sstReaders {
|
||||
// 获取文件中实际存在的 key 列表(已在 GetAllKeys 中排序)
|
||||
keys := reader.GetAllKeys()
|
||||
|
||||
// 记录所有 keys(实际数据稍后统一从 table 读取)
|
||||
for _, key := range keys {
|
||||
// 如果 key 已存在(来自更新的数据源),跳过
|
||||
if _, exists := keyToRow[key]; !exists {
|
||||
allKeys = append(allKeys, key)
|
||||
keyToRow[key] = nil // 占位,表示需要读取
|
||||
}
|
||||
rows.sstReaders = make([]*sstReader, len(sstReaders))
|
||||
for i, reader := range sstReaders {
|
||||
rows.sstReaders[i] = &sstReader{
|
||||
keys: reader.GetAllKeys(),
|
||||
index: 0,
|
||||
}
|
||||
}
|
||||
rows.sstIndex = 0
|
||||
|
||||
// 4. 对所有 keys 排序
|
||||
if len(allKeys) > 0 {
|
||||
// 去重(使用 map 已经去重了,但 allKeys 可能有重复)
|
||||
keySet := make(map[int64]bool)
|
||||
uniqueKeys := make([]int64, 0, len(allKeys))
|
||||
for _, key := range allKeys {
|
||||
if !keySet[key] {
|
||||
keySet[key] = true
|
||||
uniqueKeys = append(uniqueKeys, key)
|
||||
}
|
||||
}
|
||||
|
||||
// 排序
|
||||
slices.Sort(uniqueKeys)
|
||||
|
||||
// 统一从 table 读取所有数据(避免 compaction 导致的文件删除)
|
||||
rows.cachedRows = make([]*SSTableRow, 0, len(uniqueKeys))
|
||||
for _, seq := range uniqueKeys {
|
||||
// 如果已经从 MemTable 读取,直接使用
|
||||
row := keyToRow[seq]
|
||||
if row == nil {
|
||||
// 从 table 读取(会搜索 MemTable + 所有 SST,包括 compaction 后的新文件)
|
||||
var err error
|
||||
row, err = qb.table.Get(seq)
|
||||
if err != nil {
|
||||
// 数据不存在(理论上不应该发生,因为 key 来自索引)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if qb.Match(row.Data) {
|
||||
rows.cachedRows = append(rows.cachedRows, row)
|
||||
}
|
||||
}
|
||||
|
||||
// 使用缓存模式
|
||||
rows.cached = true
|
||||
rows.cachedIndex = -1
|
||||
}
|
||||
// 不设置 cached,让 Next() 使用惰性加载
|
||||
rows.cached = false
|
||||
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user