Skip to content

Commit

Permalink
Log unprocessed items when writing logs to DynamoDB
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Sep 18, 2024
1 parent 8e70c71 commit 356c2f1
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 43 deletions.
33 changes: 27 additions & 6 deletions backends/rapidpro/channel_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package rapidpro
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/utils/clogs"
Expand Down Expand Up @@ -90,7 +93,7 @@ func writeDBChannelLogs(ctx context.Context, db *sqlx.DB, batch []*dbChannelLog)
for _, v := range batch {
err = dbutil.BulkQuery(ctx, db, sqlInsertChannelLog, []*dbChannelLog{v})
if err != nil {
log := slog.With("comp", "log writer", "log_uuid", v.UUID)
log := slog.With("comp", "db log writer", "log_uuid", v.UUID)

if qerr := dbutil.AsQueryError(err); qerr != nil {
query, params := qerr.Query()
Expand All @@ -113,15 +116,33 @@ func NewDynamoLogWriter(dy *dynamo.Service, wg *sync.WaitGroup) *DynamoLogWriter
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

writeDynamoChannelLogs(ctx, dy, batch)
if err := writeDynamoChannelLogs(ctx, dy, batch); err != nil {
slog.Error("error writing logs to dynamo", "error", err)
}
}, 25, time.Millisecond*500, 1000, wg),
}
}

func writeDynamoChannelLogs(ctx context.Context, dy *dynamo.Service, batch []*clogs.Log) {
log := slog.With("comp", "dynamo log writer")
func writeDynamoChannelLogs(ctx context.Context, ds *dynamo.Service, batch []*clogs.Log) error {
writeReqs := make([]types.WriteRequest, len(batch))

for i, l := range batch {
d, err := l.MarshalDynamo()
if err != nil {
return fmt.Errorf("error marshalling log for dynamo: %w", err)
}
writeReqs[i] = types.WriteRequest{PutRequest: &types.PutRequest{Item: d}}
}

if err := clogs.BatchPut(ctx, dy, "ChannelLogs", batch); err != nil {
log.Error("error writing channel logs", "error", err)
resp, err := ds.Client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
RequestItems: map[string][]types.WriteRequest{ds.TableName("ChannelLogs"): writeReqs},
})
if err != nil {
return err
}
if len(resp.UnprocessedItems) > 0 {
// TODO shouldn't happend.. but need to figure out how we would retry these
slog.Error("unprocessed items writing logs to dynamo", "count", len(resp.UnprocessedItems))
}
return nil
}
36 changes: 0 additions & 36 deletions utils/clogs/batch.go

This file was deleted.

4 changes: 3 additions & 1 deletion utils/clogs/clog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ func TestLogs(t *testing.T) {
l2.Error(clogs.NewLogError("code2", "ext", "message"))

// write both logs to db
err = clogs.BatchPut(ctx, ds, "ChannelLogs", []*clogs.Log{l1, l2})
err = ds.PutItem(ctx, "ChannelLogs", l1)
assert.NoError(t, err)
err = ds.PutItem(ctx, "ChannelLogs", l2)
assert.NoError(t, err)

// read log 1 back from db
Expand Down

0 comments on commit 356c2f1

Please sign in to comment.