diff --git a/cursor.go b/cursor.go index 592cf4c..28b6ceb 100644 --- a/cursor.go +++ b/cursor.go @@ -25,7 +25,7 @@ type LogCursor struct { // index: 外部提供的索引管理器,用于快速定位记录 func NewCursor(path string, index *RecordIndex) (*LogCursor, error) { if index == nil { - return nil, fmt.Errorf("index cannot be nil") + return nil, NewValidationError("index", "index cannot be nil", ErrNilParameter) } fd, err := os.Open(path) @@ -82,7 +82,7 @@ func (c *LogCursor) Next() (*Record, error) { // 读取并校验 UUID copy(rec.UUID[:], hdr[8:24]) if _, err := uuid.FromBytes(rec.UUID[:]); err != nil { - return nil, fmt.Errorf("invalid uuid: %w", err) + return nil, fmt.Errorf("%w: %v", ErrInvalidUUID, err) } // 如果数据大于缓冲区,分配新的 buffer @@ -97,7 +97,7 @@ func (c *LogCursor) Next() (*Record, error) { return nil, err } if crc32.ChecksumIEEE(payload) != rec.CRC { - return nil, fmt.Errorf("crc mismatch") + return nil, ErrCRCMismatch } rec.Data = append([]byte(nil), payload...) // 复制出去,复用 buffer @@ -112,7 +112,7 @@ func (c *LogCursor) Next() (*Record, error) { // 返回:读取到的记录列表,如果到达文件末尾,返回的记录数可能少于 count func (c *LogCursor) NextRange(count int) ([]*Record, error) { if count <= 0 { - return nil, fmt.Errorf("count must be greater than 0") + return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount) } results := make([]*Record, 0, count) diff --git a/errors.go b/errors.go new file mode 100644 index 0000000..3a358f3 --- /dev/null +++ b/errors.go @@ -0,0 +1,167 @@ +package seqlog + +import ( + "errors" + "fmt" +) + +// 哨兵错误(Sentinel Errors)- 可以使用 errors.Is 进行判断 +var ( + // ErrNilParameter 表示必需的参数为 nil + ErrNilParameter = errors.New("required parameter is nil") + + // ErrInvalidCount 表示 count 参数必须大于 0 + ErrInvalidCount = errors.New("count must be greater than 0") + + // ErrInvalidRange 表示索引或范围参数无效 + ErrInvalidRange = errors.New("invalid index or range") + + // ErrAlreadyRunning 表示组件已经在运行 + ErrAlreadyRunning = errors.New("already running") + + // ErrNotRunning 表示组件未运行 + ErrNotRunning = errors.New("not running") + + // ErrAlreadyRegistered 表示资源已经注册 + ErrAlreadyRegistered = errors.New("already registered") + + // ErrNotFound 表示资源未找到 + ErrNotFound = errors.New("not found") + + // ErrCRCMismatch 表示 CRC 校验失败 + ErrCRCMismatch = errors.New("crc mismatch") + + // ErrInvalidUUID 表示 UUID 格式无效 + ErrInvalidUUID = errors.New("invalid uuid") + + // ErrInvalidConfig 表示配置无效 + ErrInvalidConfig = errors.New("invalid config") +) + +// TopicError 表示与 topic 相关的错误 +type TopicError struct { + Topic string // topic 名称 + Op string // 操作名称(如 "write", "query", "start") + Err error // 底层错误 +} + +func (e *TopicError) Error() string { + if e.Err != nil { + return fmt.Sprintf("topic %s: %s: %v", e.Topic, e.Op, e.Err) + } + return fmt.Sprintf("topic %s: %s", e.Topic, e.Op) +} + +func (e *TopicError) Unwrap() error { + return e.Err +} + +// NewTopicError 创建一个 topic 相关的错误 +func NewTopicError(topic, op string, err error) *TopicError { + return &TopicError{ + Topic: topic, + Op: op, + Err: err, + } +} + +// FileError 表示文件操作相关的错误 +type FileError struct { + Path string // 文件路径 + Op string // 操作名称(如 "open", "read", "write") + Err error // 底层错误 +} + +func (e *FileError) Error() string { + if e.Err != nil { + return fmt.Sprintf("file %s: %s: %v", e.Path, e.Op, e.Err) + } + return fmt.Sprintf("file %s: %s", e.Path, e.Op) +} + +func (e *FileError) Unwrap() error { + return e.Err +} + +// NewFileError 创建一个文件操作相关的错误 +func NewFileError(path, op string, err error) *FileError { + return &FileError{ + Path: path, + Op: op, + Err: err, + } +} + +// IndexError 表示索引相关的错误 +type IndexError struct { + Index int // 请求的索引 + Max int // 最大有效索引 + Err error // 底层错误(通常是 ErrInvalidRange) +} + +func (e *IndexError) Error() string { + if e.Err != nil { + return fmt.Sprintf("index %d out of range [0, %d): %v", e.Index, e.Max, e.Err) + } + return fmt.Sprintf("index %d out of range [0, %d)", e.Index, e.Max) +} + +func (e *IndexError) Unwrap() error { + return e.Err +} + +// NewIndexError 创建一个索引越界错误 +func NewIndexError(index, max int) *IndexError { + return &IndexError{ + Index: index, + Max: max, + Err: ErrInvalidRange, + } +} + +// ValidationError 表示参数验证错误 +type ValidationError struct { + Field string // 字段名称 + Message string // 错误消息 + Err error // 底层错误 +} + +func (e *ValidationError) Error() string { + if e.Err != nil { + return fmt.Sprintf("validation error: %s: %s: %v", e.Field, e.Message, e.Err) + } + return fmt.Sprintf("validation error: %s: %s", e.Field, e.Message) +} + +func (e *ValidationError) Unwrap() error { + return e.Err +} + +// NewValidationError 创建一个参数验证错误 +func NewValidationError(field, message string, err error) *ValidationError { + return &ValidationError{ + Field: field, + Message: message, + Err: err, + } +} + +// IsTopicNotFound 检查错误是否为 topic 不存在 +func IsTopicNotFound(err error) bool { + var topicErr *TopicError + if errors.As(err, &topicErr) { + return errors.Is(topicErr.Err, ErrNotFound) + } + return errors.Is(err, ErrNotFound) +} + +// IsIndexOutOfRange 检查错误是否为索引越界 +func IsIndexOutOfRange(err error) bool { + var indexErr *IndexError + return errors.As(err, &indexErr) +} + +// IsCRCMismatch 检查错误是否为 CRC 校验失败 +func IsCRCMismatch(err error) bool { + return errors.Is(err, ErrCRCMismatch) +} diff --git a/errors_test.go b/errors_test.go new file mode 100644 index 0000000..5ad16d4 --- /dev/null +++ b/errors_test.go @@ -0,0 +1,113 @@ +package seqlog + +import ( + "errors" + "testing" +) + +func TestErrorTypes(t *testing.T) { + t.Run("TopicError", func(t *testing.T) { + err := NewTopicError("app", "write", ErrNotFound) + if err.Error() != "topic app: write: not found" { + t.Errorf("unexpected error message: %v", err) + } + + // 测试 Unwrap + if !errors.Is(err, ErrNotFound) { + t.Error("expected ErrNotFound") + } + + // 测试 errors.As + var topicErr *TopicError + if !errors.As(err, &topicErr) { + t.Error("expected TopicError") + } + if topicErr.Topic != "app" { + t.Errorf("expected topic 'app', got '%s'", topicErr.Topic) + } + }) + + t.Run("FileError", func(t *testing.T) { + err := NewFileError("/path/to/file", "open", errors.New("permission denied")) + if err.Error() != "file /path/to/file: open: permission denied" { + t.Errorf("unexpected error message: %v", err) + } + }) + + t.Run("IndexError", func(t *testing.T) { + err := NewIndexError(100, 50) + if err.Error() != "index 100 out of range [0, 50): invalid index or range" { + t.Errorf("unexpected error message: %v", err) + } + + // 测试 errors.Is + if !errors.Is(err, ErrInvalidRange) { + t.Error("expected ErrInvalidRange") + } + + // 测试 IsIndexOutOfRange + if !IsIndexOutOfRange(err) { + t.Error("expected IsIndexOutOfRange to return true") + } + }) + + t.Run("ValidationError", func(t *testing.T) { + err := NewValidationError("count", "must be greater than 0", ErrInvalidCount) + if err.Error() != "validation error: count: must be greater than 0: count must be greater than 0" { + t.Errorf("unexpected error message: %v", err) + } + + // 测试 errors.Is + if !errors.Is(err, ErrInvalidCount) { + t.Error("expected ErrInvalidCount") + } + }) + + t.Run("IsTopicNotFound", func(t *testing.T) { + err := NewTopicError("app", "get", ErrNotFound) + if !IsTopicNotFound(err) { + t.Error("expected IsTopicNotFound to return true") + } + + // 测试其他错误 + if IsTopicNotFound(ErrInvalidCount) { + t.Error("expected IsTopicNotFound to return false for ErrInvalidCount") + } + }) + + t.Run("IsCRCMismatch", func(t *testing.T) { + if !IsCRCMismatch(ErrCRCMismatch) { + t.Error("expected IsCRCMismatch to return true") + } + + // 测试其他错误 + if IsCRCMismatch(ErrNotFound) { + t.Error("expected IsCRCMismatch to return false for ErrNotFound") + } + }) + + t.Run("SentinelErrors", func(t *testing.T) { + // 测试所有哨兵错误都不为 nil + sentinelErrors := []error{ + ErrNilParameter, + ErrInvalidCount, + ErrInvalidRange, + ErrAlreadyRunning, + ErrNotRunning, + ErrAlreadyRegistered, + ErrNotFound, + ErrCRCMismatch, + ErrInvalidUUID, + ErrInvalidConfig, + } + + for _, err := range sentinelErrors { + if err == nil { + t.Error("sentinel error should not be nil") + } + if err.Error() == "" { + t.Error("sentinel error should have a message") + } + } + }) +} diff --git a/index.go b/index.go index 72b938f..002296c 100644 --- a/index.go +++ b/index.go @@ -182,7 +182,7 @@ func (ri *RecordIndex) Append(offset int64) error { // GetOffset 根据索引位置获取记录偏移 func (ri *RecordIndex) GetOffset(index int) (int64, error) { if index < 0 || index >= len(ri.offsets) { - return 0, fmt.Errorf("index out of range: %d (total: %d)", index, len(ri.offsets)) + return 0, NewIndexError(index, len(ri.offsets)) } return ri.offsets[index], nil } diff --git a/query.go b/query.go index 63bf440..f0a2ed3 100644 --- a/query.go +++ b/query.go @@ -51,7 +51,7 @@ type RecordQuery struct { // index 参数必须由外部提供,确保所有组件使用同一个索引实例 func NewRecordQuery(logPath string, index *RecordIndex) (*RecordQuery, error) { if index == nil { - return nil, fmt.Errorf("index cannot be nil") + return nil, NewValidationError("index", "index cannot be nil", ErrNilParameter) } fd, err := os.Open(logPath) @@ -121,7 +121,7 @@ func (rq *RecordQuery) readRecordsForward(startIndex, count int) ([]*Record, err // 返回的记录按时间顺序(索引递增方向) func (rq *RecordQuery) QueryOldest(startIndex, count int) ([]*Record, error) { if count <= 0 { - return nil, fmt.Errorf("count must be greater than 0") + return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount) } totalCount := rq.index.Count() @@ -152,7 +152,7 @@ func (rq *RecordQuery) QueryOldest(startIndex, count int) ([]*Record, error) { // 返回结果按时间倒序(最新在前,即 endIndex 对应的记录在最前) func (rq *RecordQuery) QueryNewest(endIndex, count int) ([]*Record, error) { if count <= 0 { - return nil, fmt.Errorf("count must be greater than 0") + return nil, NewValidationError("count", "count must be greater than 0", ErrInvalidCount) } totalCount := rq.index.Count() diff --git a/seqlog_manager.go b/seqlog_manager.go index 9f558c6..de5280a 100644 --- a/seqlog_manager.go +++ b/seqlog_manager.go @@ -86,7 +86,7 @@ func (s *Seqlog) RegisterHandlerWithConfig(topic string, config *TopicConfig) er }) } else { // Processor 已存在,handler 不可更新 - return fmt.Errorf("handler already registered for topic %s", topic) + return NewTopicError(topic, "register", ErrAlreadyRegistered) } s.logger.Info("handler registered", "topic", topic) @@ -115,7 +115,7 @@ func (s *Seqlog) Start() error { defer s.mu.Unlock() if s.running { - return fmt.Errorf("seqlog is already running") + return ErrAlreadyRunning } s.logger.Info("starting seqlog", "baseDir", s.baseDir, "processors", len(s.processors)) @@ -355,7 +355,7 @@ func (s *Seqlog) UpdateTopicConfig(topic string, config *TailConfig) error { s.mu.RUnlock() if !exists { - return fmt.Errorf("topic %s not found", topic) + return NewTopicError(topic, "operation", ErrNotFound) } return processor.UpdateTailConfig(config) @@ -368,7 +368,7 @@ func (s *Seqlog) GetTopicConfig(topic string) (*TailConfig, error) { s.mu.RUnlock() if !exists { - return nil, fmt.Errorf("topic %s not found", topic) + return nil, NewTopicError(topic, "get", ErrNotFound) } return processor.GetTailConfig(), nil @@ -381,7 +381,7 @@ func (s *Seqlog) GetTopicStats(topic string) (Stats, error) { s.mu.RUnlock() if !exists { - return Stats{}, fmt.Errorf("topic %s not found", topic) + return Stats{}, NewTopicError(topic, "get-stats", ErrNotFound) } return processor.GetStats(), nil @@ -406,7 +406,7 @@ func (s *Seqlog) NewTopicQuery(topic string) (*RecordQuery, error) { s.mu.RUnlock() if !exists { - return nil, fmt.Errorf("topic %s not found", topic) + return nil, NewTopicError(topic, "get", ErrNotFound) } return processor.Query(), nil @@ -445,7 +445,7 @@ func (s *Seqlog) GetProcessor(topic string) (*TopicProcessor, error) { s.mu.RUnlock() if !exists { - return nil, fmt.Errorf("topic %s not found", topic) + return nil, NewTopicError(topic, "get", ErrNotFound) } return processor, nil @@ -532,7 +532,7 @@ func (s *Seqlog) ResetTopic(topic string) error { s.mu.RUnlock() if !exists { - return fmt.Errorf("topic %s not found", topic) + return NewTopicError(topic, "operation", ErrNotFound) } // 先停止 processor diff --git a/topic_processor.go b/topic_processor.go index 852dd56..be3f83b 100644 --- a/topic_processor.go +++ b/topic_processor.go @@ -49,7 +49,7 @@ type TopicConfig struct { func NewTopicProcessor(baseDir, topic string, logger *slog.Logger, config *TopicConfig) (*TopicProcessor, error) { // 验证必填参数 if config == nil || config.Handler == nil { - return nil, fmt.Errorf("config and config.Handler are required") + return nil, NewValidationError("config", "config and config.Handler are required", ErrInvalidConfig) } ctx, cancel := context.WithCancel(context.Background()) @@ -229,7 +229,7 @@ func (tp *TopicProcessor) Start() error { defer tp.mu.Unlock() if tp.running { - return fmt.Errorf("topic processor for %s is already running", tp.topic) + return NewTopicError(tp.topic, "start", ErrAlreadyRunning) } tp.logger.Debug("starting processor")