// taskq/task_test.go
//
// Web-viewer listing metadata for this file: 387 lines, 8.1 KiB, Go.

package taskq
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestTaskPublishMissingClient verifies that Publish reports an error for a
// task that has no servlet attached and was never passed to Configure.
//
// NOTE(review): presumably Publish falls back to the package-level default
// servlet, so this relies on that servlet being unconfigured when the test
// runs — confirm against the Publish implementation and test ordering.
func TestTaskPublishMissingClient(t *testing.T) {
	task := &Task{
		Name:    "test_task",
		Queue:   "default",
		Handler: func() error { return nil },
	}

	err := task.Publish(context.Background(), struct{}{})

	// assert.Error only records the failure and lets the test continue, so
	// bail out explicitly: calling err.Error() on a nil error would panic
	// instead of producing a readable test failure.
	if !assert.Error(t, err) {
		return
	}

	// The exact wording may vary by implementation; only require that the
	// error carries some message.
	assert.NotEmpty(t, err.Error())
}
// TestTaskPublishWithServlet checks that a task wired to its own servlet can
// be published after that servlet has been configured with the test Redis
// client.
func TestTaskPublishWithServlet(t *testing.T) {
	redisClient := getTestRedis(t)
	defer redisClient.Close()

	srv := NewServlet()
	job := &Task{
		Name:    "test_task",
		Queue:   "default",
		Handler: func(data struct{ Value string }) error { return nil },
		servlet: srv,
	}

	// Publishing is pointless if configuration failed, so abort right away.
	require.NoError(t, srv.Configure(Config{
		Redis: redisClient,
		Tasks: []*Task{job},
	}))

	payload := struct{ Value string }{Value: "test data"}
	assert.NoError(t, job.Publish(context.Background(), payload))
}
// TestTaskPublishWithDefaultServlet checks that a task without an explicit
// servlet can be published once the package-level defaultServlet has been
// configured with it.
//
// NOTE(review): defaultServlet is configured here and never restored, and the
// Redis client handed to it is closed when this test returns — confirm that no
// other test depends on the default servlet's state.
func TestTaskPublishWithDefaultServlet(t *testing.T) {
	redisClient := getTestRedis(t)
	defer redisClient.Close()

	// No servlet field is set on purpose; the task is only registered through
	// the configuration given to defaultServlet below.
	job := &Task{
		Name:    "test_task",
		Queue:   "default",
		Handler: func(data struct{ Value string }) error { return nil },
	}

	configureErr := defaultServlet.Configure(Config{
		Redis: redisClient,
		Tasks: []*Task{job},
	})
	require.NoError(t, configureErr)

	payload := struct{ Value string }{Value: "test data"}
	publishErr := job.Publish(context.Background(), payload)
	assert.NoError(t, publishErr)
}
// TestTaskPublishWithOptions publishes the same task four times, each time
// with a different publish option (Delay, TTR, Retention, DelayUntil), and
// expects every call to succeed.
func TestTaskPublishWithOptions(t *testing.T) {
	redisClient := getTestRedis(t)
	defer redisClient.Close()

	srv := NewServlet()
	job := &Task{
		Name:    "test_task",
		Queue:   "default",
		Handler: func(data struct{ Value string }) error { return nil },
		servlet: srv,
	}
	require.NoError(t, srv.Configure(Config{
		Redis: redisClient,
		Tasks: []*Task{job},
	}))

	bg := context.Background()
	payload := struct{ Value string }{Value: "test"}

	// Delay option.
	assert.NoError(t, job.Publish(bg, payload, Delay(5*time.Second)))

	// TTR override option.
	assert.NoError(t, job.Publish(bg, payload, TTR(30*time.Second)))

	// Retention option.
	assert.NoError(t, job.Publish(bg, payload, Retention(1*time.Hour)))

	// DelayUntil option, targeting a point one hour from now.
	runAt := time.Now().Add(1 * time.Hour)
	assert.NoError(t, job.Publish(bg, payload, DelayUntil(runAt)))
}
// TestTaskPublishInvalidData expects Publish to return an error when the
// payload is a channel, which the test treats as non-serializable data.
func TestTaskPublishInvalidData(t *testing.T) {
	redisClient := getTestRedis(t)
	defer redisClient.Close()

	srv := NewServlet()
	job := &Task{
		Name:    "test_task",
		Queue:   "default",
		Handler: func() error { return nil },
		servlet: srv,
	}

	setupErr := srv.Configure(Config{
		Redis: redisClient,
		Tasks: []*Task{job},
	})
	require.NoError(t, setupErr)

	// A channel stands in for data that cannot be serialized.
	unserializable := make(chan int)
	assert.Error(t, job.Publish(context.Background(), unserializable))
}
// TestTaskPublishWithGroup checks that a task whose Group field is set can be
// configured and published without error.
func TestTaskPublishWithGroup(t *testing.T) {
	redisClient := getTestRedis(t)
	defer redisClient.Close()

	srv := NewServlet()
	grouped := &Task{
		Name:    "test_task",
		Queue:   "default",
		Group:   "test_group",
		Handler: func(data struct{ Value string }) error { return nil },
		servlet: srv,
	}

	require.NoError(t, srv.Configure(Config{
		Redis: redisClient,
		Tasks: []*Task{grouped},
	}))

	payload := struct{ Value string }{Value: "test"}
	publishErr := grouped.Publish(context.Background(), payload)
	assert.NoError(t, publishErr)
}
// TestTaskPublishWithTTR checks that a task declaring a 30-second TTR on the
// Task itself (rather than through a publish option) can be configured and
// published without error.
func TestTaskPublishWithTTR(t *testing.T) {
	redisClient := getTestRedis(t)
	defer redisClient.Close()

	srv := NewServlet()
	job := &Task{
		Name:    "test_task",
		Queue:   "default",
		TTR:     30 * time.Second,
		Handler: func(data struct{ Value string }) error { return nil },
		servlet: srv,
	}

	settings := Config{
		Redis: redisClient,
		Tasks: []*Task{job},
	}
	require.NoError(t, srv.Configure(settings))

	assert.NoError(t, job.Publish(
		context.Background(),
		struct{ Value string }{Value: "test"},
	))
}
// TestTaskPublishMultipleTimes publishes the same task with the same payload
// five times in a row and expects every call to succeed.
func TestTaskPublishMultipleTimes(t *testing.T) {
	redisClient := getTestRedis(t)
	defer redisClient.Close()

	srv := NewServlet()
	job := &Task{
		Name:    "test_task",
		Queue:   "default",
		Handler: func(data struct{ Value string }) error { return nil },
		servlet: srv,
	}
	require.NoError(t, srv.Configure(Config{
		Redis: redisClient,
		Tasks: []*Task{job},
	}))

	bg := context.Background()
	payload := struct{ Value string }{Value: "test"}

	// assert (not require) is used so that all five attempts run even when an
	// earlier one fails.
	for remaining := 5; remaining > 0; remaining-- {
		assert.NoError(t, job.Publish(bg, payload))
	}
}
// TestTaskPublishVariousDataTypes runs one subtest per handler signature
// (struct argument, no arguments, context only) and expects a task built
// around each handler to be configured and published without error.
func TestTaskPublishVariousDataTypes(t *testing.T) {
	redisClient := getTestRedis(t)
	defer redisClient.Close()

	// A single servlet is shared by all subtests and is configured again
	// inside each of them with that subtest's task.
	shared := NewServlet()
	bg := context.Background()

	cases := []struct {
		name    string
		handler interface{}
		data    interface{}
	}{
		{"struct_data", func(data struct{ Key string }) error { return nil }, struct{ Key string }{Key: "value"}},
		{"no_params", func() error { return nil }, struct{}{}},
		{"context_only", func(ctx context.Context) error { return nil }, struct{}{}},
	}

	for i := range cases {
		c := cases[i]
		t.Run(c.name, func(t *testing.T) {
			// Each case gets its own task name derived from the case name.
			job := &Task{
				Name:    "test_task_" + c.name,
				Queue:   "default",
				Handler: c.handler,
				servlet: shared,
			}
			require.NoError(t, shared.Configure(Config{
				Redis: redisClient,
				Tasks: []*Task{job},
			}))

			assert.NoError(t, job.Publish(bg, c.data))
		})
	}
}
// TestDelayOption checks that Publish succeeds when called with the Delay
// option set to five seconds.
func TestDelayOption(t *testing.T) {
	redisClient := getTestRedis(t)
	defer redisClient.Close()

	srv := NewServlet()
	job := &Task{
		Name:    "test_task",
		Queue:   "default",
		Handler: func(data struct{ Value string }) error { return nil },
		servlet: srv,
	}

	require.NoError(t, srv.Configure(Config{
		Redis: redisClient,
		Tasks: []*Task{job},
	}))

	payload := struct{ Value string }{Value: "test"}
	publishErr := job.Publish(context.Background(), payload, Delay(5*time.Second))
	assert.NoError(t, publishErr)
}
// TestDelayUntilOption checks that Publish succeeds when called with the
// DelayUntil option pointing ten minutes into the future.
func TestDelayUntilOption(t *testing.T) {
	redisClient := getTestRedis(t)
	defer redisClient.Close()

	srv := NewServlet()
	job := &Task{
		Name:    "test_task",
		Queue:   "default",
		Handler: func(data struct{ Value string }) error { return nil },
		servlet: srv,
	}

	settings := Config{
		Redis: redisClient,
		Tasks: []*Task{job},
	}
	require.NoError(t, srv.Configure(settings))

	// The target time is computed right before publishing.
	runAt := time.Now().Add(10 * time.Minute)
	assert.NoError(t, job.Publish(
		context.Background(),
		struct{ Value string }{Value: "test"},
		DelayUntil(runAt),
	))
}
// TestTTROption checks that Publish succeeds when called with the TTR option
// set to sixty seconds.
func TestTTROption(t *testing.T) {
	redisClient := getTestRedis(t)
	defer redisClient.Close()

	srv := NewServlet()
	job := &Task{
		Name:    "test_task",
		Queue:   "default",
		Handler: func(data struct{ Value string }) error { return nil },
		servlet: srv,
	}

	setupErr := srv.Configure(Config{
		Redis: redisClient,
		Tasks: []*Task{job},
	})
	require.NoError(t, setupErr)

	bg := context.Background()
	payload := struct{ Value string }{Value: "test"}
	assert.NoError(t, job.Publish(bg, payload, TTR(60*time.Second)))
}
// TestRetentionOption checks that Publish succeeds when called with the
// Retention option set to twenty-four hours.
func TestRetentionOption(t *testing.T) {
	redisClient := getTestRedis(t)
	defer redisClient.Close()

	srv := NewServlet()
	job := &Task{
		Name:    "test_task",
		Queue:   "default",
		Handler: func(data struct{ Value string }) error { return nil },
		servlet: srv,
	}

	require.NoError(t, srv.Configure(Config{
		Redis: redisClient,
		Tasks: []*Task{job},
	}))

	keepFor := Retention(24 * time.Hour)
	publishErr := job.Publish(
		context.Background(),
		struct{ Value string }{Value: "test"},
		keepFor,
	)
	assert.NoError(t, publishErr)
}