Log and report kafka metrics after producing them (#155)
Currently we report kafka metrics when .Produce() is called.
However, there are scenarios where kafka delivery fails and
we never record those failures.
This change reports metrics based on producer delivery events instead.
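Roughly, the pattern the commit moves to looks like the sketch below. It is a standalone illustration, not code from this repository: the broker address and topic are placeholders, and the confluent-kafka-go v2 import path is an assumption.

// Standalone sketch of the delivery-report pattern this commit adopts:
// count a record as produced only once the broker's report arrives on
// Events(), so failed deliveries are observed too. Broker address and
// topic are placeholders; the v2 import path is an assumption.
package main

import (
	"fmt"
	"log"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		log.Fatal(err)
	}
	defer p.Close()

	topic := "example"
	if err := p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("hello"),
	}, nil); err != nil {
		log.Fatal(err)
	}

	// Delivery reports and producer-level errors both arrive on Events().
	for e := range p.Events() {
		switch ev := e.(type) {
		case *kafka.Message:
			if ev.TopicPartition.Error != nil {
				fmt.Println("delivery failed:", ev.TopicPartition.Error) // an error counter would go here
			} else {
				fmt.Println("acked:", ev.TopicPartition) // produce_ack counters would go here
			}
			return
		case kafka.Error:
			fmt.Println("producer error:", ev) // an error counter would go here
		}
	}
}

Counting at the delivery report rather than at the Produce() call is what lets failed deliveries show up in the metrics at all.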
agbpatro authored Apr 11, 2024
1 parent e902968 commit 86315ff
Showing 1 changed file with 29 additions and 15 deletions.
44 changes: 29 additions & 15 deletions datastore/kafka/kafka.go
@@ -27,9 +27,11 @@ type Producer struct {
 
 // Metrics stores metrics reported from this package
 type Metrics struct {
-	produceCount adapter.Counter
-	byteTotal adapter.Counter
-	errorCount adapter.Counter
+	produceCount adapter.Counter
+	bytesTotal adapter.Counter
+	produceAckCount adapter.Counter
+	bytesAckTotal adapter.Counter
+	errorCount adapter.Counter
 }
 
 var (
@@ -57,9 +59,7 @@ func NewProducer(config *kafka.ConfigMap, namespace string, reliableAckWorkers i
 		reliableAck: reliableAckWorkers > 0,
 	}
 
-	for i := 0; i < reliableAckWorkers; i++ {
-		go producer.handleProducerEvents(ackChan)
-	}
+	go producer.handleProducerEvents(ackChan)
 	producer.logger.ActivityLog("kafka_registered", logrus.LogInfo{"namespace": namespace})
 	return producer, nil
 }
@@ -74,21 +74,18 @@ func (p *Producer) Produce(entry *telemetry.Record) {
 		Key: []byte(entry.Vin),
 		Headers: headersFromRecord(entry),
 		Timestamp: time.Now(),
+		Opaque: entry,
 	}
 
-	// Note: confluent kafka supports the concept of one channel per connection, so we could add those here and get rid of reliableAckWorkers
-	// ex.: https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/producer_custom_channel_example/producer_custom_channel_example.go#L79
-	if p.reliableAck {
-		msg.Opaque = entry
-	}
 	entry.ProduceTime = time.Now()
 	if err := p.kafkaProducer.Produce(msg, nil); err != nil {
 		p.logError(err)
 		return
 	}
 
 	metricsRegistry.produceCount.Inc(map[string]string{"record_type": entry.TxType})
-	metricsRegistry.byteTotal.Add(int64(entry.Length()), map[string]string{"record_type": entry.TxType})
+	metricsRegistry.bytesTotal.Add(int64(entry.Length()), map[string]string{"record_type": entry.TxType})
 }
 
 // ReportError to airbrake and logger
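The note removed in this hunk points to confluent-kafka-go's per-message delivery channels as an alternative to a shared ack worker. Roughly, that alternative looks like the sketch below; it is not what this commit does, the helper name and parameters are made up, and the v2 import path is again an assumption.

package sketch

import "github.com/confluentinc/confluent-kafka-go/v2/kafka"

// produceAndWait is a hypothetical helper showing the per-message delivery
// channel mentioned in the removed note: the report received here belongs to
// exactly this message, so no Opaque round-trip is needed, at the cost of
// blocking on every produce call.
func produceAndWait(p *kafka.Producer, topic string, payload []byte) error {
	deliveryChan := make(chan kafka.Event, 1)
	if err := p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          payload,
	}, deliveryChan); err != nil {
		return err
	}
	report := (<-deliveryChan).(*kafka.Message)
	return report.TopicPartition.Error // nil on a successful ack
}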
@@ -113,9 +110,14 @@ func (p *Producer) handleProducerEvents(ackChan chan (*telemetry.Record)) {
 		case kafka.Error:
 			p.logError(fmt.Errorf("producer_error %v", ev))
 		case *kafka.Message:
-			record, ok := ev.Opaque.(*telemetry.Record)
-			if ok {
-				ackChan <- record
+			entry, ok := ev.Opaque.(*telemetry.Record)
+			if !ok {
+				continue
+			}
+			metricsRegistry.produceAckCount.Inc(map[string]string{"record_type": entry.TxType})
+			metricsRegistry.bytesAckTotal.Add(int64(entry.Length()), map[string]string{"record_type": entry.TxType})
+			if p.reliableAck {
+				ackChan <- entry
 			}
 		default:
 			p.logger.ActivityLog("kafka_event_ignored", logrus.LogInfo{"event": ev.String()})
@@ -139,12 +141,24 @@ func registerMetrics(metricsCollector metrics.MetricCollector) {
 		Labels: []string{"record_type"},
 	})
 
-	metricsRegistry.byteTotal = metricsCollector.RegisterCounter(adapter.CollectorOptions{
+	metricsRegistry.bytesTotal = metricsCollector.RegisterCounter(adapter.CollectorOptions{
 		Name: "kafka_produce_total_bytes",
 		Help: "The number of bytes produced to Kafka.",
 		Labels: []string{"record_type"},
 	})
 
+	metricsRegistry.produceAckCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{
+		Name: "kafka_produce_ack_total",
+		Help: "The number of records produced to Kafka for which we got an ACK.",
+		Labels: []string{"record_type"},
+	})
+
+	metricsRegistry.bytesAckTotal = metricsCollector.RegisterCounter(adapter.CollectorOptions{
+		Name: "kafka_produce_ack_total_bytes",
+		Help: "The number of bytes produced to Kafka for which we got an ACK.",
+		Labels: []string{"record_type"},
+	})
+
 	metricsRegistry.errorCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{
 		Name: "kafka_err",
 		Help: "The number of errors while producing to Kafka.",
