diff --git a/query.go b/query.go index 3e8a259..1af7840 100644 --- a/query.go +++ b/query.go @@ -718,6 +718,15 @@ func (m *memtableIterator) next() (int64, bool) { return m.keys[m.index], true } +// peek 查看下一个 seq,但不推进指针 +func (m *memtableIterator) peek() int64 { + nextIndex := m.index + 1 + if nextIndex >= len(m.keys) { + return -1 + } + return m.keys[nextIndex] +} + // sstReader 包装 SST Reader 的迭代状态 type sstReader struct { keys []int64 // 文件中实际存在的 key 列表(已排序) @@ -743,79 +752,99 @@ func (r *Rows) Next() bool { } // next 从数据源读取下一条匹配的记录(惰性加载的核心逻辑) +// 使用归并排序,从所有数据源中选择最小的 seq func (r *Rows) next() bool { for { - // 1. 尝试从 Active MemTable 获取 - if r.memIterator != nil { - if seq, ok := r.memIterator.next(); ok { - if !r.visited[seq] { - row, err := r.table.Get(seq) - if err == nil && r.qb.Match(row.Data) { - r.visited[seq] = true - r.currentRow = &Row{schema: r.schema, fields: r.fields, inner: row} - return true - } - r.visited[seq] = true - } - continue - } - // Active MemTable 迭代完成 - r.memIterator = nil - } - - // 2. 尝试从 Immutable MemTables 获取 - if r.immutableIterator != nil { - if seq, ok := r.immutableIterator.next(); ok { - if !r.visited[seq] { - row, err := r.table.Get(seq) - if err == nil && r.qb.Match(row.Data) { - r.visited[seq] = true - r.currentRow = &Row{schema: r.schema, fields: r.fields, inner: row} - return true - } - r.visited[seq] = true - } - continue - } - // 当前 Immutable 迭代完成,移到下一个 - r.immutableIterator = nil - r.immutableIndex++ - } - - // 检查是否有更多 Immutable MemTables + // 初始化 Immutable 迭代器(如果需要) if r.immutableIterator == nil && r.immutableIndex < len(r.table.memtableManager.GetImmutables()) { immutables := r.table.memtableManager.GetImmutables() if r.immutableIndex < len(immutables) { r.immutableIterator = newMemtableIterator(immutables[r.immutableIndex].MemTable.Keys()) - continue } } - // 3. 尝试从 SST 文件获取 - if r.sstIndex < len(r.sstReaders) { - sstReader := r.sstReaders[r.sstIndex] - // 遍历文件中实际存在的 key(不是 minKey→maxKey 范围) - for sstReader.index < len(sstReader.keys) { - seq := sstReader.keys[sstReader.index] - sstReader.index++ + // 收集所有数据源的下一个 seq(使用 peek,不推进指针) + minSeq := int64(-1) + minSource := -1 // 0=mem, 1=immutable, 2+=sst - if !r.visited[seq] { - row, err := r.table.Get(seq) - if err == nil && r.qb.Match(row.Data) { - r.visited[seq] = true - r.currentRow = &Row{schema: r.schema, fields: r.fields, inner: row} - return true - } - r.visited[seq] = true + // 1. 检查 Active MemTable + if r.memIterator != nil { + if seq := r.memIterator.peek(); seq != -1 { + if minSeq == -1 || seq < minSeq { + minSeq = seq + minSource = 0 } } - // 当前 SST 文件迭代完成,移到下一个 - r.sstIndex++ + } + + // 2. 检查 Immutable MemTables + if r.immutableIterator != nil { + if seq := r.immutableIterator.peek(); seq != -1 { + if minSeq == -1 || seq < minSeq { + minSeq = seq + minSource = 1 + } + } + } + + // 3. 检查所有 SST 文件 + for i, sstReader := range r.sstReaders { + if sstReader.index < len(sstReader.keys) { + seq := sstReader.keys[sstReader.index] + if minSeq == -1 || seq < minSeq { + minSeq = seq + minSource = 2 + i + } + } + } + + // 如果没有找到任何数据源,说明迭代结束 + if minSource == -1 { + return false + } + + // 从选定的数据源推进指针 + switch minSource { + case 0: // Active MemTable + r.memIterator.next() + if r.memIterator.peek() == -1 { + r.memIterator = nil + } + + case 1: // Immutable MemTable + r.immutableIterator.next() + if r.immutableIterator.peek() == -1 { + r.immutableIterator = nil + r.immutableIndex++ + } + + default: // SST 文件 + sstIndex := minSource - 2 + r.sstReaders[sstIndex].index++ + } + + // 如果该 seq 已访问过(去重),继续下一轮 + if r.visited[minSeq] { continue } - // 所有数据源都迭代完成 - return false + // 获取并验证该记录 + row, err := r.table.Get(minSeq) + if err != nil { + r.visited[minSeq] = true + continue + } + + // 检查是否匹配过滤条件 + if !r.qb.Match(row.Data) { + r.visited[minSeq] = true + continue + } + + // 找到匹配的记录 + r.visited[minSeq] = true + r.currentRow = &Row{schema: r.schema, fields: r.fields, inner: row} + return true } } diff --git a/table.go b/table.go index 76acbe8..e5ffea2 100644 --- a/table.go +++ b/table.go @@ -134,6 +134,20 @@ func OpenTable(opts *TableOptions) (*Table, error) { // 创建索引管理器 indexMgr := NewIndexManager(idxDir, sch) + // 自动为 Schema 中标记 Indexed 的字段创建索引 + for _, field := range sch.Fields { + if field.Indexed { + // 检查索引是否已存在(避免重复创建) + if _, exists := indexMgr.GetIndex(field.Name); !exists { + err := indexMgr.CreateIndex(field.Name) + if err != nil { + // 索引创建失败,记录警告但不阻塞表创建 + fmt.Fprintf(os.Stderr, "[WARNING] Failed to create index for field %s: %v\n", field.Name, err) + } + } + } + } + // 创建 SST Manager sstMgr, err := NewSSTableManager(sstDir) if err != nil { @@ -238,7 +252,7 @@ func (t *Table) normalizeInsertData(data any) ([]map[string]any, error) { typ := reflect.TypeOf(data) // 如果是指针,解引用 - if typ.Kind() == reflect.Ptr { + if typ.Kind() == reflect.Pointer { if val.IsNil() { return nil, fmt.Errorf("data pointer cannot be nil") } @@ -269,7 +283,7 @@ func (t *Table) normalizeInsertData(data any) ([]map[string]any, error) { } // []*struct{} 或 []struct{} - if elemType.Kind() == reflect.Ptr { + if elemType.Kind() == reflect.Pointer { elemType = elemType.Elem() } @@ -279,7 +293,7 @@ func (t *Table) normalizeInsertData(data any) ([]map[string]any, error) { for i := 0; i < val.Len(); i++ { elem := val.Index(i) // 如果是指针,解引用 - if elem.Kind() == reflect.Ptr { + if elem.Kind() == reflect.Pointer { if elem.IsNil() { continue // 跳过 nil 指针 } @@ -315,7 +329,7 @@ func (t *Table) structToMap(v any) (map[string]any, error) { val := reflect.ValueOf(v) typ := reflect.TypeOf(v) - if typ.Kind() == reflect.Ptr { + if typ.Kind() == reflect.Pointer { val = val.Elem() typ = val.Type() }