diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 8580290..0935ca8 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -19,8 +19,8 @@ jobs: - name: Set up protoc run: | - wget https://github.com/protocolbuffers/protobuf/releases/download/v26.1/protoc-26.1-linux-x86_64.zip - unzip protoc-26.1-linux-x86_64.zip + wget https://github.com/protocolbuffers/protobuf/releases/download/v28.3/protoc-28.3-linux-x86_64.zip + unzip protoc-28.3-linux-x86_64.zip sudo mv bin/protoc /usr/local/bin/protoc sudo mv include/* /usr/local/include/ diff --git a/README.md b/README.md index 8ce3a09..d302c83 100644 --- a/README.md +++ b/README.md @@ -148,7 +148,7 @@ Vehicles must be running firmware version 2023.20.6 or later. Some older model ## Backends/dispatchers The following [dispatchers](./telemetry/producer.go#L10-L19) are supported * Kafka (preferred): Configure with the config.json file. See implementation here: [config/config.go](./config/config.go) - * Topics will need to be created for \*prefix\*`_V`, \*prefix\*`_alerts`, and \*prefix\*`_errors`. The default prefix is `tesla` + * Topics will need to be created for \*prefix\*`_V`,\*prefix\*`_connectivity`, \*prefix\*`_alerts`, and \*prefix\*`_errors`. The default prefix is `tesla` * Kinesis: Configure with standard [AWS env variables and config files](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html). The default AWS credentials and config files are: `~/.aws/credentials` and `~/.aws/config`. * By default, stream names will be \*configured namespace\*_\*topic_name\* ex.: `tesla_V`, `tesla_errors`, `tesla_alerts`, etc * Configure stream names directly by setting the streams config `"kinesis": { "streams": { *topic_name*: stream_name } }` diff --git a/config/config.go b/config/config.go index ae6384c..ae05931 100644 --- a/config/config.go +++ b/config/config.go @@ -264,7 +264,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l if _, ok := requiredDispatchers[telemetry.Kafka]; ok { if c.Kafka == nil { - return nil, errors.New("Expected Kafka to be configured") + return nil, errors.New("expected Kafka to be configured") } convertKafkaConfig(c.Kafka) kafkaProducer, err := kafka.NewProducer(c.Kafka, c.Namespace, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Kafka], logger) @@ -276,7 +276,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l if _, ok := requiredDispatchers[telemetry.Pubsub]; ok { if c.Pubsub == nil { - return nil, errors.New("Expected Pubsub to be configured") + return nil, errors.New("expected Pubsub to be configured") } googleProducer, err := googlepubsub.NewProducer(context.Background(), c.prometheusEnabled(), c.Pubsub.ProjectID, c.Namespace, c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Pubsub], logger) if err != nil { @@ -287,7 +287,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l if recordNames, ok := requiredDispatchers[telemetry.Kinesis]; ok { if c.Kinesis == nil { - return nil, errors.New("Expected Kinesis to be configured") + return nil, errors.New("expected Kinesis to be configured") } maxRetries := 1 if c.Kinesis.MaxRetries != nil { @@ -303,7 +303,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l if _, ok := requiredDispatchers[telemetry.ZMQ]; ok { if c.ZMQ == nil { - return nil, errors.New("Expected ZMQ to be configured") + return nil, errors.New("expected ZMQ to be configured") } zmqProducer, err := zmq.NewProducer(context.Background(), c.ZMQ, c.MetricCollector, c.Namespace, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.ZMQ], logger) if err != nil { @@ -331,6 +331,9 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l func (c *Config) configureReliableAckSources() (map[telemetry.Dispatcher]map[string]interface{}, error) { reliableAckSources := make(map[telemetry.Dispatcher]map[string]interface{}, 0) for txType, dispatchRule := range c.ReliableAckSources { + if txType == "connectivity" { + return nil, fmt.Errorf("reliable ack not needed for txType: %s", txType) + } if dispatchRule == telemetry.Logger { return nil, fmt.Errorf("logger cannot be configured as reliable ack for record: %s", txType) } diff --git a/config/config_test.go b/config/config_test.go index e2cc34b..80b8920 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -181,6 +181,7 @@ var _ = Describe("Test full application config", func() { Entry("when reliable ack is mapped incorrectly", TestBadReliableAckConfig, "pubsub cannot be configured as reliable ack for record: V. Valid datastores configured [kafka]"), Entry("when logger is configured as reliable ack", TestLoggerAsReliableAckConfig, "logger cannot be configured as reliable ack for record: V"), Entry("when reliable ack is configured for unmapped txtype", TestUnusedTxTypeAsReliableAckConfig, "kafka cannot be configured as reliable ack for record: error since no record mapping exists"), + Entry("when reliable ack is mapped with unsupported txtype", TestBadTxTypeReliableAckConfig, "reliable ack not needed for txType: connectivity"), ) }) @@ -192,7 +193,7 @@ var _ = Describe("Test full application config", func() { var err error producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log) - Expect(err).To(MatchError("Expected Kinesis to be configured")) + Expect(err).To(MatchError("expected Kinesis to be configured")) Expect(producers).To(BeNil()) }) @@ -253,7 +254,7 @@ var _ = Describe("Test full application config", func() { config.Records = map[string][]telemetry.Dispatcher{"V": {"zmq"}} var err error producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log) - Expect(err).To(MatchError("Expected ZMQ to be configured")) + Expect(err).To(MatchError("expected ZMQ to be configured")) Expect(producers).To(BeNil()) producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log) Expect(err).To(BeNil()) diff --git a/config/test_configs_test.go b/config/test_configs_test.go index e65bac6..7ff0c6d 100644 --- a/config/test_configs_test.go +++ b/config/test_configs_test.go @@ -219,3 +219,30 @@ const TestAirbrakeConfig = ` } } ` + +const TestBadTxTypeReliableAckConfig = ` +{ + "host": "127.0.0.1", + "port": 443, + "status_port": 8080, + "namespace": "tesla_telemetry", + "reliable_ack_sources": { + "connectivity": "kafka" + }, + "kafka": { + "bootstrap.servers": "some.broker1:9093,some.broker1:9093", + "ssl.ca.location": "kafka.ca", + "ssl.certificate.location": "kafka.crt", + "ssl.key.location": "kafka.key", + "queue.buffering.max.messages": 1000000 + }, + "records": { + "V": ["kafka"], + "connectivity": ["kafka"] + }, + "tls": { + "server_cert": "your_own_cert.crt", + "server_key": "your_own_key.key" + } +} +` diff --git a/datastore/googlepubsub/publisher.go b/datastore/googlepubsub/publisher.go index 45365ab..32293e5 100644 --- a/datastore/googlepubsub/publisher.go +++ b/datastore/googlepubsub/publisher.go @@ -75,7 +75,7 @@ func NewProducer(ctx context.Context, prometheusEnabled bool, projectID string, ackChan: ackChan, reliableAckTxTypes: reliableAckTxTypes, } - p.logger.ActivityLog("pubsub_registerd", logrus.LogInfo{"project": projectID, "namespace": namespace}) + p.logger.ActivityLog("pubsub_registered", logrus.LogInfo{"project": projectID, "namespace": namespace}) return p, nil } diff --git a/datastore/simple/logger.go b/datastore/simple/logger.go index 263301b..c7b93ba 100644 --- a/datastore/simple/logger.go +++ b/datastore/simple/logger.go @@ -65,6 +65,8 @@ func (p *ProtoLogger) recordToLogMap(record *telemetry.Record) (interface{}, err errorMaps[i] = transformers.VehicleErrorToMap(vehicleError) } return errorMaps, nil + case *protos.VehicleConnectivity: + return transformers.VehicleConnectivityToMap(payload), nil default: return nil, fmt.Errorf("unknown txType: %s", record.TxType) } diff --git a/datastore/simple/transformers/payload_test.go b/datastore/simple/transformers/payload_test.go index f1476bd..6f948ae 100644 --- a/datastore/simple/transformers/payload_test.go +++ b/datastore/simple/transformers/payload_test.go @@ -33,10 +33,10 @@ var _ = Describe("Payload", func() { payload := &protos.Payload{ Data: []*protos.Datum{ nil, - &protos.Datum{ + { Value: nil, }, - &protos.Datum{ + { Key: protos.Field_BatteryHeaterOn, Value: &protos.Value{ Value: &protos.Value_BooleanValue{BooleanValue: true}, diff --git a/datastore/simple/transformers/vehicle_connectivity.go b/datastore/simple/transformers/vehicle_connectivity.go new file mode 100644 index 0000000..87d8431 --- /dev/null +++ b/datastore/simple/transformers/vehicle_connectivity.go @@ -0,0 +1,15 @@ +package transformers + +import ( + "github.com/teslamotors/fleet-telemetry/protos" +) + +// VehicleConnectivityToMap converts a VehicleConnectivity proto message to a map representation +func VehicleConnectivityToMap(vehicleConnectivity *protos.VehicleConnectivity) map[string]interface{} { + return map[string]interface{}{ + "Vin": vehicleConnectivity.GetVin(), + "ConnectionID": vehicleConnectivity.GetConnectionId(), + "Status": vehicleConnectivity.GetStatus().String(), + "CreatedAt": vehicleConnectivity.CreatedAt.AsTime().Unix(), + } +} diff --git a/datastore/simple/transformers/vehicle_connectivity_test.go b/datastore/simple/transformers/vehicle_connectivity_test.go new file mode 100644 index 0000000..e8ec5ac --- /dev/null +++ b/datastore/simple/transformers/vehicle_connectivity_test.go @@ -0,0 +1,40 @@ +package transformers_test + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/teslamotors/fleet-telemetry/datastore/simple/transformers" + "github.com/teslamotors/fleet-telemetry/protos" + + "google.golang.org/protobuf/types/known/timestamppb" +) + +var _ = Describe("VehicleConnectivity", func() { + Describe("VehicleConnectivityToMap", func() { + var ( + connectivity *protos.VehicleConnectivity + ) + + BeforeEach(func() { + connectivity = &protos.VehicleConnectivity{ + Vin: "Vin1", + ConnectionId: "connection1", + CreatedAt: timestamppb.New(time.Now()), + Status: protos.ConnectivityEvent_CONNECTED, + } + }) + + It("includes all expected data", func() { + result := transformers.VehicleConnectivityToMap(connectivity) + Expect(result).To(HaveLen(4)) + Expect(result["Vin"]).To(Equal("Vin1")) + Expect(result["ConnectionID"]).To(Equal("connection1")) + Expect(result["CreatedAt"]).To(BeNumerically("~", time.Now().Unix(), 1)) + Expect(result["Status"]).To(Equal("CONNECTED")) + }) + + }) +}) diff --git a/examples/server_config.json b/examples/server_config.json index bc2dc43..bd5c2f2 100644 --- a/examples/server_config.json +++ b/examples/server_config.json @@ -1,33 +1,36 @@ { - "host": "0.0.0.0", - "port": 443, - "log_level": "info", - "json_log_enable": true, - "namespace": "tesla_telemetry", - "reliable_ack": false, - "monitoring": { - "prometheus_metrics_port": 9090, - "profiler_port": 4269, - "profiling_path": "/tmp/trace.out" - }, - "rate_limit": { - "enabled": true, - "message_interval_time": 30, - "message_limit": 1000 - }, - "records": { - "alerts": [ - "logger" - ], - "errors": [ - "logger" - ], - "V": [ - "logger" - ] - }, - "tls": { - "server_cert": "/etc/certs/server/tls.crt", - "server_key": "/etc/certs/server/tls.key" - } + "host": "0.0.0.0", + "port": 443, + "log_level": "info", + "json_log_enable": true, + "namespace": "tesla_telemetry", + "reliable_ack": false, + "monitoring": { + "prometheus_metrics_port": 9090, + "profiler_port": 4269, + "profiling_path": "/tmp/trace.out" + }, + "rate_limit": { + "enabled": true, + "message_interval_time": 30, + "message_limit": 1000 + }, + "records": { + "alerts": [ + "logger" + ], + "errors": [ + "logger" + ], + "V": [ + "logger" + ], + "connectivity": [ + "logger" + ] + }, + "tls": { + "server_cert": "/etc/certs/server/tls.crt", + "server_key": "/etc/certs/server/tls.key" + } } diff --git a/go.mod b/go.mod index 89ecb19..bb54013 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/aws/aws-sdk-go v1.44.278 github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0 github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 + github.com/golang/protobuf v1.5.3 github.com/google/flatbuffers v23.3.3+incompatible github.com/google/uuid v1.3.0 github.com/gorilla/websocket v1.5.0 @@ -34,7 +35,6 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/golang/protobuf v1.5.3 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect github.com/googleapis/gax-go/v2 v2.8.0 // indirect diff --git a/protos/python/vehicle_alert_pb2.py b/protos/python/vehicle_alert_pb2.py index 2a0ace6..96b99bc 100644 --- a/protos/python/vehicle_alert_pb2.py +++ b/protos/python/vehicle_alert_pb2.py @@ -1,12 +1,22 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE # source: vehicle_alert.proto -# Protobuf Python Version: 5.26.1 +# Protobuf Python Version: 5.28.3 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version from google.protobuf import symbol_database as _symbol_database from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 5, + 28, + 3, + '', + 'vehicle_alert.proto' +) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() diff --git a/protos/python/vehicle_connectivity_pb2.py b/protos/python/vehicle_connectivity_pb2.py new file mode 100644 index 0000000..0065ee2 --- /dev/null +++ b/protos/python/vehicle_connectivity_pb2.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: vehicle_connectivity.proto +# Protobuf Python Version: 5.28.3 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 5, + 28, + 3, + '', + 'vehicle_connectivity.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1avehicle_connectivity.proto\x12\x1etelemetry.vehicle_connectivity\x1a\x1fgoogle/protobuf/timestamp.proto\"\xac\x01\n\x13VehicleConnectivity\x12\x0b\n\x03vin\x18\x01 \x01(\t\x12\x15\n\rconnection_id\x18\x02 \x01(\t\x12\x41\n\x06status\x18\x03 \x01(\x0e\x32\x31.telemetry.vehicle_connectivity.ConnectivityEvent\x12.\n\ncreated_at\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp*A\n\x11\x43onnectivityEvent\x12\x0b\n\x07UNKNOWN\x10\x00\x12\r\n\tCONNECTED\x10\x01\x12\x10\n\x0c\x44ISCONNECTED\x10\x02\x42/Z-github.com/teslamotors/fleet-telemetry/protosb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'vehicle_connectivity_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + _globals['DESCRIPTOR']._loaded_options = None + _globals['DESCRIPTOR']._serialized_options = b'Z-github.com/teslamotors/fleet-telemetry/protos' + _globals['_CONNECTIVITYEVENT']._serialized_start=270 + _globals['_CONNECTIVITYEVENT']._serialized_end=335 + _globals['_VEHICLECONNECTIVITY']._serialized_start=96 + _globals['_VEHICLECONNECTIVITY']._serialized_end=268 +# @@protoc_insertion_point(module_scope) diff --git a/protos/python/vehicle_data_pb2.py b/protos/python/vehicle_data_pb2.py index fce3045..0e77998 100644 --- a/protos/python/vehicle_data_pb2.py +++ b/protos/python/vehicle_data_pb2.py @@ -1,12 +1,22 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE # source: vehicle_data.proto -# Protobuf Python Version: 5.26.1 +# Protobuf Python Version: 5.28.3 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version from google.protobuf import symbol_database as _symbol_database from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 5, + 28, + 3, + '', + 'vehicle_data.proto' +) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() diff --git a/protos/python/vehicle_error_pb2.py b/protos/python/vehicle_error_pb2.py index 36a29d0..5c56343 100644 --- a/protos/python/vehicle_error_pb2.py +++ b/protos/python/vehicle_error_pb2.py @@ -1,12 +1,22 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE # source: vehicle_error.proto -# Protobuf Python Version: 5.26.1 +# Protobuf Python Version: 5.28.3 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version from google.protobuf import symbol_database as _symbol_database from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 5, + 28, + 3, + '', + 'vehicle_error.proto' +) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() diff --git a/protos/python/vehicle_metric_pb2.py b/protos/python/vehicle_metric_pb2.py index d65825a..0efbb9d 100644 --- a/protos/python/vehicle_metric_pb2.py +++ b/protos/python/vehicle_metric_pb2.py @@ -1,12 +1,22 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE # source: vehicle_metric.proto -# Protobuf Python Version: 5.26.1 +# Protobuf Python Version: 5.28.3 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version from google.protobuf import symbol_database as _symbol_database from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 5, + 28, + 3, + '', + 'vehicle_metric.proto' +) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() diff --git a/protos/ruby/vehicle_connectivity_pb.rb b/protos/ruby/vehicle_connectivity_pb.rb new file mode 100644 index 0000000..53c9113 --- /dev/null +++ b/protos/ruby/vehicle_connectivity_pb.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: vehicle_connectivity.proto + +require 'google/protobuf' + +require 'google/protobuf/timestamp_pb' + + +descriptor_data = "\n\x1avehicle_connectivity.proto\x12\x1etelemetry.vehicle_connectivity\x1a\x1fgoogle/protobuf/timestamp.proto\"\xac\x01\n\x13VehicleConnectivity\x12\x0b\n\x03vin\x18\x01 \x01(\t\x12\x15\n\rconnection_id\x18\x02 \x01(\t\x12\x41\n\x06status\x18\x03 \x01(\x0e\x32\x31.telemetry.vehicle_connectivity.ConnectivityEvent\x12.\n\ncreated_at\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp*A\n\x11\x43onnectivityEvent\x12\x0b\n\x07UNKNOWN\x10\x00\x12\r\n\tCONNECTED\x10\x01\x12\x10\n\x0c\x44ISCONNECTED\x10\x02\x42/Z-github.com/teslamotors/fleet-telemetry/protosb\x06proto3" + +pool = Google::Protobuf::DescriptorPool.generated_pool +pool.add_serialized_file(descriptor_data) + +module Telemetry + module VehicleConnectivity + VehicleConnectivity = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("telemetry.vehicle_connectivity.VehicleConnectivity").msgclass + ConnectivityEvent = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("telemetry.vehicle_connectivity.ConnectivityEvent").enummodule + end +end diff --git a/protos/vehicle_alert.pb.go b/protos/vehicle_alert.pb.go index 759933e..87c476e 100644 --- a/protos/vehicle_alert.pb.go +++ b/protos/vehicle_alert.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v5.26.1 +// protoc v5.28.3 // source: protos/vehicle_alert.proto package protos diff --git a/protos/vehicle_connectivity.pb.go b/protos/vehicle_connectivity.pb.go new file mode 100644 index 0000000..4527263 --- /dev/null +++ b/protos/vehicle_connectivity.pb.go @@ -0,0 +1,246 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v5.28.3 +// source: protos/vehicle_connectivity.proto + +package protos + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// ConnectivityEvent represents connection state of the vehicle +type ConnectivityEvent int32 + +const ( + ConnectivityEvent_UNKNOWN ConnectivityEvent = 0 + ConnectivityEvent_CONNECTED ConnectivityEvent = 1 + ConnectivityEvent_DISCONNECTED ConnectivityEvent = 2 +) + +// Enum value maps for ConnectivityEvent. +var ( + ConnectivityEvent_name = map[int32]string{ + 0: "UNKNOWN", + 1: "CONNECTED", + 2: "DISCONNECTED", + } + ConnectivityEvent_value = map[string]int32{ + "UNKNOWN": 0, + "CONNECTED": 1, + "DISCONNECTED": 2, + } +) + +func (x ConnectivityEvent) Enum() *ConnectivityEvent { + p := new(ConnectivityEvent) + *p = x + return p +} + +func (x ConnectivityEvent) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (ConnectivityEvent) Descriptor() protoreflect.EnumDescriptor { + return file_protos_vehicle_connectivity_proto_enumTypes[0].Descriptor() +} + +func (ConnectivityEvent) Type() protoreflect.EnumType { + return &file_protos_vehicle_connectivity_proto_enumTypes[0] +} + +func (x ConnectivityEvent) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use ConnectivityEvent.Descriptor instead. +func (ConnectivityEvent) EnumDescriptor() ([]byte, []int) { + return file_protos_vehicle_connectivity_proto_rawDescGZIP(), []int{0} +} + +// VehicleConnectivity represents connection status change for the vehicle +type VehicleConnectivity struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Vin string `protobuf:"bytes,1,opt,name=vin,proto3" json:"vin,omitempty"` + ConnectionId string `protobuf:"bytes,2,opt,name=connection_id,json=connectionId,proto3" json:"connection_id,omitempty"` + Status ConnectivityEvent `protobuf:"varint,3,opt,name=status,proto3,enum=telemetry.vehicle_connectivity.ConnectivityEvent" json:"status,omitempty"` + CreatedAt *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` +} + +func (x *VehicleConnectivity) Reset() { + *x = VehicleConnectivity{} + if protoimpl.UnsafeEnabled { + mi := &file_protos_vehicle_connectivity_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *VehicleConnectivity) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*VehicleConnectivity) ProtoMessage() {} + +func (x *VehicleConnectivity) ProtoReflect() protoreflect.Message { + mi := &file_protos_vehicle_connectivity_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use VehicleConnectivity.ProtoReflect.Descriptor instead. +func (*VehicleConnectivity) Descriptor() ([]byte, []int) { + return file_protos_vehicle_connectivity_proto_rawDescGZIP(), []int{0} +} + +func (x *VehicleConnectivity) GetVin() string { + if x != nil { + return x.Vin + } + return "" +} + +func (x *VehicleConnectivity) GetConnectionId() string { + if x != nil { + return x.ConnectionId + } + return "" +} + +func (x *VehicleConnectivity) GetStatus() ConnectivityEvent { + if x != nil { + return x.Status + } + return ConnectivityEvent_UNKNOWN +} + +func (x *VehicleConnectivity) GetCreatedAt() *timestamppb.Timestamp { + if x != nil { + return x.CreatedAt + } + return nil +} + +var File_protos_vehicle_connectivity_proto protoreflect.FileDescriptor + +var file_protos_vehicle_connectivity_proto_rawDesc = []byte{ + 0x0a, 0x21, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, 0x76, 0x65, 0x68, 0x69, 0x63, 0x6c, 0x65, + 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x12, 0x1e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x76, + 0x65, 0x68, 0x69, 0x63, 0x6c, 0x65, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x76, + 0x69, 0x74, 0x79, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xd2, 0x01, 0x0a, 0x13, 0x56, 0x65, 0x68, 0x69, 0x63, 0x6c, 0x65, + 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x12, 0x10, 0x0a, 0x03, + 0x76, 0x69, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x76, 0x69, 0x6e, 0x12, 0x23, + 0x0a, 0x0d, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x49, 0x64, 0x12, 0x49, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0e, 0x32, 0x31, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, + 0x76, 0x65, 0x68, 0x69, 0x63, 0x6c, 0x65, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, + 0x76, 0x69, 0x74, 0x79, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, + 0x79, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x39, + 0x0a, 0x0a, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, + 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x2a, 0x41, 0x0a, 0x11, 0x43, 0x6f, 0x6e, + 0x6e, 0x65, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x0b, + 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x43, + 0x4f, 0x4e, 0x4e, 0x45, 0x43, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x44, 0x49, + 0x53, 0x43, 0x4f, 0x4e, 0x4e, 0x45, 0x43, 0x54, 0x45, 0x44, 0x10, 0x02, 0x42, 0x2f, 0x5a, 0x2d, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x74, 0x65, 0x73, 0x6c, 0x61, + 0x6d, 0x6f, 0x74, 0x6f, 0x72, 0x73, 0x2f, 0x66, 0x6c, 0x65, 0x65, 0x74, 0x2d, 0x74, 0x65, 0x6c, + 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_protos_vehicle_connectivity_proto_rawDescOnce sync.Once + file_protos_vehicle_connectivity_proto_rawDescData = file_protos_vehicle_connectivity_proto_rawDesc +) + +func file_protos_vehicle_connectivity_proto_rawDescGZIP() []byte { + file_protos_vehicle_connectivity_proto_rawDescOnce.Do(func() { + file_protos_vehicle_connectivity_proto_rawDescData = protoimpl.X.CompressGZIP(file_protos_vehicle_connectivity_proto_rawDescData) + }) + return file_protos_vehicle_connectivity_proto_rawDescData +} + +var file_protos_vehicle_connectivity_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_protos_vehicle_connectivity_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_protos_vehicle_connectivity_proto_goTypes = []interface{}{ + (ConnectivityEvent)(0), // 0: telemetry.vehicle_connectivity.ConnectivityEvent + (*VehicleConnectivity)(nil), // 1: telemetry.vehicle_connectivity.VehicleConnectivity + (*timestamppb.Timestamp)(nil), // 2: google.protobuf.Timestamp +} +var file_protos_vehicle_connectivity_proto_depIdxs = []int32{ + 0, // 0: telemetry.vehicle_connectivity.VehicleConnectivity.status:type_name -> telemetry.vehicle_connectivity.ConnectivityEvent + 2, // 1: telemetry.vehicle_connectivity.VehicleConnectivity.created_at:type_name -> google.protobuf.Timestamp + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_protos_vehicle_connectivity_proto_init() } +func file_protos_vehicle_connectivity_proto_init() { + if File_protos_vehicle_connectivity_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_protos_vehicle_connectivity_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*VehicleConnectivity); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_protos_vehicle_connectivity_proto_rawDesc, + NumEnums: 1, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_protos_vehicle_connectivity_proto_goTypes, + DependencyIndexes: file_protos_vehicle_connectivity_proto_depIdxs, + EnumInfos: file_protos_vehicle_connectivity_proto_enumTypes, + MessageInfos: file_protos_vehicle_connectivity_proto_msgTypes, + }.Build() + File_protos_vehicle_connectivity_proto = out.File + file_protos_vehicle_connectivity_proto_rawDesc = nil + file_protos_vehicle_connectivity_proto_goTypes = nil + file_protos_vehicle_connectivity_proto_depIdxs = nil +} diff --git a/protos/vehicle_connectivity.proto b/protos/vehicle_connectivity.proto new file mode 100644 index 0000000..3c6f540 --- /dev/null +++ b/protos/vehicle_connectivity.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package telemetry.vehicle_connectivity; + +import "google/protobuf/timestamp.proto"; + +option go_package = "github.com/teslamotors/fleet-telemetry/protos"; + +// VehicleConnectivity represents connection status change for the vehicle +message VehicleConnectivity { + string vin = 1; + string connection_id = 2; + ConnectivityEvent status = 3; + google.protobuf.Timestamp created_at = 4; +} + +// ConnectivityEvent represents connection state of the vehicle +enum ConnectivityEvent { + UNKNOWN = 0; + CONNECTED = 1; + DISCONNECTED = 2; +} diff --git a/protos/vehicle_data.pb.go b/protos/vehicle_data.pb.go index 7006c49..40454b7 100644 --- a/protos/vehicle_data.pb.go +++ b/protos/vehicle_data.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v5.26.1 +// protoc v5.28.3 // source: protos/vehicle_data.proto package protos diff --git a/protos/vehicle_error.pb.go b/protos/vehicle_error.pb.go index e7becd8..4441249 100644 --- a/protos/vehicle_error.pb.go +++ b/protos/vehicle_error.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v5.26.1 +// protoc v5.28.3 // source: protos/vehicle_error.proto package protos diff --git a/protos/vehicle_metric.pb.go b/protos/vehicle_metric.pb.go index 80493b3..de24418 100644 --- a/protos/vehicle_metric.pb.go +++ b/protos/vehicle_metric.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v5.26.1 +// protoc v5.28.3 // source: protos/vehicle_metric.proto package protos diff --git a/server/streaming/server.go b/server/streaming/server.go index d4cad62..ed848c3 100644 --- a/server/streaming/server.go +++ b/server/streaming/server.go @@ -8,14 +8,17 @@ import ( "sync" "time" + "github.com/golang/protobuf/proto" "github.com/google/uuid" "github.com/gorilla/websocket" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/teslamotors/fleet-telemetry/config" logrus "github.com/teslamotors/fleet-telemetry/logger" "github.com/teslamotors/fleet-telemetry/messages" "github.com/teslamotors/fleet-telemetry/metrics" "github.com/teslamotors/fleet-telemetry/metrics/adapter" + "github.com/teslamotors/fleet-telemetry/protos" "github.com/teslamotors/fleet-telemetry/server/airbrake" "github.com/teslamotors/fleet-telemetry/telemetry" ) @@ -32,6 +35,10 @@ var ( serverMetricsOnce sync.Once ) +const ( + connectitivityTopic = "connectivity" +) + // Metrics stores metrics reported from this package type ServerMetrics struct { reliableAckCount adapter.Counter @@ -127,16 +134,73 @@ func (s *Server) ServeBinaryWs(config *config.Config) func(w http.ResponseWriter s.logger.ErrorLog("extract_sender_id_err", err, nil) } + binarySerializer := telemetry.NewBinarySerializer(requestIdentity, s.DispatchRules, s.logger) socketManager := NewSocketManager(ctx, requestIdentity, ws, config, s.logger) - s.registry.RegisterSocket(socketManager) - defer s.registry.DeregisterSocket(socketManager) + s.registerSocket(socketManager, binarySerializer) + defer s.deregisterSocket(socketManager, binarySerializer) - binarySerializer := telemetry.NewBinarySerializer(requestIdentity, s.DispatchRules, s.logger) socketManager.ProcessTelemetry(binarySerializer) } } } +func (s *Server) dispatchConnectivityEvent(sm *SocketManager, serializer *telemetry.BinarySerializer, event protos.ConnectivityEvent) error { + connectivityDispatcher, ok := s.DispatchRules[connectitivityTopic] + if !ok { + return nil + } + + connectivityMessage := &protos.VehicleConnectivity{ + Vin: sm.requestIdentity.DeviceID, + ConnectionId: sm.UUID, + CreatedAt: timestamppb.Now(), + Status: event, + } + + payload, err := proto.Marshal(connectivityMessage) + if err != nil { + return nil + } + + // creating streamMessage is hack to satify input reqirements for telemetry.NewRecord + streamMessage := messages.StreamMessage{ + TXID: []byte(sm.UUID), + SenderID: []byte(sm.requestIdentity.SenderID), + DeviceID: []byte(sm.requestIdentity.DeviceID), + DeviceType: []byte("vehicle_device"), + MessageTopic: []byte(connectitivityTopic), + Payload: payload, + CreatedAt: uint32(connectivityMessage.CreatedAt.AsTime().Unix()), + } + + message, err := streamMessage.ToBytes() + if err != nil { + return nil + } + record, _ := telemetry.NewRecord(serializer, message, sm.UUID, sm.transmitDecodedRecords) + for _, dispatcher := range connectivityDispatcher { + dispatcher.Produce(record) + } + return nil +} + +func (s *Server) registerSocket(sm *SocketManager, serializer *telemetry.BinarySerializer) { + s.registry.RegisterSocket(sm) + event := protos.ConnectivityEvent_CONNECTED + if err := s.dispatchConnectivityEvent(sm, serializer, event); err != nil { + s.logger.ErrorLog("connectivity_registeration_error", err, logrus.LogInfo{"deviceID": sm.requestIdentity.DeviceID, "event": event}) + } + +} + +func (s *Server) deregisterSocket(sm *SocketManager, serializer *telemetry.BinarySerializer) { + s.registry.DeregisterSocket(sm) + event := protos.ConnectivityEvent_DISCONNECTED + if err := s.dispatchConnectivityEvent(sm, serializer, event); err != nil { + s.logger.ErrorLog("connectivity_deregisteration_error", err, logrus.LogInfo{"deviceID": sm.requestIdentity.DeviceID, "event": event}) + } +} + func (s *Server) promoteToWebsocket(w http.ResponseWriter, r *http.Request) *websocket.Conn { ws, err := upgrader.Upgrade(w, r, nil) if err != nil { diff --git a/server/streaming/socket.go b/server/streaming/socket.go index f420ecc..49d2326 100644 --- a/server/streaming/socket.go +++ b/server/streaming/socket.go @@ -41,7 +41,6 @@ type SocketManager struct { config *config.Config logger *logrus.Logger - registry *SocketRegistry requestIdentity *telemetry.RequestIdentity requestInfo map[string]interface{} metricsCollector metrics.MetricCollector diff --git a/telemetry/record.go b/telemetry/record.go index 4916c5a..0211c12 100644 --- a/telemetry/record.go +++ b/telemetry/record.go @@ -37,6 +37,9 @@ var ( "V": func() proto.Message { return &protos.Payload{} }, + "connectivity": func() proto.Message { + return &protos.VehicleConnectivity{} + }, } scientificNotationFloatRegex = regexp.MustCompile("^[+-]?(\\d*\\.\\d+|\\d+\\.\\d*)([eE][+-]?\\d+)$") ) @@ -176,6 +179,14 @@ func (record *Record) applyProtoRecordTransforms() error { transformScientificNotation(message) record.PayloadBytes, err = proto.Marshal(message) return err + case "connectivity": + message := &protos.VehicleConnectivity{} + err := proto.Unmarshal(record.Payload(), message) + if err != nil { + return err + } + record.PayloadBytes, err = proto.Marshal(message) + return err default: return nil } diff --git a/test/integration/config.json b/test/integration/config.json index d58f25b..5557bea 100644 --- a/test/integration/config.json +++ b/test/integration/config.json @@ -21,7 +21,8 @@ "override_host": "http://kinesis:4567", "max_retries": 3, "streams": { - "V": "test_V" + "V": "test_V", + "connectivity": "test_connectivity" } }, "zmq": { @@ -44,6 +45,13 @@ "pubsub", "logger", "zmq" + ], + "connectivity": [ + "kafka", + "kinesis", + "pubsub", + "logger", + "zmq" ] }, "tls": { diff --git a/test/integration/google_consumer_test.go b/test/integration/google_consumer_test.go index b36ba51..4031f05 100644 --- a/test/integration/google_consumer_test.go +++ b/test/integration/google_consumer_test.go @@ -2,6 +2,7 @@ package integration_test import ( "context" + "fmt" "sync" "sync/atomic" "time" @@ -9,60 +10,77 @@ import ( "cloud.google.com/go/pubsub" . "github.com/onsi/gomega" - "github.com/sirupsen/logrus" + logrus "github.com/teslamotors/fleet-telemetry/logger" "google.golang.org/api/iterator" ) type TestConsumer struct { pubsubClient *pubsub.Client - topic *pubsub.Topic - sub *pubsub.Subscription + subs map[string]*pubsub.Subscription } -func NewTestPubsubConsumer(projectID, topicID, subID string, logger *logrus.Logger) (*TestConsumer, error) { +func NewTestPubsubConsumer(projectID string, topicIDs []string, logger *logrus.Logger) (*TestConsumer, error) { ctx := context.Background() client, err := pubsub.NewClient(ctx, projectID) if err != nil { return nil, err } + subs := map[string]*pubsub.Subscription{} - topic, err := createTopicIfNotExists(context.Background(), topicID, client) - if err != nil { - return nil, err - } + for _, topicID := range topicIDs { + _, err := createTopicIfNotExists(context.Background(), topicID, client) + if err != nil { + return nil, err + } - sub, err := createSubscriptionIfNotExists(ctx, subID, topicID, client, logger) - if err != nil { - return nil, err + sub, err := createSubscriptionIfNotExists(ctx, topicID, client, logger) + if err != nil { + return nil, err + } + + sub.ReceiveSettings.Synchronous = false + sub.ReceiveSettings.MaxOutstandingMessages = -1 + + subs[topicID] = sub } - sub.ReceiveSettings.Synchronous = false - sub.ReceiveSettings.MaxOutstandingMessages = -1 return &TestConsumer{ pubsubClient: client, - topic: topic, - sub: sub, + subs: subs, }, nil } -func createSubscriptionIfNotExists(ctx context.Context, subID string, topicID string, pubsubClient *pubsub.Client, logger *logrus.Logger) (*pubsub.Subscription, error) { +func subID(topicID string) string { + return fmt.Sprintf("sub-id-%s", topicID) +} + +func createSubscriptionIfNotExists(ctx context.Context, topicID string, pubsubClient *pubsub.Client, logger *logrus.Logger) (*pubsub.Subscription, error) { topic, err := createTopicIfNotExists(ctx, topicID, pubsubClient) if err != nil { return nil, err } + subID := subID(topicID) sub := pubsubClient.Subscription(subID) exists, err := sub.Exists(ctx) if err != nil { return nil, err } if exists { - logger.Infof("subscription %v already present", sub) + logger.ActivityLog("subscription_present", logrus.LogInfo{"sub": sub}) return sub, nil } - return pubsubClient.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{ - Topic: topic, - AckDeadline: 20 * time.Second, + c, err := pubsubClient.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{ + Topic: topic, + AckDeadline: 20 * time.Second, + EnableMessageOrdering: true, }) + if err != nil { + return nil, err + } + c.ReceiveSettings = pubsub.ReceiveSettings{ + MaxExtension: 10 * time.Hour, + } + return c, nil } func createTopicIfNotExists(ctx context.Context, topic string, pubsubClient *pubsub.Client) (*pubsub.Topic, error) { @@ -92,7 +110,11 @@ func (c *TestConsumer) ClearSubscriptions() { } } -func (c *TestConsumer) FetchPubsubMessage() (*pubsub.Message, error) { +func (c *TestConsumer) FetchPubsubMessage(topicId string) (*pubsub.Message, error) { + sub, ok := c.subs[topicId] + if !ok { + return nil, fmt.Errorf("unknown topic: %s", topicId) + } ctx := context.Background() var mu sync.Mutex var m *pubsub.Message @@ -100,7 +122,7 @@ func (c *TestConsumer) FetchPubsubMessage() (*pubsub.Message, error) { received := 0 cctx, cancel := context.WithTimeout(ctx, 10*time.Second) - err := c.sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) { + err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) { msg.Ack() atomic.StorePointer(unsafepL, unsafe.Pointer(msg)) mu.Lock() diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index ec79252..10ff3aa 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -16,7 +16,7 @@ import ( "github.com/aws/aws-sdk-go/service/kinesis" "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/gorilla/websocket" - "github.com/sirupsen/logrus" + logrus "github.com/teslamotors/fleet-telemetry/logger" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" @@ -25,16 +25,17 @@ import ( ) const ( - vehicleName = "My Test Vehicle" - location = "(37.412374 S, 122.145867 E)" - projectID = "test-project-id" - subscriptionID = "sub-id-1" - kafkaGroup = "test-kafka-consumer" - kafkaBroker = "kafka:9092" - pubsubHost = "pubsub:8085" - zmqAddr = "tcp://app:5284" - kinesisHost = "http://kinesis:4567" - kinesisStreamName = "test_V" + vehicleName = "My Test Vehicle" + location = "(37.412374 S, 122.145867 E)" + projectID = "test-project-id" + kafkaGroup = "test-kafka-consumer" + kafkaBroker = "kafka:9092" + pubsubHost = "pubsub:8085" + zmqAddr = "tcp://app:5284" + kinesisHost = "http://kinesis:4567" + + kinesisStreamName = "test_V" + kinesisConnectivityStreamName = "test_connectivity" ) var expectedLocation = &protos.LocationValue{Latitude: -37.412374, Longitude: 122.145867} @@ -46,7 +47,9 @@ func setEnv(key string, value string) { var _ = Describe("Test messages", Ordered, func() { var ( - vehicleTopic = "tesla_telemetry_V" + vehicleTopic = "tesla_telemetry_V" + vehicleConnectivityTopic = "tesla_telemetry_connectivity" + payload []byte connection *websocket.Conn pubsubConsumer *TestConsumer @@ -59,20 +62,20 @@ var _ = Describe("Test messages", Ordered, func() { ) BeforeAll(func() { - logger = logrus.New() var err error + logger, err = logrus.NewBasicLogrusLogger("fleet-telemetry-integration-test") + Expect(err).NotTo(HaveOccurred()) tlsConfig, err = GetTLSConfig() Expect(err).NotTo(HaveOccurred()) timestamp = timestamppb.Now() payload = GenerateVehicleMessage(vehicleName, location, timestamp) - connection = CreateWebSocket(tlsConfig) - kinesisConsumer, err = NewTestKinesisConsumer(kinesisHost, kinesisStreamName) + kinesisConsumer, err = NewTestKinesisConsumer(kinesisHost, []string{kinesisStreamName, kinesisConnectivityStreamName}) Expect(err).NotTo(HaveOccurred()) setEnv("PUBSUB_EMULATOR_HOST", pubsubHost) - pubsubConsumer, err = NewTestPubsubConsumer(projectID, vehicleTopic, subscriptionID, logger) + pubsubConsumer, err = NewTestPubsubConsumer(projectID, []string{vehicleTopic, vehicleConnectivityTopic}, logger) Expect(err).NotTo(HaveOccurred()) kafkaConsumer, err = kafka.NewConsumer(&kafka.ConfigMap{ @@ -82,10 +85,18 @@ var _ = Describe("Test messages", Ordered, func() { }) Expect(err).NotTo(HaveOccurred()) - zmqConsumer, err = NewTestZMQConsumer(zmqAddr, vehicleTopic, logger) + zmqConsumer, err = NewTestZMQConsumer(zmqAddr, []string{vehicleTopic, vehicleConnectivityTopic}) Expect(err).NotTo(HaveOccurred()) }) + BeforeEach(func() { + connection = CreateWebSocket(tlsConfig) + }) + + AfterEach(func() { + Expect(connection.Close()).NotTo(HaveOccurred()) + }) + AfterAll(func() { _ = kafkaConsumer.Close() pubsubConsumer.ClearSubscriptions() @@ -94,85 +105,146 @@ var _ = Describe("Test messages", Ordered, func() { os.Clearenv() }) - It("reads vehicle data from kafka consumer", func() { - defer GinkgoRecover() - err := kafkaConsumer.Subscribe(vehicleTopic, nil) - Expect(err).NotTo(HaveOccurred()) - err = connection.WriteMessage(websocket.BinaryMessage, GenerateVehicleMessage(vehicleName, location, timestamp)) - verifyAckMessage(connection, "V") - Expect(err).NotTo(HaveOccurred()) - msg, err := kafkaConsumer.ReadMessage(10 * time.Second) - Expect(err).NotTo(HaveOccurred()) + Describe("v records", Ordered, func() { - Expect(msg).NotTo(BeNil()) - Expect(*msg.TopicPartition.Topic).To(Equal(vehicleTopic)) - Expect(string(msg.Key)).To(Equal(deviceID)) + It("reads vehicle data from kafka consumer", func() { + defer GinkgoRecover() + err := kafkaConsumer.Subscribe(vehicleTopic, nil) + Expect(err).NotTo(HaveOccurred()) + err = connection.WriteMessage(websocket.BinaryMessage, GenerateVehicleMessage(vehicleName, location, timestamp)) + verifyAckMessage(connection, "V") + Expect(err).NotTo(HaveOccurred()) + msg, err := kafkaConsumer.ReadMessage(10 * time.Second) + Expect(err).NotTo(HaveOccurred()) - headers := make(map[string]string) - for _, h := range msg.Headers { - headers[string(h.Key)] = string(h.Value) - } - VerifyMessageHeaders(headers) - VerifyMessageBody(msg.Value, vehicleName) - }) + Expect(msg).NotTo(BeNil()) + Expect(*msg.TopicPartition.Topic).To(Equal(vehicleTopic)) + Expect(string(msg.Key)).To(Equal(deviceID)) - It("returns 200 for mtls status", func() { - body, err := VerifyHTTPSRequest(serviceURL, "status", tlsConfig) - Expect(err).NotTo(HaveOccurred()) - Expect(string(body)).To(Equal("mtls ok")) - }) + headers := make(map[string]string) + for _, h := range msg.Headers { + headers[string(h.Key)] = string(h.Value) + } + VerifyMessageHeaders(headers) + VerifyMessageBody(msg.Value, vehicleName) + }) - It("returns 200 for status", func() { - body, err := VerifyHTTPRequest(statusURL, "status") - Expect(err).NotTo(HaveOccurred()) - Expect(string(body)).To(Equal("ok")) - }) + It("reads vehicle data from google subscriber", func() { + err := connection.WriteMessage(websocket.BinaryMessage, payload) + Expect(err).NotTo(HaveOccurred()) - It("returns 200 for prom metrics", func() { - _, err := VerifyHTTPRequest(prometheusURL, "metrics") - Expect(err).NotTo(HaveOccurred()) - }) + var msg *pubsub.Message + Eventually(func() error { + msg, err = pubsubConsumer.FetchPubsubMessage(vehicleTopic) + return err + }, time.Second*2, time.Millisecond*100).Should(BeNil()) + Expect(msg).NotTo(BeNil()) + VerifyMessageHeaders(msg.Attributes) + VerifyMessageBody(msg.Data, vehicleName) + }) - It("reads vehicle data from google subscriber", func() { - err := connection.WriteMessage(websocket.BinaryMessage, payload) - Expect(err).NotTo(HaveOccurred()) + It("reads vehicle data from aws kinesis", func() { + var err error + // We found publishing a few records makes this test consistent, and + // no obvious way to enforce delivery with a single message + for i := 1; i <= 4; i++ { + err = connection.WriteMessage(websocket.BinaryMessage, payload) + Expect(err).NotTo(HaveOccurred()) + } + + var record *kinesis.Record + Eventually(func() error { + record, err = kinesisConsumer.FetchFirstStreamMessage(kinesisStreamName) + return err + }, time.Second*5, time.Millisecond*100).Should(BeNil()) + VerifyMessageBody(record.Data, vehicleName) + }) + + It("reads data from zmq subscriber", func() { + err := connection.WriteMessage(websocket.BinaryMessage, payload) + Expect(err).NotTo(HaveOccurred()) - var msg *pubsub.Message - Eventually(func() error { - msg, err = pubsubConsumer.FetchPubsubMessage() - return err - }, time.Second*2, time.Millisecond*100).Should(BeNil()) - Expect(msg).NotTo(BeNil()) - VerifyMessageHeaders(msg.Attributes) - VerifyMessageBody(msg.Data, vehicleName) + topic, data, err := zmqConsumer.NextMessage(vehicleTopic) + Expect(err).NotTo(HaveOccurred()) + Expect(data).NotTo(BeNil()) + Expect(topic).To(Equal(vehicleTopic)) + VerifyMessageBody(data, vehicleName) + }) }) - It("reads vehicle data from aws kinesis", func() { - var err error - // We found publishing a few records makes this test consistent, and - // no obvious way to enforce delivery with a single message - for i := 1; i <= 4; i++ { - err = connection.WriteMessage(websocket.BinaryMessage, payload) + Describe("connectivity records", Ordered, func() { + + It("reads vehicle data from kafka consumer", func() { + defer GinkgoRecover() + err := kafkaConsumer.Subscribe(vehicleConnectivityTopic, nil) + Expect(err).NotTo(HaveOccurred()) + msg, err := kafkaConsumer.ReadMessage(10 * time.Second) + Expect(err).NotTo(HaveOccurred()) + Expect(msg).NotTo(BeNil()) + Expect(*msg.TopicPartition.Topic).To(Equal(vehicleConnectivityTopic)) + Expect(string(msg.Key)).To(Equal(deviceID)) + + headers := make(map[string]string) + for _, h := range msg.Headers { + headers[string(h.Key)] = string(h.Value) + } + VerifyConnectivityMessageHeaders(headers) + VerifyConnectivityMessageBody(msg.Value) + }) + + It("reads vehicle data from google subscriber", func() { + var err error + var msg *pubsub.Message + Eventually(func() error { + msg, err = pubsubConsumer.FetchPubsubMessage(vehicleConnectivityTopic) + return err + }, time.Second*2, time.Millisecond*100).Should(BeNil()) + Expect(msg).NotTo(BeNil()) + VerifyConnectivityMessageHeaders(msg.Attributes) + VerifyConnectivityMessageBody(msg.Data) + }) + + It("reads vehicle data from aws kinesis", func() { + var err error + var record *kinesis.Record + Eventually(func() error { + record, err = kinesisConsumer.FetchFirstStreamMessage(kinesisConnectivityStreamName) + return err + }, time.Second*5, time.Millisecond*100).Should(BeNil()) + VerifyConnectivityMessageBody(record.Data) + }) + + It("reads data from zmq subscriber", func() { + err := connection.WriteMessage(websocket.BinaryMessage, payload) + Expect(err).NotTo(HaveOccurred()) + + topic, data, err := zmqConsumer.NextMessage(vehicleConnectivityTopic) Expect(err).NotTo(HaveOccurred()) - } - - var record *kinesis.Record - Eventually(func() error { - record, err = kinesisConsumer.FetchFirstStreamMessage(kinesisStreamName) - return err - }, time.Second*5, time.Millisecond*100).Should(BeNil()) - VerifyMessageBody(record.Data, vehicleName) + Expect(data).NotTo(BeNil()) + Expect(topic).To(Equal(vehicleConnectivityTopic)) + VerifyConnectivityMessageBody(data) + }) + }) - It("reads data from zmq subscriber", func() { - err := connection.WriteMessage(websocket.BinaryMessage, payload) - Expect(err).NotTo(HaveOccurred()) + Describe("health checks", Ordered, func() { - topic, data, err := zmqConsumer.NextMessage() - Expect(err).NotTo(HaveOccurred()) - Expect(data).NotTo(BeNil()) - Expect(topic).To(Equal(vehicleTopic)) - VerifyMessageBody(data, vehicleName) + It("returns 200 for mtls status", func() { + body, err := VerifyHTTPSRequest(serviceURL, "status", tlsConfig) + Expect(err).NotTo(HaveOccurred()) + Expect(string(body)).To(Equal("mtls ok")) + }) + + It("returns 200 for status", func() { + body, err := VerifyHTTPRequest(statusURL, "status") + Expect(err).NotTo(HaveOccurred()) + Expect(string(body)).To(Equal("ok")) + }) + + It("returns 200 for prom metrics", func() { + _, err := VerifyHTTPRequest(prometheusURL, "metrics") + Expect(err).NotTo(HaveOccurred()) + }) }) }) @@ -215,14 +287,32 @@ func VerifyHTTPRequest(url string, path string) ([]byte, error) { return io.ReadAll(res.Body) } -// VerifyMessageHeaders validates headers returned from kafka/pubsub +// VerifyConnectivityMessageHeaders validates headers returned from dispatchers +func VerifyConnectivityMessageHeaders(headers map[string]string) { + Expect(headers["txid"]).NotTo(BeEmpty()) + Expect(headers["txtype"]).To(Equal("connectivity")) + Expect(headers["vin"]).To(Equal(deviceID)) +} + +// VerifyMessageHeaders validates headers returned from dispatchers func VerifyMessageHeaders(headers map[string]string) { Expect(headers["txid"]).To(Equal("integration-test-txid")) Expect(headers["txtype"]).To(Equal("V")) Expect(headers["vin"]).To(Equal(deviceID)) } -// VerifyMessageHeaders validates record message returned from kafka/pubsub +// VerifyConnectivityMessageBody validates record message returned from dispatchers +func VerifyConnectivityMessageBody(body []byte) { + payload := &protos.VehicleConnectivity{} + err := proto.Unmarshal(body, payload) + Expect(err).NotTo(HaveOccurred()) + Expect(payload.GetVin()).To(Equal(deviceID)) + Expect(payload.GetConnectionId()).NotTo(BeEmpty()) + Expect(payload.GetCreatedAt().AsTime().Unix()).NotTo(BeEquivalentTo(0)) + Expect(payload.GetStatus()).To(Or(Equal(protos.ConnectivityEvent_CONNECTED), Equal(protos.ConnectivityEvent_DISCONNECTED))) +} + +// VerifyMessageHeaders validates record message returned from dispatchers func VerifyMessageBody(body []byte, vehicleName string) { payload := &protos.Payload{} err := proto.Unmarshal(body, payload) diff --git a/test/integration/kinesis_consumer_test.go b/test/integration/kinesis_consumer_test.go index b0ded5a..c876510 100644 --- a/test/integration/kinesis_consumer_test.go +++ b/test/integration/kinesis_consumer_test.go @@ -23,7 +23,7 @@ type TestKinesisConsumer struct { kineses *kinesis.Kinesis } -func NewTestKinesisConsumer(host, streamName string) (*TestKinesisConsumer, error) { +func NewTestKinesisConsumer(host string, streamNames []string) (*TestKinesisConsumer, error) { creds := credentials.NewStaticCredentials(fakeAWSID, fakeAWSSecret, fakeAWSToken) awsConfig := aws.NewConfig().WithEndpoint(host).WithCredentialsChainVerboseErrors(true).WithRegion(fakeAWSRegion).WithCredentials(creds) sess, err := session.NewSessionWithOptions(session.Options{Config: *awsConfig}) @@ -34,37 +34,50 @@ func NewTestKinesisConsumer(host, streamName string) (*TestKinesisConsumer, erro t := &TestKinesisConsumer{ kineses: kinesis.New(sess, awsConfig), } - if err = t.createStreamIfNotExists(streamName); err != nil { - return nil, err + for _, streamName := range streamNames { + if err = t.createStreamIfNotExists(streamName); err != nil { + return nil, err + } } return t, nil } -func (t *TestKinesisConsumer) createStreamIfNotExists(streamName string) error { +func (t *TestKinesisConsumer) streamExists(streamName string) (bool, error) { response, err := t.kineses.ListStreams(&kinesis.ListStreamsInput{ Limit: aws.Int64(100), }) if err != nil { - return err + return false, err } - if len(response.StreamNames) == 0 { - _, err := t.kineses.CreateStream(&kinesis.CreateStreamInput{ - StreamName: aws.String(streamName), - ShardCount: aws.Int64(1), - }) - if err != nil { - return err + return false, nil + } + + for _, streamNameResponse := range response.StreamNames { + if strings.EqualFold(*streamNameResponse, streamName) { + return true, nil } + } + return false, nil +} + +func (t *TestKinesisConsumer) createStreamIfNotExists(streamName string) error { + ok, err := t.streamExists(streamName) + if err != nil { + return err + } + if ok { return nil } - for _, streamName := range response.StreamNames { - if strings.Compare(*streamName, *streamName) == 0 { - return nil - } + _, err = t.kineses.CreateStream(&kinesis.CreateStreamInput{ + StreamName: aws.String(streamName), + ShardCount: aws.Int64(1), + }) + if err != nil { + return err } - return fmt.Errorf("unable to create stream %s", streamName) + return nil } func (t *TestKinesisConsumer) FetchFirstStreamMessage(topic string) (*kinesis.Record, error) { @@ -77,6 +90,10 @@ func (t *TestKinesisConsumer) FetchFirstStreamMessage(topic string) (*kinesis.Re if err != nil { return nil, err } + kinesisStreamName := *describeOutput.StreamDescription.StreamName + if !strings.EqualFold(kinesisStreamName, topic) { + return nil, fmt.Errorf("stream name mismatch. Expected %s, Actual %s", kinesisStreamName, topic) + } if len(describeOutput.StreamDescription.Shards) == 0 { return nil, errors.New("empty shards") } diff --git a/test/integration/zmq_consumer_test.go b/test/integration/zmq_consumer_test.go index 22bcdcb..8bfb851 100644 --- a/test/integration/zmq_consumer_test.go +++ b/test/integration/zmq_consumer_test.go @@ -1,33 +1,41 @@ package integration_test import ( + "fmt" + + . "github.com/onsi/gomega" "github.com/pebbe/zmq4" - "github.com/sirupsen/logrus" ) type TestZMQConsumer struct { - sock *zmq4.Socket + socks map[string]*zmq4.Socket } -func NewTestZMQConsumer(addr string, subscription string, logger *logrus.Logger) (*TestZMQConsumer, error) { - sock, err := zmq4.NewSocket(zmq4.SUB) - if err != nil { - return nil, err - } +func NewTestZMQConsumer(addr string, topics []string) (*TestZMQConsumer, error) { + socks := make(map[string]*zmq4.Socket) + for _, topicID := range topics { + sock, err := zmq4.NewSocket(zmq4.SUB) + if err != nil { + return nil, err + } - if err := sock.SetSubscribe(subscription); err != nil { - return nil, err - } + if err := sock.SetSubscribe(topicID); err != nil { + return nil, err + } - if err := sock.Connect(addr); err != nil { - return nil, err + if err := sock.Connect(addr); err != nil { + return nil, err + } + socks[topicID] = sock } - return &TestZMQConsumer{sock}, nil + return &TestZMQConsumer{socks: socks}, nil } func (t *TestZMQConsumer) Close() { - t.sock.Close() + for _, sock := range t.socks { + Expect(sock.Close()).NotTo(HaveOccurred()) + } } type malformedZMQMessage struct{} @@ -38,8 +46,12 @@ func (malformedZMQMessage) Error() string { var ErrMalformedZMQMessage malformedZMQMessage -func (t *TestZMQConsumer) NextMessage() (topic string, data []byte, err error) { - messages, err := t.sock.RecvMessageBytes(zmq4.Flag(0)) +func (t *TestZMQConsumer) NextMessage(topicId string) (topic string, data []byte, err error) { + sock, ok := t.socks[topicId] + if !ok { + return "", nil, fmt.Errorf("no consumer for %s", topicId) + } + messages, err := sock.RecvMessageBytes(0) if err != nil { return "", nil, err } @@ -48,7 +60,5 @@ func (t *TestZMQConsumer) NextMessage() (topic string, data []byte, err error) { return "", nil, ErrMalformedZMQMessage } - topic = string(messages[0]) - data = messages[1] - return + return string(messages[0]), messages[1], nil }