重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
package seqlog
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
|
"context"
|
|
|
|
|
|
"fmt"
|
|
|
|
|
|
"log/slog"
|
|
|
|
|
|
"os"
|
|
|
|
|
|
"path/filepath"
|
|
|
|
|
|
"sync"
|
|
|
|
|
|
"time"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
// TopicProcessor 作为聚合器,持有所有核心组件并提供统一的访问接口
|
|
|
|
|
|
type TopicProcessor struct {
|
|
|
|
|
|
topic string
|
|
|
|
|
|
logPath string
|
|
|
|
|
|
logger *slog.Logger
|
|
|
|
|
|
|
|
|
|
|
|
// 核心组件(聚合)
|
|
|
|
|
|
writer *LogWriter // 写入器
|
|
|
|
|
|
index *RecordIndex // 索引管理器
|
|
|
|
|
|
query *RecordQuery // 查询器
|
|
|
|
|
|
cursor *LogCursor // 游标
|
|
|
|
|
|
tailer *LogTailer // 持续处理器
|
|
|
|
|
|
|
|
|
|
|
|
// 配置和状态
|
|
|
|
|
|
handler RecordHandler
|
|
|
|
|
|
tailConfig *TailConfig
|
|
|
|
|
|
stats *TopicStats // 统计信息
|
|
|
|
|
|
eventBus *EventBus // 事件总线
|
|
|
|
|
|
|
|
|
|
|
|
// 并发控制
|
|
|
|
|
|
mu sync.RWMutex
|
|
|
|
|
|
ctx context.Context
|
|
|
|
|
|
cancel context.CancelFunc
|
|
|
|
|
|
wg sync.WaitGroup
|
|
|
|
|
|
running bool
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TopicConfig topic 配置
|
|
|
|
|
|
type TopicConfig struct {
|
|
|
|
|
|
Handler RecordHandler // 处理函数(必填)
|
|
|
|
|
|
TailConfig *TailConfig // tail 配置,可选
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// NewTopicProcessor 创建一个新的 topic 处理器
|
|
|
|
|
|
// 在初始化时创建所有核心组件,index 在组件间共享
|
|
|
|
|
|
// handler 为必填参数,如果 config 为 nil 或 config.Handler 为 nil 会返回错误
|
|
|
|
|
|
func NewTopicProcessor(baseDir, topic string, logger *slog.Logger, config *TopicConfig) (*TopicProcessor, error) {
|
|
|
|
|
|
// 验证必填参数
|
|
|
|
|
|
if config == nil || config.Handler == nil {
|
新增:统一的错误类型系统 (errors.go)
主要功能:
- 定义哨兵错误(Sentinel Errors):ErrNilParameter, ErrInvalidCount,
ErrInvalidRange, ErrAlreadyRunning, ErrNotFound, ErrCRCMismatch 等
- 实现结构化错误类型:TopicError, FileError, IndexError, ValidationError
- 提供错误检查辅助函数:IsTopicNotFound, IsIndexOutOfRange, IsCRCMismatch
- 支持 errors.Is 和 errors.As 进行错误判断
更新相关文件使用新错误类型:
- cursor.go: 使用 ValidationError 和 ErrCRCMismatch
- index.go: 使用 IndexError 处理索引越界
- query.go: 使用 ValidationError 验证参数
- seqlog_manager.go: 使用 TopicError 和 ErrAlreadyRegistered
- topic_processor.go: 使用 ErrAlreadyRunning 和 ErrInvalidConfig
测试覆盖:
- errors_test.go 提供完整的错误类型测试
- 所有现有测试继续通过
使用示例:
```go
// 检查 topic 是否存在
if IsTopicNotFound(err) {
// 处理 topic 不存在的情况
}
// 检查索引越界
if IsIndexOutOfRange(err) {
var indexErr *IndexError
errors.As(err, &indexErr)
fmt.Printf("index %d out of range\n", indexErr.Index)
}
```
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 01:56:22 +08:00
|
|
|
|
return nil, NewValidationError("config", "config and config.Handler are required", ErrInvalidConfig)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
|
|
|
|
|
|
|
|
// 默认配置
|
|
|
|
|
|
tailConfig := &TailConfig{
|
|
|
|
|
|
PollInterval: 100 * 1000000, // 100ms
|
|
|
|
|
|
SaveInterval: 1000 * 1000000, // 1s
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if config.TailConfig != nil {
|
|
|
|
|
|
tailConfig = config.TailConfig
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if logger == nil {
|
|
|
|
|
|
logger = slog.Default()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
logPath := filepath.Join(baseDir, topic+".log")
|
|
|
|
|
|
statsPath := filepath.Join(baseDir, topic+".stats")
|
|
|
|
|
|
|
|
|
|
|
|
tp := &TopicProcessor{
|
|
|
|
|
|
topic: topic,
|
|
|
|
|
|
logPath: logPath,
|
|
|
|
|
|
logger: logger,
|
|
|
|
|
|
handler: config.Handler,
|
|
|
|
|
|
tailConfig: tailConfig,
|
|
|
|
|
|
stats: NewTopicStats(statsPath),
|
|
|
|
|
|
eventBus: NewEventBus(),
|
|
|
|
|
|
ctx: ctx,
|
|
|
|
|
|
cancel: cancel,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 初始化所有组件
|
|
|
|
|
|
if err := tp.initializeComponents(); err != nil {
|
|
|
|
|
|
cancel()
|
|
|
|
|
|
return nil, fmt.Errorf("failed to initialize components: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return tp, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// initializeComponents 初始化所有核心组件
|
|
|
|
|
|
func (tp *TopicProcessor) initializeComponents() error {
|
|
|
|
|
|
// 1. 创建共享的索引管理器
|
|
|
|
|
|
index, err := NewRecordIndex(tp.logPath)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return fmt.Errorf("create index: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.index = index
|
|
|
|
|
|
|
|
|
|
|
|
// 2. 创建写入器(使用共享 index)
|
|
|
|
|
|
writer, err := NewLogWriter(tp.logPath, tp.index)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
tp.index.Close()
|
|
|
|
|
|
return fmt.Errorf("create writer: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.writer = writer
|
|
|
|
|
|
|
|
|
|
|
|
// 3. 创建查询器(使用共享 index)
|
|
|
|
|
|
query, err := NewRecordQuery(tp.logPath, tp.index)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
tp.writer.Close()
|
|
|
|
|
|
tp.index.Close()
|
|
|
|
|
|
return fmt.Errorf("create query: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.query = query
|
|
|
|
|
|
|
|
|
|
|
|
// 4. 创建游标(使用共享 index)
|
|
|
|
|
|
cursor, err := NewCursor(tp.logPath, tp.index)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
tp.query.Close()
|
|
|
|
|
|
tp.writer.Close()
|
|
|
|
|
|
tp.index.Close()
|
|
|
|
|
|
return fmt.Errorf("create cursor: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.cursor = cursor
|
|
|
|
|
|
|
|
|
|
|
|
// 5. 创建 tailer(handler 为必填,总是创建)
|
|
|
|
|
|
// 注意:只创建不启动,启动在 Start() 中进行
|
|
|
|
|
|
if err := tp.createTailer(); err != nil {
|
|
|
|
|
|
tp.cursor.Close()
|
|
|
|
|
|
tp.query.Close()
|
|
|
|
|
|
tp.writer.Close()
|
|
|
|
|
|
tp.index.Close()
|
|
|
|
|
|
return fmt.Errorf("create tailer: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
tp.logger.Debug("all components initialized")
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// createTailer 创建 tailer(不启动)
|
|
|
|
|
|
func (tp *TopicProcessor) createTailer() error {
|
|
|
|
|
|
// 包装 handler,添加统计功能和事件发布
|
|
|
|
|
|
wrappedHandler := func(rec *Record) error {
|
|
|
|
|
|
if err := tp.handler(rec); err != nil {
|
|
|
|
|
|
tp.stats.IncError()
|
|
|
|
|
|
|
|
|
|
|
|
// 发布处理错误事件
|
|
|
|
|
|
tp.eventBus.Publish(&Event{
|
|
|
|
|
|
Type: EventProcessError,
|
|
|
|
|
|
Topic: tp.topic,
|
|
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
|
|
Record: rec,
|
|
|
|
|
|
Error: err,
|
|
|
|
|
|
Position: 0, // Position 在 tailer 模式下不可用
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 处理成功,更新统计
|
|
|
|
|
|
tp.stats.IncProcessed(int64(len(rec.Data)))
|
|
|
|
|
|
|
|
|
|
|
|
// 发布处理成功事件
|
|
|
|
|
|
tp.eventBus.Publish(&Event{
|
|
|
|
|
|
Type: EventProcessSuccess,
|
|
|
|
|
|
Topic: tp.topic,
|
|
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
|
|
Record: rec,
|
|
|
|
|
|
Position: 0, // Position 在 tailer 模式下不可用
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
tp.logger.Debug("creating tailer")
|
|
|
|
|
|
tailer, err := NewTailer(tp.cursor, wrappedHandler, tp.tailConfig)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to create tailer", "error", err)
|
|
|
|
|
|
return fmt.Errorf("failed to create tailer: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
tp.tailer = tailer
|
|
|
|
|
|
tp.logger.Debug("tailer created")
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Write 写入日志(统一接口)
|
|
|
|
|
|
func (tp *TopicProcessor) Write(data []byte) (int64, error) {
|
|
|
|
|
|
offset, err := tp.writer.Append(data)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to append", "error", err)
|
|
|
|
|
|
tp.stats.IncError()
|
|
|
|
|
|
|
|
|
|
|
|
// 发布写入错误事件
|
|
|
|
|
|
tp.eventBus.Publish(&Event{
|
|
|
|
|
|
Type: EventWriteError,
|
|
|
|
|
|
Topic: tp.topic,
|
|
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
|
|
Error: err,
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
return 0, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 更新统计信息
|
|
|
|
|
|
tp.stats.IncWrite(int64(len(data)))
|
|
|
|
|
|
|
|
|
|
|
|
tp.logger.Debug("write success", "offset", offset, "size", len(data))
|
|
|
|
|
|
|
|
|
|
|
|
// 发布写入成功事件
|
|
|
|
|
|
tp.eventBus.Publish(&Event{
|
|
|
|
|
|
Type: EventWriteSuccess,
|
|
|
|
|
|
Topic: tp.topic,
|
|
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
|
|
Position: offset,
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
return offset, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Start 启动 tailer(如果已创建)
|
|
|
|
|
|
func (tp *TopicProcessor) Start() error {
|
|
|
|
|
|
tp.mu.Lock()
|
|
|
|
|
|
defer tp.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
if tp.running {
|
新增:统一的错误类型系统 (errors.go)
主要功能:
- 定义哨兵错误(Sentinel Errors):ErrNilParameter, ErrInvalidCount,
ErrInvalidRange, ErrAlreadyRunning, ErrNotFound, ErrCRCMismatch 等
- 实现结构化错误类型:TopicError, FileError, IndexError, ValidationError
- 提供错误检查辅助函数:IsTopicNotFound, IsIndexOutOfRange, IsCRCMismatch
- 支持 errors.Is 和 errors.As 进行错误判断
更新相关文件使用新错误类型:
- cursor.go: 使用 ValidationError 和 ErrCRCMismatch
- index.go: 使用 IndexError 处理索引越界
- query.go: 使用 ValidationError 验证参数
- seqlog_manager.go: 使用 TopicError 和 ErrAlreadyRegistered
- topic_processor.go: 使用 ErrAlreadyRunning 和 ErrInvalidConfig
测试覆盖:
- errors_test.go 提供完整的错误类型测试
- 所有现有测试继续通过
使用示例:
```go
// 检查 topic 是否存在
if IsTopicNotFound(err) {
// 处理 topic 不存在的情况
}
// 检查索引越界
if IsIndexOutOfRange(err) {
var indexErr *IndexError
errors.As(err, &indexErr)
fmt.Printf("index %d out of range\n", indexErr.Index)
}
```
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 01:56:22 +08:00
|
|
|
|
return NewTopicError(tp.topic, "start", ErrAlreadyRunning)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
tp.logger.Debug("starting processor")
|
|
|
|
|
|
|
|
|
|
|
|
// 重新创建 context(如果之前被 cancel 了)
|
|
|
|
|
|
if tp.ctx.Err() != nil {
|
|
|
|
|
|
tp.ctx, tp.cancel = context.WithCancel(context.Background())
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
tp.running = true
|
|
|
|
|
|
|
|
|
|
|
|
// 如果 tailer 已创建,启动它
|
|
|
|
|
|
if tp.tailer != nil {
|
|
|
|
|
|
tp.logger.Debug("launching tailer goroutine")
|
|
|
|
|
|
tp.wg.Go(func() {
|
|
|
|
|
|
tp.logger.Debug("tailer goroutine started")
|
|
|
|
|
|
if err := tp.tailer.Start(tp.ctx); err != nil && err != context.Canceled {
|
|
|
|
|
|
tp.logger.Error("tailer error", "error", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.logger.Debug("tailer goroutine finished")
|
|
|
|
|
|
})
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 发布启动事件
|
|
|
|
|
|
tp.eventBus.Publish(&Event{
|
|
|
|
|
|
Type: EventProcessorStart,
|
|
|
|
|
|
Topic: tp.topic,
|
|
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Stop 停止 tailer
|
|
|
|
|
|
func (tp *TopicProcessor) Stop() error {
|
|
|
|
|
|
tp.mu.Lock()
|
|
|
|
|
|
if !tp.running {
|
|
|
|
|
|
tp.mu.Unlock()
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.logger.Debug("stopping processor")
|
|
|
|
|
|
tp.running = false
|
|
|
|
|
|
tp.cancel()
|
|
|
|
|
|
tp.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
// 等待 tailer 停止
|
|
|
|
|
|
tp.wg.Wait()
|
|
|
|
|
|
|
|
|
|
|
|
tp.logger.Debug("processor stopped")
|
|
|
|
|
|
|
|
|
|
|
|
// 发布停止事件
|
|
|
|
|
|
tp.eventBus.Publish(&Event{
|
|
|
|
|
|
Type: EventProcessorStop,
|
|
|
|
|
|
Topic: tp.topic,
|
|
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Topic 返回 topic 名称
|
|
|
|
|
|
func (tp *TopicProcessor) Topic() string {
|
|
|
|
|
|
return tp.topic
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// IsRunning 检查是否正在运行
|
|
|
|
|
|
func (tp *TopicProcessor) IsRunning() bool {
|
|
|
|
|
|
tp.mu.RLock()
|
|
|
|
|
|
defer tp.mu.RUnlock()
|
|
|
|
|
|
return tp.running
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// UpdateTailConfig 动态更新 tail 配置
|
|
|
|
|
|
func (tp *TopicProcessor) UpdateTailConfig(config *TailConfig) error {
|
|
|
|
|
|
tp.mu.Lock()
|
|
|
|
|
|
defer tp.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
if config == nil {
|
|
|
|
|
|
return fmt.Errorf("config cannot be nil")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
tp.tailConfig = config
|
|
|
|
|
|
|
|
|
|
|
|
// 如果 tailer 已经在运行,更新其配置
|
|
|
|
|
|
if tp.tailer != nil {
|
|
|
|
|
|
tp.tailer.UpdateConfig(*config)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// GetTailConfig 获取当前 tail 配置
|
|
|
|
|
|
func (tp *TopicProcessor) GetTailConfig() *TailConfig {
|
|
|
|
|
|
tp.mu.RLock()
|
|
|
|
|
|
defer tp.mu.RUnlock()
|
|
|
|
|
|
cfg := tp.tailConfig
|
|
|
|
|
|
return cfg
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// GetStats 获取当前统计信息
|
|
|
|
|
|
func (tp *TopicProcessor) GetStats() Stats {
|
|
|
|
|
|
return tp.stats.Get()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Query 获取共享的查询器
|
|
|
|
|
|
func (tp *TopicProcessor) Query() *RecordQuery {
|
|
|
|
|
|
return tp.query
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 11:55:44 +08:00
|
|
|
|
// QueryOldest 从指定索引向索引递增方向查询记录
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
// startIndex: 查询起始索引
|
|
|
|
|
|
// count: 查询数量
|
2025-10-04 11:55:44 +08:00
|
|
|
|
// 返回的记录包含状态信息(基于 tailer 的窗口索引),按索引递增方向排序
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
func (tp *TopicProcessor) QueryOldest(startIndex, count int) ([]*RecordWithStatus, error) {
|
2025-10-04 00:10:14 +08:00
|
|
|
|
// 查询记录
|
|
|
|
|
|
records, err := tp.query.QueryOldest(startIndex, count)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
// 获取窗口索引范围(用于状态判断)
|
|
|
|
|
|
var startIdx, endIdx int
|
|
|
|
|
|
tp.mu.RLock()
|
|
|
|
|
|
if tp.tailer != nil {
|
|
|
|
|
|
startIdx = tp.tailer.GetStartIndex()
|
|
|
|
|
|
endIdx = tp.tailer.GetEndIndex()
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.mu.RUnlock()
|
|
|
|
|
|
|
2025-10-04 00:10:14 +08:00
|
|
|
|
// 为每条记录添加状态
|
|
|
|
|
|
results := make([]*RecordWithStatus, len(records))
|
|
|
|
|
|
for i, rec := range records {
|
|
|
|
|
|
results[i] = &RecordWithStatus{
|
|
|
|
|
|
Record: rec,
|
|
|
|
|
|
Status: GetRecordStatus(startIndex+i, startIdx, endIdx),
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return results, nil
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-10-04 11:55:44 +08:00
|
|
|
|
// QueryNewest 从指定索引向索引递减方向查询记录
|
|
|
|
|
|
// endIndex: 查询的最大索引(向前查询更早的记录)
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
// count: 查询数量
|
2025-10-04 11:55:44 +08:00
|
|
|
|
// 返回的记录包含状态信息(基于 tailer 的窗口索引),按索引递增方向排序
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
func (tp *TopicProcessor) QueryNewest(endIndex, count int) ([]*RecordWithStatus, error) {
|
2025-10-04 00:10:14 +08:00
|
|
|
|
// 查询记录
|
|
|
|
|
|
records, err := tp.query.QueryNewest(endIndex, count)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
// 获取窗口索引范围(用于状态判断)
|
|
|
|
|
|
var startIdx, endIdx int
|
|
|
|
|
|
tp.mu.RLock()
|
|
|
|
|
|
if tp.tailer != nil {
|
|
|
|
|
|
startIdx = tp.tailer.GetStartIndex()
|
|
|
|
|
|
endIdx = tp.tailer.GetEndIndex()
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.mu.RUnlock()
|
|
|
|
|
|
|
2025-10-04 00:10:14 +08:00
|
|
|
|
// 为每条记录添加状态(倒序:endIndex, endIndex-1, ...)
|
|
|
|
|
|
results := make([]*RecordWithStatus, len(records))
|
|
|
|
|
|
for i, rec := range records {
|
|
|
|
|
|
results[i] = &RecordWithStatus{
|
|
|
|
|
|
Record: rec,
|
|
|
|
|
|
Status: GetRecordStatus(endIndex-i, startIdx, endIdx),
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return results, nil
|
重构:统一使用索引(Index)替代位置(Position)进行状态判断
## 主要变更
### 架构改进
- 明确索引(Index)与偏移(Offset)的职责分离
- Index: 记录序号(逻辑概念),用于状态判断
- Offset: 文件字节位置(物理概念),仅用于 I/O 操作
### API 变更
- 删除所有 Position 相关方法:
- `LogCursor.StartPos()/EndPos()`
- `LogTailer.GetStartPos()/GetEndPos()`
- `TopicProcessor.GetProcessingPosition()/GetReadPosition()`
- `Seqlog.GetProcessingPosition()/GetReadPosition()`
- 新增索引方法:
- `LogCursor.StartIndex()/EndIndex()`
- `LogTailer.GetStartIndex()/GetEndIndex()`
- `TopicProcessor.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessingIndex()/GetReadIndex()`
- `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index
### 查询接口变更
- `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数
- `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断
### 性能优化
- 状态判断改用整数比较,不再需要计算偏移量
- 减少不必要的索引到偏移的转换
- 只在实际文件 I/O 时才获取 offset
### 测试更新
- 更新所有测试用例使用新的 Index API
- 更新示例代码(topic_processor_example.go, webapp/main.go)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// GetRecordCount 获取记录总数(统一接口)
|
|
|
|
|
|
func (tp *TopicProcessor) GetRecordCount() int {
|
|
|
|
|
|
return tp.index.Count()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Cursor 创建一个新的游标实例(使用共享的 index)
|
|
|
|
|
|
// 注意:每次调用都会创建新实例,调用者需要负责关闭
|
|
|
|
|
|
// Tailer 内部有自己的游标,不会与此冲突
|
|
|
|
|
|
func (tp *TopicProcessor) Cursor() (*LogCursor, error) {
|
|
|
|
|
|
return NewCursor(tp.logPath, tp.index)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Index 获取索引管理器
|
|
|
|
|
|
func (tp *TopicProcessor) Index() *RecordIndex {
|
|
|
|
|
|
return tp.index
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// GetProcessingIndex 获取当前处理索引(窗口开始索引)
|
|
|
|
|
|
func (tp *TopicProcessor) GetProcessingIndex() int {
|
|
|
|
|
|
tp.mu.RLock()
|
|
|
|
|
|
defer tp.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
if tp.tailer == nil {
|
|
|
|
|
|
return 0
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return tp.tailer.GetStartIndex()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// GetReadIndex 获取当前读取索引(窗口结束索引)
|
|
|
|
|
|
func (tp *TopicProcessor) GetReadIndex() int {
|
|
|
|
|
|
tp.mu.RLock()
|
|
|
|
|
|
defer tp.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
if tp.tailer == nil {
|
|
|
|
|
|
return 0
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return tp.tailer.GetEndIndex()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Subscribe 订阅事件
|
|
|
|
|
|
func (tp *TopicProcessor) Subscribe(eventType EventType, listener EventListener) {
|
|
|
|
|
|
tp.eventBus.Subscribe(eventType, listener)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// SubscribeAll 订阅所有事件
|
|
|
|
|
|
func (tp *TopicProcessor) SubscribeAll(listener EventListener) {
|
|
|
|
|
|
tp.eventBus.SubscribeAll(listener)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Unsubscribe 取消订阅
|
|
|
|
|
|
func (tp *TopicProcessor) Unsubscribe(eventType EventType) {
|
|
|
|
|
|
tp.eventBus.Unsubscribe(eventType)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Reset 清空 topic 的所有数据,包括日志文件、位置文件和统计文件
|
|
|
|
|
|
// 注意:必须在 Stop 之后调用
|
|
|
|
|
|
func (tp *TopicProcessor) Reset() error {
|
|
|
|
|
|
tp.mu.Lock()
|
|
|
|
|
|
defer tp.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
if tp.running {
|
|
|
|
|
|
return fmt.Errorf("cannot reset while processor is running, please stop first")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
tp.logger.Debug("resetting processor")
|
|
|
|
|
|
|
|
|
|
|
|
var errs []error
|
|
|
|
|
|
|
|
|
|
|
|
// 关闭 writer(如果还未关闭)
|
|
|
|
|
|
if tp.writer != nil {
|
|
|
|
|
|
if err := tp.writer.Close(); err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to close writer during reset", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("close writer: %w", err))
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.writer = nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 删除日志文件
|
|
|
|
|
|
if err := os.Remove(tp.logPath); err != nil && !os.IsNotExist(err) {
|
|
|
|
|
|
tp.logger.Error("failed to remove log file", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("remove log file: %w", err))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 删除位置文件
|
|
|
|
|
|
posFile := tp.logPath + ".pos"
|
|
|
|
|
|
if err := os.Remove(posFile); err != nil && !os.IsNotExist(err) {
|
|
|
|
|
|
tp.logger.Error("failed to remove position file", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("remove position file: %w", err))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 删除索引文件
|
|
|
|
|
|
indexFile := tp.logPath + ".idx"
|
|
|
|
|
|
if err := os.Remove(indexFile); err != nil && !os.IsNotExist(err) {
|
|
|
|
|
|
tp.logger.Error("failed to remove index file", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("remove index file: %w", err))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 关闭所有组件
|
|
|
|
|
|
if tp.query != nil {
|
|
|
|
|
|
tp.query.Close()
|
|
|
|
|
|
tp.query = nil
|
|
|
|
|
|
}
|
|
|
|
|
|
if tp.index != nil {
|
|
|
|
|
|
tp.index.Close()
|
|
|
|
|
|
tp.index = nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 重新初始化所有组件(已持有锁)
|
|
|
|
|
|
// 这会重新创建 index, writer, query,如果有 handler 也会创建 tailer
|
|
|
|
|
|
if err := tp.initializeComponents(); err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to reinitialize components", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("reinitialize components: %w", err))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 重置统计信息
|
|
|
|
|
|
if tp.stats != nil {
|
|
|
|
|
|
tp.stats.Reset()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
tp.logger.Debug("processor reset completed")
|
|
|
|
|
|
|
|
|
|
|
|
// 发布重置事件
|
|
|
|
|
|
tp.eventBus.Publish(&Event{
|
|
|
|
|
|
Type: EventProcessorReset,
|
|
|
|
|
|
Topic: tp.topic,
|
|
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
// 如果有多个错误,返回第一个
|
|
|
|
|
|
if len(errs) > 0 {
|
|
|
|
|
|
return errs[0]
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Close 清理 processor 的所有资源
|
|
|
|
|
|
func (tp *TopicProcessor) Close() error {
|
|
|
|
|
|
tp.mu.Lock()
|
|
|
|
|
|
defer tp.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
tp.logger.Debug("closing processor")
|
|
|
|
|
|
|
|
|
|
|
|
var errs []error
|
|
|
|
|
|
|
|
|
|
|
|
// 保存统计信息
|
|
|
|
|
|
if tp.stats != nil {
|
|
|
|
|
|
if err := tp.stats.Save(); err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to save stats", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("save stats: %w", err))
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 关闭 query
|
|
|
|
|
|
if tp.query != nil {
|
|
|
|
|
|
if err := tp.query.Close(); err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to close query", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("close query: %w", err))
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.query = nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 关闭 cursor(如果 tailer 未启动,cursor 可能还未关闭)
|
|
|
|
|
|
if tp.cursor != nil {
|
|
|
|
|
|
if err := tp.cursor.Close(); err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to close cursor", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("close cursor: %w", err))
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.cursor = nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 关闭 writer
|
|
|
|
|
|
if tp.writer != nil {
|
|
|
|
|
|
if err := tp.writer.Close(); err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to close writer", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("close writer: %w", err))
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.writer = nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 关闭 index(最后关闭,因为其他组件可能依赖它)
|
|
|
|
|
|
if tp.index != nil {
|
|
|
|
|
|
if err := tp.index.Close(); err != nil {
|
|
|
|
|
|
tp.logger.Error("failed to close index", "error", err)
|
|
|
|
|
|
errs = append(errs, fmt.Errorf("close index: %w", err))
|
|
|
|
|
|
}
|
|
|
|
|
|
tp.index = nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// tailer 会通过 context cancel 和 Stop() 自动关闭
|
|
|
|
|
|
tp.tailer = nil
|
|
|
|
|
|
|
|
|
|
|
|
tp.logger.Debug("processor closed")
|
|
|
|
|
|
|
|
|
|
|
|
// 如果有多个错误,返回第一个
|
|
|
|
|
|
if len(errs) > 0 {
|
|
|
|
|
|
return errs[0]
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|