From bcc328b12939a8a4ac5beb7853ad03b16f89da65 Mon Sep 17 00:00:00 2001 From: bourdon Date: Sat, 4 Oct 2025 18:56:52 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=EF=BC=9ATopicProcessor=20?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E7=AE=A1=E7=90=86=E7=B3=BB=E7=BB=9F=E4=B8=8E?= =?UTF-8?q?=20Reset=20=E6=96=B9=E6=B3=95=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增功能: - 添加 ProcessorState 状态类型(Idle/Starting/Running/Stopping/Stopped/Resetting/Error) - 添加 ProcessorStatus 结构体和状态管理方法(GetState/GetStatus/setState) - 实现状态转换逻辑和访问控制(CanWrite/CanQuery) - 新增 CanReset() 方法检查是否可执行重置操作 Reset 方法优化: - 重写 Reset() 方法,不再停止 processor - 只有在无待处理记录时才能执行重置 - 进入 Resetting 状态期间阻止所有读写操作 - 重置后自动恢复到之前的运行状态 - 正确关闭并重置 cursor 和 stats 组件 - 调整执行顺序:先关闭组件,再删除文件,后重新初始化 错误处理增强: - 添加 ErrProcessorResetting 和 ErrInvalidState 错误类型 - 添加 EventStateChanged 事件类型 - 修复 writer/index 为 nil 时的空指针问题 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- cursor.go | 10 +- errors.go | 6 + event.go | 3 + loghub.go | 7 +- query.go | 11 +- topic_processor.go | 337 +++++++++++++++++++++++++++++++++++++-------- 6 files changed, 303 insertions(+), 71 deletions(-) diff --git a/cursor.go b/cursor.go index ceff177..df6ab9d 100644 --- a/cursor.go +++ b/cursor.go @@ -68,10 +68,12 @@ func (c *ProcessCursor) Next() (*Record, error) { } // 写入保护:检查读取位置是否超过当前写入位置 - dirtyOffset := c.writer.GetDirtyOffset() - // 如果正在写入(dirtyOffset >= 0)且记录起始位置 >= 写入位置,说明数据还未完全写入,返回 EOF 等待 - if dirtyOffset >= 0 && offset >= dirtyOffset { - return nil, io.EOF + if c.writer != nil { + dirtyOffset := c.writer.GetDirtyOffset() + // 如果正在写入(dirtyOffset >= 0)且记录起始位置 >= 写入位置,说明数据还未完全写入,返回 EOF 等待 + if dirtyOffset >= 0 && offset >= dirtyOffset { + return nil, io.EOF + } } // Seek 到记录位置 diff --git a/errors.go b/errors.go index 3a358f3..72198fd 100644 --- a/errors.go +++ b/errors.go @@ -36,6 +36,12 @@ var ( // ErrInvalidConfig 表示配置无效 ErrInvalidConfig = errors.New("invalid config") + + // ErrProcessorResetting 表示处理器正在重置中,操作被阻止 + ErrProcessorResetting = errors.New("processor is resetting, operations blocked") + + // ErrInvalidState 表示处理器状态无效,不允许执行该操作 + ErrInvalidState = errors.New("invalid processor state for this operation") ) // TopicError 表示与 topic 相关的错误 diff --git a/event.go b/event.go index 92658d8..08c5ab6 100644 --- a/event.go +++ b/event.go @@ -17,6 +17,7 @@ const ( EventProcessorStop // Processor 停止 EventProcessorReset // Processor 重置 EventPositionSaved // 位置保存 + EventStateChanged // 状态变更 ) // String 返回事件类型的字符串表示 @@ -38,6 +39,8 @@ func (e EventType) String() string { return "Processor 重置" case EventPositionSaved: return "位置保存" + case EventStateChanged: + return "状态变更" default: return "未知事件" } diff --git a/loghub.go b/loghub.go index 76f0866..ff65964 100644 --- a/loghub.go +++ b/loghub.go @@ -409,7 +409,7 @@ func (s *LogHub) NewTopicQuery(topic string) (*RecordQuery, error) { return nil, NewTopicError(topic, "get", ErrNotFound) } - return processor.Query(), nil + return processor.Query() } // GetProcessingIndex 获取指定 topic 的当前处理索引 @@ -577,12 +577,13 @@ func (s *LogHub) ResetTopic(topic string) error { return fmt.Errorf("failed to reset processor: %w", err) } - // 如果 LogHub 正在运行,重新启动 processor + // 如果 LogHub 正在运行且 processor 未运行,启动 processor + // 注意:如果 Reset() 已经自动恢复到 Running 状态,就不需要再启动 s.mu.RLock() running := s.running s.mu.RUnlock() - if running { + if running && processor.GetState() != StateRunning { if err := processor.Start(); err != nil { return fmt.Errorf("failed to restart processor: %w", err) } diff --git a/query.go b/query.go index 1b84b93..c6842a7 100644 --- a/query.go +++ b/query.go @@ -127,10 +127,13 @@ func (rq *RecordQuery) readRecordsMetadataForward(startIndex, count int) ([]*Rec } dataOffset := binary.LittleEndian.Uint64(hdr[4:12]) - dirtyOffset := rq.writer.GetDirtyOffset() - // 如果正在写入(dirtyOffset >= 0)且记录位置 >= 写入位置,等待写入完成 - if dirtyOffset >= 0 && dataOffset >= uint64(dirtyOffset) { - break + // 写入保护:如果 writer 存在,检查是否正在写入该记录 + if rq.writer != nil { + dirtyOffset := rq.writer.GetDirtyOffset() + // 如果正在写入(dirtyOffset >= 0)且记录位置 >= 写入位置,等待写入完成 + if dirtyOffset >= 0 && dataOffset >= uint64(dirtyOffset) { + break + } } dataLen := binary.LittleEndian.Uint32(hdr[0:4]) diff --git a/topic_processor.go b/topic_processor.go index 9fcb329..bc540d8 100644 --- a/topic_processor.go +++ b/topic_processor.go @@ -10,6 +10,65 @@ import ( "time" ) +// ProcessorState 处理器状态 +type ProcessorState int + +const ( + StateIdle ProcessorState = iota // 空闲(未启动) + StateStarting // 启动中 + StateRunning // 运行中 + StateStopping // 停止中 + StateStopped // 已停止 + StateResetting // 重置中(阻止所有操作) + StateError // 错误状态 +) + +// String 返回状态的字符串表示 +func (s ProcessorState) String() string { + switch s { + case StateIdle: + return "idle" + case StateStarting: + return "starting" + case StateRunning: + return "running" + case StateStopping: + return "stopping" + case StateStopped: + return "stopped" + case StateResetting: + return "resetting" + case StateError: + return "error" + default: + return "unknown" + } +} + +// CanWrite 判断当前状态是否允许写入 +func (s ProcessorState) CanWrite() bool { + // 允许在 Idle、Starting、Running 状态下写入 + // 不允许在 Stopping、Stopped、Resetting、Error 状态下写入 + return s == StateIdle || s == StateStarting || s == StateRunning +} + +// CanQuery 判断当前状态是否允许查询 +func (s ProcessorState) CanQuery() bool { + return s != StateResetting +} + +// CanProcess 判断当前状态是否允许处理 +func (s ProcessorState) CanProcess() bool { + return s == StateRunning +} + +// ProcessorStatus 处理器状态信息 +type ProcessorStatus struct { + State ProcessorState // 当前状态 + LastUpdated time.Time // 最后更新时间 + Error error // 错误信息(仅在 StateError 时有效) +} + // TopicProcessor 作为聚合器,持有所有核心组件并提供统一的访问接口 type TopicProcessor struct { topic string @@ -27,15 +86,15 @@ type TopicProcessor struct { // 配置和状态 handler RecordHandler tailConfig *TailConfig - stats *TopicStats // 统计信息 - eventBus *EventBus // 事件总线 + stats *TopicStats // 统计信息 + eventBus *EventBus // 事件总线 + status ProcessorStatus // 处理器状态 // 并发控制 - mu sync.RWMutex - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - running bool + mu sync.RWMutex + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup } // TopicConfig topic 配置 @@ -88,8 +147,12 @@ func NewTopicProcessor(baseDir, topic string, logger *slog.Logger, config *Topic tailConfig: tailConfig, stats: NewTopicStats(statsPath), eventBus: NewEventBus(), - ctx: ctx, - cancel: cancel, + status: ProcessorStatus{ + State: StateIdle, + LastUpdated: time.Now(), + }, + ctx: ctx, + cancel: cancel, } // 初始化所有组件 @@ -200,6 +263,18 @@ func (tp *TopicProcessor) createTailer() error { // Write 写入日志(统一接口) func (tp *TopicProcessor) Write(data []byte) (int64, error) { + // 检查状态 + tp.mu.RLock() + state := tp.status.State + tp.mu.RUnlock() + + if !state.CanWrite() { + if state == StateResetting { + return 0, ErrProcessorResetting + } + return 0, ErrNotRunning + } + offset, err := tp.writer.Append(data) if err != nil { tp.logger.Error("failed to append", "error", err) @@ -237,19 +312,21 @@ func (tp *TopicProcessor) Start() error { tp.mu.Lock() defer tp.mu.Unlock() - if tp.running { - return NewTopicError(tp.topic, "start", ErrAlreadyRunning) + // 检查状态 + if tp.status.State != StateIdle && tp.status.State != StateStopped { + return NewTopicError(tp.topic, "start", ErrInvalidState) } tp.logger.Debug("starting processor") + // 设置为启动中状态 + tp.setState(StateStarting, nil) + // 重新创建 context(如果之前被 cancel 了) if tp.ctx.Err() != nil { tp.ctx, tp.cancel = context.WithCancel(context.Background()) } - tp.running = true - // 启动定期保存统计信息的 goroutine tp.wg.Add(1) go func() { @@ -281,6 +358,9 @@ func (tp *TopicProcessor) Start() error { tp.logger.Debug("tailer goroutine finished") }() + // 设置为运行中状态 + tp.setState(StateRunning, nil) + // 发布启动事件 tp.eventBus.Publish(&Event{ Type: EventProcessorStart, @@ -294,12 +374,14 @@ func (tp *TopicProcessor) Start() error { // Stop 停止 tailer func (tp *TopicProcessor) Stop() error { tp.mu.Lock() - if !tp.running { + if tp.status.State != StateRunning { tp.mu.Unlock() return nil } tp.logger.Debug("stopping processor") - tp.running = false + + // 设置为停止中状态 + tp.setState(StateStopping, nil) tp.cancel() tp.mu.Unlock() @@ -308,6 +390,11 @@ func (tp *TopicProcessor) Stop() error { tp.logger.Debug("processor stopped") + // 设置为已停止状态 + tp.mu.Lock() + tp.setState(StateStopped, nil) + tp.mu.Unlock() + // 发布停止事件 tp.eventBus.Publish(&Event{ Type: EventProcessorStop, @@ -328,11 +415,38 @@ func (tp *TopicProcessor) Title() string { return tp.title } -// IsRunning 检查是否正在运行 -func (tp *TopicProcessor) IsRunning() bool { +// GetState 获取当前状态 +func (tp *TopicProcessor) GetState() ProcessorState { tp.mu.RLock() defer tp.mu.RUnlock() - return tp.running + return tp.status.State +} + +// GetStatus 获取完整状态信息 +func (tp *TopicProcessor) GetStatus() ProcessorStatus { + tp.mu.RLock() + defer tp.mu.RUnlock() + return tp.status +} + +// setState 设置状态(需持有锁) +func (tp *TopicProcessor) setState(state ProcessorState, err error) { + tp.status.State = state + tp.status.LastUpdated = time.Now() + tp.status.Error = err + + // 发布状态变更事件 + tp.eventBus.Publish(&Event{ + Type: EventStateChanged, + Topic: tp.topic, + Timestamp: time.Now(), + Error: err, + }) +} + +// IsRunning 检查是否正在运行 +func (tp *TopicProcessor) IsRunning() bool { + return tp.GetState() == StateRunning } // UpdateTailConfig 动态更新 tail 配置 @@ -368,8 +482,15 @@ func (tp *TopicProcessor) GetStats() Stats { } // Query 获取共享的查询器 -func (tp *TopicProcessor) Query() *RecordQuery { - return tp.query +func (tp *TopicProcessor) Query() (*RecordQuery, error) { + tp.mu.RLock() + defer tp.mu.RUnlock() + + if !tp.status.State.CanQuery() { + return nil, ErrProcessorResetting + } + + return tp.query, nil } // addStatusToRecords 为记录添加状态信息(辅助方法) @@ -609,50 +730,97 @@ func (tp *TopicProcessor) Unsubscribe(eventType EventType) { tp.eventBus.Unsubscribe(eventType) } +// CanReset 检查是否可以执行重置操作 +// 返回 true 表示可以重置,false 表示不能重置(正在重置中或有待处理的记录) +func (tp *TopicProcessor) CanReset() bool { + tp.mu.RLock() + defer tp.mu.RUnlock() + return tp.canResetLocked() +} + +// canResetLocked 内部方法,检查是否可以重置(调用前必须已持有锁) +func (tp *TopicProcessor) canResetLocked() bool { + // 检查当前状态 + if tp.status.State == StateResetting { + return false + } + + // 检查是否有待处理的日志 + processingIndex := 0 + if tp.tailer != nil { + processingIndex = tp.tailer.GetStartIndex() + } + + recordCount := 0 + if tp.index != nil { + recordCount = tp.index.Count() + } + + // 只有当所有记录都已处理完成时才能重置 + return processingIndex >= recordCount +} + // Reset 清空 topic 的所有数据,包括日志文件、位置文件和统计文件 -// 如果 processor 正在运行且没有待处理的日志,会自动停止 -// 如果有待处理的日志,则返回错误,需要等待处理完成或手动停止 +// 注意:Reset 不会停止 processor,只有在没有待处理的日志时才能执行 +// 重置完成后,如果之前是 Running 状态,会自动恢复到 Running 状态 func (tp *TopicProcessor) Reset() error { tp.mu.Lock() defer tp.mu.Unlock() - if tp.running { - // 检查是否有待处理的日志 + // 检查是否可以执行重置操作 + if !tp.canResetLocked() { + // 提供详细的错误信息 + currentState := tp.status.State + if currentState == StateResetting { + return nil + } + + // 获取待处理记录信息 processingIndex := 0 if tp.tailer != nil { processingIndex = tp.tailer.GetStartIndex() } - recordCount := tp.index.Count() - hasPendingRecords := processingIndex < recordCount - - if hasPendingRecords { - return fmt.Errorf("cannot reset while processor is running with pending records (%d/%d processed), please stop first or wait for processing to complete", processingIndex, recordCount) + recordCount := 0 + if tp.index != nil { + recordCount = tp.index.Count() } - // 没有待处理的日志,自动停止 - tp.logger.Debug("auto-stopping processor before reset (no pending records)") - tp.running = false + return fmt.Errorf("cannot reset with pending records (%d/%d processed), wait for processing to complete", processingIndex, recordCount) + } + + // 记录之前是否在运行 + currentState := tp.status.State + wasRunning := currentState == StateRunning + + // 进入 Resetting 状态 + tp.setState(StateResetting, nil) + tp.logger.Debug("entering resetting state", "wasRunning", wasRunning, "currentState", currentState) + + // 如果之前在运行,停止 goroutines + if wasRunning && tp.cancel != nil { tp.cancel() - // 释放锁以等待 tailer 停止 + // 释放锁以等待 goroutines 停止 tp.mu.Unlock() tp.wg.Wait() tp.mu.Lock() - - // 发布停止事件 - tp.eventBus.Publish(&Event{ - Type: EventProcessorStop, - Topic: tp.topic, - Timestamp: time.Now(), - }) } - tp.logger.Debug("resetting processor") - var errs []error - // 关闭 writer(如果还未关闭) + // 先关闭所有组件(避免在删除文件时重新创建) + if tp.cursor != nil { + if err := tp.cursor.Close(); err != nil { + tp.logger.Error("failed to close cursor during reset", "error", err) + errs = append(errs, fmt.Errorf("close cursor: %w", err)) + } + tp.cursor = nil + } + if tp.query != nil { + tp.query.Close() + tp.query = nil + } if tp.writer != nil { if err := tp.writer.Close(); err != nil { tp.logger.Error("failed to close writer during reset", "error", err) @@ -660,7 +828,12 @@ func (tp *TopicProcessor) Reset() error { } tp.writer = nil } + if tp.index != nil { + tp.index.Close() + tp.index = nil + } + // 删除所有文件 // 删除日志文件 if err := os.Remove(tp.logPath); err != nil && !os.IsNotExist(err) { tp.logger.Error("failed to remove log file", "error", err) @@ -681,30 +854,25 @@ func (tp *TopicProcessor) Reset() error { errs = append(errs, fmt.Errorf("remove index file: %w", err)) } - // 关闭所有组件 - if tp.query != nil { - tp.query.Close() - tp.query = nil - } - if tp.index != nil { - tp.index.Close() - tp.index = nil - } - - // 重新初始化所有组件(已持有锁) - // 这会重新创建 index, writer, query,如果有 handler 也会创建 tailer + // 重新初始化所有组件 if err := tp.initializeComponents(); err != nil { tp.logger.Error("failed to reinitialize components", "error", err) errs = append(errs, fmt.Errorf("reinitialize components: %w", err)) + tp.setState(StateError, err) + if len(errs) > 0 { + return errs[0] + } + return err } - // 重置统计信息 + // 重置统计信息(会删除统计文件并重置所有计数器) if tp.stats != nil { - tp.stats.Reset() + if err := tp.stats.Reset(); err != nil { + tp.logger.Error("failed to reset stats", "error", err) + errs = append(errs, fmt.Errorf("reset stats: %w", err)) + } } - tp.logger.Debug("processor reset completed") - // 发布重置事件 tp.eventBus.Publish(&Event{ Type: EventProcessorReset, @@ -712,7 +880,56 @@ func (tp *TopicProcessor) Reset() error { Timestamp: time.Now(), }) - // 如果有多个错误,返回第一个 + // 如果之前在运行,恢复到 Running 状态 + if wasRunning { + // 创建新的 context + tp.ctx, tp.cancel = context.WithCancel(context.Background()) + + // 启动 tailer goroutine(如果有 handler) + if tp.handler != nil && tp.tailer != nil { + tp.wg.Add(1) + go func() { + defer tp.wg.Done() + if err := tp.tailer.Start(tp.ctx); err != nil && err != context.Canceled { + tp.logger.Error("tailer error after reset", "error", err) + } + }() + } + + // 启动统计保存 goroutine + tp.wg.Add(1) + go func() { + defer tp.wg.Done() + ticker := time.NewTicker(tp.tailConfig.SaveInterval) + defer ticker.Stop() + + for { + select { + case <-tp.ctx.Done(): + return + case <-ticker.C: + if tp.stats != nil { + if err := tp.stats.Save(); err != nil { + tp.logger.Error("failed to save stats", "error", err) + } + } + } + } + }() + + tp.setState(StateRunning, nil) + tp.logger.Debug("processor reset completed, resumed to running state") + } else { + // 恢复到之前的状态(通常是 Idle 或 Stopped) + if currentState == StateStopped { + tp.setState(StateStopped, nil) + } else { + tp.setState(StateIdle, nil) + } + tp.logger.Debug("processor reset completed", "newState", tp.status.State) + } + + // 如果有错误,返回第一个 if len(errs) > 0 { return errs[0] }