package taskq

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// TestTaskPublishMissingClient checks that Publish reports a non-empty error
// for a task that has no servlet attached and was never passed to Configure.
//
// NOTE(review): TestTaskPublishWithDefaultServlet configures the package-level
// defaultServlet; confirm this test still fails as expected when it runs after
// that one (e.g. under -shuffle).
func TestTaskPublishMissingClient(t *testing.T) {
	task := &Task{
		Name:    "test_task",
		Queue:   "default",
		Handler: func() error { return nil },
	}

	ctx := context.Background()
	err := task.Publish(ctx, struct{}{})

	// require (not assert) so that a nil error stops the test here instead of
	// panicking on err.Error() below.
	require.Error(t, err)
	// The exact message may vary depending on implementation; only require
	// that it is not empty.
	assert.NotEmpty(t, err.Error())
}

// TestTaskPublishWithServlet checks that Publish succeeds for a task bound to
// an explicitly created servlet that has been configured with that task.
func TestTaskPublishWithServlet(t *testing.T) {
	rdb := getTestRedis(t)
	defer rdb.Close()

	s := NewServlet()
	task := &Task{
		Name:    "test_task",
		Queue:   "default",
		Handler: func(data struct{ Value string }) error { return nil },
		servlet: s,
	}
	cfg := Config{
		Redis: rdb,
		Tasks: []*Task{task},
	}
	err := s.Configure(cfg)
	require.NoError(t, err)

	ctx := context.Background()
	err = task.Publish(ctx, struct{ Value string }{Value: "test data"})
	assert.NoError(t, err)
}

// TestTaskPublishWithDefaultServlet checks that Publish succeeds for a task
// with no servlet field set once the package-level defaultServlet has been
// configured with that task.
func TestTaskPublishWithDefaultServlet(t *testing.T) {
	rdb := getTestRedis(t)
	defer rdb.Close()

	task := &Task{
		Name:    "test_task",
		Queue:   "default",
		Handler: func(data struct{ Value string }) error { return nil },
	}
	cfg := Config{
		Redis: rdb,
		Tasks: []*Task{task},
	}
	err := defaultServlet.Configure(cfg)
	require.NoError(t, err)

	ctx := context.Background()
	err = task.Publish(ctx, struct{ Value string }{Value: "test data"})
	assert.NoError(t, err)
}

// TestTaskPublishWithOptions checks that Publish accepts each publish option
// (Delay, TTR, Retention, DelayUntil) without returning an error.
func TestTaskPublishWithOptions(t *testing.T) {
	rdb := getTestRedis(t)
	defer rdb.Close()

	s := NewServlet()
	task := &Task{
		Name:    "test_task",
		Queue:   "default",
		Handler: func(data struct{ Value string }) error { return nil },
		servlet: s,
	}
	cfg := Config{
		Redis: rdb,
		Tasks: []*Task{task},
	}
	err := s.Configure(cfg)
	require.NoError(t, err)

	ctx := context.Background()

	// Test with delay
	err = task.Publish(ctx, struct{ Value string }{Value: "test"}, Delay(5*time.Second))
	assert.NoError(t, err)

	// Test with TTR override
	err = task.Publish(ctx, struct{ Value string }{Value: "test"}, TTR(30*time.Second))
	assert.NoError(t, err)

	// Test with retention
	err = task.Publish(ctx, struct{ Value string }{Value: "test"}, Retention(1*time.Hour))
	assert.NoError(t, err)

	// Test with delay until
	futureTime := time.Now().Add(1 * time.Hour)
	err = task.Publish(ctx, struct{ Value string }{Value: "test"}, DelayUntil(futureTime))
	assert.NoError(t, err)
}

// TestTaskPublishInvalidData checks that Publish returns an error when given
// a payload that cannot be serialized (a channel).
func TestTaskPublishInvalidData(t *testing.T) {
	rdb := getTestRedis(t)
	defer rdb.Close()

	s := NewServlet()
	task := &Task{
		Name:    "test_task",
		Queue:   "default",
		Handler: func() error { return nil },
		servlet: s,
	}
	cfg := Config{
		Redis: rdb,
		Tasks: []*Task{task},
	}
	err := s.Configure(cfg)
	require.NoError(t, err)

	ctx := context.Background()

	// Publish with channel (non-serializable)
	err = task.Publish(ctx, make(chan int))
	assert.Error(t, err)
}

// TestTaskPublishWithGroup checks that Publish succeeds for a task that has
// its Group field set.
func TestTaskPublishWithGroup(t *testing.T) {
	rdb := getTestRedis(t)
	defer rdb.Close()

	s := NewServlet()
	task := &Task{
		Name:    "test_task",
		Queue:   "default",
		Group:   "test_group",
		Handler: func(data struct{ Value string }) error { return nil },
		servlet: s,
	}
	cfg := Config{
		Redis: rdb,
		Tasks: []*Task{task},
	}
	err := s.Configure(cfg)
	require.NoError(t, err)

	ctx := context.Background()
	err = task.Publish(ctx, struct{ Value string }{Value: "test"})
	assert.NoError(t, err)
}

