Skip to content

Commit

Permalink
add mqqt pipeline client
Browse files Browse the repository at this point in the history
Signed-off-by: hmoazzem <moazzem@edgeflare.io>
  • Loading branch information
hmoazzem committed Oct 24, 2024
1 parent f142a00 commit 6732574
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ import (
)

func main() {
if err := run(); err != nil {
// if err := run(); err != nil {
// log.Fatal(err)
// }

if err := pipelinesDemo(); err != nil {
log.Fatal(err)
}
}
Expand Down
73 changes: 73 additions & 0 deletions examples/postgres-cdc/pipelines.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package main

import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"

"github.com/edgeflare/pgo/pkg/pipeline"
"github.com/edgeflare/pgo/pkg/x/logrepl"
)

func pipelinesDemo() error {
// Check if PGO_POSTGRES_LOGREPL_CONN_STRING is set
if os.Getenv("PGO_POSTGRES_LOGREPL_CONN_STRING") == "" {
return fmt.Errorf("PGO_POSTGRES_LOGREPL_CONN_STRING environment variable is not set")
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Set up signal handling for graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

// Start consuming CDC events
eventsChan, err := logrepl.Run(ctx)
if err != nil {
return err
}

// Get the pipeline manager
m := pipeline.Manager()
// TODO: should also take config here
m.AddPeer(pipeline.ConnectorMQTT, "mqtt-default")
m.AddPeer(pipeline.ConnectorDebug, "debug")
// TODO: add more peers

// Initialize all peers
for _, p := range m.Peers() {
// use config it not nil. then check env var. finally fall back to defaults
err := p.Connector().Init(nil)
if err != nil {
return fmt.Errorf("failed to initialize connector %s: %w", p.Name(), err)
}
}

// Process events in a separate goroutine for each peer
for _, p := range m.Peers() {
go func(peer pipeline.Peer) {
for event := range eventsChan {
err := peer.Connector().Publish(event)
if err != nil {
log.Printf("Error publishing to %s: %v", peer.Name(), err)
}
}
}(p)
}

log.Println("Logical replication started. Press Ctrl+C to exit.")

// Wait for termination signal
<-sigChan
log.Println("Received termination signal, shutting down gracefully...")

// Trigger cancellation of the context
cancel()

log.Println("Shutdown complete")
return nil
}
5 changes: 3 additions & 2 deletions pkg/pipeline/clickhouse/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ import (

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/edgeflare/pgo/pkg/pipeline"
"github.com/edgeflare/pgo/pkg/x/logrepl"
)

type ClickHousePeer struct {
conn clickhouse.Conn
}

func (p *ClickHousePeer) Publish(data interface{}) error {
func (p *ClickHousePeer) Publish(event logrepl.PostgresCDC) error {
// TODO: implement
// send (possibly transformed) data to clickhouse
log.Println(pipeline.ConnectorClickHouse, data)
log.Println(pipeline.ConnectorClickHouse, event)
return nil
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/pipeline/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package pipeline

import (
"encoding/json"

"github.com/edgeflare/pgo/pkg/x/logrepl"
)

// Connector defines the interface for data publishing components.
type Connector interface {
// Publish sends the given data to the connector's destination.
// It returns an error if the publish operation fails.
Publish(data interface{}) error
Publish(event logrepl.PostgresCDC) error

// Init initializes the connector with the provided configuration.
// The config parameter is a raw JSON message containing connector-specific settings.
Expand Down
15 changes: 8 additions & 7 deletions pkg/pipeline/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@ package pipeline
import (
"fmt"
"testing"

"github.com/edgeflare/pgo/pkg/x/logrepl"
)

func TestNewManager(t *testing.T) {
// Create a new manager
manager := NewManager()

// Test starting the manager
t.Run("Start Manager", func(t *testing.T) {
manager.Start()
})
manager := Manager()

// Test connectors
t.Run("Test Connectors", func(t *testing.T) {
Expand All @@ -22,7 +19,11 @@ func TestNewManager(t *testing.T) {
t.Errorf("Failed to initialize connector: %v", err)
}

msg := "hello..."
msg := logrepl.PostgresCDC{
Table: "test",
Data: map[string]interface{}{"hello": "world"},
Operation: logrepl.OperationInsert,
}
if err := c.Publish(msg); err != nil {
t.Errorf("Failed to publish message: %v", err)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/pipeline/debug/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"log"

"github.com/edgeflare/pgo/pkg/pipeline"
"github.com/edgeflare/pgo/pkg/x/logrepl"
)

// PeerDebug is a debug peer that logs the data to the console
type PeerDebug struct{}

func (p *PeerDebug) Publish(data interface{}) error {
log.Println(pipeline.ConnectorDebug, data)
func (p *PeerDebug) Publish(event logrepl.PostgresCDC) error {
log.Println(pipeline.ConnectorDebug, event)
return nil
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/pipeline/kafka/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"log"

"github.com/edgeflare/pgo/pkg/pipeline"
"github.com/edgeflare/pgo/pkg/x/logrepl"
)

type PeerKafka struct{}

func (p *PeerKafka) Publish(data interface{}) error {
func (p *PeerKafka) Publish(event logrepl.PostgresCDC) error {
// TODO: implement
// send (possibly transformed) data to kafka topic
log.Println(pipeline.ConnectorKafka, data)
log.Println(pipeline.ConnectorKafka, event)
return nil
}

Expand Down
37 changes: 14 additions & 23 deletions pkg/pipeline/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,24 @@ var (
peers = make(map[string]Peer)
)

// Manager handles connectors and peers for data pipeline operations.
// Mngr (Manager) handles connectors and peers for data pipeline operations.
// It supports dynamic loading of connector plugins and manages the lifecycle
// of data flows from PostgreSQL to various destinations.
type Manager struct {
type Mngr struct {
connectors map[string]Connector
peers map[string]Peer
}

// NewManager creates a new Manager instance.
func NewManager() *Manager {
return &Manager{
connectors: make(map[string]Connector),
peers: make(map[string]Peer),
// Manager returns the singleton Manager instance
func Manager() *Mngr {
return &Mngr{
connectors: connectors,
peers: peers,
}
}

// RegisterConnectorPlugin loads and registers a connector plugin from the specified path.
func (m *Manager) RegisterConnectorPlugin(path string, name string) error {
func (m *Mngr) RegisterConnectorPlugin(path string, name string) error {
plug, err := plugin.Open(path)
if err != nil {
return err
Expand All @@ -48,7 +48,7 @@ func (m *Manager) RegisterConnectorPlugin(path string, name string) error {
}

// NewPeer creates a new Peer
func (m *Manager) NewPeer(connector string, name string) (*Peer, error) {
func (m *Mngr) AddPeer(connector string, name string) (*Peer, error) {
if _, exists := m.connectors[connector]; !exists {
return nil, fmt.Errorf("connector %s not found", connector)
}
Expand All @@ -58,19 +58,10 @@ func (m *Manager) NewPeer(connector string, name string) (*Peer, error) {
return &peer, nil
}

// Start initializes and runs all registered peers and their associated connectors.
func (m *Manager) Start() {
// check connectors
for _, c := range connectors {
fmt.Println(c.Init(nil))
fmt.Println(c.Publish("hello.."))
}

// check peers
m.NewPeer(ConnectorMQTT, "mqtt-test")
m.NewPeer(ConnectorHTTP, "kafka-test")
for _, p := range peers {
fmt.Println(p.Name(), p.Connector().Init(nil))
fmt.Println(p.Name(), p.Connector().Publish("hello.."))
func (m *Mngr) Peers() []Peer {
peers := make([]Peer, 0, len(m.peers))
for _, p := range m.peers {
peers = append(peers, p)
}
return peers
}
75 changes: 68 additions & 7 deletions pkg/pipeline/mqtt/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,84 @@ package mqtt

import (
"encoding/json"
"fmt"
"log"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/edgeflare/pgo/pkg/pipeline"
"github.com/edgeflare/pgo/pkg/util"
"github.com/edgeflare/pgo/pkg/util/rand"
"github.com/edgeflare/pgo/pkg/x/logrepl"
)

type PeerMQTT struct{}
type PeerMQTT struct {
client mqtt.Client
}

func (p *PeerMQTT) Publish(event logrepl.PostgresCDC) error {
topic := fmt.Sprintf("/pgcdc/%s", event.Table)
data, err := json.Marshal(event.Data)
if err != nil {
return err
}

func (p *PeerMQTT) Publish(data interface{}) error {
// TODO: implement
// send (possibly transformed) data to mqtt topic
log.Println(pipeline.ConnectorMQTT, data)
token := p.client.Publish(topic, 0, false, data)
token.Wait()
if err := token.Error(); err != nil {
log.Printf("Publish error: %v", err)
return err
}
return nil
}

func (p *PeerMQTT) Init(config json.RawMessage) error {
// TODO: implement
// parse config and set up mqtt client
type Config struct {
Broker string `json:"broker"`
Username string `json:"username"`
Password string `json:"password"`
ClientID string `json:"client_id"`
}

var cfg Config
if config != nil {
if err := json.Unmarshal(config, &cfg); err != nil {
return fmt.Errorf("config parse error: %w", err)
}
}

if cfg.Broker == "" {
cfg.Broker = util.GetEnvOrDefault("PGO_MQTT_BROKER", "tcp://127.0.0.1:1883")
}

opts := mqtt.NewClientOptions()
opts.AddBroker(cfg.Broker)

username := cfg.Username
if username == "" {
username = util.GetEnvOrDefault("PGO_MQTT_USERNAME", "")
}
opts.SetUsername(username)

password := cfg.Password
if password == "" {
password = util.GetEnvOrDefault("PGO_MQTT_PASSWORD", "")
}
opts.SetPassword(password)

clientID := cfg.ClientID
if clientID == "" {
clientID = fmt.Sprintf("pgo-logrepl-%s", rand.NewName())
}
opts.SetClientID(clientID)

mqttClient := mqtt.NewClient(opts)
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
return fmt.Errorf("broker connection error: %w", token.Error())
}

p.client = mqttClient
log.Println("MQTT client initialized")

return nil
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/pipeline/plugin_example/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"log"

"github.com/edgeflare/pgo/pkg/pipeline"
"github.com/edgeflare/pgo/pkg/x/logrepl"
)

type PeerExample struct{}

func (p *PeerExample) Publish(data interface{}) error {
log.Println("example connector plugin publish", data)
func (p *PeerExample) Publish(event logrepl.PostgresCDC) error {
log.Println("example connector plugin publish", event)
return nil
}

Expand Down

0 comments on commit 6732574

Please sign in to comment.