Files
seqlog/loghub.go
bourdon bcc328b129 重构:TopicProcessor 状态管理系统与 Reset 方法优化
新增功能:
- 添加 ProcessorState 状态类型(Idle/Starting/Running/Stopping/Stopped/Resetting/Error)
- 添加 ProcessorStatus 结构体和状态管理方法(GetState/GetStatus/setState)
- 实现状态转换逻辑和访问控制(CanWrite/CanQuery)
- 新增 CanReset() 方法检查是否可执行重置操作

Reset 方法优化:
- 重写 Reset() 方法,不再停止 processor
- 只有在无待处理记录时才能执行重置
- 进入 Resetting 状态期间阻止所有读写操作
- 重置后自动恢复到之前的运行状态
- 正确关闭并重置 cursor 和 stats 组件
- 调整执行顺序:先关闭组件,再删除文件,后重新初始化

错误处理增强:
- 添加 ErrProcessorResetting 和 ErrInvalidState 错误类型
- 添加 EventStateChanged 事件类型
- 修复 writer/index 为 nil 时的空指针问题

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 18:56:52 +08:00

594 lines
16 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package seqlog
import (
"fmt"
"log/slog"
"os"
"strings"
"sync"
)
// LogHub 日志中枢,统一管理多个 topic 的日志分发
//
// 自动恢复机制:
// - Start() 时自动扫描 baseDir 中所有 .log 文件
// - 为每个发现的日志文件创建 processor
// - 使用 .pos 文件保存的游标位置恢复处理进度
// - 只处理上次中断后新增的日志,避免重复处理
type LogHub struct {
baseDir string
processors map[string]*TopicProcessor
defaultHandler TopicRecordHandler
defaultConfig *TailConfig
logger *slog.Logger // 用于内部日志记录
globalEventBus *EventBus // 全局事件总线
pendingSubscribers map[EventType][]EventListener
mu sync.RWMutex
running bool
}
// NewLogHub 创建一个新的日志中枢
// logger: 内部日志记录器,如果不需要可以传 slog.Default()
func NewLogHub(baseDir string, logger *slog.Logger, defaultHandler TopicRecordHandler) *LogHub {
if logger == nil {
logger = slog.Default()
}
return &LogHub{
baseDir: baseDir,
processors: make(map[string]*TopicProcessor),
defaultHandler: defaultHandler,
globalEventBus: NewEventBus(),
pendingSubscribers: make(map[EventType][]EventListener),
defaultConfig: &TailConfig{
PollInterval: 100 * 1000000, // 100ms
SaveInterval: 1000 * 1000000, // 1s
},
logger: logger,
}
}
// SetDefaultTailConfig 设置默认的 tail 配置
func (s *LogHub) SetDefaultTailConfig(config *TailConfig) {
s.mu.Lock()
defer s.mu.Unlock()
if config != nil {
s.defaultConfig = config
}
}
// RegisterHandler 为指定 topic 注册 handler
func (s *LogHub) RegisterHandler(topic string, handler RecordHandler) error {
return s.RegisterHandlerWithConfig(topic, &TopicConfig{Handler: handler})
}
// RegisterHandlerWithConfig 为指定 topic 注册 handler 和配置
// 注意handler 为必填参数,如果 topic 已存在则返回错误
func (s *LogHub) RegisterHandlerWithConfig(topic string, config *TopicConfig) error {
s.mu.Lock()
defer s.mu.Unlock()
processor := s.processors[topic]
if processor == nil {
// 创建新的 processor使用带 topic 属性的 logger
topicLogger := s.logger.With("topic", topic)
topicLogger.Debug("creating new processor")
var err error
processor, err = NewTopicProcessor(s.baseDir, topic, topicLogger, config)
if err != nil {
s.logger.Error("failed to create processor", "topic", topic, "error", err)
return fmt.Errorf("failed to create processor for topic %s: %w", topic, err)
}
s.processors[topic] = processor
// 订阅 processor 的所有事件,转发到全局事件总线
processor.SubscribeAll(func(event *Event) {
s.globalEventBus.Publish(event)
})
} else {
// Processor 已存在handler 不可更新
return NewTopicError(topic, "register", ErrAlreadyRegistered)
}
s.logger.Info("handler registered", "topic", topic)
return nil
}
// Write 写入日志到指定 topic
func (s *LogHub) Write(topic string, data []byte) (int64, error) {
processor, err := s.getOrCreateProcessor(topic)
if err != nil {
s.logger.Error("failed to get processor", "topic", topic, "error", err)
return 0, fmt.Errorf("failed to get processor for topic %s: %w", topic, err)
}
offset, err := processor.Write(data)
if err != nil {
s.logger.Error("failed to write", "topic", topic, "error", err)
return 0, err
}
s.logger.Debug("write success", "topic", topic, "offset", offset, "size", len(data))
return offset, nil
}
// Start 启动 LogHub 和所有已注册的 processor
func (s *LogHub) Start() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.running {
return ErrAlreadyRunning
}
s.logger.Info("starting seqlog", "baseDir", s.baseDir, "processors", len(s.processors))
// 自动发现 baseDir 中已存在的日志文件
if err := s.discoverExistingTopics(); err != nil {
s.logger.Warn("failed to discover existing topics", "error", err)
}
// 启动所有已存在的 processor
for topic, processor := range s.processors {
if err := processor.Start(); err != nil {
// 忽略文件不存在的错误,因为可能还没写入
// processor 会在第一次写入时自动创建 writer 和 tailer
s.logger.Debug("processor start skipped", "topic", topic, "error", err)
} else {
s.logger.Debug("processor started", "topic", topic)
}
}
s.running = true
s.logger.Info("seqlog started successfully", "total_processors", len(s.processors))
return nil
}
// discoverExistingTopics 自动发现 baseDir 中已存在的日志文件并创建对应的 processor
// 注意:此方法需要在持有锁的情况下调用
func (s *LogHub) discoverExistingTopics() error {
// 确保目录存在
if err := os.MkdirAll(s.baseDir, 0755); err != nil {
return fmt.Errorf("failed to create base directory: %w", err)
}
// 读取目录中的所有 .log 文件
entries, err := os.ReadDir(s.baseDir)
if err != nil {
return fmt.Errorf("failed to read base directory: %w", err)
}
discovered := 0
for _, entry := range entries {
if entry.IsDir() {
continue
}
name := entry.Name()
// 只处理 .log 文件,忽略 .pos 位置文件
if !strings.HasSuffix(name, ".log") {
continue
}
// 提取 topic 名称(去掉 .log 后缀)
topic := strings.TrimSuffix(name, ".log")
// 如果 processor 已存在,跳过
if _, exists := s.processors[topic]; exists {
continue
}
// 创建 processor使用默认配置和 handler
s.logger.Info("discovered existing topic", "topic", topic)
var config *TopicConfig
if s.defaultHandler != nil {
topicName := topic
handler := func(rec *Record) error {
return s.defaultHandler(topicName, rec)
}
config = &TopicConfig{
Handler: handler,
TailConfig: s.defaultConfig,
}
}
topicLogger := s.logger.With("topic", topic)
processor, err := NewTopicProcessor(s.baseDir, topic, topicLogger, config)
if err != nil {
s.logger.Error("failed to create processor for discovered topic", "topic", topic, "error", err)
continue
}
s.processors[topic] = processor
// 订阅 processor 的所有事件,转发到全局事件总线
processor.SubscribeAll(func(event *Event) {
s.globalEventBus.Publish(event)
})
discovered++
}
if discovered > 0 {
s.logger.Info("auto-discovered topics", "count", discovered)
}
return nil
}
// Stop 停止所有 processor
func (s *LogHub) Stop() error {
s.mu.Lock()
if !s.running {
s.mu.Unlock()
return nil
}
s.logger.Info("stopping seqlog")
s.running = false
processors := make([]*TopicProcessor, 0, len(s.processors))
for _, p := range s.processors {
processors = append(processors, p)
}
s.mu.Unlock()
// 停止并清理所有 processor
for _, processor := range processors {
topic := processor.Topic()
// 先停止
if err := processor.Stop(); err != nil {
s.logger.Error("failed to stop processor", "topic", topic, "error", err)
return fmt.Errorf("failed to stop processor for topic %s: %w", topic, err)
}
s.logger.Debug("processor stopped", "topic", topic)
// 再清理资源
if err := processor.Close(); err != nil {
s.logger.Error("failed to close processor", "topic", topic, "error", err)
// 继续清理其他 processor不返回错误
}
}
s.logger.Info("seqlog stopped successfully")
return nil
}
// getOrCreateProcessor 获取或创建指定 topic 的 processor使用默认配置
// 如果没有 defaultHandler使用空 handlerno-op
func (s *LogHub) getOrCreateProcessor(topic string) (*TopicProcessor, error) {
// 创建默认配置
var config *TopicConfig
if s.defaultHandler != nil {
// 使用默认 handler包装成 RecordHandler
topicName := topic
handler := func(rec *Record) error {
return s.defaultHandler(topicName, rec)
}
config = &TopicConfig{
Handler: handler,
TailConfig: s.defaultConfig,
}
} else {
// 没有 defaultHandler检查是否已存在
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if exists {
return processor, nil
}
// 使用空 handlerno-op允许只写入不处理
config = &TopicConfig{
Handler: func(rec *Record) error {
return nil // 空处理,什么都不做
},
TailConfig: s.defaultConfig,
}
}
return s.getOrCreateProcessorWithConfig(topic, config)
}
// getOrCreateProcessorWithConfig 获取或创建指定 topic 的 processor使用指定配置
func (s *LogHub) getOrCreateProcessorWithConfig(topic string, config *TopicConfig) (*TopicProcessor, error) {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if exists {
return processor, nil
}
s.mu.Lock()
defer s.mu.Unlock()
// 双重检查
if processor, exists := s.processors[topic]; exists {
return processor, nil
}
// 创建新的 processor使用带 topic 属性的 logger
topicLogger := s.logger.With("topic", topic)
topicLogger.Debug("auto-creating processor")
var err error
processor, err = NewTopicProcessor(s.baseDir, topic, topicLogger, config)
if err != nil {
topicLogger.Error("failed to create processor", "error", err)
return nil, fmt.Errorf("failed to create processor: %w", err)
}
s.processors[topic] = processor
// 订阅 processor 的所有事件,转发到全局事件总线
processor.SubscribeAll(func(event *Event) {
s.globalEventBus.Publish(event)
})
// 如果正在运行,立即启动 processor
if s.running {
if err := processor.Start(); err != nil {
topicLogger.Error("failed to start processor", "error", err)
return nil, fmt.Errorf("failed to start processor: %w", err)
}
topicLogger.Debug("processor auto-started")
}
return processor, nil
}
// GetTopics 获取所有已知的 topic
func (s *LogHub) GetTopics() []string {
s.mu.RLock()
defer s.mu.RUnlock()
topics := make([]string, 0, len(s.processors))
for topic := range s.processors {
topics = append(topics, topic)
}
return topics
}
// IsRunning 检查 LogHub 是否正在运行
func (s *LogHub) IsRunning() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.running
}
// UpdateTopicConfig 动态更新指定 topic 的 tail 配置
func (s *LogHub) UpdateTopicConfig(topic string, config *TailConfig) error {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return NewTopicError(topic, "operation", ErrNotFound)
}
return processor.UpdateTailConfig(config)
}
// GetTopicConfig 获取指定 topic 的 tail 配置
func (s *LogHub) GetTopicConfig(topic string) (*TailConfig, error) {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return nil, NewTopicError(topic, "get", ErrNotFound)
}
return processor.GetTailConfig(), nil
}
// GetTopicStats 获取指定 topic 的统计信息
func (s *LogHub) GetTopicStats(topic string) (Stats, error) {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return Stats{}, NewTopicError(topic, "get-stats", ErrNotFound)
}
return processor.GetStats(), nil
}
// GetAllStats 获取所有 topic 的统计信息
func (s *LogHub) GetAllStats() map[string]Stats {
s.mu.RLock()
defer s.mu.RUnlock()
result := make(map[string]Stats, len(s.processors))
for topic, processor := range s.processors {
result[topic] = processor.GetStats()
}
return result
}
// NewTopicQuery 为指定 topic 获取查询器(返回共享实例)
func (s *LogHub) NewTopicQuery(topic string) (*RecordQuery, error) {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return nil, NewTopicError(topic, "get", ErrNotFound)
}
return processor.Query()
}
// GetProcessingIndex 获取指定 topic 的当前处理索引
func (s *LogHub) GetProcessingIndex(topic string) int {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return 0
}
return processor.GetProcessingIndex()
}
// GetReadIndex 获取指定 topic 的当前读取索引
func (s *LogHub) GetReadIndex(topic string) int {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return 0
}
return processor.GetReadIndex()
}
// GetProcessor 获取指定 topic 的 processor
func (s *LogHub) GetProcessor(topic string) (*TopicProcessor, error) {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return nil, NewTopicError(topic, "get", ErrNotFound)
}
return processor, nil
}
// QueryFromProcessing 从当前处理窗口的开始位置向索引递增方向查询记录
// topic: 主题名称
// count: 查询数量
func (s *LogHub) QueryFromProcessing(topic string, count int) ([]*RecordWithStatus, error) {
processor, err := s.GetProcessor(topic)
if err != nil {
return nil, err
}
return processor.QueryFromProcessing(count)
}
// QueryFromFirst 从第一条记录向索引递增方向查询
// topic: 主题名称
// count: 查询数量
func (s *LogHub) QueryFromFirst(topic string, count int) ([]*RecordWithStatus, error) {
processor, err := s.GetProcessor(topic)
if err != nil {
return nil, err
}
return processor.QueryFromFirst(count)
}
// QueryFromLast 从最后一条记录向索引递减方向查询
// topic: 主题名称
// count: 查询数量
func (s *LogHub) QueryFromLast(topic string, count int) ([]*RecordWithStatus, error) {
processor, err := s.GetProcessor(topic)
if err != nil {
return nil, err
}
return processor.QueryFromLast(count)
}
// Subscribe 为指定 topic 订阅事件(如果 topic 不存在,会在创建时应用订阅)
func (s *LogHub) Subscribe(topic string, eventType EventType, listener EventListener) error {
s.mu.Lock()
defer s.mu.Unlock()
processor, exists := s.processors[topic]
if exists {
processor.Subscribe(eventType, listener)
} else {
// topic 还不存在,保存待处理的订阅
// 使用包装函数,只转发给对应 topic 的事件
wrappedListener := func(event *Event) {
if event.Topic == topic {
listener(event)
}
}
s.pendingSubscribers[eventType] = append(s.pendingSubscribers[eventType], wrappedListener)
// 同时订阅到全局事件总线
s.globalEventBus.Subscribe(eventType, wrappedListener)
}
return nil
}
// SubscribeAll 为指定 topic 订阅所有事件
func (s *LogHub) SubscribeAll(topic string, listener EventListener) error {
s.mu.Lock()
defer s.mu.Unlock()
processor, exists := s.processors[topic]
if exists {
processor.SubscribeAll(listener)
} else {
// topic 还不存在,为所有事件类型保存待处理的订阅
allTypes := []EventType{
EventWriteSuccess,
EventWriteError,
EventProcessSuccess,
EventProcessError,
EventProcessorStart,
EventProcessorStop,
EventProcessorReset,
EventPositionSaved,
}
for _, eventType := range allTypes {
wrappedListener := func(event *Event) {
if event.Topic == topic {
listener(event)
}
}
s.pendingSubscribers[eventType] = append(s.pendingSubscribers[eventType], wrappedListener)
s.globalEventBus.Subscribe(eventType, wrappedListener)
}
}
return nil
}
// SubscribeAllTopics 为所有 topic 订阅指定事件
func (s *LogHub) SubscribeAllTopics(eventType EventType, listener EventListener) {
s.mu.Lock()
defer s.mu.Unlock()
// 为已存在的 processor 订阅
for _, processor := range s.processors {
processor.Subscribe(eventType, listener)
}
// 保存为全局订阅,新创建的 processor 也会自动订阅
s.pendingSubscribers[eventType] = append(s.pendingSubscribers[eventType], listener)
s.globalEventBus.Subscribe(eventType, listener)
}
// ResetTopic 重置指定 topic 的所有数据
// 如果 processor 正在运行且没有待处理的日志,会自动停止后重置
// 如果有待处理的日志,则返回错误
func (s *LogHub) ResetTopic(topic string) error {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return NewTopicError(topic, "operation", ErrNotFound)
}
// 执行重置(如果没有待处理的日志会自动停止)
if err := processor.Reset(); err != nil {
return fmt.Errorf("failed to reset processor: %w", err)
}
// 如果 LogHub 正在运行且 processor 未运行,启动 processor
// 注意:如果 Reset() 已经自动恢复到 Running 状态,就不需要再启动
s.mu.RLock()
running := s.running
s.mu.RUnlock()
if running && processor.GetState() != StateRunning {
if err := processor.Start(); err != nil {
return fmt.Errorf("failed to restart processor: %w", err)
}
}
return nil
}