Skip to content

Commit

Permalink
Merge pull request #623 from nyaruka/resolve_status_ids_from_redis
Browse files Browse the repository at this point in the history
Record external ids of sent messages in redis
  • Loading branch information
rowanseymour authored Sep 4, 2023
2 parents 740cbce + ac6ee38 commit c3a087d
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 22 deletions.
35 changes: 27 additions & 8 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type backend struct {
receivedExternalIDs *redisx.IntervalHash // using external id
receivedMsgs *redisx.IntervalHash // using content hash

// tracking of external ids of messages we've sent in case we need one before its status update has been written
sentExternalIDs *redisx.IntervalHash

// both sqlx and redis provide cumulative wait stats which we need to convert into increments
dbWaitDuration time.Duration
dbWaitCount int64
Expand All @@ -93,8 +96,9 @@ func newBackend(cfg *courier.Config) courier.Backend {
mediaCache: redisx.NewIntervalHash("media-lookups", time.Hour*24, 2),
mediaMutexes: *syncx.NewHashMutex(8),

receivedMsgs: redisx.NewIntervalHash("seen-msgs", time.Second*2, 2),
receivedExternalIDs: redisx.NewIntervalHash("seen-external-ids", time.Hour*24, 2),
receivedMsgs: redisx.NewIntervalHash("seen-msgs", time.Second*2, 2), // 2 - 4 seconds
receivedExternalIDs: redisx.NewIntervalHash("seen-external-ids", time.Hour*24, 2), // 24 - 48 hours
sentExternalIDs: redisx.NewIntervalHash("sent-external-ids", time.Hour, 2), // 1 - 2 hours
}
}

Expand Down Expand Up @@ -242,7 +246,7 @@ func (b *backend) Start() error {
}

// create our batched writers and start them
b.statusWriter = NewStatusWriter(b.db, b.config.SpoolDir, b.writerWG)
b.statusWriter = NewStatusWriter(b, b.config.SpoolDir, b.writerWG)
b.statusWriter.Start()

