Files
seqlog/seqlog_manager.go
bourdon 6862de12ff 新增:统一的错误类型系统 (errors.go)
主要功能:
- 定义哨兵错误(Sentinel Errors):ErrNilParameter, ErrInvalidCount,
  ErrInvalidRange, ErrAlreadyRunning, ErrNotFound, ErrCRCMismatch 等
- 实现结构化错误类型:TopicError, FileError, IndexError, ValidationError
- 提供错误检查辅助函数:IsTopicNotFound, IsIndexOutOfRange, IsCRCMismatch
- 支持 errors.Is 和 errors.As 进行错误判断

更新相关文件使用新错误类型:
- cursor.go: 使用 ValidationError 和 ErrCRCMismatch
- index.go: 使用 IndexError 处理索引越界
- query.go: 使用 ValidationError 验证参数
- seqlog_manager.go: 使用 TopicError 和 ErrAlreadyRegistered
- topic_processor.go: 使用 ErrAlreadyRunning 和 ErrInvalidConfig

测试覆盖:
- errors_test.go 提供完整的错误类型测试
- 所有现有测试继续通过

使用示例:
```go
// 检查 topic 是否存在
if IsTopicNotFound(err) {
    // 处理 topic 不存在的情况
}

// 检查索引越界
if IsIndexOutOfRange(err) {
    var indexErr *IndexError
    errors.As(err, &indexErr)
    fmt.Printf("index %d out of range\n", indexErr.Index)
}
```

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 01:56:22 +08:00

