Files
seqlog/tailer.go

166 lines
3.8 KiB
Go
Raw Permalink Normal View History

重构:统一使用索引(Index)替代位置(Position)进行状态判断 ## 主要变更 ### 架构改进 - 明确索引(Index)与偏移(Offset)的职责分离 - Index: 记录序号(逻辑概念),用于状态判断 - Offset: 文件字节位置(物理概念),仅用于 I/O 操作 ### API 变更 - 删除所有 Position 相关方法: - `LogCursor.StartPos()/EndPos()` - `LogTailer.GetStartPos()/GetEndPos()` - `TopicProcessor.GetProcessingPosition()/GetReadPosition()` - `Seqlog.GetProcessingPosition()/GetReadPosition()` - 新增索引方法: - `LogCursor.StartIndex()/EndIndex()` - `LogTailer.GetStartIndex()/GetEndIndex()` - `TopicProcessor.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index ### 查询接口变更 - `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断 ### 性能优化 - 状态判断改用整数比较,不再需要计算偏移量 - 减少不必要的索引到偏移的转换 - 只在实际文件 I/O 时才获取 offset ### 测试更新 - 更新所有测试用例使用新的 Index API - 更新示例代码(topic_processor_example.go, webapp/main.go) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
package seqlog
import (
"context"
"fmt"
"io"
"time"
)
// RecordHandler 日志记录处理函数类型
type RecordHandler func(*Record) error
// TopicRecordHandler 带 topic 信息的日志记录处理函数类型
type TopicRecordHandler func(topic string, rec *Record) error
// TailConfig tail 模式配置
type TailConfig struct {
PollInterval time.Duration // 轮询间隔,默认 100ms
SaveInterval time.Duration // 位置保存间隔,默认 1s
BatchSize int // 批量处理大小,默认 10
}
// LogTailer 持续监控处理器
type LogTailer struct {
cursor *ProcessCursor
重构:统一使用索引(Index)替代位置(Position)进行状态判断 ## 主要变更 ### 架构改进 - 明确索引(Index)与偏移(Offset)的职责分离 - Index: 记录序号(逻辑概念),用于状态判断 - Offset: 文件字节位置(物理概念),仅用于 I/O 操作 ### API 变更 - 删除所有 Position 相关方法: - `LogCursor.StartPos()/EndPos()` - `LogTailer.GetStartPos()/GetEndPos()` - `TopicProcessor.GetProcessingPosition()/GetReadPosition()` - `Seqlog.GetProcessingPosition()/GetReadPosition()` - 新增索引方法: - `LogCursor.StartIndex()/EndIndex()` - `LogTailer.GetStartIndex()/GetEndIndex()` - `TopicProcessor.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index ### 查询接口变更 - `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断 ### 性能优化 - 状态判断改用整数比较,不再需要计算偏移量 - 减少不必要的索引到偏移的转换 - 只在实际文件 I/O 时才获取 offset ### 测试更新 - 更新所有测试用例使用新的 Index API - 更新示例代码(topic_processor_example.go, webapp/main.go) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
handler RecordHandler
config TailConfig
configCh chan TailConfig // 用于动态更新配置
stopCh chan struct{}
doneCh chan struct{}
}
// NewTailer 创建一个新的 tail 处理器
// cursor: 外部提供的游标,用于读取和跟踪日志位置
func NewTailer(cursor *ProcessCursor, handler RecordHandler, config *TailConfig) (*LogTailer, error) {
重构:统一使用索引(Index)替代位置(Position)进行状态判断 ## 主要变更 ### 架构改进 - 明确索引(Index)与偏移(Offset)的职责分离 - Index: 记录序号(逻辑概念),用于状态判断 - Offset: 文件字节位置(物理概念),仅用于 I/O 操作 ### API 变更 - 删除所有 Position 相关方法: - `LogCursor.StartPos()/EndPos()` - `LogTailer.GetStartPos()/GetEndPos()` - `TopicProcessor.GetProcessingPosition()/GetReadPosition()` - `Seqlog.GetProcessingPosition()/GetReadPosition()` - 新增索引方法: - `LogCursor.StartIndex()/EndIndex()` - `LogTailer.GetStartIndex()/GetEndIndex()` - `TopicProcessor.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessingIndex()/GetReadIndex()` - `Seqlog.GetProcessor()` - 获取 processor 实例以访问 Index ### 查询接口变更 - `RecordQuery.QueryOldest(startIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryNewest(endIndex, count, startIdx, endIdx)` - 使用索引参数 - `RecordQuery.QueryAt(position, direction, count, startIdx, endIdx)` - startIdx/endIdx 用于状态判断 ### 性能优化 - 状态判断改用整数比较,不再需要计算偏移量 - 减少不必要的索引到偏移的转换 - 只在实际文件 I/O 时才获取 offset ### 测试更新 - 更新所有测试用例使用新的 Index API - 更新示例代码(topic_processor_example.go, webapp/main.go) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:48:21 +08:00
if cursor == nil {
return nil, fmt.Errorf("cursor cannot be nil")
}
cfg := TailConfig{
PollInterval: 100 * time.Millisecond,
SaveInterval: 1 * time.Second,
BatchSize: 10,
}
if config != nil {
if config.PollInterval > 0 {
cfg.PollInterval = config.PollInterval
}
if config.SaveInterval > 0 {
cfg.SaveInterval = config.SaveInterval
}
if config.BatchSize > 0 {
cfg.BatchSize = config.BatchSize
}
}
return &LogTailer{
cursor: cursor,
handler: handler,
config: cfg,
configCh: make(chan TailConfig, 1),
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
}, nil
}
// Start 使用 context 控制的启动方式
func (t *LogTailer) Start(ctx context.Context) error {
defer close(t.doneCh)
defer t.cursor.savePosition() // 退出时保存位置
saveTicker := time.NewTicker(t.config.SaveInterval)
defer saveTicker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-t.stopCh:
return nil
case newConfig := <-t.configCh:
// 动态更新配置
t.config = newConfig
saveTicker.Reset(t.config.SaveInterval)
case <-saveTicker.C:
// 定期保存位置
t.cursor.savePosition()
default:
// 批量读取记录
records, err := t.cursor.NextRange(t.config.BatchSize)
if err != nil {
if err == io.EOF {
// 文件末尾,等待新数据
time.Sleep(t.config.PollInterval)
continue
}
return fmt.Errorf("read records error: %w", err)
}
// 批量处理记录
for _, rec := range records {
if err := t.handler(rec); err != nil {
// 处理失败,回滚窗口
t.cursor.Rollback()
return fmt.Errorf("handler error: %w", err)
}
}
// 全部处理成功,提交窗口
t.cursor.Commit()
}
}
}
// Stop 停止监控
func (t *LogTailer) Stop() {
close(t.stopCh)
<-t.doneCh // 等待完全停止
}
// UpdateConfig 动态更新配置
func (t *LogTailer) UpdateConfig(config TailConfig) {
select {
case t.configCh <- config:
// 配置已发送
default:
// channel 满了,丢弃旧配置,发送新配置
select {
case <-t.configCh:
default:
}
t.configCh <- config
}
}
// GetConfig 获取当前配置
func (t *LogTailer) GetConfig() TailConfig {
return t.config
}
// GetStartIndex 获取已处理索引(窗口开始索引)
func (t *LogTailer) GetStartIndex() int {
return t.cursor.StartIndex()
}
// GetEndIndex 获取当前读取索引(窗口结束索引)
func (t *LogTailer) GetEndIndex() int {
return t.cursor.EndIndex()
}
// Reset 重置 tailer 的内部状态
// 注意:调用前必须确保 tailer 已停止
func (t *LogTailer) Reset() error {
// 重新创建 channel确保没有遗留的信号
t.stopCh = make(chan struct{})
t.doneCh = make(chan struct{})
// 清空配置 channel
select {
case <-t.configCh:
default:
}
return nil
}