b.dbLogWriter = NewDBLogWriter(b.db, b.writerWG)
Expand Down Expand Up @@ -525,6 +529,8 @@ func (b *backend) NewStatusUpdateByExternalID(channel courier.Channel, externalI

// WriteStatusUpdate writes the passed in MsgStatus to our store
func (b *backend) WriteStatusUpdate(ctx context.Context, status courier.StatusUpdate) error {
su := status.(*StatusUpdate)

if status.ID() == courier.NilMsgID && status.ExternalID() == "" {
return errors.New("message status with no id or external id")
}
Expand All @@ -536,11 +542,24 @@ func (b *backend) WriteStatusUpdate(ctx context.Context, status courier.StatusUp
}
}

// if we have an id and are marking an outgoing msg as errored, then clear our sent flag
if status.ID() != courier.NilMsgID && status.Status() == courier.MsgStatusErrored {
err := b.ClearMsgSent(ctx, status.ID())
if err != nil {
logrus.WithError(err).WithField("msg", status.ID()).Error("error clearing sent flags")
if status.ID() != courier.NilMsgID {
// this is a message we've just sent and were given an external id for
if status.ExternalID() != "" {
rc := b.redisPool.Get()
defer rc.Close()

err := b.sentExternalIDs.Set(rc, fmt.Sprintf("%d|%s", su.ChannelID_, su.ExternalID_), fmt.Sprintf("%d", status.ID()))
if err != nil {
logrus.WithError(err).WithField("msg", status.ID()).Error("error recording external id")
}
}

// we sent a message that errored so clear our sent flag to allow it to be retried
if status.Status() == courier.MsgStatusErrored {
err := b.ClearMsgSent(ctx, status.ID())
if err != nil {
logrus.WithError(err).WithField("msg", status.ID()).Error("error clearing sent flags")
}
}
}

Expand Down
37 changes: 37 additions & 0 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,43 @@ func (ts *BackendTestSuite) TestMsgStatus() {
ts.NoError(tx.Commit())
}

// TestSentExternalIDCaching writes a sent status carrying both a msg id and an external id, checks that
// the pair is recorded under a sent-external-ids key in redis, then reverts the database row and writes a
// callback status that only carries the external id, expecting the msg to still end up as delivered.
func (ts *BackendTestSuite) TestSentExternalIDCaching() {
	r := ts.b.redisPool.Get()
	defer r.Close()

	ctx := context.Background()
	channel := ts.getChannel("KN", "dbc126ed-66bc-4e28-b67b-81dc3327c95d")
	clog := courier.NewChannelLog(courier.ChannelLogTypeMsgSend, channel, nil)

	ts.clearRedis()

	// create a status update from a send which will have id and external id
	status1 := ts.b.NewStatusUpdate(channel, 10000, courier.MsgStatusSent, clog)
	status1.SetExternalID("ex457")
	err := ts.b.WriteStatusUpdate(ctx, status1)
	ts.NoError(err)

	keys, err := redis.Strings(r.Do("KEYS", "sent-external-ids:*"))
	ts.NoError(err)

	// Len only records a failure and returns false rather than stopping the test, so guard the index
	// below to get a clean assertion failure instead of an index-out-of-range panic when no key exists
	if ts.Len(keys, 1) {
		assertredis.HGetAll(ts.T(), ts.b.redisPool, keys[0], map[string]string{"10|ex457": "10000"})
	}

	// mimic a delay in that status being written by reverting the db changes
	ts.b.db.MustExec(`UPDATE msgs_msg SET status = 'W', external_id = NULL WHERE id = 10000`)

	// create a callback status update which only has external id
	status2 := ts.b.NewStatusUpdateByExternalID(channel, "ex457", courier.MsgStatusDelivered, clog)

	err = ts.b.WriteStatusUpdate(ctx, status2)
	ts.NoError(err)

	// give batcher time to write it
	time.Sleep(time.Millisecond * 700)

	// msg status successfully updated in the database
	assertdb.Query(ts.T(), ts.b.db, `SELECT status FROM msgs_msg WHERE id = 10000`).Returns("D")
}

func (ts *BackendTestSuite) TestHealth() {
// all should be well in test land
ts.Equal(ts.b.Health(), "")
Expand Down
51 changes: 37 additions & 14 deletions backends/rapidpro/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sync"
"time"

"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/gocommon/syncx"
Expand Down Expand Up @@ -115,7 +114,7 @@ func (b *backend) flushStatusFile(filename string, contents []byte) error {
}

// try to flush to our db
_, err = writeStatusUpdatesToDB(ctx, b.db, []*StatusUpdate{status})
_, err = b.writeStatusUpdatesToDB(ctx, []*StatusUpdate{status})
return err
}

Expand Down Expand Up @@ -187,28 +186,28 @@ type StatusWriter struct {
*syncx.Batcher[*StatusUpdate]
}

func NewStatusWriter(db *sqlx.DB, spoolDir string, wg *sync.WaitGroup) *StatusWriter {
func NewStatusWriter(b *backend, spoolDir string, wg *sync.WaitGroup) *StatusWriter {
return &StatusWriter{
Batcher: syncx.NewBatcher[*StatusUpdate](func(batch []*StatusUpdate) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

writeStatuseUpdates(ctx, db, spoolDir, batch)
b.writeStatuseUpdates(ctx, spoolDir, batch)

}, 1000, time.Millisecond*500, 1000, wg),
}
}

// tries to write a batch of message statuses to the database and spools those that fail
func writeStatuseUpdates(ctx context.Context, db *sqlx.DB, spoolDir string, batch []*StatusUpdate) {
func (b *backend) writeStatuseUpdates(ctx context.Context, spoolDir string, batch []*StatusUpdate) {
log := logrus.WithField("comp", "status writer")

unresolved, err := writeStatusUpdatesToDB(ctx, db, batch)
unresolved, err := b.writeStatusUpdatesToDB(ctx, batch)

// if we received an error, try again one at a time (in case it is one value hanging us up)
if err != nil {
for _, s := range batch {
_, err = writeStatusUpdatesToDB(ctx, db, []*StatusUpdate{s})
_, err = b.writeStatusUpdatesToDB(ctx, []*StatusUpdate{s})
if err != nil {
log := log.WithField("msg_id", s.ID())

Expand All @@ -234,7 +233,7 @@ func writeStatuseUpdates(ctx context.Context, db *sqlx.DB, spoolDir string, batc

// writes a batch of msg status updates to the database - messages that can't be resolved are returned and aren't
// considered an error
func writeStatusUpdatesToDB(ctx context.Context, db *sqlx.DB, statuses []*StatusUpdate) ([]*StatusUpdate, error) {
func (b *backend) writeStatusUpdatesToDB(ctx context.Context, statuses []*StatusUpdate) ([]*StatusUpdate, error) {
// get the statuses which have external ID instead of a message ID
missingID := make([]*StatusUpdate, 0, 500)
for _, s := range statuses {
Expand All @@ -245,7 +244,7 @@ func writeStatusUpdatesToDB(ctx context.Context, db *sqlx.DB, statuses []*Status

// try to resolve channel ID + external ID to message IDs
if len(missingID) > 0 {
if err := resolveStatusUpdateMsgIDs(ctx, db, missingID); err != nil {
if err := b.resolveStatusUpdateMsgIDs(ctx, missingID); err != nil {
return nil, err
}
}
Expand All @@ -261,7 +260,7 @@ func writeStatusUpdatesToDB(ctx context.Context, db *sqlx.DB, statuses []*Status
}
}

err := dbutil.BulkQuery(ctx, db, sqlUpdateMsgByID, resolved)
err := dbutil.BulkQuery(ctx, b.db, sqlUpdateMsgByID, resolved)
if err != nil {
return nil, errors.Wrap(err, "error updating status")
}
Expand All @@ -276,23 +275,47 @@ SELECT id, channel_id, external_id

// resolveStatusUpdateMsgIDs tries to resolve msg IDs for the given statuses - if there's no matching channel id + external id pair
// found for a status, that status will be left with a nil msg ID.
func resolveStatusUpdateMsgIDs(ctx context.Context, db *sqlx.DB, statuses []*StatusUpdate) error {
func (b *backend) resolveStatusUpdateMsgIDs(ctx context.Context, statuses []*StatusUpdate) error {
rc := b.redisPool.Get()
defer rc.Close()

chAndExtKeys := make([]string, len(statuses))
for i, s := range statuses {
chAndExtKeys[i] = fmt.Sprintf("%d|%s", s.ChannelID_, s.ExternalID_)
}
cachedIDs, err := b.sentExternalIDs.MGet(rc, chAndExtKeys...)
if err != nil {
// log error but we continue and try to get ids from the database
logrus.WithError(err).Error("error looking up sent message ids in redis")
}

// collect the statuses that couldn't be resolved from cache, update the ones that could
notInCache := make([]*StatusUpdate, 0, len(statuses))
for i := range cachedIDs {
id, err := strconv.Atoi(cachedIDs[i])
if err != nil {
notInCache = append(notInCache, statuses[i])
} else {
statuses[i].ID_ = courier.MsgID(id)
}
}

// create a mapping of channel id + external id -> status
type ext struct {
channelID courier.ChannelID
externalID string
}
statusesByExt := make(map[ext]*StatusUpdate, len(statuses))
statusesByExt := make(map[ext]*StatusUpdate, len(notInCache))
for _, s := range statuses {
statusesByExt[ext{s.ChannelID_, s.ExternalID_}] = s
}

sql, params, err := dbutil.BulkSQL(db, sqlResolveStatusMsgIDs, statuses)
sql, params, err := dbutil.BulkSQL(b.db, sqlResolveStatusMsgIDs, notInCache)
if err != nil {
return err
}

rows, err := db.QueryContext(ctx, sql, params...)
rows, err := b.db.QueryContext(ctx, sql, params...)
if err != nil {
return err
}
Expand Down

0 comments on commit c3a087d

Please sign in to comment.