From 13134ae27e6862ccf29c3458b0e1a80583127232 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Mon, 16 Sep 2024 09:10:56 -0500 Subject: [PATCH] Use copy of latest clogs package from mailroom --- backends/rapidpro/backend_test.go | 14 ++-- backends/rapidpro/channel_log.go | 32 ++-------- utils/clogs/dynamo.go | 102 ++++++++++++++++++++++++++++-- utils/clogs/dynamo_test.go | 37 +++++++++++ 4 files changed, 144 insertions(+), 41 deletions(-) create mode 100644 utils/clogs/dynamo_test.go diff --git a/backends/rapidpro/backend_test.go b/backends/rapidpro/backend_test.go index a672961d..7358f292 100644 --- a/backends/rapidpro/backend_test.go +++ b/backends/rapidpro/backend_test.go @@ -17,7 +17,6 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/dynamodb" - "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/buger/jsonparser" "github.com/gomodule/redigo/redis" @@ -25,6 +24,7 @@ import ( "github.com/nyaruka/courier" "github.com/nyaruka/courier/queue" "github.com/nyaruka/courier/test" + "github.com/nyaruka/courier/utils/clogs" "github.com/nyaruka/gocommon/dbutil/assertdb" "github.com/nyaruka/gocommon/httpx" "github.com/nyaruka/gocommon/i18n" @@ -1056,14 +1056,12 @@ func (ts *BackendTestSuite) TestWriteChanneLog() { assertdb.Query(ts.T(), ts.b.db, `SELECT channel_id, http_logs->0->>'url' AS url, errors->0->>'message' AS err FROM channels_channellog`). Columns(map[string]any{"channel_id": int64(channel.ID()), "url": "https://api.messages.com/send.json", "err": "Unexpected response status code."}) - resp, err := ts.b.dynamo.Client.GetItem(ctx, &dynamodb.GetItemInput{ - TableName: aws.String(ts.b.dynamo.TableName("ChannelLogs")), - Key: map[string]types.AttributeValue{ - "UUID": &types.AttributeValueMemberS{Value: string(clog1.UUID)}, - }, - }) + // check that we can read the log back from DynamoDB + actualLog, err := clogs.Get(ctx, ts.b.dynamo, clog1.UUID) ts.NoError(err) - ts.Equal(string(clog1.UUID), resp.Item["UUID"].(*types.AttributeValueMemberS).Value) + ts.Equal(clog1.UUID, actualLog.UUID) + ts.Equal(courier.ChannelLogTypeTokenRefresh, actualLog.Type) + ts.Equal([]*clogs.LogError{courier.ErrorResponseStatusCode()}, actualLog.Errors) clog2 := courier.NewChannelLog(courier.ChannelLogTypeMsgSend, channel, nil) clog2.HTTP(trace) diff --git a/backends/rapidpro/channel_log.go b/backends/rapidpro/channel_log.go index 9d0f28aa..ae53d205 100644 --- a/backends/rapidpro/channel_log.go +++ b/backends/rapidpro/channel_log.go @@ -9,9 +9,6 @@ import ( "sync" "time" - "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" - "github.com/aws/aws-sdk-go-v2/service/dynamodb" - dytypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/jmoiron/sqlx" "github.com/nyaruka/courier" @@ -66,9 +63,7 @@ func queueChannelLog(b *backend, clog *courier.ChannelLog) { return } - dl := clogs.NewDynamoLog(clog.Log) - - if b.dyLogWriter.Queue(dl) <= 0 { + if b.dyLogWriter.Queue(clog.Log) <= 0 { log.With("storage", "dynamo").Error("channel log writer buffer full") } @@ -174,12 +169,12 @@ func writeStorageChannelLogs(ctx context.Context, s3s *s3x.Service, bucket strin } type DynamoLogWriter struct { - *syncx.Batcher[*clogs.DynamoLog] + *syncx.Batcher[*clogs.Log] } func NewDynamoLogWriter(dy *dynamo.Service, wg *sync.WaitGroup) *DynamoLogWriter { return &DynamoLogWriter{ - Batcher: syncx.NewBatcher(func(batch []*clogs.DynamoLog) { + Batcher: syncx.NewBatcher(func(batch []*clogs.Log) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() @@ -188,25 +183,10 @@ func NewDynamoLogWriter(dy *dynamo.Service, wg *sync.WaitGroup) *DynamoLogWriter } } -func writeDynamoChannelLogs(ctx context.Context, dy *dynamo.Service, batch []*clogs.DynamoLog) { +func writeDynamoChannelLogs(ctx context.Context, dy *dynamo.Service, batch []*clogs.Log) { log := slog.With("comp", "dynamo log writer") - var writeReqs []dytypes.WriteRequest - for _, l := range batch { - item, err := attributevalue.MarshalMap(l) - if err != nil { - log.Error("error marshalling channel log", "error", err) - } else { - writeReqs = append(writeReqs, dytypes.WriteRequest{PutRequest: &dytypes.PutRequest{Item: item}}) - } - } - - if len(writeReqs) > 0 { - _, err := dy.Client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ - RequestItems: map[string][]dytypes.WriteRequest{dy.TableName("ChannelLogs"): writeReqs}, - }) - if err != nil { - log.Error("error writing channel logs", "error", err) - } + if err := clogs.BulkPut(ctx, dy, batch); err != nil { + log.Error("error writing channel logs", "error", err) } } diff --git a/utils/clogs/dynamo.go b/utils/clogs/dynamo.go index 1a88ca01..85168b10 100644 --- a/utils/clogs/dynamo.go +++ b/utils/clogs/dynamo.go @@ -3,17 +3,29 @@ package clogs import ( "bytes" "compress/gzip" + "context" + "encoding/json" + "fmt" + "io" + "slices" "time" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/nyaruka/gocommon/aws/dynamo" + "github.com/nyaruka/gocommon/httpx" "github.com/nyaruka/gocommon/jsonx" ) const ( - dynamoTTL = 14 * 24 * time.Hour + dynamoTableBase = "ChannelLogs" + dynamoTTL = 14 * 24 * time.Hour ) -// DynamoLog channel log to be written to DynamoDB -type DynamoLog struct { +// log struct to be written to DynamoDB +type dynamoLog struct { UUID LogUUID `dynamodbav:"UUID"` Type LogType `dynamodbav:"Type"` DataGZ []byte `dynamodbav:"DataGZ,omitempty"` @@ -22,14 +34,19 @@ type DynamoLog struct { ExpiresOn time.Time `dynamodbav:"ExpiresOn,unixtime"` } -func NewDynamoLog(l *Log) *DynamoLog { - data := jsonx.MustMarshal(map[string]any{"http_logs": l.HttpLogs, "errors": l.Errors}) +type dynamoLogData struct { + HttpLogs []*httpx.Log `json:"http_logs"` + Errors []*LogError `json:"errors"` +} + +func newDynamoLog(l *Log) *dynamoLog { + data := &dynamoLogData{HttpLogs: l.HttpLogs, Errors: l.Errors} buf := &bytes.Buffer{} w := gzip.NewWriter(buf) - w.Write(data) + w.Write(jsonx.MustMarshal(data)) w.Close() - return &DynamoLog{ + return &dynamoLog{ UUID: l.UUID, Type: l.Type, DataGZ: buf.Bytes(), @@ -38,3 +55,74 @@ func NewDynamoLog(l *Log) *DynamoLog { ExpiresOn: l.CreatedOn.Add(dynamoTTL), } } + +func (d *dynamoLog) unpack() (*Log, error) { + r, err := gzip.NewReader(bytes.NewReader(d.DataGZ)) + if err != nil { + return nil, err + } + j, err := io.ReadAll(r) + if err != nil { + return nil, err + } + + var data dynamoLogData + if err := json.Unmarshal(j, &data); err != nil { + return nil, err + } + + return &Log{ + UUID: d.UUID, + Type: d.Type, + HttpLogs: data.HttpLogs, + Errors: data.Errors, + CreatedOn: d.CreatedOn, + Elapsed: time.Duration(d.ElapsedMS) * time.Millisecond, + }, nil +} + +// Get retrieves a log from DynamoDB by its UUID +func Get(ctx context.Context, ds *dynamo.Service, uuid LogUUID) (*Log, error) { + resp, err := ds.Client.GetItem(ctx, &dynamodb.GetItemInput{ + TableName: aws.String(ds.TableName(dynamoTableBase)), + Key: map[string]types.AttributeValue{ + "UUID": &types.AttributeValueMemberS{Value: string(uuid)}, + }, + }) + if err != nil { + return nil, fmt.Errorf("error getting log from db: %w", err) + } + + var d dynamoLog + if err := attributevalue.UnmarshalMap(resp.Item, &d); err != nil { + return nil, fmt.Errorf("error unmarshaling log: %w", err) + } + + return d.unpack() +} + +// BulkPut writes multiple logs to DynamoDB in batches of 25 +func BulkPut(ctx context.Context, ds *dynamo.Service, logs []*Log) error { + for batch := range slices.Chunk(logs, 25) { + writeReqs := make([]types.WriteRequest, len(batch)) + + for i, l := range batch { + dl := newDynamoLog(l) + + item, err := attributevalue.MarshalMap(dl) + if err != nil { + return fmt.Errorf("error marshalling log: %w", err) + } + writeReqs[i] = types.WriteRequest{PutRequest: &types.PutRequest{Item: item}} + } + + _, err := ds.Client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ + RequestItems: map[string][]types.WriteRequest{ds.TableName(dynamoTableBase): writeReqs}, + }) + if err != nil { + return fmt.Errorf("error writing logs to db: %w", err) + } + } + + return nil +} diff --git a/utils/clogs/dynamo_test.go b/utils/clogs/dynamo_test.go new file mode 100644 index 00000000..a57a77e8 --- /dev/null +++ b/utils/clogs/dynamo_test.go @@ -0,0 +1,37 @@ +package clogs_test + +import ( + "context" + "testing" + "time" + + "github.com/nyaruka/courier/utils/clogs" + "github.com/nyaruka/gocommon/aws/dynamo" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDynamo(t *testing.T) { + ctx := context.Background() + ds, err := dynamo.NewService("root", "tembatemba", "us-east-1", "http://localhost:6000", "Test") + require.NoError(t, err) + + l1 := clogs.NewLog("test_type1", nil, nil) + l1.Error(clogs.NewLogError("code1", "ext", "message")) + + l2 := clogs.NewLog("test_type2", nil, nil) + l2.Error(clogs.NewLogError("code2", "ext", "message")) + + // write both logs to db + err = clogs.BulkPut(ctx, ds, []*clogs.Log{l1, l2}) + assert.NoError(t, err) + + // read log 1 back from db + l3, err := clogs.Get(ctx, ds, l1.UUID) + assert.NoError(t, err) + assert.Equal(t, l1.UUID, l3.UUID) + assert.Equal(t, clogs.LogType("test_type1"), l3.Type) + assert.Equal(t, []*clogs.LogError{clogs.NewLogError("code1", "ext", "message")}, l3.Errors) + assert.Equal(t, l1.Elapsed, l3.Elapsed) + assert.Equal(t, l1.CreatedOn.Truncate(time.Second), l3.CreatedOn) +}