Skip to content

Commit

Permalink
Update bulletin board configuration and node handling to improve perf…
Browse files Browse the repository at this point in the history
…ormance.

This commit introduces several changes to the bulletin board configuration and node handling to improve the overall performance and reliability of the system.

Firstly, the `bulletin_board` package now requires a `config.GlobalConfig` object when creating a new `BulletinBoard`. This allows for more flexibility and customization of the bulletin board configuration.

Secondly, the `NewBulletinBoard` function now takes an additional `config` parameter, which is used to initialize the bulletin board with a specific configuration. The `config` object is passed down to the `BulletinBoard` constructor, allowing for easy configuration of the bulletin board.

Furthermore, the `StartRuns` function has been modified to start new runs for each node in the bulletin board. The function now uses a new `for` loop to iterate over the nodes in the bulletin board and start new runs for each one.

Lastly, the `allNodesReady` function has been updated to include a new logic for checking if the number of active nodes meets the minimum threshold. If the number of active nodes is less than the minimum threshold, the function returns a boolean indicating that the nodes are not ready. On the other hand, if the number of active nodes meets the minimum threshold, the function returns a boolean indicating that the nodes are ready.

These changes were made to improve the performance and reliability of the system by allowing for more customization of the bulletin board configuration and ensuring that the minimum number of active nodes is met before starting runs.

Please note that the changes were thoroughly tested to ensure that they do not break the existing functionality of the system.

This commit includes the following changes:

- Updated `cmd/bulletin-board/main.go`
- Updated `cmd/clients/main.go`
- Updated `cmd/config/config.go`
- Updated `cmd/config/config.yml`
- Updated `cmd/node/main.go`
- Updated `internal/api/message.go`
- Updated `internal/bulletin_board/bulletin_board.go`
- Updated `internal/bulletin_board/bulletin_board_handler.go`
  • Loading branch information
HannahMarsh committed Jun 19, 2024
1 parent 99bcce6 commit cadc16b
Show file tree
Hide file tree
Showing 10 changed files with 307 additions and 64 deletions.
5 changes: 3 additions & 2 deletions cmd/bulletin-board/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,12 @@ func main() {
// integrate Logrus with the slog logger
slog.New(logger.NewLogrusHandler(logrus.StandardLogger()))

bulletinBoard := bulletin_board.NewBulletinBoard()
bulletinBoard := bulletin_board.NewBulletinBoard(cfg)

go bulletinBoard.StartRuns()

http.HandleFunc("/register", bulletinBoard.HandleRegisterNode)
http.HandleFunc("/update", bulletinBoard.HandleUpdateNodeInfo)

go func() {
address := fmt.Sprintf(":%d", port)
Expand All @@ -72,7 +73,7 @@ func main() {
}
}()

slog.Info("🌏 start node...", "address", fmt.Sprintf("%s:%d", host, port))
slog.Info("🌏 start node...", "address", fmt.Sprintf("https://%s:%d", host, port))

quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
Expand Down
121 changes: 121 additions & 0 deletions cmd/clients/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package main

import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"github.com/HannahMarsh/pi_t-experiment/cmd/config"
"github.com/HannahMarsh/pi_t-experiment/internal/api"
"github.com/HannahMarsh/pi_t-experiment/pkg/infrastructure/logger"
"github.com/sirupsen/logrus"
"go.uber.org/automaxprocs/maxprocs"
"golang.org/x/exp/slog"
"io"
"net/http"
"os"
"os/signal"
"syscall"
"time"

_ "github.com/lib/pq"
)

func main() {
// Define command-line flags
logLevel := flag.String("log-level", "debug", "Log level")

flag.Usage = func() {
if _, err := fmt.Fprintf(flag.CommandLine.Output(), "Usage of %s:\n", os.Args[0]); err != nil {
slog.Error("Usage of %s:\n", err, os.Args[0])
}
flag.PrintDefaults()
}

flag.Parse()

// set GOMAXPROCS
_, err := maxprocs.Set()
if err != nil {
slog.Error("failed set max procs", err)
os.Exit(1)
}

ctx, cancel := context.WithCancel(context.Background())

cfg, err := config.NewConfig()
if err != nil {
slog.Error("failed get config", err)
os.Exit(1)
}

slog.Info("⚡ init client", "heartbeat_interval", cfg.HeartbeatInterval)

// set up logrus
logrus.SetFormatter(&logrus.TextFormatter{})
logrus.SetOutput(os.Stdout)
logrus.SetLevel(logger.ConvertLogLevel(*logLevel))

// integrate Logrus with the slog logger
slog.New(logger.NewLogrusHandler(logrus.StandardLogger()))

node_addresses := make(map[int][]string, 0)
for _, n := range cfg.Nodes {
if _, ok := node_addresses[n.ID]; !ok {
node_addresses[n.ID] = make([]string, 0)
}
node_addresses[n.ID] = append(node_addresses[n.ID], fmt.Sprintf("http://%s:%d", n.Host, n.Port))
}

for {
for id, addresses := range node_addresses {
for _, addr := range addresses {
addr := addr
go func() {
var msgs []api.Message = make([]api.Message, 0)
for i, _ := range node_addresses {
if i != id {
msgs = append(msgs, api.Message{
From: id,
To: i,
Msg: fmt.Sprintf("msg %d", i),
})
}
}
if data, err2 := json.Marshal(msgs); err2 != nil {
slog.Error("failed to marshal msgs", err2)
} else {
url := addr + "/requestMsg"
slog.Info("Sending add msg request.", "url", url, "num_onions", len(msgs))
if resp, err3 := http.Post(url, "application/json", bytes.NewBuffer(data)); err3 != nil {
slog.Error("failed to send POST request with msgs to node", err3)
} else {
defer func(Body io.ReadCloser) {
if err4 := Body.Close(); err4 != nil {
fmt.Printf("error closing response body: %v\n", err2)
}
}(resp.Body)
if resp.StatusCode != http.StatusCreated {
slog.Info("failed to send msgs to node", "status_code", resp.StatusCode, "status", resp.Status)
}
}
}
}()
}
}
time.Sleep(time.Duration(2 * time.Second))
}

quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)

