Skip to content

Commit

Permalink
Update bulletin board logic, add active node handling, and fix concur…
Browse files Browse the repository at this point in the history
…rent map bugs.

This commit updates the bulletin board logic to properly report active nodes and handles the addition and removal of nodes. It also fixes bugs in the concurrent map handling. Implementations of onion routing and message queuing are added.
  • Loading branch information
HannahMarsh committed Jun 19, 2024
1 parent cadc16b commit ffbef78
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 48 deletions.
1 change: 1 addition & 0 deletions cmd/bulletin-board/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func main() {

http.HandleFunc("/register", bulletinBoard.HandleRegisterNode)
http.HandleFunc("/update", bulletinBoard.HandleUpdateNodeInfo)
http.HandleFunc("/nodes", bulletinBoard.HandleGetActiveNodes)

go func() {
address := fmt.Sprintf(":%d", port)
Expand Down
22 changes: 21 additions & 1 deletion internal/bulletin_board/bulletin_board_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (bb *BulletinBoard) HandleUpdateNodeInfo(w http.ResponseWriter, r *http.Req
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
slog.Info("Updating node with id=%d", nodeInfo.ID)
slog.Info("Updating node with", "id", nodeInfo.ID)
if err := bb.UpdateNode(&nodeInfo); err != nil {
fmt.Printf("Error updating node %d: %v\n", nodeInfo.ID, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -44,6 +44,26 @@ func (bb *BulletinBoard) HandleUpdateNodeInfo(w http.ResponseWriter, r *http.Req
w.WriteHeader(http.StatusOK)
}

// HandleGetActiveNodes handles GET requests to return all active nodes
func (bb *BulletinBoard) HandleGetActiveNodes(w http.ResponseWriter, r *http.Request) {
bb.mu.Lock()
defer bb.mu.Unlock()
activeNodes := utils.NewMapStream(bb.Network).Filter(func(_ int, node *NodeView) bool {
return node.IsActive()
}).GetValues().Array

activeNodesApis := utils.Map(activeNodes, func(node *NodeView) api.PublicNodeApi {
return node.Api
})

w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(activeNodesApis); err != nil {
slog.Error("Error encoding response", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

func (bb *BulletinBoard) signalNodesToStart() error {
slog.Info("Signaling nodes to start")
activeNodes := utils.NewMapStream(bb.Network).Filter(func(_ int, node *NodeView) bool {
Expand Down
123 changes: 93 additions & 30 deletions internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
"fmt"
"github.com/HannahMarsh/pi_t-experiment/internal/api"
"github.com/HannahMarsh/pi_t-experiment/pkg/utils"
"github.com/enriquebris/goconcurrentqueue"
"github.com/orcaman/concurrent-map/v2"
"golang.org/x/exp/slog"
"io"
"net/http"
"sync"
"time"

"math/rand"
)

// Node represents a node in the onion routing network
Expand All @@ -22,9 +23,10 @@ type Node struct {
Port int
PublicKey []byte
PrivateKey []byte
OtherNodes cmap.ConcurrentMap[string, *api.PublicNodeApi]
QueuedOnions goconcurrentqueue.Queue
QueuedRequests goconcurrentqueue.Queue
ActiveNodes []api.PublicNodeApi
MessageQueue []*api.Message
OnionQueue []*Onion
mu sync.Mutex
NodeInfo api.PublicNodeApi
BulletinBoardUrl string
wg sync.WaitGroup
Expand All @@ -37,14 +39,14 @@ func NewNode(id int, host string, port int, bulletinBoardUrl string) (*Node, err
return nil, fmt.Errorf("node.NewNode(): failed to generate key pair: %w", err)
} else {
n := &Node{
ID: id,
Host: host,
Port: port,
PublicKey: publicKey,
PrivateKey: privateKey,
OtherNodes: cmap.New[*api.PublicNodeApi](),
QueuedOnions: goconcurrentqueue.NewFIFO(),
QueuedRequests: goconcurrentqueue.NewFIFO(),
ID: id,
Host: host,
Port: port,
PublicKey: publicKey,
PrivateKey: privateKey,
ActiveNodes: make([]api.PublicNodeApi, 0),
MessageQueue: make([]*api.Message, 0),
OnionQueue: make([]*Onion, 0),
NodeInfo: api.PublicNodeApi{
ID: id,
Address: fmt.Sprintf("%s:%d", host, port),
Expand All @@ -57,38 +59,101 @@ func NewNode(id int, host string, port int, bulletinBoardUrl string) (*Node, err
return nil, fmt.Errorf("node.NewNode(): failed to register with bulletin board: %w", err2)
}

go n.StartPeriodicUpdates(time.Second * 1)

return n, nil
}
}

func (n *Node) updateNode(node *api.PublicNodeApi) {
n.OtherNodes.Set(node.Address, node)
func (n *Node) StartPeriodicUpdates(interval time.Duration) {
ticker := time.NewTicker(interval)
go func() {
for range ticker.C {
if err := n.updateBulletinBoard(); err != nil {
fmt.Printf("Error updating bulletin board: %v\n", err)
return
}
n.ProcessMessageQueue()
}
}()
}

func (n *Node) ProcessMessageQueue() {
n.mu.Lock()
defer n.mu.Unlock()
for _, msg := range n.MessageQueue {
// Create an onion from the message
if onion, err := n.NewOnion(msg, 1); err != nil {
fmt.Printf("Error creating onion: %v\n", err)
} else {
n.OnionQueue = append(n.OnionQueue, onion)
}
}
}

func (n *Node) getNode(id int) *api.PublicNodeApi {
for _, node := range n.ActiveNodes {
if node.ID == id {
return &node
}
}
return nil
}

func (n *Node) getRandomNode() *api.PublicNodeApi {
r := rand.Intn(len(n.ActiveNodes))
return &n.ActiveNodes[r]
}

func (n *Node) NewOnion(msg *api.Message, pathLength int) (*Onion, error) {
if msg_string, err := json.Marshal(msg); err != nil {
return nil, fmt.Errorf("NewOnion(): failed to marshal message: %w", err)
} else {
if to := n.getNode(msg.To); to != nil {
if o, err2 := NewOnion(fmt.Sprintf("%s/receive", to.Address), msg_string, to.PublicKey); err2 != nil {
return nil, fmt.Errorf("NewOnion(): failed to create onion: %w", err2)
} else {
for i := 0; i < pathLength; i++ {
var intermediary *api.PublicNodeApi
for intermediary.ID == n.ID {
intermediary = n.getRandomNode()
}
if err3 := o.AddLayer(intermediary.Address, intermediary.PublicKey); err3 != nil {
return nil, fmt.Errorf("NewOnion(): failed to add layer: %w", err3)
}
}
return o, nil
}
} else {
return nil, fmt.Errorf("NewOnion(): failed to get node with id %d", msg.To)
}
}
}

func (n *Node) startRun(activeNodes []api.PublicNodeApi) (didParticipate bool, e error) {
slog.Info("Starting run with", "num_onions", n.QueuedOnions.GetLen())
n.mu.Lock()
if len(activeNodes) == 0 {
n.mu.Unlock()
return false, fmt.Errorf("startRun(): no active nodes")
}
n.ActiveNodes = activeNodes
onionsToSend := n.OnionQueue
n.OnionQueue = make([]*Onion, 0)
n.mu.Unlock()
slog.Info("Starting run with", "num_onions", len(onionsToSend))
n.wg.Wait()
n.wg.Add(1)
defer n.wg.Done()

n.OtherNodes.Clear()
var participate bool = false
for _, node := range activeNodes {
n.updateNode(&node)
if node.ID == n.ID {
participate = true
}
}
if participate {
for {
if o, err := n.QueuedOnions.Dequeue(); o == nil || err != nil {
if err.Error() == "empty queue" {
break
}
return true, fmt.Errorf("startRun(): failed to dequeue onion: %w", err)
} else if on, ok := o.(*Onion); !ok {
return true, fmt.Errorf("startRun(): invalid onion type: %T", o)
} else if err2 := sendToNode(on); err2 != nil {
for _, onion := range onionsToSend {
if err2 := sendToNode(onion); err2 != nil {
return true, fmt.Errorf("startRun(): failed to send onion to next node: %w", err2)
}
}
Expand All @@ -101,9 +166,7 @@ func (n *Node) Receive(o *Onion) error {
if err := o.RemoveLayer(n.PrivateKey); err != nil {
return fmt.Errorf("node.Receive(): failed to remove layer: %w", err)
} else if o.HasNextLayer() {
if _, present := n.OtherNodes.Get(o.Address); !present {
return fmt.Errorf("node.Receive(): next node not found: %s", o.Address)
} else if err2 := sendToNode(o); err2 != nil {
if err2 := sendToNode(o); err2 != nil {
return fmt.Errorf("node.Receive(): failed to send to next node: %w", err2)
}
} else if o.HasMessage() {
Expand Down
60 changes: 45 additions & 15 deletions internal/node/node_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"encoding/json"
"fmt"
"github.com/HannahMarsh/pi_t-experiment/internal/api"
"github.com/HannahMarsh/pi_t-experiment/pkg/utils"
"golang.org/x/exp/slog"
"io"
"net/http"
"time"
)

func (n *Node) HandleReceive(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -54,11 +54,9 @@ func (n *Node) HandleClientRequest(w http.ResponseWriter, r *http.Request) {
}
slog.Info("Enqueuing messages", "num_messages", len(msgs))
for _, msg := range msgs {
if err := n.QueuedRequests.Enqueue(msg); err != nil {
slog.Error("Error enqueuing message", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
n.mu.Lock()
n.MessageQueue = append(n.MessageQueue, &msg)
n.mu.Unlock()
}
w.WriteHeader(http.StatusOK)
}
Expand All @@ -85,16 +83,48 @@ func (n *Node) RegisterWithBulletinBoard() error {
}
}

func (n *Node) GetActiveNodes() ([]api.PublicNodeApi, error) {
url := fmt.Sprintf("%s/nodes", n.BulletinBoardUrl)
resp, err := http.Get(url)
if err != nil {
return nil, fmt.Errorf("error making GET request to %s: %v", url, err)
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
fmt.Printf("error closing response body: %v\n", err)
}
}(resp.Body)

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}

var activeNodes []api.PublicNodeApi
if err = json.NewDecoder(resp.Body).Decode(&activeNodes); err != nil {
return nil, fmt.Errorf("error decoding response: %v", err)
}

return activeNodes, nil
}

func (n *Node) updateBulletinBoard() error {
// getsnapshot of requested messages
pr := api.PrivateNodeApi{
TimeOfRequest: time.Now(),
ID: n.ID,
Address: n.NodeInfo.Address,
PublicKey: n.PublicKey,
MessageQueue:
n.mu.Lock()
a, _ := n.GetActiveNodes()
if a != nil && len(a) > 0 {
n.ActiveNodes = a
}
if data, err := json.Marshal(n.NodeInfo); err != nil {
m := utils.NewStream(n.MessageQueue).MapToInt(func(msg *api.Message) int {
return msg.To
}).Array
n.mu.Unlock()
nodeInfo := api.PrivateNodeApi{
ID: n.ID,
Address: n.NodeInfo.Address,
PublicKey: n.PublicKey,
MessageQueue: m,
}
if data, err := json.Marshal(nodeInfo); err != nil {
return fmt.Errorf("node.RegisterWithBulletinBoard(): failed to marshal node info: %w", err)
} else {
url := n.BulletinBoardUrl + "/update"
Expand All @@ -107,7 +137,7 @@ func (n *Node) updateBulletinBoard() error {
fmt.Printf("node.RegisterWithBulletinBoard(): error closing response body: %v\n", err2)
}
}(resp.Body)
if resp.StatusCode != http.StatusCreated {
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("node.RegisterWithBulletinBoard(): failed to register node, status code: %d, %s", resp.StatusCode, resp.Status)
}
return nil
Expand Down
13 changes: 13 additions & 0 deletions internal/node/onion.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,18 @@ func (o *Onion) RemoveLayer(privateKey []byte) error {
}
}

func NewOnion(addr string, msg []byte, publicKey []byte) (*Onion, error) {
if encryptedData, err := encrypt(msg, publicKey); err != nil {
return nil, fmt.Errorf("newOnion(): failed to encrypt message: %w", err)
} else {
return &Onion{
Address: addr,
Data: nil,
Message: encryptedData,
}, nil
}
}

func (o *Onion) AddLayer(addr string, publicKey []byte) error {
if b, err := toBytes(o); err != nil {
return fmt.Errorf("onion.AddLayer(): failed to add layer: %w", err)
Expand All @@ -61,6 +73,7 @@ func (o *Onion) AddLayer(addr string, publicKey []byte) error {
} else {
o.Address = addr
o.Data = encryptedData
o.Message = nil
return nil
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/utils/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ type MapStream[K comparable, V any] struct {
Values map[K]V
}

func NewStream[T any](values []T) Stream[T] {
return Stream[T]{
func NewStream[T any](values []T) *Stream[T] {
return &Stream[T]{
Array: values,
}
}
Expand Down

0 comments on commit ffbef78

Please sign in to comment.