Skip to content

Commit

Permalink
Heartbeat based connection
Browse files Browse the repository at this point in the history
  • Loading branch information
heynemann committed Feb 21, 2017
1 parent 7d1c9b9 commit be91c4a
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 47 deletions.
6 changes: 3 additions & 3 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"fmt"
"os"

"github.com/getsentry/raven-go"
raven "github.com/getsentry/raven-go"
"github.com/labstack/echo"
"github.com/labstack/echo/engine"
"github.com/labstack/echo/engine/standard"
Expand All @@ -30,7 +30,7 @@ type App struct {
Host string
API *echo.Echo
Engine engine.Server
MqttBot *bot.MqttBot
MQTTBot *bot.MQTTBot
RedisClient *redisclient.RedisClient
NewRelic newrelic.Application
}
Expand Down Expand Up @@ -98,7 +98,7 @@ func (app *App) configureSentry() {
}

func (app *App) configureApplication() {
app.MqttBot = bot.GetMqttBot()
app.MQTTBot = bot.GetMQTTBot()
app.Engine = standard.New(fmt.Sprintf("%s:%d", app.Host, app.Port))
app.API = echo.New()
a := app.API
Expand Down
29 changes: 18 additions & 11 deletions bot/mqtt_bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ type Subscription struct {
PluginMappings []*PluginMapping
}

// MqttBot defines the bot, it contains plugins, subscriptions and a client
type MqttBot struct {
// MQTTBot defines the bot, it contains plugins, subscriptions and a client
type MQTTBot struct {
Plugins *plugins.Plugins
Subscriptions []*Subscription
Client *mqttclient.MqttClient
Client *mqttclient.MQTTClient
Config *viper.Viper
}

var mqttBot *MqttBot
var mqttBot *MQTTBot
var once sync.Once

var h mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
Expand All @@ -52,18 +52,24 @@ var h mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
}
}

