Merge pull request #790 from nyaruka/no_more_s3_logs
Stop writing channel logs to S3
rowanseymour authored Sep 18, 2024
2 parents 77f4c8d + 356c2f1 commit b6edd9e
Showing 7 changed files with 40 additions and 131 deletions.
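For orientation, here is a minimal sketch (not the actual courier code; the types and queue behaviour are illustrative stand-ins) of the routing this commit leaves in place: every channel log is queued to the DynamoDB writer, and only logs that are not attached to a message or call are additionally queued to the database writer. The S3/storage path is removed entirely.

package main

import "fmt"

// channelLog is a stand-in for courier.ChannelLog; only the fields needed to
// illustrate the routing decision are included.
type channelLog struct {
	uuid     string
	attached bool // true when the log belongs to a message or call
}

// writer is a stand-in for the syncx.Batcher-backed writers in the backend.
type writer struct {
	name  string
	queue []*channelLog
}

// Queue mimics Batcher.Queue; the real one returns remaining capacity (<= 0 means full).
func (w *writer) Queue(l *channelLog) int {
	w.queue = append(w.queue, l)
	return 1
}

// queueChannelLog mirrors the post-commit behaviour: Dynamo always, DB only for unattached logs.
func queueChannelLog(dynamoWriter, dbWriter *writer, clog *channelLog) {
	if dynamoWriter.Queue(clog) <= 0 {
		fmt.Println("dynamo channel log writer buffer full")
	}
	if !clog.attached {
		if dbWriter.Queue(clog) <= 0 {
			fmt.Println("db channel log writer buffer full")
		}
	}
}

func main() {
	dy := &writer{name: "dynamo"}
	db := &writer{name: "db"}

	queueChannelLog(dy, db, &channelLog{uuid: "log-1", attached: true})  // dynamo only
	queueChannelLog(dy, db, &channelLog{uuid: "log-2", attached: false}) // dynamo and db

	fmt.Println(len(dy.queue), len(db.queue)) // prints: 2 1
}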
18 changes: 3 additions & 15 deletions backends/rapidpro/backend.go
@@ -56,9 +56,8 @@ type backend struct {
     config *courier.Config
 
     statusWriter *StatusWriter
-    dbLogWriter  *DBLogWriter      // unattached logs being written to the database
-    stLogWriter  *StorageLogWriter // attached logs being written to storage
-    dyLogWriter  *DynamoLogWriter  // all logs being written to dynamo
+    dbLogWriter  *DBLogWriter     // unattached logs being written to the database
+    dyLogWriter  *DynamoLogWriter // all logs being written to dynamo
     writerWG     *sync.WaitGroup
 
     db *sqlx.DB
@@ -185,17 +184,12 @@ func (b *backend) Start() error {
         return err
     }
 
-    // check bucket access
+    // check attachment bucket access
     if err := b.s3.Test(ctx, b.config.S3AttachmentsBucket); err != nil {
         log.Error("attachments bucket not accessible", "error", err)
     } else {
         log.Info("attachments bucket ok")
     }
-    if err := b.s3.Test(ctx, b.config.S3LogsBucket); err != nil {
-        log.Error("logs bucket not accessible", "error", err)
-    } else {
-        log.Info("logs bucket ok")
-    }
 
     // create and start channel caches...
     b.channelsByUUID = cache.NewLocal(b.loadChannelByUUID, time.Minute)
@@ -224,9 +218,6 @@ func (b *backend) Start() error {
     b.dbLogWriter = NewDBLogWriter(b.db, b.writerWG)
     b.dbLogWriter.Start()
 
-    b.stLogWriter = NewStorageLogWriter(b.s3, b.config.S3LogsBucket, b.writerWG)
-    b.stLogWriter.Start()
-
     b.dyLogWriter = NewDynamoLogWriter(b.dynamo, b.writerWG)
     b.dyLogWriter.Start()
 
@@ -260,9 +251,6 @@ func (b *backend) Cleanup() error {
     if b.dbLogWriter != nil {
         b.dbLogWriter.Stop()
     }
-    if b.stLogWriter != nil {
-        b.stLogWriter.Stop()
-    }
     if b.dyLogWriter != nil {
         b.dyLogWriter.Stop()
     }
9 changes: 5 additions & 4 deletions backends/rapidpro/backend_test.go
@@ -53,7 +53,6 @@ func testConfig() *courier.Config {
     config.AWSSecretAccessKey = "tembatemba"
     config.S3Endpoint = "http://localhost:9000"
     config.S3AttachmentsBucket = "test-attachments"
-    config.S3LogsBucket = "test-attachments"
     config.S3Minio = true
     config.DynamoEndpoint = "http://localhost:6000"
     config.DynamoTablePrefix = "Test"
@@ -1075,10 +1074,12 @@ func (ts *BackendTestSuite) TestWriteChanneLog() {
 
     time.Sleep(time.Second) // give writer time to write this
 
-    _, body, err := ts.b.s3.GetObject(context.Background(), ts.b.config.S3LogsBucket, fmt.Sprintf("channels/%s/%s/%s.json", channel.UUID(), clog2.UUID[0:4], clog2.UUID))
+    // check that we can read the log back from DynamoDB
+    actualLog = &clogs.Log{}
+    err = ts.b.dynamo.GetItem(ctx, "ChannelLogs", map[string]types.AttributeValue{"UUID": &types.AttributeValueMemberS{Value: string(clog2.UUID)}}, actualLog)
     ts.NoError(err)
-    ts.Contains(string(body), "msg_send")
-    ts.Contains(string(body), "https://api.messages.com/send.json")
+    ts.Equal(clog2.UUID, actualLog.UUID)
+    ts.Equal(courier.ChannelLogTypeMsgSend, actualLog.Type)
 
     ts.b.db.MustExec(`DELETE FROM channels_channellog`)
 
100 changes: 28 additions & 72 deletions backends/rapidpro/channel_log.go
@@ -5,18 +5,16 @@ import (
     "encoding/json"
     "fmt"
     "log/slog"
-    "path"
     "sync"
     "time"
 
-    s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
+    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
+    "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
     "github.com/jmoiron/sqlx"
     "github.com/nyaruka/courier"
     "github.com/nyaruka/courier/utils/clogs"
     "github.com/nyaruka/gocommon/aws/dynamo"
-    "github.com/nyaruka/gocommon/aws/s3x"
     "github.com/nyaruka/gocommon/dbutil"
-    "github.com/nyaruka/gocommon/httpx"
     "github.com/nyaruka/gocommon/jsonx"
     "github.com/nyaruka/gocommon/syncx"
 )
@@ -37,21 +35,6 @@ type dbChannelLog struct {
     ElapsedMS int `db:"elapsed_ms"`
 }
 
-// channel log to be written to logs storage
-type stChannelLog struct {
-    UUID        clogs.LogUUID       `json:"uuid"`
-    Type        clogs.LogType       `json:"type"`
-    HTTPLogs    []*httpx.Log        `json:"http_logs"`
-    Errors      []*clogs.LogError   `json:"errors"`
-    ElapsedMS   int                 `json:"elapsed_ms"`
-    CreatedOn   time.Time           `json:"created_on"`
-    ChannelUUID courier.ChannelUUID `json:"-"`
-}
-
-func (l *stChannelLog) path() string {
-    return path.Join("channels", string(l.ChannelUUID), string(l.UUID[:4]), fmt.Sprintf("%s.json", l.UUID))
-}
-
 // queues the passed in channel log to a writer
 func queueChannelLog(b *backend, clog *courier.ChannelLog) {
     log := slog.With("log_uuid", clog.UUID, "log_type", clog.Type, "channel_uuid", clog.Channel().UUID())
@@ -67,22 +50,8 @@ func queueChannelLog(b *backend, clog *courier.ChannelLog) {
         log.With("storage", "dynamo").Error("channel log writer buffer full")
     }
 
-    // if log is attached to a call or message, only write to storage
-    if clog.Attached() {
-        v := &stChannelLog{
-            UUID:        clog.UUID,
-            Type:        clog.Type,
-            HTTPLogs:    clog.HttpLogs,
-            Errors:      clog.Errors,
-            ElapsedMS:   int(clog.Elapsed / time.Millisecond),
-            CreatedOn:   clog.CreatedOn,
-            ChannelUUID: clog.Channel().UUID(),
-        }
-        if b.stLogWriter.Queue(v) <= 0 {
-            log.With("storage", "s3").Error("channel log writer buffer full")
-        }
-    } else {
-        // otherwise write to database so it's retrievable
+    // if log is not attached to a call or message, need to write it to the database so that it is retrievable
+    if !clog.Attached() {
         v := &dbChannelLog{
             UUID:      clog.UUID,
             Type:      clog.Type,
@@ -124,7 +93,7 @@ func writeDBChannelLogs(ctx context.Context, db *sqlx.DB, batch []*dbChannelLog)
     for _, v := range batch {
         err = dbutil.BulkQuery(ctx, db, sqlInsertChannelLog, []*dbChannelLog{v})
         if err != nil {
-            log := slog.With("comp", "log writer", "log_uuid", v.UUID)
+            log := slog.With("comp", "db log writer", "log_uuid", v.UUID)
 
             if qerr := dbutil.AsQueryError(err); qerr != nil {
                 query, params := qerr.Query()
@@ -137,37 +106,6 @@ func writeDBChannelLogs(ctx context.Context, db *sqlx.DB, batch []*dbChannelLog)
     }
 }
 
-type StorageLogWriter struct {
-    *syncx.Batcher[*stChannelLog]
-}
-
-func NewStorageLogWriter(s3s *s3x.Service, bucket string, wg *sync.WaitGroup) *StorageLogWriter {
-    return &StorageLogWriter{
-        Batcher: syncx.NewBatcher(func(batch []*stChannelLog) {
-            ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
-            defer cancel()
-
-            writeStorageChannelLogs(ctx, s3s, bucket, batch)
-        }, 1000, time.Millisecond*500, 1000, wg),
-    }
-}
-
-func writeStorageChannelLogs(ctx context.Context, s3s *s3x.Service, bucket string, batch []*stChannelLog) {
-    uploads := make([]*s3x.Upload, len(batch))
-    for i, l := range batch {
-        uploads[i] = &s3x.Upload{
-            Bucket:      bucket,
-            Key:         l.path(),
-            ContentType: "application/json",
-            Body:        jsonx.MustMarshal(l),
-            ACL:         s3types.ObjectCannedACLPrivate,
-        }
-    }
-    if err := s3s.BatchPut(ctx, uploads, 32); err != nil {
-        slog.Error("error writing channel logs", "comp", "storage log writer")
-    }
-}
-
 type DynamoLogWriter struct {
     *syncx.Batcher[*clogs.Log]
 }
@@ -178,15 +116,33 @@ func NewDynamoLogWriter(dy *dynamo.Service, wg *sync.WaitGroup) *DynamoLogWriter
             ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
             defer cancel()
 
-            writeDynamoChannelLogs(ctx, dy, batch)
+            if err := writeDynamoChannelLogs(ctx, dy, batch); err != nil {
+                slog.Error("error writing logs to dynamo", "error", err)
+            }
         }, 25, time.Millisecond*500, 1000, wg),
     }
 }
 
-func writeDynamoChannelLogs(ctx context.Context, dy *dynamo.Service, batch []*clogs.Log) {
-    log := slog.With("comp", "dynamo log writer")
+func writeDynamoChannelLogs(ctx context.Context, ds *dynamo.Service, batch []*clogs.Log) error {
+    writeReqs := make([]types.WriteRequest, len(batch))
 
-    if err := clogs.BatchPut(ctx, dy, "ChannelLogs", batch); err != nil {
-        log.Error("error writing channel logs", "error", err)
+    for i, l := range batch {
+        d, err := l.MarshalDynamo()
+        if err != nil {
+            return fmt.Errorf("error marshalling log for dynamo: %w", err)
+        }
+        writeReqs[i] = types.WriteRequest{PutRequest: &types.PutRequest{Item: d}}
+    }
+
+    resp, err := ds.Client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
+        RequestItems: map[string][]types.WriteRequest{ds.TableName("ChannelLogs"): writeReqs},
+    })
+    if err != nil {
+        return err
+    }
+    if len(resp.UnprocessedItems) > 0 {
+        // TODO: shouldn't happen, but we need to figure out how we would retry these
+        slog.Error("unprocessed items writing logs to dynamo", "count", len(resp.UnprocessedItems))
     }
+    return nil
 }
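The new writeDynamoChannelLogs only logs unprocessed items (see the TODO above). As a hedged sketch, not part of this commit: BatchWriteItem reports throttled writes in UnprocessedItems, and those request groups can be fed back in with a backoff loop. This assumes direct access to the underlying aws-sdk-go-v2 *dynamodb.Client; the helper name and retry policy are hypothetical.

package rapidpro

import (
	"context"
	"fmt"
	"time"

	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// batchWriteWithRetry is a hypothetical helper: it submits the write requests and
// resubmits whatever DynamoDB returns as unprocessed, with a simple linear backoff.
func batchWriteWithRetry(ctx context.Context, client *dynamodb.Client, table string, reqs []types.WriteRequest) error {
	pending := map[string][]types.WriteRequest{table: reqs}

	for attempt := 0; attempt < 5 && len(pending) > 0; attempt++ {
		resp, err := client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{RequestItems: pending})
		if err != nil {
			return err
		}

		// anything DynamoDB couldn't write (e.g. due to throttling) comes back here
		pending = resp.UnprocessedItems

		if len(pending) > 0 {
			time.Sleep(time.Duration(attempt+1) * 100 * time.Millisecond)
		}
	}

	if len(pending) > 0 {
		return fmt.Errorf("still %d unprocessed request groups after retries", len(pending))
	}
	return nil
}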
2 changes: 0 additions & 2 deletions config.go
@@ -34,7 +34,6 @@ type Config struct {
 
     S3Endpoint          string `help:"S3 service endpoint, e.g. https://s3.amazonaws.com"`
    S3AttachmentsBucket string `help:"S3 bucket to write attachments to"`
-    S3LogsBucket        string `help:"S3 bucket to write channel logs to"`
     S3Minio             bool   `help:"S3 is actually Minio or other compatible service"`
 
     FacebookApplicationSecret string `help:"the Facebook app secret"`
@@ -79,7 +78,6 @@ func NewDefaultConfig() *Config {
 
         S3Endpoint:          "https://s3.amazonaws.com",
         S3AttachmentsBucket: "temba-attachments",
-        S3LogsBucket:        "temba-logs",
         S3Minio:             false,
 
         FacebookApplicationSecret: "missing_facebook_app_secret",
36 changes: 0 additions & 36 deletions utils/clogs/batch.go

This file was deleted.

2 changes: 1 addition & 1 deletion utils/clogs/clog.go
@@ -13,7 +13,7 @@ import (
 )
 
 const (
-    dynamoTTL = 14 * 24 * time.Hour
+    dynamoTTL = 7 * 24 * time.Hour // 1 week
 )
 
 // LogUUID is the type of a channel log UUID (should be v7)
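The dynamoTTL constant above presumably drives DynamoDB's native item expiry, which works off an epoch-seconds number attribute on each item. A hedged sketch of that mapping follows; the attribute name "ExpiresOn" is an assumption for illustration, not necessarily what clogs.Log writes.

package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

const dynamoTTL = 7 * 24 * time.Hour // 1 week, matching the new constant

// ttlAttribute builds the epoch-seconds number attribute that DynamoDB's TTL
// feature expires items on; the table's TTL setting would point at the chosen
// attribute name ("ExpiresOn" here, which is assumed).
func ttlAttribute(now time.Time) types.AttributeValue {
	expiresOn := now.Add(dynamoTTL).Unix()
	return &types.AttributeValueMemberN{Value: strconv.FormatInt(expiresOn, 10)}
}

func main() {
	attr := ttlAttribute(time.Now()).(*types.AttributeValueMemberN)
	fmt.Println("ExpiresOn =", attr.Value) // epoch seconds one week from now
}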
4 changes: 3 additions & 1 deletion utils/clogs/clog_test.go
@@ -54,7 +54,9 @@ func TestLogs(t *testing.T) {
     l2.Error(clogs.NewLogError("code2", "ext", "message"))
 
     // write both logs to db
-    err = clogs.BatchPut(ctx, ds, "ChannelLogs", []*clogs.Log{l1, l2})
+    err = ds.PutItem(ctx, "ChannelLogs", l1)
+    assert.NoError(t, err)
+    err = ds.PutItem(ctx, "ChannelLogs", l2)
     assert.NoError(t, err)
 
     // read log 1 back from db
