Skip to content

Commit

Permalink
Publish connecitivity events to a new topic (#227)
Browse files Browse the repository at this point in the history
  • Loading branch information
agbpatro authored Oct 29, 2024
1 parent d41976f commit b3d1541
Show file tree
Hide file tree
Showing 32 changed files with 880 additions and 200 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ jobs:

- name: Set up protoc
run: |
wget https://github.com/protocolbuffers/protobuf/releases/download/v26.1/protoc-26.1-linux-x86_64.zip
unzip protoc-26.1-linux-x86_64.zip
wget https://github.com/protocolbuffers/protobuf/releases/download/v28.3/protoc-28.3-linux-x86_64.zip
unzip protoc-28.3-linux-x86_64.zip
sudo mv bin/protoc /usr/local/bin/protoc
sudo mv include/* /usr/local/include/
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ Vehicles must be running firmware version 2023.20.6 or later. Some older model
## Backends/dispatchers
The following [dispatchers](./telemetry/producer.go#L10-L19) are supported
* Kafka (preferred): Configure with the config.json file. See implementation here: [config/config.go](./config/config.go)
* Topics will need to be created for \*prefix\*`_V`, \*prefix\*`_alerts`, and \*prefix\*`_errors`. The default prefix is `tesla`
* Topics will need to be created for \*prefix\*`_V`,\*prefix\*`_connectivity`, \*prefix\*`_alerts`, and \*prefix\*`_errors`. The default prefix is `tesla`
* Kinesis: Configure with standard [AWS env variables and config files](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html). The default AWS credentials and config files are: `~/.aws/credentials` and `~/.aws/config`.
* By default, stream names will be \*configured namespace\*_\*topic_name\* ex.: `tesla_V`, `tesla_errors`, `tesla_alerts`, etc
* Configure stream names directly by setting the streams config `"kinesis": { "streams": { *topic_name*: stream_name } }`
Expand Down
11 changes: 7 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l

if _, ok := requiredDispatchers[telemetry.Kafka]; ok {
if c.Kafka == nil {
return nil, errors.New("Expected Kafka to be configured")
return nil, errors.New("expected Kafka to be configured")
}
convertKafkaConfig(c.Kafka)
kafkaProducer, err := kafka.NewProducer(c.Kafka, c.Namespace, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Kafka], logger)
Expand All @@ -276,7 +276,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l

if _, ok := requiredDispatchers[telemetry.Pubsub]; ok {
if c.Pubsub == nil {
return nil, errors.New("Expected Pubsub to be configured")
return nil, errors.New("expected Pubsub to be configured")
}
googleProducer, err := googlepubsub.NewProducer(context.Background(), c.prometheusEnabled(), c.Pubsub.ProjectID, c.Namespace, c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Pubsub], logger)
if err != nil {
Expand All @@ -287,7 +287,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l

if recordNames, ok := requiredDispatchers[telemetry.Kinesis]; ok {
if c.Kinesis == nil {
return nil, errors.New("Expected Kinesis to be configured")
return nil, errors.New("expected Kinesis to be configured")
}
maxRetries := 1
if c.Kinesis.MaxRetries != nil {
Expand All @@ -303,7 +303,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l

if _, ok := requiredDispatchers[telemetry.ZMQ]; ok {
if c.ZMQ == nil {
return nil, errors.New("Expected ZMQ to be configured")
return nil, errors.New("expected ZMQ to be configured")
}
zmqProducer, err := zmq.NewProducer(context.Background(), c.ZMQ, c.MetricCollector, c.Namespace, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.ZMQ], logger)
if err != nil {
Expand Down Expand Up @@ -331,6 +331,9 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l
func (c *Config) configureReliableAckSources() (map[telemetry.Dispatcher]map[string]interface{}, error) {
reliableAckSources := make(map[telemetry.Dispatcher]map[string]interface{}, 0)
for txType, dispatchRule := range c.ReliableAckSources {
if txType == "connectivity" {
return nil, fmt.Errorf("reliable ack not needed for txType: %s", txType)
}
if dispatchRule == telemetry.Logger {
return nil, fmt.Errorf("logger cannot be configured as reliable ack for record: %s", txType)
}
Expand Down
5 changes: 3 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ var _ = Describe("Test full application config", func() {
Entry("when reliable ack is mapped incorrectly", TestBadReliableAckConfig, "pubsub cannot be configured as reliable ack for record: V. Valid datastores configured [kafka]"),
Entry("when logger is configured as reliable ack", TestLoggerAsReliableAckConfig, "logger cannot be configured as reliable ack for record: V"),
Entry("when reliable ack is configured for unmapped txtype", TestUnusedTxTypeAsReliableAckConfig, "kafka cannot be configured as reliable ack for record: error since no record mapping exists"),
Entry("when reliable ack is mapped with unsupported txtype", TestBadTxTypeReliableAckConfig, "reliable ack not needed for txType: connectivity"),
)

})
Expand All @@ -192,7 +193,7 @@ var _ = Describe("Test full application config", func() {

var err error
producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
Expect(err).To(MatchError("Expected Kinesis to be configured"))
Expect(err).To(MatchError("expected Kinesis to be configured"))
Expect(producers).To(BeNil())
})

Expand Down Expand Up @@ -253,7 +254,7 @@ var _ = Describe("Test full application config", func() {
config.Records = map[string][]telemetry.Dispatcher{"V": {"zmq"}}
var err error
producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
Expect(err).To(MatchError("Expected ZMQ to be configured"))
Expect(err).To(MatchError("expected ZMQ to be configured"))
Expect(producers).To(BeNil())
producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
Expect(err).To(BeNil())
Expand Down
27 changes: 27 additions & 0 deletions config/test_configs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,30 @@ const TestAirbrakeConfig = `
}
}
`

const TestBadTxTypeReliableAckConfig = `
{
"host": "127.0.0.1",
"port": 443,
"status_port": 8080,
"namespace": "tesla_telemetry",
"reliable_ack_sources": {
"connectivity": "kafka"
},
"kafka": {
"bootstrap.servers": "some.broker1:9093,some.broker1:9093",
"ssl.ca.location": "kafka.ca",
"ssl.certificate.location": "kafka.crt",
"ssl.key.location": "kafka.key",
"queue.buffering.max.messages": 1000000
},
"records": {
"V": ["kafka"],
"connectivity": ["kafka"]
},
"tls": {
"server_cert": "your_own_cert.crt",
"server_key": "your_own_key.key"
}
}
`
2 changes: 1 addition & 1 deletion datastore/googlepubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewProducer(ctx context.Context, prometheusEnabled bool, projectID string,
ackChan: ackChan,
reliableAckTxTypes: reliableAckTxTypes,
}
p.logger.ActivityLog("pubsub_registerd", logrus.LogInfo{"project": projectID, "namespace": namespace})
p.logger.ActivityLog("pubsub_registered", logrus.LogInfo{"project": projectID, "namespace": namespace})
return p, nil
}

Expand Down
2 changes: 2 additions & 0 deletions datastore/simple/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func (p *ProtoLogger) recordToLogMap(record *telemetry.Record) (interface{}, err
errorMaps[i] = transformers.VehicleErrorToMap(vehicleError)
}
return errorMaps, nil
case *protos.VehicleConnectivity:
return transformers.VehicleConnectivityToMap(payload), nil
default:
return nil, fmt.Errorf("unknown txType: %s", record.TxType)
}
Expand Down
4 changes: 2 additions & 2 deletions datastore/simple/transformers/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ var _ = Describe("Payload", func() {
payload := &protos.Payload{
Data: []*protos.Datum{
nil,
&protos.Datum{
{
Value: nil,
},
&protos.Datum{
{
Key: protos.Field_BatteryHeaterOn,
Value: &protos.Value{
Value: &protos.Value_BooleanValue{BooleanValue: true},
Expand Down
15 changes: 15 additions & 0 deletions datastore/simple/transformers/vehicle_connectivity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package transformers

import (
"github.com/teslamotors/fleet-telemetry/protos"
)

// VehicleConnectivityToMap converts a VehicleConnectivity proto message to a map representation
func VehicleConnectivityToMap(vehicleConnectivity *protos.VehicleConnectivity) map[string]interface{} {
return map[string]interface{}{
"Vin": vehicleConnectivity.GetVin(),
"ConnectionID": vehicleConnectivity.GetConnectionId(),
"Status": vehicleConnectivity.GetStatus().String(),
"CreatedAt": vehicleConnectivity.CreatedAt.AsTime().Unix(),
}
}
40 changes: 40 additions & 0 deletions datastore/simple/transformers/vehicle_connectivity_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package transformers_test

import (
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/teslamotors/fleet-telemetry/datastore/simple/transformers"
"github.com/teslamotors/fleet-telemetry/protos"

"google.golang.org/protobuf/types/known/timestamppb"
)

var _ = Describe("VehicleConnectivity", func() {
Describe("VehicleConnectivityToMap", func() {
var (
connectivity *protos.VehicleConnectivity
)

BeforeEach(func() {
connectivity = &protos.VehicleConnectivity{
Vin: "Vin1",
ConnectionId: "connection1",
CreatedAt: timestamppb.New(time.Now()),
Status: protos.ConnectivityEvent_CONNECTED,
}
})

It("includes all expected data", func() {
result := transformers.VehicleConnectivityToMap(connectivity)
Expect(result).To(HaveLen(4))
Expect(result["Vin"]).To(Equal("Vin1"))
Expect(result["ConnectionID"]).To(Equal("connection1"))
Expect(result["CreatedAt"]).To(BeNumerically("~", time.Now().Unix(), 1))
Expect(result["Status"]).To(Equal("CONNECTED"))
})

})
})
65 changes: 34 additions & 31 deletions examples/server_config.json
Original file line number Diff line number Diff line change
@@ -1,33 +1,36 @@
{
"host": "0.0.0.0",
"port": 443,
"log_level": "info",
"json_log_enable": true,
"namespace": "tesla_telemetry",
"reliable_ack": false,
"monitoring": {
"prometheus_metrics_port": 9090,
"profiler_port": 4269,
"profiling_path": "/tmp/trace.out"
},
"rate_limit": {
"enabled": true,
"message_interval_time": 30,
"message_limit": 1000
},
"records": {
"alerts": [
"logger"
],
"errors": [
"logger"
],
"V": [
"logger"
]
},
"tls": {
"server_cert": "/etc/certs/server/tls.crt",
"server_key": "/etc/certs/server/tls.key"
}
"host": "0.0.0.0",
"port": 443,
"log_level": "info",
"json_log_enable": true,
"namespace": "tesla_telemetry",
"reliable_ack": false,
"monitoring": {
"prometheus_metrics_port": 9090,
"profiler_port": 4269,
"profiling_path": "/tmp/trace.out"
},
"rate_limit": {
"enabled": true,
"message_interval_time": 30,
"message_limit": 1000
},
"records": {
"alerts": [
"logger"
],
"errors": [
"logger"
],
"V": [
"logger"
],
"connectivity": [
"logger"
]
},
"tls": {
"server_cert": "/etc/certs/server/tls.crt",
"server_key": "/etc/certs/server/tls.key"
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/aws/aws-sdk-go v1.44.278
github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
github.com/golang/protobuf v1.5.3
github.com/google/flatbuffers v23.3.3+incompatible
github.com/google/uuid v1.3.0
github.com/gorilla/websocket v1.5.0
Expand All @@ -34,7 +35,6 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.8.0 // indirect
Expand Down
12 changes: 11 additions & 1 deletion protos/python/vehicle_alert_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 40 additions & 0 deletions protos/python/vehicle_connectivity_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 11 additions & 1 deletion protos/python/vehicle_data_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b3d1541

Please sign in to comment.