387 lines
8.1 KiB
Go
387 lines
8.1 KiB
Go
|
|
package taskq
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"testing"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"github.com/stretchr/testify/assert"
|
||
|
|
"github.com/stretchr/testify/require"
|
||
|
|
)
|
||
|
|
|
||
|
|
// TestTaskPublishMissingClient tests Publish without a configured client
|
||
|
|
func TestTaskPublishMissingClient(t *testing.T) {
|
||
|
|
task := &Task{
|
||
|
|
Name: "test_task",
|
||
|
|
Queue: "default",
|
||
|
|
Handler: func() error { return nil },
|
||
|
|
}
|
||
|
|
|
||
|
|
ctx := context.Background()
|
||
|
|
err := task.Publish(ctx, struct{}{})
|
||
|
|
assert.Error(t, err)
|
||
|
|
// Error message may vary depending on implementation
|
||
|
|
assert.True(t, len(err.Error()) > 0)
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestTaskPublishWithServlet tests Publish with servlet client
|
||
|
|
func TestTaskPublishWithServlet(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
s := NewServlet()
|
||
|
|
task := &Task{
|
||
|
|
Name: "test_task",
|
||
|
|
Queue: "default",
|
||
|
|
Handler: func(data struct{ Value string }) error { return nil },
|
||
|
|
servlet: s,
|
||
|
|
}
|
||
|
|
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{task},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := s.Configure(cfg)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
ctx := context.Background()
|
||
|
|
err = task.Publish(ctx, struct{ Value string }{Value: "test data"})
|
||
|
|
assert.NoError(t, err)
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestTaskPublishWithDefaultServlet tests Publish with default servlet
|
||
|
|
func TestTaskPublishWithDefaultServlet(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
task := &Task{
|
||
|
|
Name: "test_task",
|
||
|
|
Queue: "default",
|
||
|
|
Handler: func(data struct{ Value string }) error { return nil },
|
||
|
|
}
|
||
|
|
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{task},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := defaultServlet.Configure(cfg)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
ctx := context.Background()
|
||
|
|
err = task.Publish(ctx, struct{ Value string }{Value: "test data"})
|
||
|
|
assert.NoError(t, err)
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestTaskPublishWithOptions tests Publish with various options
|
||
|
|
func TestTaskPublishWithOptions(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
s := NewServlet()
|
||
|
|
task := &Task{
|
||
|
|
Name: "test_task",
|
||
|
|
Queue: "default",
|
||
|
|
Handler: func(data struct{ Value string }) error { return nil },
|
||
|
|
servlet: s,
|
||
|
|
}
|
||
|
|
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{task},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := s.Configure(cfg)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
ctx := context.Background()
|
||
|
|
|
||
|
|
// Test with delay
|
||
|
|
err = task.Publish(ctx, struct{ Value string }{Value: "test"}, Delay(5*time.Second))
|
||
|
|
assert.NoError(t, err)
|
||
|
|
|
||
|
|
// Test with TTR override
|
||
|
|
err = task.Publish(ctx, struct{ Value string }{Value: "test"}, TTR(30*time.Second))
|
||
|
|
assert.NoError(t, err)
|
||
|
|
|
||
|
|
// Test with retention
|
||
|
|
err = task.Publish(ctx, struct{ Value string }{Value: "test"}, Retention(1*time.Hour))
|
||
|
|
assert.NoError(t, err)
|
||
|
|
|
||
|
|
// Test with delay until
|
||
|
|
futureTime := time.Now().Add(1 * time.Hour)
|
||
|
|
err = task.Publish(ctx, struct{ Value string }{Value: "test"}, DelayUntil(futureTime))
|
||
|
|
assert.NoError(t, err)
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestTaskPublishInvalidData tests Publish with non-serializable data
|
||
|
|
func TestTaskPublishInvalidData(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
s := NewServlet()
|
||
|
|
task := &Task{
|
||
|
|
Name: "test_task",
|
||
|
|
Queue: "default",
|
||
|
|
Handler: func() error { return nil },
|
||
|
|
servlet: s,
|
||
|
|
}
|
||
|
|
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{task},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := s.Configure(cfg)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
ctx := context.Background()
|
||
|
|
|
||
|
|
// Publish with channel (non-serializable)
|
||
|
|
err = task.Publish(ctx, make(chan int))
|
||
|
|
assert.Error(t, err)
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestTaskPublishWithGroup tests Publish with group
|
||
|
|
func TestTaskPublishWithGroup(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
s := NewServlet()
|
||
|
|
task := &Task{
|
||
|
|
Name: "test_task",
|
||
|
|
Queue: "default",
|
||
|
|
Group: "test_group",
|
||
|
|
Handler: func(data struct{ Value string }) error { return nil },
|
||
|
|
servlet: s,
|
||
|
|
}
|
||
|
|
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{task},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := s.Configure(cfg)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
ctx := context.Background()
|
||
|
|
err = task.Publish(ctx, struct{ Value string }{Value: "test"})
|
||
|
|
assert.NoError(t, err)
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestTaskPublishWithTTR tests Publish with TTR
|
||
|
|
func TestTaskPublishWithTTR(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
s := NewServlet()
|
||
|
|
task := &Task{
|
||
|
|
Name: "test_task",
|
||
|
|
Queue: "default",
|
||
|
|
TTR: 30 * time.Second,
|
||
|
|
Handler: func(data struct{ Value string }) error { return nil },
|
||
|
|
servlet: s,
|
||
|
|
}
|
||
|
|
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{task},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := s.Configure(cfg)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
ctx := context.Background()
|
||
|
|
err = task.Publish(ctx, struct{ Value string }{Value: "test"})
|
||
|
|
assert.NoError(t, err)
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestTaskPublishMultipleTimes tests publishing the same task multiple times
|
||
|
|
func TestTaskPublishMultipleTimes(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
s := NewServlet()
|
||
|
|
task := &Task{
|
||
|
|
Name: "test_task",
|
||
|
|
Queue: "default",
|
||
|
|
Handler: func(data struct{ Value string }) error { return nil },
|
||
|
|
servlet: s,
|
||
|
|
}
|
||
|
|
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{task},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := s.Configure(cfg)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
ctx := context.Background()
|
||
|
|
|
||
|
|
// Publish multiple times
|
||
|
|
for i := 0; i < 5; i++ {
|
||
|
|
err = task.Publish(ctx, struct{ Value string }{Value: "test"})
|
||
|
|
assert.NoError(t, err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestTaskPublishVariousDataTypes tests publishing with different data types
|
||
|
|
func TestTaskPublishVariousDataTypes(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
s := NewServlet()
|
||
|
|
|
||
|
|
testCases := []struct {
|
||
|
|
name string
|
||
|
|
handler interface{}
|
||
|
|
data interface{}
|
||
|
|
}{
|
||
|
|
{
|
||
|
|
name: "struct_data",
|
||
|
|
handler: func(data struct{ Key string }) error { return nil },
|
||
|
|
data: struct{ Key string }{Key: "value"},
|
||
|
|
},
|
||
|
|
{
|
||
|
|
name: "no_params",
|
||
|
|
handler: func() error { return nil },
|
||
|
|
data: struct{}{},
|
||
|
|
},
|
||
|
|
{
|
||
|
|
name: "context_only",
|
||
|
|
handler: func(ctx context.Context) error { return nil },
|
||
|
|
data: struct{}{},
|
||
|
|
},
|
||
|
|
}
|
||
|
|
|
||
|
|
for _, tc := range testCases {
|
||
|
|
t.Run(tc.name, func(t *testing.T) {
|
||
|
|
task := &Task{
|
||
|
|
Name: "test_task_" + tc.name,
|
||
|
|
Queue: "default",
|
||
|
|
Handler: tc.handler,
|
||
|
|
servlet: s,
|
||
|
|
}
|
||
|
|
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{task},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := s.Configure(cfg)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
ctx := context.Background()
|
||
|
|
err = task.Publish(ctx, tc.data)
|
||
|
|
assert.NoError(t, err)
|
||
|
|
})
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestDelayOption tests Delay publish option
|
||
|
|
func TestDelayOption(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
s := NewServlet()
|
||
|
|
task := &Task{
|
||
|
|
Name: "test_task",
|
||
|
|
Queue: "default",
|
||
|
|
Handler: func(data struct{ Value string }) error { return nil },
|
||
|
|
servlet: s,
|
||
|
|
}
|
||
|
|
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{task},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := s.Configure(cfg)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
ctx := context.Background()
|
||
|
|
err = task.Publish(ctx, struct{ Value string }{Value: "test"}, Delay(5*time.Second))
|
||
|
|
assert.NoError(t, err)
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestDelayUntilOption tests DelayUntil publish option
|
||
|
|
func TestDelayUntilOption(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
s := NewServlet()
|
||
|
|
task := &Task{
|
||
|
|
Name: "test_task",
|
||
|
|
Queue: "default",
|
||
|
|
Handler: func(data struct{ Value string }) error { return nil },
|
||
|
|
servlet: s,
|
||
|
|
}
|
||
|
|
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{task},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := s.Configure(cfg)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
ctx := context.Background()
|
||
|
|
futureTime := time.Now().Add(10 * time.Minute)
|
||
|
|
err = task.Publish(ctx, struct{ Value string }{Value: "test"}, DelayUntil(futureTime))
|
||
|
|
assert.NoError(t, err)
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestTTROption tests TTR publish option
|
||
|
|
func TestTTROption(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
s := NewServlet()
|
||
|
|
task := &Task{
|
||
|
|
Name: "test_task",
|
||
|
|
Queue: "default",
|
||
|
|
Handler: func(data struct{ Value string }) error { return nil },
|
||
|
|
servlet: s,
|
||
|
|
}
|
||
|
|
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{task},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := s.Configure(cfg)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
ctx := context.Background()
|
||
|
|
err = task.Publish(ctx, struct{ Value string }{Value: "test"}, TTR(60*time.Second))
|
||
|
|
assert.NoError(t, err)
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestRetentionOption tests Retention publish option
|
||
|
|
func TestRetentionOption(t *testing.T) {
|
||
|
|
rdb := getTestRedis(t)
|
||
|
|
defer rdb.Close()
|
||
|
|
|
||
|
|
s := NewServlet()
|
||
|
|
task := &Task{
|
||
|
|
Name: "test_task",
|
||
|
|
Queue: "default",
|
||
|
|
Handler: func(data struct{ Value string }) error { return nil },
|
||
|
|
servlet: s,
|
||
|
|
}
|
||
|
|
|
||
|
|
cfg := Config{
|
||
|
|
Redis: rdb,
|
||
|
|
Tasks: []*Task{task},
|
||
|
|
}
|
||
|
|
|
||
|
|
err := s.Configure(cfg)
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
ctx := context.Background()
|
||
|
|
err = task.Publish(ctx, struct{ Value string }{Value: "test"}, Retention(24*time.Hour))
|
||
|
|
assert.NoError(t, err)
|
||
|
|
}
|