package seqlog import ( "fmt" "log/slog" "os" "strings" "sync" ) // LogHub 日志中枢,统一管理多个 topic 的日志分发 // // 自动恢复机制: // - Start() 时自动扫描 baseDir 中所有 .log 文件 // - 为每个发现的日志文件创建 processor // - 使用 .pos 文件保存的游标位置恢复处理进度 // - 只处理上次中断后新增的日志,避免重复处理 type LogHub struct { baseDir string processors map[string]*TopicProcessor defaultHandler TopicRecordHandler defaultConfig *TailConfig logger *slog.Logger // 用于内部日志记录 globalEventBus *EventBus // 全局事件总线 pendingSubscribers map[EventType][]EventListener mu sync.RWMutex running bool } // NewLogHub 创建一个新的日志中枢 // logger: 内部日志记录器,如果不需要可以传 slog.Default() func NewLogHub(baseDir string, logger *slog.Logger, defaultHandler TopicRecordHandler) *LogHub { if logger == nil { logger = slog.Default() } return &LogHub{ baseDir: baseDir, processors: make(map[string]*TopicProcessor), defaultHandler: defaultHandler, globalEventBus: NewEventBus(), pendingSubscribers: make(map[EventType][]EventListener), defaultConfig: &TailConfig{ PollInterval: 100 * 1000000, // 100ms SaveInterval: 1000 * 1000000, // 1s }, logger: logger, } } // SetDefaultTailConfig 设置默认的 tail 配置 func (s *LogHub) SetDefaultTailConfig(config *TailConfig) { s.mu.Lock() defer s.mu.Unlock() if config != nil { s.defaultConfig = config } } // RegisterHandler 为指定 topic 注册 handler func (s *LogHub) RegisterHandler(topic string, handler RecordHandler) error { return s.RegisterHandlerWithConfig(topic, &TopicConfig{Handler: handler}) } // RegisterHandlerWithConfig 为指定 topic 注册 handler 和配置 // 注意:handler 为必填参数,如果 topic 已存在则返回错误 func (s *LogHub) RegisterHandlerWithConfig(topic string, config *TopicConfig) error { s.mu.Lock() defer s.mu.Unlock() processor := s.processors[topic] if processor == nil { // 创建新的 processor,使用带 topic 属性的 logger topicLogger := s.logger.With("topic", topic) topicLogger.Debug("creating new processor") var err error processor, err = NewTopicProcessor(s.baseDir, topic, topicLogger, config) if err != nil { s.logger.Error("failed to create processor", "topic", topic, "error", err) return fmt.Errorf("failed to create processor for topic %s: %w", topic, err) } s.processors[topic] = processor // 订阅 processor 的所有事件,转发到全局事件总线 processor.SubscribeAll(func(event *Event) { s.globalEventBus.Publish(event) }) } else { // Processor 已存在,handler 不可更新 return NewTopicError(topic, "register", ErrAlreadyRegistered) } s.logger.Info("handler registered", "topic", topic) return nil } // Write 写入日志到指定 topic func (s *LogHub) Write(topic string, data []byte) (int64, error) { processor, err := s.getOrCreateProcessor(topic) if err != nil { s.logger.Error("failed to get processor", "topic", topic, "error", err) return 0, fmt.Errorf("failed to get processor for topic %s: %w", topic, err) } offset, err := processor.Write(data) if err != nil { s.logger.Error("failed to write", "topic", topic, "error", err) return 0, err } s.logger.Debug("write success", "topic", topic, "offset", offset, "size", len(data)) return offset, nil } // Start 启动 LogHub 和所有已注册的 processor func (s *LogHub) Start() error { s.mu.Lock() defer s.mu.Unlock() if s.running { return ErrAlreadyRunning } s.logger.Info("starting seqlog", "baseDir", s.baseDir, "processors", len(s.processors)) // 自动发现 baseDir 中已存在的日志文件 if err := s.discoverExistingTopics(); err != nil { s.logger.Warn("failed to discover existing topics", "error", err) } // 启动所有已存在的 processor for topic, processor := range s.processors { if err := processor.Start(); err != nil { // 忽略文件不存在的错误,因为可能还没写入 // processor 会在第一次写入时自动创建 writer 和 tailer s.logger.Debug("processor start skipped", "topic", topic, "error", err) } else { s.logger.Debug("processor started", "topic", topic) } } s.running = true s.logger.Info("seqlog started successfully", "total_processors", len(s.processors)) return nil } // discoverExistingTopics 自动发现 baseDir 中已存在的日志文件并创建对应的 processor // 注意:此方法需要在持有锁的情况下调用 func (s *LogHub) discoverExistingTopics() error { // 确保目录存在 if err := os.MkdirAll(s.baseDir, 0755); err != nil { return fmt.Errorf("failed to create base directory: %w", err) } // 读取目录中的所有 .log 文件 entries, err := os.ReadDir(s.baseDir) if err != nil { return fmt.Errorf("failed to read base directory: %w", err) } discovered := 0 for _, entry := range entries { if entry.IsDir() { continue } name := entry.Name() // 只处理 .log 文件,忽略 .pos 位置文件 if !strings.HasSuffix(name, ".log") { continue } // 提取 topic 名称(去掉 .log 后缀) topic := strings.TrimSuffix(name, ".log") // 如果 processor 已存在,跳过 if _, exists := s.processors[topic]; exists { continue } // 创建 processor(使用默认配置和 handler) s.logger.Info("discovered existing topic", "topic", topic) var config *TopicConfig if s.defaultHandler != nil { topicName := topic handler := func(rec *Record) error { return s.defaultHandler(topicName, rec) } config = &TopicConfig{ Handler: handler, TailConfig: s.defaultConfig, } } topicLogger := s.logger.With("topic", topic) processor, err := NewTopicProcessor(s.baseDir, topic, topicLogger, config) if err != nil { s.logger.Error("failed to create processor for discovered topic", "topic", topic, "error", err) continue } s.processors[topic] = processor // 订阅 processor 的所有事件,转发到全局事件总线 processor.SubscribeAll(func(event *Event) { s.globalEventBus.Publish(event) }) discovered++ } if discovered > 0 { s.logger.Info("auto-discovered topics", "count", discovered) } return nil } // Stop 停止所有 processor func (s *LogHub) Stop() error { s.mu.Lock() if !s.running { s.mu.Unlock() return nil } s.logger.Info("stopping seqlog") s.running = false processors := make([]*TopicProcessor, 0, len(s.processors)) for _, p := range s.processors { processors = append(processors, p) } s.mu.Unlock() // 停止并清理所有 processor for _, processor := range processors { topic := processor.Topic() // 先停止 if err := processor.Stop(); err != nil { s.logger.Error("failed to stop processor", "topic", topic, "error", err) return fmt.Errorf("failed to stop processor for topic %s: %w", topic, err) } s.logger.Debug("processor stopped", "topic", topic) // 再清理资源 if err := processor.Close(); err != nil { s.logger.Error("failed to close processor", "topic", topic, "error", err) // 继续清理其他 processor,不返回错误 } } s.logger.Info("seqlog stopped successfully") return nil } // getOrCreateProcessor 获取或创建指定 topic 的 processor(使用默认配置) // 如果没有 defaultHandler,使用空 handler(no-op) func (s *LogHub) getOrCreateProcessor(topic string) (*TopicProcessor, error) { // 创建默认配置 var config *TopicConfig if s.defaultHandler != nil { // 使用默认 handler,包装成 RecordHandler topicName := topic handler := func(rec *Record) error { return s.defaultHandler(topicName, rec) } config = &TopicConfig{ Handler: handler, TailConfig: s.defaultConfig, } } else { // 没有 defaultHandler,检查是否已存在 s.mu.RLock() processor, exists := s.processors[topic] s.mu.RUnlock() if exists { return processor, nil } // 使用空 handler(no-op),允许只写入不处理 config = &TopicConfig{ Handler: func(rec *Record) error { return nil // 空处理,什么都不做 }, TailConfig: s.defaultConfig, } } return s.getOrCreateProcessorWithConfig(topic, config) } // getOrCreateProcessorWithConfig 获取或创建指定 topic 的 processor(使用指定配置) func (s *LogHub) getOrCreateProcessorWithConfig(topic string, config *TopicConfig) (*TopicProcessor, error) { s.mu.RLock() processor, exists := s.processors[topic] s.mu.RUnlock() if exists { return processor, nil } s.mu.Lock() defer s.mu.Unlock() // 双重检查 if processor, exists := s.processors[topic]; exists { return processor, nil } // 创建新的 processor,使用带 topic 属性的 logger topicLogger := s.logger.With("topic", topic) topicLogger.Debug("auto-creating processor") var err error processor, err = NewTopicProcessor(s.baseDir, topic, topicLogger, config) if err != nil { topicLogger.Error("failed to create processor", "error", err) return nil, fmt.Errorf("failed to create processor: %w", err) } s.processors[topic] = processor // 订阅 processor 的所有事件,转发到全局事件总线 processor.SubscribeAll(func(event *Event) { s.globalEventBus.Publish(event) }) // 如果正在运行,立即启动 processor if s.running { if err := processor.Start(); err != nil { topicLogger.Error("failed to start processor", "error", err) return nil, fmt.Errorf("failed to start processor: %w", err) } topicLogger.Debug("processor auto-started") } return processor, nil } // GetTopics 获取所有已知的 topic func (s *LogHub) GetTopics() []string { s.mu.RLock() defer s.mu.RUnlock() topics := make([]string, 0, len(s.processors)) for topic := range s.processors { topics = append(topics, topic) } return topics } // IsRunning 检查 LogHub 是否正在运行 func (s *LogHub) IsRunning() bool { s.mu.RLock() defer s.mu.RUnlock() return s.running } // UpdateTopicConfig 动态更新指定 topic 的 tail 配置 func (s *LogHub) UpdateTopicConfig(topic string, config *TailConfig) error { s.mu.RLock() processor, exists := s.processors[topic] s.mu.RUnlock() if !exists { return NewTopicError(topic, "operation", ErrNotFound) } return processor.UpdateTailConfig(config) } // GetTopicConfig 获取指定 topic 的 tail 配置 func (s *LogHub) GetTopicConfig(topic string) (*TailConfig, error) { s.mu.RLock() processor, exists := s.processors[topic] s.mu.RUnlock() if !exists { return nil, NewTopicError(topic, "get", ErrNotFound) } return processor.GetTailConfig(), nil } // GetTopicStats 获取指定 topic 的统计信息 func (s *LogHub) GetTopicStats(topic string) (Stats, error) { s.mu.RLock() processor, exists := s.processors[topic] s.mu.RUnlock() if !exists { return Stats{}, NewTopicError(topic, "get-stats", ErrNotFound) } return processor.GetStats(), nil } // GetAllStats 获取所有 topic 的统计信息 func (s *LogHub) GetAllStats() map[string]Stats { s.mu.RLock() defer s.mu.RUnlock() result := make(map[string]Stats, len(s.processors)) for topic, processor := range s.processors { result[topic] = processor.GetStats() } return result } // NewTopicQuery 为指定 topic 获取查询器(返回共享实例) func (s *LogHub) NewTopicQuery(topic string) (*RecordQuery, error) { s.mu.RLock() processor, exists := s.processors[topic] s.mu.RUnlock() if !exists { return nil, NewTopicError(topic, "get", ErrNotFound) } return processor.Query() } // GetProcessingIndex 获取指定 topic 的当前处理索引 func (s *LogHub) GetProcessingIndex(topic string) int { s.mu.RLock() processor, exists := s.processors[topic] s.mu.RUnlock() if !exists { return 0 } return processor.GetProcessingIndex() } // GetReadIndex 获取指定 topic 的当前读取索引 func (s *LogHub) GetReadIndex(topic string) int { s.mu.RLock() processor, exists := s.processors[topic] s.mu.RUnlock() if !exists { return 0 } return processor.GetReadIndex() } // GetProcessor 获取指定 topic 的 processor func (s *LogHub) GetProcessor(topic string) (*TopicProcessor, error) { s.mu.RLock() processor, exists := s.processors[topic] s.mu.RUnlock() if !exists { return nil, NewTopicError(topic, "get", ErrNotFound) } return processor, nil } // QueryFromProcessing 从当前处理窗口的开始位置向索引递增方向查询记录 // topic: 主题名称 // count: 查询数量 func (s *LogHub) QueryFromProcessing(topic string, count int) ([]*RecordWithStatus, error) { processor, err := s.GetProcessor(topic) if err != nil { return nil, err } return processor.QueryFromProcessing(count) } // QueryFromFirst 从第一条记录向索引递增方向查询 // topic: 主题名称 // count: 查询数量 func (s *LogHub) QueryFromFirst(topic string, count int) ([]*RecordWithStatus, error) { processor, err := s.GetProcessor(topic) if err != nil { return nil, err } return processor.QueryFromFirst(count) } // QueryFromLast 从最后一条记录向索引递减方向查询 // topic: 主题名称 // count: 查询数量 func (s *LogHub) QueryFromLast(topic string, count int) ([]*RecordWithStatus, error) { processor, err := s.GetProcessor(topic) if err != nil { return nil, err } return processor.QueryFromLast(count) } // Subscribe 为指定 topic 订阅事件(如果 topic 不存在,会在创建时应用订阅) func (s *LogHub) Subscribe(topic string, eventType EventType, listener EventListener) error { s.mu.Lock() defer s.mu.Unlock() processor, exists := s.processors[topic] if exists { processor.Subscribe(eventType, listener) } else { // topic 还不存在,保存待处理的订阅 // 使用包装函数,只转发给对应 topic 的事件 wrappedListener := func(event *Event) { if event.Topic == topic { listener(event) } } s.pendingSubscribers[eventType] = append(s.pendingSubscribers[eventType], wrappedListener) // 同时订阅到全局事件总线 s.globalEventBus.Subscribe(eventType, wrappedListener) } return nil } // SubscribeAll 为指定 topic 订阅所有事件 func (s *LogHub) SubscribeAll(topic string, listener EventListener) error { s.mu.Lock() defer s.mu.Unlock() processor, exists := s.processors[topic] if exists { processor.SubscribeAll(listener) } else { // topic 还不存在,为所有事件类型保存待处理的订阅 allTypes := []EventType{ EventWriteSuccess, EventWriteError, EventProcessSuccess, EventProcessError, EventProcessorStart, EventProcessorStop, EventProcessorReset, EventPositionSaved, } for _, eventType := range allTypes { wrappedListener := func(event *Event) { if event.Topic == topic { listener(event) } } s.pendingSubscribers[eventType] = append(s.pendingSubscribers[eventType], wrappedListener) s.globalEventBus.Subscribe(eventType, wrappedListener) } } return nil } // SubscribeAllTopics 为所有 topic 订阅指定事件 func (s *LogHub) SubscribeAllTopics(eventType EventType, listener EventListener) { s.mu.Lock() defer s.mu.Unlock() // 为已存在的 processor 订阅 for _, processor := range s.processors { processor.Subscribe(eventType, listener) } // 保存为全局订阅,新创建的 processor 也会自动订阅 s.pendingSubscribers[eventType] = append(s.pendingSubscribers[eventType], listener) s.globalEventBus.Subscribe(eventType, listener) } // ResetTopic 重置指定 topic 的所有数据 // 如果 processor 正在运行且没有待处理的日志,会自动停止后重置 // 如果有待处理的日志,则返回错误 func (s *LogHub) ResetTopic(topic string) error { s.mu.RLock() processor, exists := s.processors[topic] s.mu.RUnlock() if !exists { return NewTopicError(topic, "operation", ErrNotFound) } // 执行重置(如果没有待处理的日志会自动停止) if err := processor.Reset(); err != nil { return fmt.Errorf("failed to reset processor: %w", err) } // 如果 LogHub 正在运行且 processor 未运行,启动 processor // 注意:如果 Reset() 已经自动恢复到 Running 状态,就不需要再启动 s.mu.RLock() running := s.running s.mu.RUnlock() if running && processor.GetState() != StateRunning { if err := processor.Start(); err != nil { return fmt.Errorf("failed to restart processor: %w", err) } } return nil }