Skip to content

Commit

Permalink
Log unprocessed items when writing logs to DynamoDB
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Sep 18, 2024
1 parent 8e70c71 commit 08a55c7
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 41 deletions.
27 changes: 23 additions & 4 deletions backends/rapidpro/channel_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/utils/clogs"
Expand Down Expand Up @@ -90,7 +92,7 @@ func writeDBChannelLogs(ctx context.Context, db *sqlx.DB, batch []*dbChannelLog)
for _, v := range batch {
err = dbutil.BulkQuery(ctx, db, sqlInsertChannelLog, []*dbChannelLog{v})
if err != nil {
log := slog.With("comp", "log writer", "log_uuid", v.UUID)
log := slog.With("comp", "db log writer", "log_uuid", v.UUID)

if qerr := dbutil.AsQueryError(err); qerr != nil {
query, params := qerr.Query()
Expand Down Expand Up @@ -118,10 +120,27 @@ func NewDynamoLogWriter(dy *dynamo.Service, wg *sync.WaitGroup) *DynamoLogWriter
}
}

func writeDynamoChannelLogs(ctx context.Context, dy *dynamo.Service, batch []*clogs.Log) {
func writeDynamoChannelLogs(ctx context.Context, ds *dynamo.Service, batch []*clogs.Log) {
log := slog.With("comp", "dynamo log writer")

if err := clogs.BatchPut(ctx, dy, "ChannelLogs", batch); err != nil {
log.Error("error writing channel logs", "error", err)
writeReqs := make([]types.WriteRequest, len(batch))

for _, l := range batch {
d, err := l.MarshalDynamo()
if err != nil {
log.Error("error marshalling log for dynamo", "error", err)
}
writeReqs = append(writeReqs, types.WriteRequest{PutRequest: &types.PutRequest{Item: d}})
}

resp, err := ds.Client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
RequestItems: map[string][]types.WriteRequest{ds.TableName("ChannelLogs"): writeReqs},
})
if err != nil {
log.Error("error writing logs to dynamo", "error", err)
}
if len(resp.UnprocessedItems) > 0 {
// TODO shouldn't happend.. but need to figure out how we would retry these
log.Error("unprocessed items writing logs to dynamo", "count", len(resp.UnprocessedItems))
}
}
36 changes: 0 additions & 36 deletions utils/clogs/batch.go

This file was deleted.

4 changes: 3 additions & 1 deletion utils/clogs/clog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ func TestLogs(t *testing.T) {
l2.Error(clogs.NewLogError("code2", "ext", "message"))

// write both logs to db
err = clogs.BatchPut(ctx, ds, "ChannelLogs", []*clogs.Log{l1, l2})
err = ds.PutItem(ctx, "ChannelLogs", l1)
assert.NoError(t, err)
err = ds.PutItem(ctx, "ChannelLogs", l2)
assert.NoError(t, err)

// read log 1 back from db
Expand Down

0 comments on commit 08a55c7

Please sign in to comment.