561 lines
15 KiB
Go
561 lines
15 KiB
Go
|
|
package seqlog
|
|||
|
|
|
|||
|
|
import (
|
|||
|
|
"fmt"
|
|||
|
|
"log/slog"
|
|||
|
|
"os"
|
|||
|
|
"strings"
|
|||
|
|
"sync"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
// Seqlog 日志管理器,统一管理多个 topic 的日志分发
|
|||
|
|
//
|
|||
|
|
// 自动恢复机制:
|
|||
|
|
// - Start() 时自动扫描 baseDir 中所有 .log 文件
|
|||
|
|
// - 为每个发现的日志文件创建 processor
|
|||
|
|
// - 使用 .pos 文件保存的游标位置恢复处理进度
|
|||
|
|
// - 只处理上次中断后新增的日志,避免重复处理
|
|||
|
|
type Seqlog struct {
|
|||
|
|
baseDir string
|
|||
|
|
processors map[string]*TopicProcessor
|
|||
|
|
defaultHandler TopicRecordHandler
|
|||
|
|
defaultConfig *TailConfig
|
|||
|
|
logger *slog.Logger // 用于内部日志记录
|
|||
|
|
globalEventBus *EventBus // 全局事件总线
|
|||
|
|
pendingSubscribers map[EventType][]EventListener
|
|||
|
|
mu sync.RWMutex
|
|||
|
|
running bool
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// NewSeqlog 创建一个新的日志管理器
|
|||
|
|
// logger: 内部日志记录器,如果不需要可以传 slog.Default()
|
|||
|
|
func NewSeqlog(baseDir string, logger *slog.Logger, defaultHandler TopicRecordHandler) *Seqlog {
|
|||
|
|
if logger == nil {
|
|||
|
|
logger = slog.Default()
|
|||
|
|
}
|
|||
|
|
return &Seqlog{
|
|||
|
|
baseDir: baseDir,
|
|||
|
|
processors: make(map[string]*TopicProcessor),
|
|||
|
|
defaultHandler: defaultHandler,
|
|||
|
|
globalEventBus: NewEventBus(),
|
|||
|
|
pendingSubscribers: make(map[EventType][]EventListener),
|
|||
|
|
defaultConfig: &TailConfig{
|
|||
|
|
PollInterval: 100 * 1000000, // 100ms
|
|||
|
|
SaveInterval: 1000 * 1000000, // 1s
|
|||
|
|
},
|
|||
|
|
logger: logger,
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// SetDefaultTailConfig 设置默认的 tail 配置
|
|||
|
|
func (s *Seqlog) SetDefaultTailConfig(config *TailConfig) {
|
|||
|
|
s.mu.Lock()
|
|||
|
|
defer s.mu.Unlock()
|
|||
|
|
if config != nil {
|
|||
|
|
s.defaultConfig = config
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// RegisterHandler 为指定 topic 注册 handler
|
|||
|
|
func (s *Seqlog) RegisterHandler(topic string, handler RecordHandler) error {
|
|||
|
|
return s.RegisterHandlerWithConfig(topic, &TopicConfig{Handler: handler})
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// RegisterHandlerWithConfig 为指定 topic 注册 handler 和配置
|
|||
|
|
// 注意:handler 为必填参数,如果 topic 已存在则返回错误
|
|||
|
|
func (s *Seqlog) RegisterHandlerWithConfig(topic string, config *TopicConfig) error {
|
|||
|
|
s.mu.Lock()
|
|||
|
|
defer s.mu.Unlock()
|
|||
|
|
|
|||
|
|
processor := s.processors[topic]
|
|||
|
|
if processor == nil {
|
|||
|
|
// 创建新的 processor,使用带 topic 属性的 logger
|
|||
|
|
topicLogger := s.logger.With("topic", topic)
|
|||
|
|
topicLogger.Debug("creating new processor")
|
|||
|
|
var err error
|
|||
|
|
processor, err = NewTopicProcessor(s.baseDir, topic, topicLogger, config)
|
|||
|
|
if err != nil {
|
|||
|
|
s.logger.Error("failed to create processor", "topic", topic, "error", err)
|
|||
|
|
return fmt.Errorf("failed to create processor for topic %s: %w", topic, err)
|
|||
|
|
}
|
|||
|
|
s.processors[topic] = processor
|
|||
|
|
|
|||
|
|
// 订阅 processor 的所有事件,转发到全局事件总线
|
|||
|
|
processor.SubscribeAll(func(event *Event) {
|
|||
|
|
s.globalEventBus.Publish(event)
|
|||
|
|
})
|
|||
|
|
} else {
|
|||
|
|
// Processor 已存在,handler 不可更新
|
|||
|
|
return fmt.Errorf("handler already registered for topic %s", topic)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
s.logger.Info("handler registered", "topic", topic)
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Write 写入日志到指定 topic
|
|||
|
|
func (s *Seqlog) Write(topic string, data []byte) (int64, error) {
|
|||
|
|
processor, err := s.getOrCreateProcessor(topic)
|
|||
|
|
if err != nil {
|
|||
|
|
s.logger.Error("failed to get processor", "topic", topic, "error", err)
|
|||
|
|
return 0, fmt.Errorf("failed to get processor for topic %s: %w", topic, err)
|
|||
|
|
}
|
|||
|
|
offset, err := processor.Write(data)
|
|||
|
|
if err != nil {
|
|||
|
|
s.logger.Error("failed to write", "topic", topic, "error", err)
|
|||
|
|
return 0, err
|
|||
|
|
}
|
|||
|
|
s.logger.Debug("write success", "topic", topic, "offset", offset, "size", len(data))
|
|||
|
|
return offset, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Start 启动 Seqlog 和所有已注册的 processor
|
|||
|
|
func (s *Seqlog) Start() error {
|
|||
|
|
s.mu.Lock()
|
|||
|
|
defer s.mu.Unlock()
|
|||
|
|
|
|||
|
|
if s.running {
|
|||
|
|
return fmt.Errorf("seqlog is already running")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
s.logger.Info("starting seqlog", "baseDir", s.baseDir, "processors", len(s.processors))
|
|||
|
|
|
|||
|
|
// 自动发现 baseDir 中已存在的日志文件
|
|||
|
|
if err := s.discoverExistingTopics(); err != nil {
|
|||
|
|
s.logger.Warn("failed to discover existing topics", "error", err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 启动所有已存在的 processor
|
|||
|
|
for topic, processor := range s.processors {
|
|||
|
|
if err := processor.Start(); err != nil {
|
|||
|
|
// 忽略文件不存在的错误,因为可能还没写入
|
|||
|
|
// processor 会在第一次写入时自动创建 writer 和 tailer
|
|||
|
|
s.logger.Debug("processor start skipped", "topic", topic, "error", err)
|
|||
|
|
} else {
|
|||
|
|
s.logger.Debug("processor started", "topic", topic)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
s.running = true
|
|||
|
|
s.logger.Info("seqlog started successfully", "total_processors", len(s.processors))
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// discoverExistingTopics 自动发现 baseDir 中已存在的日志文件并创建对应的 processor
|
|||
|
|
// 注意:此方法需要在持有锁的情况下调用
|
|||
|
|
func (s *Seqlog) discoverExistingTopics() error {
|
|||
|
|
// 确保目录存在
|
|||
|
|
if err := os.MkdirAll(s.baseDir, 0755); err != nil {
|
|||
|
|
return fmt.Errorf("failed to create base directory: %w", err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 读取目录中的所有 .log 文件
|
|||
|
|
entries, err := os.ReadDir(s.baseDir)
|
|||
|
|
if err != nil {
|
|||
|
|
return fmt.Errorf("failed to read base directory: %w", err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
discovered := 0
|
|||
|
|
for _, entry := range entries {
|
|||
|
|
if entry.IsDir() {
|
|||
|
|
continue
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
name := entry.Name()
|
|||
|
|
// 只处理 .log 文件,忽略 .pos 位置文件
|
|||
|
|
if !strings.HasSuffix(name, ".log") {
|
|||
|
|
continue
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 提取 topic 名称(去掉 .log 后缀)
|
|||
|
|
topic := strings.TrimSuffix(name, ".log")
|
|||
|
|
|
|||
|
|
// 如果 processor 已存在,跳过
|
|||
|
|
if _, exists := s.processors[topic]; exists {
|
|||
|
|
continue
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 创建 processor(使用默认配置和 handler)
|
|||
|
|
s.logger.Info("discovered existing topic", "topic", topic)
|
|||
|
|
var config *TopicConfig
|
|||
|
|
if s.defaultHandler != nil {
|
|||
|
|
topicName := topic
|
|||
|
|
handler := func(rec *Record) error {
|
|||
|
|
return s.defaultHandler(topicName, rec)
|
|||
|
|
}
|
|||
|
|
config = &TopicConfig{
|
|||
|
|
Handler: handler,
|
|||
|
|
TailConfig: s.defaultConfig,
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
topicLogger := s.logger.With("topic", topic)
|
|||
|
|
processor, err := NewTopicProcessor(s.baseDir, topic, topicLogger, config)
|
|||
|
|
if err != nil {
|
|||
|
|
s.logger.Error("failed to create processor for discovered topic", "topic", topic, "error", err)
|
|||
|
|
continue
|
|||
|
|
}
|
|||
|
|
s.processors[topic] = processor
|
|||
|
|
|
|||
|
|
// 订阅 processor 的所有事件,转发到全局事件总线
|
|||
|
|
processor.SubscribeAll(func(event *Event) {
|
|||
|
|
s.globalEventBus.Publish(event)
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
discovered++
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if discovered > 0 {
|
|||
|
|
s.logger.Info("auto-discovered topics", "count", discovered)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Stop 停止所有 processor
|
|||
|
|
func (s *Seqlog) Stop() error {
|
|||
|
|
s.mu.Lock()
|
|||
|
|
if !s.running {
|
|||
|
|
s.mu.Unlock()
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
s.logger.Info("stopping seqlog")
|
|||
|
|
s.running = false
|
|||
|
|
processors := make([]*TopicProcessor, 0, len(s.processors))
|
|||
|
|
for _, p := range s.processors {
|
|||
|
|
processors = append(processors, p)
|
|||
|
|
}
|
|||
|
|
s.mu.Unlock()
|
|||
|
|
|
|||
|
|
// 停止并清理所有 processor
|
|||
|
|
for _, processor := range processors {
|
|||
|
|
topic := processor.Topic()
|
|||
|
|
// 先停止
|
|||
|
|
if err := processor.Stop(); err != nil {
|
|||
|
|
s.logger.Error("failed to stop processor", "topic", topic, "error", err)
|
|||
|
|
return fmt.Errorf("failed to stop processor for topic %s: %w", topic, err)
|
|||
|
|
}
|
|||
|
|
s.logger.Debug("processor stopped", "topic", topic)
|
|||
|
|
|
|||
|
|
// 再清理资源
|
|||
|
|
if err := processor.Close(); err != nil {
|
|||
|
|
s.logger.Error("failed to close processor", "topic", topic, "error", err)
|
|||
|
|
// 继续清理其他 processor,不返回错误
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
s.logger.Info("seqlog stopped successfully")
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// getOrCreateProcessor 获取或创建指定 topic 的 processor(使用默认配置)
|
|||
|
|
// 如果没有 defaultHandler,使用空 handler(no-op)
|
|||
|
|
func (s *Seqlog) getOrCreateProcessor(topic string) (*TopicProcessor, error) {
|
|||
|
|
// 创建默认配置
|
|||
|
|
var config *TopicConfig
|
|||
|
|
if s.defaultHandler != nil {
|
|||
|
|
// 使用默认 handler,包装成 RecordHandler
|
|||
|
|
topicName := topic
|
|||
|
|
handler := func(rec *Record) error {
|
|||
|
|
return s.defaultHandler(topicName, rec)
|
|||
|
|
}
|
|||
|
|
config = &TopicConfig{
|
|||
|
|
Handler: handler,
|
|||
|
|
TailConfig: s.defaultConfig,
|
|||
|
|
}
|
|||
|
|
} else {
|
|||
|
|
// 没有 defaultHandler,检查是否已存在
|
|||
|
|
s.mu.RLock()
|
|||
|
|
processor, exists := s.processors[topic]
|
|||
|
|
s.mu.RUnlock()
|
|||
|
|
if exists {
|
|||
|
|
return processor, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 使用空 handler(no-op),允许只写入不处理
|
|||
|
|
config = &TopicConfig{
|
|||
|
|
Handler: func(rec *Record) error {
|
|||
|
|
return nil // 空处理,什么都不做
|
|||
|
|
},
|
|||
|
|
TailConfig: s.defaultConfig,
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
return s.getOrCreateProcessorWithConfig(topic, config)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// getOrCreateProcessorWithConfig 获取或创建指定 topic 的 processor(使用指定配置)
|
|||
|
|
func (s *Seqlog) getOrCreateProcessorWithConfig(topic string, config *TopicConfig) (*TopicProcessor, error) {
|
|||
|
|
s.mu.RLock()
|
|||
|
|
processor, exists := s.processors[topic]
|
|||
|
|
s.mu.RUnlock()
|
|||
|
|
|
|||
|
|
if exists {
|
|||
|
|
return processor, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
s.mu.Lock()
|
|||
|
|
defer s.mu.Unlock()
|
|||
|
|
|
|||
|
|
// 双重检查
|
|||
|
|
if processor, exists := s.processors[topic]; exists {
|
|||
|
|
return processor, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 创建新的 processor,使用带 topic 属性的 logger
|
|||
|
|
topicLogger := s.logger.With("topic", topic)
|
|||
|
|
topicLogger.Debug("auto-creating processor")
|
|||
|
|
var err error
|
|||
|
|
processor, err = NewTopicProcessor(s.baseDir, topic, topicLogger, config)
|
|||
|
|
if err != nil {
|
|||
|
|
topicLogger.Error("failed to create processor", "error", err)
|
|||
|
|
return nil, fmt.Errorf("failed to create processor: %w", err)
|
|||
|
|
}
|
|||
|
|
s.processors[topic] = processor
|
|||
|
|
|
|||
|
|
// 订阅 processor 的所有事件,转发到全局事件总线
|
|||
|
|
processor.SubscribeAll(func(event *Event) {
|
|||
|
|
s.globalEventBus.Publish(event)
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
// 如果正在运行,立即启动 processor
|
|||
|
|
if s.running {
|
|||
|
|
if err := processor.Start(); err != nil {
|
|||
|
|
topicLogger.Error("failed to start processor", "error", err)
|
|||
|
|
return nil, fmt.Errorf("failed to start processor: %w", err)
|
|||
|
|
}
|
|||
|
|
topicLogger.Debug("processor auto-started")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return processor, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// GetTopics 获取所有已知的 topic
|
|||
|
|
func (s *Seqlog) GetTopics() []string {
|
|||
|
|
s.mu.RLock()
|
|||
|
|
defer s.mu.RUnlock()
|
|||
|
|
|
|||
|
|
topics := make([]string, 0, len(s.processors))
|
|||
|
|
for topic := range s.processors {
|
|||
|
|
topics = append(topics, topic)
|
|||
|
|
}
|
|||
|
|
return topics
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// IsRunning 检查 Seqlog 是否正在运行
|
|||
|
|
func (s *Seqlog) IsRunning() bool {
|
|||
|
|
s.mu.RLock()
|
|||
|
|
defer s.mu.RUnlock()
|
|||
|
|
return s.running
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// UpdateTopicConfig 动态更新指定 topic 的 tail 配置
|
|||
|
|
func (s *Seqlog) UpdateTopicConfig(topic string, config *TailConfig) error {
|
|||
|
|
s.mu.RLock()
|
|||
|
|
processor, exists := s.processors[topic]
|
|||
|
|
s.mu.RUnlock()
|
|||
|
|
|
|||
|
|
if !exists {
|
|||
|
|
return fmt.Errorf("topic %s not found", topic)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return processor.UpdateTailConfig(config)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// GetTopicConfig 获取指定 topic 的 tail 配置
|
|||
|
|
func (s *Seqlog) GetTopicConfig(topic string) (*TailConfig, error) {
|
|||
|
|
s.mu.RLock()
|
|||
|
|
processor, exists := s.processors[topic]
|
|||
|
|
s.mu.RUnlock()
|
|||
|
|
|
|||
|
|
if !exists {
|
|||
|
|
return nil, fmt.Errorf("topic %s not found", topic)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return processor.GetTailConfig(), nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// GetTopicStats 获取指定 topic 的统计信息
|
|||
|
|
func (s *Seqlog) GetTopicStats(topic string) (Stats, error) {
|
|||
|
|
s.mu.RLock()
|
|||
|
|
processor, exists := s.processors[topic]
|
|||
|
|
s.mu.RUnlock()
|
|||
|
|
|
|||
|
|
if !exists {
|
|||
|
|
return Stats{}, fmt.Errorf("topic %s not found", topic)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return processor.GetStats(), nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// GetAllStats 获取所有 topic 的统计信息
|
|||
|
|
func (s *Seqlog) GetAllStats() map[string]Stats {
|
|||
|
|
s.mu.RLock()
|
|||
|
|
defer s.mu.RUnlock()
|
|||
|
|
|
|||
|
|
result := make(map[string]Stats, len(s.processors))
|
|||
|
|
for topic, processor := range s.processors {
|
|||
|
|
result[topic] = processor.GetStats()
|
|||
|
|
}
|
|||
|
|
return result
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// NewTopicQuery 为指定 topic 获取查询器(返回共享实例)
|
|||
|
|
func (s *Seqlog) NewTopicQuery(topic string) (*RecordQuery, error) {
|
|||
|
|
s.mu.RLock()
|
|||
|
|
processor, exists := s.processors[topic]
|
|||
|
|
s.mu.RUnlock()
|
|||
|
|
|
|||
|
|
if !exists {
|
|||
|
|
return nil, fmt.Errorf("topic %s not found", topic)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return processor.Query(), nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// GetProcessingIndex 获取指定 topic 的当前处理索引
|
|||
|
|
func (s *Seqlog) GetProcessingIndex(topic string) int {
|
|||
|
|
s.mu.RLock()
|
|||
|
|
processor, exists := s.processors[topic]
|
|||
|
|
s.mu.RUnlock()
|
|||
|
|
|
|||
|
|
if !exists {
|
|||
|
|
return 0
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return processor.GetProcessingIndex()
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// GetReadIndex 获取指定 topic 的当前读取索引
|
|||
|
|
func (s *Seqlog) GetReadIndex(topic string) int {
|
|||
|
|
s.mu.RLock()
|
|||
|
|
processor, exists := s.processors[topic]
|
|||
|
|
s.mu.RUnlock()
|
|||
|
|
|
|||
|
|
if !exists {
|
|||
|
|
return 0
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return processor.GetReadIndex()
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// GetProcessor 获取指定 topic 的 processor
|
|||
|
|
func (s *Seqlog) GetProcessor(topic string) (*TopicProcessor, error) {
|
|||
|
|
s.mu.RLock()
|
|||
|
|
processor, exists := s.processors[topic]
|
|||
|
|
s.mu.RUnlock()
|
|||
|
|
|
|||
|
|
if !exists {
|
|||
|
|
return nil, fmt.Errorf("topic %s not found", topic)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return processor, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Subscribe 为指定 topic 订阅事件(如果 topic 不存在,会在创建时应用订阅)
|
|||
|
|
func (s *Seqlog) Subscribe(topic string, eventType EventType, listener EventListener) error {
|
|||
|
|
s.mu.Lock()
|
|||
|
|
defer s.mu.Unlock()
|
|||
|
|
|
|||
|
|
processor, exists := s.processors[topic]
|
|||
|
|
if exists {
|
|||
|
|
processor.Subscribe(eventType, listener)
|
|||
|
|
} else {
|
|||
|
|
// topic 还不存在,保存待处理的订阅
|
|||
|
|
// 使用包装函数,只转发给对应 topic 的事件
|
|||
|
|
wrappedListener := func(event *Event) {
|
|||
|
|
if event.Topic == topic {
|
|||
|
|
listener(event)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
s.pendingSubscribers[eventType] = append(s.pendingSubscribers[eventType], wrappedListener)
|
|||
|
|
|
|||
|
|
// 同时订阅到全局事件总线
|
|||
|
|
s.globalEventBus.Subscribe(eventType, wrappedListener)
|
|||
|
|
}
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// SubscribeAll 为指定 topic 订阅所有事件
|
|||
|
|
func (s *Seqlog) SubscribeAll(topic string, listener EventListener) error {
|
|||
|
|
s.mu.Lock()
|
|||
|
|
defer s.mu.Unlock()
|
|||
|
|
|
|||
|
|
processor, exists := s.processors[topic]
|
|||
|
|
if exists {
|
|||
|
|
processor.SubscribeAll(listener)
|
|||
|
|
} else {
|
|||
|
|
// topic 还不存在,为所有事件类型保存待处理的订阅
|
|||
|
|
allTypes := []EventType{
|
|||
|
|
EventWriteSuccess,
|
|||
|
|
EventWriteError,
|
|||
|
|
EventProcessSuccess,
|
|||
|
|
EventProcessError,
|
|||
|
|
EventProcessorStart,
|
|||
|
|
EventProcessorStop,
|
|||
|
|
EventProcessorReset,
|
|||
|
|
EventPositionSaved,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
for _, eventType := range allTypes {
|
|||
|
|
wrappedListener := func(event *Event) {
|
|||
|
|
if event.Topic == topic {
|
|||
|
|
listener(event)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
s.pendingSubscribers[eventType] = append(s.pendingSubscribers[eventType], wrappedListener)
|
|||
|
|
s.globalEventBus.Subscribe(eventType, wrappedListener)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// SubscribeAllTopics 为所有 topic 订阅指定事件
|
|||
|
|
func (s *Seqlog) SubscribeAllTopics(eventType EventType, listener EventListener) {
|
|||
|
|
s.mu.Lock()
|
|||
|
|
defer s.mu.Unlock()
|
|||
|
|
|
|||
|
|
// 为已存在的 processor 订阅
|
|||
|
|
for _, processor := range s.processors {
|
|||
|
|
processor.Subscribe(eventType, listener)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 保存为全局订阅,新创建的 processor 也会自动订阅
|
|||
|
|
s.pendingSubscribers[eventType] = append(s.pendingSubscribers[eventType], listener)
|
|||
|
|
s.globalEventBus.Subscribe(eventType, listener)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ResetTopic 重置指定 topic 的所有数据
|
|||
|
|
// 注意:必须先停止 Seqlog 或至少停止该 topic 的 processor
|
|||
|
|
func (s *Seqlog) ResetTopic(topic string) error {
|
|||
|
|
s.mu.RLock()
|
|||
|
|
processor, exists := s.processors[topic]
|
|||
|
|
s.mu.RUnlock()
|
|||
|
|
|
|||
|
|
if !exists {
|
|||
|
|
return fmt.Errorf("topic %s not found", topic)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 先停止 processor
|
|||
|
|
if err := processor.Stop(); err != nil {
|
|||
|
|
return fmt.Errorf("failed to stop processor: %w", err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 执行重置
|
|||
|
|
if err := processor.Reset(); err != nil {
|
|||
|
|
return fmt.Errorf("failed to reset processor: %w", err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 如果 seqlog 正在运行,重新启动 processor
|
|||
|
|
s.mu.RLock()
|
|||
|
|
running := s.running
|
|||
|
|
s.mu.RUnlock()
|
|||
|
|
|
|||
|
|
if running {
|
|||
|
|
if err := processor.Start(); err != nil {
|
|||
|
|
return fmt.Errorf("failed to restart processor: %w", err)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return nil
|
|||
|
|
}
|