Skip to content

Commit

Permalink
Use redisx.NewIntervalSet to track sent message ids
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Nov 5, 2024
1 parent 938b060 commit 5494cc1
Showing 1 changed file with 14 additions and 39 deletions.
53 changes: 14 additions & 39 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ import (
// the name for our message queue
const msgQueueName = "msgs"

// the name of our set for tracking sends
const sentSetName = "msgs_sent_%s"

// our timeout for backend operations
const backendTimeout = time.Second * 20

Expand Down Expand Up @@ -82,6 +79,9 @@ type backend struct {
receivedExternalIDs *redisx.IntervalHash // using external id
receivedMsgs *redisx.IntervalHash // using content hash

// tracking of sent message ids to avoid dupe sends
sentIDs *redisx.IntervalSet

// tracking of external ids of messages we've sent in case we need one before its status update has been written
sentExternalIDs *redisx.IntervalHash

Expand Down Expand Up @@ -124,6 +124,7 @@ func newBackend(cfg *courier.Config) courier.Backend {

receivedMsgs: redisx.NewIntervalHash("seen-msgs", time.Second*2, 2), // 2 - 4 seconds
receivedExternalIDs: redisx.NewIntervalHash("seen-external-ids", time.Hour*24, 2), // 24 - 48 hours
sentIDs: redisx.NewIntervalSet("sent-ids", time.Hour, 2), // 1 - 2 hours
sentExternalIDs: redisx.NewIntervalHash("sent-external-ids", time.Hour, 2), // 1 - 2 hours
}
}
Expand Down Expand Up @@ -423,40 +424,19 @@ func (b *backend) PopNextOutgoingMsg(ctx context.Context) (courier.MsgOut, error
return nil, nil
}

var luaSent = redis.NewScript(3,
`-- KEYS: [TodayKey, YesterdayKey, MsgID]
local found = redis.call("sismember", KEYS[1], KEYS[3])
if found == 1 then
return 1
end
return redis.call("sismember", KEYS[2], KEYS[3])
`)

// WasMsgSent returns whether the passed in message has already been sent
func (b *backend) WasMsgSent(ctx context.Context, id courier.MsgID) (bool, error) {
rc := b.rp.Get()
defer rc.Close()

todayKey := fmt.Sprintf(sentSetName, time.Now().UTC().Format("2006_01_02"))
yesterdayKey := fmt.Sprintf(sentSetName, time.Now().Add(time.Hour*-24).UTC().Format("2006_01_02"))
return redis.Bool(luaSent.Do(rc, todayKey, yesterdayKey, id.String()))
return b.sentIDs.IsMember(rc, id.String())
}

var luaClearSent = redis.NewScript(3,
`-- KEYS: [TodayKey, YesterdayKey, MsgID]
redis.call("srem", KEYS[1], KEYS[3])
redis.call("srem", KEYS[2], KEYS[3])
`)

func (b *backend) ClearMsgSent(ctx context.Context, id courier.MsgID) error {
rc := b.rp.Get()
defer rc.Close()

todayKey := fmt.Sprintf(sentSetName, time.Now().UTC().Format("2006_01_02"))
yesterdayKey := fmt.Sprintf(sentSetName, time.Now().Add(time.Hour*-24).UTC().Format("2006_01_02"))
_, err := luaClearSent.Do(rc, todayKey, yesterdayKey, id.String())
return err
return b.sentIDs.Rem(rc, id.String())
}

// MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel
Expand All @@ -470,23 +450,18 @@ func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.MsgOu
slog.Error("unable to mark queue task complete", "error", err)
}

// mark as sent in redis as well if message send ended in status that won't be retried
// if message won't be retried, mark as sent to avoid dupe sends
if status.Status() != courier.MsgStatusErrored {
dateKey := fmt.Sprintf(sentSetName, time.Now().UTC().Format("2006_01_02"))
rc.Send("SADD", dateKey, msg.ID().String())
rc.Send("EXPIRE", dateKey, 60*60*24*2)

_, err := rc.Do("")
if err != nil {
if err := b.sentIDs.Add(rc, msg.ID().String()); err != nil {
slog.Error("unable to mark message sent", "error", err)
}
}

// if our msg has an associated session and timeout, update that
if dbMsg.SessionWaitStartedOn_ != nil {
err = updateSessionTimeout(ctx, b, dbMsg.SessionID_, *dbMsg.SessionWaitStartedOn_, dbMsg.SessionTimeout_)
if err != nil {
slog.Error("unable to update session timeout", "error", err, "session_id", dbMsg.SessionID_)
}
// if message was successfully sent, and we have a session timeout, update it
wasSuccess := status.Status() == courier.MsgStatusWired || status.Status() == courier.MsgStatusSent || status.Status() == courier.MsgStatusDelivered || status.Status() == courier.MsgStatusRead
if wasSuccess && dbMsg.SessionWaitStartedOn_ != nil {
if err := updateSessionTimeout(ctx, b, dbMsg.SessionID_, *dbMsg.SessionWaitStartedOn_, dbMsg.SessionTimeout_); err != nil {
slog.Error("unable to update session timeout", "error", err, "session_id", dbMsg.SessionID_)
}
}
}
Expand Down

0 comments on commit 5494cc1

Please sign in to comment.