重构:TopicProcessor 状态管理系统与 Reset 方法优化

新增功能:
- 添加 ProcessorState 状态类型(Idle/Starting/Running/Stopping/Stopped/Resetting/Error)
- 添加 ProcessorStatus 结构体和状态管理方法(GetState/GetStatus/setState)
- 实现状态转换逻辑和访问控制(CanWrite/CanQuery)
- 新增 CanReset() 方法检查是否可执行重置操作

Reset 方法优化:
- 重写 Reset() 方法,不再停止 processor
- 只有在无待处理记录时才能执行重置
- 进入 Resetting 状态期间阻止所有读写操作
- 重置后自动恢复到之前的运行状态
- 正确关闭并重置 cursor 和 stats 组件
- 调整执行顺序:先关闭组件,再删除文件,后重新初始化

错误处理增强:
- 添加 ErrProcessorResetting 和 ErrInvalidState 错误类型
- 添加 EventStateChanged 事件类型
- 修复 writer/index 为 nil 时的空指针问题

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-10-04 18:56:52 +08:00
parent 810664eb12
commit bcc328b129
6 changed files with 303 additions and 71 deletions

View File

@@ -68,10 +68,12 @@ func (c *ProcessCursor) Next() (*Record, error) {
}
// 写入保护:检查读取位置是否超过当前写入位置
dirtyOffset := c.writer.GetDirtyOffset()
// 如果正在写入dirtyOffset >= 0且记录起始位置 >= 写入位置,说明数据还未完全写入,返回 EOF 等待
if dirtyOffset >= 0 && offset >= dirtyOffset {
return nil, io.EOF
if c.writer != nil {
dirtyOffset := c.writer.GetDirtyOffset()
// 如果正在写入dirtyOffset >= 0且记录起始位置 >= 写入位置,说明数据还未完全写入,返回 EOF 等待
if dirtyOffset >= 0 && offset >= dirtyOffset {
return nil, io.EOF
}
}
// Seek 到记录位置

View File

@@ -36,6 +36,12 @@ var (
// ErrInvalidConfig 表示配置无效
ErrInvalidConfig = errors.New("invalid config")
// ErrProcessorResetting 表示处理器正在重置中,操作被阻止
ErrProcessorResetting = errors.New("processor is resetting, operations blocked")
// ErrInvalidState 表示处理器状态无效,不允许执行该操作
ErrInvalidState = errors.New("invalid processor state for this operation")
)
// TopicError 表示与 topic 相关的错误

View File

@@ -17,6 +17,7 @@ const (
EventProcessorStop // Processor 停止
EventProcessorReset // Processor 重置
EventPositionSaved // 位置保存
EventStateChanged // 状态变更
)
// String 返回事件类型的字符串表示
@@ -38,6 +39,8 @@ func (e EventType) String() string {
return "Processor 重置"
case EventPositionSaved:
return "位置保存"
case EventStateChanged:
return "状态变更"
default:
return "未知事件"
}

View File

@@ -409,7 +409,7 @@ func (s *LogHub) NewTopicQuery(topic string) (*RecordQuery, error) {
return nil, NewTopicError(topic, "get", ErrNotFound)
}
return processor.Query(), nil
return processor.Query()
}
// GetProcessingIndex 获取指定 topic 的当前处理索引
@@ -577,12 +577,13 @@ func (s *LogHub) ResetTopic(topic string) error {
return fmt.Errorf("failed to reset processor: %w", err)
}
// 如果 LogHub 正在运行,重新启动 processor
// 如果 LogHub 正在运行且 processor 未运行,启动 processor
// 注意:如果 Reset() 已经自动恢复到 Running 状态,就不需要再启动
s.mu.RLock()
running := s.running
s.mu.RUnlock()
if running {
if running && processor.GetState() != StateRunning {
if err := processor.Start(); err != nil {
return fmt.Errorf("failed to restart processor: %w", err)
}

View File

@@ -127,10 +127,13 @@ func (rq *RecordQuery) readRecordsMetadataForward(startIndex, count int) ([]*Rec
}
dataOffset := binary.LittleEndian.Uint64(hdr[4:12])
dirtyOffset := rq.writer.GetDirtyOffset()
// 如果正在写入dirtyOffset >= 0且记录位置 >= 写入位置,等待写入完成
if dirtyOffset >= 0 && dataOffset >= uint64(dirtyOffset) {
break
// 写入保护:如果 writer 存在,检查是否正在写入该记录
if rq.writer != nil {
dirtyOffset := rq.writer.GetDirtyOffset()
// 如果正在写入dirtyOffset >= 0且记录位置 >= 写入位置,等待写入完成
if dirtyOffset >= 0 && dataOffset >= uint64(dirtyOffset) {
break
}
}
dataLen := binary.LittleEndian.Uint32(hdr[0:4])

View File

@@ -10,6 +10,65 @@ import (
"time"
)
// ProcessorState 处理器状态
type ProcessorState int
const (
StateIdle ProcessorState = iota // 空闲(未启动)
StateStarting // 启动中
StateRunning // 运行中
StateStopping // 停止中
StateStopped // 已停止
StateResetting // 重置中(阻止所有操作)
StateError // 错误状态
)
// String 返回状态的字符串表示
func (s ProcessorState) String() string {
switch s {
case StateIdle:
return "idle"
case StateStarting:
return "starting"
case StateRunning:
return "running"
case StateStopping:
return "stopping"
case StateStopped:
return "stopped"
case StateResetting:
return "resetting"
case StateError:
return "error"
default:
return "unknown"
}
}
// CanWrite 判断当前状态是否允许写入
func (s ProcessorState) CanWrite() bool {
// 允许在 Idle、Starting、Running 状态下写入
// 不允许在 Stopping、Stopped、Resetting、Error 状态下写入
return s == StateIdle || s == StateStarting || s == StateRunning
}
// CanQuery 判断当前状态是否允许查询
func (s ProcessorState) CanQuery() bool {
return s != StateResetting
}
// CanProcess 判断当前状态是否允许处理
func (s ProcessorState) CanProcess() bool {
return s == StateRunning
}
// ProcessorStatus 处理器状态信息
type ProcessorStatus struct {
State ProcessorState // 当前状态
LastUpdated time.Time // 最后更新时间
Error error // 错误信息(仅在 StateError 时有效)
}
// TopicProcessor 作为聚合器,持有所有核心组件并提供统一的访问接口
type TopicProcessor struct {
topic string
@@ -27,15 +86,15 @@ type TopicProcessor struct {
// 配置和状态
handler RecordHandler
tailConfig *TailConfig
stats *TopicStats // 统计信息
eventBus *EventBus // 事件总线
stats *TopicStats // 统计信息
eventBus *EventBus // 事件总线
status ProcessorStatus // 处理器状态
// 并发控制
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
running bool
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// TopicConfig topic 配置
@@ -88,8 +147,12 @@ func NewTopicProcessor(baseDir, topic string, logger *slog.Logger, config *Topic
tailConfig: tailConfig,
stats: NewTopicStats(statsPath),
eventBus: NewEventBus(),
ctx: ctx,
cancel: cancel,
status: ProcessorStatus{
State: StateIdle,
LastUpdated: time.Now(),
},
ctx: ctx,
cancel: cancel,
}
// 初始化所有组件
@@ -200,6 +263,18 @@ func (tp *TopicProcessor) createTailer() error {
// Write 写入日志(统一接口)
func (tp *TopicProcessor) Write(data []byte) (int64, error) {
// 检查状态
tp.mu.RLock()
state := tp.status.State
tp.mu.RUnlock()
if !state.CanWrite() {
if state == StateResetting {
return 0, ErrProcessorResetting
}
return 0, ErrNotRunning
}
offset, err := tp.writer.Append(data)
if err != nil {
tp.logger.Error("failed to append", "error", err)
@@ -237,19 +312,21 @@ func (tp *TopicProcessor) Start() error {
tp.mu.Lock()
defer tp.mu.Unlock()
if tp.running {
return NewTopicError(tp.topic, "start", ErrAlreadyRunning)
// 检查状态
if tp.status.State != StateIdle && tp.status.State != StateStopped {
return NewTopicError(tp.topic, "start", ErrInvalidState)
}
tp.logger.Debug("starting processor")
// 设置为启动中状态
tp.setState(StateStarting, nil)
// 重新创建 context如果之前被 cancel 了)
if tp.ctx.Err() != nil {
tp.ctx, tp.cancel = context.WithCancel(context.Background())
}
tp.running = true
// 启动定期保存统计信息的 goroutine
tp.wg.Add(1)
go func() {
@@ -281,6 +358,9 @@ func (tp *TopicProcessor) Start() error {
tp.logger.Debug("tailer goroutine finished")
}()
// 设置为运行中状态
tp.setState(StateRunning, nil)
// 发布启动事件
tp.eventBus.Publish(&Event{
Type: EventProcessorStart,
@@ -294,12 +374,14 @@ func (tp *TopicProcessor) Start() error {
// Stop 停止 tailer
func (tp *TopicProcessor) Stop() error {
tp.mu.Lock()
if !tp.running {
if tp.status.State != StateRunning {
tp.mu.Unlock()
return nil
}
tp.logger.Debug("stopping processor")
tp.running = false
// 设置为停止中状态
tp.setState(StateStopping, nil)
tp.cancel()
tp.mu.Unlock()
@@ -308,6 +390,11 @@ func (tp *TopicProcessor) Stop() error {
tp.logger.Debug("processor stopped")
// 设置为已停止状态
tp.mu.Lock()
tp.setState(StateStopped, nil)
tp.mu.Unlock()
// 发布停止事件
tp.eventBus.Publish(&Event{
Type: EventProcessorStop,
@@ -328,11 +415,38 @@ func (tp *TopicProcessor) Title() string {
return tp.title
}
// IsRunning 检查是否正在运行
func (tp *TopicProcessor) IsRunning() bool {
// GetState 获取当前状态
func (tp *TopicProcessor) GetState() ProcessorState {
tp.mu.RLock()
defer tp.mu.RUnlock()
return tp.running
return tp.status.State
}
// GetStatus 获取完整状态信息
func (tp *TopicProcessor) GetStatus() ProcessorStatus {
tp.mu.RLock()
defer tp.mu.RUnlock()
return tp.status
}
// setState 设置状态(需持有锁)
func (tp *TopicProcessor) setState(state ProcessorState, err error) {
tp.status.State = state
tp.status.LastUpdated = time.Now()
tp.status.Error = err
// 发布状态变更事件
tp.eventBus.Publish(&Event{
Type: EventStateChanged,
Topic: tp.topic,
Timestamp: time.Now(),
Error: err,
})
}
// IsRunning 检查是否正在运行
func (tp *TopicProcessor) IsRunning() bool {
return tp.GetState() == StateRunning
}
// UpdateTailConfig 动态更新 tail 配置
@@ -368,8 +482,15 @@ func (tp *TopicProcessor) GetStats() Stats {
}
// Query 获取共享的查询器
func (tp *TopicProcessor) Query() *RecordQuery {
return tp.query
func (tp *TopicProcessor) Query() (*RecordQuery, error) {
tp.mu.RLock()
defer tp.mu.RUnlock()
if !tp.status.State.CanQuery() {
return nil, ErrProcessorResetting
}
return tp.query, nil
}
// addStatusToRecords 为记录添加状态信息(辅助方法)
@@ -609,50 +730,97 @@ func (tp *TopicProcessor) Unsubscribe(eventType EventType) {
tp.eventBus.Unsubscribe(eventType)
}
// CanReset 检查是否可以执行重置操作
// 返回 true 表示可以重置false 表示不能重置(正在重置中或有待处理的记录)
func (tp *TopicProcessor) CanReset() bool {
tp.mu.RLock()
defer tp.mu.RUnlock()
return tp.canResetLocked()
}
// canResetLocked 内部方法,检查是否可以重置(调用前必须已持有锁)
func (tp *TopicProcessor) canResetLocked() bool {
// 检查当前状态
if tp.status.State == StateResetting {
return false
}
// 检查是否有待处理的日志
processingIndex := 0
if tp.tailer != nil {
processingIndex = tp.tailer.GetStartIndex()
}
recordCount := 0
if tp.index != nil {
recordCount = tp.index.Count()
}
// 只有当所有记录都已处理完成时才能重置
return processingIndex >= recordCount
}
// Reset 清空 topic 的所有数据,包括日志文件、位置文件和统计文件
// 如果 processor 正在运行且没有待处理的日志,会自动停止
// 如果有待处理的日志,则返回错误,需要等待处理完成或手动停止
// 注意Reset 不会停止 processor,只有在没有待处理的日志时才能执行
// 重置完成后,如果之前是 Running 状态,会自动恢复到 Running 状态
func (tp *TopicProcessor) Reset() error {
tp.mu.Lock()
defer tp.mu.Unlock()
if tp.running {
// 检查是否有待处理的日志
// 检查是否可以执行重置操作
if !tp.canResetLocked() {
// 提供详细的错误信息
currentState := tp.status.State
if currentState == StateResetting {
return nil
}
// 获取待处理记录信息
processingIndex := 0
if tp.tailer != nil {
processingIndex = tp.tailer.GetStartIndex()
}
recordCount := tp.index.Count()
hasPendingRecords := processingIndex < recordCount
if hasPendingRecords {
return fmt.Errorf("cannot reset while processor is running with pending records (%d/%d processed), please stop first or wait for processing to complete", processingIndex, recordCount)
recordCount := 0
if tp.index != nil {
recordCount = tp.index.Count()
}
// 没有待处理的日志,自动停止
tp.logger.Debug("auto-stopping processor before reset (no pending records)")
tp.running = false
return fmt.Errorf("cannot reset with pending records (%d/%d processed), wait for processing to complete", processingIndex, recordCount)
}
// 记录之前是否在运行
currentState := tp.status.State
wasRunning := currentState == StateRunning
// 进入 Resetting 状态
tp.setState(StateResetting, nil)
tp.logger.Debug("entering resetting state", "wasRunning", wasRunning, "currentState", currentState)
// 如果之前在运行,停止 goroutines
if wasRunning && tp.cancel != nil {
tp.cancel()
// 释放锁以等待 tailer 停止
// 释放锁以等待 goroutines 停止
tp.mu.Unlock()
tp.wg.Wait()
tp.mu.Lock()
// 发布停止事件
tp.eventBus.Publish(&Event{
Type: EventProcessorStop,
Topic: tp.topic,
Timestamp: time.Now(),
})
}
tp.logger.Debug("resetting processor")
var errs []error
// 关闭 writer如果还未关闭
// 关闭所有组件(避免在删除文件时重新创建
if tp.cursor != nil {
if err := tp.cursor.Close(); err != nil {
tp.logger.Error("failed to close cursor during reset", "error", err)
errs = append(errs, fmt.Errorf("close cursor: %w", err))
}
tp.cursor = nil
}
if tp.query != nil {
tp.query.Close()
tp.query = nil
}
if tp.writer != nil {
if err := tp.writer.Close(); err != nil {
tp.logger.Error("failed to close writer during reset", "error", err)
@@ -660,7 +828,12 @@ func (tp *TopicProcessor) Reset() error {
}
tp.writer = nil
}
if tp.index != nil {
tp.index.Close()
tp.index = nil
}
// 删除所有文件
// 删除日志文件
if err := os.Remove(tp.logPath); err != nil && !os.IsNotExist(err) {
tp.logger.Error("failed to remove log file", "error", err)
@@ -681,30 +854,25 @@ func (tp *TopicProcessor) Reset() error {
errs = append(errs, fmt.Errorf("remove index file: %w", err))
}
// 关闭所有组件
if tp.query != nil {
tp.query.Close()
tp.query = nil
}
if tp.index != nil {
tp.index.Close()
tp.index = nil
}
// 重新初始化所有组件(已持有锁)
// 这会重新创建 index, writer, query如果有 handler 也会创建 tailer
// 重新初始化所有组件
if err := tp.initializeComponents(); err != nil {
tp.logger.Error("failed to reinitialize components", "error", err)
errs = append(errs, fmt.Errorf("reinitialize components: %w", err))
tp.setState(StateError, err)
if len(errs) > 0 {
return errs[0]
}
return err
}
// 重置统计信息
// 重置统计信息(会删除统计文件并重置所有计数器)
if tp.stats != nil {
tp.stats.Reset()
if err := tp.stats.Reset(); err != nil {
tp.logger.Error("failed to reset stats", "error", err)
errs = append(errs, fmt.Errorf("reset stats: %w", err))
}
}
tp.logger.Debug("processor reset completed")
// 发布重置事件
tp.eventBus.Publish(&Event{
Type: EventProcessorReset,
@@ -712,7 +880,56 @@ func (tp *TopicProcessor) Reset() error {
Timestamp: time.Now(),
})
// 如果有多个错误,返回第一个
// 如果之前在运行,恢复到 Running 状态
if wasRunning {
// 创建新的 context
tp.ctx, tp.cancel = context.WithCancel(context.Background())
// 启动 tailer goroutine如果有 handler
if tp.handler != nil && tp.tailer != nil {
tp.wg.Add(1)
go func() {
defer tp.wg.Done()
if err := tp.tailer.Start(tp.ctx); err != nil && err != context.Canceled {
tp.logger.Error("tailer error after reset", "error", err)
}
}()
}
// 启动统计保存 goroutine
tp.wg.Add(1)
go func() {
defer tp.wg.Done()
ticker := time.NewTicker(tp.tailConfig.SaveInterval)
defer ticker.Stop()
for {
select {
case <-tp.ctx.Done():
return
case <-ticker.C:
if tp.stats != nil {
if err := tp.stats.Save(); err != nil {
tp.logger.Error("failed to save stats", "error", err)
}
}
}
}
}()
tp.setState(StateRunning, nil)
tp.logger.Debug("processor reset completed, resumed to running state")
} else {
// 恢复到之前的状态(通常是 Idle 或 Stopped
if currentState == StateStopped {
tp.setState(StateStopped, nil)
} else {
tp.setState(StateIdle, nil)
}
tp.logger.Debug("processor reset completed", "newState", tp.status.State)
}
// 如果有错误,返回第一个
if len(errs) > 0 {
return errs[0]
}