Skip to content

Commit

Permalink
Use copy of latest clogs package from mailroom
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Sep 16, 2024
1 parent 6eae1a5 commit 13134ae
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 41 deletions.
14 changes: 6 additions & 8 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ import (

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/buger/jsonparser"
"github.com/gomodule/redigo/redis"
"github.com/lib/pq"
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/queue"
"github.com/nyaruka/courier/test"
"github.com/nyaruka/courier/utils/clogs"
"github.com/nyaruka/gocommon/dbutil/assertdb"
"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/gocommon/i18n"
Expand Down Expand Up @@ -1056,14 +1056,12 @@ func (ts *BackendTestSuite) TestWriteChanneLog() {
assertdb.Query(ts.T(), ts.b.db, `SELECT channel_id, http_logs->0->>'url' AS url, errors->0->>'message' AS err FROM channels_channellog`).
Columns(map[string]any{"channel_id": int64(channel.ID()), "url": "https://api.messages.com/send.json", "err": "Unexpected response status code."})

resp, err := ts.b.dynamo.Client.GetItem(ctx, &dynamodb.GetItemInput{
TableName: aws.String(ts.b.dynamo.TableName("ChannelLogs")),
Key: map[string]types.AttributeValue{
"UUID": &types.AttributeValueMemberS{Value: string(clog1.UUID)},
},
})
// check that we can read the log back from DynamoDB
actualLog, err := clogs.Get(ctx, ts.b.dynamo, clog1.UUID)
ts.NoError(err)
ts.Equal(string(clog1.UUID), resp.Item["UUID"].(*types.AttributeValueMemberS).Value)
ts.Equal(clog1.UUID, actualLog.UUID)
ts.Equal(courier.ChannelLogTypeTokenRefresh, actualLog.Type)
ts.Equal([]*clogs.LogError{courier.ErrorResponseStatusCode()}, actualLog.Errors)

clog2 := courier.NewChannelLog(courier.ChannelLogTypeMsgSend, channel, nil)
clog2.HTTP(trace)
Expand Down
32 changes: 6 additions & 26 deletions backends/rapidpro/channel_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
dytypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
Expand Down Expand Up @@ -66,9 +63,7 @@ func queueChannelLog(b *backend, clog *courier.ChannelLog) {
return
}

dl := clogs.NewDynamoLog(clog.Log)

if b.dyLogWriter.Queue(dl) <= 0 {
if b.dyLogWriter.Queue(clog.Log) <= 0 {
log.With("storage", "dynamo").Error("channel log writer buffer full")
}

Expand Down Expand Up @@ -174,12 +169,12 @@ func writeStorageChannelLogs(ctx context.Context, s3s *s3x.Service, bucket strin
}

type DynamoLogWriter struct {
*syncx.Batcher[*clogs.DynamoLog]
*syncx.Batcher[*clogs.Log]
}

func NewDynamoLogWriter(dy *dynamo.Service, wg *sync.WaitGroup) *DynamoLogWriter {
return &DynamoLogWriter{
Batcher: syncx.NewBatcher(func(batch []*clogs.DynamoLog) {
Batcher: syncx.NewBatcher(func(batch []*clogs.Log) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

Expand All @@ -188,25 +183,10 @@ func NewDynamoLogWriter(dy *dynamo.Service, wg *sync.WaitGroup) *DynamoLogWriter
}
}

func writeDynamoChannelLogs(ctx context.Context, dy *dynamo.Service, batch []*clogs.DynamoLog) {
func writeDynamoChannelLogs(ctx context.Context, dy *dynamo.Service, batch []*clogs.Log) {
log := slog.With("comp", "dynamo log writer")

var writeReqs []dytypes.WriteRequest
for _, l := range batch {
item, err := attributevalue.MarshalMap(l)
if err != nil {
log.Error("error marshalling channel log", "error", err)
} else {
writeReqs = append(writeReqs, dytypes.WriteRequest{PutRequest: &dytypes.PutRequest{Item: item}})
}
}

if len(writeReqs) > 0 {
_, err := dy.Client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
RequestItems: map[string][]dytypes.WriteRequest{dy.TableName("ChannelLogs"): writeReqs},
})
if err != nil {
log.Error("error writing channel logs", "error", err)
}
if err := clogs.BulkPut(ctx, dy, batch); err != nil {
log.Error("error writing channel logs", "error", err)

Check warning on line 190 in backends/rapidpro/channel_log.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/channel_log.go#L190

Added line #L190 was not covered by tests
}
}
102 changes: 95 additions & 7 deletions utils/clogs/dynamo.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,29 @@ package clogs
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"slices"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/nyaruka/gocommon/aws/dynamo"
"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/gocommon/jsonx"
)

const (
dynamoTTL = 14 * 24 * time.Hour
dynamoTableBase = "ChannelLogs"
dynamoTTL = 14 * 24 * time.Hour
)

// DynamoLog channel log to be written to DynamoDB
type DynamoLog struct {
// log struct to be written to DynamoDB
type dynamoLog struct {
UUID LogUUID `dynamodbav:"UUID"`
Type LogType `dynamodbav:"Type"`
DataGZ []byte `dynamodbav:"DataGZ,omitempty"`
Expand All @@ -22,14 +34,19 @@ type DynamoLog struct {
ExpiresOn time.Time `dynamodbav:"ExpiresOn,unixtime"`
}

func NewDynamoLog(l *Log) *DynamoLog {
data := jsonx.MustMarshal(map[string]any{"http_logs": l.HttpLogs, "errors": l.Errors})
type dynamoLogData struct {
HttpLogs []*httpx.Log `json:"http_logs"`
Errors []*LogError `json:"errors"`
}

func newDynamoLog(l *Log) *dynamoLog {
data := &dynamoLogData{HttpLogs: l.HttpLogs, Errors: l.Errors}
buf := &bytes.Buffer{}
w := gzip.NewWriter(buf)
w.Write(data)
w.Write(jsonx.MustMarshal(data))
w.Close()

return &DynamoLog{
return &dynamoLog{
UUID: l.UUID,
Type: l.Type,
DataGZ: buf.Bytes(),
Expand All @@ -38,3 +55,74 @@ func NewDynamoLog(l *Log) *DynamoLog {
ExpiresOn: l.CreatedOn.Add(dynamoTTL),
}
}

func (d *dynamoLog) unpack() (*Log, error) {
r, err := gzip.NewReader(bytes.NewReader(d.DataGZ))
if err != nil {
return nil, err

Check warning on line 62 in utils/clogs/dynamo.go

View check run for this annotation

Codecov / codecov/patch

utils/clogs/dynamo.go#L62

Added line #L62 was not covered by tests
}
j, err := io.ReadAll(r)
if err != nil {
return nil, err

Check warning on line 66 in utils/clogs/dynamo.go

View check run for this annotation

Codecov / codecov/patch

utils/clogs/dynamo.go#L66

Added line #L66 was not covered by tests
}

var data dynamoLogData
if err := json.Unmarshal(j, &data); err != nil {
return nil, err

Check warning on line 71 in utils/clogs/dynamo.go

View check run for this annotation

Codecov / codecov/patch

utils/clogs/dynamo.go#L71

Added line #L71 was not covered by tests
}

return &Log{
UUID: d.UUID,
Type: d.Type,
HttpLogs: data.HttpLogs,
Errors: data.Errors,
CreatedOn: d.CreatedOn,
Elapsed: time.Duration(d.ElapsedMS) * time.Millisecond,
}, nil
}

// Get retrieves a log from DynamoDB by its UUID
func Get(ctx context.Context, ds *dynamo.Service, uuid LogUUID) (*Log, error) {
resp, err := ds.Client.GetItem(ctx, &dynamodb.GetItemInput{
TableName: aws.String(ds.TableName(dynamoTableBase)),
Key: map[string]types.AttributeValue{
"UUID": &types.AttributeValueMemberS{Value: string(uuid)},
},
})
if err != nil {
return nil, fmt.Errorf("error getting log from db: %w", err)

Check warning on line 93 in utils/clogs/dynamo.go

View check run for this annotation

Codecov / codecov/patch

utils/clogs/dynamo.go#L93

Added line #L93 was not covered by tests
}

var d dynamoLog
if err := attributevalue.UnmarshalMap(resp.Item, &d); err != nil {
return nil, fmt.Errorf("error unmarshaling log: %w", err)

Check warning on line 98 in utils/clogs/dynamo.go

View check run for this annotation

Codecov / codecov/patch

utils/clogs/dynamo.go#L98

Added line #L98 was not covered by tests
}

return d.unpack()
}

// BulkPut writes multiple logs to DynamoDB in batches of 25
func BulkPut(ctx context.Context, ds *dynamo.Service, logs []*Log) error {
for batch := range slices.Chunk(logs, 25) {
writeReqs := make([]types.WriteRequest, len(batch))

for i, l := range batch {
dl := newDynamoLog(l)

item, err := attributevalue.MarshalMap(dl)
if err != nil {
return fmt.Errorf("error marshalling log: %w", err)

Check warning on line 114 in utils/clogs/dynamo.go

View check run for this annotation

Codecov / codecov/patch

utils/clogs/dynamo.go#L114

Added line #L114 was not covered by tests
}
writeReqs[i] = types.WriteRequest{PutRequest: &types.PutRequest{Item: item}}
}

_, err := ds.Client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
RequestItems: map[string][]types.WriteRequest{ds.TableName(dynamoTableBase): writeReqs},
})
if err != nil {
return fmt.Errorf("error writing logs to db: %w", err)

Check warning on line 123 in utils/clogs/dynamo.go

View check run for this annotation

Codecov / codecov/patch

utils/clogs/dynamo.go#L123

Added line #L123 was not covered by tests
}
}

return nil
}
37 changes: 37 additions & 0 deletions utils/clogs/dynamo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package clogs_test

import (
"context"
"testing"
"time"

"github.com/nyaruka/courier/utils/clogs"
"github.com/nyaruka/gocommon/aws/dynamo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDynamo(t *testing.T) {
ctx := context.Background()
ds, err := dynamo.NewService("root", "tembatemba", "us-east-1", "http://localhost:6000", "Test")
require.NoError(t, err)

l1 := clogs.NewLog("test_type1", nil, nil)
l1.Error(clogs.NewLogError("code1", "ext", "message"))

l2 := clogs.NewLog("test_type2", nil, nil)
l2.Error(clogs.NewLogError("code2", "ext", "message"))

// write both logs to db
err = clogs.BulkPut(ctx, ds, []*clogs.Log{l1, l2})
assert.NoError(t, err)

// read log 1 back from db
l3, err := clogs.Get(ctx, ds, l1.UUID)
assert.NoError(t, err)
assert.Equal(t, l1.UUID, l3.UUID)
assert.Equal(t, clogs.LogType("test_type1"), l3.Type)
assert.Equal(t, []*clogs.LogError{clogs.NewLogError("code1", "ext", "message")}, l3.Errors)
assert.Equal(t, l1.Elapsed, l3.Elapsed)
assert.Equal(t, l1.CreatedOn.Truncate(time.Second), l3.CreatedOn)
}

0 comments on commit 13134ae

Please sign in to comment.