diff --git a/index.go b/index.go
index 0b6052d..5d414a8 100644
--- a/index.go
+++ b/index.go
@@ -98,7 +98,9 @@ func (idx *SecondaryIndex) Build() error {
 	// 使用 B+Tree 写入器
 	writer := NewIndexBTreeWriter(idx.file, idx.metadata)
 
-	// 添加所有条目
+	// Write every in-memory entry.
+	// NOTE: this assumes valueToSeq holds all data (including entries loaded from disk).
+	// For incremental updates, Get() merges the in-memory and on-disk results.
 	for value, seqs := range idx.valueToSeq {
 		writer.Add(value, seqs)
 	}
@@ -109,8 +111,27 @@ func (idx *SecondaryIndex) Build() error {
 		return fmt.Errorf("failed to build btree index: %w", err)
 	}
 
+	// Close the previous btreeReader and drop the reference right away, so a
+	// failed reload below cannot leave Get() holding a closed reader.
+	if idx.btreeReader != nil {
+		idx.btreeReader.Close()
+		idx.btreeReader = nil
+	}
+
+	// Reload the btreeReader so it reads the data that was just written.
+	reader, err := NewIndexBTreeReader(idx.file)
+	if err != nil {
+		return fmt.Errorf("failed to reload btree reader: %w", err)
+	}
+
+	idx.btreeReader = reader
 	idx.useBTree = true
 	idx.ready = true
+
+	// valueToSeq is deliberately not cleared: keeping every entry in memory
+	// lets the next Build() write the complete data set again.
+	// Get() merges (and de-duplicates) the in-memory and on-disk results.
+
 	return nil
 }
 
@@ -197,7 +218,10 @@ func (idx *SecondaryIndex) loadJSON() error {
 	return nil
 }
 
