Files
seqlog/tailer.go
bourdon 6fb0731935 重构:为核心组件实现 Reset 方法优化重置机制
为所有核心组件添加 Reset() 方法:
- LogWriter.Reset(): 删除并重新创建日志文件,保持 index 和 wbuf 引用不变
- RecordIndex.Reset(): 清空索引数据并重新创建索引文件
- RecordQuery.Reset(): 关闭并重新打开日志文件
- ProcessCursor.Reset(): 删除位置文件并重置游标位置
- LogTailer.Reset(): 重置内部 channel 状态

优化 TopicProcessor.Reset() 实现:
- 不再销毁和重建组件对象
- 通过调用各组件的 Reset() 方法重置状态
- 保持组件间引用关系稳定
- 减少代码行数约 20 行
- 避免空指针风险和内存分配开销

代码改进:
- LogWriter 添加 path 字段用于重置
- 移除 topic_processor.go 中未使用的 os import
- 职责分离更清晰,每个组件管理自己的重置逻辑

测试结果:
- TestTopicReset: PASS
- TestTopicResetWithPendingRecords: PASS
- 所有 TopicProcessor 相关测试通过

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-04 21:58:54 +08:00

166 lines
3.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package seqlog
import (
"context"
"fmt"
"io"
"time"
)
// RecordHandler 日志记录处理函数类型
type RecordHandler func(*Record) error
// TopicRecordHandler 带 topic 信息的日志记录处理函数类型
type TopicRecordHandler func(topic string, rec *Record) error
// TailConfig tail 模式配置
type TailConfig struct {
PollInterval time.Duration // 轮询间隔,默认 100ms
SaveInterval time.Duration // 位置保存间隔,默认 1s
BatchSize int // 批量处理大小,默认 10
}
// LogTailer 持续监控处理器
type LogTailer struct {
cursor *ProcessCursor
handler RecordHandler
config TailConfig
configCh chan TailConfig // 用于动态更新配置
stopCh chan struct{}
doneCh chan struct{}
}
// NewTailer 创建一个新的 tail 处理器
// cursor: 外部提供的游标,用于读取和跟踪日志位置
func NewTailer(cursor *ProcessCursor, handler RecordHandler, config *TailConfig) (*LogTailer, error) {
if cursor == nil {
return nil, fmt.Errorf("cursor cannot be nil")
}
cfg := TailConfig{
PollInterval: 100 * time.Millisecond,
SaveInterval: 1 * time.Second,
BatchSize: 10,
}
if config != nil {
if config.PollInterval > 0 {
cfg.PollInterval = config.PollInterval
}
if config.SaveInterval > 0 {
cfg.SaveInterval = config.SaveInterval
}
if config.BatchSize > 0 {
cfg.BatchSize = config.BatchSize
}
}
return &LogTailer{
cursor: cursor,
handler: handler,
config: cfg,
configCh: make(chan TailConfig, 1),
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
}, nil
}
// Start 使用 context 控制的启动方式
func (t *LogTailer) Start(ctx context.Context) error {
defer close(t.doneCh)
defer t.cursor.savePosition() // 退出时保存位置
saveTicker := time.NewTicker(t.config.SaveInterval)
defer saveTicker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-t.stopCh:
return nil
case newConfig := <-t.configCh:
// 动态更新配置
t.config = newConfig
saveTicker.Reset(t.config.SaveInterval)
case <-saveTicker.C:
// 定期保存位置
t.cursor.savePosition()
default:
// 批量读取记录
records, err := t.cursor.NextRange(t.config.BatchSize)
if err != nil {
if err == io.EOF {
// 文件末尾,等待新数据
time.Sleep(t.config.PollInterval)
continue
}
return fmt.Errorf("read records error: %w", err)
}
// 批量处理记录
for _, rec := range records {
if err := t.handler(rec); err != nil {
// 处理失败,回滚窗口
t.cursor.Rollback()
return fmt.Errorf("handler error: %w", err)
}
}
// 全部处理成功,提交窗口
t.cursor.Commit()
}
}
}
// Stop 停止监控
func (t *LogTailer) Stop() {
close(t.stopCh)
<-t.doneCh // 等待完全停止
}
// UpdateConfig 动态更新配置
func (t *LogTailer) UpdateConfig(config TailConfig) {
select {
case t.configCh <- config:
// 配置已发送
default:
// channel 满了,丢弃旧配置,发送新配置
select {
case <-t.configCh:
default:
}
t.configCh <- config
}
}
// GetConfig 获取当前配置
func (t *LogTailer) GetConfig() TailConfig {
return t.config
}
// GetStartIndex 获取已处理索引(窗口开始索引)
func (t *LogTailer) GetStartIndex() int {
return t.cursor.StartIndex()
}
// GetEndIndex 获取当前读取索引(窗口结束索引)
func (t *LogTailer) GetEndIndex() int {
return t.cursor.EndIndex()
}
// Reset 重置 tailer 的内部状态
// 注意:调用前必须确保 tailer 已停止
func (t *LogTailer) Reset() error {
// 重新创建 channel确保没有遗留的信号
t.stopCh = make(chan struct{})
t.doneCh = make(chan struct{})
// 清空配置 channel
select {
case <-t.configCh:
default:
}
return nil
}