Skip to content

Commit

Permalink
add priority chats to queue
Browse files Browse the repository at this point in the history
  • Loading branch information
graynk committed Jul 21, 2024
1 parent 8b820f6 commit 8591ee6
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 16 deletions.
11 changes: 10 additions & 1 deletion app/distortioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,13 +341,22 @@ func main() {
if codec == "" {
codec = "libx264"
}
priorityChatsStr := strings.Split(os.Getenv("DISTORTIONER_PRIORITY_CHATS"), ",")
priorityChats := make([]int64, len(priorityChatsStr))
for i, s := range priorityChatsStr {
priorityChats[i], err = strconv.ParseInt(s, 10, 64)
if err != nil {
logger.Fatal(err)
}
}

d := DistorterBot{
adminID: adminID,
rl: tools.NewRateLimiter(),
logger: logger,
mu: &sync.Mutex{},
graceWg: &sync.WaitGroup{},
videoWorker: tools.NewVideoWorker(3),
videoWorker: tools.NewVideoWorker(3, priorityChats),
codec: codec,
}
b.Poller = tb.NewMiddlewarePoller(&tb.LongPoller{Timeout: 10 * time.Second}, func(update *tb.Update) bool {
Expand Down
31 changes: 20 additions & 11 deletions app/queue/honest_priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,25 @@ import (
// Wraps PriorityQueue to make it thread-safe. Manages priorities.
// Extremely inefficient, but works for my use-case (very slow jobs and small queue sizes)
type HonestJobQueue struct {
mu *sync.RWMutex
queue PriorityQueue
users map[int64]int // Tracks the amount of job per-user currently in the queue. Used to calculate priority
banned map[int64]any // Drop jobs from these users
maintenance bool
mu *sync.RWMutex
queue PriorityQueue
users map[int64]int // Tracks the amount of job per-user currently in the queue. Used to calculate priority
banned map[int64]any // Drop jobs from these users
maintenance bool
priorityChats map[int64]any // not very honest of an honest job queue, but I don't care, I'm not waiting with everybody else
}

func NewHonestJobQueue(initialCapacity int) *HonestJobQueue {
func NewHonestJobQueue(initialCapacity int, priorityChats []int64) *HonestJobQueue {
priorityChatsMap := make(map[int64]any)
for _, chat := range priorityChats {
priorityChatsMap[chat] = nil
}
return &HonestJobQueue{
mu: &sync.RWMutex{},
queue: make(PriorityQueue, 0, initialCapacity),
users: make(map[int64]int),
banned: make(map[int64]any),
mu: &sync.RWMutex{},
queue: make(PriorityQueue, 0, initialCapacity),
users: make(map[int64]int),
banned: make(map[int64]any),
priorityChats: priorityChatsMap,
}
}

Expand Down Expand Up @@ -117,8 +123,11 @@ func (hjq *HonestJobQueue) Push(userID int64, runnable func()) error {
if hjq.queue.Len() > 2000 {
return errors.New("There are too many items queued already, try again later")
}

priority := hjq.users[userID]
_, ok := hjq.priorityChats[userID]
if ok {
priority = -2
}

if priority > 2 {
hjq.users[userID]--
Expand Down
4 changes: 2 additions & 2 deletions app/queue/honest_priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func TestHonestJobQueue_InsertionOrder(t *testing.T) {
hjq := NewHonestJobQueue(50)
hjq := NewHonestJobQueue(50, []int64{})
for id := int64(1); id < 4; id++ {
hjq.Push(id, func() {})
}
Expand All @@ -20,7 +20,7 @@ func TestHonestJobQueue_InsertionOrder(t *testing.T) {
}

func TestHonestJobQueue_RepeatUsers(t *testing.T) {
hjq := NewHonestJobQueue(50)
hjq := NewHonestJobQueue(50, []int64{})

// three jobs by user 1
for i := 0; i < 3; i++ {
Expand Down
4 changes: 2 additions & 2 deletions app/tools/video_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ type VideoWorker struct {
workerCount int
}

func NewVideoWorker(workerCount int) *VideoWorker {
func NewVideoWorker(workerCount int, priorityChats []int64) *VideoWorker {
capacity := 300
worker := VideoWorker{
queue: queue.NewHonestJobQueue(capacity),
queue: queue.NewHonestJobQueue(capacity, priorityChats),
messenger: make(chan interface{}, capacity),
workerCount: workerCount,
}
Expand Down

0 comments on commit 8591ee6

Please sign in to comment.