Skip to content

Commit

Permalink
close producers on shutdown (#234)
Browse files Browse the repository at this point in the history
  • Loading branch information
agbpatro authored Nov 14, 2024
1 parent d1d549e commit 50943d1
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 33 deletions.
14 changes: 12 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func startServer(config *config.Config, airbrakeNotifier *gobrake.Notifier, logg
monitoring.StartServerMetrics(config, logger, registry)
}

producerRules, err := config.ConfigureProducers(airbrakeHandler, logger)
dispatchers, producerRules, err := config.ConfigureProducers(airbrakeHandler, logger)
if err != nil {
return err
}
Expand All @@ -75,5 +75,15 @@ func startServer(config *config.Config, airbrakeNotifier *gobrake.Notifier, logg
if server.TLSConfig, err = config.ExtractServiceTLSConfig(logger); err != nil {
return err
}
return server.ListenAndServeTLS(config.TLS.ServerCert, config.TLS.ServerKey)

err = server.ListenAndServeTLS(config.TLS.ServerCert, config.TLS.ServerKey)
for dispatcher, producer := range dispatchers {
logger.ActivityLog("attempting_to_close", logrus.LogInfo{"dispatcher": dispatcher})
// We don't care if this fails. If it does, we'll just continue on.
if dispatcherCloseErr := producer.Close(); dispatcherCloseErr != nil {
logger.ErrorLog("producer_close_error", dispatcherCloseErr, logrus.LogInfo{"dispatcher": dispatcher})
}
}
logger.ActivityLog("stopped_server", nil)
return err
}
24 changes: 12 additions & 12 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,10 @@ func (c *Config) prometheusEnabled() bool {
}

// ConfigureProducers validates and establishes connections to the producers (kafka/pubsub/logger)
func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *logrus.Logger) (map[string][]telemetry.Producer, error) {
func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *logrus.Logger) (map[telemetry.Dispatcher]telemetry.Producer, map[string][]telemetry.Producer, error) {
reliableAckSources, err := c.configureReliableAckSources()
if err != nil {
return nil, err
return nil, nil, err
}

producers := make(map[telemetry.Dispatcher]telemetry.Producer)
Expand All @@ -267,30 +267,30 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l

if _, ok := requiredDispatchers[telemetry.Kafka]; ok {
if c.Kafka == nil {
return nil, errors.New("expected Kafka to be configured")
return nil, nil, errors.New("expected Kafka to be configured")
}
convertKafkaConfig(c.Kafka)
kafkaProducer, err := kafka.NewProducer(c.Kafka, c.Namespace, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Kafka], logger)
if err != nil {
return nil, err
return nil, nil, err
}
producers[telemetry.Kafka] = kafkaProducer
}

if _, ok := requiredDispatchers[telemetry.Pubsub]; ok {
if c.Pubsub == nil {
return nil, errors.New("expected Pubsub to be configured")
return nil, nil, errors.New("expected Pubsub to be configured")
}
googleProducer, err := googlepubsub.NewProducer(c.prometheusEnabled(), c.Pubsub.ProjectID, c.Namespace, c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Pubsub], logger)
if err != nil {
return nil, err
return nil, nil, err
}
producers[telemetry.Pubsub] = googleProducer
}

if recordNames, ok := requiredDispatchers[telemetry.Kinesis]; ok {
if c.Kinesis == nil {
return nil, errors.New("expected Kinesis to be configured")
return nil, nil, errors.New("expected Kinesis to be configured")
}
maxRetries := 1
if c.Kinesis.MaxRetries != nil {
Expand All @@ -299,18 +299,18 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l
streamMapping := c.CreateKinesisStreamMapping(recordNames)
kinesis, err := kinesis.NewProducer(maxRetries, streamMapping, c.Kinesis.OverrideHost, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Kinesis], logger)
if err != nil {
return nil, err
return nil, nil, err
}
producers[telemetry.Kinesis] = kinesis
}

if _, ok := requiredDispatchers[telemetry.ZMQ]; ok {
if c.ZMQ == nil {
return nil, errors.New("expected ZMQ to be configured")
return nil, nil, errors.New("expected ZMQ to be configured")
}
zmqProducer, err := zmq.NewProducer(context.Background(), c.ZMQ, c.MetricCollector, c.Namespace, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.ZMQ], logger)
if err != nil {
return nil, err
return nil, nil, err
}
producers[telemetry.ZMQ] = zmqProducer
}
Expand All @@ -324,11 +324,11 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l
dispatchProducerRules[recordName] = dispatchFuncs

