From 8e70c7157fe5541ce54f71928bd8d0e295b5a10e Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Wed, 18 Sep 2024 13:14:36 -0500 Subject: [PATCH 1/2] Stop writing channel logs to S3 --- backends/rapidpro/backend.go | 18 ++------ backends/rapidpro/backend_test.go | 9 ++-- backends/rapidpro/channel_log.go | 69 +------------------------------ config.go | 2 - utils/clogs/clog.go | 2 +- 5 files changed, 11 insertions(+), 89 deletions(-) diff --git a/backends/rapidpro/backend.go b/backends/rapidpro/backend.go index 984cd1f5..7766761e 100644 --- a/backends/rapidpro/backend.go +++ b/backends/rapidpro/backend.go @@ -56,9 +56,8 @@ type backend struct { config *courier.Config statusWriter *StatusWriter - dbLogWriter *DBLogWriter // unattached logs being written to the database - stLogWriter *StorageLogWriter // attached logs being written to storage - dyLogWriter *DynamoLogWriter // all logs being written to dynamo + dbLogWriter *DBLogWriter // unattached logs being written to the database + dyLogWriter *DynamoLogWriter // all logs being written to dynamo writerWG *sync.WaitGroup db *sqlx.DB @@ -185,17 +184,12 @@ func (b *backend) Start() error { return err } - // check bucket access + // check attachment bucket access if err := b.s3.Test(ctx, b.config.S3AttachmentsBucket); err != nil { log.Error("attachments bucket not accessible", "error", err) } else { log.Info("attachments bucket ok") } - if err := b.s3.Test(ctx, b.config.S3LogsBucket); err != nil { - log.Error("logs bucket not accessible", "error", err) - } else { - log.Info("logs bucket ok") - } // create and start channel caches... 
b.channelsByUUID = cache.NewLocal(b.loadChannelByUUID, time.Minute) @@ -224,9 +218,6 @@ func (b *backend) Start() error { b.dbLogWriter = NewDBLogWriter(b.db, b.writerWG) b.dbLogWriter.Start() - b.stLogWriter = NewStorageLogWriter(b.s3, b.config.S3LogsBucket, b.writerWG) - b.stLogWriter.Start() - b.dyLogWriter = NewDynamoLogWriter(b.dynamo, b.writerWG) b.dyLogWriter.Start() @@ -260,9 +251,6 @@ func (b *backend) Cleanup() error { if b.dbLogWriter != nil { b.dbLogWriter.Stop() } - if b.stLogWriter != nil { - b.stLogWriter.Stop() - } if b.dyLogWriter != nil { b.dyLogWriter.Stop() } diff --git a/backends/rapidpro/backend_test.go b/backends/rapidpro/backend_test.go index 9170fbee..033368f5 100644 --- a/backends/rapidpro/backend_test.go +++ b/backends/rapidpro/backend_test.go @@ -53,7 +53,6 @@ func testConfig() *courier.Config { config.AWSSecretAccessKey = "tembatemba" config.S3Endpoint = "http://localhost:9000" config.S3AttachmentsBucket = "test-attachments" - config.S3LogsBucket = "test-attachments" config.S3Minio = true config.DynamoEndpoint = "http://localhost:6000" config.DynamoTablePrefix = "Test" @@ -1075,10 +1074,12 @@ func (ts *BackendTestSuite) TestWriteChanneLog() { time.Sleep(time.Second) // give writer time to write this - _, body, err := ts.b.s3.GetObject(context.Background(), ts.b.config.S3LogsBucket, fmt.Sprintf("channels/%s/%s/%s.json", channel.UUID(), clog2.UUID[0:4], clog2.UUID)) + // check that we can read the log back from DynamoDB + actualLog = &clogs.Log{} + err = ts.b.dynamo.GetItem(ctx, "ChannelLogs", map[string]types.AttributeValue{"UUID": &types.AttributeValueMemberS{Value: string(clog2.UUID)}}, actualLog) ts.NoError(err) - ts.Contains(string(body), "msg_send") - ts.Contains(string(body), "https://api.messages.com/send.json") + ts.Equal(clog2.UUID, actualLog.UUID) + ts.Equal(courier.ChannelLogTypeMsgSend, actualLog.Type) ts.b.db.MustExec(`DELETE FROM channels_channellog`) diff --git a/backends/rapidpro/channel_log.go 
b/backends/rapidpro/channel_log.go index 0ac7e663..20418962 100644 --- a/backends/rapidpro/channel_log.go +++ b/backends/rapidpro/channel_log.go @@ -3,20 +3,15 @@ package rapidpro import ( "context" "encoding/json" - "fmt" "log/slog" - "path" "sync" "time" - s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/jmoiron/sqlx" "github.com/nyaruka/courier" "github.com/nyaruka/courier/utils/clogs" "github.com/nyaruka/gocommon/aws/dynamo" - "github.com/nyaruka/gocommon/aws/s3x" "github.com/nyaruka/gocommon/dbutil" - "github.com/nyaruka/gocommon/httpx" "github.com/nyaruka/gocommon/jsonx" "github.com/nyaruka/gocommon/syncx" ) @@ -37,21 +32,6 @@ type dbChannelLog struct { ElapsedMS int `db:"elapsed_ms"` } -// channel log to be written to logs storage -type stChannelLog struct { - UUID clogs.LogUUID `json:"uuid"` - Type clogs.LogType `json:"type"` - HTTPLogs []*httpx.Log `json:"http_logs"` - Errors []*clogs.LogError `json:"errors"` - ElapsedMS int `json:"elapsed_ms"` - CreatedOn time.Time `json:"created_on"` - ChannelUUID courier.ChannelUUID `json:"-"` -} - -func (l *stChannelLog) path() string { - return path.Join("channels", string(l.ChannelUUID), string(l.UUID[:4]), fmt.Sprintf("%s.json", l.UUID)) -} - // queues the passed in channel log to a writer func queueChannelLog(b *backend, clog *courier.ChannelLog) { log := slog.With("log_uuid", clog.UUID, "log_type", clog.Type, "channel_uuid", clog.Channel().UUID()) @@ -67,22 +47,8 @@ func queueChannelLog(b *backend, clog *courier.ChannelLog) { log.With("storage", "dynamo").Error("channel log writer buffer full") } - // if log is attached to a call or message, only write to storage - if clog.Attached() { - v := &stChannelLog{ - UUID: clog.UUID, - Type: clog.Type, - HTTPLogs: clog.HttpLogs, - Errors: clog.Errors, - ElapsedMS: int(clog.Elapsed / time.Millisecond), - CreatedOn: clog.CreatedOn, - ChannelUUID: clog.Channel().UUID(), - } - if b.stLogWriter.Queue(v) <= 0 { - log.With("storage", "s3").Error("channel log 
writer buffer full") - } - } else { - // otherwise write to database so it's retrievable + // if log is not attached to a call or message, need to write it to the database so that it is retrievable + if !clog.Attached() { v := &dbChannelLog{ UUID: clog.UUID, Type: clog.Type, @@ -137,37 +103,6 @@ func writeDBChannelLogs(ctx context.Context, db *sqlx.DB, batch []*dbChannelLog) } } -type StorageLogWriter struct { - *syncx.Batcher[*stChannelLog] -} - -func NewStorageLogWriter(s3s *s3x.Service, bucket string, wg *sync.WaitGroup) *StorageLogWriter { - return &StorageLogWriter{ - Batcher: syncx.NewBatcher(func(batch []*stChannelLog) { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - - writeStorageChannelLogs(ctx, s3s, bucket, batch) - }, 1000, time.Millisecond*500, 1000, wg), - } -} - -func writeStorageChannelLogs(ctx context.Context, s3s *s3x.Service, bucket string, batch []*stChannelLog) { - uploads := make([]*s3x.Upload, len(batch)) - for i, l := range batch { - uploads[i] = &s3x.Upload{ - Bucket: bucket, - Key: l.path(), - ContentType: "application/json", - Body: jsonx.MustMarshal(l), - ACL: s3types.ObjectCannedACLPrivate, - } - } - if err := s3s.BatchPut(ctx, uploads, 32); err != nil { - slog.Error("error writing channel logs", "comp", "storage log writer") - } -} - type DynamoLogWriter struct { *syncx.Batcher[*clogs.Log] } diff --git a/config.go b/config.go index 80899be1..625573a2 100644 --- a/config.go +++ b/config.go @@ -34,7 +34,6 @@ type Config struct { S3Endpoint string `help:"S3 service endpoint, e.g. 
https://s3.amazonaws.com"` S3AttachmentsBucket string `help:"S3 bucket to write attachments to"` - S3LogsBucket string `help:"S3 bucket to write channel logs to"` S3Minio bool `help:"S3 is actually Minio or other compatible service"` FacebookApplicationSecret string `help:"the Facebook app secret"` @@ -79,7 +78,6 @@ func NewDefaultConfig() *Config { S3Endpoint: "https://s3.amazonaws.com", S3AttachmentsBucket: "temba-attachments", - S3LogsBucket: "temba-logs", S3Minio: false, FacebookApplicationSecret: "missing_facebook_app_secret", diff --git a/utils/clogs/clog.go b/utils/clogs/clog.go index 0c14df29..60a9ff0d 100644 --- a/utils/clogs/clog.go +++ b/utils/clogs/clog.go @@ -13,7 +13,7 @@ import ( ) const ( - dynamoTTL = 14 * 24 * time.Hour + dynamoTTL = 7 * 24 * time.Hour // 1 week ) // LogUUID is the type of a channel log UUID (should be v7) From 356c2f13051acb6df89e22d4d1f4f949ef0ee990 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Wed, 18 Sep 2024 14:15:29 -0500 Subject: [PATCH 2/2] Log unprocessed items when writing logs to DynamoDB --- backends/rapidpro/channel_log.go | 33 +++++++++++++++++++++++------ utils/clogs/batch.go | 36 -------------------------------- utils/clogs/clog_test.go | 4 +++- 3 files changed, 30 insertions(+), 43 deletions(-) delete mode 100644 utils/clogs/batch.go diff --git a/backends/rapidpro/channel_log.go b/backends/rapidpro/channel_log.go index 20418962..768e9844 100644 --- a/backends/rapidpro/channel_log.go +++ b/backends/rapidpro/channel_log.go @@ -3,10 +3,13 @@ package rapidpro import ( "context" "encoding/json" + "fmt" "log/slog" "sync" "time" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/jmoiron/sqlx" "github.com/nyaruka/courier" "github.com/nyaruka/courier/utils/clogs" @@ -90,7 +93,7 @@ func writeDBChannelLogs(ctx context.Context, db *sqlx.DB, batch []*dbChannelLog) for _, v := range batch { err = dbutil.BulkQuery(ctx, db, sqlInsertChannelLog, 
[]*dbChannelLog{v}) if err != nil { - log := slog.With("comp", "log writer", "log_uuid", v.UUID) + log := slog.With("comp", "db log writer", "log_uuid", v.UUID) if qerr := dbutil.AsQueryError(err); qerr != nil { query, params := qerr.Query() @@ -113,15 +116,33 @@ func NewDynamoLogWriter(dy *dynamo.Service, wg *sync.WaitGroup) *DynamoLogWriter { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - writeDynamoChannelLogs(ctx, dy, batch) + if err := writeDynamoChannelLogs(ctx, dy, batch); err != nil { + slog.Error("error writing logs to dynamo", "error", err) + } }, 25, time.Millisecond*500, 1000, wg), } } -func writeDynamoChannelLogs(ctx context.Context, dy *dynamo.Service, batch []*clogs.Log) { - log := slog.With("comp", "dynamo log writer") +func writeDynamoChannelLogs(ctx context.Context, ds *dynamo.Service, batch []*clogs.Log) error { + writeReqs := make([]types.WriteRequest, len(batch)) + + for i, l := range batch { + d, err := l.MarshalDynamo() + if err != nil { + return fmt.Errorf("error marshalling log for dynamo: %w", err) + } + writeReqs[i] = types.WriteRequest{PutRequest: &types.PutRequest{Item: d}} + } - if err := clogs.BatchPut(ctx, dy, "ChannelLogs", batch); err != nil { - log.Error("error writing channel logs", "error", err) + resp, err := ds.Client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ + RequestItems: map[string][]types.WriteRequest{ds.TableName("ChannelLogs"): writeReqs}, + }) + if err != nil { + return err + } + if len(resp.UnprocessedItems) > 0 { + // TODO shouldn't happen.. 
but need to figure out how we would retry these + slog.Error("unprocessed items writing logs to dynamo", "count", len(resp.UnprocessedItems)) } + return nil } diff --git a/utils/clogs/batch.go b/utils/clogs/batch.go deleted file mode 100644 index 7d174b88..00000000 --- a/utils/clogs/batch.go +++ /dev/null @@ -1,36 +0,0 @@ -package clogs - -import ( - "context" - "fmt" - "slices" - - "github.com/aws/aws-sdk-go-v2/service/dynamodb" - "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" - "github.com/nyaruka/gocommon/aws/dynamo" -) - -// BatchPut writes multiple logs to DynamoDB in batches of 25. This should probably be a generic function in the -// gocommon/dynamo package but need to think more about errors. -func BatchPut(ctx context.Context, ds *dynamo.Service, table string, logs []*Log) error { - for batch := range slices.Chunk(logs, 25) { - writeReqs := make([]types.WriteRequest, len(batch)) - - for i, l := range batch { - d, err := l.MarshalDynamo() - if err != nil { - return fmt.Errorf("error marshalling log: %w", err) - } - writeReqs[i] = types.WriteRequest{PutRequest: &types.PutRequest{Item: d}} - } - - _, err := ds.Client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ - RequestItems: map[string][]types.WriteRequest{ds.TableName(table): writeReqs}, - }) - if err != nil { - return fmt.Errorf("error writing logs to db: %w", err) - } - } - - return nil -} diff --git a/utils/clogs/clog_test.go b/utils/clogs/clog_test.go index d35261b8..202b2f8d 100644 --- a/utils/clogs/clog_test.go +++ b/utils/clogs/clog_test.go @@ -54,7 +54,9 @@ func TestLogs(t *testing.T) { l2.Error(clogs.NewLogError("code2", "ext", "message")) // write both logs to db - err = clogs.BatchPut(ctx, ds, "ChannelLogs", []*clogs.Log{l1, l2}) + err = ds.PutItem(ctx, "ChannelLogs", l1) + assert.NoError(t, err) + err = ds.PutItem(ctx, "ChannelLogs", l2) assert.NoError(t, err) // read log 1 back from db