Files
seqlog/topic_processor.go
bourdon 6fb0731935 重构:为核心组件实现 Reset 方法优化重置机制
为所有核心组件添加 Reset() 方法:
- LogWriter.Reset(): 删除并重新创建日志文件,保持 index 和 wbuf 引用不变
- RecordIndex.Reset(): 清空索引数据并重新创建索引文件
- RecordQuery.Reset(): 关闭并重新打开日志文件
- ProcessCursor.Reset(): 删除位置文件并重置游标位置
- LogTailer.Reset(): 重置内部 channel 状态

优化 TopicProcessor.Reset() 实现:
- 不再销毁和重建组件对象
- 通过调用各组件的 Reset() 方法重置状态
- 保持组件间引用关系稳定
- 减少代码行数约 20 行
- 避免空指针风险和内存分配开销

代码改进:
- LogWriter 添加 path 字段用于重置
- 移除 topic_processor.go 中未使用的 os import
- 职责分离更清晰,每个组件管理自己的重置逻辑

测试结果:
- TestTopicReset: PASS
- TestTopicResetWithPendingRecords: PASS
- 所有 TopicProcessor 相关测试通过

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 21:58:54 +08:00

993 lines
26 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package seqlog
import (
"context"
"fmt"
"log/slog"
"path/filepath"
"sync"
"time"
)
// ProcessorState 处理器状态
type ProcessorState int
const (
StateIdle ProcessorState = iota // 空闲(未启动)
StateStarting // 启动中
StateRunning // 运行中
StateStopping // 停止中
StateStopped // 已停止
StateResetting // 重置中(阻止所有操作)
StateError // 错误状态
)
// String 返回状态的字符串表示
func (s ProcessorState) String() string {
switch s {
case StateIdle:
return "idle"
case StateStarting:
return "starting"
case StateRunning:
return "running"
case StateStopping:
return "stopping"
case StateStopped:
return "stopped"
case StateResetting:
return "resetting"
case StateError:
return "error"
default:
return "unknown"
}
}
// CanWrite 判断当前状态是否允许写入
func (s ProcessorState) CanWrite() bool {
// 允许在 Idle、Starting、Running 状态下写入
// 不允许在 Stopping、Stopped、Resetting、Error 状态下写入
return s == StateIdle || s == StateStarting || s == StateRunning
}
// CanQuery 判断当前状态是否允许查询
func (s ProcessorState) CanQuery() bool {
return s != StateResetting
}
// CanProcess 判断当前状态是否允许处理
func (s ProcessorState) CanProcess() bool {
return s == StateRunning
}
// ProcessorStatus 处理器状态信息
type ProcessorStatus struct {
State ProcessorState // 当前状态
LastUpdated time.Time // 最后更新时间
Error error // 错误信息(仅在 StateError 时有效)
}
// TopicProcessor 作为聚合器,持有所有核心组件并提供统一的访问接口
type TopicProcessor struct {
topic string
title string // 显示标题,用于 UI 展示
logPath string
logger *slog.Logger
// 核心组件(聚合)
writer *LogWriter // 写入器
index *RecordIndex // 索引管理器
query *RecordQuery // 查询器
cursor *ProcessCursor // 游标
tailer *LogTailer // 持续处理器
// 配置和状态
handler RecordHandler
tailConfig *TailConfig
stats *TopicStats // 统计信息
eventBus *EventBus // 事件总线
status ProcessorStatus // 处理器状态
// 并发控制
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// TopicConfig topic 配置
type TopicConfig struct {
Title string // 显示标题,可选,默认为 topic 名称
Handler RecordHandler // 处理函数(必填)
TailConfig *TailConfig // tail 配置,可选
}
// NewTopicProcessor 创建一个新的 topic 处理器
// 在初始化时创建所有核心组件index 在组件间共享
// handler 为必填参数,如果 config 为 nil 或 config.Handler 为 nil 会返回错误
func NewTopicProcessor(baseDir, topic string, logger *slog.Logger, config *TopicConfig) (*TopicProcessor, error) {
// 验证必填参数
if config == nil || config.Handler == nil {
return nil, NewValidationError("config", "config and config.Handler are required", ErrInvalidConfig)
}
ctx, cancel := context.WithCancel(context.Background())
// 默认配置
tailConfig := &TailConfig{
PollInterval: 100 * 1000000, // 100ms
SaveInterval: 1000 * 1000000, // 1s
}
if config.TailConfig != nil {
tailConfig = config.TailConfig
}
if logger == nil {
logger = slog.Default()
}
logPath := filepath.Join(baseDir, topic+".log")
statsPath := filepath.Join(baseDir, topic+".stats")
// 设置 title如果未提供则使用 topic 名称
title := config.Title
if title == "" {
title = topic
}
tp := &TopicProcessor{
topic: topic,
title: title,
logPath: logPath,
logger: logger,
handler: config.Handler,
tailConfig: tailConfig,
stats: NewTopicStats(statsPath),
eventBus: NewEventBus(),
status: ProcessorStatus{
State: StateIdle,
LastUpdated: time.Now(),
},
ctx: ctx,
cancel: cancel,
}
// 初始化所有组件
if err := tp.initializeComponents(); err != nil {
cancel()
return nil, fmt.Errorf("failed to initialize components: %w", err)
}
return tp, nil
}
// initializeComponents 初始化所有核心组件
func (tp *TopicProcessor) initializeComponents() error {
// 1. 创建共享的索引管理器
index, err := NewRecordIndex(tp.logPath)
if err != nil {
return fmt.Errorf("create index: %w", err)
}
tp.index = index
// 2. 创建写入器(使用共享 index
writer, err := NewLogWriter(tp.logPath, tp.index)
if err != nil {
tp.index.Close()
return fmt.Errorf("create writer: %w", err)
}
tp.writer = writer
// 3. 创建查询器(使用共享 index
query, err := NewRecordQuery(tp.logPath, tp.index, tp.writer)
if err != nil {
tp.writer.Close()
tp.index.Close()
return fmt.Errorf("create query: %w", err)
}
tp.query = query
// 4. 创建游标(使用共享 index 和 writer
cursor, err := NewCursor(tp.logPath, tp.index, tp.writer)
if err != nil {
tp.query.Close()
tp.writer.Close()
tp.index.Close()
return fmt.Errorf("create cursor: %w", err)
}
tp.cursor = cursor
// 5. 创建 tailerhandler 为必填,总是创建)
// 注意:只创建不启动,启动在 Start() 中进行
if err := tp.createTailer(); err != nil {
tp.cursor.Close()
tp.query.Close()
tp.writer.Close()
tp.index.Close()
return fmt.Errorf("create tailer: %w", err)
}
tp.logger.Debug("all components initialized")
return nil
}
// createTailer 创建 tailer不启动
func (tp *TopicProcessor) createTailer() error {
// 包装 handler添加统计功能和事件发布
wrappedHandler := func(rec *Record) error {
if err := tp.handler(rec); err != nil {
tp.stats.IncError()
// 发布处理错误事件
tp.eventBus.Publish(&Event{
Type: EventProcessError,
Topic: tp.topic,
Timestamp: time.Now(),
Record: rec,
Error: err,
Position: 0, // Position 在 tailer 模式下不可用
})
return err
}
// 处理成功,更新统计
tp.stats.IncProcessed(int64(len(rec.Data)))
// 发布处理成功事件
tp.eventBus.Publish(&Event{
Type: EventProcessSuccess,
Topic: tp.topic,
Timestamp: time.Now(),
Record: rec,
Position: 0, // Position 在 tailer 模式下不可用
})
return nil
}
tp.logger.Debug("creating tailer")
tailer, err := NewTailer(tp.cursor, wrappedHandler, tp.tailConfig)
if err != nil {
tp.logger.Error("failed to create tailer", "error", err)
return fmt.Errorf("failed to create tailer: %w", err)
}
tp.tailer = tailer
tp.logger.Debug("tailer created")
return nil
}
// Write 写入日志(统一接口)
func (tp *TopicProcessor) Write(data []byte) (int64, error) {
// 检查状态
tp.mu.RLock()
state := tp.status.State
tp.mu.RUnlock()
if !state.CanWrite() {
if state == StateResetting {
return 0, ErrProcessorResetting
}
return 0, ErrNotRunning
}
offset, err := tp.writer.Append(data)
if err != nil {
tp.logger.Error("failed to append", "error", err)
tp.stats.IncError()
// 发布写入错误事件
tp.eventBus.Publish(&Event{
Type: EventWriteError,
Topic: tp.topic,
Timestamp: time.Now(),
Error: err,
})
return 0, err
}
// 更新统计信息
tp.stats.IncWrite(int64(len(data)))
tp.logger.Debug("write success", "offset", offset, "size", len(data))
// 发布写入成功事件
tp.eventBus.Publish(&Event{
Type: EventWriteSuccess,
Topic: tp.topic,
Timestamp: time.Now(),
Position: offset,
})
return offset, nil
}
// Start 启动 tailer如果已创建
func (tp *TopicProcessor) Start() error {
tp.mu.Lock()
defer tp.mu.Unlock()
// 检查状态
if tp.status.State != StateIdle && tp.status.State != StateStopped {
return NewTopicError(tp.topic, "start", ErrInvalidState)
}
tp.logger.Debug("starting processor")
// 设置为启动中状态
tp.setState(StateStarting, nil)
// 重新创建 context如果之前被 cancel 了)
if tp.ctx.Err() != nil {
tp.ctx, tp.cancel = context.WithCancel(context.Background())
}
// 启动定期保存统计信息的 goroutine
tp.wg.Add(1)
go func() {
defer tp.wg.Done()
ticker := time.NewTicker(tp.tailConfig.SaveInterval)
defer ticker.Stop()
for {
select {
case <-tp.ctx.Done():
return
case <-ticker.C:
if err := tp.stats.Save(); err != nil {
tp.logger.Error("failed to save stats", "error", err)
}
}
}
}()
// 如果 tailer 已创建,启动它
tp.logger.Debug("launching tailer goroutine")
tp.wg.Add(1)
go func() {
defer tp.wg.Done()
tp.logger.Debug("tailer goroutine started")
if err := tp.tailer.Start(tp.ctx); err != nil && err != context.Canceled {
tp.logger.Error("tailer error", "error", err)
}
tp.logger.Debug("tailer goroutine finished")
}()
// 设置为运行中状态
tp.setState(StateRunning, nil)
// 发布启动事件
tp.eventBus.Publish(&Event{
Type: EventProcessorStart,
Topic: tp.topic,
Timestamp: time.Now(),
})
return nil
}
// Stop 停止 tailer
func (tp *TopicProcessor) Stop() error {
tp.mu.Lock()
if tp.status.State != StateRunning {
tp.mu.Unlock()
return nil
}
tp.logger.Debug("stopping processor")
// 设置为停止中状态
tp.setState(StateStopping, nil)
tp.cancel()
tp.mu.Unlock()
// 等待 tailer 停止
tp.wg.Wait()
tp.logger.Debug("processor stopped")
// 设置为已停止状态
tp.mu.Lock()
tp.setState(StateStopped, nil)
tp.mu.Unlock()
// 发布停止事件
tp.eventBus.Publish(&Event{
Type: EventProcessorStop,
Topic: tp.topic,
Timestamp: time.Now(),
})
return nil
}
// Topic 返回 topic 名称
func (tp *TopicProcessor) Topic() string {
return tp.topic
}
// Title 返回显示标题
func (tp *TopicProcessor) Title() string {
return tp.title
}
// GetState 获取当前状态
func (tp *TopicProcessor) GetState() ProcessorState {
tp.mu.RLock()
defer tp.mu.RUnlock()
return tp.status.State
}
// GetStatus 获取完整状态信息
func (tp *TopicProcessor) GetStatus() ProcessorStatus {
tp.mu.RLock()
defer tp.mu.RUnlock()
return tp.status
}
// setState 设置状态(需持有锁)
func (tp *TopicProcessor) setState(state ProcessorState, err error) {
tp.status.State = state
tp.status.LastUpdated = time.Now()
tp.status.Error = err
// 发布状态变更事件
tp.eventBus.Publish(&Event{
Type: EventStateChanged,
Topic: tp.topic,
Timestamp: time.Now(),
Error: err,
})
}
// IsRunning 检查是否正在运行
func (tp *TopicProcessor) IsRunning() bool {
return tp.GetState() == StateRunning
}
// UpdateTailConfig 动态更新 tail 配置
func (tp *TopicProcessor) UpdateTailConfig(config *TailConfig) error {
tp.mu.Lock()
defer tp.mu.Unlock()
if config == nil {
return fmt.Errorf("config cannot be nil")
}
tp.tailConfig = config
// 如果 tailer 已经在运行,更新其配置
if tp.tailer != nil {
tp.tailer.UpdateConfig(*config)
}
return nil
}
// GetTailConfig 获取当前 tail 配置
func (tp *TopicProcessor) GetTailConfig() *TailConfig {
tp.mu.RLock()
defer tp.mu.RUnlock()
cfg := tp.tailConfig
return cfg
}
// GetStats 获取当前统计信息
func (tp *TopicProcessor) GetStats() Stats {
return tp.stats.Get()
}
// Query 获取共享的查询器
func (tp *TopicProcessor) Query() (*RecordQuery, error) {
tp.mu.RLock()
defer tp.mu.RUnlock()
if !tp.status.State.CanQuery() {
return nil, ErrProcessorResetting
}
return tp.query, nil
}
// addStatusToRecords 为记录添加状态信息(辅助方法)
func (tp *TopicProcessor) addStatusToRecords(records []*RecordWithIndex) []*RecordWithStatus {
// 获取窗口索引范围(用于状态判断)
var startIdx, endIdx int
tp.mu.RLock()
if tp.tailer != nil {
startIdx = tp.tailer.GetStartIndex()
endIdx = tp.tailer.GetEndIndex()
}
tp.mu.RUnlock()
// 为每条记录添加状态
results := make([]*RecordWithStatus, len(records))
for i, rec := range records {
results[i] = &RecordWithStatus{
Record: rec.Record,
Index: rec.Index,
Status: GetRecordStatus(rec.Index, startIdx, endIdx),
}
}
return results
}
// addStatusToMetadata 为元数据添加状态信息
func (tp *TopicProcessor) addStatusToMetadata(metadata []*RecordMetadata) []*RecordMetadataWithStatus {
// 获取窗口索引范围(用于状态判断)
var startIdx, endIdx int
tp.mu.RLock()
if tp.tailer != nil {
startIdx = tp.tailer.GetStartIndex()
endIdx = tp.tailer.GetEndIndex()
}
tp.mu.RUnlock()
// 为每个元数据添加状态
results := make([]*RecordMetadataWithStatus, len(metadata))
for i, meta := range metadata {
results[i] = &RecordMetadataWithStatus{
Metadata: meta,
Status: GetRecordStatus(meta.Index, startIdx, endIdx),
}
}
return results
}
// QueryOldest 从参考索引向索引递减方向查询记录(查询更早的记录)
// refIndex: 参考索引位置
// count: 查询数量
// 返回的记录包含索引和状态信息,按索引递增方向排序
// 例如QueryOldest(5, 3) 查询索引 2, 3, 4不包含 5
func (tp *TopicProcessor) QueryOldest(refIndex, count int) ([]*RecordWithStatus, error) {
records, err := tp.query.QueryOldest(refIndex, count)
if err != nil {
return nil, err
}
return tp.addStatusToRecords(records), nil
}
// QueryNewest 从参考索引向索引递增方向查询记录(查询更新的记录)
// refIndex: 参考索引位置
// count: 查询数量
// 返回的记录包含索引和状态信息,按索引递增方向排序
// 例如QueryNewest(5, 3) 查询索引 6, 7, 8不包含 5
func (tp *TopicProcessor) QueryNewest(refIndex, count int) ([]*RecordWithStatus, error) {
records, err := tp.query.QueryNewest(refIndex, count)
if err != nil {
return nil, err
}
return tp.addStatusToRecords(records), nil
}
// QueryOldestMetadata 从参考索引向索引递减方向查询记录元数据(查询更早的记录,不读取完整数据)
// refIndex: 参考索引位置
// count: 查询数量
// 返回的记录只包含元数据信息索引、UUID、数据大小按索引递增方向排序
// 例如QueryOldestMetadata(5, 3) 查询索引 2, 3, 4不包含 5
func (tp *TopicProcessor) QueryOldestMetadata(refIndex, count int) ([]*RecordMetadataWithStatus, error) {
metadata, err := tp.query.QueryOldestMetadata(refIndex, count)
if err != nil {
return nil, err
}
return tp.addStatusToMetadata(metadata), nil
}
// QueryNewestMetadata 从参考索引向索引递增方向查询记录元数据(查询更新的记录,不读取完整数据)
// refIndex: 参考索引位置
// count: 查询数量
// 返回的记录只包含元数据信息索引、UUID、数据大小按索引递增方向排序
// 例如QueryNewestMetadata(5, 3) 查询索引 6, 7, 8不包含 5
func (tp *TopicProcessor) QueryNewestMetadata(refIndex, count int) ([]*RecordMetadataWithStatus, error) {
metadata, err := tp.query.QueryNewestMetadata(refIndex, count)
if err != nil {
return nil, err
}
return tp.addStatusToMetadata(metadata), nil
}
// QueryByIndex 根据索引查询单条记录的完整数据
// index: 记录索引
// 返回完整的记录数据,包含状态信息
func (tp *TopicProcessor) QueryByIndex(index int) (*RecordWithStatus, error) {
record, err := tp.query.QueryByIndex(index)
if err != nil {
return nil, err
}
// 获取当前处理窗口位置
var startIdx, endIdx int
tp.mu.RLock()
if tp.tailer != nil {
startIdx = tp.tailer.GetStartIndex()
endIdx = tp.tailer.GetEndIndex()
}
tp.mu.RUnlock()
status := GetRecordStatus(index, startIdx, endIdx)
return &RecordWithStatus{
Record: record,
Index: index,
Status: status,
}, nil
}
// GetRecordCount 获取记录总数(统一接口)
func (tp *TopicProcessor) GetRecordCount() int {
return tp.index.Count()
}
// QueryFromProcessing 从当前处理窗口的开始位置向索引递增方向查询记录
// count: 查询数量
// 返回从处理窗口开始位置startIndex向后的记录包含状态信息
func (tp *TopicProcessor) QueryFromProcessing(count int) ([]*RecordWithStatus, error) {
// 获取当前处理窗口的开始位置
tp.mu.RLock()
var startIdx int
if tp.tailer != nil {
startIdx = tp.tailer.GetStartIndex()
}
tp.mu.RUnlock()
// 从 startIdx 开始向索引递增方向查询
// QueryNewest(refIndex, count) 查询从 refIndex+1 开始的记录
// 所以要从 startIdx 开始,应该调用 QueryNewest(startIdx - 1, count)
return tp.QueryNewest(startIdx-1, count)
}
// QueryFromFirst 从第一条记录向索引递增方向查询
// count: 查询数量
// 返回从第一条记录(索引 0开始的记录包含状态信息
// 例如QueryFromFirst(3) 查询索引 0, 1, 2
func (tp *TopicProcessor) QueryFromFirst(count int) ([]*RecordWithStatus, error) {
// QueryNewest(-1, count) 会从索引 0 开始向后查询
return tp.QueryNewest(-1, count)
}
// QueryFromLast 从最后一条记录向索引递减方向查询
// count: 查询数量
// 返回从最后一条记录开始向前的记录,包含状态信息,按索引递增方向排序
// 例如:如果总共有 10 条记录QueryFromLast(3) 查询索引 7, 8, 9
func (tp *TopicProcessor) QueryFromLast(count int) ([]*RecordWithStatus, error) {
// 获取记录总数
totalCount := tp.index.Count()
// 如果没有记录,返回空数组
if totalCount == 0 {
return []*RecordWithStatus{}, nil
}
// QueryOldest(totalCount, count) 会从最后一条记录开始向前查询
// totalCount 是记录总数,有效索引是 0 到 totalCount-1
// 所以传入 totalCount 作为 refIndex会查询 totalCount-count 到 totalCount-1 的记录
return tp.QueryOldest(totalCount, count)
}
// QueryFromFirstMetadata 从第一条记录向索引递增方向查询元数据
// count: 查询数量
// 返回从第一条记录(索引 0开始的记录元数据包含状态信息
// 例如QueryFromFirstMetadata(3) 查询索引 0, 1, 2 的元数据
func (tp *TopicProcessor) QueryFromFirstMetadata(count int) ([]*RecordMetadataWithStatus, error) {
// QueryNewestMetadata(-1, count) 会从索引 0 开始向后查询
return tp.QueryNewestMetadata(-1, count)
}
// QueryFromLastMetadata 从最后一条记录向索引递减方向查询元数据
// count: 查询数量
// 返回最后 N 条记录的元数据(按索引递增方向排序),包含状态信息
// 例如QueryFromLastMetadata(3) 查询最后 3 条记录的元数据
func (tp *TopicProcessor) QueryFromLastMetadata(count int) ([]*RecordMetadataWithStatus, error) {
totalCount := tp.index.Count()
if totalCount == 0 {
return []*RecordMetadataWithStatus{}, nil
}
// QueryOldestMetadata(totalCount, count) 会从 totalCount 向前查询 count 条
return tp.QueryOldestMetadata(totalCount, count)
}
// GetProcessingIndex 获取当前处理索引(窗口开始索引)
func (tp *TopicProcessor) GetProcessingIndex() int {
tp.mu.RLock()
defer tp.mu.RUnlock()
if tp.tailer == nil {
return 0
}
return tp.tailer.GetStartIndex()
}
// GetReadIndex 获取当前读取索引(窗口结束索引)
func (tp *TopicProcessor) GetReadIndex() int {
tp.mu.RLock()
defer tp.mu.RUnlock()
if tp.tailer == nil {
return 0
}
return tp.tailer.GetEndIndex()
}
// Subscribe 订阅事件
func (tp *TopicProcessor) Subscribe(eventType EventType, listener EventListener) {
tp.eventBus.Subscribe(eventType, listener)
}
// SubscribeAll 订阅所有事件
func (tp *TopicProcessor) SubscribeAll(listener EventListener) {
tp.eventBus.SubscribeAll(listener)
}
// Unsubscribe 取消订阅
func (tp *TopicProcessor) Unsubscribe(eventType EventType) {
tp.eventBus.Unsubscribe(eventType)
}
// CanReset 检查是否可以执行重置操作
// 返回 true 表示可以重置false 表示不能重置(正在重置中或有待处理的记录)
func (tp *TopicProcessor) CanReset() bool {
tp.mu.RLock()
defer tp.mu.RUnlock()
return tp.canResetLocked()
}
// canResetLocked 内部方法,检查是否可以重置(调用前必须已持有锁)
func (tp *TopicProcessor) canResetLocked() bool {
// 检查当前状态
if tp.status.State == StateResetting {
return false
}
// 检查是否有待处理的日志
processingIndex := 0
if tp.tailer != nil {
processingIndex = tp.tailer.GetStartIndex()
}
recordCount := 0
if tp.index != nil {
recordCount = tp.index.Count()
}
// 只有当所有记录都已处理完成时才能重置
return processingIndex >= recordCount
}
// Reset 清空 topic 的所有数据,包括日志文件、位置文件和统计文件
// 注意Reset 不会停止 processor只有在没有待处理的日志时才能执行
// 重置完成后,如果之前是 Running 状态,会自动恢复到 Running 状态
func (tp *TopicProcessor) Reset() error {
tp.mu.Lock()
defer tp.mu.Unlock()
// 检查是否可以执行重置操作
if !tp.canResetLocked() {
// 提供详细的错误信息
currentState := tp.status.State
if currentState == StateResetting {
return nil
}
// 获取待处理记录信息
processingIndex := 0
if tp.tailer != nil {
processingIndex = tp.tailer.GetStartIndex()
}
recordCount := 0
if tp.index != nil {
recordCount = tp.index.Count()
}
return fmt.Errorf("cannot reset with pending records (%d/%d processed), wait for processing to complete", processingIndex, recordCount)
}
// 记录之前是否在运行
currentState := tp.status.State
wasRunning := currentState == StateRunning
// 进入 Resetting 状态
tp.setState(StateResetting, nil)
tp.logger.Debug("entering resetting state", "wasRunning", wasRunning, "currentState", currentState)
// 如果之前在运行,停止 goroutines
if wasRunning && tp.cancel != nil {
tp.cancel()
// 释放锁以等待 goroutines 停止
tp.mu.Unlock()
tp.wg.Wait()
tp.mu.Lock()
}
var errs []error
// 重置所有组件(保持引用关系不变)
// 1. 重置索引(会删除索引文件并重新创建)
if tp.index != nil {
if err := tp.index.Reset(); err != nil {
tp.logger.Error("failed to reset index", "error", err)
errs = append(errs, fmt.Errorf("reset index: %w", err))
}
}
// 2. 重置写入器(会删除日志文件并重新创建)
if tp.writer != nil {
if err := tp.writer.Reset(); err != nil {
tp.logger.Error("failed to reset writer", "error", err)
errs = append(errs, fmt.Errorf("reset writer: %w", err))
}
}
// 3. 重置游标(会删除位置文件并重置位置)
if tp.cursor != nil {
if err := tp.cursor.Reset(); err != nil {
tp.logger.Error("failed to reset cursor", "error", err)
errs = append(errs, fmt.Errorf("reset cursor: %w", err))
}
}
// 4. 重置查询器(重新打开日志文件)
if tp.query != nil {
if err := tp.query.Reset(); err != nil {
tp.logger.Error("failed to reset query", "error", err)
errs = append(errs, fmt.Errorf("reset query: %w", err))
}
}
// 5. 重置 tailer重置内部状态
if tp.tailer != nil {
if err := tp.tailer.Reset(); err != nil {
tp.logger.Error("failed to reset tailer", "error", err)
errs = append(errs, fmt.Errorf("reset tailer: %w", err))
}
}
// 6. 重置统计信息(会删除统计文件并重置所有计数器)
if tp.stats != nil {
if err := tp.stats.Reset(); err != nil {
tp.logger.Error("failed to reset stats", "error", err)
errs = append(errs, fmt.Errorf("reset stats: %w", err))
}
}
// 如果有错误,设置为错误状态
if len(errs) > 0 {
tp.setState(StateError, errs[0])
return errs[0]
}
// 发布重置事件
tp.eventBus.Publish(&Event{
Type: EventProcessorReset,
Topic: tp.topic,
Timestamp: time.Now(),
})
// 如果之前在运行,恢复到 Running 状态
if wasRunning {
// 创建新的 context
tp.ctx, tp.cancel = context.WithCancel(context.Background())
// 启动 tailer goroutine如果有 handler
if tp.handler != nil && tp.tailer != nil {
tp.wg.Add(1)
go func() {
defer tp.wg.Done()
if err := tp.tailer.Start(tp.ctx); err != nil && err != context.Canceled {
tp.logger.Error("tailer error after reset", "error", err)
}
}()
}
// 启动统计保存 goroutine
tp.wg.Add(1)
go func() {
defer tp.wg.Done()
ticker := time.NewTicker(tp.tailConfig.SaveInterval)
defer ticker.Stop()
for {
select {
case <-tp.ctx.Done():
return
case <-ticker.C:
if tp.stats != nil {
if err := tp.stats.Save(); err != nil {
tp.logger.Error("failed to save stats", "error", err)
}
}
}
}
}()
tp.setState(StateRunning, nil)
tp.logger.Debug("processor reset completed, resumed to running state")
} else {
// 恢复到之前的状态(通常是 Idle 或 Stopped
if currentState == StateStopped {
tp.setState(StateStopped, nil)
} else {
tp.setState(StateIdle, nil)
}
tp.logger.Debug("processor reset completed", "newState", tp.status.State)
}
// 如果有错误,返回第一个
if len(errs) > 0 {
return errs[0]
}
return nil
}
// Close 清理 processor 的所有资源
func (tp *TopicProcessor) Close() error {
tp.mu.Lock()
defer tp.mu.Unlock()
tp.logger.Debug("closing processor")
var errs []error
// 保存统计信息
if tp.stats != nil {
if err := tp.stats.Save(); err != nil {
tp.logger.Error("failed to save stats", "error", err)
errs = append(errs, fmt.Errorf("save stats: %w", err))
}
}
// 关闭 query
if tp.query != nil {
if err := tp.query.Close(); err != nil {
tp.logger.Error("failed to close query", "error", err)
errs = append(errs, fmt.Errorf("close query: %w", err))
}
tp.query = nil
}
// 关闭 cursor如果 tailer 未启动cursor 可能还未关闭)
if tp.cursor != nil {
if err := tp.cursor.Close(); err != nil {
tp.logger.Error("failed to close cursor", "error", err)
errs = append(errs, fmt.Errorf("close cursor: %w", err))
}
tp.cursor = nil
}
// 关闭 writer
if tp.writer != nil {
if err := tp.writer.Close(); err != nil {
tp.logger.Error("failed to close writer", "error", err)
errs = append(errs, fmt.Errorf("close writer: %w", err))
}
tp.writer = nil
}
// 关闭 index最后关闭因为其他组件可能依赖它
if tp.index != nil {
if err := tp.index.Close(); err != nil {
tp.logger.Error("failed to close index", "error", err)
errs = append(errs, fmt.Errorf("close index: %w", err))
}
tp.index = nil
}
// tailer 会通过 context cancel 和 Stop() 自动关闭
tp.tailer = nil
tp.logger.Debug("processor closed")
// 如果有多个错误,返回第一个
if len(errs) > 0 {
return errs[0]
}
return nil
}