Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Marking as sent in redis should include failed messages #799

Merged
merged 2 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type Backend interface {
ClearMsgSent(context.Context, MsgID) error

// MarkOutgoingMsgComplete marks the passed in message as having been processed. Note this should be called even in the case
// of errors during sending as it will manage the number of active workers per channel. The optional status parameter can be
// of errors during sending as it will manage the number of active workers per channel. The status parameter can be
// used to determine any sort of deduping of msg sends
MarkOutgoingMsgComplete(context.Context, MsgOut, StatusUpdate)

Expand Down
60 changes: 19 additions & 41 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ import (
// the name for our message queue
const msgQueueName = "msgs"

// the name of our set for tracking sends
const sentSetName = "msgs_sent_%s"

// our timeout for backend operations
const backendTimeout = time.Second * 20

Expand Down Expand Up @@ -82,6 +79,9 @@ type backend struct {
receivedExternalIDs *redisx.IntervalHash // using external id
receivedMsgs *redisx.IntervalHash // using content hash

// tracking of sent message ids to avoid dupe sends
sentIDs *redisx.IntervalSet

// tracking of external ids of messages we've sent in case we need one before its status update has been written
sentExternalIDs *redisx.IntervalHash

Expand Down Expand Up @@ -124,6 +124,7 @@ func newBackend(cfg *courier.Config) courier.Backend {

receivedMsgs: redisx.NewIntervalHash("seen-msgs", time.Second*2, 2), // 2 - 4 seconds
receivedExternalIDs: redisx.NewIntervalHash("seen-external-ids", time.Hour*24, 2), // 24 - 48 hours
sentIDs: redisx.NewIntervalSet("sent-ids", time.Hour, 2), // 1 - 2 hours
sentExternalIDs: redisx.NewIntervalHash("sent-external-ids", time.Hour, 2), // 1 - 2 hours
}
}
Expand Down Expand Up @@ -423,40 +424,19 @@ func (b *backend) PopNextOutgoingMsg(ctx context.Context) (courier.MsgOut, error
return nil, nil
}

var luaSent = redis.NewScript(3,
`-- KEYS: [TodayKey, YesterdayKey, MsgID]
local found = redis.call("sismember", KEYS[1], KEYS[3])
if found == 1 then
return 1
end

return redis.call("sismember", KEYS[2], KEYS[3])
`)

// WasMsgSent returns whether the passed in message has already been sent
func (b *backend) WasMsgSent(ctx context.Context, id courier.MsgID) (bool, error) {
rc := b.rp.Get()
defer rc.Close()

todayKey := fmt.Sprintf(sentSetName, time.Now().UTC().Format("2006_01_02"))
yesterdayKey := fmt.Sprintf(sentSetName, time.Now().Add(time.Hour*-24).UTC().Format("2006_01_02"))
return redis.Bool(luaSent.Do(rc, todayKey, yesterdayKey, id.String()))
return b.sentIDs.IsMember(rc, id.String())
}

var luaClearSent = redis.NewScript(3,
`-- KEYS: [TodayKey, YesterdayKey, MsgID]
redis.call("srem", KEYS[1], KEYS[3])
redis.call("srem", KEYS[2], KEYS[3])
`)

func (b *backend) ClearMsgSent(ctx context.Context, id courier.MsgID) error {
rc := b.rp.Get()
defer rc.Close()

todayKey := fmt.Sprintf(sentSetName, time.Now().UTC().Format("2006_01_02"))
yesterdayKey := fmt.Sprintf(sentSetName, time.Now().Add(time.Hour*-24).UTC().Format("2006_01_02"))
_, err := luaClearSent.Do(rc, todayKey, yesterdayKey, id.String())
return err
return b.sentIDs.Rem(rc, id.String())
}

// MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel
Expand All @@ -466,24 +446,22 @@ func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.MsgOu

dbMsg := msg.(*Msg)

queue.MarkComplete(rc, msgQueueName, dbMsg.workerToken)
if err := queue.MarkComplete(rc, msgQueueName, dbMsg.workerToken); err != nil {
slog.Error("unable to mark queue task complete", "error", err)
}

// mark as sent in redis as well if this was actually wired or sent
if status != nil && (status.Status() == courier.MsgStatusSent || status.Status() == courier.MsgStatusWired) {
dateKey := fmt.Sprintf(sentSetName, time.Now().UTC().Format("2006_01_02"))
rc.Send("sadd", dateKey, msg.ID().String())
rc.Send("expire", dateKey, 60*60*24*2)
_, err := rc.Do("")
if err != nil {
slog.Error("unable to add new unsent message", "error", err, "sent_msgs_key", dateKey)
// if message won't be retried, mark as sent to avoid dupe sends
if status.Status() != courier.MsgStatusErrored {
if err := b.sentIDs.Add(rc, msg.ID().String()); err != nil {
slog.Error("unable to mark message sent", "error", err)
}
}

// if our msg has an associated session and timeout, update that
if dbMsg.SessionWaitStartedOn_ != nil {
err = updateSessionTimeout(ctx, b, dbMsg.SessionID_, *dbMsg.SessionWaitStartedOn_, dbMsg.SessionTimeout_)
if err != nil {
slog.Error("unable to update session timeout", "error", err, "session_id", dbMsg.SessionID_)
}
// if message was successfully sent, and we have a session timeout, update it
wasSuccess := status.Status() == courier.MsgStatusWired || status.Status() == courier.MsgStatusSent || status.Status() == courier.MsgStatusDelivered || status.Status() == courier.MsgStatusRead
if wasSuccess && dbMsg.SessionWaitStartedOn_ != nil {
if err := updateSessionTimeout(ctx, b, dbMsg.SessionID_, *dbMsg.SessionWaitStartedOn_, dbMsg.SessionTimeout_); err != nil {
slog.Error("unable to update session timeout", "error", err, "session_id", dbMsg.SessionID_)
}
}
}
Expand Down
Loading