Skip to content

Commit

Permalink
feat: optimizing delay tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
ifooth committed Nov 25, 2024
1 parent 6132c26 commit 7ffe5de
Show file tree
Hide file tree
Showing 6 changed files with 368 additions and 85 deletions.
270 changes: 270 additions & 0 deletions bcs-common/common/task/brokers/etcd/delay_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/

package etcd

import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"time"

"github.com/RichardKnop/machinery/v2/log"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"golang.org/x/sync/errgroup"
)

const (
// delayTaskMaxETA is the maximum eta duration for list&watch delayed task
delayTaskMaxETA = time.Minute * 10
)

type delayTask struct {
key string // {delayedTaskPrefix}/eta-{ms}/{queue}/{taskID}
taskKey string // {queue}/{taskID}
eta time.Time // eta-{ms}
kv *mvccpb.KeyValue
bindValue *mvccpb.KeyValue
}

func makeDelayTask(kv *mvccpb.KeyValue) (*delayTask, error) {
key := string(kv.Key)
// {delayedTaskPrefix}/eta-{ms}/{queue}/{taskID}
parts := strings.Split(key, "/")
if len(parts) != 8 {
return nil, fmt.Errorf("invalid key")
}

taskKey := fmt.Sprintf("%s/%s", parts[6], parts[7])
etaStr := parts[5][4:]

etaMilli, err := strconv.Atoi(etaStr)
if err != nil {
return nil, fmt.Errorf("invalid eta")
}

task := &delayTask{
key: key,
taskKey: taskKey,
eta: time.UnixMilli(int64(etaMilli)),
kv: kv,
}
return task, nil
}

func (b *etcdBroker) listWatchDelayedTask(ctx context.Context) error {
keyPrefix := fmt.Sprintf("%s/eta-", delayedTaskPrefix)
endTime := time.Now().Add(delayTaskMaxETA)

// List
listCtx, listCancel := context.WithTimeout(ctx, time.Second*10)
defer listCancel()

end := strconv.FormatInt(endTime.UnixMilli(), 10)
rangeOpts := []clientv3.OpOption{clientv3.WithRange(keyPrefix + end)}
resp, err := b.client.Get(listCtx, keyPrefix+"0", rangeOpts...)
if err != nil {
return err
}

b.delayedMtx.Lock()
// 清空数据
b.delayedTask = make(map[string]*delayTask)

for _, ev := range resp.Kvs {
task, err := makeDelayTask(ev)
if err != nil {
log.ERROR.Printf("make delay task %s failed, err: %s", ev.Key, err)
continue
}
b.delayedTask[task.key] = task
}
b.delayedMtx.Unlock()

// Watch
watchCtx, watchCancel := context.WithTimeout(ctx, delayTaskMaxETA)
defer watchCancel()

eg, egCtx := errgroup.WithContext(watchCtx)
eg.Go(func() error {
watchOpts := []clientv3.OpOption{
clientv3.WithPrefix(),
clientv3.WithKeysOnly(),
clientv3.WithRev(resp.Header.Revision),
}
wc := b.client.Watch(egCtx, keyPrefix, watchOpts...)
for wresp := range wc {
if wresp.Err() != nil {
return wresp.Err()
}

b.delayedMtx.Lock()
for _, ev := range wresp.Events {
task, err := makeDelayTask(ev.Kv)
if err != nil {
log.ERROR.Printf("make delay task %s failed, err: %s", ev.Kv.Key, err)
continue
}
if ev.Type == clientv3.EventTypeDelete {
delete(b.delayedTask, task.key)
}

if ev.Type == clientv3.EventTypePut {
b.delayedTask[task.key] = task
}
}
b.delayedMtx.Unlock()
}
return nil
})

eg.Go(func() error {
tick := time.NewTicker(time.Second)
defer tick.Stop()

for {
select {
case <-egCtx.Done():
return nil
case <-tick.C:
b.handleDelayTask(egCtx)
}
}
})

return eg.Wait()
}

func (b *etcdBroker) handleDelayedTask(ctx context.Context) error {
s, err := concurrency.NewSession(b.client)
if err != nil {
return err
}
defer s.Close() // nolint

m := concurrency.NewMutex(s, delayedTaskLockKey)

// 最长等待watch时间获取锁
lockCtx, lockCancel := context.WithTimeout(ctx, delayTaskMaxETA)
defer lockCancel()

log.INFO.Printf("try acquire delayed task lock")
if err = m.Lock(lockCtx); err != nil {
log.INFO.Printf("try acquire delayed task lock failed, err: %s", err)
return err
}
log.INFO.Printf("acquire delayed task lock done")

defer func() {
unlockCtx, unlockCancel := context.WithTimeout(context.Background(), time.Second*5)
defer unlockCancel()

if err = m.Unlock(unlockCtx); err != nil {
log.ERROR.Printf("unlock delayed task failed, err: %s", err)
}
}()

log.INFO.Printf("start handle delayed task")
err = b.listWatchDelayedTask(ctx)
log.INFO.Printf("handle delayed task done, err: %v", err)

return err
}

