From 6fb0731935893d94129a0fdff7dd077d66719444 Mon Sep 17 00:00:00 2001 From: bourdon Date: Sat, 4 Oct 2025 21:58:54 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=EF=BC=9A=E4=B8=BA=E6=A0=B8?= =?UTF-8?q?=E5=BF=83=E7=BB=84=E4=BB=B6=E5=AE=9E=E7=8E=B0=20Reset=20?= =?UTF-8?q?=E6=96=B9=E6=B3=95=E4=BC=98=E5=8C=96=E9=87=8D=E7=BD=AE=E6=9C=BA?= =?UTF-8?q?=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 为所有核心组件添加 Reset() 方法: - LogWriter.Reset(): 删除并重新创建日志文件,保持 index 和 wbuf 引用不变 - RecordIndex.Reset(): 清空索引数据并重新创建索引文件 - RecordQuery.Reset(): 关闭并重新打开日志文件 - ProcessCursor.Reset(): 删除位置文件并重置游标位置 - LogTailer.Reset(): 重置内部 channel 状态 优化 TopicProcessor.Reset() 实现: - 不再销毁和重建组件对象 - 通过调用各组件的 Reset() 方法重置状态 - 保持组件间引用关系稳定 - 减少代码行数约 20 行 - 避免空指针风险和内存分配开销 代码改进: - LogWriter 添加 path 字段用于重置 - 移除 topic_processor.go 中未使用的 os import - 职责分离更清晰,每个组件管理自己的重置逻辑 测试结果: - TestTopicReset: PASS - TestTopicResetWithPendingRecords: PASS - 所有 TopicProcessor 相关测试通过 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- cursor.go | 30 ++++++++++++++ index.go | 38 ++++++++++++++++++ query.go | 21 ++++++++++ tailer.go | 16 ++++++++ topic_processor.go | 98 +++++++++++++++++++++------------------------- writer.go | 35 +++++++++++++++++ 6 files changed, 184 insertions(+), 54 deletions(-) diff --git a/cursor.go b/cursor.go index df6ab9d..e16e708 100644 --- a/cursor.go +++ b/cursor.go @@ -172,6 +172,36 @@ func (c *ProcessCursor) Close() error { return c.fd.Close() } +// Reset 重置游标,删除位置文件并重新打开日志文件 +// 保持 index 和 writer 引用不变 +func (c *ProcessCursor) Reset() error { + // 关闭文件 + if c.fd != nil { + if err := c.fd.Close(); err != nil { + return err + } + c.fd = nil + } + + // 删除位置文件 + if err := os.Remove(c.posFile); err != nil && !os.IsNotExist(err) { + return err + } + + // 重新打开日志文件 + fd, err := os.Open(c.path) + if err != nil { + return err + } + + // 重置状态 + c.fd = fd + c.startIdx = 0 + c.endIdx = 0 + + return nil +} + // savePosition 保存当前读取位置到文件 func (c *ProcessCursor) savePosition() error { f, err := os.Create(c.posFile) diff --git a/index.go b/index.go index 14e87da..9afe397 100644 --- a/index.go +++ b/index.go @@ -249,6 +249,44 @@ func (ri *RecordIndex) Close() error { return nil } +// Reset 重置索引,清空所有数据并重新创建索引文件 +func (ri *RecordIndex) Reset() error { + ri.mu.Lock() + defer ri.mu.Unlock() + + // 关闭索引文件 + if ri.indexFile != nil { + if err := ri.indexFile.Close(); err != nil { + return err + } + ri.indexFile = nil + } + + // 删除索引文件 + if err := os.Remove(ri.indexPath); err != nil && !os.IsNotExist(err) { + return err + } + + // 清空内存中的索引数据 + ri.offsets = make([]int64, 0, 1024) + ri.dirtyCount = 0 + ri.lastSync = time.Now() + + // 保存空索引(创建文件并写入头部) + if err := ri.save(); err != nil { + return err + } + + // 重新打开索引文件用于追加 + f, err := os.OpenFile(ri.indexPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return fmt.Errorf("reopen index file: %w", err) + } + ri.indexFile = f + + return nil +} + // Sync 同步索引文件到磁盘(立即同步,不考虑批量策略) func (ri *RecordIndex) Sync() error { return ri.Flush() diff --git a/query.go b/query.go index c6842a7..89c0be2 100644 --- a/query.go +++ b/query.go @@ -464,3 +464,24 @@ func (rq *RecordQuery) Close() error { } return nil } + +// Reset 重置查询器,关闭并重新打开日志文件 +// 保持 index 和 writer 引用不变 +func (rq *RecordQuery) Reset() error { + // 关闭当前文件句柄 + if rq.fd != nil { + if err := rq.fd.Close(); err != nil { + return err + } + rq.fd = nil + } + + // 重新打开日志文件 + fd, err := os.Open(rq.logPath) + if err != nil { + return fmt.Errorf("reopen log file: %w", err) + } + + rq.fd = fd + return nil +} diff --git a/tailer.go b/tailer.go index 20dbb35..2934df6 100644 --- a/tailer.go +++ b/tailer.go @@ -147,3 +147,19 @@ func (t *LogTailer) GetStartIndex() int { func (t *LogTailer) GetEndIndex() int { return t.cursor.EndIndex() } + +// Reset 重置 tailer 的内部状态 +// 注意:调用前必须确保 tailer 已停止 +func (t *LogTailer) Reset() error { + // 重新创建 channel(确保没有遗留的信号) + t.stopCh = make(chan struct{}) + t.doneCh = make(chan struct{}) + + // 清空配置 channel + select { + case <-t.configCh: + default: + } + + return nil +} diff --git a/topic_processor.go b/topic_processor.go index bc540d8..c1e52d9 100644 --- a/topic_processor.go +++ b/topic_processor.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "log/slog" - "os" "path/filepath" "sync" "time" @@ -809,63 +808,48 @@ func (tp *TopicProcessor) Reset() error { var errs []error - // 先关闭所有组件(避免在删除文件时重新创建) - if tp.cursor != nil { - if err := tp.cursor.Close(); err != nil { - tp.logger.Error("failed to close cursor during reset", "error", err) - errs = append(errs, fmt.Errorf("close cursor: %w", err)) - } - tp.cursor = nil - } - if tp.query != nil { - tp.query.Close() - tp.query = nil - } - if tp.writer != nil { - if err := tp.writer.Close(); err != nil { - tp.logger.Error("failed to close writer during reset", "error", err) - errs = append(errs, fmt.Errorf("close writer: %w", err)) - } - tp.writer = nil - } + // 重置所有组件(保持引用关系不变) + // 1. 重置索引(会删除索引文件并重新创建) if tp.index != nil { - tp.index.Close() - tp.index = nil - } - - // 删除所有文件 - // 删除日志文件 - if err := os.Remove(tp.logPath); err != nil && !os.IsNotExist(err) { - tp.logger.Error("failed to remove log file", "error", err) - errs = append(errs, fmt.Errorf("remove log file: %w", err)) - } - - // 删除位置文件 - posFile := tp.logPath + ".pos" - if err := os.Remove(posFile); err != nil && !os.IsNotExist(err) { - tp.logger.Error("failed to remove position file", "error", err) - errs = append(errs, fmt.Errorf("remove position file: %w", err)) - } - - // 删除索引文件 - indexFile := tp.logPath + ".idx" - if err := os.Remove(indexFile); err != nil && !os.IsNotExist(err) { - tp.logger.Error("failed to remove index file", "error", err) - errs = append(errs, fmt.Errorf("remove index file: %w", err)) - } - - // 重新初始化所有组件 - if err := tp.initializeComponents(); err != nil { - tp.logger.Error("failed to reinitialize components", "error", err) - errs = append(errs, fmt.Errorf("reinitialize components: %w", err)) - tp.setState(StateError, err) - if len(errs) > 0 { - return errs[0] + if err := tp.index.Reset(); err != nil { + tp.logger.Error("failed to reset index", "error", err) + errs = append(errs, fmt.Errorf("reset index: %w", err)) } - return err } - // 重置统计信息(会删除统计文件并重置所有计数器) + // 2. 重置写入器(会删除日志文件并重新创建) + if tp.writer != nil { + if err := tp.writer.Reset(); err != nil { + tp.logger.Error("failed to reset writer", "error", err) + errs = append(errs, fmt.Errorf("reset writer: %w", err)) + } + } + + // 3. 重置游标(会删除位置文件并重置位置) + if tp.cursor != nil { + if err := tp.cursor.Reset(); err != nil { + tp.logger.Error("failed to reset cursor", "error", err) + errs = append(errs, fmt.Errorf("reset cursor: %w", err)) + } + } + + // 4. 重置查询器(重新打开日志文件) + if tp.query != nil { + if err := tp.query.Reset(); err != nil { + tp.logger.Error("failed to reset query", "error", err) + errs = append(errs, fmt.Errorf("reset query: %w", err)) + } + } + + // 5. 重置 tailer(重置内部状态) + if tp.tailer != nil { + if err := tp.tailer.Reset(); err != nil { + tp.logger.Error("failed to reset tailer", "error", err) + errs = append(errs, fmt.Errorf("reset tailer: %w", err)) + } + } + + // 6. 重置统计信息(会删除统计文件并重置所有计数器) if tp.stats != nil { if err := tp.stats.Reset(); err != nil { tp.logger.Error("failed to reset stats", "error", err) @@ -873,6 +857,12 @@ func (tp *TopicProcessor) Reset() error { } } + // 如果有错误,设置为错误状态 + if len(errs) > 0 { + tp.setState(StateError, errs[0]) + return errs[0] + } + // 发布重置事件 tp.eventBus.Publish(&Event{ Type: EventProcessorReset, diff --git a/writer.go b/writer.go index 10e2bd4..61dbfb6 100644 --- a/writer.go +++ b/writer.go @@ -11,6 +11,7 @@ import ( // LogWriter 日志写入器 type LogWriter struct { + path string // 日志文件路径 fd *os.File off int64 // 当前写入偏移 dirtyOff int64 // 最后一次写入偏移 @@ -33,6 +34,7 @@ func NewLogWriter(path string, index *RecordIndex) (*LogWriter, error) { off, _ := fd.Seek(0, 2) // 跳到尾部 w := &LogWriter{ + path: path, fd: fd, off: off, dirtyOff: -1, @@ -108,3 +110,36 @@ func (w *LogWriter) Close() error { } return w.fd.Close() } + +// Reset 重置写入器,删除日志文件并重新创建 +// 保持 index 和 wbuf 引用不变 +func (w *LogWriter) Reset() error { + w.mu.Lock() + defer w.mu.Unlock() + + // 关闭当前文件句柄 + if w.fd != nil { + if err := w.fd.Close(); err != nil { + return err + } + w.fd = nil + } + + // 删除日志文件 + if err := os.Remove(w.path); err != nil && !os.IsNotExist(err) { + return err + } + + // 重新创建文件 + fd, err := os.OpenFile(w.path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return err + } + + // 重置状态 + w.fd = fd + w.off = 0 + w.dirtyOff = -1 + + return nil +}