272 lines
6.1 KiB
Go
272 lines
6.1 KiB
Go
|
|
package taskq
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"testing"
|
||
|
|
|
||
|
|
"github.com/stretchr/testify/assert"
|
||
|
|
)
|
||
|
|
|
||
|
|
// TestDefault tests Default function returns default servlet
|
||
|
|
func TestDefault(t *testing.T) {
|
||
|
|
servlet := Default()
|
||
|
|
assert.NotNil(t, servlet)
|
||
|
|
assert.Equal(t, defaultServlet, servlet)
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestSetDefault tests SetDefault function
|
||
|
|
func TestSetDefault(t *testing.T) {
|
||
|
|
originalDefault := defaultServlet
|
||
|
|
defer func() {
|
||
|
|
defaultServlet = originalDefault
|
||
|
|
}()
|
||
|
|
|
||
|
|
newServlet := NewServlet()
|
||
|
|
SetDefault(newServlet)
|
||
|
|
|
||
|
|
assert.Equal(t, newServlet, Default())
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestConfigureDefault tests Configure function with default servlet
|
||
|
|
func TestConfigureDefault(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := Configure(cfg)
|
||
|
|
assert.NoError(t, err)
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestConfigureDefaultMissingRedis tests Configure without Redis
|
||
|
|
func TestConfigureDefaultMissingRedis(t *testing.T) {
|
||
|
|
cfg := Config{
|
||
|
|
Redis: nil,
|
||
|
|
Tasks: []*Task{},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := Configure(cfg)
|
||
|
|
assert.Error(t, err)
|
||
|
|
assert.Equal(t, "taskq: redis client is required", err.Error())
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestInitDefault tests Init function with default servlet
|
||
|
|
func TestInitDefault(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := Configure(cfg)
|
||
|
|
assert.NoError(t, err)
|
||
|
|
|
||
|
|
ctx := context.Background()
|
||
|
|
err = Init(ctx)
|
||
|
|
assert.NoError(t, err)
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestStartDefault tests Start function with default servlet
|
||
|
|
|
||
|
|
// TestStopDefault tests Stop function with default servlet
|
||
|
|
func TestStopDefault(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{},
|
||
|
|
}
|
||
|
|
|
||
|
|
// Configure and initialize default servlet
|
||
|
|
err := Configure(cfg)
|
||
|
|
assert.NoError(t, err)
|
||
|
|
|
||
|
|
ctx := context.Background()
|
||
|
|
err = Init(ctx)
|
||
|
|
assert.NoError(t, err)
|
||
|
|
|
||
|
|
// Starting the real asynq server is an integration operation that
|
||
|
|
// can spawn background goroutines and is flaky in unit tests.
|
||
|
|
t.Skip("skipping Start/Stop in unit tests; run integration tests separately")
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestStartBeforeInit tests Start called without Init
|
||
|
|
func TestStartBeforeInit(t *testing.T) {
|
||
|
|
// Starting a fresh Servlet without Configure is unsafe (would start with nil Redis client).
|
||
|
|
// We avoid calling Start here to prevent background goroutine panics; assert precondition instead.
|
||
|
|
newServlet := NewServlet()
|
||
|
|
if newServlet.RedisClient() == nil {
|
||
|
|
t.Log("New servlet has no Redis client; Start would be unsafe — skipping actual Start call")
|
||
|
|
return
|
||
|
|
}
|
||
|
|
// If a Redis client is present unexpectedly, attempt Start and ensure no error.
|
||
|
|
err := newServlet.Start(context.Background())
|
||
|
|
if err != nil {
|
||
|
|
t.Logf("Start returned error: %v", err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestFullLifecycleWithDefaultServlet tests complete lifecycle using package-level functions
|
||
|
|
func TestFullLifecycleWithDefaultServlet(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
task := &Task{
|
||
|
|
Name: "test_task",
|
||
|
|
Queue: "default",
|
||
|
|
Handler: func(ctx context.Context) error { return nil },
|
||
|
|
}
|
||
|
|
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{task},
|
||
|
|
}
|
||
|
|
|
||
|
|
// Configure
|
||
|
|
err := Configure(cfg)
|
||
|
|
assert.NoError(t, err)
|
||
|
|
|
||
|
|
// Initialize
|
||
|
|
ctx := context.Background()
|
||
|
|
err = Init(ctx)
|
||
|
|
assert.NoError(t, err)
|
||
|
|
|
||
|
|
// Starting the real asynq server is an integration operation that
|
||
|
|
// can spawn background goroutines and is flaky in unit tests.
|
||
|
|
t.Skip("skipping Start/Stop lifecycle in unit tests; run integration tests separately")
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestConfigureWithTasks tests Configure with multiple tasks
|
||
|
|
func TestConfigureWithTasks(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
tasks := []*Task{
|
||
|
|
{
|
||
|
|
Name: "task1",
|
||
|
|
Queue: "default",
|
||
|
|
Handler: func() error { return nil },
|
||
|
|
},
|
||
|
|
{
|
||
|
|
Name: "task2",
|
||
|
|
Queue: "high",
|
||
|
|
Priority: 10,
|
||
|
|
Handler: func(ctx context.Context, data struct{ Value string }) error { return nil },
|
||
|
|
},
|
||
|
|
}
|
||
|
|
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: tasks,
|
||
|
|
}
|
||
|
|
|
||
|
|
err := Configure(cfg)
|
||
|
|
assert.NoError(t, err)
|
||
|
|
|
||
|
|
// Verify tasks were registered
|
||
|
|
servlet := Default()
|
||
|
|
queues := servlet.Queues()
|
||
|
|
assert.Equal(t, 2, len(queues))
|
||
|
|
assert.Equal(t, 10, queues["high"])
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestConfigureWithPlugins tests Configure with plugins
|
||
|
|
func TestConfigureWithPlugins(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
mockPlugin := &MockPlugin{}
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{},
|
||
|
|
Plugins: []Plugin{mockPlugin},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := Configure(cfg)
|
||
|
|
assert.NoError(t, err)
|
||
|
|
|
||
|
|
ctx := context.Background()
|
||
|
|
err = Init(ctx)
|
||
|
|
assert.NoError(t, err)
|
||
|
|
|
||
|
|
assert.True(t, mockPlugin.initCalled)
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestInitBeforeConfigure tests Init called without Configure
|
||
|
|
func TestInitBeforeConfigure(t *testing.T) {
|
||
|
|
// Init should be safe to call even if Configure was not called (no plugins)
|
||
|
|
newServlet := NewServlet()
|
||
|
|
err := newServlet.Init(context.Background())
|
||
|
|
assert.NoError(t, err)
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestMultipleConfigure tests that Configure can be called multiple times
|
||
|
|
// (though this might reset previous configuration)
|
||
|
|
func TestMultipleConfigure(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
s := NewServlet()
|
||
|
|
|
||
|
|
task1 := &Task{
|
||
|
|
Name: "task1",
|
||
|
|
Queue: "default",
|
||
|
|
Handler: func() error { return nil },
|
||
|
|
}
|
||
|
|
|
||
|
|
cfg1 := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{task1},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := s.Configure(cfg1)
|
||
|
|
assert.NoError(t, err)
|
||
|
|
assert.Equal(t, 1, len(s.handlers))
|
||
|
|
|
||
|
|
task2 := &Task{
|
||
|
|
Name: "task2",
|
||
|
|
Queue: "default",
|
||
|
|
Handler: func() error { return nil },
|
||
|
|
}
|
||
|
|
|
||
|
|
cfg2 := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{task1, task2},
|
||
|
|
}
|
||
|
|
|
||
|
|
err = s.Configure(cfg2)
|
||
|
|
assert.NoError(t, err)
|
||
|
|
assert.Equal(t, 2, len(s.handlers))
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestContextCancellation tests that Start respects context cancellation
|
||
|
|
func TestContextCancellation(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
s := NewServlet()
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := s.Configure(cfg)
|
||
|
|
assert.NoError(t, err)
|
||
|
|
|
||
|
|
ctx := context.Background()
|
||
|
|
err = s.Init(ctx)
|
||
|
|
assert.NoError(t, err)
|
||
|
|
|
||
|
|
// Use a cancellable context and cancel immediately to ensure Start handles cancellation
|
||
|
|
cancelCtx, cancel := context.WithCancel(context.Background())
|
||
|
|
cancel()
|
||
|
|
|
||
|
|
// Start with already-cancelled context; Start should return quickly and not leak
|
||
|
|
err = s.Start(cancelCtx)
|
||
|
|
assert.NoError(t, err)
|
||
|
|
}
|