新增功能: - 添加 ProcessorState 状态类型(Idle/Starting/Running/Stopping/Stopped/Resetting/Error) - 添加 ProcessorStatus 结构体和状态管理方法(GetState/GetStatus/setState) - 实现状态转换逻辑和访问控制(CanWrite/CanQuery) - 新增 CanReset() 方法检查是否可执行重置操作 Reset 方法优化: - 重写 Reset() 方法,不再停止 processor - 只有在无待处理记录时才能执行重置 - 进入 Resetting 状态期间阻止所有读写操作 - 重置后自动恢复到之前的运行状态 - 正确关闭并重置 cursor 和 stats 组件 - 调整执行顺序:先关闭组件,再删除文件,后重新初始化 错误处理增强: - 添加 ErrProcessorResetting 和 ErrInvalidState 错误类型 - 添加 EventStateChanged 事件类型 - 修复 writer/index 为 nil 时的空指针问题 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
175 lines
4.0 KiB
Go
175 lines
4.0 KiB
Go
package seqlog
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// EventType 事件类型
|
|
type EventType int
|
|
|
|
const (
|
|
EventWriteSuccess EventType = iota // 写入成功
|
|
EventWriteError // 写入错误
|
|
EventProcessSuccess // 处理成功
|
|
EventProcessError // 处理错误
|
|
EventProcessorStart // Processor 启动
|
|
EventProcessorStop // Processor 停止
|
|
EventProcessorReset // Processor 重置
|
|
EventPositionSaved // 位置保存
|
|
EventStateChanged // 状态变更
|
|
)
|
|
|
|
// String 返回事件类型的字符串表示
|
|
func (e EventType) String() string {
|
|
switch e {
|
|
case EventWriteSuccess:
|
|
return "写入成功"
|
|
case EventWriteError:
|
|
return "写入错误"
|
|
case EventProcessSuccess:
|
|
return "处理成功"
|
|
case EventProcessError:
|
|
return "处理错误"
|
|
case EventProcessorStart:
|
|
return "Processor 启动"
|
|
case EventProcessorStop:
|
|
return "Processor 停止"
|
|
case EventProcessorReset:
|
|
return "Processor 重置"
|
|
case EventPositionSaved:
|
|
return "位置保存"
|
|
case EventStateChanged:
|
|
return "状态变更"
|
|
default:
|
|
return "未知事件"
|
|
}
|
|
}
|
|
|
|
// Event 事件数据
|
|
type Event struct {
|
|
Type EventType // 事件类型
|
|
Topic string // topic 名称
|
|
Timestamp time.Time // 事件时间
|
|
Record *Record // 相关记录(可选)
|
|
Error error // 错误信息(可选)
|
|
Position int64 // 位置信息(可选)
|
|
}
|
|
|
|
// EventListener 事件监听器
|
|
type EventListener func(*Event)
|
|
|
|
// EventBus 事件总线
|
|
type EventBus struct {
|
|
listeners map[EventType][]EventListener
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// NewEventBus 创建事件总线
|
|
func NewEventBus() *EventBus {
|
|
return &EventBus{
|
|
listeners: make(map[EventType][]EventListener),
|
|
}
|
|
}
|
|
|
|
// Subscribe 订阅事件
|
|
func (eb *EventBus) Subscribe(eventType EventType, listener EventListener) {
|
|
eb.mu.Lock()
|
|
defer eb.mu.Unlock()
|
|
|
|
eb.listeners[eventType] = append(eb.listeners[eventType], listener)
|
|
}
|
|
|
|
// SubscribeAll 订阅所有事件
|
|
func (eb *EventBus) SubscribeAll(listener EventListener) {
|
|
eb.mu.Lock()
|
|
defer eb.mu.Unlock()
|
|
|
|
allTypes := []EventType{
|
|
EventWriteSuccess,
|
|
EventWriteError,
|
|
EventProcessSuccess,
|
|
EventProcessError,
|
|
EventProcessorStart,
|
|
EventProcessorStop,
|
|
EventProcessorReset,
|
|
EventPositionSaved,
|
|
}
|
|
|
|
for _, eventType := range allTypes {
|
|
eb.listeners[eventType] = append(eb.listeners[eventType], listener)
|
|
}
|
|
}
|
|
|
|
// Unsubscribe 取消订阅(移除所有该类型的监听器)
|
|
func (eb *EventBus) Unsubscribe(eventType EventType) {
|
|
eb.mu.Lock()
|
|
defer eb.mu.Unlock()
|
|
|
|
delete(eb.listeners, eventType)
|
|
}
|
|
|
|
// Publish 发布事件
|
|
func (eb *EventBus) Publish(event *Event) {
|
|
eb.mu.RLock()
|
|
listeners := eb.listeners[event.Type]
|
|
eb.mu.RUnlock()
|
|
|
|
// 异步通知所有监听器
|
|
for _, listener := range listeners {
|
|
// 每个监听器在单独的 goroutine 中执行,避免阻塞
|
|
go func(l EventListener) {
|
|
defer func() {
|
|
// 防止 listener panic 影响其他监听器
|
|
if r := recover(); r != nil {
|
|
// 可以在这里记录 panic 信息
|
|
}
|
|
}()
|
|
l(event)
|
|
}(listener)
|
|
}
|
|
}
|
|
|
|
// PublishSync 同步发布事件(按顺序执行监听器)
|
|
func (eb *EventBus) PublishSync(event *Event) {
|
|
eb.mu.RLock()
|
|
listeners := eb.listeners[event.Type]
|
|
eb.mu.RUnlock()
|
|
|
|
for _, listener := range listeners {
|
|
func(l EventListener) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
// 防止 panic
|
|
}
|
|
}()
|
|
l(event)
|
|
}(listener)
|
|
}
|
|
}
|
|
|
|
// Clear 清空所有监听器
|
|
func (eb *EventBus) Clear() {
|
|
eb.mu.Lock()
|
|
defer eb.mu.Unlock()
|
|
|
|
eb.listeners = make(map[EventType][]EventListener)
|
|
}
|
|
|
|
// HasListeners 检查是否有监听器
|
|
func (eb *EventBus) HasListeners(eventType EventType) bool {
|
|
eb.mu.RLock()
|
|
defer eb.mu.RUnlock()
|
|
|
|
listeners, exists := eb.listeners[eventType]
|
|
return exists && len(listeners) > 0
|
|
}
|
|
|
|
// ListenerCount 获取监听器数量
|
|
func (eb *EventBus) ListenerCount(eventType EventType) int {
|
|
eb.mu.RLock()
|
|
defer eb.mu.RUnlock()
|
|
|
|
return len(eb.listeners[eventType])
|
|
}
|