Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(outputs.mqtt): Add support for MQTT protocol version 5 #11284

Merged
merged 1 commit into from
Jul 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ following works:
- github.com/eapache/go-resiliency [MIT License](https://github.com/eapache/go-resiliency/blob/master/LICENSE)
- github.com/eapache/go-xerial-snappy [MIT License](https://github.com/eapache/go-xerial-snappy/blob/master/LICENSE)
- github.com/eapache/queue [MIT License](https://github.com/eapache/queue/blob/master/LICENSE)
- github.com/eclipse/paho.golang [Eclipse Public License - v 2.0](https://github.com/eclipse/paho.golang/blob/master/LICENSE)
- github.com/eclipse/paho.mqtt.golang [Eclipse Public License - v 1.0](https://github.com/eclipse/paho.mqtt.golang/blob/master/LICENSE)
- github.com/emicklei/go-restful [MIT License](https://github.com/emicklei/go-restful/blob/v3/LICENSE)
- github.com/fatih/color [MIT License](https://github.com/fatih/color/blob/master/LICENSE.md)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ require (
github.com/docker/go-connections v0.4.0
github.com/doclambda/protobufquery v0.0.0-20210317203640-88ffabe06a60
github.com/dynatrace-oss/dynatrace-metric-utils-go v0.5.0
github.com/eclipse/paho.golang v0.10.0
github.com/eclipse/paho.mqtt.golang v1.3.5
github.com/fatih/color v1.13.0
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,8 @@ github.com/echlebek/crock v1.0.1 h1:KbzamClMIfVIkkjq/GTXf+N16KylYBpiaTitO3f1ujg=
github.com/echlebek/crock v1.0.1/go.mod h1:/kvwHRX3ZXHj/kHWJkjXDmzzRow54EJuHtQ/PapL/HI=
github.com/echlebek/timeproxy v1.0.0 h1:V41/v8tmmMDNMA2GrBPI45nlXb3F7+OY+nJz1BqKsCk=
github.com/echlebek/timeproxy v1.0.0/go.mod h1:0dg2Lnb8no/jFwoMQKMTU6iAivgoMptGqSTprhnrRtk=
github.com/eclipse/paho.golang v0.10.0 h1:oUGPjRwWcZQRgDD9wVDV7y7i7yBSxts3vcvcNJo8B4Q=
github.com/eclipse/paho.golang v0.10.0/go.mod h1:rhrV37IEwauUyx8FHrvmXOKo+QRKng5ncoN1vJiJMcs=
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
Expand Down
15 changes: 10 additions & 5 deletions plugins/outputs/mqtt/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# MQTT Producer Output Plugin

This plugin writes to a [MQTT Broker](http://http://mqtt.org/) acting as a mqtt
Producer.
Producer. It supports MQTT protocols `3.1.1` and `5`.

## Mosquitto v2.0.12+ and `identifier rejected`

Expand All @@ -10,7 +10,7 @@ In v2.0.12+ of the mosquitto MQTT server, there is a
`keep_alive` value to be set non-zero in your telegraf configuration. If not
set, the server will return with `identifier rejected`.

As a reference `eclipse/paho.mqtt.golang` sets the `keep_alive` to 30.
As a reference `eclipse/paho.golang` sets the `keep_alive` to 30.

## Configuration

Expand All @@ -19,9 +19,14 @@ As a reference `eclipse/paho.mqtt.golang` sets the `keep_alive` to 30.
[[outputs.mqtt]]
## MQTT Brokers
## The list of brokers should only include the hostname or IP address and the
## port to the broker. This should follow the format '{host}:{port}'. For
## example, "localhost:1883" or "127.0.0.1:8883".
servers = ["localhost:1883"]
## port to the broker. This should follow the format `[{scheme}://]{host}:{port}`. For
## example, `localhost:1883` or `mqtt://localhost:1883`.
## Scheme can be any of the following: tcp://, mqtt://, tls://, mqtts://
## non-TLS and TLS servers can not be mix-and-matched.
servers = ["localhost:1883", ] # or ["mqtts://tls.example.com:1883"]

## Protocol can be `3.1.1` or `5`. Default is `3.1.1`
# procotol = "3.1.1"

## MQTT Topic for Producer Messages
## MQTT outputs send metrics to this topic format:
Expand Down
131 changes: 46 additions & 85 deletions plugins/outputs/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,15 @@
package mqtt

import (
// Blank import to support go:embed compile directive
_ "embed"
"fmt"
"net/url"
"strings"
"sync"
"time"

paho "github.com/eclipse/paho.mqtt.golang"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
Expand All @@ -23,66 +21,69 @@ import (
var sampleConfig string

const (
defaultKeepAlive = 0
defaultKeepAlive = 30
)

type MQTT struct {
Servers []string `toml:"servers"`
Username string
Password string
Protocol string `toml:"protocol"`
Username string `toml:"username"`
Password string `toml:"password"`
Database string
Timeout config.Duration
TopicPrefix string
QoS int `toml:"qos"`
ClientID string `toml:"client_id"`
Timeout config.Duration `toml:"timeout"`
TopicPrefix string `toml:"topic_prefix"`
QoS int `toml:"qos"`
ClientID string `toml:"client_id"`
tls.ClientConfig
BatchMessage bool `toml:"batch"`
Retain bool `toml:"retain"`
KeepAlive int64 `toml:"keep_alive"`
Log telegraf.Logger `toml:"-"`

client paho.Client
opts *paho.ClientOptions

client Client
serializer serializers.Serializer

sync.Mutex
}

// Client is a protocol neutral MQTT client for connecting,
// disconnecting, and publishing data to a topic.
// The protocol specific clients must implement this interface
type Client interface {
Connect() error
Publish(topic string, data []byte) error
Close() error
}

func (*MQTT) SampleConfig() string {
return sampleConfig
}

func (m *MQTT) Connect() error {
var err error
m.Lock()
defer m.Unlock()
if m.QoS > 2 || m.QoS < 0 {
return fmt.Errorf("MQTT Output, invalid QoS value: %d", m.QoS)
}

m.opts, err = m.createOpts()
if err != nil {
return err
switch m.Protocol {
case "", "3.1.1":
m.client = newMQTTv311Client(m)
case "5":
m.client = newMQTTv5Client(m)
default:
return fmt.Errorf("unsuported protocol %q: must be \"3.1.1\" or \"5\"", m.Protocol)
}

m.client = paho.NewClient(m.opts)
if token := m.client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}

return nil
return m.client.Connect()
}

func (m *MQTT) SetSerializer(serializer serializers.Serializer) {
m.serializer = serializer
}

func (m *MQTT) Close() error {
if m.client.IsConnected() {
m.client.Disconnect(20)
}
return nil
return m.client.Close()
}

func (m *MQTT) Write(metrics []telegraf.Metric) error {
Expand Down Expand Up @@ -119,7 +120,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
continue
}

err = m.publish(topic, buf)
err = m.client.Publish(topic, buf)
if err != nil {
return fmt.Errorf("could not write to MQTT server, %s", err)
}
Expand All @@ -132,69 +133,29 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
if err != nil {
return err
}
publisherr := m.publish(key, buf)
if publisherr != nil {
return fmt.Errorf("could not write to MQTT server, %s", publisherr)
err = m.client.Publish(key, buf)
if err != nil {
return fmt.Errorf("could not write to MQTT server, %s", err)
}
}

return nil
}

func (m *MQTT) publish(topic string, body []byte) error {
token := m.client.Publish(topic, byte(m.QoS), m.Retain, body)
token.WaitTimeout(time.Duration(m.Timeout))
if token.Error() != nil {
return token.Error()
}
return nil
}

func (m *MQTT) createOpts() (*paho.ClientOptions, error) {
opts := paho.NewClientOptions()
opts.KeepAlive = m.KeepAlive

if m.Timeout < config.Duration(time.Second) {
m.Timeout = config.Duration(5 * time.Second)
}
opts.WriteTimeout = time.Duration(m.Timeout)

if m.ClientID != "" {
opts.SetClientID(m.ClientID)
} else {
opts.SetClientID("Telegraf-Output-" + internal.RandomString(5))
}

tlsCfg, err := m.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}

scheme := "tcp"
if tlsCfg != nil {
scheme = "ssl"
opts.SetTLSConfig(tlsCfg)
}

user := m.Username
if user != "" {
opts.SetUsername(user)
}
password := m.Password
if password != "" {
opts.SetPassword(password)
}

if len(m.Servers) == 0 {
return opts, fmt.Errorf("could not get host informations")
}
for _, host := range m.Servers {
server := fmt.Sprintf("%s://%s", scheme, host)

opts.AddBroker(server)
func parseServers(servers []string) ([]*url.URL, error) {
urls := make([]*url.URL, 0, len(servers))
for _, svr := range servers {
if !strings.Contains(svr, "://") {
urls = append(urls, &url.URL{Scheme: "tcp", Host: svr})
} else {
u, err := url.Parse(svr)
if err != nil {
return nil, err
}
urls = append(urls, u)
}
}
opts.SetAutoReconnect(true)
return opts, nil
return urls, nil
}

func init() {
Expand Down
92 changes: 83 additions & 9 deletions plugins/outputs/mqtt/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,113 @@ package mqtt

import (
"fmt"
"path/filepath"
"testing"

"github.com/docker/go-connections/nat"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
"github.com/testcontainers/testcontainers-go/wait"

"github.com/stretchr/testify/require"
)

const servicePort = "1883"

func launchTestContainer(t *testing.T) *testutil.Container {
conf, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf"))
require.NoError(t, err, "missing file mosquitto.conf")

container := testutil.Container{
Image: "eclipse-mosquitto:2",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForListeningPort(servicePort),
BindMounts: map[string]string{
"/mosquitto/config/mosquitto.conf": conf,
},
}
err = container.Start()
require.NoError(t, err, "failed to start container")

return &container
}

func TestConnectAndWriteIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

servicePort := "1883"
container := testutil.Container{
Image: "ncarlier/mqtt",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForListeningPort(nat.Port(servicePort)),
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
m := &MQTT{
Servers: []string{url},
serializer: s,
KeepAlive: 30,
Log: testutil.Logger{Name: "mqtt-default-integration-test"},
}
err := container.Start()
require.NoError(t, err, "failed to start container")

// Verify that we can connect to the MQTT broker
err = m.Connect()
require.NoError(t, err)

// Verify that we can successfully write data to the mqtt broker
err = m.Write(testutil.MockMetrics())
require.NoError(t, err)
}

func TestConnectAndWriteIntegrationMQTTv3(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()

var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
m := &MQTT{
Servers: []string{url},
Protocol: "3.1.1",
serializer: s,
KeepAlive: 30,
Log: testutil.Logger{Name: "mqttv311-integration-test"},
}

// Verify that we can connect to the MQTT broker
err = m.Connect()
require.NoError(t, err)

// Verify that we can successfully write data to the mqtt broker
err = m.Write(testutil.MockMetrics())
require.NoError(t, err)
}

func TestConnectAndWriteIntegrationMQTTv5(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()

var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s, _ := serializers.NewInfluxSerializer()
s, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
m := &MQTT{
Servers: []string{url},
Protocol: "5",
serializer: s,
KeepAlive: 30,
Log: testutil.Logger{Name: "mqttv5-integration-test"},
}

// Verify that we can connect to the MQTT broker
Expand Down
Loading