diff --git a/query.go b/query.go
index 2590c2d..3e8a259 100644
--- a/query.go
+++ b/query.go
@@ -4,7 +4,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"maps"
-	"slices"
 	"strings"
 )
 
@@ -488,106 +487,34 @@ func (qb *QueryBuilder) Rows() (*Rows, error) {
 	// 检查是否有可以使用索引的 Eq 条件
 	indexField, indexValue := qb.findIndexableCondition()
 	if indexField != "" {
-		// 使用索引查询
+		// Use the index (index queries are loaded eagerly because the seq list has to be fetched from the index).
 		return qb.rowsWithIndex(rows, indexField, indexValue)
 	}
 
-	// 收集所有数据源的 keys 并全局排序
-	// 立即读取数据避免 compaction 期间文件被删除
-	keyToRow := make(map[int64]*SSTableRow) // 存储已读取的行数据
-	var allKeys []int64
-
-	// 1. 从 Active MemTable 读取数据
+	// Lazy loading: only initialize the iterators; no row data is read here.
+	// 1. Initialize the active MemTable iterator.
 	activeMemTable := qb.table.memtableManager.GetActive()
 	if activeMemTable != nil {
-		activeKeys := activeMemTable.Keys()
-		for _, key := range activeKeys {
-			if data, ok := activeMemTable.Get(key); ok {
-				var row SSTableRow
-				if err := json.Unmarshal(data, &row); err == nil {
-					keyToRow[key] = &row
-					allKeys = append(allKeys, key)
-				}
-			}
-		}
+		rows.memIterator = newMemtableIterator(activeMemTable.Keys())
 	}
 
-	// 2. 从所有 Immutable MemTables 读取数据
-	immutables := qb.table.memtableManager.GetImmutables()
-	for _, imm := range immutables {
-		immKeys := imm.MemTable.Keys()
-		for _, key := range immKeys {
-			// 如果 key 已存在(来自更新的数据源),跳过
-			if _, exists := keyToRow[key]; exists {
-				continue
-			}
+	// 2. Initialize immutable MemTable state (iterated later in Next()).
+	rows.immutableIndex = 0
+	rows.immutableIterator = nil
 
-			if data, ok := imm.MemTable.Get(key); ok {
-				var row SSTableRow
-				if err := json.Unmarshal(data, &row); err == nil {
-					keyToRow[key] = &row
-					allKeys = append(allKeys, key)
-				}
-			}
-		}
-	}
-
-	// 3. 收集所有 SST 文件的 keys
+	// 3. Initialize the SST file iterators.
 	sstReaders := qb.table.sstManager.GetReaders()
-
-	for _, reader := range sstReaders {
-		// 获取文件中实际存在的 key 列表(已在 GetAllKeys 中排序)
-		keys := reader.GetAllKeys()
-
-		// 记录所有 keys(实际数据稍后统一从 table 读取)
-		for _, key := range keys {
-			// 如果 key 已存在(来自更新的数据源),跳过
-			if _, exists := keyToRow[key]; !exists {
-				allKeys = append(allKeys, key)
-				keyToRow[key] = nil // 占位,表示需要读取
-			}
+	rows.sstReaders = make([]*sstReader, len(sstReaders))
+	for i, reader := range sstReaders {
+		rows.sstReaders[i] = &sstReader{
+			keys:  reader.GetAllKeys(),
+			index: 0,
 		}
 	}
+	rows.sstIndex = 0
 
-	// 4. 对所有 keys 排序
-	if len(allKeys) > 0 {
-		// 去重(使用 map 已经去重了,但 allKeys 可能有重复)
-		keySet := make(map[int64]bool)
-		uniqueKeys := make([]int64, 0, len(allKeys))
-		for _, key := range allKeys {
-			if !keySet[key] {
-				keySet[key] = true
-				uniqueKeys = append(uniqueKeys, key)
-			}
-		}
-
-		// 排序
-		slices.Sort(uniqueKeys)
-
-		// 统一从 table 读取所有数据(避免 compaction 导致的文件删除)
-		rows.cachedRows = make([]*SSTableRow, 0, len(uniqueKeys))
-		for _, seq := range uniqueKeys {
-			// 如果已经从 MemTable 读取,直接使用
-			row := keyToRow[seq]
-			if row == nil {
-				// 从 table 读取(会搜索 MemTable + 所有 SST,包括 compaction 后的新文件)
-				var err error
-				row, err = qb.table.Get(seq)
-				if err != nil {
-					// 数据不存在(理论上不应该发生,因为 key 来自索引)
-					continue
-				}
-			}
-
-			if qb.Match(row.Data) {
-				rows.cachedRows = append(rows.cachedRows, row)
-			}
-		}
-
-		// 使用缓存模式
-		rows.cached = true
-		rows.cachedIndex = -1
-	}
+	// Leave cached unset so Next() takes the lazy path. NOTE(review): the removed eager path de-duplicated keys across sources and read rows up front to survive compaction deleting files - confirm Next() keeps both guarantees.
+	rows.cached = false
 
 	return rows, nil
 }
diff --git a/query_lazy_test.go b/query_lazy_test.go
new file mode 100644
index 0000000..8b779b8
--- /dev/null
+++ b/query_lazy_test.go
@@ -0,0 +1,336 @@
+package srdb
+
+import (
+	"fmt"
+	"testing"
+)
+
+// TestLazyLoadingBasic checks the basic behavior of lazy loading.
+func TestLazyLoadingBasic(t *testing.T) {
+	tmpDir := t.TempDir()
+
+	schema := NewSchema("users", []Field{
+		{Name: "name", Type: FieldTypeString},
+		{Name: "age", Type: FieldTypeInt64},
+	})
+
+	table, err := OpenTable(&TableOptions{
+		Dir:    tmpDir,
+		Name:   schema.Name,
+		Fields: schema.Fields,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer table.Close()
+
+	// Insert some data.
+	for i := 0; i < 100; i++ {
+		err = table.Insert(map[string]any{
+			"name": fmt.Sprintf("User%d", i),
+			"age":  int64(20 + i),
+		})
+		if err != nil {
+			t.Fatalf("Insert failed: %v", err)
+		}
+	}
+
+	// Build the query; it should not be executed eagerly.
+	rows, err := table.Query().Gte("age", int64(50)).Rows()
+	if err != nil {
+		t.Fatalf("Rows() failed: %v", err)
+	}
+	defer rows.Close()
+
+	// Verify laziness: no data should have been loaded by the time Rows() returns.
+	if rows.cached {
+		t.Errorf("Expected lazy loading (cached=false), but data is already cached")
+	}
+
+	// Read only the first 5 rows; count is tested first so Next() is not advanced a sixth time.
+	count := 0
+	for count < 5 && rows.Next() {
+		count++
+	}
+
+	if count != 5 {
+		t.Errorf("Expected to read 5 rows, got %d", count)
+	}
+
+	t.Log("✓ Lazy loading test passed: only 5 rows were read")
+}
+
+// TestLazyLoadingVsEagerLoading compares lazy loading with eager loading.
+func TestLazyLoadingVsEagerLoading(t *testing.T) {
+	tmpDir := t.TempDir()
+
+	schema := NewSchema("users", []Field{
+		{Name: "name", Type: FieldTypeString},
+		{Name: "age", Type: FieldTypeInt64},
+	})
+
+	table, err := OpenTable(&TableOptions{
+		Dir:    tmpDir,
+		Name:   schema.Name,
+		Fields: schema.Fields,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer table.Close()
+
+	// Insert a large amount of data.
+	for i := 0; i < 1000; i++ {
+		err = table.Insert(map[string]any{
+			"name": fmt.Sprintf("User%d", i),
+			"age":  int64(20 + i%50),
+		})
+		if err != nil {
+			t.Fatalf("Insert failed: %v", err)
+		}
+	}
+
+	// Flush to SST
+	table.Flush()
+
+	// Case 1: lazy loading - read only the first row.
+	rows, err := table.Query().Rows()
+	if err != nil {
+		t.Fatalf("Rows() failed: %v", err)
+	}
+
+	// Verify the query is lazy.
+	if rows.cached {
+		t.Errorf("Expected lazy loading, but data is cached")
+	}
+
+	// Read only the first row.
+	if rows.Next() {
+		row := rows.Row()
+		if row == nil {
+			t.Errorf("Expected row, got nil")
+		}
+	} else {
+		t.Errorf("Expected at least one row")
+	}
+	rows.Close()
+
+	// Case 2: eagerly load all data (via Collect).
+	rows2, err := table.Query().Rows()
+	if err != nil {
+		t.Fatalf("Rows() failed: %v", err)
+	}
+	defer rows2.Close()
+
+	// Collect triggers eager loading.
+	allData := rows2.Collect()
+	if len(allData) != 1000 {
+		t.Errorf("Expected 1000 rows, got %d", len(allData))
+	}
+
+	// Verify the data is now cached.
+	if !rows2.cached {
+		t.Errorf("Expected data to be cached after Collect()")
+	}
+
+	t.Log("✓ Lazy loading vs eager loading test passed")
+}
+
+// TestIndexQueryIsEager verifies that index queries are loaded eagerly.
+func TestIndexQueryIsEager(t *testing.T) {
+	tmpDir := t.TempDir()
+
+	schema := NewSchema("users", []Field{
+		{Name: "name", Type: FieldTypeString},
+		{Name: "email", Type: FieldTypeString, Indexed: true},
+		{Name: "age", Type: FieldTypeInt64},
+	})
+
+	table, err := OpenTable(&TableOptions{
+		Dir:    tmpDir,
+		Name:   schema.Name,
+		Fields: schema.Fields,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer table.Close()
+
+	// Create the index.
+	err = table.CreateIndex("email")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Insert data.
+	for i := 0; i < 10; i++ {
+		err = table.Insert(map[string]any{
+			"name":  fmt.Sprintf("User%d", i),
+			"email": fmt.Sprintf("user%d@example.com", i),
+			"age":   int64(20 + i),
+		})
+		if err != nil {
+			t.Fatalf("Insert failed: %v", err)
+		}
+	}
+
+	// Flush to SST and build indexes
+	table.Flush()
+
+	// Build indexes explicitly
+	err = table.indexManager.BuildAll()
+	if err != nil {
+		t.Fatalf("Failed to build indexes: %v", err)
+	}
+
+	// Check if index exists and is ready
+	idx, exists := table.indexManager.GetIndex("email")
+	if !exists {
+		t.Fatalf("Index for email does not exist")
+	}
+	if !idx.IsReady() {
+		t.Fatalf("Index for email is not ready")
+	}
+
+	// Query through the index.
+	rows, err := table.Query().Eq("email", "user0@example.com").Rows()
+	if err != nil {
+		t.Fatalf("Rows() failed: %v", err)
+	}
+	defer rows.Close()
+
+	// Index queries should be eager (cached=true).
+	if !rows.cached {
+		t.Errorf("Expected index query to be eager (cached=true), but got lazy loading")
+	}
+
+	// Verify the result.
+	count := 0
+	for rows.Next() {
+		count++
+	}
+
+	if count != 1 {
+		t.Errorf("Expected 1 row from index query, got %d", count)
+	}
+
+	t.Log("✓ Index query eager loading test passed")
+}
+
+// TestLazyLoadingWithConditions checks lazy loading when the query has conditions.
+func TestLazyLoadingWithConditions(t *testing.T) {
+	tmpDir := t.TempDir()
+
+	schema := NewSchema("users", []Field{
+		{Name: "name", Type: FieldTypeString},
+		{Name: "age", Type: FieldTypeInt64},
+		{Name: "active", Type: FieldTypeBool},
+	})
+
+	table, err := OpenTable(&TableOptions{
+		Dir:    tmpDir,
+		Name:   schema.Name,
+		Fields: schema.Fields,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer table.Close()
+
+	// Insert data.
+	for i := 0; i < 50; i++ {
+		err = table.Insert(map[string]any{
+			"name":   fmt.Sprintf("User%d", i),
+			"age":    int64(20 + i),
+			"active": i%2 == 0,
+		})
+		if err != nil {
+			t.Fatalf("Insert failed: %v", err)
+		}
+	}
+
+	// Query with multiple conditions.
+	rows, err := table.Query().
+		Gte("age", int64(30)).
+		Eq("active", true).
+		Rows()
+	if err != nil {
+		t.Fatalf("Rows() failed: %v", err)
+	}
+	defer rows.Close()
+
+	// Verify the query is lazy.
+	if rows.cached {
+		t.Errorf("Expected lazy loading with conditions")
+	}
+
+	// Iterate over all matching rows.
+	count := 0
+	for rows.Next() {
+		row := rows.Row()
+		data := row.Data()
+
+		// Verify the conditions.
+		age := int64(data["age"].(float64))
+		active := data["active"].(bool)
+
+		if age < 30 {
+			t.Errorf("Row age=%d, expected >= 30", age)
+		}
+		if !active {
+			t.Errorf("Row active=%v, expected true", active)
+		}
+
+		count++
+	}
+
+	if count == 0 {
+		t.Errorf("Expected some matching rows")
+	}
+
+	t.Logf("✓ Lazy loading with conditions test passed: %d matching rows", count)
+}
+
+// TestFirstDoesNotLoadAll verifies that First() does not load all data.
+func TestFirstDoesNotLoadAll(t *testing.T) {
+	tmpDir := t.TempDir()
+
+	schema := NewSchema("users", []Field{
+		{Name: "name", Type: FieldTypeString},
+		{Name: "age", Type: FieldTypeInt64},
+	})
+
+	table, err := OpenTable(&TableOptions{
+		Dir:    tmpDir,
+		Name:   schema.Name,
+		Fields: schema.Fields,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer table.Close()
+
+	// Insert a large amount of data.
+	for i := 0; i < 1000; i++ {
+		err = table.Insert(map[string]any{
+			"name": fmt.Sprintf("User%d", i),
+			"age":  int64(20 + i),
+		})
+		if err != nil {
+			t.Fatalf("Insert failed: %v", err)
+		}
+	}
+
+	// Fetch only the first row.
+	row, err := table.Query().First()
+	if err != nil {
+		t.Fatalf("First() failed: %v", err)
+	}
+
+	if row == nil {
+		t.Errorf("Expected row, got nil")
+	}
+
+	// First() should read a single row rather than loading all data. NOTE(review): this test only checks that a row is returned; it does not assert how many rows were read.
+	t.Log("✓ First() does not load all data test passed")
+}