-// Get 查询索引
+// Get looks up the sequence numbers indexed under value. In-memory entries
+// are consulted first, then the on-disk B+Tree; the two result sets are
+// merged without duplicates. It returns a nil slice when nothing matches; a
+// failed B+Tree lookup is returned as an error.
 func (idx *SecondaryIndex) Get(value any) ([]int64, error) {
 	idx.mu.RLock()
 	defer idx.mu.RUnlock()
@@ -208,18 +232,37 @@ func (idx *SecondaryIndex) Get(value any) ([]int64, error) {
 
 	key := fmt.Sprintf("%v", value)
 
-	// 如果使用 B+Tree,从 B+Tree 读取
-	if idx.useBTree && idx.btreeReader != nil {
-		return idx.btreeReader.Get(key)
-	}
-
-	// 否则从内存 map 读取
-	seqs, exists := idx.valueToSeq[key]
-	if !exists {
-		return nil, nil
-	}
-
-	return seqs, nil
+	// seen de-duplicates seqs present both in memory and on disk; result keeps
+	// them in first-seen order so repeated lookups return a stable ordering
+	// (ranging over a map would randomize it).
+	seen := make(map[int64]struct{})
+	var result []int64
+	collect := func(seqs []int64) {
+		for _, seq := range seqs {
+			if _, dup := seen[seq]; dup {
+				continue
+			}
+			seen[seq] = struct{}{}
+			result = append(result, seq)
+		}
+	}
+
+	// 1. In-memory map first: it holds the newest, not-yet-persisted entries.
+	collect(idx.valueToSeq[key])
+
+	// 2. Then the B+Tree, which holds the persisted entries.
+	if idx.useBTree && idx.btreeReader != nil {
+		diskSeqs, err := idx.btreeReader.Get(key)
+		if err != nil {
+			// Surface read failures instead of returning a silently
+			// incomplete (possibly empty) result.
+			return nil, fmt.Errorf("failed to read btree index: %w", err)
+		}
+		collect(diskSeqs)
+	}
+
+	// 3. result is still nil when neither source had the key.
+	return result, nil
 }
 
 // IsReady 索引是否就绪
diff --git a/index_query_test.go b/index_query_test.go
new file mode 100644
index 0000000..b515135
--- /dev/null
+++ b/index_query_test.go
@@ -0,0 +1,248 @@
+package srdb
+
+import (
+	"os"
+	"path/filepath"
+	"testing"
+)
+
+// collectRowData drains rows and returns the data of every row it yields.
+// Closing rows stays the caller's responsibility.
+func collectRowData(rows *Rows) []map[string]any {
+	results := []map[string]any{}
+	for rows.Next() {
+		results = append(results, rows.Row().Data())
+	}
+	return results
+}
+
+// TestIndexQueryIntegration exercises the full index query flow: build an
+// index, query through it, then query data added after the index was built.
+func TestIndexQueryIntegration(t *testing.T) {
+	tmpDir := t.TempDir()
+
+	// 1. Create a schema with an indexed field.
+	schema := NewSchema("users", []Field{
+		{Name: "name", Type: FieldTypeString, Indexed: false},
+		{Name: "email", Type: FieldTypeString, Indexed: true}, // email is indexed
+		{Name: "age", Type: FieldTypeInt64, Indexed: false},
+	})
+
+	// 2. Open the table.
+	table, err := OpenTable(&TableOptions{
+		Dir:          tmpDir,
+		Name:         schema.Name,
+		Fields:       schema.Fields,
+		MemTableSize: 1024 * 1024, // 1MB
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer table.Close()
+
+	// 3. Create the index.
+	if err := table.CreateIndex("email"); err != nil {
+		t.Fatal(err)
+	}
+
+	// 4. Insert test data.
+	testData := []map[string]any{
+		{"name": "Alice", "email": "alice@example.com", "age": int64(25)},
+		{"name": "Bob", "email": "bob@example.com", "age": int64(30)},
+		{"name": "Charlie", "email": "alice@example.com", "age": int64(35)}, // same email as Alice
+		{"name": "David", "email": "david@example.com", "age": int64(40)},
+	}
+
+	for _, data := range testData {
+		if err := table.Insert(data); err != nil {
+			t.Fatalf("Failed to insert data: %v", err)
+		}
+	}
+
+	// 5. Build the index (persist it).
+	if err := table.indexManager.BuildAll(); err != nil {
+		t.Fatalf("Failed to build indexes: %v", err)
+	}
+
+	// 6. Verify that the index file exists.
+	indexPath := filepath.Join(tmpDir, "idx", "idx_email.sst")
+	if _, err := os.Stat(indexPath); err != nil {
+		t.Fatalf("Index file not created: %s (%v)", indexPath, err)
+	}
+	t.Logf("✓ Index file created: %s", indexPath)
+
+	// 7. Query through the index.
+	rows, err := table.Query().Eq("email", "alice@example.com").Rows()
+	if err != nil {
+		t.Fatalf("Query failed: %v", err)
+	}
+	defer rows.Close()
+
+	// 8. Verify the results.
+	results := collectRowData(rows)
+	if len(results) != 2 {
+		t.Errorf("Expected 2 results, got %d", len(results))
+	}
+
+	// Verify the content of each result.
+	for _, result := range results {
+		if result["email"] != "alice@example.com" {
+			t.Errorf("Unexpected email: %v", result["email"])
+		}
+		// Checked assertion: a missing or non-string name must fail the
+		// test with a message rather than panic.
+		name, ok := result["name"].(string)
+		if !ok || (name != "Alice" && name != "Charlie") {
+			t.Errorf("Unexpected name: %v", result["name"])
+		}
+	}
+
+	t.Logf("✓ Index query returned correct results: %d rows", len(results))
+
+	// 9. A query on a non-indexed field should still work, without the index.
+	rows2, err := table.Query().Eq("name", "Bob").Rows()
+	if err != nil {
+		t.Fatalf("Query without index failed: %v", err)
+	}
+	defer rows2.Close()
+
+	results2 := collectRowData(rows2)
+	if len(results2) != 1 {
+		t.Errorf("Expected 1 result for Bob, got %d", len(results2))
+	}
+
+	t.Logf("✓ Non-indexed query works correctly: %d rows", len(results2))
+
+	// 10. The index must also cover newly inserted data.
+	err = table.Insert(map[string]any{
+		"name":  "Eve",
+		"email": "eve@example.com",
+		"age":   int64(28),
+	})
+	if err != nil {
+		t.Fatalf("Failed to insert new data: %v", err)
+	}
+
+	// Query the new row: its index entry is not persisted yet but should be in memory.
+	rows3, err := table.Query().Eq("email", "eve@example.com").Rows()
+	if err != nil {
+		t.Fatalf("Query for new data failed: %v", err)
+	}
+	defer rows3.Close()
+
+	results3 := collectRowData(rows3)
+	if len(results3) != 1 {
+		t.Errorf("Expected 1 result for Eve (new data), got %d", len(results3))
+	}
+
+	t.Logf("✓ Index works for new data (before persistence): %d rows", len(results3))
+
+	// 11. Rebuild the index and verify again.
+	if err := table.indexManager.BuildAll(); err != nil {
+		t.Fatalf("Failed to rebuild indexes: %v", err)
+	}
+
+	rows4, err := table.Query().Eq("email", "eve@example.com").Rows()
+	if err != nil {
+		t.Fatalf("Query after rebuild failed: %v", err)
+	}
+	defer rows4.Close()
+
+	results4 := collectRowData(rows4)
+	if len(results4) != 1 {
+		t.Errorf("Expected 1 result for Eve (after rebuild), got %d", len(results4))
+	}
+
+	t.Logf("✓ Index works after rebuild: %d rows", len(results4))
+
+	t.Log("=== All index query tests passed ===")
+}
+
+// TestIndexPersistenceAcrossRestart verifies that an index built in one
+// session is still listed and usable after the table is reopened.
+func TestIndexPersistenceAcrossRestart(t *testing.T) {
+	tmpDir := t.TempDir()
+
+	// 1. First session: create the data and the index.
+	{
+		schema := NewSchema("products", []Field{
+			{Name: "name", Type: FieldTypeString, Indexed: false},
+			{Name: "category", Type: FieldTypeString, Indexed: true},
+			{Name: "price", Type: FieldTypeInt64, Indexed: false},
+		})
+
+		table, err := OpenTable(&TableOptions{
+			Dir:          tmpDir,
+			Name:         schema.Name,
+			Fields:       schema.Fields,
+			MemTableSize: 1024 * 1024,
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		// Create the index.
+		if err := table.CreateIndex("category"); err != nil {
+			t.Fatal(err)
+		}
+
+		// Insert data.
+		testData := []map[string]any{
+			{"name": "Laptop", "category": "Electronics", "price": int64(1000)},
+			{"name": "Mouse", "category": "Electronics", "price": int64(50)},
+			{"name": "Desk", "category": "Furniture", "price": int64(300)},
+		}
+
+		for _, data := range testData {
+			if err := table.Insert(data); err != nil {
+				t.Fatal(err)
+			}
+		}
+
+		// Build the index.
+		if err := table.indexManager.BuildAll(); err != nil {
+			t.Fatal(err)
+		}
+
+		// Close the table.
+		table.Close()
+
+		t.Log("✓ First session: data and index created")
+	}
+
+	// 2. Second session: verify that the index is still usable.
+	{
+		table, err := OpenTable(&TableOptions{
+			Dir:          tmpDir,
+			MemTableSize: 1024 * 1024,
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer table.Close()
+
+		// Verify that the index exists.
+		indexes := table.ListIndexes()
+		if len(indexes) != 1 || indexes[0] != "category" {
+			t.Errorf("Expected index on 'category', got: %v", indexes)
+		}
+
+		t.Log("✓ Index loaded after restart")
+
+		// Query through the index.
+		rows, err := table.Query().Eq("category", "Electronics").Rows()
+		if err != nil {
+			t.Fatalf("Query failed: %v", err)
+		}
+		defer rows.Close()
+
+		results := collectRowData(rows)
+		if len(results) != 2 {
+			t.Errorf("Expected 2 Electronics products, got %d", len(results))
+		}
+
+		t.Logf("✓ Index query after restart: %d rows", len(results))
+	}
+
+	t.Log("=== Index persistence test passed ===")
+}
diff --git a/query.go b/query.go
index e390be1..2590c2d 100644
--- a/query.go
+++ b/query.go
@@ -484,6 +484,14 @@ func (qb *QueryBuilder) Rows() (*Rows, error) {
 		visited: make(map[int64]bool),
 	}
 
+	// Try to serve the query from an index: look for an Eq condition on a
+	// field that has a ready index.
+	indexField, indexValue := qb.findIndexableCondition()
+	if indexField != "" {
+		// Answer the query through the index.
+		return qb.rowsWithIndex(rows, indexField, indexValue)
+	}
+
 	// 收集所有数据源的 keys 并全局排序
 	// 立即读取数据避免 compaction 期间文件被删除
 	keyToRow := make(map[int64]*SSTableRow) // 存储已读取的行数据
@@ -584,6 +592,62 @@ func (qb *QueryBuilder) Rows() (*Rows, error) {
 	return rows, nil
 }
 
+// findIndexableCondition returns the field and right-hand value of the first
+// "=" comparison in qb.conds whose field has a ready index, or ("", nil) when
+// no condition qualifies.
+func (qb *QueryBuilder) findIndexableCondition() (string, any) {
+	for _, cond := range qb.conds {
+		// Only compare conditions using the "=" operator can use an index.
+		cmp, ok := cond.(compare)
+		if !ok || cmp.op != "=" {
+			continue
+		}
+		// The field must have an index, and that index must be ready.
+		if idx, exists := qb.table.indexManager.GetIndex(cmp.field); exists && idx.IsReady() {
+			return cmp.field, cmp.right
+		}
+	}
+	return "", nil
+}
+
+// rowsWithIndex fills rows from the index on indexField: it looks up the seqs
+// stored under indexValue, fetches each row, and keeps the rows that satisfy
+// every condition of the query. The result is served from the rows cache.
+func (qb *QueryBuilder) rowsWithIndex(rows *Rows, indexField string, indexValue any) (*Rows, error) {
+	// Fetch the index.
+	idx, exists := qb.table.indexManager.GetIndex(indexField)
+	if !exists {
+		return nil, fmt.Errorf("index on field %s not found", indexField)
+	}
+
+	// Get the seq list from the index.
+	seqs, err := idx.Get(indexValue)
+	if err != nil {
+		return nil, fmt.Errorf("index lookup failed: %w", err)
+	}
+
+	// Cached mode; with no seqs the loop below is skipped and cachedRows
+	// stays an empty, non-nil slice (an empty result set).
+	rows.cached = true
+	rows.cachedIndex = -1
+	rows.cachedRows = make([]*SSTableRow, 0, len(seqs))
+
+	// Fetch the row behind each seq.
+	for _, seq := range seqs {
+		row, err := qb.table.Get(seq)
+		if err != nil || row == nil {
+			continue // skip records that cannot be fetched
+		}
+
+		// The index covers only one condition; check the row against all of them.
+		if qb.Match(row.Data) {
+			rows.cachedRows = append(rows.cachedRows, row)
+		}
+	}
+
+	return rows, nil
+}
+
 // First 返回第一个匹配的数据
 func (qb *QueryBuilder) First() (*Row, error) {
 	rows, err := qb.Rows()