Files
taskq/plugin_test.go

232 lines
4.3 KiB
Go
Raw Permalink Normal View History

package taskq
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestContextRedis tests Context.Redis method
func TestContextRedis(t *testing.T) {
rdb := getTestRedis(t)
defer rdb.Close()
s := NewServlet()
cfg := Config{
Redis: rdb,
Tasks: []*Task{},
}
err := s.Configure(cfg)
require.NoError(t, err)
ctx := &Context{
Context: context.Background(),
servlet: s,
}
redisClient := ctx.Redis()
assert.NotNil(t, redisClient)
assert.Equal(t, rdb, redisClient)
}
// TestContextQueues tests Context.Queues method
func TestContextQueues(t *testing.T) {
rdb := getTestRedis(t)
defer rdb.Close()
s := NewServlet()
task := &Task{
Name: "task1",
Queue: "default",
Priority: 5,
Handler: func() error { return nil },
}
cfg := Config{
Redis: rdb,
Tasks: []*Task{task},
}
err := s.Configure(cfg)
require.NoError(t, err)
ctx := &Context{
Context: context.Background(),
servlet: s,
}
queues := ctx.Queues()
assert.NotNil(t, queues)
assert.Equal(t, 5, queues["default"])
}
// TestPluginName tests Plugin Name method
func TestPluginName(t *testing.T) {
plugin := &TestPlugin{}
assert.Equal(t, "TestPlugin", plugin.Name())
}
// TestPluginInit tests Plugin Init method
func TestPluginInit(t *testing.T) {
rdb := getTestRedis(t)
defer rdb.Close()
plugin := &TestPlugin{}
s := NewServlet()
cfg := Config{
Redis: rdb,
Tasks: []*Task{},
}
err := s.Configure(cfg)
require.NoError(t, err)
ctx := &Context{
Context: context.Background(),
servlet: s,
}
err = plugin.Init(ctx)
assert.NoError(t, err)
assert.True(t, plugin.initCalled)
}
// TestPluginStop tests Plugin Stop method
func TestPluginStop(t *testing.T) {
plugin := &TestPlugin{}
err := plugin.Stop()
assert.NoError(t, err)
assert.True(t, plugin.stopCalled)
}
// TestPluginInitError tests error handling when plugin Init fails
func TestPluginInitError(t *testing.T) {
// Skip: This test can cause goroutines to leak during plugin error handling
t.Skip("Init error handling causes goroutine leaks in asynq")
}
// TestPluginContextAccess tests plugin accessing context resources
func TestPluginContextAccess(t *testing.T) {
rdb := getTestRedis(t)
defer rdb.Close()
resourcePlugin := &ResourceAccessPlugin{}
s := NewServlet()
cfg := Config{
Redis: rdb,
Tasks: []*Task{},
Plugins: []Plugin{resourcePlugin},
}
err := s.Configure(cfg)
require.NoError(t, err)
ctx := context.Background()
err = s.Init(ctx)
require.NoError(t, err)
assert.True(t, resourcePlugin.redisAccessible)
assert.True(t, resourcePlugin.queuesAccessible)
}
// Test helper plugins
// TestPlugin implements the Plugin interface for testing
type TestPlugin struct {
initCalled bool
startCalled bool
stopCalled bool
}
func (tp *TestPlugin) Name() string {
return "TestPlugin"
}
func (tp *TestPlugin) Init(ctx *Context) error {
tp.initCalled = true
return nil
}
func (tp *TestPlugin) Start(ctx *Context) error {
tp.startCalled = true
return nil
}
func (tp *TestPlugin) Stop() error {
tp.stopCalled = true
return nil
}
// ErrorPlugin implements the Plugin interface and can fail at specified stage
type ErrorPlugin struct {
failAt string // "init" or "start"
}
func (ep *ErrorPlugin) Name() string {
return "ErrorPlugin"
}
func (ep *ErrorPlugin) Init(ctx *Context) error {
if ep.failAt == "init" {
return &ErrorType{message: "init error"}
}
return nil
}
func (ep *ErrorPlugin) Start(ctx *Context) error {
if ep.failAt == "start" {
return &ErrorType{message: "start error"}
}
return nil
}
func (ep *ErrorPlugin) Stop() error {
return nil
}
// ErrorType for testing
type ErrorType struct {
message string
}
func (e *ErrorType) Error() string {
return e.message
}
// ResourceAccessPlugin tests accessing servlet resources through context
type ResourceAccessPlugin struct {
redisAccessible bool
queuesAccessible bool
}
func (rap *ResourceAccessPlugin) Name() string {
return "ResourceAccessPlugin"
}
func (rap *ResourceAccessPlugin) Init(ctx *Context) error {
// Test Redis access
redis := ctx.Redis()
if redis != nil {
rap.redisAccessible = true
}
// Test Queues access
queues := ctx.Queues()
if queues != nil {
rap.queuesAccessible = true
}
return nil
}
func (rap *ResourceAccessPlugin) Start(ctx *Context) error {
return nil
}
func (rap *ResourceAccessPlugin) Stop() error {
return nil
}