diff --git a/backend.go b/backend.go index 0479ffb0..659f690e 100644 --- a/backend.go +++ b/backend.go @@ -80,7 +80,7 @@ type Backend interface { ClearMsgSent(context.Context, MsgID) error // MarkOutgoingMsgComplete marks the passed in message as having been processed. Note this should be called even in the case - // of errors during sending as it will manage the number of active workers per channel. The optional status parameter can be + // of errors during sending as it will manage the number of active workers per channel. The status parameter can be // used to determine any sort of deduping of msg sends MarkOutgoingMsgComplete(context.Context, MsgOut, StatusUpdate) diff --git a/backends/rapidpro/backend.go b/backends/rapidpro/backend.go index 90d5551d..b6733c68 100644 --- a/backends/rapidpro/backend.go +++ b/backends/rapidpro/backend.go @@ -40,9 +40,6 @@ import ( // the name for our message queue const msgQueueName = "msgs" -// the name of our set for tracking sends -const sentSetName = "msgs_sent_%s" - // our timeout for backend operations const backendTimeout = time.Second * 20 @@ -82,6 +79,9 @@ type backend struct { receivedExternalIDs *redisx.IntervalHash // using external id receivedMsgs *redisx.IntervalHash // using content hash + // tracking of sent message ids to avoid dupe sends + sentIDs *redisx.IntervalSet + // tracking of external ids of messages we've sent in case we need one before its status update has been written sentExternalIDs *redisx.IntervalHash @@ -124,6 +124,7 @@ func newBackend(cfg *courier.Config) courier.Backend { receivedMsgs: redisx.NewIntervalHash("seen-msgs", time.Second*2, 2), // 2 - 4 seconds receivedExternalIDs: redisx.NewIntervalHash("seen-external-ids", time.Hour*24, 2), // 24 - 48 hours + sentIDs: redisx.NewIntervalSet("sent-ids", time.Hour, 2), // 1 - 2 hours sentExternalIDs: redisx.NewIntervalHash("sent-external-ids", time.Hour, 2), // 1 - 2 hours } } @@ -423,40 +424,19 @@ func (b *backend) PopNextOutgoingMsg(ctx context.Context) (courier.MsgOut, error return nil, nil } -var luaSent = redis.NewScript(3, - `-- KEYS: [TodayKey, YesterdayKey, MsgID] - local found = redis.call("sismember", KEYS[1], KEYS[3]) - if found == 1 then - return 1 - end - - return redis.call("sismember", KEYS[2], KEYS[3]) -`) - // WasMsgSent returns whether the passed in message has already been sent func (b *backend) WasMsgSent(ctx context.Context, id courier.MsgID) (bool, error) { rc := b.rp.Get() defer rc.Close() - todayKey := fmt.Sprintf(sentSetName, time.Now().UTC().Format("2006_01_02")) - yesterdayKey := fmt.Sprintf(sentSetName, time.Now().Add(time.Hour*-24).UTC().Format("2006_01_02")) - return redis.Bool(luaSent.Do(rc, todayKey, yesterdayKey, id.String())) + return b.sentIDs.IsMember(rc, id.String()) } -var luaClearSent = redis.NewScript(3, - `-- KEYS: [TodayKey, YesterdayKey, MsgID] - redis.call("srem", KEYS[1], KEYS[3]) - redis.call("srem", KEYS[2], KEYS[3]) -`) - func (b *backend) ClearMsgSent(ctx context.Context, id courier.MsgID) error { rc := b.rp.Get() defer rc.Close() - todayKey := fmt.Sprintf(sentSetName, time.Now().UTC().Format("2006_01_02")) - yesterdayKey := fmt.Sprintf(sentSetName, time.Now().Add(time.Hour*-24).UTC().Format("2006_01_02")) - _, err := luaClearSent.Do(rc, todayKey, yesterdayKey, id.String()) - return err + return b.sentIDs.Rem(rc, id.String()) } // MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel @@ -466,24 +446,22 @@ func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.MsgOu dbMsg := msg.(*Msg) - queue.MarkComplete(rc, msgQueueName, dbMsg.workerToken) + if err := queue.MarkComplete(rc, msgQueueName, dbMsg.workerToken); err != nil { + slog.Error("unable to mark queue task complete", "error", err) + } - // mark as sent in redis as well if this was actually wired or sent - if status != nil && (status.Status() == courier.MsgStatusSent || status.Status() == courier.MsgStatusWired) { - dateKey := fmt.Sprintf(sentSetName, time.Now().UTC().Format("2006_01_02")) - rc.Send("sadd", dateKey, msg.ID().String()) - rc.Send("expire", dateKey, 60*60*24*2) - _, err := rc.Do("") - if err != nil { - slog.Error("unable to add new unsent message", "error", err, "sent_msgs_key", dateKey) + // if message won't be retried, mark as sent to avoid dupe sends + if status.Status() != courier.MsgStatusErrored { + if err := b.sentIDs.Add(rc, msg.ID().String()); err != nil { + slog.Error("unable to mark message sent", "error", err) } + } - // if our msg has an associated session and timeout, update that - if dbMsg.SessionWaitStartedOn_ != nil { - err = updateSessionTimeout(ctx, b, dbMsg.SessionID_, *dbMsg.SessionWaitStartedOn_, dbMsg.SessionTimeout_) - if err != nil { - slog.Error("unable to update session timeout", "error", err, "session_id", dbMsg.SessionID_) - } + // if message was successfully sent, and we have a session timeout, update it + wasSuccess := status.Status() == courier.MsgStatusWired || status.Status() == courier.MsgStatusSent || status.Status() == courier.MsgStatusDelivered || status.Status() == courier.MsgStatusRead + if wasSuccess && dbMsg.SessionWaitStartedOn_ != nil { + if err := updateSessionTimeout(ctx, b, dbMsg.SessionID_, *dbMsg.SessionWaitStartedOn_, dbMsg.SessionTimeout_); err != nil { + slog.Error("unable to update session timeout", "error", err, "session_id", dbMsg.SessionID_) } } }