package seqlog import ( "sync" "time" ) // EventType 事件类型 type EventType int const ( EventWriteSuccess EventType = iota // 写入成功 EventWriteError // 写入错误 EventProcessSuccess // 处理成功 EventProcessError // 处理错误 EventProcessorStart // Processor 启动 EventProcessorStop // Processor 停止 EventProcessorReset // Processor 重置 EventPositionSaved // 位置保存 EventStateChanged // 状态变更 ) // String 返回事件类型的字符串表示 func (e EventType) String() string { switch e { case EventWriteSuccess: return "写入成功" case EventWriteError: return "写入错误" case EventProcessSuccess: return "处理成功" case EventProcessError: return "处理错误" case EventProcessorStart: return "Processor 启动" case EventProcessorStop: return "Processor 停止" case EventProcessorReset: return "Processor 重置" case EventPositionSaved: return "位置保存" case EventStateChanged: return "状态变更" default: return "未知事件" } } // Event 事件数据 type Event struct { Type EventType // 事件类型 Topic string // topic 名称 Timestamp time.Time // 事件时间 Record *Record // 相关记录(可选) Error error // 错误信息(可选) Position int64 // 位置信息(可选) } // EventListener 事件监听器 type EventListener func(*Event) // EventBus 事件总线 type EventBus struct { listeners map[EventType][]EventListener mu sync.RWMutex } // NewEventBus 创建事件总线 func NewEventBus() *EventBus { return &EventBus{ listeners: make(map[EventType][]EventListener), } } // Subscribe 订阅事件 func (eb *EventBus) Subscribe(eventType EventType, listener EventListener) { eb.mu.Lock() defer eb.mu.Unlock() eb.listeners[eventType] = append(eb.listeners[eventType], listener) } // SubscribeAll 订阅所有事件 func (eb *EventBus) SubscribeAll(listener EventListener) { eb.mu.Lock() defer eb.mu.Unlock() allTypes := []EventType{ EventWriteSuccess, EventWriteError, EventProcessSuccess, EventProcessError, EventProcessorStart, EventProcessorStop, EventProcessorReset, EventPositionSaved, } for _, eventType := range allTypes { eb.listeners[eventType] = append(eb.listeners[eventType], listener) } } // Unsubscribe 取消订阅(移除所有该类型的监听器) func (eb *EventBus) Unsubscribe(eventType EventType) { eb.mu.Lock() defer eb.mu.Unlock() delete(eb.listeners, eventType) } // Publish 发布事件 func (eb *EventBus) Publish(event *Event) { eb.mu.RLock() listeners := eb.listeners[event.Type] eb.mu.RUnlock() // 异步通知所有监听器 for _, listener := range listeners { // 每个监听器在单独的 goroutine 中执行,避免阻塞 go func(l EventListener) { defer func() { // 防止 listener panic 影响其他监听器 if r := recover(); r != nil { // 可以在这里记录 panic 信息 } }() l(event) }(listener) } } // PublishSync 同步发布事件(按顺序执行监听器) func (eb *EventBus) PublishSync(event *Event) { eb.mu.RLock() listeners := eb.listeners[event.Type] eb.mu.RUnlock() for _, listener := range listeners { func(l EventListener) { defer func() { if r := recover(); r != nil { // 防止 panic } }() l(event) }(listener) } } // Clear 清空所有监听器 func (eb *EventBus) Clear() { eb.mu.Lock() defer eb.mu.Unlock() eb.listeners = make(map[EventType][]EventListener) } // HasListeners 检查是否有监听器 func (eb *EventBus) HasListeners(eventType EventType) bool { eb.mu.RLock() defer eb.mu.RUnlock() listeners, exists := eb.listeners[eventType] return exists && len(listeners) > 0 } // ListenerCount 获取监听器数量 func (eb *EventBus) ListenerCount(eventType EventType) int { eb.mu.RLock() defer eb.mu.RUnlock() return len(eb.listeners[eventType]) }