-
Notifications
You must be signed in to change notification settings - Fork 0
/
pi_network.py
144 lines (121 loc) · 5.07 KB
/
pi_network.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import asyncio
import hashlib
import json
import logging
import os
import random
import time
from typing import Dict, List, Tuple
import cryptography.hazmat.primitives.asymmetric.ed25519 as ed25519
from cryptography.hazmat.primitives import serialization
# Custom protocol for node communication
class PiNetworkProtocol(asyncio.Protocol):
def __init__(self, node: 'PiNode'):
self.node = node
self.transport = None
self.buffer = bytearray()
def connection_made(self, transport: asyncio.Transport) -> None:
self.transport = transport
self.node.log.info(f"Connected to {transport.get_extra_info('peername')}")
def data_received(self, data: bytes) -> None:
self.buffer.extend(data)
while self.buffer:
msg_len = int.from_bytes(self.buffer[:4], 'big')
if len(self.buffer) < msg_len + 4:
break
msg = self.buffer[4:msg_len + 4]
self.buffer = self.buffer[msg_len + 4:]
self.handle_message(msg)
def handle_message(self, msg: bytes) -> None:
msg_type = msg[0]
if msg_type == 0x01: # Node announcement
self.handle_node_announcement(msg[1:])
elif msg_type == 0x02: # Transaction
self.handle_transaction(msg[1:])
elif msg_type == 0x03: # Block
self.handle_block(msg[1:])
else:
self.node.log.warning(f"Unknown message type {msg_type}")
def handle_node_announcement(self, msg: bytes) -> None:
node_id = msg[:32]
node_addr = msg[32:64]
self.node.add_node(node_id, node_addr)
def handle_transaction(self, msg: bytes) -> None:
tx = self.node.deserialize_transaction(msg)
self.node.process_transaction(tx)
def handle_block(self, msg: bytes) -> None:
block = self.node.deserialize_block(msg)
self.node.process_block(block)
def send_message(self, msg: bytes) -> None:
self.transport.write(len(msg).to_bytes(4, 'big') + msg)
class PiNode:
def __init__(self, node_id: bytes, node_addr: bytes, private_key: ed25519.Ed25519PrivateKey):
self.node_id = node_id
self.node_addr = node_addr
self.private_key = private_key
self.public_key = private_key.public_key()
self.nodes: Dict[bytes, bytes] = {}
self.transactions: List[Tuple[bytes, bytes]] = []
self.blocks: List[bytes] = []
self.log = logging.getLogger(f"PiNode-{node_id.hex()}")
def add_node(self, node_id: bytes, node_addr: bytes) -> None:
self.nodes[node_id] = node_addr
self.log.info(f"Added node {node_id.hex()} at {node_addr.hex()}")
def deserialize_transaction(self, msg: bytes) -> Tuple[bytes, bytes]:
tx_id = msg[:32]
tx_data = msg[32:]
return tx_id, tx_data
def deserialize_block(self, msg: bytes) -> bytes:
block_id = msg[:32]
block_data = msg[32:]
return block_id, block_data
def process_transaction(self, tx: Tuple[bytes, bytes]) -> None:
# Verify transaction signature
tx_id, tx_data = tx
signature = tx_data[-64:]
tx_data = tx_data[:-64]
if not self.verify_signature(tx_id, tx_data, signature):
self.log.warning(f"Invalid transaction signature for {tx_id.hex()}")
return
# Process transaction
self.log.info(f"Processing transaction {tx_id.hex()}")
# ...
def process_block(self, block: bytes) -> None:
# Verify block signature
block_id, block_data = block
signature = block_data[-64:]
block_data = block_data[:-64]
if not self.verify_signature(block_id, block_data, signature):
self.log.warning(f"Invalid block signature for {block_id.hex()}")
return
# Process block
self.log.info(f"Processing block {block_id.hex()}")
# ...
def verify_signature(self, msg_id: bytes, msg_data: bytes, signature: bytes) -> bool:
public_key = self.public_key
try:
public_key.verify(signature, msg_id + msg_data, padding.cryptography.hazmat.primitives.asymmetric.padding.AsymmetricPadding)
return True
except cryptography.exceptions.InvalidSignature:
return False
asyncdef start(self) -> None:
loop = asyncio.get_event_loop()
coro = loop.create_connection(lambda: PiNetworkProtocol(self), self.node_addr, self.node_addr)
await coro
def generate_node_id() -> bytes:
return os.urandom(32)
def generate_private_key() -> ed25519.Ed25519PrivateKey:
return ed25519.Ed25519PrivateKey.generate()
def generate_node(node_id: bytes, node_addr: bytes) -> PiNode:
private_key = generate_private_key()
public_key = private_key.public_key()
return PiNode(node_id, node_addr, private_key)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
node_id = generate_node_id()
node_addr = socket.gethostbyname(socket.gethostname()) + ":5000"
node = generate_node(node_id, node_addr.encode())
try:
asyncio.run(node.start())
except KeyboardInterrupt:
pass