From f838321d6c9d1089e384ecd605c13fe59a1dc9bc Mon Sep 17 00:00:00 2001 From: KOSASIH Date: Tue, 4 Jun 2024 19:32:05 +0700 Subject: [PATCH] Create node.py --- node.py | 77 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 node.py diff --git a/node.py b/node.py new file mode 100644 index 0000000..fca6a4c --- /dev/null +++ b/node.py @@ -0,0 +1,77 @@ +import socket +import threading +from queue import Queue +from time import sleep + +class Node: + def __init__(self, node_id, host, port): + self.node_id = node_id + self.host = host + self.port = port + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.bind((self.host, self.port)) + self.socket.listen(5) + self.clients = [] + self.data_queue = Queue() + self.running = True + self.discovery_thread = threading.Thread(target=self.discover_nodes) + self.discovery_thread.start() + + def discover_nodes(self): + while self.running: + try: + broadcast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + broadcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + message = f'Node Discovery: {self.node_id}' + broadcast_socket.sendto(message.encode(), ('', 9000)) + sleep(5) + except Exception as e: + print(f'Error during node discovery: {e}') + + def connect_client(self, client_socket, client_address): + print(f'New connection from {client_address}') + self.clients.append((client_socket, client_address)) + client_thread = threading.Thread(target=self.handle_client, args=(client_socket, client_address)) + client_thread.start() + + def handle_client(self, client_socket, client_address): + while self.running: + try: + data = client_socket.recv(1024) + if data: + self.data_queue.put((client_address, data)) + else: + break + except Exception as e: + print(f'Error handling client: {e}') + break + + self.clients.remove((client_socket, client_address)) + client_socket.close() + + def send_data(self, data, destination_address): + for client in self.clients: + if client[1] == destination_address: + client[0].sendall(data) + + def receive_data(self): + while self.running: + try: + address, data = self.data_queue.get() + print(f'Received data from {address}: {data.decode()}') + except Exception as e: + print(f'Error receiving data: {e}') + + def stop(self): + self.running = False + self.socket.close() + self.discovery_thread.join() + +if __name__ == '__main__': + node = Node(1, '127.0.0.1', 9001) + try: + while True: + pass + except KeyboardInterrupt: + node.stop()