// GetMqttBot returns a initialized mqtt bot
func GetMqttBot() *MqttBot {
//ResetMQTTBot clears once and mqttbot instance
func ResetMQTTBot() {
once = sync.Once{}
mqttBot = nil
}

// GetMQTTBot returns a initialized mqtt bot
func GetMQTTBot() *MQTTBot {
once.Do(func() {
addCredentialsToRedis()
mqttBot = &MqttBot{}
mqttBot.Client = mqttclient.GetMqttClient(onClientConnectHandler)
mqttBot = &MQTTBot{}
mqttBot.Client = mqttclient.GetMQTTClient(onClientConnectHandler)
mqttBot.setupPlugins()
})
return mqttBot
}

func (b *MqttBot) setupPlugins() {
func (b *MQTTBot) setupPlugins() {
b.Plugins = plugins.GetPlugins()
b.Plugins.SetupPlugins()
}
Expand All @@ -74,9 +80,10 @@ var onClientConnectHandler = func(client mqtt.Client) {

// StartBot starts the bot, it subscribes the bot to the topics defined in the
// configuration file
func (b *MqttBot) StartBot() {
func (b *MQTTBot) StartBot() {
subscriptions := viper.Get("mqttserver.subscriptionRequests").([]interface{})
client := b.Client.MqttClient
client := b.Client.MQTTClient
fmt.Println(12)
b.Subscriptions = []*Subscription{}
for _, s := range subscriptions {
sMap := s.(map[interface{}]interface{})
Expand Down
10 changes: 5 additions & 5 deletions modules/plugins_mqtt_client_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (

var mqttClient mqtt.Client

// MqttClientModuleLoader loads the module and prepares it
func MqttClientModuleLoader(L *lua.LState) int {
configureMqttModule()
// MQTTClientModuleLoader loads the module and prepares it
func MQTTClientModuleLoader(L *lua.LState) int {
configureMQTTModule()
mod := L.SetFuncs(L.NewTable(), mqttClientModuleExports)
L.Push(mod)
return 1
Expand All @@ -23,8 +23,8 @@ var mqttClientModuleExports = map[string]lua.LGFunction{
"send_message": SendMessage,
}

func configureMqttModule() {
mqttClient = mqttclient.GetMqttClient(nil).MqttClient
func configureMQTTModule() {
mqttClient = mqttclient.GetMQTTClient(nil).MQTTClient
}

// SendMessage sends message to mqtt
Expand Down
67 changes: 67 additions & 0 deletions mqttclient/heartbeat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package mqttclient

import (
"fmt"
"time"

MQTT "github.com/eclipse/paho.mqtt.golang"
)

//Heartbeat for MQTT Client
//Will Subscribe and Publish to MQTT
type Heartbeat struct {
Topic string
Client *MQTTClient
OnHeartbeatMissed func(error)
LastHeartbeat time.Time
stopped bool
MaxDurationMs int64
}

func (h *Heartbeat) receivedHeartbeat(client MQTT.Client, msg MQTT.Message) {
h.LastHeartbeat = time.Now()
}

//Start the heartbeat
func (h *Heartbeat) Start() error {
if h.MaxDurationMs == 0 {
h.MaxDurationMs = 5000
}
h.stopped = false
client := h.Client.MQTTClient
if !client.IsConnected() {
return fmt.Errorf("Can't start heartbeat. MQTT Client is not connected!")
}
if token := client.Subscribe(h.Topic, uint8(0), h.receivedHeartbeat); token.Wait() && token.Error() != nil {
return token.Error()
}
h.LastHeartbeat = time.Now()

go func() {
for !h.stopped {
token := h.Client.MQTTClient.Publish(h.Topic, uint8(2), false, "OK")
token.Wait()
err := token.Error()
if err != nil {
h.OnHeartbeatMissed(err)
h.stopped = true
return
}
time.Sleep(1000 * time.Millisecond)
}
}()

go func() {
for !h.stopped {
duration := time.Now().Sub(h.LastHeartbeat).Nanoseconds() / 1000000
if duration > h.MaxDurationMs {
h.stopped = true
h.OnHeartbeatMissed(fmt.Errorf("Timeout in heartbeat: %d.", duration))
return
}
time.Sleep(500 * time.Millisecond)
}

}()
return nil
}
91 changes: 65 additions & 26 deletions mqttclient/mqtt_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,66 @@ import (
"time"

"github.com/eclipse/paho.mqtt.golang"
uuid "github.com/satori/go.uuid"
"github.com/spf13/viper"
"github.com/topfreegames/mqttbot/logger"
)

// MqttClient contains the data needed to connect the client
type MqttClient struct {
MqttServerHost string
MqttServerPort int
MqttClient mqtt.Client
// MQTTClient contains the data needed to connect the client
type MQTTClient struct {
MQTTServerHost string
MQTTServerPort int
MQTTClient mqtt.Client
Heartbeat *Heartbeat
OnConnectHandler mqtt.OnConnectHandler
}

var client *MqttClient
var client *MQTTClient
var once sync.Once

// GetMqttClient creates the mqttclient and returns it
func GetMqttClient(onConnectHandler mqtt.OnConnectHandler) *MqttClient {
//ResetMQTTClient resets once
func ResetMQTTClient() {
once = sync.Once{}
client = nil
}

// GetMQTTClient creates the mqttclient and returns it
func GetMQTTClient(onConnectHandler mqtt.OnConnectHandler) *MQTTClient {
once.Do(func() {
client = &MqttClient{}
client = &MQTTClient{}
client.configure(onConnectHandler)
})
return client
}

func (mc *MqttClient) configure(onConnectHandler mqtt.OnConnectHandler) {
func (mc *MQTTClient) hasConnected(client mqtt.Client) {
if mc.OnConnectHandler != nil {
mc.OnConnectHandler(client)
}

mc.Heartbeat = &Heartbeat{
Topic: uuid.NewV4().String(),
Client: mc,
OnHeartbeatMissed: mc.onHeartbeatMissed,
}
mc.Heartbeat.Start()
}

func (mc *MQTTClient) configure(onConnectHandler mqtt.OnConnectHandler) {
mc.OnConnectHandler = onConnectHandler
mc.setConfigurationDefaults()
mc.configureClient()
mc.start(onConnectHandler)
mc.start()
}

func (mc *MqttClient) setConfigurationDefaults() {
func (mc *MQTTClient) onHeartbeatMissed(err error) {
if mc.MQTTClient.IsConnected() {
mc.MQTTClient.Disconnect(0)
}
mc.start()
}

func (mc *MQTTClient) setConfigurationDefaults() {
viper.SetDefault("mqttserver.host", "localhost")
viper.SetDefault("mqttserver.port", 1883)
viper.SetDefault("mqttserver.user", "admin")
Expand All @@ -47,23 +77,25 @@ func (mc *MqttClient) setConfigurationDefaults() {
viper.SetDefault("mqttserver.ca_cert_file", "")
}

func (mc *MqttClient) configureClient() {
mc.MqttServerHost = viper.GetString("mqttserver.host")
mc.MqttServerPort = viper.GetInt("mqttserver.port")
func (mc *MQTTClient) configureClient() {
mc.MQTTServerHost = viper.GetString("mqttserver.host")
mc.MQTTServerPort = viper.GetInt("mqttserver.port")
}

func (mc *MqttClient) start(onConnectHandler mqtt.OnConnectHandler) {
func (mc *MQTTClient) start() {
logger.Logger.Debug("Initializing mqtt client")

useTls := viper.GetBool("mqttserver.usetls")
useTLS := viper.GetBool("mqttserver.usetls")
protocol := "tcp"
if useTls {
if useTLS {
protocol = "ssl"
}

opts := mqtt.NewClientOptions().AddBroker(fmt.Sprintf("%s://%s:%d", protocol, mc.MqttServerHost, mc.MqttServerPort)).SetClientID("mqttbot")
opts := mqtt.NewClientOptions().AddBroker(
fmt.Sprintf("%s://%s:%d", protocol, mc.MQTTServerHost, mc.MQTTServerPort),
).SetClientID("mqttbot")

if useTls {
if useTLS {
logger.Logger.Info("mqttclient using tls")
certpool := x509.NewCertPool()
if viper.GetString("mqttserver.ca_cert_file") != "" {
Expand All @@ -74,7 +106,11 @@ func (mc *MqttClient) start(onConnectHandler mqtt.OnConnectHandler) {
logger.Logger.Error(err.Error())
}
}
tlsConfig := &tls.Config{InsecureSkipVerify: viper.GetBool("mqttserver.insecure_tls"), ClientAuth: tls.NoClientCert, RootCAs: certpool}
tlsConfig := &tls.Config{
InsecureSkipVerify: viper.GetBool("mqttserver.insecure_tls"),
ClientAuth: tls.NoClientCert,
RootCAs: certpool,
}
opts.SetTLSConfig(tlsConfig)
}

Expand All @@ -83,15 +119,18 @@ func (mc *MqttClient) start(onConnectHandler mqtt.OnConnectHandler) {
opts.SetKeepAlive(3 * time.Second)
opts.SetPingTimeout(5 * time.Second)
opts.SetMaxReconnectInterval(30 * time.Second)
opts.SetOnConnectHandler(onConnectHandler)
mc.MqttClient = mqtt.NewClient(opts)
opts.SetOnConnectHandler(mc.hasConnected)
opts.SetAutoReconnect(false)
mc.MQTTClient = mqtt.NewClient(opts)

c := mc.MqttClient
c := mc.MQTTClient

if token := c.Connect(); token.Wait() && token.Error() != nil {
logger.Logger.Fatal(token.Error())
}

logger.Logger.Info(fmt.Sprintf("Successfully connected to mqtt server at %s:%d!",
mc.MqttServerHost, mc.MqttServerPort))
logger.Logger.Info(fmt.Sprintf(
"Successfully connected to mqtt server at %s:%d!",
mc.MQTTServerHost, mc.MQTTServerPort,
))
}
Loading

0 comments on commit be91c4a

Please sign in to comment.