func (b *etcdBroker) handleDelayTask(ctx context.Context) {
now := time.Now()
taskList := []*delayTask{}

b.delayedMtx.Lock()
for _, task := range b.delayedTask {
if task.eta.Before(now) {
taskList = append(taskList, task)
}
}
b.delayedMtx.Unlock()

// 最老的任务最快处理
sort.Slice(taskList, func(i, j int) bool {
return taskList[i].eta.Before(taskList[j].eta)
})

for _, task := range taskList {
// 超时控制
select {
case <-ctx.Done():
return
default:
}

if err := b.ensureDelayTaskBody(ctx, task); err != nil {
log.ERROR.Printf("ensure delay task body %s failed, diff=%s, err=%s", task.key, time.Since(task.eta), err)
continue
}

if err := b.moveToPendingTask(ctx, task); err != nil {
log.ERROR.Printf("move delay task %s failed, diff=%s, err=%s", task.key, time.Since(task.eta), err)
continue
}

b.delayedMtx.Lock()
delete(b.delayedTask, task.key)
b.delayedMtx.Unlock()
}
}

func (b *etcdBroker) ensureDelayTaskBody(ctx context.Context, task *delayTask) error {
if task.bindValue != nil {
return nil
}

delayKeyNotChange := clientv3.Compare(clientv3.ModRevision(task.key), "=", task.kv.ModRevision)
getReq := clientv3.OpGet(task.key)
resp, err := b.client.Txn(ctx).If(delayKeyNotChange).Then(getReq).Commit()
if err != nil {
return err
}

if len(resp.Responses) != 1 {
return fmt.Errorf("tnx resp invalid, count=%d", len(resp.Responses))
}

getResp := resp.Responses[0].GetResponseRange()
if len(getResp.Kvs) == 0 || len(getResp.Kvs[0].Value) == 0 {
return fmt.Errorf("have no body")
}

task.bindValue = getResp.Kvs[0]
return nil
}

func (b *etcdBroker) moveToPendingTask(ctx context.Context, task *delayTask) error {
pendingKey := fmt.Sprintf("%s/%s", pendingTaskPrefix, task.taskKey)
delayKeyNotChange := clientv3.Compare(clientv3.ModRevision(task.key), "=", task.kv.ModRevision)
pendingKeyNotExist := clientv3.Compare(clientv3.CreateRevision(pendingKey), "=", 0)

deleteReq := clientv3.OpDelete(task.key)
putReq := clientv3.OpPut(pendingKey, string(task.kv.Value))
c, err := b.client.Txn(ctx).If(delayKeyNotChange, pendingKeyNotExist).Then(deleteReq, putReq).Commit()
if err != nil {
return err
}
if !c.Succeeded {
return fmt.Errorf("txn not success, maybe key conflict, will retry later")
}

log.DEBUG.Printf("move delay task %s to pending queue done, diff=%s", task.key, time.Since(task.eta))
return nil
}
63 changes: 63 additions & 0 deletions bcs-common/common/task/brokers/etcd/delay_task_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/

package etcd

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.etcd.io/etcd/api/v3/mvccpb"
)

func TestMakeDelayTask(t *testing.T) {
tests := []struct {
name string
input string
taskKey string
eta time.Time
}{
{
name: "delayed_task1",
input: "/machinery/v2/broker/delayed_tasks/eta-0/machinery_tasks/d30986b4-6634-4013-bf56-88c0463450c2-test-0",
taskKey: "machinery_tasks/d30986b4-6634-4013-bf56-88c0463450c2-test-0",
eta: time.UnixMilli(0),
},
{
name: "delayed_task1",
input: "/machinery/v2/broker/delayed_tasks/eta-0/machinery_tasks/d30986b4-6634-4013-bf56-88c0463450c2-test-0",
taskKey: "machinery_tasks/d30986b4-6634-4013-bf56-88c0463450c2-test-0",
eta: time.UnixMilli(0),
},
{
name: "delayed_task3",
input: "/machinery/v2/broker/delayed_tasks/eta-1732356480583/machinery_tasks/d30986b4-6634-4013-bf56-88c0463450c2-test-0",
taskKey: "machinery_tasks/d30986b4-6634-4013-bf56-88c0463450c2-test-0",
eta: time.UnixMilli(1732356480583),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
kv := &mvccpb.KeyValue{Key: []byte(tt.input)}
task, err := makeDelayTask(kv)
require.NoError(t, err)

assert.Equal(t, tt.input, task.key)
assert.Equal(t, tt.taskKey, task.taskKey)
assert.Equal(t, tt.eta, task.eta)
})
}
}
2 changes: 0 additions & 2 deletions bcs-common/common/task/brokers/etcd/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ func (d *deliver) Ack() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

time.Sleep(time.Millisecond * 2000)

pendingKey := fmt.Sprintf("%s/%s", pendingTaskPrefix, d.key)
_, err := d.client.Delete(ctx, pendingKey)
if err != nil {
Expand Down
Loading

0 comments on commit 7ffe5de

Please sign in to comment.