diff --git a/cursor.go b/cursor.go index 28b6ceb..e963e0e 100644 --- a/cursor.go +++ b/cursor.go @@ -10,8 +10,8 @@ import ( "github.com/google/uuid" ) -// LogCursor 日志游标(窗口模式) -type LogCursor struct { +// ProcessCursor 日志游标(窗口模式) +type ProcessCursor struct { fd *os.File rbuf []byte // 8 MiB 复用 path string // 日志文件路径 @@ -23,7 +23,7 @@ type LogCursor struct { // NewCursor 创建一个新的日志游标 // index: 外部提供的索引管理器,用于快速定位记录 -func NewCursor(path string, index *RecordIndex) (*LogCursor, error) { +func NewCursor(path string, index *RecordIndex) (*ProcessCursor, error) { if index == nil { return nil, NewValidationError("index", "index cannot be nil", ErrNilParameter) } @@ -32,7 +32,7 @@ func NewCursor(path string, index *RecordIndex) (*LogCursor, error) { if err != nil { return nil, err } - c := &LogCursor{ + c := &ProcessCursor{ fd: fd, rbuf: make([]byte, 8<<20), path: path, @@ -47,12 +47,12 @@ func NewCursor(path string, index *RecordIndex) (*LogCursor, error) { } // Seek 到任意 offset(支持重启续传) -func (c *LogCursor) Seek(offset int64, whence int) (int64, error) { +func (c *ProcessCursor) Seek(offset int64, whence int) (int64, error) { return c.fd.Seek(offset, whence) } // Next 读取下一条记录(使用索引快速定位) -func (c *LogCursor) Next() (*Record, error) { +func (c *ProcessCursor) Next() (*Record, error) { // 检查是否超出索引范围 if c.endIdx >= c.index.Count() { return nil, io.EOF @@ -110,7 +110,7 @@ func (c *LogCursor) Next() (*Record, error) { // NextRange 读取指定数量的记录(范围游动) // count: 要读取的记录数量 // 返回:读取到的记录列表,如果到达文件末尾,返回的记录数可能少于 count -func (c *LogCursor) NextRange(count int) ([]*Record, error) { +func (c *ProcessCursor) NextRange(count int) ([]*Record, error) { if count <= 0 { return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount) } @@ -133,34 +133,34 @@ func (c *LogCursor) NextRange(count int) ([]*Record, error) { } // Commit 提交窗口,将 endIdx 移动到 startIdx(表示已处理完这批记录) -func (c *LogCursor) Commit() { +func (c *ProcessCursor) Commit() { c.startIdx = c.endIdx } // Rollback 回滚窗口,将 endIdx 回退到 startIdx(表示放弃这批记录的处理) -func (c *LogCursor) Rollback() error { +func (c *ProcessCursor) Rollback() error { c.endIdx = c.startIdx return nil } // StartIndex 获取窗口开始索引 -func (c *LogCursor) StartIndex() int { +func (c *ProcessCursor) StartIndex() int { return c.startIdx } // EndIndex 获取窗口结束索引 -func (c *LogCursor) EndIndex() int { +func (c *ProcessCursor) EndIndex() int { return c.endIdx } // Close 关闭游标并保存位置 -func (c *LogCursor) Close() error { +func (c *ProcessCursor) Close() error { c.savePosition() return c.fd.Close() } // savePosition 保存当前读取位置到文件 -func (c *LogCursor) savePosition() error { +func (c *ProcessCursor) savePosition() error { f, err := os.Create(c.posFile) if err != nil { return err @@ -175,7 +175,7 @@ func (c *LogCursor) savePosition() error { } // loadPosition 从文件加载上次的读取位置 -func (c *LogCursor) loadPosition() error { +func (c *ProcessCursor) loadPosition() error { f, err := os.Open(c.posFile) if err != nil { if os.IsNotExist(err) { diff --git a/example/README.md b/example/README.md index b32ffac..02e73d2 100644 --- a/example/README.md +++ b/example/README.md @@ -192,10 +192,10 @@ for i := 0; i < 20; i++ { **1. "no such file or directory" 错误** -确保在创建 Seqlog 之前先创建目录: +确保在创建 LogHub 之前先创建目录: ```go os.MkdirAll("log_dir", 0755) -seq := seqlog.NewSeqlog("log_dir", logger, nil) +seq := seqlog.NewLogHub("log_dir", logger, nil) ``` **2. 查询时出现 EOF 错误** diff --git a/example/concurrent/main.go b/example/concurrent/main.go index 4dc6906..8396a9f 100644 --- a/example/concurrent/main.go +++ b/example/concurrent/main.go @@ -40,7 +40,7 @@ func main() { } // 创建 Seqlog 实例(默认处理器) - seq := seqlog.NewSeqlog(testDir, logger, nil) + seq := seqlog.NewLogHub(testDir, logger, nil) topics := []string{"app", "access", "error"} for _, topic := range topics { diff --git a/example/get_record/main.go b/example/get_record/main.go new file mode 100644 index 0000000..a9dc462 --- /dev/null +++ b/example/get_record/main.go @@ -0,0 +1,74 @@ +package main + +import ( + "fmt" + "log" + "time" + + "code.tczkiot.com/seqlog" +) + +func main() { + // 创建 Seqlog 实例 + mgr := seqlog.NewLogHub("./logs", nil, nil) + + // 注册 topic handler + processedCount := 0 + err := mgr.RegisterHandler("app", func(record *seqlog.Record) error { + processedCount++ + fmt.Printf("处理记录 #%d: %s\n", processedCount, string(record.Data)) + time.Sleep(100 * time.Millisecond) // 模拟处理耗时 + return nil + }) + if err != nil { + log.Fatal(err) + } + + // 写入一些记录 + fmt.Println("=== 写入记录 ===") + for i := 0; i < 10; i++ { + data := fmt.Sprintf("日志消息 #%d", i) + offset, err := mgr.Write("app", []byte(data)) + if err != nil { + log.Fatal(err) + } + fmt.Printf("写入: offset=%d, data=%s\n", offset, data) + } + + // 启动处理 + fmt.Println("\n=== 启动日志处理 ===") + err = mgr.Start() + if err != nil { + log.Fatal(err) + } + + // 等待一段时间让处理器处理一些记录 + time.Sleep(500 * time.Millisecond) + + // 查询当前处理窗口的记录 + fmt.Println("\n=== 查询当前处理窗口记录 ===") + records, err := mgr.QueryFromProcessing("app", 5) + if err != nil { + log.Fatal(err) + } + + fmt.Printf("从处理窗口开始位置查询到 %d 条记录:\n", len(records)) + for _, rec := range records { + fmt.Printf(" [索引 %d] %s - 状态: %s\n", rec.Index, string(rec.Record.Data), rec.Status) + } + + // 查询更多记录 + fmt.Println("\n=== 查询后续记录 ===") + moreRecords, err := mgr.QueryFromProcessing("app", 10) + if err != nil { + log.Fatal(err) + } + fmt.Printf("查询到 %d 条记录:\n", len(moreRecords)) + for _, rec := range moreRecords { + fmt.Printf(" [索引 %d] %s - 状态: %s\n", rec.Index, string(rec.Record.Data), rec.Status) + } + + // 清理 + mgr.Stop() + fmt.Println("\n=== 示例完成 ===") +} diff --git a/example/index/main.go b/example/index/main.go index b5648ef..627a7d7 100644 --- a/example/index/main.go +++ b/example/index/main.go @@ -70,25 +70,25 @@ func main() { fmt.Printf("从第 %d 条记录开始查询\n", startIndex) // 向索引递减方向查询(查询更早的记录) - // QueryNewest(4, 3) 查询索引 2, 3, 4,返回按索引递增排序 - backward, err := query.QueryNewest(startIndex-1, 3) + // QueryOldest(5, 3) 查询索引 2, 3, 4(不包含 5),返回按索引递增排序 + backward, err := query.QueryOldest(startIndex, 3) if err != nil { log.Fatal(err) } - fmt.Printf("向索引递减方向查询 3 条记录(索引 2-4):\n") - for i, rec := range backward { - fmt.Printf(" [%d] 数据=%s\n", i, string(rec.Data)) + fmt.Printf("向索引递减方向查询 3 条记录:\n") + for _, rec := range backward { + fmt.Printf(" [索引 %d] 数据=%s\n", rec.Index, string(rec.Record.Data)) } // 向索引递增方向查询(查询更新的记录) - // QueryOldest(5, 3) 查询索引 5, 6, 7,返回按索引递增排序 - forward, err := query.QueryOldest(startIndex, 3) + // QueryNewest(5, 3) 查询索引 6, 7, 8(不包含 5),返回按索引递增排序 + forward, err := query.QueryNewest(startIndex, 3) if err != nil { log.Fatal(err) } - fmt.Printf("向索引递增方向查询 3 条记录(索引 5-7):\n") - for i, rec := range forward { - fmt.Printf(" [%d] 数据=%s\n", i, string(rec.Data)) + fmt.Printf("向索引递增方向查询 3 条记录:\n") + for _, rec := range forward { + fmt.Printf(" [索引 %d] 数据=%s\n", rec.Index, string(rec.Record.Data)) } fmt.Println() diff --git a/example/topic_processor/main.go b/example/topic_processor/main.go index 6671ce1..228b8d4 100644 --- a/example/topic_processor/main.go +++ b/example/topic_processor/main.go @@ -40,58 +40,48 @@ func main() { count := tp.GetRecordCount() fmt.Printf(" 总共 %d 条记录\n\n", count) - // ===== 3. 获取索引 ===== - fmt.Println("3. 使用索引:") - index := tp.Index() - fmt.Printf(" 索引记录数: %d\n", index.Count()) - fmt.Printf(" 最后偏移: %d\n\n", index.LastOffset()) + // ===== 3. 获取记录数 ===== + fmt.Println("3. 查看记录统计:") + totalCount := tp.GetRecordCount() + fmt.Printf(" 记录总数: %d\n\n", totalCount) // ===== 4. 使用查询器查询 ===== fmt.Println("4. 查询记录:") // 查询最老的 3 条记录(从索引 0 开始) - oldest, err := tp.QueryOldest(0, 3) + oldest, err := tp.QueryOldest(3, 3) if err != nil { log.Fatal(err) } fmt.Println(" 查询最老的 3 条:") for i, rws := range oldest { - fmt.Printf(" [%d] 状态=%s, 数据=%s\n", i, rws.Status, string(rws.Record.Data)) + fmt.Printf(" [%d] 索引=%d, 状态=%s, 数据=%s\n", i, rws.Index, rws.Status, string(rws.Record.Data)) } // 查询最新的 2 条记录(从最后一条开始) - totalCount := tp.GetRecordCount() - newest, err := tp.QueryNewest(totalCount-1, 2) + newest, err := tp.QueryNewest(totalCount-3, 2) if err != nil { log.Fatal(err) } fmt.Println(" 查询最新的 2 条:") for i, rws := range newest { - fmt.Printf(" [%d] 状态=%s, 数据=%s\n", i, rws.Status, string(rws.Record.Data)) + fmt.Printf(" [%d] 索引=%d, 状态=%s, 数据=%s\n", i, rws.Index, rws.Status, string(rws.Record.Data)) } fmt.Println() - // ===== 5. 使用游标读取 ===== - fmt.Println("5. 使用游标读取:") - cursor, err := tp.Cursor() + // ===== 5. 从处理窗口查询 ===== + fmt.Println("5. 从处理窗口查询:") + + // 从处理窗口开始位置查询 3 条记录 + processing, err := tp.QueryFromProcessing(3) if err != nil { log.Fatal(err) } - defer cursor.Close() - - // 读取 3 条记录 - records, err := cursor.NextRange(3) - if err != nil { - log.Fatal(err) + fmt.Printf(" 从处理窗口查询到 %d 条记录:\n", len(processing)) + for i, rec := range processing { + fmt.Printf(" [%d] 索引=%d, 状态=%s, 数据=%s\n", i, rec.Index, rec.Status, string(rec.Record.Data)) } - fmt.Printf(" 读取了 %d 条记录:\n", len(records)) - for i, rec := range records { - fmt.Printf(" [%d] %s\n", i, string(rec.Data)) - } - - // 提交游标位置 - cursor.Commit() - fmt.Printf(" 游标位置: start=%d, end=%d\n\n", cursor.StartIndex(), cursor.EndIndex()) + fmt.Println() // ===== 6. 继续写入 ===== fmt.Println("6. 继续写入:") diff --git a/example/webapp/main.go b/example/webapp/main.go index a967912..af848a5 100644 --- a/example/webapp/main.go +++ b/example/webapp/main.go @@ -14,7 +14,7 @@ import ( ) var ( - seq *seqlog.Seqlog + seq *seqlog.LogHub logger *slog.Logger ) @@ -25,7 +25,7 @@ func main() { })) // 创建 Seqlog - seq = seqlog.NewSeqlog("logs", logger, func(topic string, rec *seqlog.Record) error { + seq = seqlog.NewLogHub("logs", logger, func(topic string, rec *seqlog.Record) error { // 简单的日志处理:只打印摘要信息 dataPreview := string(rec.Data) if len(dataPreview) > 100 { @@ -155,6 +155,12 @@ func formatBytes(bytes int64) string { return fmt.Sprintf("%.2f MB", float64(bytes)/1024/1024) } +type Record struct { + Index int `json:"index"` + Status string `json:"status"` + Data string `json:"data"` +} + // 首页 func handleIndex(w http.ResponseWriter, r *http.Request) { html := ` @@ -366,7 +372,7 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { // 选择 topic function selectTopic(topic) { currentTopic = topic; - direction = 'forward'; // 切换 topic 时重置方向 + direction = ''; // 切换 topic 时重置方向 startIndex = null; endIndex = null; @@ -405,14 +411,14 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { const forward = document.getElementById('forwardCount').value; // 构建查询 URL - let url = '/api/query?topic=' + currentTopic + - '&backward=' + backward + '&forward=' + forward + - '&direction=' + direction; + let url = '/api/query?topic=' + currentTopic; if (direction === 'backward' && startIndex != null) { - url += '&index=' + startIndex; + url += '&direction=backward&index=' + startIndex + '&count=' + backward; } else if (direction === 'forward' && endIndex != null) { - url += '&index=' + endIndex; + url += '&direction=forward&index=' + endIndex + '&count=' + forward; + } else { + url += '&count=10'; } const response = await fetch(url); @@ -449,11 +455,16 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { }).join(''); if (data.records.length > 0) { - startIndex = data.records[0].index; - endIndex = data.records[data.records.length - 1].index; - } else { - startIndex = null; - endIndex = null; + if (startIndex === null) { + startIndex = data.records[0].index; + } else { + startIndex = Math.min(startIndex, data.records[0].index); + } + if (endIndex === null) { + endIndex = data.records[data.records.length - 1].index; + } else { + endIndex = Math.max(endIndex, data.records[data.records.length - 1].index); + } } container.innerHTML = html; @@ -526,14 +537,10 @@ func handleQuery(w http.ResponseWriter, r *http.Request) { // 获取查询参数 indexParam := r.URL.Query().Get("index") direction := r.URL.Query().Get("direction") - backward, _ := strconv.Atoi(r.URL.Query().Get("backward")) - forward, _ := strconv.Atoi(r.URL.Query().Get("forward")) + count, _ := strconv.Atoi(r.URL.Query().Get("count")) - if backward == 0 { - backward = 10 - } - if forward == 0 { - forward = 10 + if count <= 0 { + count = 10 } // 获取 processor @@ -546,79 +553,68 @@ func handleQuery(w http.ResponseWriter, r *http.Request) { // 获取记录总数 totalCount := processor.GetRecordCount() - // 获取当前处理索引和读取索引(用于状态判断) - startIdx := seq.GetProcessingIndex(topic) - endIdx := seq.GetReadIndex(topic) - // 执行查询 var results []*seqlog.RecordWithStatus - var startRecordIndex int - if direction == "backward" { - // 向前翻页:查询更早的记录 - var index int - if indexParam == "" { - index = startIdx - } else { - index, _ = strconv.Atoi(indexParam) + if direction == "" { + results, err = processor.QueryFromProcessing(count) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return } - - if index > 0 { - var queryErr error - results, queryErr = processor.QueryNewest(index-1, backward) - if queryErr != nil { - http.Error(w, queryErr.Error(), http.StatusInternalServerError) + if len(results) == 0 { + results, err = processor.QueryFromLast(count) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) return } - if len(results) > 0 { - // QueryNewest 返回的结果按索引递增排列 - startRecordIndex = index - len(results) - } } } else { - // 向后翻页:查询更新的记录 - var index int + var refIndex int if indexParam == "" { - index = startIdx + http.Error(w, "参数错误", http.StatusNotFound) + return } else { - index, _ = strconv.Atoi(indexParam) + refIndex, err = strconv.Atoi(indexParam) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } } - if index < totalCount { + if direction == "backward" { var queryErr error - results, queryErr = processor.QueryOldest(index, forward) + results, queryErr = processor.QueryNewest(refIndex, count) if queryErr != nil { http.Error(w, queryErr.Error(), http.StatusInternalServerError) return } - if len(results) > 0 { - // QueryOldest 返回的结果按索引递增排列 - startRecordIndex = index + } else if direction == "forward" { + var queryErr error + results, queryErr = processor.QueryOldest(refIndex, count) + if queryErr != nil { + http.Error(w, queryErr.Error(), http.StatusInternalServerError) + return } + } else { + http.Error(w, "参数错误", http.StatusNotFound) + return } } - type Record struct { - Index int `json:"index"` - Status string `json:"status"` - Data string `json:"data"` - } - records := make([]Record, len(results)) - for i, r := range results { + for i, result := range results { records[i] = Record{ - Index: startRecordIndex + i, - Status: r.Status.String(), - Data: string(r.Record.Data), + Index: result.Index, + Status: result.Status.String(), + Data: string(result.Record.Data), } } json.NewEncoder(w).Encode(map[string]interface{}{ - "records": records, - "total": len(records), - "totalCount": totalCount, - "processingIndex": startIdx, - "readIndex": endIdx, + "records": records, + "total": len(records), + "totalCount": totalCount, }) } diff --git a/seqlog_manager.go b/loghub.go similarity index 82% rename from seqlog_manager.go rename to loghub.go index de5280a..76f0866 100644 --- a/seqlog_manager.go +++ b/loghub.go @@ -8,14 +8,14 @@ import ( "sync" ) -// Seqlog 日志管理器,统一管理多个 topic 的日志分发 +// LogHub 日志中枢,统一管理多个 topic 的日志分发 // // 自动恢复机制: // - Start() 时自动扫描 baseDir 中所有 .log 文件 // - 为每个发现的日志文件创建 processor // - 使用 .pos 文件保存的游标位置恢复处理进度 // - 只处理上次中断后新增的日志,避免重复处理 -type Seqlog struct { +type LogHub struct { baseDir string processors map[string]*TopicProcessor defaultHandler TopicRecordHandler @@ -27,13 +27,13 @@ type Seqlog struct { running bool } -// NewSeqlog 创建一个新的日志管理器 +// NewLogHub 创建一个新的日志中枢 // logger: 内部日志记录器,如果不需要可以传 slog.Default() -func NewSeqlog(baseDir string, logger *slog.Logger, defaultHandler TopicRecordHandler) *Seqlog { +func NewLogHub(baseDir string, logger *slog.Logger, defaultHandler TopicRecordHandler) *LogHub { if logger == nil { logger = slog.Default() } - return &Seqlog{ + return &LogHub{ baseDir: baseDir, processors: make(map[string]*TopicProcessor), defaultHandler: defaultHandler, @@ -48,7 +48,7 @@ func NewSeqlog(baseDir string, logger *slog.Logger, defaultHandler TopicRecordHa } // SetDefaultTailConfig 设置默认的 tail 配置 -func (s *Seqlog) SetDefaultTailConfig(config *TailConfig) { +func (s *LogHub) SetDefaultTailConfig(config *TailConfig) { s.mu.Lock() defer s.mu.Unlock() if config != nil { @@ -57,13 +57,13 @@ func (s *Seqlog) SetDefaultTailConfig(config *TailConfig) { } // RegisterHandler 为指定 topic 注册 handler -func (s *Seqlog) RegisterHandler(topic string, handler RecordHandler) error { +func (s *LogHub) RegisterHandler(topic string, handler RecordHandler) error { return s.RegisterHandlerWithConfig(topic, &TopicConfig{Handler: handler}) } // RegisterHandlerWithConfig 为指定 topic 注册 handler 和配置 // 注意:handler 为必填参数,如果 topic 已存在则返回错误 -func (s *Seqlog) RegisterHandlerWithConfig(topic string, config *TopicConfig) error { +func (s *LogHub) RegisterHandlerWithConfig(topic string, config *TopicConfig) error { s.mu.Lock() defer s.mu.Unlock() @@ -94,7 +94,7 @@ func (s *Seqlog) RegisterHandlerWithConfig(topic string, config *TopicConfig) er } // Write 写入日志到指定 topic -func (s *Seqlog) Write(topic string, data []byte) (int64, error) { +func (s *LogHub) Write(topic string, data []byte) (int64, error) { processor, err := s.getOrCreateProcessor(topic) if err != nil { s.logger.Error("failed to get processor", "topic", topic, "error", err) @@ -109,8 +109,8 @@ func (s *Seqlog) Write(topic string, data []byte) (int64, error) { return offset, nil } -// Start 启动 Seqlog 和所有已注册的 processor -func (s *Seqlog) Start() error { +// Start 启动 LogHub 和所有已注册的 processor +func (s *LogHub) Start() error { s.mu.Lock() defer s.mu.Unlock() @@ -143,7 +143,7 @@ func (s *Seqlog) Start() error { // discoverExistingTopics 自动发现 baseDir 中已存在的日志文件并创建对应的 processor // 注意:此方法需要在持有锁的情况下调用 -func (s *Seqlog) discoverExistingTopics() error { +func (s *LogHub) discoverExistingTopics() error { // 确保目录存在 if err := os.MkdirAll(s.baseDir, 0755); err != nil { return fmt.Errorf("failed to create base directory: %w", err) @@ -213,7 +213,7 @@ func (s *Seqlog) discoverExistingTopics() error { } // Stop 停止所有 processor -func (s *Seqlog) Stop() error { +func (s *LogHub) Stop() error { s.mu.Lock() if !s.running { s.mu.Unlock() @@ -250,7 +250,7 @@ func (s *Seqlog) Stop() error { // getOrCreateProcessor 获取或创建指定 topic 的 processor(使用默认配置) // 如果没有 defaultHandler,使用空 handler(no-op) -func (s *Seqlog) getOrCreateProcessor(topic string) (*TopicProcessor, error) { +func (s *LogHub) getOrCreateProcessor(topic string) (*TopicProcessor, error) { // 创建默认配置 var config *TopicConfig if s.defaultHandler != nil { @@ -284,7 +284,7 @@ func (s *Seqlog) getOrCreateProcessor(topic string) (*TopicProcessor, error) { } // getOrCreateProcessorWithConfig 获取或创建指定 topic 的 processor(使用指定配置) -func (s *Seqlog) getOrCreateProcessorWithConfig(topic string, config *TopicConfig) (*TopicProcessor, error) { +func (s *LogHub) getOrCreateProcessorWithConfig(topic string, config *TopicConfig) (*TopicProcessor, error) { s.mu.RLock() processor, exists := s.processors[topic] s.mu.RUnlock() @@ -330,7 +330,7 @@ func (s *Seqlog) getOrCreateProcessorWithConfig(topic string, config *TopicConfi } // GetTopics 获取所有已知的 topic -func (s *Seqlog) GetTopics() []string { +func (s *LogHub) GetTopics() []string { s.mu.RLock() defer s.mu.RUnlock() @@ -341,15 +341,15 @@ func (s *Seqlog) GetTopics() []string { return topics } -// IsRunning 检查 Seqlog 是否正在运行 -func (s *Seqlog) IsRunning() bool { +// IsRunning 检查 LogHub 是否正在运行 +func (s *LogHub) IsRunning() bool { s.mu.RLock() defer s.mu.RUnlock() return s.running } // UpdateTopicConfig 动态更新指定 topic 的 tail 配置 -func (s *Seqlog) UpdateTopicConfig(topic string, config *TailConfig) error { +func (s *LogHub) UpdateTopicConfig(topic string, config *TailConfig) error { s.mu.RLock() processor, exists := s.processors[topic] s.mu.RUnlock() @@ -362,7 +362,7 @@ func (s *Seqlog) UpdateTopicConfig(topic string, config *TailConfig) error { } // GetTopicConfig 获取指定 topic 的 tail 配置 -func (s *Seqlog) GetTopicConfig(topic string) (*TailConfig, error) { +func (s *LogHub) GetTopicConfig(topic string) (*TailConfig, error) { s.mu.RLock() processor, exists := s.processors[topic] s.mu.RUnlock() @@ -375,7 +375,7 @@ func (s *Seqlog) GetTopicConfig(topic string) (*TailConfig, error) { } // GetTopicStats 获取指定 topic 的统计信息 -func (s *Seqlog) GetTopicStats(topic string) (Stats, error) { +func (s *LogHub) GetTopicStats(topic string) (Stats, error) { s.mu.RLock() processor, exists := s.processors[topic] s.mu.RUnlock() @@ -388,7 +388,7 @@ func (s *Seqlog) GetTopicStats(topic string) (Stats, error) { } // GetAllStats 获取所有 topic 的统计信息 -func (s *Seqlog) GetAllStats() map[string]Stats { +func (s *LogHub) GetAllStats() map[string]Stats { s.mu.RLock() defer s.mu.RUnlock() @@ -400,7 +400,7 @@ func (s *Seqlog) GetAllStats() map[string]Stats { } // NewTopicQuery 为指定 topic 获取查询器(返回共享实例) -func (s *Seqlog) NewTopicQuery(topic string) (*RecordQuery, error) { +func (s *LogHub) NewTopicQuery(topic string) (*RecordQuery, error) { s.mu.RLock() processor, exists := s.processors[topic] s.mu.RUnlock() @@ -413,7 +413,7 @@ func (s *Seqlog) NewTopicQuery(topic string) (*RecordQuery, error) { } // GetProcessingIndex 获取指定 topic 的当前处理索引 -func (s *Seqlog) GetProcessingIndex(topic string) int { +func (s *LogHub) GetProcessingIndex(topic string) int { s.mu.RLock() processor, exists := s.processors[topic] s.mu.RUnlock() @@ -426,7 +426,7 @@ func (s *Seqlog) GetProcessingIndex(topic string) int { } // GetReadIndex 获取指定 topic 的当前读取索引 -func (s *Seqlog) GetReadIndex(topic string) int { +func (s *LogHub) GetReadIndex(topic string) int { s.mu.RLock() processor, exists := s.processors[topic] s.mu.RUnlock() @@ -439,7 +439,7 @@ func (s *Seqlog) GetReadIndex(topic string) int { } // GetProcessor 获取指定 topic 的 processor -func (s *Seqlog) GetProcessor(topic string) (*TopicProcessor, error) { +func (s *LogHub) GetProcessor(topic string) (*TopicProcessor, error) { s.mu.RLock() processor, exists := s.processors[topic] s.mu.RUnlock() @@ -451,8 +451,44 @@ func (s *Seqlog) GetProcessor(topic string) (*TopicProcessor, error) { return processor, nil } +// QueryFromProcessing 从当前处理窗口的开始位置向索引递增方向查询记录 +// topic: 主题名称 +// count: 查询数量 +func (s *LogHub) QueryFromProcessing(topic string, count int) ([]*RecordWithStatus, error) { + processor, err := s.GetProcessor(topic) + if err != nil { + return nil, err + } + + return processor.QueryFromProcessing(count) +} + +// QueryFromFirst 从第一条记录向索引递增方向查询 +// topic: 主题名称 +// count: 查询数量 +func (s *LogHub) QueryFromFirst(topic string, count int) ([]*RecordWithStatus, error) { + processor, err := s.GetProcessor(topic) + if err != nil { + return nil, err + } + + return processor.QueryFromFirst(count) +} + +// QueryFromLast 从最后一条记录向索引递减方向查询 +// topic: 主题名称 +// count: 查询数量 +func (s *LogHub) QueryFromLast(topic string, count int) ([]*RecordWithStatus, error) { + processor, err := s.GetProcessor(topic) + if err != nil { + return nil, err + } + + return processor.QueryFromLast(count) +} + // Subscribe 为指定 topic 订阅事件(如果 topic 不存在,会在创建时应用订阅) -func (s *Seqlog) Subscribe(topic string, eventType EventType, listener EventListener) error { +func (s *LogHub) Subscribe(topic string, eventType EventType, listener EventListener) error { s.mu.Lock() defer s.mu.Unlock() @@ -476,7 +512,7 @@ func (s *Seqlog) Subscribe(topic string, eventType EventType, listener EventList } // SubscribeAll 为指定 topic 订阅所有事件 -func (s *Seqlog) SubscribeAll(topic string, listener EventListener) error { +func (s *LogHub) SubscribeAll(topic string, listener EventListener) error { s.mu.Lock() defer s.mu.Unlock() @@ -510,7 +546,7 @@ func (s *Seqlog) SubscribeAll(topic string, listener EventListener) error { } // SubscribeAllTopics 为所有 topic 订阅指定事件 -func (s *Seqlog) SubscribeAllTopics(eventType EventType, listener EventListener) { +func (s *LogHub) SubscribeAllTopics(eventType EventType, listener EventListener) { s.mu.Lock() defer s.mu.Unlock() @@ -525,8 +561,9 @@ func (s *Seqlog) SubscribeAllTopics(eventType EventType, listener EventListener) } // ResetTopic 重置指定 topic 的所有数据 -// 注意:必须先停止 Seqlog 或至少停止该 topic 的 processor -func (s *Seqlog) ResetTopic(topic string) error { +// 如果 processor 正在运行且没有待处理的日志,会自动停止后重置 +// 如果有待处理的日志,则返回错误 +func (s *LogHub) ResetTopic(topic string) error { s.mu.RLock() processor, exists := s.processors[topic] s.mu.RUnlock() @@ -535,17 +572,12 @@ func (s *Seqlog) ResetTopic(topic string) error { return NewTopicError(topic, "operation", ErrNotFound) } - // 先停止 processor - if err := processor.Stop(); err != nil { - return fmt.Errorf("failed to stop processor: %w", err) - } - - // 执行重置 + // 执行重置(如果没有待处理的日志会自动停止) if err := processor.Reset(); err != nil { return fmt.Errorf("failed to reset processor: %w", err) } - // 如果 seqlog 正在运行,重新启动 processor + // 如果 LogHub 正在运行,重新启动 processor s.mu.RLock() running := s.running s.mu.RUnlock() diff --git a/query.go b/query.go index 1c00bd8..0747755 100644 --- a/query.go +++ b/query.go @@ -36,7 +36,14 @@ func (s RecordStatus) String() string { // RecordWithStatus 带状态的记录 type RecordWithStatus struct { Record *Record - Status RecordStatus + Index int // 记录在日志文件中的索引位置 + Status RecordStatus // 记录的处理状态 +} + +// RecordWithIndex 带索引的记录 +type RecordWithIndex struct { + Record *Record + Index int // 记录在日志文件中的索引位置 } // RecordQuery 记录查询器 @@ -115,26 +122,80 @@ func (rq *RecordQuery) readRecordsForward(startIndex, count int) ([]*Record, err return results, nil } -// QueryOldest 从指定索引向索引递增方向查询记录 -// startIndex: 查询起始索引 +// QueryOldest 从参考索引向索引递减方向查询记录(查询更早的记录) +// refIndex: 参考索引位置 // count: 查询数量 -// 返回的记录按索引递增方向排序 -func (rq *RecordQuery) QueryOldest(startIndex, count int) ([]*Record, error) { +// 返回的记录按索引递增方向排序,包含索引信息 +// 例如:QueryOldest(5, 3) 查询索引 2, 3, 4(不包含 5),返回 [2, 3, 4] +func (rq *RecordQuery) QueryOldest(refIndex, count int) ([]*RecordWithIndex, error) { if count <= 0 { return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount) } totalCount := rq.index.Count() if totalCount == 0 { - return []*Record{}, nil + return []*RecordWithIndex{}, nil } - // 校验起始索引 + // 验证参考索引范围(严格模式) + if refIndex < 0 || refIndex > totalCount { + return nil, NewValidationError("refIndex", fmt.Sprintf("refIndex %d out of range [0, %d]", refIndex, totalCount), ErrInvalidRange) + } + + // 计算实际起始索引(向索引递减方向) + startIndex := refIndex - count if startIndex < 0 { startIndex = 0 + count = refIndex // 调整实际数量 } + + if count <= 0 { + return []*RecordWithIndex{}, nil + } + + // 读取记录 + records, err := rq.readRecordsForward(startIndex, count) + if err != nil { + return nil, err + } + + // 转换为带索引的记录 + results := make([]*RecordWithIndex, len(records)) + for i, rec := range records { + results[i] = &RecordWithIndex{ + Record: rec, + Index: startIndex + i, + } + } + + return results, nil +} + +// QueryNewest 从参考索引向索引递增方向查询记录(查询更新的记录) +// refIndex: 参考索引位置 +// count: 查询数量 +// 返回的记录按索引递增方向排序,包含索引信息 +// 例如:QueryNewest(5, 3) 查询索引 6, 7, 8(不包含 5),返回 [6, 7, 8] +func (rq *RecordQuery) QueryNewest(refIndex, count int) ([]*RecordWithIndex, error) { + if count <= 0 { + return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount) + } + + totalCount := rq.index.Count() + if totalCount == 0 { + return []*RecordWithIndex{}, nil + } + + // 验证参考索引范围(严格模式) + // QueryNewest 允许 refIndex = -1(从头开始查询) + if refIndex < -1 || refIndex >= totalCount { + return nil, NewValidationError("refIndex", fmt.Sprintf("refIndex %d out of range [-1, %d)", refIndex, totalCount), ErrInvalidRange) + } + + // 计算实际起始索引(向索引递增方向) + startIndex := refIndex + 1 if startIndex >= totalCount { - return []*Record{}, nil + return []*RecordWithIndex{}, nil } // 限制查询数量 @@ -143,40 +204,22 @@ func (rq *RecordQuery) QueryOldest(startIndex, count int) ([]*Record, error) { count = remainCount } - return rq.readRecordsForward(startIndex, count) -} - -// QueryNewest 从指定索引向索引递减方向查询记录 -// endIndex: 查询的最大索引(向前查询更早的记录) -// count: 查询数量 -// 返回的记录按索引递增方向排序(与 QueryOldest 一致) -func (rq *RecordQuery) QueryNewest(endIndex, count int) ([]*Record, error) { - if count <= 0 { - return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount) + // 读取记录 + records, err := rq.readRecordsForward(startIndex, count) + if err != nil { + return nil, err } - totalCount := rq.index.Count() - if totalCount == 0 { - return []*Record{}, nil + // 转换为带索引的记录 + results := make([]*RecordWithIndex, len(records)) + for i, rec := range records { + results[i] = &RecordWithIndex{ + Record: rec, + Index: startIndex + i, + } } - // 校验结束索引 - if endIndex < 0 { - return []*Record{}, nil - } - if endIndex >= totalCount { - endIndex = totalCount - 1 - } - - // 计算实际起始索引(向索引递减方向查询 count 条) - queryStartIdx := endIndex - count + 1 - if queryStartIdx < 0 { - queryStartIdx = 0 - count = endIndex + 1 // 调整实际数量 - } - - // 向前读取,返回按索引递增排序的结果 - return rq.readRecordsForward(queryStartIdx, count) + return results, nil } // GetRecordCount 获取记录总数 diff --git a/query_test.go b/query_test.go new file mode 100644 index 0000000..f44a58b --- /dev/null +++ b/query_test.go @@ -0,0 +1,75 @@ +package seqlog + +import ( + "fmt" + "testing" + "time" +) + +// TestQueryFromProcessing 测试从处理窗口开始位置查询记录 +func TestQueryFromProcessing(t *testing.T) { + // 创建临时目录 + tmpDir := t.TempDir() + + // 创建配置(带 handler,不处理任何记录以保持窗口稳定) + config := &TopicConfig{ + Handler: func(record *Record) error { + // 不处理,只是为了让 tailer 启动 + time.Sleep(1 * time.Second) // 延迟处理 + return nil + }, + } + + // 创建 TopicProcessor + processor, err := NewTopicProcessor(tmpDir, "test", nil, config) + if err != nil { + t.Fatal(err) + } + defer processor.Close() + + // 写入 10 条记录 + for i := 0; i < 10; i++ { + msg := fmt.Sprintf("message %d", i) + _, err := processor.Write([]byte(msg)) + if err != nil { + t.Fatal(err) + } + } + + // 不启动 tailer,直接测试查询功能 + // startIdx 应该是 0(没有处理任何记录) + + // 从处理窗口开始位置查询 5 条记录 + records, err := processor.QueryFromProcessing(5) + if err != nil { + t.Fatal(err) + } + + t.Logf("查询到 %d 条记录", len(records)) + + if len(records) != 5 { + t.Fatalf("expected 5 records, got %d", len(records)) + } + + // 验证查询结果从索引 0 开始 + for i, rec := range records { + expectedIndex := i + if rec.Index != expectedIndex { + t.Errorf("record[%d]: expected index %d, got %d", i, expectedIndex, rec.Index) + } + + expectedMsg := fmt.Sprintf("message %d", expectedIndex) + if string(rec.Record.Data) != expectedMsg { + t.Errorf("record[%d]: expected data '%s', got '%s'", i, expectedMsg, string(rec.Record.Data)) + } + + // 未启动 tailer,所有记录都应该是 Pending 状态 + if rec.Status != StatusPending { + t.Errorf("record[%d]: expected StatusPending, got %s", i, rec.Status) + } + + t.Logf(" [索引 %d] %s - 状态: %s", rec.Index, string(rec.Record.Data), rec.Status) + } + + t.Log("QueryFromProcessing 测试通过") +} diff --git a/seqlog.go b/seqlog.go index 2aa745f..f8f9c18 100644 --- a/seqlog.go +++ b/seqlog.go @@ -42,13 +42,13 @@ import "github.com/google/uuid" // tailer, _ := seqlog.NewTailer("app.log", handler, nil) // tailer.Start() // -// // 使用 Seqlog 管理器(带 slog 支持和自动恢复) +// // 使用 LogHub 管理器(带 slog 支持和自动恢复) // logger := slog.Default() // handler := func(topic string, rec *seqlog.Record) error { // fmt.Printf("[%s] %s\n", topic, string(rec.Data)) // return nil // } -// seq := seqlog.NewSeqlog("/tmp/logs", logger, handler) +// seq := seqlog.NewLogHub("/tmp/logs", logger, handler) // seq.Start() // 自动发现并恢复已存在的日志文件 // seq.Write("app", []byte("application log")) // diff --git a/seqlog_test.go b/seqlog_test.go index cb523aa..45d0d23 100644 --- a/seqlog_test.go +++ b/seqlog_test.go @@ -437,7 +437,7 @@ func TestSeqlogBasic(t *testing.T) { os.MkdirAll(tmpDir, 0755) defer os.RemoveAll(tmpDir) - seqlog := NewSeqlog(tmpDir, slog.Default(), nil) + seqlog := NewLogHub(tmpDir, slog.Default(), nil) // 注册 handler appLogs := make([]string, 0) @@ -493,7 +493,7 @@ func TestSeqlogDefaultHandler(t *testing.T) { return nil } - seqlog := NewSeqlog(tmpDir, slog.Default(), defaultHandler) + seqlog := NewLogHub(tmpDir, slog.Default(), defaultHandler) // 注册特定 handler seqlog.RegisterHandler("special", func(rec *Record) error { @@ -534,7 +534,7 @@ func TestSeqlogDynamicRegistration(t *testing.T) { os.MkdirAll(tmpDir, 0755) defer os.RemoveAll(tmpDir) - seqlog := NewSeqlog(tmpDir, slog.Default(), nil) + seqlog := NewLogHub(tmpDir, slog.Default(), nil) // 先注册 handler(handler 现在是必填项) logs := make([]string, 0) @@ -566,7 +566,7 @@ func TestDynamicConfigUpdate(t *testing.T) { os.MkdirAll(tmpDir, 0755) defer os.RemoveAll(tmpDir) - seqlog := NewSeqlog(tmpDir, slog.Default(), nil) + seqlog := NewLogHub(tmpDir, slog.Default(), nil) // 注册 handler logs := make([]string, 0) @@ -746,7 +746,7 @@ func TestSeqlogAutoRecovery(t *testing.T) { return nil } - seqlog1 := NewSeqlog(tmpDir, slog.Default(), defaultHandler) + seqlog1 := NewLogHub(tmpDir, slog.Default(), defaultHandler) seqlog1.Start() // 写入一些日志 @@ -777,7 +777,7 @@ func TestSeqlogAutoRecovery(t *testing.T) { mu.Unlock() // 第二阶段:重启并自动恢复 - seqlog2 := NewSeqlog(tmpDir, slog.Default(), defaultHandler) + seqlog2 := NewLogHub(tmpDir, slog.Default(), defaultHandler) seqlog2.Start() // 写入新日志 @@ -880,7 +880,7 @@ func TestSeqlogCleanup(t *testing.T) { os.MkdirAll(tmpDir, 0755) defer os.RemoveAll(tmpDir) - seqlog := NewSeqlog(tmpDir, slog.Default(), nil) + seqlog := NewLogHub(tmpDir, slog.Default(), nil) seqlog.Start() // 写入多个 topic 的日志 @@ -1118,7 +1118,7 @@ func TestSeqlogStats(t *testing.T) { return nil } - seq := NewSeqlog(tmpDir, slog.Default(), handler) + seq := NewLogHub(tmpDir, slog.Default(), handler) if err := seq.Start(); err != nil { t.Fatalf("failed to start seqlog: %v", err) } @@ -1229,16 +1229,19 @@ func TestRecordQuery(t *testing.T) { } defer query.Close() - // 测试查询当前位置 - current, err := query.QueryOldest(startIdx, 1) + // 测试查询当前位置(使用 QueryNewest 查询 startIdx) + current, err := query.QueryNewest(startIdx-1, 1) if err != nil { t.Fatalf("failed to query current: %v", err) } if len(current) != 1 { t.Fatalf("expected 1 current result, got %d", len(current)) } - if string(current[0].Data) != "message 5" { - t.Errorf("expected current 'message 5', got '%s'", string(current[0].Data)) + if string(current[0].Record.Data) != "message 5" { + t.Errorf("expected current 'message 5', got '%s'", string(current[0].Record.Data)) + } + if current[0].Index != startIdx { + t.Errorf("expected index %d, got %d", startIdx, current[0].Index) } // 手动判断状态 status := GetRecordStatus(startIdx, startIdx, endIdx) @@ -1246,43 +1249,53 @@ func TestRecordQuery(t *testing.T) { t.Errorf("expected status Processing, got %s", status) } - // 测试向后查询(查询更早的记录,返回按索引递增排序) - backResults, err := query.QueryNewest(startIdx-1, 3) + // 测试 QueryOldest:查询更早的记录(向索引递减方向) + // QueryOldest(5, 3) 查询索引 2, 3, 4 + backResults, err := query.QueryOldest(startIdx, 3) if err != nil { t.Fatalf("failed to query backward: %v", err) } if len(backResults) != 3 { t.Errorf("expected 3 backward results, got %d", len(backResults)) } - // 向后查询返回按索引递增排序的结果 + // 返回按索引递增排序的结果:2, 3, 4 expectedBack := []string{"message 2", "message 3", "message 4"} for i, rec := range backResults { - if string(rec.Data) != expectedBack[i] { - t.Errorf("backward[%d]: expected '%s', got '%s'", i, expectedBack[i], string(rec.Data)) + if string(rec.Record.Data) != expectedBack[i] { + t.Errorf("backward[%d]: expected '%s', got '%s'", i, expectedBack[i], string(rec.Record.Data)) } - // 手动判断状态:索引 2, 3, 4 - recStatus := GetRecordStatus(startIdx-3+i, startIdx, endIdx) + expectedIndex := startIdx - 3 + i + if rec.Index != expectedIndex { + t.Errorf("backward[%d]: expected index %d, got %d", i, expectedIndex, rec.Index) + } + // 手动判断状态:索引 2, 3, 4 都已处理 + recStatus := GetRecordStatus(rec.Index, startIdx, endIdx) if recStatus != StatusProcessed { t.Errorf("backward[%d]: expected status Processed, got %s", i, recStatus) } } - // 测试向前查询(查询更新的记录) - forwardResults, err := query.QueryOldest(endIdx, 3) + // 测试 QueryNewest:查询更新的记录(向索引递增方向) + // QueryNewest(endIdx, 3) 从 endIdx 向后查询,查询索引 6, 7, 8 + forwardResults, err := query.QueryNewest(endIdx-1, 3) if err != nil { t.Fatalf("failed to query forward: %v", err) } if len(forwardResults) != 3 { t.Errorf("expected 3 forward results, got %d", len(forwardResults)) } - // 向前查询返回顺序结果 + // 返回按索引递增排序的结果:6, 7, 8 expectedForward := []string{"message 6", "message 7", "message 8"} for i, rec := range forwardResults { - if string(rec.Data) != expectedForward[i] { - t.Errorf("forward[%d]: expected '%s', got '%s'", i, expectedForward[i], string(rec.Data)) + if string(rec.Record.Data) != expectedForward[i] { + t.Errorf("forward[%d]: expected '%s', got '%s'", i, expectedForward[i], string(rec.Record.Data)) } - // 手动判断状态:索引 6, 7, 8 - recStatus := GetRecordStatus(endIdx+i, startIdx, endIdx) + expectedIndex := endIdx + i + if rec.Index != expectedIndex { + t.Errorf("forward[%d]: expected index %d, got %d", i, expectedIndex, rec.Index) + } + // 手动判断状态:索引 6, 7, 8 待处理 + recStatus := GetRecordStatus(rec.Index, startIdx, endIdx) if recStatus != StatusPending { t.Errorf("forward[%d]: expected status Pending, got %s", i, recStatus) } @@ -1384,7 +1397,7 @@ func TestSeqlogQuery(t *testing.T) { return nil } - seq := NewSeqlog(tmpDir, slog.Default(), handler) + seq := NewLogHub(tmpDir, slog.Default(), handler) if err := seq.Start(); err != nil { t.Fatalf("failed to start seqlog: %v", err) } @@ -1414,7 +1427,7 @@ func TestSeqlogQuery(t *testing.T) { // 获取 processor 用于查询(带状态) processor, _ := seq.GetProcessor("app") - index := processor.Index() + totalCount := processor.GetRecordCount() // 测试查询当前 if startIdx < endIdx { @@ -1439,7 +1452,7 @@ func TestSeqlogQuery(t *testing.T) { } // 测试向前查询 - if startIdx < index.Count() { + if startIdx < totalCount { forward, err := processor.QueryOldest(endIdx, 3) if err != nil { t.Fatalf("failed to query forward: %v", err) @@ -1569,7 +1582,7 @@ func TestSeqlogEventSubscription(t *testing.T) { return nil } - seq := NewSeqlog(tmpDir, slog.Default(), handler) + seq := NewLogHub(tmpDir, slog.Default(), handler) if err := seq.Start(); err != nil { t.Fatalf("failed to start seqlog: %v", err) } @@ -1612,7 +1625,7 @@ func TestMultiTopicEventSubscription(t *testing.T) { return nil } - seq := NewSeqlog(tmpDir, slog.Default(), handler) + seq := NewLogHub(tmpDir, slog.Default(), handler) if err := seq.Start(); err != nil { t.Fatalf("failed to start seqlog: %v", err) } @@ -1658,7 +1671,7 @@ func TestMultiTopicEventSubscription(t *testing.T) { func TestTopicReset(t *testing.T) { tmpDir := t.TempDir() - seqlog := NewSeqlog(tmpDir, slog.Default(), nil) + seqlog := NewLogHub(tmpDir, slog.Default(), nil) // 注册 handler seqlog.RegisterHandler("test", func(rec *Record) error { @@ -1750,6 +1763,50 @@ func TestTopicReset(t *testing.T) { seqlog.Stop() } +// TestTopicResetWithPendingRecords 测试当有待处理日志时 Reset 返回错误 +func TestTopicResetWithPendingRecords(t *testing.T) { + tmpDir := t.TempDir() + + seqlog := NewLogHub(tmpDir, slog.Default(), nil) + + // 注册一个慢速 handler,让日志堆积 + slowHandler := func(rec *Record) error { + time.Sleep(100 * time.Millisecond) // 模拟慢速处理 + return nil + } + + seqlog.RegisterHandler("test", slowHandler) + seqlog.Start() + + // 快速写入多条日志 + for i := 0; i < 10; i++ { + data := []byte(fmt.Sprintf("message %d", i)) + if _, err := seqlog.Write("test", data); err != nil { + t.Fatalf("写入失败: %v", err) + } + } + + // 短暂等待,让一部分日志开始处理但不是全部 + time.Sleep(200 * time.Millisecond) + + // 尝试重置,应该失败因为有待处理的日志 + err := seqlog.ResetTopic("test") + if err == nil { + t.Fatal("期望 Reset 失败因为有待处理的日志,但成功了") + } + + t.Logf("预期的错误: %v", err) + + // 停止处理 + seqlog.Stop() + + // 现在 Reset 应该成功(停止后没有待处理的日志) + processor, _ := seqlog.GetProcessor("test") + if err := processor.Reset(); err != nil { + t.Fatalf("停止后 Reset 应该成功: %v", err) + } +} + // TestQueryOldestNewest 测试 QueryOldest 和 QueryNewest func TestQueryOldestNewest(t *testing.T) { tmpDir := t.TempDir() @@ -1773,10 +1830,11 @@ func TestQueryOldestNewest(t *testing.T) { } } - // 测试 QueryOldest - 从索引 0 开始查询 3 条 - oldest, err := processor.QueryOldest(0, 3) + // 测试 QueryNewest - 查询索引 0, 1, 2(向索引递增方向) + // QueryNewest(-1, 3) 从 -1 向后查询,得到索引 0, 1, 2 + oldest, err := processor.QueryNewest(-1, 3) if err != nil { - t.Fatalf("QueryOldest failed: %v", err) + t.Fatalf("QueryNewest failed: %v", err) } if len(oldest) != 3 { t.Errorf("expected 3 records, got %d", len(oldest)) @@ -1787,14 +1845,18 @@ func TestQueryOldestNewest(t *testing.T) { if string(oldest[i].Record.Data) != expected { t.Errorf("oldest[%d]: expected %s, got %s", i, expected, string(oldest[i].Record.Data)) } - t.Logf("Oldest[%d]: %s - %s", i, string(oldest[i].Record.Data), oldest[i].Status) + if oldest[i].Index != i { + t.Errorf("oldest[%d]: expected index %d, got %d", i, i, oldest[i].Index) + } + t.Logf("Oldest[%d]: index=%d, %s - %s", i, oldest[i].Index, string(oldest[i].Record.Data), oldest[i].Status) } - // 测试 QueryNewest - 从索引 9 结束查询 3 条 + // 测试 QueryOldest - 查询索引 7, 8, 9(向索引递减方向) + // QueryOldest(10, 3) 从 10 向前查询,得到索引 7, 8, 9 totalCount := processor.GetRecordCount() - newest, err := processor.QueryNewest(totalCount-1, 3) + newest, err := processor.QueryOldest(totalCount, 3) if err != nil { - t.Fatalf("QueryNewest failed: %v", err) + t.Fatalf("QueryOldest failed: %v", err) } if len(newest) != 3 { t.Errorf("expected 3 records, got %d", len(newest)) @@ -1805,13 +1867,17 @@ func TestQueryOldestNewest(t *testing.T) { if string(newest[i].Record.Data) != expected { t.Errorf("newest[%d]: expected %s, got %s", i, expected, string(newest[i].Record.Data)) } - t.Logf("Newest[%d]: %s - %s", i, string(newest[i].Record.Data), newest[i].Status) + if newest[i].Index != 7+i { + t.Errorf("newest[%d]: expected index %d, got %d", i, 7+i, newest[i].Index) + } + t.Logf("Newest[%d]: index=%d, %s - %s", i, newest[i].Index, string(newest[i].Record.Data), newest[i].Status) } - // 测试超出范围 - all, err := processor.QueryOldest(0, 100) + // 测试超出范围 - 查询所有记录 + // QueryNewest(-1, 100) 从 -1 向后查询,会返回所有记录(最多 100 条) + all, err := processor.QueryNewest(-1, 100) if err != nil { - t.Fatalf("QueryOldest(0, 100) failed: %v", err) + t.Fatalf("QueryNewest(-1, 100) failed: %v", err) } if len(all) != 10 { t.Errorf("expected 10 records, got %d", len(all)) @@ -1828,11 +1894,195 @@ func TestQueryOldestNewest(t *testing.T) { } defer processor2.Close() - emptyOldest, err := processor2.QueryOldest(0, 10) + emptyNewest, err := processor2.QueryNewest(-1, 10) if err != nil { - t.Fatalf("QueryOldest on empty failed: %v", err) + t.Fatalf("QueryNewest on empty failed: %v", err) } - if len(emptyOldest) != 0 { - t.Errorf("expected 0 records, got %d", len(emptyOldest)) + if len(emptyNewest) != 0 { + t.Errorf("expected 0 records, got %d", len(emptyNewest)) + } +} + +// TestQueryFromFirstAndLast 测试 QueryFromFirst 和 QueryFromLast +func TestQueryFromFirstAndLast(t *testing.T) { + tmpDir := t.TempDir() + + // 创建 TopicProcessor + processor, err := NewTopicProcessor(tmpDir, "test", nil, &TopicConfig{ + Handler: func(rec *Record) error { + return nil + }, + }) + if err != nil { + t.Fatal(err) + } + defer processor.Close() + + // 写入 10 条测试数据 + for i := 0; i < 10; i++ { + data := fmt.Sprintf("message %d", i) + if _, err := processor.Write([]byte(data)); err != nil { + t.Fatal(err) + } + } + + // 测试 QueryFromFirst - 从第一条记录向索引递增方向查询 + t.Run("QueryFromFirst", func(t *testing.T) { + // 查询前 3 条记录 + records, err := processor.QueryFromFirst(3) + if err != nil { + t.Fatalf("QueryFromFirst failed: %v", err) + } + + if len(records) != 3 { + t.Fatalf("expected 3 records, got %d", len(records)) + } + + // 验证结果:应该是索引 0, 1, 2 + for i := 0; i < 3; i++ { + expectedData := fmt.Sprintf("message %d", i) + if string(records[i].Record.Data) != expectedData { + t.Errorf("records[%d]: expected %s, got %s", i, expectedData, string(records[i].Record.Data)) + } + if records[i].Index != i { + t.Errorf("records[%d]: expected index %d, got %d", i, i, records[i].Index) + } + t.Logf("FromFirst[%d]: index=%d, %s - %s", i, records[i].Index, string(records[i].Record.Data), records[i].Status) + } + + // 查询超过总数的记录 + allRecords, err := processor.QueryFromFirst(100) + if err != nil { + t.Fatalf("QueryFromFirst(100) failed: %v", err) + } + if len(allRecords) != 10 { + t.Errorf("expected 10 records, got %d", len(allRecords)) + } + }) + + // 测试 QueryFromLast - 从最后一条记录向索引递减方向查询 + t.Run("QueryFromLast", func(t *testing.T) { + // 查询最后 3 条记录 + records, err := processor.QueryFromLast(3) + if err != nil { + t.Fatalf("QueryFromLast failed: %v", err) + } + + if len(records) != 3 { + t.Fatalf("expected 3 records, got %d", len(records)) + } + + // 验证结果:应该是索引 7, 8, 9(按索引递增顺序排列) + for i := 0; i < 3; i++ { + expectedIndex := 7 + i + expectedData := fmt.Sprintf("message %d", expectedIndex) + if string(records[i].Record.Data) != expectedData { + t.Errorf("records[%d]: expected %s, got %s", i, expectedData, string(records[i].Record.Data)) + } + if records[i].Index != expectedIndex { + t.Errorf("records[%d]: expected index %d, got %d", i, expectedIndex, records[i].Index) + } + t.Logf("FromLast[%d]: index=%d, %s - %s", i, records[i].Index, string(records[i].Record.Data), records[i].Status) + } + + // 查询超过总数的记录 + allRecords, err := processor.QueryFromLast(100) + if err != nil { + t.Fatalf("QueryFromLast(100) failed: %v", err) + } + if len(allRecords) != 10 { + t.Errorf("expected 10 records, got %d", len(allRecords)) + } + }) + + // 测试空数据库 + t.Run("EmptyDatabase", func(t *testing.T) { + emptyProcessor, err := NewTopicProcessor(t.TempDir(), "empty", nil, &TopicConfig{ + Handler: func(rec *Record) error { + return nil + }, + }) + if err != nil { + t.Fatal(err) + } + defer emptyProcessor.Close() + + // QueryFromFirst 应该返回空数组 + firstRecords, err := emptyProcessor.QueryFromFirst(10) + if err != nil { + t.Fatalf("QueryFromFirst on empty failed: %v", err) + } + if len(firstRecords) != 0 { + t.Errorf("expected 0 records, got %d", len(firstRecords)) + } + + // QueryFromLast 应该返回空数组 + lastRecords, err := emptyProcessor.QueryFromLast(10) + if err != nil { + t.Fatalf("QueryFromLast on empty failed: %v", err) + } + if len(lastRecords) != 0 { + t.Errorf("expected 0 records, got %d", len(lastRecords)) + } + }) +} + +// TestLogHubQueryFromFirstAndLast 测试 LogHub 的 QueryFromFirst 和 QueryFromLast +func TestLogHubQueryFromFirstAndLast(t *testing.T) { + tmpDir := t.TempDir() + + seqlog := NewLogHub(tmpDir, slog.Default(), nil) + seqlog.RegisterHandler("test", func(rec *Record) error { + return nil + }) + seqlog.Start() + defer seqlog.Stop() + + // 写入测试数据 + for i := 0; i < 10; i++ { + data := fmt.Sprintf("message %d", i) + if _, err := seqlog.Write("test", []byte(data)); err != nil { + t.Fatal(err) + } + } + + // 测试 QueryFromFirst + firstRecords, err := seqlog.QueryFromFirst("test", 3) + if err != nil { + t.Fatalf("QueryFromFirst failed: %v", err) + } + if len(firstRecords) != 3 { + t.Fatalf("expected 3 records, got %d", len(firstRecords)) + } + for i := 0; i < 3; i++ { + if firstRecords[i].Index != i { + t.Errorf("firstRecords[%d]: expected index %d, got %d", i, i, firstRecords[i].Index) + } + } + + // 测试 QueryFromLast + lastRecords, err := seqlog.QueryFromLast("test", 3) + if err != nil { + t.Fatalf("QueryFromLast failed: %v", err) + } + if len(lastRecords) != 3 { + t.Fatalf("expected 3 records, got %d", len(lastRecords)) + } + for i := 0; i < 3; i++ { + expectedIndex := 7 + i + if lastRecords[i].Index != expectedIndex { + t.Errorf("lastRecords[%d]: expected index %d, got %d", i, expectedIndex, lastRecords[i].Index) + } + } + + // 测试不存在的 topic + _, err = seqlog.QueryFromFirst("nonexistent", 10) + if err == nil { + t.Error("expected error for nonexistent topic") + } + + _, err = seqlog.QueryFromLast("nonexistent", 10) + if err == nil { + t.Error("expected error for nonexistent topic") } } diff --git a/tailer.go b/tailer.go index a16f6db..20dbb35 100644 --- a/tailer.go +++ b/tailer.go @@ -22,7 +22,7 @@ type TailConfig struct { // LogTailer 持续监控处理器 type LogTailer struct { - cursor *LogCursor + cursor *ProcessCursor handler RecordHandler config TailConfig configCh chan TailConfig // 用于动态更新配置 @@ -32,7 +32,7 @@ type LogTailer struct { // NewTailer 创建一个新的 tail 处理器 // cursor: 外部提供的游标,用于读取和跟踪日志位置 -func NewTailer(cursor *LogCursor, handler RecordHandler, config *TailConfig) (*LogTailer, error) { +func NewTailer(cursor *ProcessCursor, handler RecordHandler, config *TailConfig) (*LogTailer, error) { if cursor == nil { return nil, fmt.Errorf("cursor cannot be nil") } diff --git a/topic_processor.go b/topic_processor.go index dc0f35b..0cdf8ca 100644 --- a/topic_processor.go +++ b/topic_processor.go @@ -20,7 +20,7 @@ type TopicProcessor struct { writer *LogWriter // 写入器 index *RecordIndex // 索引管理器 query *RecordQuery // 查询器 - cursor *LogCursor // 游标 + cursor *ProcessCursor // 游标 tailer *LogTailer // 持续处理器 // 配置和状态 @@ -339,13 +339,14 @@ func (tp *TopicProcessor) Query() *RecordQuery { return tp.query } -// QueryOldest 从指定索引向索引递增方向查询记录 -// startIndex: 查询起始索引 +// QueryOldest 从参考索引向索引递减方向查询记录(查询更早的记录) +// refIndex: 参考索引位置 // count: 查询数量 -// 返回的记录包含状态信息(基于 tailer 的窗口索引),按索引递增方向排序 -func (tp *TopicProcessor) QueryOldest(startIndex, count int) ([]*RecordWithStatus, error) { - // 查询记录 - records, err := tp.query.QueryOldest(startIndex, count) +// 返回的记录包含索引和状态信息,按索引递增方向排序 +// 例如:QueryOldest(5, 3) 查询索引 2, 3, 4(不包含 5) +func (tp *TopicProcessor) QueryOldest(refIndex, count int) ([]*RecordWithStatus, error) { + // 查询记录(包含索引信息) + records, err := tp.query.QueryOldest(refIndex, count) if err != nil { return nil, err } @@ -363,21 +364,23 @@ func (tp *TopicProcessor) QueryOldest(startIndex, count int) ([]*RecordWithStatu results := make([]*RecordWithStatus, len(records)) for i, rec := range records { results[i] = &RecordWithStatus{ - Record: rec, - Status: GetRecordStatus(startIndex+i, startIdx, endIdx), + Record: rec.Record, + Index: rec.Index, + Status: GetRecordStatus(rec.Index, startIdx, endIdx), } } return results, nil } -// QueryNewest 从指定索引向索引递减方向查询记录 -// endIndex: 查询的最大索引(向前查询更早的记录) +// QueryNewest 从参考索引向索引递增方向查询记录(查询更新的记录) +// refIndex: 参考索引位置 // count: 查询数量 -// 返回的记录包含状态信息(基于 tailer 的窗口索引),按索引递增方向排序 -func (tp *TopicProcessor) QueryNewest(endIndex, count int) ([]*RecordWithStatus, error) { - // 查询记录 - records, err := tp.query.QueryNewest(endIndex, count) +// 返回的记录包含索引和状态信息,按索引递增方向排序 +// 例如:QueryNewest(5, 3) 查询索引 6, 7, 8(不包含 5) +func (tp *TopicProcessor) QueryNewest(refIndex, count int) ([]*RecordWithStatus, error) { + // 查询记录(包含索引信息) + records, err := tp.query.QueryNewest(refIndex, count) if err != nil { return nil, err } @@ -391,12 +394,13 @@ func (tp *TopicProcessor) QueryNewest(endIndex, count int) ([]*RecordWithStatus, } tp.mu.RUnlock() - // 为每条记录添加状态(倒序:endIndex, endIndex-1, ...) + // 为每条记录添加状态 results := make([]*RecordWithStatus, len(records)) for i, rec := range records { results[i] = &RecordWithStatus{ - Record: rec, - Status: GetRecordStatus(endIndex-i, startIdx, endIdx), + Record: rec.Record, + Index: rec.Index, + Status: GetRecordStatus(rec.Index, startIdx, endIdx), } } @@ -408,16 +412,43 @@ func (tp *TopicProcessor) GetRecordCount() int { return tp.index.Count() } -// Cursor 创建一个新的游标实例(使用共享的 index) -// 注意:每次调用都会创建新实例,调用者需要负责关闭 -// Tailer 内部有自己的游标,不会与此冲突 -func (tp *TopicProcessor) Cursor() (*LogCursor, error) { - return NewCursor(tp.logPath, tp.index) +// QueryFromProcessing 从当前处理窗口的开始位置向索引递增方向查询记录 +// count: 查询数量 +// 返回从处理窗口开始位置(startIndex)向后的记录,包含状态信息 +func (tp *TopicProcessor) QueryFromProcessing(count int) ([]*RecordWithStatus, error) { + // 获取当前处理窗口的开始位置 + tp.mu.RLock() + var startIdx int + if tp.tailer != nil { + startIdx = tp.tailer.GetStartIndex() + } + tp.mu.RUnlock() + + // 从 startIdx 开始向索引递增方向查询 + // QueryNewest(refIndex, count) 查询从 refIndex+1 开始的记录 + // 所以要从 startIdx 开始,应该调用 QueryNewest(startIdx - 1, count) + return tp.QueryNewest(startIdx-1, count) } -// Index 获取索引管理器 -func (tp *TopicProcessor) Index() *RecordIndex { - return tp.index +// QueryFromFirst 从第一条记录向索引递增方向查询 +// count: 查询数量 +// 返回从第一条记录(索引 0)开始的记录,包含状态信息 +// 例如:QueryFromFirst(3) 查询索引 0, 1, 2 +func (tp *TopicProcessor) QueryFromFirst(count int) ([]*RecordWithStatus, error) { + // QueryNewest(-1, count) 会从索引 0 开始向后查询 + return tp.QueryNewest(-1, count) +} + +// QueryFromLast 从最后一条记录向索引递减方向查询 +// count: 查询数量 +// 返回从最后一条记录开始向前的记录,包含状态信息,按索引递增方向排序 +// 例如:如果总共有 10 条记录,QueryFromLast(3) 查询索引 7, 8, 9 +func (tp *TopicProcessor) QueryFromLast(count int) ([]*RecordWithStatus, error) { + // 获取记录总数 + totalCount := tp.index.Count() + + // QueryOldest(totalCount, count) 会从最后一条记录开始向前查询 + return tp.QueryOldest(totalCount, count) } // GetProcessingIndex 获取当前处理索引(窗口开始索引) @@ -460,13 +491,42 @@ func (tp *TopicProcessor) Unsubscribe(eventType EventType) { } // Reset 清空 topic 的所有数据,包括日志文件、位置文件和统计文件 -// 注意:必须在 Stop 之后调用 +// 如果 processor 正在运行且没有待处理的日志,会自动停止 +// 如果有待处理的日志,则返回错误,需要等待处理完成或手动停止 func (tp *TopicProcessor) Reset() error { tp.mu.Lock() defer tp.mu.Unlock() if tp.running { - return fmt.Errorf("cannot reset while processor is running, please stop first") + // 检查是否有待处理的日志 + processingIndex := 0 + if tp.tailer != nil { + processingIndex = tp.tailer.GetStartIndex() + } + recordCount := tp.index.Count() + + hasPendingRecords := processingIndex < recordCount + + if hasPendingRecords { + return fmt.Errorf("cannot reset while processor is running with pending records (%d/%d processed), please stop first or wait for processing to complete", processingIndex, recordCount) + } + + // 没有待处理的日志,自动停止 + tp.logger.Debug("auto-stopping processor before reset (no pending records)") + tp.running = false + tp.cancel() + + // 释放锁以等待 tailer 停止 + tp.mu.Unlock() + tp.wg.Wait() + tp.mu.Lock() + + // 发布停止事件 + tp.eventBus.Publish(&Event{ + Type: EventProcessorStop, + Topic: tp.topic, + Timestamp: time.Now(), + }) } tp.logger.Debug("resetting processor")