Skip to content

Commit

Permalink
feat(output.mqtt): Add support for MQTT protocol version 5 (#11284)
Browse files Browse the repository at this point in the history
  • Loading branch information
cmackenzie1 authored Jul 27, 2022
1 parent b87d06e commit af43d01
Show file tree
Hide file tree
Showing 10 changed files with 340 additions and 102 deletions.
1 change: 1 addition & 0 deletions docs/LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ following works:
- github.com/eapache/go-resiliency [MIT License](https://github.com/eapache/go-resiliency/blob/master/LICENSE)
- github.com/eapache/go-xerial-snappy [MIT License](https://github.com/eapache/go-xerial-snappy/blob/master/LICENSE)
- github.com/eapache/queue [MIT License](https://github.com/eapache/queue/blob/master/LICENSE)
- github.com/eclipse/paho.golang [Eclipse Public License - v 2.0](https://github.com/eclipse/paho.golang/blob/master/LICENSE)
- github.com/eclipse/paho.mqtt.golang [Eclipse Public License - v 1.0](https://github.com/eclipse/paho.mqtt.golang/blob/master/LICENSE)
- github.com/emicklei/go-restful [MIT License](https://github.com/emicklei/go-restful/blob/v3/LICENSE)
- github.com/fatih/color [MIT License](https://github.com/fatih/color/blob/master/LICENSE.md)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ require (
github.com/docker/go-connections v0.4.0
github.com/doclambda/protobufquery v0.0.0-20210317203640-88ffabe06a60
github.com/dynatrace-oss/dynatrace-metric-utils-go v0.5.0
github.com/eclipse/paho.golang v0.10.0
github.com/eclipse/paho.mqtt.golang v1.3.5
github.com/fatih/color v1.13.0
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,8 @@ github.com/echlebek/crock v1.0.1 h1:KbzamClMIfVIkkjq/GTXf+N16KylYBpiaTitO3f1ujg=
github.com/echlebek/crock v1.0.1/go.mod h1:/kvwHRX3ZXHj/kHWJkjXDmzzRow54EJuHtQ/PapL/HI=
github.com/echlebek/timeproxy v1.0.0 h1:V41/v8tmmMDNMA2GrBPI45nlXb3F7+OY+nJz1BqKsCk=
github.com/echlebek/timeproxy v1.0.0/go.mod h1:0dg2Lnb8no/jFwoMQKMTU6iAivgoMptGqSTprhnrRtk=
github.com/eclipse/paho.golang v0.10.0 h1:oUGPjRwWcZQRgDD9wVDV7y7i7yBSxts3vcvcNJo8B4Q=
github.com/eclipse/paho.golang v0.10.0/go.mod h1:rhrV37IEwauUyx8FHrvmXOKo+QRKng5ncoN1vJiJMcs=
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
Expand Down
15 changes: 10 additions & 5 deletions plugins/outputs/mqtt/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# MQTT Producer Output Plugin

This plugin writes to a [MQTT Broker](http://http://mqtt.org/) acting as a mqtt
Producer.
Producer. It supports MQTT protocols `3.1.1` and `5`.

## Mosquitto v2.0.12+ and `identifier rejected`

Expand All @@ -10,7 +10,7 @@ In v2.0.12+ of the mosquitto MQTT server, there is a
`keep_alive` value to be set non-zero in your telegraf configuration. If not
set, the server will return with `identifier rejected`.

As a reference `eclipse/paho.mqtt.golang` sets the `keep_alive` to 30.
As a reference `eclipse/paho.golang` sets the `keep_alive` to 30.

## Configuration

Expand All @@ -19,9 +19,14 @@ As a reference `eclipse/paho.mqtt.golang` sets the `keep_alive` to 30.
[[outputs.mqtt]]
## MQTT Brokers
## The list of brokers should only include the hostname or IP address and the
## port to the broker. This should follow the format '{host}:{port}'. For
## example, "localhost:1883" or "127.0.0.1:8883".
servers = ["localhost:1883"]
## port to the broker. This should follow the format `[{scheme}://]{host}:{port}`. For
## example, `localhost:1883` or `mqtt://localhost:1883`.
## Scheme can be any of the following: tcp://, mqtt://, tls://, mqtts://
## non-TLS and TLS servers can not be mix-and-matched.
servers = ["localhost:1883", ] # or ["mqtts://tls.example.com:1883"]

## Protocol can be `3.1.1` or `5`. Default is `3.1.1`
# procotol = "3.1.1"

## MQTT Topic for Producer Messages
## MQTT outputs send metrics to this topic format:
Expand Down
131 changes: 46 additions & 85 deletions plugins/outputs/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,15 @@
package mqtt

import (
// Blank import to support go:embed compile directive
_ "embed"
"fmt"
"net/url"
"strings"
"sync"
"time"

paho "github.com/eclipse/paho.mqtt.golang"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
Expand All @@ -23,66 +21,69 @@ import (
var sampleConfig string

const (
defaultKeepAlive = 0
defaultKeepAlive = 30
)

type MQTT struct {
Servers []string `toml:"servers"`
Username string
Password string
Protocol string `toml:"protocol"`
Username string `toml:"username"`
Password string `toml:"password"`
Database string
Timeout config.Duration
TopicPrefix string
QoS int `toml:"qos"`
ClientID string `toml:"client_id"`
Timeout config.Duration `toml:"timeout"`
TopicPrefix string `toml:"topic_prefix"`
QoS int `toml:"qos"`
ClientID string `toml:"client_id"`
tls.ClientConfig
BatchMessage bool `toml:"batch"`
Retain bool `toml:"retain"`
KeepAlive int64 `toml:"keep_alive"`
Log telegraf.Logger `toml:"-"`

client paho.Client
opts *paho.ClientOptions

client Client
serializer serializers.Serializer

sync.Mutex
}

// Client is a protocol neutral MQTT client for connecting,
// disconnecting, and publishing data to a topic.
// The protocol specific clients must implement this interface
type Client interface {
Connect() error
Publish(topic string, data []byte) error
Close() error
}

func (*MQTT) SampleConfig() string {
return sampleConfig
}

func (m *MQTT) Connect() error {
var err error
m.Lock()
defer m.Unlock()
if m.QoS > 2 || m.QoS < 0 {
return fmt.Errorf("MQTT Output, invalid QoS value: %d", m.QoS)
}

m.opts, err = m.createOpts()
if err != nil {
return err
switch m.Protocol {
case "", "3.1.1":
m.client = newMQTTv311Client(m)
case "5":
m.client = newMQTTv5Client(m)
default:
return fmt.Errorf("unsuported protocol %q: must be \"3.1.1\" or \"5\"", m.Protocol)
}

m.client = paho.NewClient(m.opts)
if token := m.client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}

return nil
return m.client.Connect()
}

func (m *MQTT) SetSerializer(serializer serializers.Serializer) {
m.serializer = serializer
}

func (m *MQTT) Close() error {
if m.client.IsConnected() {
m.client.Disconnect(20)
}
return nil
return m.client.Close()
}

func (m *MQTT) Write(metrics []telegraf.Metric) error {
Expand Down Expand Up @@ -119,7 +120,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
continue
}

err = m.publish(topic, buf)
err = m.client.Publish(topic, buf)
if err != nil {
return fmt.Errorf("could not write to MQTT server, %s", err)
}
Expand All @@ -132,69 +133,29 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
if err != nil {
return err
}
publisherr := m.publish(key, buf)
if publisherr != nil {
return fmt.Errorf("could not write to MQTT server, %s", publisherr)
err = m.client.Publish(key, buf)
if err != nil {
return fmt.Errorf("could not write to MQTT server, %s", err)
}
}

return nil
}

func (m *MQTT) publish(topic string, body []byte) error {
token := m.client.Publish(topic, byte(m.QoS), m.Retain, body)
token.WaitTimeout(time.Duration(m.Timeout))
if token.Error() != nil {
return token.Error()
}
return nil
}

func (m *MQTT) createOpts() (*paho.ClientOptions, error) {
opts := paho.NewClientOptions()
opts.KeepAlive = m.KeepAlive

if m.Timeout < config.Duration(time.Second) {
m.Timeout = config.Duration(5 * time.Second)
}
opts.WriteTimeout = time.Duration(m.Timeout)

if m.ClientID != "" {
opts.SetClientID(m.ClientID)
} else {
opts.SetClientID("Telegraf-Output-" + internal.RandomString(5))
}

tlsCfg, err := m.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}

scheme := "tcp"
if tlsCfg != nil {
scheme = "ssl"
opts.SetTLSConfig(tlsCfg)
}

user := m.Username
if user != "" {
opts.SetUsername(user)
}
password := m.Password
if password != "" {
opts.SetPassword(password)
}

if len(m.Servers) == 0 {
return opts, fmt.Errorf("could not get host informations")
}
for _, host := range m.Servers {
server := fmt.Sprintf("%s://%s", scheme, host)

opts.AddBroker(server)
func parseServers(servers []string) ([]*url.URL, error) {
urls := make([]*url.URL, 0, len(servers))
for _, svr := range servers {
if !strings.Contains(svr, "://") {
urls = append(urls, &url.URL{Scheme: "tcp", Host: svr})
} else {
u, err := url.Parse(svr)
if err != nil {
return nil, err
}
urls = append(urls, u)
}
}
opts.SetAutoReconnect(true)
return opts, nil
return urls, nil
}

func init() {
Expand Down
92 changes: 83 additions & 9 deletions plugins/outputs/mqtt/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,113 @@ package mqtt

import (
"fmt"
"path/filepath"
"testing"

"github.com/docker/go-connections/nat"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
"github.com/testcontainers/testcontainers-go/wait"

"github.com/stretchr/testify/require"
)

const servicePort = "1883"

func launchTestContainer(t *testing.T) *testutil.Container {
conf, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf"))
require.NoError(t, err, "missing file mosquitto.conf")

container := testutil.Container{
Image: "eclipse-mosquitto:2",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForListeningPort(servicePort),
BindMounts: map[string]string{
"/mosquitto/config/mosquitto.conf": conf,
},
}
err = container.Start()
require.NoError(t, err, "failed to start container")

return &container
}

func TestConnectAndWriteIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

servicePort := "1883"
container := testutil.Container{
Image: "ncarlier/mqtt",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForListeningPort(nat.Port(servicePort)),
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
m := &MQTT{
Servers: []string{url},
serializer: s,
KeepAlive: 30,
Log: testutil.Logger{Name: "mqtt-default-integration-test"},
}
err := container.Start()
require.NoError(t, err, "failed to start container")

// Verify that we can connect to the MQTT broker
err = m.Connect()
require.NoError(t, err)

// Verify that we can successfully write data to the mqtt broker
err = m.Write(testutil.MockMetrics())
require.NoError(t, err)
}

func TestConnectAndWriteIntegrationMQTTv3(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()

var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
m := &MQTT{
Servers: []string{url},
Protocol: "3.1.1",
serializer: s,
KeepAlive: 30,
Log: testutil.Logger{Name: "mqttv311-integration-test"},
}

// Verify that we can connect to the MQTT broker
err = m.Connect()
require.NoError(t, err)

// Verify that we can successfully write data to the mqtt broker
err = m.Write(testutil.MockMetrics())
require.NoError(t, err)
}

func TestConnectAndWriteIntegrationMQTTv5(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()

var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s, _ := serializers.NewInfluxSerializer()
s, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
m := &MQTT{
Servers: []string{url},
Protocol: "5",
serializer: s,
KeepAlive: 30,
Log: testutil.Logger{Name: "mqttv5-integration-test"},
}

// Verify that we can connect to the MQTT broker
Expand Down
Loading

0 comments on commit af43d01

Please sign in to comment.