// TestTaskPublishWithTTR checks that Publish succeeds for a task that has its
// TTR field set on the task definition itself (not via a publish option).
func TestTaskPublishWithTTR(t *testing.T) {
	rdb := getTestRedis(t)
	defer rdb.Close()

	s := NewServlet()
	task := &Task{
		Name:    "test_task",
		Queue:   "default",
		TTR:     30 * time.Second,
		Handler: func(data struct{ Value string }) error { return nil },
		servlet: s,
	}
	cfg := Config{
		Redis: rdb,
		Tasks: []*Task{task},
	}
	err := s.Configure(cfg)
	require.NoError(t, err)

	ctx := context.Background()
	err = task.Publish(ctx, struct{ Value string }{Value: "test"})
	assert.NoError(t, err)
}

// TestTaskPublishMultipleTimes checks that the same task can be published
// five times in a row without any call returning an error.
func TestTaskPublishMultipleTimes(t *testing.T) {
	rdb := getTestRedis(t)
	defer rdb.Close()

	s := NewServlet()
	task := &Task{
		Name:    "test_task",
		Queue:   "default",
		Handler: func(data struct{ Value string }) error { return nil },
		servlet: s,
	}
	cfg := Config{
		Redis: rdb,
		Tasks: []*Task{task},
	}
	err := s.Configure(cfg)
	require.NoError(t, err)

	ctx := context.Background()

	// Publish multiple times
	for i := 0; i < 5; i++ {
		err = task.Publish(ctx, struct{ Value string }{Value: "test"})
		assert.NoError(t, err)
	}
}

// TestTaskPublishVariousDataTypes checks that Publish succeeds for handlers
// with different signatures: a struct parameter, no parameters, and a
// context-only parameter. Each case runs as its own subtest.
func TestTaskPublishVariousDataTypes(t *testing.T) {
	rdb := getTestRedis(t)
	defer rdb.Close()

	// A single servlet is shared by all subtests and is re-configured in each
	// one with that subtest's task.
	s := NewServlet()

	testCases := []struct {
		name    string
		handler interface{}
		data    interface{}
	}{
		{
			name:    "struct_data",
			handler: func(data struct{ Key string }) error { return nil },
			data:    struct{ Key string }{Key: "value"},
		},
		{
			name:    "no_params",
			handler: func() error { return nil },
			data:    struct{}{},
		},
		{
			name:    "context_only",
			handler: func(ctx context.Context) error { return nil },
			data:    struct{}{},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			task := &Task{
				Name:    "test_task_" + tc.name,
				Queue:   "default",
				Handler: tc.handler,
				servlet: s,
			}
			cfg := Config{
				Redis: rdb,
				Tasks: []*Task{task},
			}
			err := s.Configure(cfg)
			require.NoError(t, err)

			ctx := context.Background()
			err = task.Publish(ctx, tc.data)
			assert.NoError(t, err)
		})
	}
}

// TestDelayOption checks that Publish succeeds when given the Delay option
// with a five-second delay.
func TestDelayOption(t *testing.T) {
	rdb := getTestRedis(t)
	defer rdb.Close()

	s := NewServlet()
	task := &Task{
		Name:    "test_task",
		Queue:   "default",
		Handler: func(data struct{ Value string }) error { return nil },
		servlet: s,
	}
	cfg := Config{
		Redis: rdb,
		Tasks: []*Task{task},
	}
	err := s.Configure(cfg)
	require.NoError(t, err)

	ctx := context.Background()
	err = task.Publish(ctx, struct{ Value string }{Value: "test"}, Delay(5*time.Second))
	assert.NoError(t, err)
}

// TestDelayUntilOption checks that Publish succeeds when given the DelayUntil
// option with a time ten minutes in the future.
func TestDelayUntilOption(t *testing.T) {
	rdb := getTestRedis(t)
	defer rdb.Close()

	s := NewServlet()
	task := &Task{
		Name:    "test_task",
		Queue:   "default",
		Handler: func(data struct{ Value string }) error { return nil },
		servlet: s,
	}
	cfg := Config{
		Redis: rdb,
		Tasks: []*Task{task},
	}
	err := s.Configure(cfg)
	require.NoError(t, err)

	ctx := context.Background()
	futureTime := time.Now().Add(10 * time.Minute)
	err = task.Publish(ctx, struct{ Value string }{Value: "test"}, DelayUntil(futureTime))
	assert.NoError(t, err)
}

// TestTTROption checks that Publish succeeds when given the TTR option with a
// sixty-second value.
func TestTTROption(t *testing.T) {
	rdb := getTestRedis(t)
	defer rdb.Close()

	s := NewServlet()
	task := &Task{
		Name:    "test_task",
		Queue:   "default",
		Handler: func(data struct{ Value string }) error { return nil },
		servlet: s,
	}
	cfg := Config{
		Redis: rdb,
		Tasks: []*Task{task},
	}
	err := s.Configure(cfg)
	require.NoError(t, err)

	ctx := context.Background()
	err = task.Publish(ctx, struct{ Value string }{Value: "test"}, TTR(60*time.Second))
	assert.NoError(t, err)
}

// TestRetentionOption checks that Publish succeeds when given the Retention
// option with a twenty-four-hour value.
func TestRetentionOption(t *testing.T) {
	rdb := getTestRedis(t)
	defer rdb.Close()

	s := NewServlet()
	task := &Task{
		Name:    "test_task",
		Queue:   "default",
		Handler: func(data struct{ Value string }) error { return nil },
		servlet: s,
	}
	cfg := Config{
		Redis: rdb,
		Tasks: []*Task{task},
	}
	err := s.Configure(cfg)
	require.NoError(t, err)

	ctx := context.Background()
	err = task.Publish(ctx, struct{ Value string }{Value: "test"}, Retention(24*time.Hour))
	assert.NoError(t, err)
}