package seqlog import ( "context" "fmt" "log/slog" "path/filepath" "sync" "time" ) // ProcessorState 处理器状态 type ProcessorState int const ( StateIdle ProcessorState = iota // 空闲(未启动) StateStarting // 启动中 StateRunning // 运行中 StateStopping // 停止中 StateStopped // 已停止 StateResetting // 重置中(阻止所有操作) StateError // 错误状态 ) // String 返回状态的字符串表示 func (s ProcessorState) String() string { switch s { case StateIdle: return "idle" case StateStarting: return "starting" case StateRunning: return "running" case StateStopping: return "stopping" case StateStopped: return "stopped" case StateResetting: return "resetting" case StateError: return "error" default: return "unknown" } } // CanWrite 判断当前状态是否允许写入 func (s ProcessorState) CanWrite() bool { // 允许在 Idle、Starting、Running 状态下写入 // 不允许在 Stopping、Stopped、Resetting、Error 状态下写入 return s == StateIdle || s == StateStarting || s == StateRunning } // CanQuery 判断当前状态是否允许查询 func (s ProcessorState) CanQuery() bool { return s != StateResetting } // CanProcess 判断当前状态是否允许处理 func (s ProcessorState) CanProcess() bool { return s == StateRunning } // ProcessorStatus 处理器状态信息 type ProcessorStatus struct { State ProcessorState // 当前状态 LastUpdated time.Time // 最后更新时间 Error error // 错误信息(仅在 StateError 时有效) } // TopicProcessor 作为聚合器,持有所有核心组件并提供统一的访问接口 type TopicProcessor struct { topic string title string // 显示标题,用于 UI 展示 logPath string logger *slog.Logger // 核心组件(聚合) writer *LogWriter // 写入器 index *RecordIndex // 索引管理器 query *RecordQuery // 查询器 cursor *ProcessCursor // 游标 tailer *LogTailer // 持续处理器 // 配置和状态 handler RecordHandler tailConfig *TailConfig stats *TopicStats // 统计信息 eventBus *EventBus // 事件总线 status ProcessorStatus // 处理器状态 // 并发控制 mu sync.RWMutex ctx context.Context cancel context.CancelFunc wg sync.WaitGroup } // TopicConfig topic 配置 type TopicConfig struct { Title string // 显示标题,可选,默认为 topic 名称 Handler RecordHandler // 处理函数(必填) TailConfig *TailConfig // tail 配置,可选 } // NewTopicProcessor 创建一个新的 topic 处理器 // 在初始化时创建所有核心组件,index 在组件间共享 // handler 为必填参数,如果 config 为 nil 或 config.Handler 为 nil 会返回错误 func NewTopicProcessor(baseDir, topic string, logger *slog.Logger, config *TopicConfig) (*TopicProcessor, error) { // 验证必填参数 if config == nil || config.Handler == nil { return nil, NewValidationError("config", "config and config.Handler are required", ErrInvalidConfig) } ctx, cancel := context.WithCancel(context.Background()) // 默认配置 tailConfig := &TailConfig{ PollInterval: 100 * 1000000, // 100ms SaveInterval: 1000 * 1000000, // 1s } if config.TailConfig != nil { tailConfig = config.TailConfig } if logger == nil { logger = slog.Default() } logPath := filepath.Join(baseDir, topic+".log") statsPath := filepath.Join(baseDir, topic+".stats") // 设置 title,如果未提供则使用 topic 名称 title := config.Title if title == "" { title = topic } tp := &TopicProcessor{ topic: topic, title: title, logPath: logPath, logger: logger, handler: config.Handler, tailConfig: tailConfig, stats: NewTopicStats(statsPath), eventBus: NewEventBus(), status: ProcessorStatus{ State: StateIdle, LastUpdated: time.Now(), }, ctx: ctx, cancel: cancel, } // 初始化所有组件 if err := tp.initializeComponents(); err != nil { cancel() return nil, fmt.Errorf("failed to initialize components: %w", err) } return tp, nil } // initializeComponents 初始化所有核心组件 func (tp *TopicProcessor) initializeComponents() error { // 1. 创建共享的索引管理器 index, err := NewRecordIndex(tp.logPath) if err != nil { return fmt.Errorf("create index: %w", err) } tp.index = index // 2. 创建写入器(使用共享 index) writer, err := NewLogWriter(tp.logPath, tp.index) if err != nil { tp.index.Close() return fmt.Errorf("create writer: %w", err) } tp.writer = writer // 3. 创建查询器(使用共享 index) query, err := NewRecordQuery(tp.logPath, tp.index, tp.writer) if err != nil { tp.writer.Close() tp.index.Close() return fmt.Errorf("create query: %w", err) } tp.query = query // 4. 创建游标(使用共享 index 和 writer) cursor, err := NewCursor(tp.logPath, tp.index, tp.writer) if err != nil { tp.query.Close() tp.writer.Close() tp.index.Close() return fmt.Errorf("create cursor: %w", err) } tp.cursor = cursor // 5. 创建 tailer(handler 为必填,总是创建) // 注意:只创建不启动,启动在 Start() 中进行 if err := tp.createTailer(); err != nil { tp.cursor.Close() tp.query.Close() tp.writer.Close() tp.index.Close() return fmt.Errorf("create tailer: %w", err) } tp.logger.Debug("all components initialized") return nil } // createTailer 创建 tailer(不启动) func (tp *TopicProcessor) createTailer() error { // 包装 handler,添加统计功能和事件发布 wrappedHandler := func(rec *Record) error { if err := tp.handler(rec); err != nil { tp.stats.IncError() // 发布处理错误事件 tp.eventBus.Publish(&Event{ Type: EventProcessError, Topic: tp.topic, Timestamp: time.Now(), Record: rec, Error: err, Position: 0, // Position 在 tailer 模式下不可用 }) return err } // 处理成功,更新统计 tp.stats.IncProcessed(int64(len(rec.Data))) // 发布处理成功事件 tp.eventBus.Publish(&Event{ Type: EventProcessSuccess, Topic: tp.topic, Timestamp: time.Now(), Record: rec, Position: 0, // Position 在 tailer 模式下不可用 }) return nil } tp.logger.Debug("creating tailer") tailer, err := NewTailer(tp.cursor, wrappedHandler, tp.tailConfig) if err != nil { tp.logger.Error("failed to create tailer", "error", err) return fmt.Errorf("failed to create tailer: %w", err) } tp.tailer = tailer tp.logger.Debug("tailer created") return nil } // Write 写入日志(统一接口) func (tp *TopicProcessor) Write(data []byte) (int64, error) { // 检查状态 tp.mu.RLock() state := tp.status.State tp.mu.RUnlock() if !state.CanWrite() { if state == StateResetting { return 0, ErrProcessorResetting } return 0, ErrNotRunning } offset, err := tp.writer.Append(data) if err != nil { tp.logger.Error("failed to append", "error", err) tp.stats.IncError() // 发布写入错误事件 tp.eventBus.Publish(&Event{ Type: EventWriteError, Topic: tp.topic, Timestamp: time.Now(), Error: err, }) return 0, err } // 更新统计信息 tp.stats.IncWrite(int64(len(data))) tp.logger.Debug("write success", "offset", offset, "size", len(data)) // 发布写入成功事件 tp.eventBus.Publish(&Event{ Type: EventWriteSuccess, Topic: tp.topic, Timestamp: time.Now(), Position: offset, }) return offset, nil } // Start 启动 tailer(如果已创建) func (tp *TopicProcessor) Start() error { tp.mu.Lock() defer tp.mu.Unlock() // 检查状态 if tp.status.State != StateIdle && tp.status.State != StateStopped { return NewTopicError(tp.topic, "start", ErrInvalidState) } tp.logger.Debug("starting processor") // 设置为启动中状态 tp.setState(StateStarting, nil) // 重新创建 context(如果之前被 cancel 了) if tp.ctx.Err() != nil { tp.ctx, tp.cancel = context.WithCancel(context.Background()) } // 启动定期保存统计信息的 goroutine tp.wg.Add(1) go func() { defer tp.wg.Done() ticker := time.NewTicker(tp.tailConfig.SaveInterval) defer ticker.Stop() for { select { case <-tp.ctx.Done(): return case <-ticker.C: if err := tp.stats.Save(); err != nil { tp.logger.Error("failed to save stats", "error", err) } } } }() // 如果 tailer 已创建,启动它 tp.logger.Debug("launching tailer goroutine") tp.wg.Add(1) go func() { defer tp.wg.Done() tp.logger.Debug("tailer goroutine started") if err := tp.tailer.Start(tp.ctx); err != nil && err != context.Canceled { tp.logger.Error("tailer error", "error", err) } tp.logger.Debug("tailer goroutine finished") }() // 设置为运行中状态 tp.setState(StateRunning, nil) // 发布启动事件 tp.eventBus.Publish(&Event{ Type: EventProcessorStart, Topic: tp.topic, Timestamp: time.Now(), }) return nil } // Stop 停止 tailer func (tp *TopicProcessor) Stop() error { tp.mu.Lock() if tp.status.State != StateRunning { tp.mu.Unlock() return nil } tp.logger.Debug("stopping processor") // 设置为停止中状态 tp.setState(StateStopping, nil) tp.cancel() tp.mu.Unlock() // 等待 tailer 停止 tp.wg.Wait() tp.logger.Debug("processor stopped") // 设置为已停止状态 tp.mu.Lock() tp.setState(StateStopped, nil) tp.mu.Unlock() // 发布停止事件 tp.eventBus.Publish(&Event{ Type: EventProcessorStop, Topic: tp.topic, Timestamp: time.Now(), }) return nil } // Topic 返回 topic 名称 func (tp *TopicProcessor) Topic() string { return tp.topic } // Title 返回显示标题 func (tp *TopicProcessor) Title() string { return tp.title } // GetState 获取当前状态 func (tp *TopicProcessor) GetState() ProcessorState { tp.mu.RLock() defer tp.mu.RUnlock() return tp.status.State } // GetStatus 获取完整状态信息 func (tp *TopicProcessor) GetStatus() ProcessorStatus { tp.mu.RLock() defer tp.mu.RUnlock() return tp.status } // setState 设置状态(需持有锁) func (tp *TopicProcessor) setState(state ProcessorState, err error) { tp.status.State = state tp.status.LastUpdated = time.Now() tp.status.Error = err // 发布状态变更事件 tp.eventBus.Publish(&Event{ Type: EventStateChanged, Topic: tp.topic, Timestamp: time.Now(), Error: err, }) } // IsRunning 检查是否正在运行 func (tp *TopicProcessor) IsRunning() bool { return tp.GetState() == StateRunning } // UpdateTailConfig 动态更新 tail 配置 func (tp *TopicProcessor) UpdateTailConfig(config *TailConfig) error { tp.mu.Lock() defer tp.mu.Unlock() if config == nil { return fmt.Errorf("config cannot be nil") } tp.tailConfig = config // 如果 tailer 已经在运行,更新其配置 if tp.tailer != nil { tp.tailer.UpdateConfig(*config) } return nil } // GetTailConfig 获取当前 tail 配置 func (tp *TopicProcessor) GetTailConfig() *TailConfig { tp.mu.RLock() defer tp.mu.RUnlock() cfg := tp.tailConfig return cfg } // GetStats 获取当前统计信息 func (tp *TopicProcessor) GetStats() Stats { return tp.stats.Get() } // Query 获取共享的查询器 func (tp *TopicProcessor) Query() (*RecordQuery, error) { tp.mu.RLock() defer tp.mu.RUnlock() if !tp.status.State.CanQuery() { return nil, ErrProcessorResetting } return tp.query, nil } // addStatusToRecords 为记录添加状态信息(辅助方法) func (tp *TopicProcessor) addStatusToRecords(records []*RecordWithIndex) []*RecordWithStatus { // 获取窗口索引范围(用于状态判断) var startIdx, endIdx int tp.mu.RLock() if tp.tailer != nil { startIdx = tp.tailer.GetStartIndex() endIdx = tp.tailer.GetEndIndex() } tp.mu.RUnlock() // 为每条记录添加状态 results := make([]*RecordWithStatus, len(records)) for i, rec := range records { results[i] = &RecordWithStatus{ Record: rec.Record, Index: rec.Index, Status: GetRecordStatus(rec.Index, startIdx, endIdx), } } return results } // addStatusToMetadata 为元数据添加状态信息 func (tp *TopicProcessor) addStatusToMetadata(metadata []*RecordMetadata) []*RecordMetadataWithStatus { // 获取窗口索引范围(用于状态判断) var startIdx, endIdx int tp.mu.RLock() if tp.tailer != nil { startIdx = tp.tailer.GetStartIndex() endIdx = tp.tailer.GetEndIndex() } tp.mu.RUnlock() // 为每个元数据添加状态 results := make([]*RecordMetadataWithStatus, len(metadata)) for i, meta := range metadata { results[i] = &RecordMetadataWithStatus{ Metadata: meta, Status: GetRecordStatus(meta.Index, startIdx, endIdx), } } return results } // QueryOldest 从参考索引向索引递减方向查询记录(查询更早的记录) // refIndex: 参考索引位置 // count: 查询数量 // 返回的记录包含索引和状态信息,按索引递增方向排序 // 例如:QueryOldest(5, 3) 查询索引 2, 3, 4(不包含 5) func (tp *TopicProcessor) QueryOldest(refIndex, count int) ([]*RecordWithStatus, error) { records, err := tp.query.QueryOldest(refIndex, count) if err != nil { return nil, err } return tp.addStatusToRecords(records), nil } // QueryNewest 从参考索引向索引递增方向查询记录(查询更新的记录) // refIndex: 参考索引位置 // count: 查询数量 // 返回的记录包含索引和状态信息,按索引递增方向排序 // 例如:QueryNewest(5, 3) 查询索引 6, 7, 8(不包含 5) func (tp *TopicProcessor) QueryNewest(refIndex, count int) ([]*RecordWithStatus, error) { records, err := tp.query.QueryNewest(refIndex, count) if err != nil { return nil, err } return tp.addStatusToRecords(records), nil } // QueryOldestMetadata 从参考索引向索引递减方向查询记录元数据(查询更早的记录,不读取完整数据) // refIndex: 参考索引位置 // count: 查询数量 // 返回的记录只包含元数据信息(索引、UUID、数据大小),按索引递增方向排序 // 例如:QueryOldestMetadata(5, 3) 查询索引 2, 3, 4(不包含 5) func (tp *TopicProcessor) QueryOldestMetadata(refIndex, count int) ([]*RecordMetadataWithStatus, error) { metadata, err := tp.query.QueryOldestMetadata(refIndex, count) if err != nil { return nil, err } return tp.addStatusToMetadata(metadata), nil } // QueryNewestMetadata 从参考索引向索引递增方向查询记录元数据(查询更新的记录,不读取完整数据) // refIndex: 参考索引位置 // count: 查询数量 // 返回的记录只包含元数据信息(索引、UUID、数据大小),按索引递增方向排序 // 例如:QueryNewestMetadata(5, 3) 查询索引 6, 7, 8(不包含 5) func (tp *TopicProcessor) QueryNewestMetadata(refIndex, count int) ([]*RecordMetadataWithStatus, error) { metadata, err := tp.query.QueryNewestMetadata(refIndex, count) if err != nil { return nil, err } return tp.addStatusToMetadata(metadata), nil } // QueryByIndex 根据索引查询单条记录的完整数据 // index: 记录索引 // 返回完整的记录数据,包含状态信息 func (tp *TopicProcessor) QueryByIndex(index int) (*RecordWithStatus, error) { record, err := tp.query.QueryByIndex(index) if err != nil { return nil, err } // 获取当前处理窗口位置 var startIdx, endIdx int tp.mu.RLock() if tp.tailer != nil { startIdx = tp.tailer.GetStartIndex() endIdx = tp.tailer.GetEndIndex() } tp.mu.RUnlock() status := GetRecordStatus(index, startIdx, endIdx) return &RecordWithStatus{ Record: record, Index: index, Status: status, }, nil } // GetRecordCount 获取记录总数(统一接口) func (tp *TopicProcessor) GetRecordCount() int { return tp.index.Count() } // QueryFromProcessing 从当前处理窗口的开始位置向索引递增方向查询记录 // count: 查询数量 // 返回从处理窗口开始位置(startIndex)向后的记录,包含状态信息 func (tp *TopicProcessor) QueryFromProcessing(count int) ([]*RecordWithStatus, error) { // 获取当前处理窗口的开始位置 tp.mu.RLock() var startIdx int if tp.tailer != nil { startIdx = tp.tailer.GetStartIndex() } tp.mu.RUnlock() // 从 startIdx 开始向索引递增方向查询 // QueryNewest(refIndex, count) 查询从 refIndex+1 开始的记录 // 所以要从 startIdx 开始,应该调用 QueryNewest(startIdx - 1, count) return tp.QueryNewest(startIdx-1, count) } // QueryFromFirst 从第一条记录向索引递增方向查询 // count: 查询数量 // 返回从第一条记录(索引 0)开始的记录,包含状态信息 // 例如:QueryFromFirst(3) 查询索引 0, 1, 2 func (tp *TopicProcessor) QueryFromFirst(count int) ([]*RecordWithStatus, error) { // QueryNewest(-1, count) 会从索引 0 开始向后查询 return tp.QueryNewest(-1, count) } // QueryFromLast 从最后一条记录向索引递减方向查询 // count: 查询数量 // 返回从最后一条记录开始向前的记录,包含状态信息,按索引递增方向排序 // 例如:如果总共有 10 条记录,QueryFromLast(3) 查询索引 7, 8, 9 func (tp *TopicProcessor) QueryFromLast(count int) ([]*RecordWithStatus, error) { // 获取记录总数 totalCount := tp.index.Count() // 如果没有记录,返回空数组 if totalCount == 0 { return []*RecordWithStatus{}, nil } // QueryOldest(totalCount, count) 会从最后一条记录开始向前查询 // totalCount 是记录总数,有效索引是 0 到 totalCount-1 // 所以传入 totalCount 作为 refIndex,会查询 totalCount-count 到 totalCount-1 的记录 return tp.QueryOldest(totalCount, count) } // QueryFromFirstMetadata 从第一条记录向索引递增方向查询元数据 // count: 查询数量 // 返回从第一条记录(索引 0)开始的记录元数据,包含状态信息 // 例如:QueryFromFirstMetadata(3) 查询索引 0, 1, 2 的元数据 func (tp *TopicProcessor) QueryFromFirstMetadata(count int) ([]*RecordMetadataWithStatus, error) { // QueryNewestMetadata(-1, count) 会从索引 0 开始向后查询 return tp.QueryNewestMetadata(-1, count) } // QueryFromLastMetadata 从最后一条记录向索引递减方向查询元数据 // count: 查询数量 // 返回最后 N 条记录的元数据(按索引递增方向排序),包含状态信息 // 例如:QueryFromLastMetadata(3) 查询最后 3 条记录的元数据 func (tp *TopicProcessor) QueryFromLastMetadata(count int) ([]*RecordMetadataWithStatus, error) { totalCount := tp.index.Count() if totalCount == 0 { return []*RecordMetadataWithStatus{}, nil } // QueryOldestMetadata(totalCount, count) 会从 totalCount 向前查询 count 条 return tp.QueryOldestMetadata(totalCount, count) } // GetProcessingIndex 获取当前处理索引(窗口开始索引) func (tp *TopicProcessor) GetProcessingIndex() int { tp.mu.RLock() defer tp.mu.RUnlock() if tp.tailer == nil { return 0 } return tp.tailer.GetStartIndex() } // GetReadIndex 获取当前读取索引(窗口结束索引) func (tp *TopicProcessor) GetReadIndex() int { tp.mu.RLock() defer tp.mu.RUnlock() if tp.tailer == nil { return 0 } return tp.tailer.GetEndIndex() } // Subscribe 订阅事件 func (tp *TopicProcessor) Subscribe(eventType EventType, listener EventListener) { tp.eventBus.Subscribe(eventType, listener) } // SubscribeAll 订阅所有事件 func (tp *TopicProcessor) SubscribeAll(listener EventListener) { tp.eventBus.SubscribeAll(listener) } // Unsubscribe 取消订阅 func (tp *TopicProcessor) Unsubscribe(eventType EventType) { tp.eventBus.Unsubscribe(eventType) } // CanReset 检查是否可以执行重置操作 // 返回 true 表示可以重置,false 表示不能重置(正在重置中或有待处理的记录) func (tp *TopicProcessor) CanReset() bool { tp.mu.RLock() defer tp.mu.RUnlock() return tp.canResetLocked() } // canResetLocked 内部方法,检查是否可以重置(调用前必须已持有锁) func (tp *TopicProcessor) canResetLocked() bool { // 检查当前状态 if tp.status.State == StateResetting { return false } // 检查是否有待处理的日志 processingIndex := 0 if tp.tailer != nil { processingIndex = tp.tailer.GetStartIndex() } recordCount := 0 if tp.index != nil { recordCount = tp.index.Count() } // 只有当所有记录都已处理完成时才能重置 return processingIndex >= recordCount } // Reset 清空 topic 的所有数据,包括日志文件、位置文件和统计文件 // 注意:Reset 不会停止 processor,只有在没有待处理的日志时才能执行 // 重置完成后,如果之前是 Running 状态,会自动恢复到 Running 状态 func (tp *TopicProcessor) Reset() error { tp.mu.Lock() defer tp.mu.Unlock() // 检查是否可以执行重置操作 if !tp.canResetLocked() { // 提供详细的错误信息 currentState := tp.status.State if currentState == StateResetting { return nil } // 获取待处理记录信息 processingIndex := 0 if tp.tailer != nil { processingIndex = tp.tailer.GetStartIndex() } recordCount := 0 if tp.index != nil { recordCount = tp.index.Count() } return fmt.Errorf("cannot reset with pending records (%d/%d processed), wait for processing to complete", processingIndex, recordCount) } // 记录之前是否在运行 currentState := tp.status.State wasRunning := currentState == StateRunning // 进入 Resetting 状态 tp.setState(StateResetting, nil) tp.logger.Debug("entering resetting state", "wasRunning", wasRunning, "currentState", currentState) // 如果之前在运行,停止 goroutines if wasRunning && tp.cancel != nil { tp.cancel() // 释放锁以等待 goroutines 停止 tp.mu.Unlock() tp.wg.Wait() tp.mu.Lock() } var errs []error // 重置所有组件(保持引用关系不变) // 1. 重置索引(会删除索引文件并重新创建) if tp.index != nil { if err := tp.index.Reset(); err != nil { tp.logger.Error("failed to reset index", "error", err) errs = append(errs, fmt.Errorf("reset index: %w", err)) } } // 2. 重置写入器(会删除日志文件并重新创建) if tp.writer != nil { if err := tp.writer.Reset(); err != nil { tp.logger.Error("failed to reset writer", "error", err) errs = append(errs, fmt.Errorf("reset writer: %w", err)) } } // 3. 重置游标(会删除位置文件并重置位置) if tp.cursor != nil { if err := tp.cursor.Reset(); err != nil { tp.logger.Error("failed to reset cursor", "error", err) errs = append(errs, fmt.Errorf("reset cursor: %w", err)) } } // 4. 重置查询器(重新打开日志文件) if tp.query != nil { if err := tp.query.Reset(); err != nil { tp.logger.Error("failed to reset query", "error", err) errs = append(errs, fmt.Errorf("reset query: %w", err)) } } // 5. 重置 tailer(重置内部状态) if tp.tailer != nil { if err := tp.tailer.Reset(); err != nil { tp.logger.Error("failed to reset tailer", "error", err) errs = append(errs, fmt.Errorf("reset tailer: %w", err)) } } // 6. 重置统计信息(会删除统计文件并重置所有计数器) if tp.stats != nil { if err := tp.stats.Reset(); err != nil { tp.logger.Error("failed to reset stats", "error", err) errs = append(errs, fmt.Errorf("reset stats: %w", err)) } } // 如果有错误,设置为错误状态 if len(errs) > 0 { tp.setState(StateError, errs[0]) return errs[0] } // 发布重置事件 tp.eventBus.Publish(&Event{ Type: EventProcessorReset, Topic: tp.topic, Timestamp: time.Now(), }) // 如果之前在运行,恢复到 Running 状态 if wasRunning { // 创建新的 context tp.ctx, tp.cancel = context.WithCancel(context.Background()) // 启动 tailer goroutine(如果有 handler) if tp.handler != nil && tp.tailer != nil { tp.wg.Add(1) go func() { defer tp.wg.Done() if err := tp.tailer.Start(tp.ctx); err != nil && err != context.Canceled { tp.logger.Error("tailer error after reset", "error", err) } }() } // 启动统计保存 goroutine tp.wg.Add(1) go func() { defer tp.wg.Done() ticker := time.NewTicker(tp.tailConfig.SaveInterval) defer ticker.Stop() for { select { case <-tp.ctx.Done(): return case <-ticker.C: if tp.stats != nil { if err := tp.stats.Save(); err != nil { tp.logger.Error("failed to save stats", "error", err) } } } } }() tp.setState(StateRunning, nil) tp.logger.Debug("processor reset completed, resumed to running state") } else { // 恢复到之前的状态(通常是 Idle 或 Stopped) if currentState == StateStopped { tp.setState(StateStopped, nil) } else { tp.setState(StateIdle, nil) } tp.logger.Debug("processor reset completed", "newState", tp.status.State) } // 如果有错误,返回第一个 if len(errs) > 0 { return errs[0] } return nil } // Close 清理 processor 的所有资源 func (tp *TopicProcessor) Close() error { tp.mu.Lock() defer tp.mu.Unlock() tp.logger.Debug("closing processor") var errs []error // 保存统计信息 if tp.stats != nil { if err := tp.stats.Save(); err != nil { tp.logger.Error("failed to save stats", "error", err) errs = append(errs, fmt.Errorf("save stats: %w", err)) } } // 关闭 query if tp.query != nil { if err := tp.query.Close(); err != nil { tp.logger.Error("failed to close query", "error", err) errs = append(errs, fmt.Errorf("close query: %w", err)) } tp.query = nil } // 关闭 cursor(如果 tailer 未启动,cursor 可能还未关闭) if tp.cursor != nil { if err := tp.cursor.Close(); err != nil { tp.logger.Error("failed to close cursor", "error", err) errs = append(errs, fmt.Errorf("close cursor: %w", err)) } tp.cursor = nil } // 关闭 writer if tp.writer != nil { if err := tp.writer.Close(); err != nil { tp.logger.Error("failed to close writer", "error", err) errs = append(errs, fmt.Errorf("close writer: %w", err)) } tp.writer = nil } // 关闭 index(最后关闭,因为其他组件可能依赖它) if tp.index != nil { if err := tp.index.Close(); err != nil { tp.logger.Error("failed to close index", "error", err) errs = append(errs, fmt.Errorf("close index: %w", err)) } tp.index = nil } // tailer 会通过 context cancel 和 Stop() 自动关闭 tp.tailer = nil tp.logger.Debug("processor closed") // 如果有多个错误,返回第一个 if len(errs) > 0 { return errs[0] } return nil }