package seqlog import ( "context" "fmt" "io" "time" ) // RecordHandler 日志记录处理函数类型 type RecordHandler func(*Record) error // TopicRecordHandler 带 topic 信息的日志记录处理函数类型 type TopicRecordHandler func(topic string, rec *Record) error // TailConfig tail 模式配置 type TailConfig struct { PollInterval time.Duration // 轮询间隔,默认 100ms SaveInterval time.Duration // 位置保存间隔,默认 1s BatchSize int // 批量处理大小,默认 10 } // LogTailer 持续监控处理器 type LogTailer struct { cursor *ProcessCursor handler RecordHandler config TailConfig configCh chan TailConfig // 用于动态更新配置 stopCh chan struct{} doneCh chan struct{} } // NewTailer 创建一个新的 tail 处理器 // cursor: 外部提供的游标,用于读取和跟踪日志位置 func NewTailer(cursor *ProcessCursor, handler RecordHandler, config *TailConfig) (*LogTailer, error) { if cursor == nil { return nil, fmt.Errorf("cursor cannot be nil") } cfg := TailConfig{ PollInterval: 100 * time.Millisecond, SaveInterval: 1 * time.Second, BatchSize: 10, } if config != nil { if config.PollInterval > 0 { cfg.PollInterval = config.PollInterval } if config.SaveInterval > 0 { cfg.SaveInterval = config.SaveInterval } if config.BatchSize > 0 { cfg.BatchSize = config.BatchSize } } return &LogTailer{ cursor: cursor, handler: handler, config: cfg, configCh: make(chan TailConfig, 1), stopCh: make(chan struct{}), doneCh: make(chan struct{}), }, nil } // Start 使用 context 控制的启动方式 func (t *LogTailer) Start(ctx context.Context) error { defer close(t.doneCh) defer t.cursor.savePosition() // 退出时保存位置 saveTicker := time.NewTicker(t.config.SaveInterval) defer saveTicker.Stop() for { select { case <-ctx.Done(): return ctx.Err() case <-t.stopCh: return nil case newConfig := <-t.configCh: // 动态更新配置 t.config = newConfig saveTicker.Reset(t.config.SaveInterval) case <-saveTicker.C: // 定期保存位置 t.cursor.savePosition() default: // 批量读取记录 records, err := t.cursor.NextRange(t.config.BatchSize) if err != nil { if err == io.EOF { // 文件末尾,等待新数据 time.Sleep(t.config.PollInterval) continue } return fmt.Errorf("read records error: %w", err) } // 批量处理记录 for _, rec := range records { if err := t.handler(rec); err != nil { // 处理失败,回滚窗口 t.cursor.Rollback() return fmt.Errorf("handler error: %w", err) } } // 全部处理成功,提交窗口 t.cursor.Commit() } } } // Stop 停止监控 func (t *LogTailer) Stop() { close(t.stopCh) <-t.doneCh // 等待完全停止 } // UpdateConfig 动态更新配置 func (t *LogTailer) UpdateConfig(config TailConfig) { select { case t.configCh <- config: // 配置已发送 default: // channel 满了,丢弃旧配置,发送新配置 select { case <-t.configCh: default: } t.configCh <- config } } // GetConfig 获取当前配置 func (t *LogTailer) GetConfig() TailConfig { return t.config } // GetStartIndex 获取已处理索引(窗口开始索引) func (t *LogTailer) GetStartIndex() int { return t.cursor.StartIndex() } // GetEndIndex 获取当前读取索引(窗口结束索引) func (t *LogTailer) GetEndIndex() int { return t.cursor.EndIndex() } // Reset 重置 tailer 的内部状态 // 注意:调用前必须确保 tailer 已停止 func (t *LogTailer) Reset() error { // 重新创建 channel(确保没有遗留的信号) t.stopCh = make(chan struct{}) t.doneCh = make(chan struct{}) // 清空配置 channel select { case <-t.configCh: default: } return nil }