为所有核心组件添加 Reset() 方法: - LogWriter.Reset(): 删除并重新创建日志文件,保持 index 和 wbuf 引用不变 - RecordIndex.Reset(): 清空索引数据并重新创建索引文件 - RecordQuery.Reset(): 关闭并重新打开日志文件 - ProcessCursor.Reset(): 删除位置文件并重置游标位置 - LogTailer.Reset(): 重置内部 channel 状态 优化 TopicProcessor.Reset() 实现: - 不再销毁和重建组件对象 - 通过调用各组件的 Reset() 方法重置状态 - 保持组件间引用关系稳定 - 减少代码行数约 20 行 - 避免空指针风险和内存分配开销 代码改进: - LogWriter 添加 path 字段用于重置 - 移除 topic_processor.go 中未使用的 os import - 职责分离更清晰,每个组件管理自己的重置逻辑 测试结果: - TestTopicReset: PASS - TestTopicResetWithPendingRecords: PASS - 所有 TopicProcessor 相关测试通过 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
166 lines
3.8 KiB
Go
166 lines
3.8 KiB
Go
package seqlog
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"io"
|
||
"time"
|
||
)
|
||
|
||
// RecordHandler 日志记录处理函数类型
|
||
type RecordHandler func(*Record) error
|
||
|
||
// TopicRecordHandler 带 topic 信息的日志记录处理函数类型
|
||
type TopicRecordHandler func(topic string, rec *Record) error
|
||
|
||
// TailConfig tail 模式配置
|
||
type TailConfig struct {
|
||
PollInterval time.Duration // 轮询间隔,默认 100ms
|
||
SaveInterval time.Duration // 位置保存间隔,默认 1s
|
||
BatchSize int // 批量处理大小,默认 10
|
||
}
|
||
|
||
// LogTailer 持续监控处理器
|
||
type LogTailer struct {
|
||
cursor *ProcessCursor
|
||
handler RecordHandler
|
||
config TailConfig
|
||
configCh chan TailConfig // 用于动态更新配置
|
||
stopCh chan struct{}
|
||
doneCh chan struct{}
|
||
}
|
||
|
||
// NewTailer 创建一个新的 tail 处理器
|
||
// cursor: 外部提供的游标,用于读取和跟踪日志位置
|
||
func NewTailer(cursor *ProcessCursor, handler RecordHandler, config *TailConfig) (*LogTailer, error) {
|
||
if cursor == nil {
|
||
return nil, fmt.Errorf("cursor cannot be nil")
|
||
}
|
||
|
||
cfg := TailConfig{
|
||
PollInterval: 100 * time.Millisecond,
|
||
SaveInterval: 1 * time.Second,
|
||
BatchSize: 10,
|
||
}
|
||
if config != nil {
|
||
if config.PollInterval > 0 {
|
||
cfg.PollInterval = config.PollInterval
|
||
}
|
||
if config.SaveInterval > 0 {
|
||
cfg.SaveInterval = config.SaveInterval
|
||
}
|
||
if config.BatchSize > 0 {
|
||
cfg.BatchSize = config.BatchSize
|
||
}
|
||
}
|
||
|
||
return &LogTailer{
|
||
cursor: cursor,
|
||
handler: handler,
|
||
config: cfg,
|
||
configCh: make(chan TailConfig, 1),
|
||
stopCh: make(chan struct{}),
|
||
doneCh: make(chan struct{}),
|
||
}, nil
|
||
}
|
||
|
||
// Start 使用 context 控制的启动方式
|
||
func (t *LogTailer) Start(ctx context.Context) error {
|
||
defer close(t.doneCh)
|
||
defer t.cursor.savePosition() // 退出时保存位置
|
||
|
||
saveTicker := time.NewTicker(t.config.SaveInterval)
|
||
defer saveTicker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
case <-t.stopCh:
|
||
return nil
|
||
case newConfig := <-t.configCh:
|
||
// 动态更新配置
|
||
t.config = newConfig
|
||
saveTicker.Reset(t.config.SaveInterval)
|
||
case <-saveTicker.C:
|
||
// 定期保存位置
|
||
t.cursor.savePosition()
|
||
default:
|
||
// 批量读取记录
|
||
records, err := t.cursor.NextRange(t.config.BatchSize)
|
||
if err != nil {
|
||
if err == io.EOF {
|
||
// 文件末尾,等待新数据
|
||
time.Sleep(t.config.PollInterval)
|
||
continue
|
||
}
|
||
return fmt.Errorf("read records error: %w", err)
|
||
}
|
||
|
||
// 批量处理记录
|
||
for _, rec := range records {
|
||
if err := t.handler(rec); err != nil {
|
||
// 处理失败,回滚窗口
|
||
t.cursor.Rollback()
|
||
return fmt.Errorf("handler error: %w", err)
|
||
}
|
||
}
|
||
|
||
// 全部处理成功,提交窗口
|
||
t.cursor.Commit()
|
||
}
|
||
}
|
||
}
|
||
|
||
// Stop 停止监控
|
||
func (t *LogTailer) Stop() {
|
||
close(t.stopCh)
|
||
<-t.doneCh // 等待完全停止
|
||
}
|
||
|
||
// UpdateConfig 动态更新配置
|
||
func (t *LogTailer) UpdateConfig(config TailConfig) {
|
||
select {
|
||
case t.configCh <- config:
|
||
// 配置已发送
|
||
default:
|
||
// channel 满了,丢弃旧配置,发送新配置
|
||
select {
|
||
case <-t.configCh:
|
||
default:
|
||
}
|
||
t.configCh <- config
|
||
}
|
||
}
|
||
|
||
// GetConfig 获取当前配置
|
||
func (t *LogTailer) GetConfig() TailConfig {
|
||
return t.config
|
||
}
|
||
|
||
// GetStartIndex 获取已处理索引(窗口开始索引)
|
||
func (t *LogTailer) GetStartIndex() int {
|
||
return t.cursor.StartIndex()
|
||
}
|
||
|
||
// GetEndIndex 获取当前读取索引(窗口结束索引)
|
||
func (t *LogTailer) GetEndIndex() int {
|
||
return t.cursor.EndIndex()
|
||
}
|
||
|
||
// Reset 重置 tailer 的内部状态
|
||
// 注意:调用前必须确保 tailer 已停止
|
||
func (t *LogTailer) Reset() error {
|
||
// 重新创建 channel(确保没有遗留的信号)
|
||
t.stopCh = make(chan struct{})
|
||
t.doneCh = make(chan struct{})
|
||
|
||
// 清空配置 channel
|
||
select {
|
||
case <-t.configCh:
|
||
default:
|
||
}
|
||
|
||
return nil
|
||
}
|