重构:为核心组件实现 Reset 方法优化重置机制
为所有核心组件添加 Reset() 方法: - LogWriter.Reset(): 删除并重新创建日志文件,保持 index 和 wbuf 引用不变 - RecordIndex.Reset(): 清空索引数据并重新创建索引文件 - RecordQuery.Reset(): 关闭并重新打开日志文件 - ProcessCursor.Reset(): 删除位置文件并重置游标位置 - LogTailer.Reset(): 重置内部 channel 状态 优化 TopicProcessor.Reset() 实现: - 不再销毁和重建组件对象 - 通过调用各组件的 Reset() 方法重置状态 - 保持组件间引用关系稳定 - 减少代码行数约 20 行 - 避免空指针风险和内存分配开销 代码改进: - LogWriter 添加 path 字段用于重置 - 移除 topic_processor.go 中未使用的 os import - 职责分离更清晰,每个组件管理自己的重置逻辑 测试结果: - TestTopicReset: PASS - TestTopicResetWithPendingRecords: PASS - 所有 TopicProcessor 相关测试通过 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
30
cursor.go
30
cursor.go
@@ -172,6 +172,36 @@ func (c *ProcessCursor) Close() error {
|
||||
return c.fd.Close()
|
||||
}
|
||||
|
||||
// Reset 重置游标,删除位置文件并重新打开日志文件
|
||||
// 保持 index 和 writer 引用不变
|
||||
func (c *ProcessCursor) Reset() error {
|
||||
// 关闭文件
|
||||
if c.fd != nil {
|
||||
if err := c.fd.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
c.fd = nil
|
||||
}
|
||||
|
||||
// 删除位置文件
|
||||
if err := os.Remove(c.posFile); err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
// 重新打开日志文件
|
||||
fd, err := os.Open(c.path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 重置状态
|
||||
c.fd = fd
|
||||
c.startIdx = 0
|
||||
c.endIdx = 0
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// savePosition 保存当前读取位置到文件
|
||||
func (c *ProcessCursor) savePosition() error {
|
||||
f, err := os.Create(c.posFile)
|
||||
|
||||
38
index.go
38
index.go
@@ -249,6 +249,44 @@ func (ri *RecordIndex) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reset 重置索引,清空所有数据并重新创建索引文件
|
||||
func (ri *RecordIndex) Reset() error {
|
||||
ri.mu.Lock()
|
||||
defer ri.mu.Unlock()
|
||||
|
||||
// 关闭索引文件
|
||||
if ri.indexFile != nil {
|
||||
if err := ri.indexFile.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
ri.indexFile = nil
|
||||
}
|
||||
|
||||
// 删除索引文件
|
||||
if err := os.Remove(ri.indexPath); err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
// 清空内存中的索引数据
|
||||
ri.offsets = make([]int64, 0, 1024)
|
||||
ri.dirtyCount = 0
|
||||
ri.lastSync = time.Now()
|
||||
|
||||
// 保存空索引(创建文件并写入头部)
|
||||
if err := ri.save(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 重新打开索引文件用于追加
|
||||
f, err := os.OpenFile(ri.indexPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("reopen index file: %w", err)
|
||||
}
|
||||
ri.indexFile = f
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Sync 同步索引文件到磁盘(立即同步,不考虑批量策略)
|
||||
func (ri *RecordIndex) Sync() error {
|
||||
return ri.Flush()
|
||||
|
||||
21
query.go
21
query.go
@@ -464,3 +464,24 @@ func (rq *RecordQuery) Close() error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reset 重置查询器,关闭并重新打开日志文件
|
||||
// 保持 index 和 writer 引用不变
|
||||
func (rq *RecordQuery) Reset() error {
|
||||
// 关闭当前文件句柄
|
||||
if rq.fd != nil {
|
||||
if err := rq.fd.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
rq.fd = nil
|
||||
}
|
||||
|
||||
// 重新打开日志文件
|
||||
fd, err := os.Open(rq.logPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("reopen log file: %w", err)
|
||||
}
|
||||
|
||||
rq.fd = fd
|
||||
return nil
|
||||
}
|
||||
|
||||
16
tailer.go
16
tailer.go
@@ -147,3 +147,19 @@ func (t *LogTailer) GetStartIndex() int {
|
||||
func (t *LogTailer) GetEndIndex() int {
|
||||
return t.cursor.EndIndex()
|
||||
}
|
||||
|
||||
// Reset 重置 tailer 的内部状态
|
||||
// 注意:调用前必须确保 tailer 已停止
|
||||
func (t *LogTailer) Reset() error {
|
||||
// 重新创建 channel(确保没有遗留的信号)
|
||||
t.stopCh = make(chan struct{})
|
||||
t.doneCh = make(chan struct{})
|
||||
|
||||
// 清空配置 channel
|
||||
select {
|
||||
case <-t.configCh:
|
||||
default:
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -809,63 +808,48 @@ func (tp *TopicProcessor) Reset() error {
|
||||
|
||||
var errs []error
|
||||
|
||||
// 先关闭所有组件(避免在删除文件时重新创建)
|
||||
if tp.cursor != nil {
|
||||
if err := tp.cursor.Close(); err != nil {
|
||||
tp.logger.Error("failed to close cursor during reset", "error", err)
|
||||
errs = append(errs, fmt.Errorf("close cursor: %w", err))
|
||||
}
|
||||
tp.cursor = nil
|
||||
}
|
||||
if tp.query != nil {
|
||||
tp.query.Close()
|
||||
tp.query = nil
|
||||
}
|
||||
if tp.writer != nil {
|
||||
if err := tp.writer.Close(); err != nil {
|
||||
tp.logger.Error("failed to close writer during reset", "error", err)
|
||||
errs = append(errs, fmt.Errorf("close writer: %w", err))
|
||||
}
|
||||
tp.writer = nil
|
||||
}
|
||||
// 重置所有组件(保持引用关系不变)
|
||||
// 1. 重置索引(会删除索引文件并重新创建)
|
||||
if tp.index != nil {
|
||||
tp.index.Close()
|
||||
tp.index = nil
|
||||
}
|
||||
|
||||
// 删除所有文件
|
||||
// 删除日志文件
|
||||
if err := os.Remove(tp.logPath); err != nil && !os.IsNotExist(err) {
|
||||
tp.logger.Error("failed to remove log file", "error", err)
|
||||
errs = append(errs, fmt.Errorf("remove log file: %w", err))
|
||||
}
|
||||
|
||||
// 删除位置文件
|
||||
posFile := tp.logPath + ".pos"
|
||||
if err := os.Remove(posFile); err != nil && !os.IsNotExist(err) {
|
||||
tp.logger.Error("failed to remove position file", "error", err)
|
||||
errs = append(errs, fmt.Errorf("remove position file: %w", err))
|
||||
}
|
||||
|
||||
// 删除索引文件
|
||||
indexFile := tp.logPath + ".idx"
|
||||
if err := os.Remove(indexFile); err != nil && !os.IsNotExist(err) {
|
||||
tp.logger.Error("failed to remove index file", "error", err)
|
||||
errs = append(errs, fmt.Errorf("remove index file: %w", err))
|
||||
}
|
||||
|
||||
// 重新初始化所有组件
|
||||
if err := tp.initializeComponents(); err != nil {
|
||||
tp.logger.Error("failed to reinitialize components", "error", err)
|
||||
errs = append(errs, fmt.Errorf("reinitialize components: %w", err))
|
||||
tp.setState(StateError, err)
|
||||
if len(errs) > 0 {
|
||||
return errs[0]
|
||||
if err := tp.index.Reset(); err != nil {
|
||||
tp.logger.Error("failed to reset index", "error", err)
|
||||
errs = append(errs, fmt.Errorf("reset index: %w", err))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// 重置统计信息(会删除统计文件并重置所有计数器)
|
||||
// 2. 重置写入器(会删除日志文件并重新创建)
|
||||
if tp.writer != nil {
|
||||
if err := tp.writer.Reset(); err != nil {
|
||||
tp.logger.Error("failed to reset writer", "error", err)
|
||||
errs = append(errs, fmt.Errorf("reset writer: %w", err))
|
||||
}
|
||||
}
|
||||
|
||||
// 3. 重置游标(会删除位置文件并重置位置)
|
||||
if tp.cursor != nil {
|
||||
if err := tp.cursor.Reset(); err != nil {
|
||||
tp.logger.Error("failed to reset cursor", "error", err)
|
||||
errs = append(errs, fmt.Errorf("reset cursor: %w", err))
|
||||
}
|
||||
}
|
||||
|
||||
// 4. 重置查询器(重新打开日志文件)
|
||||
if tp.query != nil {
|
||||
if err := tp.query.Reset(); err != nil {
|
||||
tp.logger.Error("failed to reset query", "error", err)
|
||||
errs = append(errs, fmt.Errorf("reset query: %w", err))
|
||||
}
|
||||
}
|
||||
|
||||
// 5. 重置 tailer(重置内部状态)
|
||||
if tp.tailer != nil {
|
||||
if err := tp.tailer.Reset(); err != nil {
|
||||
tp.logger.Error("failed to reset tailer", "error", err)
|
||||
errs = append(errs, fmt.Errorf("reset tailer: %w", err))
|
||||
}
|
||||
}
|
||||
|
||||
// 6. 重置统计信息(会删除统计文件并重置所有计数器)
|
||||
if tp.stats != nil {
|
||||
if err := tp.stats.Reset(); err != nil {
|
||||
tp.logger.Error("failed to reset stats", "error", err)
|
||||
@@ -873,6 +857,12 @@ func (tp *TopicProcessor) Reset() error {
|
||||
}
|
||||
}
|
||||
|
||||
// 如果有错误,设置为错误状态
|
||||
if len(errs) > 0 {
|
||||
tp.setState(StateError, errs[0])
|
||||
return errs[0]
|
||||
}
|
||||
|
||||
// 发布重置事件
|
||||
tp.eventBus.Publish(&Event{
|
||||
Type: EventProcessorReset,
|
||||
|
||||
35
writer.go
35
writer.go
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
// LogWriter 日志写入器
|
||||
type LogWriter struct {
|
||||
path string // 日志文件路径
|
||||
fd *os.File
|
||||
off int64 // 当前写入偏移
|
||||
dirtyOff int64 // 最后一次写入偏移
|
||||
@@ -33,6 +34,7 @@ func NewLogWriter(path string, index *RecordIndex) (*LogWriter, error) {
|
||||
off, _ := fd.Seek(0, 2) // 跳到尾部
|
||||
|
||||
w := &LogWriter{
|
||||
path: path,
|
||||
fd: fd,
|
||||
off: off,
|
||||
dirtyOff: -1,
|
||||
@@ -108,3 +110,36 @@ func (w *LogWriter) Close() error {
|
||||
}
|
||||
return w.fd.Close()
|
||||
}
|
||||
|
||||
// Reset 重置写入器,删除日志文件并重新创建
|
||||
// 保持 index 和 wbuf 引用不变
|
||||
func (w *LogWriter) Reset() error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
// 关闭当前文件句柄
|
||||
if w.fd != nil {
|
||||
if err := w.fd.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
w.fd = nil
|
||||
}
|
||||
|
||||
// 删除日志文件
|
||||
if err := os.Remove(w.path); err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
// 重新创建文件
|
||||
fd, err := os.OpenFile(w.path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 重置状态
|
||||
w.fd = fd
|
||||
w.off = 0
|
||||
w.dirtyOff = -1
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user