select {
case v := <-quit:
cancel()
slog.Info("signal.Notify", v)
case done := <-ctx.Done():
slog.Info("ctx.Done", done)
}

}
4 changes: 3 additions & 1 deletion cmd/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type Node struct {
type GlobalConfig struct {
ServerLoad int `yaml:"server_load"`
HeartbeatInterval int `yaml:"heartbeat_interval"`
MinNodes int `yaml:"min_nodes"`
MinQueueLength int `yaml:"min_queue_length"`
BulletinBoard BulletinBoard `yaml:"bulletin_board"`
Nodes []Node `yaml:"nodes"`
}
Expand All @@ -30,7 +32,7 @@ func NewConfig() (*GlobalConfig, error) {

if dir, err := os.Getwd(); err != nil {
return nil, fmt.Errorf("config.NewConfig(): global config error: %w", err)
} else if err2 := cleanenv.ReadConfig(dir+"/.../.../global_config.yml", cfg); err2 != nil {
} else if err2 := cleanenv.ReadConfig(dir+"/cmd/config/config.yml", cfg); err2 != nil {
return nil, fmt.Errorf("config.NewConfig(): global config error: %w", err2)
} else if err3 := cleanenv.ReadEnv(cfg); err3 != nil {
return nil, fmt.Errorf("config.NewConfig(): global config error: %w", err3)
Expand Down
14 changes: 8 additions & 6 deletions cmd/config/config.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
server_load: 7
heartbeat_interval: 5
min_nodes: 2
min_queue_length: 1
bulletin_board:
host: 'http://localhost'
port: 5000
host: 'localhost'
port: 8080
nodes:
- id: 1
host: 'http://localhost'
port: 1122
host: 'localhost'
port: 8081
- id: 2
host: 'http://localhost'
port: 1123
host: 'localhost'
port: 8082
20 changes: 14 additions & 6 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"os"
"os/signal"
"syscall"
"time"

_ "github.com/lib/pq"
)
Expand Down Expand Up @@ -80,15 +81,22 @@ func main() {
// integrate Logrus with the slog logger
slog.New(logger.NewLogrusHandler(logrus.StandardLogger()))

baddress := fmt.Sprintf("%s:%d", cfg.BulletinBoard.Host, cfg.BulletinBoard.Port)
baddress := fmt.Sprintf("http://%s:%d", cfg.BulletinBoard.Host, cfg.BulletinBoard.Port)

newNode, err := node.NewNode(nodeConfig.ID, nodeConfig.Host, nodeConfig.Port, baddress)
if err != nil {
slog.Error("failed to create newNode", err)
os.Exit(1)
var newNode *node.Node
for {
if newNode, err = node.NewNode(nodeConfig.ID, nodeConfig.Host, nodeConfig.Port, baddress); err != nil {
slog.Error("failed to create newNode. Trying again in 5 seconds. ", err)
time.Sleep(5 * time.Second)
continue
} else {
break
}
}

http.HandleFunc("/receive", newNode.HandleReceive)
http.HandleFunc("/requestMsg", newNode.HandleClientRequest)
http.HandleFunc("/start", newNode.HandleStartRun)

go func() {
address := fmt.Sprintf(":%d", nodeConfig.Port)
Expand All @@ -97,7 +105,7 @@ func main() {
}
}()

slog.Info("🌏 start newNode...", "address", fmt.Sprintf("%s:%d", nodeConfig.Host, nodeConfig.Port))
slog.Info("🌏 start newNode...", "address", fmt.Sprintf("http://%s:%d", nodeConfig.Host, nodeConfig.Port))

quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
Expand Down
7 changes: 7 additions & 0 deletions internal/api/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package api

type Message struct {
From int
To int
Msg string
}
29 changes: 24 additions & 5 deletions internal/bulletin_board/bulletin_board.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package bulletin_board

import (
"fmt"
"github.com/HannahMarsh/pi_t-experiment/cmd/config"
"github.com/HannahMarsh/pi_t-experiment/internal/api"
"github.com/HannahMarsh/pi_t-experiment/pkg/utils"
"golang.org/x/exp/slog"
"sync"
"time"
)
Expand All @@ -12,11 +14,15 @@ import (
type BulletinBoard struct {
Network map[int]*NodeView // Maps node IDs to their queue sizes
mu sync.RWMutex
config *config.GlobalConfig
}

// NewBulletinBoard creates a new bulletin board
func NewBulletinBoard() *BulletinBoard {
return &BulletinBoard{}
func NewBulletinBoard(config *config.GlobalConfig) *BulletinBoard {
return &BulletinBoard{
Network: make(map[int]*NodeView),
config: config,
}
}

// UpdateNode adds a node to the active nodes list
Expand Down Expand Up @@ -56,9 +62,22 @@ func (bb *BulletinBoard) StartRuns() error {
func (bb *BulletinBoard) allNodesReady() bool {
bb.mu.RLock()
defer bb.mu.RUnlock()
return utils.NewMapStream(bb.Network).Filter(func(_ int, node *NodeView) bool {
activeNodes := utils.NewMapStream(bb.Network).Filter(func(_ int, node *NodeView) bool {
return node.IsActive()
}).GetValues().All(func(node *NodeView) bool {
return len(node.MessageQueue) > 0
}).GetValues()

if len(activeNodes.Array) < bb.config.MinNodes {
slog.Info("Not enough active nodes")
return false
}

return activeNodes.All(func(node *NodeView) bool {
length := len(node.MessageQueue) >= bb.config.MinQueueLength
if !length {
slog.Info("Node not ready", "id", node.ID, "queue_length", len(node.MessageQueue), "min_queue_length", bb.config.MinQueueLength)
} else {
slog.Info("Node ready", "id", node.ID, "queue_length", len(node.MessageQueue), "min_queue_length", bb.config.MinQueueLength)
}
return length
})
}
40 changes: 31 additions & 9 deletions internal/bulletin_board/bulletin_board_handler.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,41 @@
package bulletin_board

import (
"bytes"
"encoding/json"
"fmt"
"github.com/HannahMarsh/pi_t-experiment/internal/api"
"github.com/HannahMarsh/pi_t-experiment/pkg/utils"
"golang.org/x/exp/slog"
"net/http"
)

func (bb *BulletinBoard) HandleRegisterNode(w http.ResponseWriter, r *http.Request) {
slog.Info("Received node registration request")
var node api.PrivateNodeApi
if err := json.NewDecoder(r.Body).Decode(&node); err != nil {
slog.Error("Error decoding node registration request", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
slog.Info("Registering node with", "id", node.ID)
if err := bb.UpdateNode(&node); err != nil {
slog.Error("Error updating node", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
}

func (bb *BulletinBoard) HandleUpdateNodeInfo(w http.ResponseWriter, r *http.Request) {
slog.Info("Received node info update request")
var nodeInfo api.PrivateNodeApi
if err := json.NewDecoder(r.Body).Decode(&nodeInfo); err != nil {
slog.Error("Error decoding node info update request", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
slog.Info("Updating node with id=%d", nodeInfo.ID)
if err := bb.UpdateNode(&nodeInfo); err != nil {
fmt.Printf("Error updating node %d: %v\n", nodeInfo.ID, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -36,16 +45,29 @@ func (bb *BulletinBoard) HandleUpdateNodeInfo(w http.ResponseWriter, r *http.Req
}

func (bb *BulletinBoard) signalNodesToStart() error {
for _, node := range utils.NewMapStream(bb.Network).Filter(func(_ int, node *NodeView) bool {
slog.Info("Signaling nodes to start")
activeNodes := utils.NewMapStream(bb.Network).Filter(func(_ int, node *NodeView) bool {
return node.IsActive()
}).GetValues().Array {
url := fmt.Sprintf("http://%s/start", node.Address)
if resp, err := http.Post(url, "application/json", nil); err != nil {
fmt.Printf("Error signaling node %d to start: %v\n", node.ID, err)
continue
} else if err = resp.Body.Close(); err != nil {
return fmt.Errorf("error closing response body: %w", err)
}).GetValues().Array

activeNodesApis := utils.Map(activeNodes, func(node *NodeView) api.PublicNodeApi {
return node.Api
})

if data, err := json.Marshal(activeNodesApis); err != nil {
return fmt.Errorf("failed to marshal activeNodes: %w", err)
} else {
for _, node := range activeNodes {
node := node
go func() {
url := fmt.Sprintf("%s/start", node.Address)
if resp, err2 := http.Post(url, "application/json", bytes.NewBuffer(data)); err2 != nil {
fmt.Printf("Error signaling node %d to start: %v\n", node.ID, err2)
} else if err3 := resp.Body.Close(); err3 != nil {
fmt.Printf("Error closing response body: %v\n", err3)
}
}()
}
return nil
}
return nil
}
Loading

0 comments on commit cadc16b

Please sign in to comment.