-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add an sqs destination. #23
base: main
Are you sure you want to change the base?
Conversation
Got this working! |
internal/destinations/sqs/sqs.go
Outdated
} | ||
|
||
type sqs struct { | ||
batcher *batch.Destination[types.Event] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
types.Event
-> []byte
internal/destinations/sqs/sqs.go
Outdated
for _, msg := range msgs { | ||
messageRequest := &awssqs.SendMessageBatchRequestEntry{ | ||
Id: aws.String(ksuid.New().String()), | ||
MessageBody: aws.String(string(msg.Value.RawLog)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
msg.Value.RawLog
-> msg.Value
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is looking good, almost to where I'd imagine it needs to be.
I also removed the top-level internal/destinations
directory. Now everything lives under kawa/x/{plugin}
or kawa/cmd/kawad/internal/{sources,destinations}
. The []byte
source and destination would go under x/sqs
, and the types.Event
implementation would go under cmd/kawad/internal/destinations
.
Then, when registering the plugin here, we'd use the types.Event
implementation.
LMK if this makes sense or if you think the indirection is too much. I like the idea of reusing sources carrying different serialized data in bytes, but also don't love how the source/destination wrappers look currently.
@@ -34,6 +35,9 @@ func init() { | |||
loader.Register("printer", func() loader.Builder[kawa.Destination[types.Event]] { | |||
return &PrinterConfig{} | |||
}) | |||
loader.Register("sqs", func() loader.Builder[kawa.Destination[[]byte]] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be types.Event
, which requires implementing a small shim between bytes and types.Event
.
See @harpesichord's PR for MQTT for an example:
No description provided.