diff --git a/pkg/backend/redis/task.go b/pkg/backend/redis/task.go index dec1c6d..d96951e 100644 --- a/pkg/backend/redis/task.go +++ b/pkg/backend/redis/task.go @@ -38,11 +38,12 @@ import ( ) const ( - KB = 1 << 10 - PayloadMaxSizeInKB = 1 - MessageMaxSizeInKB = 1 - HistoryLengthMax = 10 - MaxNameLength = 1024 + KB = 1 << 10 + PayloadMaxSizeInKB = 1 + MessageMaxSizeInKB = 1 + HistoryLengthMax = 10 + MaxNameLength = 1024 + taskOperationChunkSize = 1024 ) func (b *Backend) ensureQueueAndWorkerExists(queueUID, workerUID uuid.UUID) (*taskqueue.TaskQueue, *worker.Worker, error) { @@ -162,10 +163,7 @@ func (b *Backend) getTasksByUIDs(queueUID string, taskUIDs []string, filter func } func (b *Backend) getTasks(queueUID string, filter func(*task.Task) bool, lggr zerolog.Logger) ([]*task.Task, error) { - taskUIDs, err := b.Client.SMembers(b.tasksKey(queueUID)).Result() - if err == redis.Nil { - return []*task.Task{}, nil - } + taskUIDs, err := b.allTaskUIDsByQueueUID(b.Client, queueUID) if err != nil { return nil, err } @@ -938,10 +936,7 @@ func (b *Backend) allTasksKeysForDeleteQueue(rds redis.Cmdable, queueUID string) b.deadletterQueueKey(queueUID), b.pendingTaskQueueKey(queueUID), } - taskUIDs, err := rds.SMembers(b.tasksKey(queueUID)).Result() - if err == redis.Nil { - return []string{}, nil - } + taskUIDs, err := b.allTaskUIDsByQueueUID(rds, queueUID) if err != nil { return []string{}, err } @@ -950,3 +945,23 @@ func (b *Backend) allTasksKeysForDeleteQueue(rds redis.Cmdable, queueUID string) } return keysToDelete, nil } + +func (b *Backend) allTaskUIDsByQueueUID(rds redis.Cmdable, queueUID string) ([]string, error) { + var cursor uint64 + var taskUIDs []string + for { + keys, nextCursor, err := rds.SScan(b.tasksKey(queueUID), cursor, "", taskOperationChunkSize).Result() + if err == redis.Nil { + return []string{}, nil + } + if err != nil { + return []string{}, err + } + taskUIDs = append(taskUIDs, keys...) + cursor = nextCursor + if cursor == 0 { + break + } + } + return taskUIDs, nil +}