if len(dispatchProducerRules[recordName]) == 0 {
return nil, fmt.Errorf("unknown_dispatch_rule record: %v, dispatchRule:%v", recordName, dispatchRules)
return nil, nil, fmt.Errorf("unknown_dispatch_rule record: %v, dispatchRule:%v", recordName, dispatchRules)
}
}

return dispatchProducerRules, nil
return producers, dispatchProducerRules, nil
}

func (c *Config) configureReliableAckSources() (map[telemetry.Dispatcher]map[string]interface{}, error) {
Expand Down
18 changes: 9 additions & 9 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ var _ = Describe("Test full application config", func() {
config, err := loadTestApplicationConfig(TestSmallConfig)
Expect(err).NotTo(HaveOccurred())

producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
Expect(err).NotTo(HaveOccurred())
Expect(producers["V"]).To(HaveLen(1))

Expand Down Expand Up @@ -174,7 +174,7 @@ var _ = Describe("Test full application config", func() {
config, err := loadTestApplicationConfig(configInput)
Expect(err).NotTo(HaveOccurred())

producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
Expect(err).To(MatchError(errMessage))
Expect(producers).To(BeNil())
},
Expand All @@ -192,7 +192,7 @@ var _ = Describe("Test full application config", func() {
config.Records = map[string][]telemetry.Dispatcher{"V": {"kinesis"}}

var err error
producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
Expect(err).To(MatchError("expected Kinesis to be configured"))
Expect(producers).To(BeNil())
})
Expand Down Expand Up @@ -226,15 +226,15 @@ var _ = Describe("Test full application config", func() {
log, _ := logrus.NoOpLogger()
_ = os.Setenv("PUBSUB_EMULATOR_HOST", "some_url")
_ = os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", "some_service_account_path")
_, err := pubsubConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, _, err := pubsubConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
Expect(err).To(MatchError("pubsub_connect_error pubsub cannot initialize with both emulator and GCP resource"))
})

It("pubsub config works", func() {
log, _ := logrus.NoOpLogger()
_ = os.Setenv("PUBSUB_EMULATOR_HOST", "some_url")
var err error
producers, err = pubsubConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, producers, err = pubsubConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
Expect(err).NotTo(HaveOccurred())
Expect(producers["V"]).NotTo(BeNil())
})
Expand All @@ -253,19 +253,19 @@ var _ = Describe("Test full application config", func() {
log, _ := logrus.NoOpLogger()
config.Records = map[string][]telemetry.Dispatcher{"V": {"zmq"}}
var err error
producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
Expect(err).To(MatchError("expected ZMQ to be configured"))
Expect(producers).To(BeNil())
producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
Expect(err).To(BeNil())
_, producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
Expect(err).NotTo(HaveOccurred())
})

It("zmq config works", func() {
// ZMQ close is async, this removes the need to sync between tests.
zmqConfig.ZMQ.Addr = "tcp://127.0.0.1:5285"
log, _ := logrus.NoOpLogger()
var err error
producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
Expect(err).NotTo(HaveOccurred())
Expect(producers["V"]).NotTo(BeNil())
})
Expand Down
5 changes: 5 additions & 0 deletions datastore/googlepubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ func (p *Producer) Produce(entry *telemetry.Record) {

}

// Close the producer
func (p *Producer) Close() error {
return p.pubsubClient.Close()
}

// ProcessReliableAck sends to ackChan if reliable ack is configured
func (p *Producer) ProcessReliableAck(entry *telemetry.Record) {
_, ok := p.reliableAckTxTypes[entry.TxType]
Expand Down
6 changes: 6 additions & 0 deletions datastore/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ func (p *Producer) handleProducerEvents() {
}
}

// Close the producer
func (p *Producer) Close() error {
p.kafkaProducer.Close()
return nil
}

// ProcessReliableAck sends to ackChan if reliable ack is configured
func (p *Producer) ProcessReliableAck(entry *telemetry.Record) {
_, ok := p.reliableAckTxTypes[entry.TxType]
Expand Down
5 changes: 5 additions & 0 deletions datastore/kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ func (p *Producer) Produce(entry *telemetry.Record) {
metricsRegistry.byteTotal.Add(int64(entry.Length()), map[string]string{"record_type": entry.TxType})
}

// Close the producer
func (p *Producer) Close() error {
return nil
}

// ProcessReliableAck sends to ackChan if reliable ack is configured
func (p *Producer) ProcessReliableAck(entry *telemetry.Record) {
_, ok := p.reliableAckTxTypes[entry.TxType]
Expand Down
19 changes: 12 additions & 7 deletions datastore/simple/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,28 @@ type Config struct {
Verbose bool `json:"verbose"`
}

// ProtoLogger is a simple protobuf logger
type ProtoLogger struct {
// Producer is a simple protobuf logger
type Producer struct {
Config *Config
logger *logrus.Logger
}

// NewProtoLogger initializes the parameters for protobuf payload logging
func NewProtoLogger(config *Config, logger *logrus.Logger) telemetry.Producer {
return &ProtoLogger{Config: config, logger: logger}
return &Producer{Config: config, logger: logger}
}

// Close the producer
func (p *Producer) Close() error {
return nil
}

// ProcessReliableAck noop method
func (p *ProtoLogger) ProcessReliableAck(_ *telemetry.Record) {
func (p *Producer) ProcessReliableAck(_ *telemetry.Record) {
}

// Produce sends the data to the logger
func (p *ProtoLogger) Produce(entry *telemetry.Record) {
func (p *Producer) Produce(entry *telemetry.Record) {
data, err := p.recordToLogMap(entry)
if err != nil {
p.logger.ErrorLog("record_logging_error", err, logrus.LogInfo{"vin": entry.Vin, "txtype": entry.TxType, "metadata": entry.Metadata()})
Expand All @@ -41,11 +46,11 @@ func (p *ProtoLogger) Produce(entry *telemetry.Record) {
}

// ReportError noop method
func (p *ProtoLogger) ReportError(_ string, _ error, _ logrus.LogInfo) {
func (p *Producer) ReportError(_ string, _ error, _ logrus.LogInfo) {
}

// recordToLogMap converts the data of a record to a map or slice of maps
func (p *ProtoLogger) recordToLogMap(record *telemetry.Record) (interface{}, error) {
func (p *Producer) recordToLogMap(record *telemetry.Record) (interface{}, error) {
switch payload := record.GetProtoMessage().(type) {
case *protos.Payload:
return transformers.PayloadToMap(payload, p.Config.Verbose, p.logger), nil
Expand Down
6 changes: 3 additions & 3 deletions datastore/simple/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

var _ = Describe("ProtoLogger", func() {
var (
protoLogger *simple.ProtoLogger
protoLogger *simple.Producer
testLogger *logrus.Logger
hook *test.Hook
config *simple.Config
Expand All @@ -28,7 +28,7 @@ var _ = Describe("ProtoLogger", func() {
BeforeEach(func() {
testLogger, hook = logrus.NoOpLogger()
config = &simple.Config{Verbose: false}
protoLogger = simple.NewProtoLogger(config, testLogger).(*simple.ProtoLogger)
protoLogger = simple.NewProtoLogger(config, testLogger).(*simple.Producer)
})

Describe("NewProtoLogger", func() {
Expand Down Expand Up @@ -116,7 +116,7 @@ var _ = Describe("ProtoLogger", func() {
Context("when verbose set to true", func() {
BeforeEach(func() {
config.Verbose = true
protoLogger = simple.NewProtoLogger(config, testLogger).(*simple.ProtoLogger)
protoLogger = simple.NewProtoLogger(config, testLogger).(*simple.Producer)
})

It("does not include types in the data", func() {
Expand Down
1 change: 1 addition & 0 deletions telemetry/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func BuildTopicName(namespace, recordName string) string {

// Producer handles dispatching data received from the vehicle
type Producer interface {
Close() error
Produce(entry *Record)
ProcessReliableAck(entry *Record)
ReportError(message string, err error, logInfo logrus.LogInfo)
Expand Down
4 changes: 4 additions & 0 deletions telemetry/serializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ type CallbackTester struct {
reliableAck int
}

func (c *CallbackTester) Close() error {
return nil
}

func (c *CallbackTester) Produce(_ *telemetry.Record) {
c.counter++
}
Expand Down

0 comments on commit 50943d1

Please sign in to comment.