diff --git a/cmd/bulletin-board/main.go b/cmd/bulletin-board/main.go index 9a99bdc..c1ca7de 100755 --- a/cmd/bulletin-board/main.go +++ b/cmd/bulletin-board/main.go @@ -65,6 +65,7 @@ func main() { http.HandleFunc("/register", bulletinBoard.HandleRegisterNode) http.HandleFunc("/update", bulletinBoard.HandleUpdateNodeInfo) + http.HandleFunc("/nodes", bulletinBoard.HandleGetActiveNodes) go func() { address := fmt.Sprintf(":%d", port) diff --git a/internal/bulletin_board/bulletin_board_handler.go b/internal/bulletin_board/bulletin_board_handler.go index b8503be..dfd2ac2 100644 --- a/internal/bulletin_board/bulletin_board_handler.go +++ b/internal/bulletin_board/bulletin_board_handler.go @@ -35,7 +35,7 @@ func (bb *BulletinBoard) HandleUpdateNodeInfo(w http.ResponseWriter, r *http.Req http.Error(w, err.Error(), http.StatusBadRequest) return } - slog.Info("Updating node with id=%d", nodeInfo.ID) + slog.Info("Updating node with", "id", nodeInfo.ID) if err := bb.UpdateNode(&nodeInfo); err != nil { fmt.Printf("Error updating node %d: %v\n", nodeInfo.ID, err) http.Error(w, err.Error(), http.StatusInternalServerError) @@ -44,6 +44,26 @@ func (bb *BulletinBoard) HandleUpdateNodeInfo(w http.ResponseWriter, r *http.Req w.WriteHeader(http.StatusOK) } +// HandleGetActiveNodes handles GET requests to return all active nodes +func (bb *BulletinBoard) HandleGetActiveNodes(w http.ResponseWriter, r *http.Request) { + bb.mu.Lock() + defer bb.mu.Unlock() + activeNodes := utils.NewMapStream(bb.Network).Filter(func(_ int, node *NodeView) bool { + return node.IsActive() + }).GetValues().Array + + activeNodesApis := utils.Map(activeNodes, func(node *NodeView) api.PublicNodeApi { + return node.Api + }) + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(activeNodesApis); err != nil { + slog.Error("Error encoding response", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + func (bb *BulletinBoard) signalNodesToStart() error { slog.Info("Signaling nodes to start") activeNodes := utils.NewMapStream(bb.Network).Filter(func(_ int, node *NodeView) bool { diff --git a/internal/node/node.go b/internal/node/node.go index 643f875..62238a4 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -7,12 +7,13 @@ import ( "fmt" "github.com/HannahMarsh/pi_t-experiment/internal/api" "github.com/HannahMarsh/pi_t-experiment/pkg/utils" - "github.com/enriquebris/goconcurrentqueue" - "github.com/orcaman/concurrent-map/v2" "golang.org/x/exp/slog" "io" "net/http" "sync" + "time" + + "math/rand" ) // Node represents a node in the onion routing network @@ -22,9 +23,10 @@ type Node struct { Port int PublicKey []byte PrivateKey []byte - OtherNodes cmap.ConcurrentMap[string, *api.PublicNodeApi] - QueuedOnions goconcurrentqueue.Queue - QueuedRequests goconcurrentqueue.Queue + ActiveNodes []api.PublicNodeApi + MessageQueue []*api.Message + OnionQueue []*Onion + mu sync.Mutex NodeInfo api.PublicNodeApi BulletinBoardUrl string wg sync.WaitGroup @@ -37,14 +39,14 @@ func NewNode(id int, host string, port int, bulletinBoardUrl string) (*Node, err return nil, fmt.Errorf("node.NewNode(): failed to generate key pair: %w", err) } else { n := &Node{ - ID: id, - Host: host, - Port: port, - PublicKey: publicKey, - PrivateKey: privateKey, - OtherNodes: cmap.New[*api.PublicNodeApi](), - QueuedOnions: goconcurrentqueue.NewFIFO(), - QueuedRequests: goconcurrentqueue.NewFIFO(), + ID: id, + Host: host, + Port: port, + PublicKey: publicKey, + PrivateKey: privateKey, + ActiveNodes: make([]api.PublicNodeApi, 0), + MessageQueue: make([]*api.Message, 0), + OnionQueue: make([]*Onion, 0), NodeInfo: api.PublicNodeApi{ ID: id, Address: fmt.Sprintf("%s:%d", host, port), @@ -57,38 +59,101 @@ func NewNode(id int, host string, port int, bulletinBoardUrl string) (*Node, err return nil, fmt.Errorf("node.NewNode(): failed to register with bulletin board: %w", err2) } + go n.StartPeriodicUpdates(time.Second * 1) + return n, nil } } -func (n *Node) updateNode(node *api.PublicNodeApi) { - n.OtherNodes.Set(node.Address, node) +func (n *Node) StartPeriodicUpdates(interval time.Duration) { + ticker := time.NewTicker(interval) + go func() { + for range ticker.C { + if err := n.updateBulletinBoard(); err != nil { + fmt.Printf("Error updating bulletin board: %v\n", err) + return + } + n.ProcessMessageQueue() + } + }() +} + +func (n *Node) ProcessMessageQueue() { + n.mu.Lock() + defer n.mu.Unlock() + for _, msg := range n.MessageQueue { + // Create an onion from the message + if onion, err := n.NewOnion(msg, 1); err != nil { + fmt.Printf("Error creating onion: %v\n", err) + } else { + n.OnionQueue = append(n.OnionQueue, onion) + } + } +} + +func (n *Node) getNode(id int) *api.PublicNodeApi { + for _, node := range n.ActiveNodes { + if node.ID == id { + return &node + } + } + return nil +} + +func (n *Node) getRandomNode() *api.PublicNodeApi { + r := rand.Intn(len(n.ActiveNodes)) + return &n.ActiveNodes[r] +} + +func (n *Node) NewOnion(msg *api.Message, pathLength int) (*Onion, error) { + if msg_string, err := json.Marshal(msg); err != nil { + return nil, fmt.Errorf("NewOnion(): failed to marshal message: %w", err) + } else { + if to := n.getNode(msg.To); to != nil { + if o, err2 := NewOnion(fmt.Sprintf("%s/receive", to.Address), msg_string, to.PublicKey); err2 != nil { + return nil, fmt.Errorf("NewOnion(): failed to create onion: %w", err2) + } else { + for i := 0; i < pathLength; i++ { + var intermediary *api.PublicNodeApi + for intermediary.ID == n.ID { + intermediary = n.getRandomNode() + } + if err3 := o.AddLayer(intermediary.Address, intermediary.PublicKey); err3 != nil { + return nil, fmt.Errorf("NewOnion(): failed to add layer: %w", err3) + } + } + return o, nil + } + } else { + return nil, fmt.Errorf("NewOnion(): failed to get node with id %d", msg.To) + } + } } func (n *Node) startRun(activeNodes []api.PublicNodeApi) (didParticipate bool, e error) { - slog.Info("Starting run with", "num_onions", n.QueuedOnions.GetLen()) + n.mu.Lock() + if len(activeNodes) == 0 { + n.mu.Unlock() + return false, fmt.Errorf("startRun(): no active nodes") + } + n.ActiveNodes = activeNodes + onionsToSend := n.OnionQueue + n.OnionQueue = make([]*Onion, 0) + n.mu.Unlock() + slog.Info("Starting run with", "num_onions", len(onionsToSend)) n.wg.Wait() n.wg.Add(1) defer n.wg.Done() - n.OtherNodes.Clear() var participate bool = false for _, node := range activeNodes { - n.updateNode(&node) if node.ID == n.ID { participate = true } } if participate { - for { - if o, err := n.QueuedOnions.Dequeue(); o == nil || err != nil { - if err.Error() == "empty queue" { - break - } - return true, fmt.Errorf("startRun(): failed to dequeue onion: %w", err) - } else if on, ok := o.(*Onion); !ok { - return true, fmt.Errorf("startRun(): invalid onion type: %T", o) - } else if err2 := sendToNode(on); err2 != nil { + for _, onion := range onionsToSend { + if err2 := sendToNode(onion); err2 != nil { return true, fmt.Errorf("startRun(): failed to send onion to next node: %w", err2) } } @@ -101,9 +166,7 @@ func (n *Node) Receive(o *Onion) error { if err := o.RemoveLayer(n.PrivateKey); err != nil { return fmt.Errorf("node.Receive(): failed to remove layer: %w", err) } else if o.HasNextLayer() { - if _, present := n.OtherNodes.Get(o.Address); !present { - return fmt.Errorf("node.Receive(): next node not found: %s", o.Address) - } else if err2 := sendToNode(o); err2 != nil { + if err2 := sendToNode(o); err2 != nil { return fmt.Errorf("node.Receive(): failed to send to next node: %w", err2) } } else if o.HasMessage() { diff --git a/internal/node/node_handler.go b/internal/node/node_handler.go index 40767f3..6aeaa42 100644 --- a/internal/node/node_handler.go +++ b/internal/node/node_handler.go @@ -5,10 +5,10 @@ import ( "encoding/json" "fmt" "github.com/HannahMarsh/pi_t-experiment/internal/api" + "github.com/HannahMarsh/pi_t-experiment/pkg/utils" "golang.org/x/exp/slog" "io" "net/http" - "time" ) func (n *Node) HandleReceive(w http.ResponseWriter, r *http.Request) { @@ -54,11 +54,9 @@ func (n *Node) HandleClientRequest(w http.ResponseWriter, r *http.Request) { } slog.Info("Enqueuing messages", "num_messages", len(msgs)) for _, msg := range msgs { - if err := n.QueuedRequests.Enqueue(msg); err != nil { - slog.Error("Error enqueuing message", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + n.mu.Lock() + n.MessageQueue = append(n.MessageQueue, &msg) + n.mu.Unlock() } w.WriteHeader(http.StatusOK) } @@ -85,16 +83,48 @@ func (n *Node) RegisterWithBulletinBoard() error { } } +func (n *Node) GetActiveNodes() ([]api.PublicNodeApi, error) { + url := fmt.Sprintf("%s/nodes", n.BulletinBoardUrl) + resp, err := http.Get(url) + if err != nil { + return nil, fmt.Errorf("error making GET request to %s: %v", url, err) + } + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { + fmt.Printf("error closing response body: %v\n", err) + } + }(resp.Body) + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + var activeNodes []api.PublicNodeApi + if err = json.NewDecoder(resp.Body).Decode(&activeNodes); err != nil { + return nil, fmt.Errorf("error decoding response: %v", err) + } + + return activeNodes, nil +} + func (n *Node) updateBulletinBoard() error { - // getsnapshot of requested messages - pr := api.PrivateNodeApi{ - TimeOfRequest: time.Now(), - ID: n.ID, - Address: n.NodeInfo.Address, - PublicKey: n.PublicKey, - MessageQueue: + n.mu.Lock() + a, _ := n.GetActiveNodes() + if a != nil && len(a) > 0 { + n.ActiveNodes = a } - if data, err := json.Marshal(n.NodeInfo); err != nil { + m := utils.NewStream(n.MessageQueue).MapToInt(func(msg *api.Message) int { + return msg.To + }).Array + n.mu.Unlock() + nodeInfo := api.PrivateNodeApi{ + ID: n.ID, + Address: n.NodeInfo.Address, + PublicKey: n.PublicKey, + MessageQueue: m, + } + if data, err := json.Marshal(nodeInfo); err != nil { return fmt.Errorf("node.RegisterWithBulletinBoard(): failed to marshal node info: %w", err) } else { url := n.BulletinBoardUrl + "/update" @@ -107,7 +137,7 @@ func (n *Node) updateBulletinBoard() error { fmt.Printf("node.RegisterWithBulletinBoard(): error closing response body: %v\n", err2) } }(resp.Body) - if resp.StatusCode != http.StatusCreated { + if resp.StatusCode != http.StatusOK { return fmt.Errorf("node.RegisterWithBulletinBoard(): failed to register node, status code: %d, %s", resp.StatusCode, resp.Status) } return nil diff --git a/internal/node/onion.go b/internal/node/onion.go index 2784ed2..ccae067 100644 --- a/internal/node/onion.go +++ b/internal/node/onion.go @@ -53,6 +53,18 @@ func (o *Onion) RemoveLayer(privateKey []byte) error { } } +func NewOnion(addr string, msg []byte, publicKey []byte) (*Onion, error) { + if encryptedData, err := encrypt(msg, publicKey); err != nil { + return nil, fmt.Errorf("newOnion(): failed to encrypt message: %w", err) + } else { + return &Onion{ + Address: addr, + Data: nil, + Message: encryptedData, + }, nil + } +} + func (o *Onion) AddLayer(addr string, publicKey []byte) error { if b, err := toBytes(o); err != nil { return fmt.Errorf("onion.AddLayer(): failed to add layer: %w", err) @@ -61,6 +73,7 @@ func (o *Onion) AddLayer(addr string, publicKey []byte) error { } else { o.Address = addr o.Data = encryptedData + o.Message = nil return nil } } diff --git a/pkg/utils/stream.go b/pkg/utils/stream.go index 334346e..3cc2c64 100644 --- a/pkg/utils/stream.go +++ b/pkg/utils/stream.go @@ -8,8 +8,8 @@ type MapStream[K comparable, V any] struct { Values map[K]V } -func NewStream[T any](values []T) Stream[T] { - return Stream[T]{ +func NewStream[T any](values []T) *Stream[T] { + return &Stream[T]{ Array: values, } }