561 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package seqlog
import (
"fmt"
"log/slog"
"os"
"strings"
"sync"
)
// Seqlog 日志管理器,统一管理多个 topic 的日志分发
//
// 自动恢复机制:
// - Start() 时自动扫描 baseDir 中所有 .log 文件
// - 为每个发现的日志文件创建 processor
// - 使用 .pos 文件保存的游标位置恢复处理进度
// - 只处理上次中断后新增的日志,避免重复处理
type Seqlog struct {
baseDir string
processors map[string]*TopicProcessor
defaultHandler TopicRecordHandler
defaultConfig *TailConfig
logger *slog.Logger // 用于内部日志记录
globalEventBus *EventBus // 全局事件总线
pendingSubscribers map[EventType][]EventListener
mu sync.RWMutex
running bool
}
// NewSeqlog 创建一个新的日志管理器
// logger: 内部日志记录器,如果不需要可以传 slog.Default()
func NewSeqlog(baseDir string, logger *slog.Logger, defaultHandler TopicRecordHandler) *Seqlog {
if logger == nil {
logger = slog.Default()
}
return &Seqlog{
baseDir: baseDir,
processors: make(map[string]*TopicProcessor),
defaultHandler: defaultHandler,
globalEventBus: NewEventBus(),
pendingSubscribers: make(map[EventType][]EventListener),
defaultConfig: &TailConfig{
PollInterval: 100 * 1000000, // 100ms
SaveInterval: 1000 * 1000000, // 1s
},
logger: logger,
}
}
// SetDefaultTailConfig 设置默认的 tail 配置
func (s *Seqlog) SetDefaultTailConfig(config *TailConfig) {
s.mu.Lock()
defer s.mu.Unlock()
if config != nil {
s.defaultConfig = config
}
}
// RegisterHandler 为指定 topic 注册 handler
func (s *Seqlog) RegisterHandler(topic string, handler RecordHandler) error {
return s.RegisterHandlerWithConfig(topic, &TopicConfig{Handler: handler})
}
// RegisterHandlerWithConfig 为指定 topic 注册 handler 和配置
// 注意handler 为必填参数,如果 topic 已存在则返回错误
func (s *Seqlog) RegisterHandlerWithConfig(topic string, config *TopicConfig) error {
s.mu.Lock()
defer s.mu.Unlock()
processor := s.processors[topic]
if processor == nil {
// 创建新的 processor使用带 topic 属性的 logger
topicLogger := s.logger.With("topic", topic)
topicLogger.Debug("creating new processor")
var err error
processor, err = NewTopicProcessor(s.baseDir, topic, topicLogger, config)
if err != nil {
s.logger.Error("failed to create processor", "topic", topic, "error", err)
return fmt.Errorf("failed to create processor for topic %s: %w", topic, err)
}
s.processors[topic] = processor
// 订阅 processor 的所有事件,转发到全局事件总线
processor.SubscribeAll(func(event *Event) {
s.globalEventBus.Publish(event)
})
} else {
// Processor 已存在handler 不可更新
return NewTopicError(topic, "register", ErrAlreadyRegistered)
}
s.logger.Info("handler registered", "topic", topic)
return nil
}
// Write 写入日志到指定 topic
func (s *Seqlog) Write(topic string, data []byte) (int64, error) {
processor, err := s.getOrCreateProcessor(topic)
if err != nil {
s.logger.Error("failed to get processor", "topic", topic, "error", err)
return 0, fmt.Errorf("failed to get processor for topic %s: %w", topic, err)
}
offset, err := processor.Write(data)
if err != nil {
s.logger.Error("failed to write", "topic", topic, "error", err)
return 0, err
}
s.logger.Debug("write success", "topic", topic, "offset", offset, "size", len(data))
return offset, nil
}
// Start 启动 Seqlog 和所有已注册的 processor
func (s *Seqlog) Start() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.running {
return ErrAlreadyRunning
}
s.logger.Info("starting seqlog", "baseDir", s.baseDir, "processors", len(s.processors))
// 自动发现 baseDir 中已存在的日志文件
if err := s.discoverExistingTopics(); err != nil {
s.logger.Warn("failed to discover existing topics", "error", err)
}
// 启动所有已存在的 processor
for topic, processor := range s.processors {
if err := processor.Start(); err != nil {
// 忽略文件不存在的错误,因为可能还没写入
// processor 会在第一次写入时自动创建 writer 和 tailer
s.logger.Debug("processor start skipped", "topic", topic, "error", err)
} else {
s.logger.Debug("processor started", "topic", topic)
}
}
s.running = true
s.logger.Info("seqlog started successfully", "total_processors", len(s.processors))
return nil
}
// discoverExistingTopics 自动发现 baseDir 中已存在的日志文件并创建对应的 processor
// 注意:此方法需要在持有锁的情况下调用
func (s *Seqlog) discoverExistingTopics() error {
// 确保目录存在
if err := os.MkdirAll(s.baseDir, 0755); err != nil {
return fmt.Errorf("failed to create base directory: %w", err)
}
// 读取目录中的所有 .log 文件
entries, err := os.ReadDir(s.baseDir)
if err != nil {
return fmt.Errorf("failed to read base directory: %w", err)
}
discovered := 0
for _, entry := range entries {
if entry.IsDir() {
continue
}
name := entry.Name()
// 只处理 .log 文件,忽略 .pos 位置文件
if !strings.HasSuffix(name, ".log") {
continue
}
// 提取 topic 名称(去掉 .log 后缀)
topic := strings.TrimSuffix(name, ".log")
// 如果 processor 已存在,跳过
if _, exists := s.processors[topic]; exists {
continue
}
// 创建 processor使用默认配置和 handler
s.logger.Info("discovered existing topic", "topic", topic)
var config *TopicConfig
if s.defaultHandler != nil {
topicName := topic
handler := func(rec *Record) error {
return s.defaultHandler(topicName, rec)
}
config = &TopicConfig{
Handler: handler,
TailConfig: s.defaultConfig,
}
}
topicLogger := s.logger.With("topic", topic)
processor, err := NewTopicProcessor(s.baseDir, topic, topicLogger, config)
if err != nil {
s.logger.Error("failed to create processor for discovered topic", "topic", topic, "error", err)
continue
}
s.processors[topic] = processor
// 订阅 processor 的所有事件,转发到全局事件总线
processor.SubscribeAll(func(event *Event) {
s.globalEventBus.Publish(event)
})
discovered++
}
if discovered > 0 {
s.logger.Info("auto-discovered topics", "count", discovered)
}
return nil
}
// Stop 停止所有 processor
func (s *Seqlog) Stop() error {
s.mu.Lock()
if !s.running {
s.mu.Unlock()
return nil
}
s.logger.Info("stopping seqlog")
s.running = false
processors := make([]*TopicProcessor, 0, len(s.processors))
for _, p := range s.processors {
processors = append(processors, p)
}
s.mu.Unlock()
// 停止并清理所有 processor
for _, processor := range processors {
topic := processor.Topic()
// 先停止
if err := processor.Stop(); err != nil {
s.logger.Error("failed to stop processor", "topic", topic, "error", err)
return fmt.Errorf("failed to stop processor for topic %s: %w", topic, err)
}
s.logger.Debug("processor stopped", "topic", topic)
// 再清理资源
if err := processor.Close(); err != nil {
s.logger.Error("failed to close processor", "topic", topic, "error", err)
// 继续清理其他 processor不返回错误
}
}
s.logger.Info("seqlog stopped successfully")
return nil
}
// getOrCreateProcessor 获取或创建指定 topic 的 processor使用默认配置
// 如果没有 defaultHandler使用空 handlerno-op
func (s *Seqlog) getOrCreateProcessor(topic string) (*TopicProcessor, error) {
// 创建默认配置
var config *TopicConfig
if s.defaultHandler != nil {
// 使用默认 handler包装成 RecordHandler
topicName := topic
handler := func(rec *Record) error {
return s.defaultHandler(topicName, rec)
}
config = &TopicConfig{
Handler: handler,
TailConfig: s.defaultConfig,
}
} else {
// 没有 defaultHandler检查是否已存在
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if exists {
return processor, nil
}
// 使用空 handlerno-op允许只写入不处理
config = &TopicConfig{
Handler: func(rec *Record) error {
return nil // 空处理,什么都不做
},
TailConfig: s.defaultConfig,
}
}
return s.getOrCreateProcessorWithConfig(topic, config)
}
// getOrCreateProcessorWithConfig 获取或创建指定 topic 的 processor使用指定配置
func (s *Seqlog) getOrCreateProcessorWithConfig(topic string, config *TopicConfig) (*TopicProcessor, error) {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if exists {
return processor, nil
}
s.mu.Lock()
defer s.mu.Unlock()
// 双重检查
if processor, exists := s.processors[topic]; exists {
return processor, nil
}
// 创建新的 processor使用带 topic 属性的 logger
topicLogger := s.logger.With("topic", topic)
topicLogger.Debug("auto-creating processor")
var err error
processor, err = NewTopicProcessor(s.baseDir, topic, topicLogger, config)
if err != nil {
topicLogger.Error("failed to create processor", "error", err)
return nil, fmt.Errorf("failed to create processor: %w", err)
}
s.processors[topic] = processor
// 订阅 processor 的所有事件,转发到全局事件总线
processor.SubscribeAll(func(event *Event) {
s.globalEventBus.Publish(event)
})
// 如果正在运行,立即启动 processor
if s.running {
if err := processor.Start(); err != nil {
topicLogger.Error("failed to start processor", "error", err)
return nil, fmt.Errorf("failed to start processor: %w", err)
}
topicLogger.Debug("processor auto-started")
}
return processor, nil
}
// GetTopics 获取所有已知的 topic
func (s *Seqlog) GetTopics() []string {
s.mu.RLock()
defer s.mu.RUnlock()
topics := make([]string, 0, len(s.processors))
for topic := range s.processors {
topics = append(topics, topic)
}
return topics
}
// IsRunning 检查 Seqlog 是否正在运行
func (s *Seqlog) IsRunning() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.running
}
// UpdateTopicConfig 动态更新指定 topic 的 tail 配置
func (s *Seqlog) UpdateTopicConfig(topic string, config *TailConfig) error {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return NewTopicError(topic, "operation", ErrNotFound)
}
return processor.UpdateTailConfig(config)
}
// GetTopicConfig 获取指定 topic 的 tail 配置
func (s *Seqlog) GetTopicConfig(topic string) (*TailConfig, error) {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return nil, NewTopicError(topic, "get", ErrNotFound)
}
return processor.GetTailConfig(), nil
}
// GetTopicStats 获取指定 topic 的统计信息
func (s *Seqlog) GetTopicStats(topic string) (Stats, error) {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return Stats{}, NewTopicError(topic, "get-stats", ErrNotFound)
}
return processor.GetStats(), nil
}
// GetAllStats 获取所有 topic 的统计信息
func (s *Seqlog) GetAllStats() map[string]Stats {
s.mu.RLock()
defer s.mu.RUnlock()
result := make(map[string]Stats, len(s.processors))
for topic, processor := range s.processors {
result[topic] = processor.GetStats()
}
return result
}
// NewTopicQuery 为指定 topic 获取查询器(返回共享实例)
func (s *Seqlog) NewTopicQuery(topic string) (*RecordQuery, error) {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return nil, NewTopicError(topic, "get", ErrNotFound)
}
return processor.Query(), nil
}
// GetProcessingIndex 获取指定 topic 的当前处理索引
func (s *Seqlog) GetProcessingIndex(topic string) int {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return 0
}
return processor.GetProcessingIndex()
}
// GetReadIndex 获取指定 topic 的当前读取索引
func (s *Seqlog) GetReadIndex(topic string) int {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return 0
}
return processor.GetReadIndex()
}
// GetProcessor 获取指定 topic 的 processor
func (s *Seqlog) GetProcessor(topic string) (*TopicProcessor, error) {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return nil, NewTopicError(topic, "get", ErrNotFound)
}
return processor, nil
}
// Subscribe 为指定 topic 订阅事件(如果 topic 不存在,会在创建时应用订阅)
func (s *Seqlog) Subscribe(topic string, eventType EventType, listener EventListener) error {
s.mu.Lock()
defer s.mu.Unlock()
processor, exists := s.processors[topic]
if exists {
processor.Subscribe(eventType, listener)
} else {
// topic 还不存在,保存待处理的订阅
// 使用包装函数,只转发给对应 topic 的事件
wrappedListener := func(event *Event) {
if event.Topic == topic {
listener(event)
}
}
s.pendingSubscribers[eventType] = append(s.pendingSubscribers[eventType], wrappedListener)
// 同时订阅到全局事件总线
s.globalEventBus.Subscribe(eventType, wrappedListener)
}
return nil
}
// SubscribeAll 为指定 topic 订阅所有事件
func (s *Seqlog) SubscribeAll(topic string, listener EventListener) error {
s.mu.Lock()
defer s.mu.Unlock()
processor, exists := s.processors[topic]
if exists {
processor.SubscribeAll(listener)
} else {
// topic 还不存在,为所有事件类型保存待处理的订阅
allTypes := []EventType{
EventWriteSuccess,
EventWriteError,
EventProcessSuccess,
EventProcessError,
EventProcessorStart,
EventProcessorStop,
EventProcessorReset,
EventPositionSaved,
}
for _, eventType := range allTypes {
wrappedListener := func(event *Event) {
if event.Topic == topic {
listener(event)
}
}
s.pendingSubscribers[eventType] = append(s.pendingSubscribers[eventType], wrappedListener)
s.globalEventBus.Subscribe(eventType, wrappedListener)
}
}
return nil
}
// SubscribeAllTopics 为所有 topic 订阅指定事件
func (s *Seqlog) SubscribeAllTopics(eventType EventType, listener EventListener) {
s.mu.Lock()
defer s.mu.Unlock()
// 为已存在的 processor 订阅
for _, processor := range s.processors {
processor.Subscribe(eventType, listener)
}
// 保存为全局订阅,新创建的 processor 也会自动订阅
s.pendingSubscribers[eventType] = append(s.pendingSubscribers[eventType], listener)
s.globalEventBus.Subscribe(eventType, listener)
}
// ResetTopic 重置指定 topic 的所有数据
// 注意:必须先停止 Seqlog 或至少停止该 topic 的 processor
func (s *Seqlog) ResetTopic(topic string) error {
s.mu.RLock()
processor, exists := s.processors[topic]
s.mu.RUnlock()
if !exists {
return NewTopicError(topic, "operation", ErrNotFound)
}
// 先停止 processor
if err := processor.Stop(); err != nil {
return fmt.Errorf("failed to stop processor: %w", err)
}
// 执行重置
if err := processor.Reset(); err != nil {
return fmt.Errorf("failed to reset processor: %w", err)
}
// 如果 seqlog 正在运行,重新启动 processor
s.mu.RLock()
running := s.running
s.mu.RUnlock()
if running {
if err := processor.Start(); err != nil {
return fmt.Errorf("failed to restart processor: %w", err)
}
}
return nil
}