diff --git a/backends/rapidpro/channel_log.go b/backends/rapidpro/channel_log.go index 20418962..245b9659 100644 --- a/backends/rapidpro/channel_log.go +++ b/backends/rapidpro/channel_log.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/jmoiron/sqlx" "github.com/nyaruka/courier" "github.com/nyaruka/courier/utils/clogs" @@ -90,7 +92,7 @@ func writeDBChannelLogs(ctx context.Context, db *sqlx.DB, batch []*dbChannelLog) for _, v := range batch { err = dbutil.BulkQuery(ctx, db, sqlInsertChannelLog, []*dbChannelLog{v}) if err != nil { - log := slog.With("comp", "log writer", "log_uuid", v.UUID) + log := slog.With("comp", "db log writer", "log_uuid", v.UUID) if qerr := dbutil.AsQueryError(err); qerr != nil { query, params := qerr.Query() @@ -118,10 +120,27 @@ func NewDynamoLogWriter(dy *dynamo.Service, wg *sync.WaitGroup) *DynamoLogWriter } } -func writeDynamoChannelLogs(ctx context.Context, dy *dynamo.Service, batch []*clogs.Log) { +func writeDynamoChannelLogs(ctx context.Context, ds *dynamo.Service, batch []*clogs.Log) { log := slog.With("comp", "dynamo log writer") - if err := clogs.BatchPut(ctx, dy, "ChannelLogs", batch); err != nil { - log.Error("error writing channel logs", "error", err) + writeReqs := make([]types.WriteRequest, len(batch)) + + for _, l := range batch { + d, err := l.MarshalDynamo() + if err != nil { + log.Error("error marshalling log for dynamo", "error", err) + } + writeReqs = append(writeReqs, types.WriteRequest{PutRequest: &types.PutRequest{Item: d}}) + } + + resp, err := ds.Client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ + RequestItems: map[string][]types.WriteRequest{ds.TableName("ChannelLogs"): writeReqs}, + }) + if err != nil { + log.Error("error writing logs to dynamo", "error", err) + } + if len(resp.UnprocessedItems) > 0 { + // TODO shouldn't happend.. but need to figure out how we would retry these + log.Error("unprocessed items writing logs to dynamo", "count", len(resp.UnprocessedItems)) } } diff --git a/utils/clogs/batch.go b/utils/clogs/batch.go deleted file mode 100644 index 7d174b88..00000000 --- a/utils/clogs/batch.go +++ /dev/null @@ -1,36 +0,0 @@ -package clogs - -import ( - "context" - "fmt" - "slices" - - "github.com/aws/aws-sdk-go-v2/service/dynamodb" - "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" - "github.com/nyaruka/gocommon/aws/dynamo" -) - -// BatchPut writes multiple logs to DynamoDB in batches of 25. This should probably be a generic function in the -// gocommon/dynamo package but need to think more about errors. -func BatchPut(ctx context.Context, ds *dynamo.Service, table string, logs []*Log) error { - for batch := range slices.Chunk(logs, 25) { - writeReqs := make([]types.WriteRequest, len(batch)) - - for i, l := range batch { - d, err := l.MarshalDynamo() - if err != nil { - return fmt.Errorf("error marshalling log: %w", err) - } - writeReqs[i] = types.WriteRequest{PutRequest: &types.PutRequest{Item: d}} - } - - _, err := ds.Client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ - RequestItems: map[string][]types.WriteRequest{ds.TableName(table): writeReqs}, - }) - if err != nil { - return fmt.Errorf("error writing logs to db: %w", err) - } - } - - return nil -} diff --git a/utils/clogs/clog_test.go b/utils/clogs/clog_test.go index d35261b8..202b2f8d 100644 --- a/utils/clogs/clog_test.go +++ b/utils/clogs/clog_test.go @@ -54,7 +54,9 @@ func TestLogs(t *testing.T) { l2.Error(clogs.NewLogError("code2", "ext", "message")) // write both logs to db - err = clogs.BatchPut(ctx, ds, "ChannelLogs", []*clogs.Log{l1, l2}) + err = ds.PutItem(ctx, "ChannelLogs", l1) + assert.NoError(t, err) + err = ds.PutItem(ctx, "ChannelLogs", l2) assert.NoError(t, err) // read log 1 back from db