Skip to content

Commit

Permalink
feat: optimizing delay tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
ifooth committed Nov 25, 2024
1 parent 5a64864 commit a03b6a9
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions bcs-common/common/task/brokers/etcd/delay_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ import (
"golang.org/x/sync/errgroup"
)

const (
// delayTaskMaxETA is the maximum ETA for a watch delayed task
delayTaskMaxETA = time.Minute * 10
)

type delayTask struct {
key string // {delayedTaskPrefix}/eta-{ms}/{queue}/{taskID}
taskKey string // {queue}/{taskID}
Expand Down Expand Up @@ -62,7 +67,7 @@ func makeDelayTask(kv *mvccpb.KeyValue) (*delayTask, error) {

func (b *etcdBroker) listWatchDelayedTask(ctx context.Context) error {
keyPrefix := fmt.Sprintf("%s/eta-", delayedTaskPrefix)
endTime := time.Now().Add(time.Minute * 10)
endTime := time.Now().Add(delayTaskMaxETA)

// List
listCtx, listCancel := context.WithTimeout(ctx, time.Second*10)
Expand Down Expand Up @@ -90,7 +95,7 @@ func (b *etcdBroker) listWatchDelayedTask(ctx context.Context) error {
b.delayMtx.Unlock()

// Watch
watchCtx, watchCancel := context.WithTimeout(ctx, time.Minute*10)
watchCtx, watchCancel := context.WithTimeout(ctx, delayTaskMaxETA)
defer watchCancel()

eg, egCtx := errgroup.WithContext(watchCtx)
Expand Down Expand Up @@ -152,8 +157,8 @@ func (b *etcdBroker) handleDelayedTask(ctx context.Context) error {

m := concurrency.NewMutex(s, delayedTaskLockKey)

// 最长等待10m获取锁
lockCtx, lockCancel := context.WithTimeout(ctx, time.Minute*10)
// 最长等待watch时间获取锁
lockCtx, lockCancel := context.WithTimeout(ctx, delayTaskMaxETA)
defer lockCancel()
if err = m.Lock(lockCtx); err != nil {
return err
Expand Down

0 comments on commit a03b6a9

Please sign in to comment.