232 lines
4.3 KiB
Go
232 lines
4.3 KiB
Go
|
|
package taskq
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"testing"
|
||
|
|
|
||
|
|
"github.com/stretchr/testify/assert"
|
||
|
|
"github.com/stretchr/testify/require"
|
||
|
|
)
|
||
|
|
|
||
|
|
// TestContextRedis tests Context.Redis method
|
||
|
|
func TestContextRedis(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
s := NewServlet()
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := s.Configure(cfg)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
ctx := &Context{
|
||
|
|
Context: context.Background(),
|
||
|
|
servlet: s,
|
||
|
|
}
|
||
|
|
|
||
|
|
redisClient := ctx.Redis()
|
||
|
|
assert.NotNil(t, redisClient)
|
||
|
|
assert.Equal(t, rdb, redisClient)
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestContextQueues tests Context.Queues method
|
||
|
|
func TestContextQueues(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
s := NewServlet()
|
||
|
|
task := &Task{
|
||
|
|
Name: "task1",
|
||
|
|
Queue: "default",
|
||
|
|
Priority: 5,
|
||
|
|
Handler: func() error { return nil },
|
||
|
|
}
|
||
|
|
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{task},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := s.Configure(cfg)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
ctx := &Context{
|
||
|
|
Context: context.Background(),
|
||
|
|
servlet: s,
|
||
|
|
}
|
||
|
|
|
||
|
|
queues := ctx.Queues()
|
||
|
|
assert.NotNil(t, queues)
|
||
|
|
assert.Equal(t, 5, queues["default"])
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestPluginName tests Plugin Name method
|
||
|
|
func TestPluginName(t *testing.T) {
|
||
|
|
plugin := &TestPlugin{}
|
||
|
|
assert.Equal(t, "TestPlugin", plugin.Name())
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestPluginInit tests Plugin Init method
|
||
|
|
func TestPluginInit(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
plugin := &TestPlugin{}
|
||
|
|
s := NewServlet()
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := s.Configure(cfg)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
ctx := &Context{
|
||
|
|
Context: context.Background(),
|
||
|
|
servlet: s,
|
||
|
|
}
|
||
|
|
|
||
|
|
err = plugin.Init(ctx)
|
||
|
|
assert.NoError(t, err)
|
||
|
|
assert.True(t, plugin.initCalled)
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestPluginStop tests Plugin Stop method
|
||
|
|
func TestPluginStop(t *testing.T) {
|
||
|
|
plugin := &TestPlugin{}
|
||
|
|
err := plugin.Stop()
|
||
|
|
assert.NoError(t, err)
|
||
|
|
assert.True(t, plugin.stopCalled)
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestPluginInitError is the placeholder for exercising a plugin whose Init
// fails. It is skipped unconditionally and performs no checks.
func TestPluginInitError(t *testing.T) {
	// Skip: this test can cause goroutines to leak during plugin error handling.
	t.Skip("Init error handling causes goroutine leaks in asynq")
}
|
||
|
|
|
||
|
|
// TestPluginContextAccess tests plugin accessing context resources
|
||
|
|
func TestPluginContextAccess(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
resourcePlugin := &ResourceAccessPlugin{}
|
||
|
|
s := NewServlet()
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{},
|
||
|
|
Plugins: []Plugin{resourcePlugin},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := s.Configure(cfg)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
ctx := context.Background()
|
||
|
|
err = s.Init(ctx)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
assert.True(t, resourcePlugin.redisAccessible)
|
||
|
|
assert.True(t, resourcePlugin.queuesAccessible)
|
||
|
|
}
|
||
|
|
|
||
|
|
// Test helper plugins
|
||
|
|
|
||
|
|
// TestPlugin is a plugin test double. Each flag is set to true by the
// lifecycle method of the same name (Init, Start, Stop) so tests can check
// which methods were called.
type TestPlugin struct {
	initCalled, startCalled, stopCalled bool
}

// Name returns the fixed identifier "TestPlugin".
func (tp *TestPlugin) Name() string {
	return "TestPlugin"
}
|
||
|
|
|
||
|
|
func (tp *TestPlugin) Init(ctx *Context) error {
|
||
|
|
tp.initCalled = true
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (tp *TestPlugin) Start(ctx *Context) error {
|
||
|
|
tp.startCalled = true
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (tp *TestPlugin) Stop() error {
|
||
|
|
tp.stopCalled = true
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// ErrorPlugin is a plugin test double that can be told to fail at one
// lifecycle stage.
type ErrorPlugin struct {
	// failAt selects the failing stage: "init" makes Init return an error,
	// "start" makes Start return an error. Any other value fails nowhere.
	failAt string
}

// Name returns the fixed identifier "ErrorPlugin".
func (ep *ErrorPlugin) Name() string {
	return "ErrorPlugin"
}
|
||
|
|
|
||
|
|
func (ep *ErrorPlugin) Init(ctx *Context) error {
|
||
|
|
if ep.failAt == "init" {
|
||
|
|
return &ErrorType{message: "init error"}
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (ep *ErrorPlugin) Start(ctx *Context) error {
|
||
|
|
if ep.failAt == "start" {
|
||
|
|
return &ErrorType{message: "start error"}
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (ep *ErrorPlugin) Stop() error {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// ErrorType is a minimal error implementation used by the test plugins; it
// carries only a message.
type ErrorType struct {
	message string
}

// Error returns the stored message unchanged, satisfying the error interface.
func (e *ErrorType) Error() string {
	return e.message
}
|
||
|
|
|
||
|
|
// ResourceAccessPlugin is a plugin test double that records, during Init,
// whether the Context it received exposed a non-nil Redis client and a
// non-nil queue map.
type ResourceAccessPlugin struct {
	redisAccessible  bool // set when ctx.Redis() returned non-nil in Init
	queuesAccessible bool // set when ctx.Queues() returned non-nil in Init
}

// Name returns the fixed identifier "ResourceAccessPlugin".
func (rap *ResourceAccessPlugin) Name() string {
	return "ResourceAccessPlugin"
}
|
||
|
|
|
||
|
|
func (rap *ResourceAccessPlugin) Init(ctx *Context) error {
|
||
|
|
// Test Redis access
|
||
|
|
redis := ctx.Redis()
|
||
|
|
if redis != nil {
|
||
|
|
rap.redisAccessible = true
|
||
|
|
}
|
||
|
|
|
||
|
|
// Test Queues access
|
||
|
|
queues := ctx.Queues()
|
||
|
|
if queues != nil {
|
||
|
|
rap.queuesAccessible = true
|
||
|
|
}
|
||
|
|
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (rap *ResourceAccessPlugin) Start(ctx *Context) error {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (rap *ResourceAccessPlugin) Stop() error {
|
||
|
|
return nil
|
||
|
|
}
|