Skip to content

Commit

Permalink
Merge pull request #789 from nyaruka/clogs4
Browse files Browse the repository at this point in the history
Update to latest copy of clogs package
  • Loading branch information
rowanseymour authored Sep 16, 2024
2 parents fd4f62b + e5a50b9 commit 77f4c8d
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 171 deletions.
2 changes: 1 addition & 1 deletion backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (b *backend) Start() error {
if err != nil {
return err
}
if err := b.dynamo.Test(ctx, "ChannelLogs"); err != nil {
if err := b.dynamo.Test(ctx); err != nil {
log.Error("dynamodb not reachable", "error", err)
} else {
log.Info("dynamodb ok")
Expand Down
4 changes: 3 additions & 1 deletion backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/buger/jsonparser"
"github.com/gomodule/redigo/redis"
Expand Down Expand Up @@ -1057,7 +1058,8 @@ func (ts *BackendTestSuite) TestWriteChanneLog() {
Columns(map[string]any{"channel_id": int64(channel.ID()), "url": "https://api.messages.com/send.json", "err": "Unexpected response status code."})

// check that we can read the log back from DynamoDB
actualLog, err := clogs.Get(ctx, ts.b.dynamo, clog1.UUID)
actualLog := &clogs.Log{}
err = ts.b.dynamo.GetItem(ctx, "ChannelLogs", map[string]types.AttributeValue{"UUID": &types.AttributeValueMemberS{Value: string(clog1.UUID)}}, actualLog)
ts.NoError(err)
ts.Equal(clog1.UUID, actualLog.UUID)
ts.Equal(courier.ChannelLogTypeTokenRefresh, actualLog.Type)
Expand Down
2 changes: 1 addition & 1 deletion backends/rapidpro/channel_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func NewDynamoLogWriter(dy *dynamo.Service, wg *sync.WaitGroup) *DynamoLogWriter
func writeDynamoChannelLogs(ctx context.Context, dy *dynamo.Service, batch []*clogs.Log) {
log := slog.With("comp", "dynamo log writer")

if err := clogs.BulkPut(ctx, dy, batch); err != nil {
if err := clogs.BatchPut(ctx, dy, "ChannelLogs", batch); err != nil {
log.Error("error writing channel logs", "error", err)
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/jmoiron/sqlx v1.4.0
github.com/lib/pq v1.10.9
github.com/nyaruka/ezconf v0.3.0
github.com/nyaruka/gocommon v1.58.0
github.com/nyaruka/gocommon v1.59.0
github.com/nyaruka/null/v3 v3.0.0
github.com/nyaruka/redisx v0.8.1
github.com/patrickmn/go-cache v2.1.0+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ github.com/naoina/toml v0.1.1 h1:PT/lllxVVN0gzzSqSlHEmP8MJB4MY2U7STGxiouV4X8=
github.com/naoina/toml v0.1.1/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E=
github.com/nyaruka/ezconf v0.3.0 h1:kGvJqVN8AHowb4HdaHAviJ0Z3yI5Pyekp1WqibFEaGk=
github.com/nyaruka/ezconf v0.3.0/go.mod h1:89GUW6EPRNLIxT7lC4LWnjWTgZeQwRoX7lBmc8ralAU=
github.com/nyaruka/gocommon v1.58.0 h1:q8/YCWQOEiFHVnYW+f5N+HjI0R4gjV93bOpVWqImr8k=
github.com/nyaruka/gocommon v1.58.0/go.mod h1:u0n6zC7AxmrUZxzY4VtZU1d26QJ8nhqRRb5IFOw/Lig=
github.com/nyaruka/gocommon v1.59.0 h1:XC//IVSOWawBXEZqDiYhqxrIHWo2QPPeCIkAm4n4sY0=
github.com/nyaruka/gocommon v1.59.0/go.mod h1:Upj2DG1iL55YcfF7rve8CRrKGjMaEn0jWUIWbQQgTFQ=
github.com/nyaruka/librato v1.1.1 h1:0nTYtJLl3Sn7lX3CuHsLf+nXy1k/tGV0OjVxLy3Et4s=
github.com/nyaruka/librato v1.1.1/go.mod h1:fme1Fu1PT2qvkaBZyw8WW+SrnFe2qeeCWpvqmAaKAKE=
github.com/nyaruka/null/v2 v2.0.3 h1:rdmMRQyVzrOF3Jff/gpU/7BDR9mQX0lcLl4yImsA3kw=
Expand Down
36 changes: 36 additions & 0 deletions utils/clogs/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package clogs

import (
"context"
"fmt"
"slices"

"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/nyaruka/gocommon/aws/dynamo"
)

// BatchPut writes multiple logs to DynamoDB in batches of 25. This should probably be a generic function in the
// gocommon/dynamo package but need to think more about errors.
func BatchPut(ctx context.Context, ds *dynamo.Service, table string, logs []*Log) error {
for batch := range slices.Chunk(logs, 25) {
writeReqs := make([]types.WriteRequest, len(batch))

for i, l := range batch {
d, err := l.MarshalDynamo()
if err != nil {
return fmt.Errorf("error marshalling log: %w", err)
}
writeReqs[i] = types.WriteRequest{PutRequest: &types.PutRequest{Item: d}}
}

_, err := ds.Client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
RequestItems: map[string][]types.WriteRequest{ds.TableName(table): writeReqs},
})
if err != nil {
return fmt.Errorf("error writing logs to db: %w", err)
}
}

return nil
}
59 changes: 59 additions & 0 deletions utils/clogs/base.go → utils/clogs/clog.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,18 @@ import (
"fmt"
"time"

"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/nyaruka/gocommon/aws/dynamo"
"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/gocommon/stringsx"
"github.com/nyaruka/gocommon/uuids"
)

const (
dynamoTTL = 14 * 24 * time.Hour
)

// LogUUID is the type of a channel log UUID (should be v7)
type LogUUID uuids.UUID

Expand Down Expand Up @@ -85,3 +92,55 @@ func (l *Log) End() {
func (l *Log) traceToLog(t *httpx.Trace) *httpx.Log {
return httpx.NewLog(t, 2048, 50000, l.redactor)
}

// log struct to be written to DynamoDB
type dynamoLog struct {
UUID LogUUID `dynamodbav:"UUID"`
Type LogType `dynamodbav:"Type"`
DataGZ []byte `dynamodbav:"DataGZ,omitempty"`
ElapsedMS int `dynamodbav:"ElapsedMS"`
CreatedOn time.Time `dynamodbav:"CreatedOn,unixtime"`
ExpiresOn time.Time `dynamodbav:"ExpiresOn,unixtime"`
}

type dynamoLogData struct {
HttpLogs []*httpx.Log `json:"http_logs"`
Errors []*LogError `json:"errors"`
}

func (l *Log) MarshalDynamo() (map[string]types.AttributeValue, error) {
data, err := dynamo.MarshalJSONGZ(&dynamoLogData{HttpLogs: l.HttpLogs, Errors: l.Errors})
if err != nil {
return nil, fmt.Errorf("error marshaling log data: %w", err)
}

return attributevalue.MarshalMap(&dynamoLog{
UUID: l.UUID,
Type: l.Type,
DataGZ: data,
ElapsedMS: int(l.Elapsed / time.Millisecond),
CreatedOn: l.CreatedOn,
ExpiresOn: l.CreatedOn.Add(dynamoTTL),
})
}

func (l *Log) UnmarshalDynamo(m map[string]types.AttributeValue) error {
d := &dynamoLog{}

if err := attributevalue.UnmarshalMap(m, d); err != nil {
return fmt.Errorf("error unmarshaling log: %w", err)
}

data := &dynamoLogData{}
if err := dynamo.UnmarshalJSONGZ(d.DataGZ, data); err != nil {
return fmt.Errorf("error unmarshaling log data: %w", err)
}

l.UUID = d.UUID
l.Type = d.Type
l.HttpLogs = data.HttpLogs
l.Errors = data.Errors
l.Elapsed = time.Duration(d.ElapsedMS) * time.Millisecond
l.CreatedOn = d.CreatedOn
return nil
}
69 changes: 69 additions & 0 deletions utils/clogs/clog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package clogs_test

import (
"context"
"net/http"
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/nyaruka/courier/utils/clogs"
"github.com/nyaruka/gocommon/aws/dynamo"
"github.com/nyaruka/gocommon/httpx"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestLogs(t *testing.T) {
ctx := context.Background()

defer httpx.SetRequestor(httpx.DefaultRequestor)
httpx.SetRequestor(httpx.NewMockRequestor(map[string][]*httpx.MockResponse{
"http://ivr.com/start": {httpx.NewMockResponse(200, nil, []byte("OK"))},
"http://ivr.com/hangup": {httpx.NewMockResponse(400, nil, []byte("Oops"))},
}))

clog1 := clogs.NewLog("type1", nil, []string{"sesame"})
clog2 := clogs.NewLog("type1", nil, []string{"sesame"})

req1, _ := httpx.NewRequest("GET", "http://ivr.com/start", nil, map[string]string{"Authorization": "Token sesame"})
trace1, err := httpx.DoTrace(http.DefaultClient, req1, nil, nil, -1)
require.NoError(t, err)

clog1.HTTP(trace1)
clog1.End()

req2, _ := httpx.NewRequest("GET", "http://ivr.com/hangup", nil, nil)
trace2, err := httpx.DoTrace(http.DefaultClient, req2, nil, nil, -1)
require.NoError(t, err)

clog2.HTTP(trace2)
clog2.Error(clogs.NewLogError("", "", "oops"))
clog2.End()

assert.NotEqual(t, clog1.UUID, clog2.UUID)
assert.NotEqual(t, time.Duration(0), clog1.Elapsed)

ds, err := dynamo.NewService("root", "tembatemba", "us-east-1", "http://localhost:6000", "Test")
require.NoError(t, err)

l1 := clogs.NewLog("test_type1", nil, nil)
l1.Error(clogs.NewLogError("code1", "ext", "message"))

l2 := clogs.NewLog("test_type2", nil, nil)
l2.Error(clogs.NewLogError("code2", "ext", "message"))

// write both logs to db
err = clogs.BatchPut(ctx, ds, "ChannelLogs", []*clogs.Log{l1, l2})
assert.NoError(t, err)

// read log 1 back from db
l3 := &clogs.Log{}
err = ds.GetItem(ctx, "ChannelLogs", map[string]types.AttributeValue{"UUID": &types.AttributeValueMemberS{Value: string(l1.UUID)}}, l3)
assert.NoError(t, err)
assert.Equal(t, l1.UUID, l3.UUID)
assert.Equal(t, clogs.LogType("test_type1"), l3.Type)
assert.Equal(t, []*clogs.LogError{clogs.NewLogError("code1", "ext", "message")}, l3.Errors)
assert.Equal(t, l1.Elapsed, l3.Elapsed)
assert.Equal(t, l1.CreatedOn.Truncate(time.Second), l3.CreatedOn)
}
128 changes: 0 additions & 128 deletions utils/clogs/dynamo.go

This file was deleted.

Loading

0 comments on commit 77f4c8d

Please sign in to comment.