From e5a50b9a3c5ab06c08ad4e7e925858ffe667d9b0 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Mon, 16 Sep 2024 14:38:36 -0500 Subject: [PATCH] Update to latest copy of clogs package --- backends/rapidpro/backend.go | 2 +- backends/rapidpro/backend_test.go | 4 +- backends/rapidpro/channel_log.go | 2 +- go.mod | 2 +- go.sum | 4 +- utils/clogs/batch.go | 36 +++++++++ utils/clogs/{base.go => clog.go} | 59 ++++++++++++++ utils/clogs/clog_test.go | 69 ++++++++++++++++ utils/clogs/dynamo.go | 128 ------------------------------ utils/clogs/dynamo_test.go | 37 --------- 10 files changed, 172 insertions(+), 171 deletions(-) create mode 100644 utils/clogs/batch.go rename utils/clogs/{base.go => clog.go} (55%) create mode 100644 utils/clogs/clog_test.go delete mode 100644 utils/clogs/dynamo.go delete mode 100644 utils/clogs/dynamo_test.go diff --git a/backends/rapidpro/backend.go b/backends/rapidpro/backend.go index 4b9a27f0..984cd1f5 100644 --- a/backends/rapidpro/backend.go +++ b/backends/rapidpro/backend.go @@ -173,7 +173,7 @@ func (b *backend) Start() error { if err != nil { return err } - if err := b.dynamo.Test(ctx, "ChannelLogs"); err != nil { + if err := b.dynamo.Test(ctx); err != nil { log.Error("dynamodb not reachable", "error", err) } else { log.Info("dynamodb ok") diff --git a/backends/rapidpro/backend_test.go b/backends/rapidpro/backend_test.go index 7358f292..9170fbee 100644 --- a/backends/rapidpro/backend_test.go +++ b/backends/rapidpro/backend_test.go @@ -17,6 +17,7 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/buger/jsonparser" "github.com/gomodule/redigo/redis" @@ -1057,7 +1058,8 @@ func (ts *BackendTestSuite) TestWriteChanneLog() { Columns(map[string]any{"channel_id": int64(channel.ID()), "url": "https://api.messages.com/send.json", "err": "Unexpected response status code."}) // check that we can read the log back from DynamoDB - actualLog, err := clogs.Get(ctx, ts.b.dynamo, clog1.UUID) + actualLog := &clogs.Log{} + err = ts.b.dynamo.GetItem(ctx, "ChannelLogs", map[string]types.AttributeValue{"UUID": &types.AttributeValueMemberS{Value: string(clog1.UUID)}}, actualLog) ts.NoError(err) ts.Equal(clog1.UUID, actualLog.UUID) ts.Equal(courier.ChannelLogTypeTokenRefresh, actualLog.Type) diff --git a/backends/rapidpro/channel_log.go b/backends/rapidpro/channel_log.go index ae53d205..0ac7e663 100644 --- a/backends/rapidpro/channel_log.go +++ b/backends/rapidpro/channel_log.go @@ -186,7 +186,7 @@ func NewDynamoLogWriter(dy *dynamo.Service, wg *sync.WaitGroup) *DynamoLogWriter func writeDynamoChannelLogs(ctx context.Context, dy *dynamo.Service, batch []*clogs.Log) { log := slog.With("comp", "dynamo log writer") - if err := clogs.BulkPut(ctx, dy, batch); err != nil { + if err := clogs.BatchPut(ctx, dy, "ChannelLogs", batch); err != nil { log.Error("error writing channel logs", "error", err) } } diff --git a/go.mod b/go.mod index 2c98b714..08457895 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/jmoiron/sqlx v1.4.0 github.com/lib/pq v1.10.9 github.com/nyaruka/ezconf v0.3.0 - github.com/nyaruka/gocommon v1.58.0 + github.com/nyaruka/gocommon v1.59.0 github.com/nyaruka/null/v3 v3.0.0 github.com/nyaruka/redisx v0.8.1 github.com/patrickmn/go-cache v2.1.0+incompatible diff --git a/go.sum b/go.sum index e11505f7..620fa080 100644 --- a/go.sum +++ b/go.sum @@ -115,8 +115,8 @@ github.com/naoina/toml v0.1.1 h1:PT/lllxVVN0gzzSqSlHEmP8MJB4MY2U7STGxiouV4X8= github.com/naoina/toml v0.1.1/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E= github.com/nyaruka/ezconf v0.3.0 h1:kGvJqVN8AHowb4HdaHAviJ0Z3yI5Pyekp1WqibFEaGk= github.com/nyaruka/ezconf v0.3.0/go.mod h1:89GUW6EPRNLIxT7lC4LWnjWTgZeQwRoX7lBmc8ralAU= -github.com/nyaruka/gocommon v1.58.0 h1:q8/YCWQOEiFHVnYW+f5N+HjI0R4gjV93bOpVWqImr8k= -github.com/nyaruka/gocommon v1.58.0/go.mod h1:u0n6zC7AxmrUZxzY4VtZU1d26QJ8nhqRRb5IFOw/Lig= +github.com/nyaruka/gocommon v1.59.0 h1:XC//IVSOWawBXEZqDiYhqxrIHWo2QPPeCIkAm4n4sY0= +github.com/nyaruka/gocommon v1.59.0/go.mod h1:Upj2DG1iL55YcfF7rve8CRrKGjMaEn0jWUIWbQQgTFQ= github.com/nyaruka/librato v1.1.1 h1:0nTYtJLl3Sn7lX3CuHsLf+nXy1k/tGV0OjVxLy3Et4s= github.com/nyaruka/librato v1.1.1/go.mod h1:fme1Fu1PT2qvkaBZyw8WW+SrnFe2qeeCWpvqmAaKAKE= github.com/nyaruka/null/v2 v2.0.3 h1:rdmMRQyVzrOF3Jff/gpU/7BDR9mQX0lcLl4yImsA3kw= diff --git a/utils/clogs/batch.go b/utils/clogs/batch.go new file mode 100644 index 00000000..7d174b88 --- /dev/null +++ b/utils/clogs/batch.go @@ -0,0 +1,36 @@ +package clogs + +import ( + "context" + "fmt" + "slices" + + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/nyaruka/gocommon/aws/dynamo" +) + +// BatchPut writes multiple logs to DynamoDB in batches of 25. This should probably be a generic function in the +// gocommon/dynamo package but need to think more about errors. +func BatchPut(ctx context.Context, ds *dynamo.Service, table string, logs []*Log) error { + for batch := range slices.Chunk(logs, 25) { + writeReqs := make([]types.WriteRequest, len(batch)) + + for i, l := range batch { + d, err := l.MarshalDynamo() + if err != nil { + return fmt.Errorf("error marshalling log: %w", err) + } + writeReqs[i] = types.WriteRequest{PutRequest: &types.PutRequest{Item: d}} + } + + _, err := ds.Client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ + RequestItems: map[string][]types.WriteRequest{ds.TableName(table): writeReqs}, + }) + if err != nil { + return fmt.Errorf("error writing logs to db: %w", err) + } + } + + return nil +} diff --git a/utils/clogs/base.go b/utils/clogs/clog.go similarity index 55% rename from utils/clogs/base.go rename to utils/clogs/clog.go index 5c9e9228..0c14df29 100644 --- a/utils/clogs/base.go +++ b/utils/clogs/clog.go @@ -4,11 +4,18 @@ import ( "fmt" "time" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/nyaruka/gocommon/aws/dynamo" "github.com/nyaruka/gocommon/httpx" "github.com/nyaruka/gocommon/stringsx" "github.com/nyaruka/gocommon/uuids" ) +const ( + dynamoTTL = 14 * 24 * time.Hour +) + // LogUUID is the type of a channel log UUID (should be v7) type LogUUID uuids.UUID @@ -85,3 +92,55 @@ func (l *Log) End() { func (l *Log) traceToLog(t *httpx.Trace) *httpx.Log { return httpx.NewLog(t, 2048, 50000, l.redactor) } + +// log struct to be written to DynamoDB +type dynamoLog struct { + UUID LogUUID `dynamodbav:"UUID"` + Type LogType `dynamodbav:"Type"` + DataGZ []byte `dynamodbav:"DataGZ,omitempty"` + ElapsedMS int `dynamodbav:"ElapsedMS"` + CreatedOn time.Time `dynamodbav:"CreatedOn,unixtime"` + ExpiresOn time.Time `dynamodbav:"ExpiresOn,unixtime"` +} + +type dynamoLogData struct { + HttpLogs []*httpx.Log `json:"http_logs"` + Errors []*LogError `json:"errors"` +} + +func (l *Log) MarshalDynamo() (map[string]types.AttributeValue, error) { + data, err := dynamo.MarshalJSONGZ(&dynamoLogData{HttpLogs: l.HttpLogs, Errors: l.Errors}) + if err != nil { + return nil, fmt.Errorf("error marshaling log data: %w", err) + } + + return attributevalue.MarshalMap(&dynamoLog{ + UUID: l.UUID, + Type: l.Type, + DataGZ: data, + ElapsedMS: int(l.Elapsed / time.Millisecond), + CreatedOn: l.CreatedOn, + ExpiresOn: l.CreatedOn.Add(dynamoTTL), + }) +} + +func (l *Log) UnmarshalDynamo(m map[string]types.AttributeValue) error { + d := &dynamoLog{} + + if err := attributevalue.UnmarshalMap(m, d); err != nil { + return fmt.Errorf("error unmarshaling log: %w", err) + } + + data := &dynamoLogData{} + if err := dynamo.UnmarshalJSONGZ(d.DataGZ, data); err != nil { + return fmt.Errorf("error unmarshaling log data: %w", err) + } + + l.UUID = d.UUID + l.Type = d.Type + l.HttpLogs = data.HttpLogs + l.Errors = data.Errors + l.Elapsed = time.Duration(d.ElapsedMS) * time.Millisecond + l.CreatedOn = d.CreatedOn + return nil +} diff --git a/utils/clogs/clog_test.go b/utils/clogs/clog_test.go new file mode 100644 index 00000000..d35261b8 --- /dev/null +++ b/utils/clogs/clog_test.go @@ -0,0 +1,69 @@ +package clogs_test + +import ( + "context" + "net/http" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/nyaruka/courier/utils/clogs" + "github.com/nyaruka/gocommon/aws/dynamo" + "github.com/nyaruka/gocommon/httpx" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLogs(t *testing.T) { + ctx := context.Background() + + defer httpx.SetRequestor(httpx.DefaultRequestor) + httpx.SetRequestor(httpx.NewMockRequestor(map[string][]*httpx.MockResponse{ + "http://ivr.com/start": {httpx.NewMockResponse(200, nil, []byte("OK"))}, + "http://ivr.com/hangup": {httpx.NewMockResponse(400, nil, []byte("Oops"))}, + })) + + clog1 := clogs.NewLog("type1", nil, []string{"sesame"}) + clog2 := clogs.NewLog("type1", nil, []string{"sesame"}) + + req1, _ := httpx.NewRequest("GET", "http://ivr.com/start", nil, map[string]string{"Authorization": "Token sesame"}) + trace1, err := httpx.DoTrace(http.DefaultClient, req1, nil, nil, -1) + require.NoError(t, err) + + clog1.HTTP(trace1) + clog1.End() + + req2, _ := httpx.NewRequest("GET", "http://ivr.com/hangup", nil, nil) + trace2, err := httpx.DoTrace(http.DefaultClient, req2, nil, nil, -1) + require.NoError(t, err) + + clog2.HTTP(trace2) + clog2.Error(clogs.NewLogError("", "", "oops")) + clog2.End() + + assert.NotEqual(t, clog1.UUID, clog2.UUID) + assert.NotEqual(t, time.Duration(0), clog1.Elapsed) + + ds, err := dynamo.NewService("root", "tembatemba", "us-east-1", "http://localhost:6000", "Test") + require.NoError(t, err) + + l1 := clogs.NewLog("test_type1", nil, nil) + l1.Error(clogs.NewLogError("code1", "ext", "message")) + + l2 := clogs.NewLog("test_type2", nil, nil) + l2.Error(clogs.NewLogError("code2", "ext", "message")) + + // write both logs to db + err = clogs.BatchPut(ctx, ds, "ChannelLogs", []*clogs.Log{l1, l2}) + assert.NoError(t, err) + + // read log 1 back from db + l3 := &clogs.Log{} + err = ds.GetItem(ctx, "ChannelLogs", map[string]types.AttributeValue{"UUID": &types.AttributeValueMemberS{Value: string(l1.UUID)}}, l3) + assert.NoError(t, err) + assert.Equal(t, l1.UUID, l3.UUID) + assert.Equal(t, clogs.LogType("test_type1"), l3.Type) + assert.Equal(t, []*clogs.LogError{clogs.NewLogError("code1", "ext", "message")}, l3.Errors) + assert.Equal(t, l1.Elapsed, l3.Elapsed) + assert.Equal(t, l1.CreatedOn.Truncate(time.Second), l3.CreatedOn) +} diff --git a/utils/clogs/dynamo.go b/utils/clogs/dynamo.go deleted file mode 100644 index 85168b10..00000000 --- a/utils/clogs/dynamo.go +++ /dev/null @@ -1,128 +0,0 @@ -package clogs - -import ( - "bytes" - "compress/gzip" - "context" - "encoding/json" - "fmt" - "io" - "slices" - "time" - - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" - "github.com/aws/aws-sdk-go-v2/service/dynamodb" - "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" - "github.com/nyaruka/gocommon/aws/dynamo" - "github.com/nyaruka/gocommon/httpx" - "github.com/nyaruka/gocommon/jsonx" -) - -const ( - dynamoTableBase = "ChannelLogs" - dynamoTTL = 14 * 24 * time.Hour -) - -// log struct to be written to DynamoDB -type dynamoLog struct { - UUID LogUUID `dynamodbav:"UUID"` - Type LogType `dynamodbav:"Type"` - DataGZ []byte `dynamodbav:"DataGZ,omitempty"` - ElapsedMS int `dynamodbav:"ElapsedMS"` - CreatedOn time.Time `dynamodbav:"CreatedOn,unixtime"` - ExpiresOn time.Time `dynamodbav:"ExpiresOn,unixtime"` -} - -type dynamoLogData struct { - HttpLogs []*httpx.Log `json:"http_logs"` - Errors []*LogError `json:"errors"` -} - -func newDynamoLog(l *Log) *dynamoLog { - data := &dynamoLogData{HttpLogs: l.HttpLogs, Errors: l.Errors} - buf := &bytes.Buffer{} - w := gzip.NewWriter(buf) - w.Write(jsonx.MustMarshal(data)) - w.Close() - - return &dynamoLog{ - UUID: l.UUID, - Type: l.Type, - DataGZ: buf.Bytes(), - ElapsedMS: int(l.Elapsed / time.Millisecond), - CreatedOn: l.CreatedOn, - ExpiresOn: l.CreatedOn.Add(dynamoTTL), - } -} - -func (d *dynamoLog) unpack() (*Log, error) { - r, err := gzip.NewReader(bytes.NewReader(d.DataGZ)) - if err != nil { - return nil, err - } - j, err := io.ReadAll(r) - if err != nil { - return nil, err - } - - var data dynamoLogData - if err := json.Unmarshal(j, &data); err != nil { - return nil, err - } - - return &Log{ - UUID: d.UUID, - Type: d.Type, - HttpLogs: data.HttpLogs, - Errors: data.Errors, - CreatedOn: d.CreatedOn, - Elapsed: time.Duration(d.ElapsedMS) * time.Millisecond, - }, nil -} - -// Get retrieves a log from DynamoDB by its UUID -func Get(ctx context.Context, ds *dynamo.Service, uuid LogUUID) (*Log, error) { - resp, err := ds.Client.GetItem(ctx, &dynamodb.GetItemInput{ - TableName: aws.String(ds.TableName(dynamoTableBase)), - Key: map[string]types.AttributeValue{ - "UUID": &types.AttributeValueMemberS{Value: string(uuid)}, - }, - }) - if err != nil { - return nil, fmt.Errorf("error getting log from db: %w", err) - } - - var d dynamoLog - if err := attributevalue.UnmarshalMap(resp.Item, &d); err != nil { - return nil, fmt.Errorf("error unmarshaling log: %w", err) - } - - return d.unpack() -} - -// BulkPut writes multiple logs to DynamoDB in batches of 25 -func BulkPut(ctx context.Context, ds *dynamo.Service, logs []*Log) error { - for batch := range slices.Chunk(logs, 25) { - writeReqs := make([]types.WriteRequest, len(batch)) - - for i, l := range batch { - dl := newDynamoLog(l) - - item, err := attributevalue.MarshalMap(dl) - if err != nil { - return fmt.Errorf("error marshalling log: %w", err) - } - writeReqs[i] = types.WriteRequest{PutRequest: &types.PutRequest{Item: item}} - } - - _, err := ds.Client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ - RequestItems: map[string][]types.WriteRequest{ds.TableName(dynamoTableBase): writeReqs}, - }) - if err != nil { - return fmt.Errorf("error writing logs to db: %w", err) - } - } - - return nil -} diff --git a/utils/clogs/dynamo_test.go b/utils/clogs/dynamo_test.go deleted file mode 100644 index a57a77e8..00000000 --- a/utils/clogs/dynamo_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package clogs_test - -import ( - "context" - "testing" - "time" - - "github.com/nyaruka/courier/utils/clogs" - "github.com/nyaruka/gocommon/aws/dynamo" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestDynamo(t *testing.T) { - ctx := context.Background() - ds, err := dynamo.NewService("root", "tembatemba", "us-east-1", "http://localhost:6000", "Test") - require.NoError(t, err) - - l1 := clogs.NewLog("test_type1", nil, nil) - l1.Error(clogs.NewLogError("code1", "ext", "message")) - - l2 := clogs.NewLog("test_type2", nil, nil) - l2.Error(clogs.NewLogError("code2", "ext", "message")) - - // write both logs to db - err = clogs.BulkPut(ctx, ds, []*clogs.Log{l1, l2}) - assert.NoError(t, err) - - // read log 1 back from db - l3, err := clogs.Get(ctx, ds, l1.UUID) - assert.NoError(t, err) - assert.Equal(t, l1.UUID, l3.UUID) - assert.Equal(t, clogs.LogType("test_type1"), l3.Type) - assert.Equal(t, []*clogs.LogError{clogs.NewLogError("code1", "ext", "message")}, l3.Errors) - assert.Equal(t, l1.Elapsed, l3.Elapsed) - assert.Equal(t, l1.CreatedOn.Truncate(time.Second), l3.CreatedOn) -}