From f1227b11fee2c929902ee8953d5e264ba6bc219a Mon Sep 17 00:00:00 2001 From: Norbert Kwizera Date: Tue, 12 Dec 2023 17:01:47 +0200 Subject: [PATCH] Make kannel channel paused when we get Queued message in response --- handlers/kannel/handler.go | 18 +++++++++++++++--- handlers/kannel/handler_test.go | 11 +++++++++++ handlers/whatsapp_legacy/handler.go | 7 ++----- queue/queue_test.go | 9 +++------ 4 files changed, 31 insertions(+), 14 deletions(-) diff --git a/handlers/kannel/handler.go b/handlers/kannel/handler.go index c249a6fef..9e78c3d0b 100644 --- a/handlers/kannel/handler.go +++ b/handlers/kannel/handler.go @@ -200,13 +200,25 @@ func (h *handler) Send(ctx context.Context, msg courier.MsgOut, clog *courier.Ch } var resp *http.Response + var respBody []byte if verifySSL { - resp, _, err = h.RequestHTTP(req, clog) + resp, respBody, err = h.RequestHTTP(req, clog) } else { - resp, _, err = h.RequestHTTPInsecure(req, clog) + resp, respBody, err = h.RequestHTTPInsecure(req, clog) } - status := h.Backend().NewStatusUpdate(msg.Channel(), msg.ID(), courier.MsgStatusErrored, clog) + + if strings.Contains(string(respBody), "Queued") { + rc := h.Backend().RedisPool().Get() + defer rc.Close() + rateLimitKey := fmt.Sprintf("rate_limit:%s", msg.Channel().UUID()) + rateLimitBulkKey := fmt.Sprintf("rate_limit_bulk:%s", msg.Channel().UUID()) + // We pause sending 30 seconds so the connection to the SMSC is reset + rc.Do("SET", rateLimitKey, "engaged", "EX", 30) + rc.Do("SET", rateLimitBulkKey, "engaged", "EX", 30) + // the message reached kannel and we should not return an error here, so continue to get the status below + } + if err == nil && resp.StatusCode/100 == 2 { status.SetStatus(courier.MsgStatusWired) } diff --git a/handlers/kannel/handler_test.go b/handlers/kannel/handler_test.go index 0698849a8..7dc5b2617 100644 --- a/handlers/kannel/handler_test.go +++ b/handlers/kannel/handler_test.go @@ -199,6 +199,17 @@ var defaultSendTestCases = []OutgoingTestCase{ ExpectedURLParams: map[string]string{"text": `Error Message`, "to": "+250788383383", "coding": "", "priority": ""}, SendPrep: setSendURL, }, + { + Label: "Rate Limit Engaged", + MsgText: "Hello", + MsgURN: "tel:+250788383383", + MsgHighPriority: false, + ExpectedMsgStatus: "W", + MockResponseBody: "3: Queued for later delivery", + MockResponseStatus: 202, + ExpectedURLParams: map[string]string{"text": `Hello`, "to": "+250788383383", "coding": "", "priority": ""}, + SendPrep: setSendURL, + }, { Label: "Custom Params", MsgText: "Custom Params", diff --git a/handlers/whatsapp_legacy/handler.go b/handlers/whatsapp_legacy/handler.go index 40fffe999..88e8378b7 100644 --- a/handlers/whatsapp_legacy/handler.go +++ b/handlers/whatsapp_legacy/handler.go @@ -906,12 +906,11 @@ func (h *handler) sendWhatsAppMsg(rc redis.Conn, msg courier.MsgOut, sendPath *u if resp != nil && (resp.StatusCode == 429 || resp.StatusCode == 503) { rateLimitKey := fmt.Sprintf("rate_limit:%s", msg.Channel().UUID()) - rc.Do("SET", rateLimitKey, "engaged") // The rate limit is 50 requests per second // We pause sending 2 seconds so the limit count is reset // TODO: In the future we should the header value when available - rc.Do("EXPIRE", rateLimitKey, 2) + rc.Do("SET", rateLimitKey, "engaged", "EX", 2) return "", "", errors.New("received rate-limit response from send endpoint") } @@ -923,12 +922,10 @@ func (h *handler) sendWhatsAppMsg(rc redis.Conn, msg courier.MsgOut, sendPath *u if err == nil && len(errPayload.Errors) > 0 { if hasTiersError(*errPayload) { rateLimitBulkKey := fmt.Sprintf("rate_limit_bulk:%s", msg.Channel().UUID()) - rc.Do("SET", rateLimitBulkKey, "engaged") // The WA tiers spam rate limit hit // We pause the bulk queue for 24 hours and 5min - rc.Do("EXPIRE", rateLimitBulkKey, (60*60*24)+(5*60)) - + rc.Do("SET", rateLimitBulkKey, "engaged", "EX", (60*60*24)+(5*60)) err := errors.Errorf("received error from send endpoint: %s", errPayload.Errors[0].Title) return "", "", err } diff --git a/queue/queue_test.go b/queue/queue_test.go index 5f6d11eeb..caef0b70e 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -60,8 +60,7 @@ func TestLua(t *testing.T) { delay := time.Second*2 - time.Duration(time.Now().UnixNano()%int64(time.Second)) time.Sleep(delay) - conn.Do("SET", "rate_limit_bulk:chan1", "engaged") - conn.Do("EXPIRE", "rate_limit_bulk:chan1", 5) + conn.Do("SET", "rate_limit_bulk:chan1", "engaged", "EX", 5) // we have the rate limit set, queue, value, err := PopFromQueue(conn, "msgs") @@ -120,8 +119,7 @@ func TestLua(t *testing.T) { assert.NoError(err) // make sure pause bulk key do not prevent use to get from the high priority queue - conn.Do("SET", "rate_limit_bulk:chan1", "engaged") - conn.Do("EXPIRE", "rate_limit_bulk:chan1", 5) + conn.Do("SET", "rate_limit_bulk:chan1", "engaged", "EX", 5) queue, value, err = PopFromQueue(conn, "msgs") assert.NoError(err) @@ -194,8 +192,7 @@ func TestLua(t *testing.T) { err = PushOntoQueue(conn, "msgs", "chan1", rate, `[{"id":34}]`, HighPriority) assert.NoError(err) - conn.Do("SET", "rate_limit:chan1", "engaged") - conn.Do("EXPIRE", "rate_limit:chan1", 5) + conn.Do("SET", "rate_limit:chan1", "engaged", "EX", 5) // we have the rate limit set, queue, value, err = PopFromQueue(conn, "msgs")