-
Notifications
You must be signed in to change notification settings - Fork 1
/
httpserver
executable file
·161 lines (134 loc) · 5.84 KB
/
httpserver
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#!/usr/bin/env python3
import http
import json
import socket
import subprocess
import threading
from argparse import Namespace, ArgumentParser
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from sys import maxsize
import utils
from cache import RepliCache
from utils import get_local_ip
# Default origin server; main() may replace it with the value passed via -o.
ORIGIN_SERVER = "cs5700cdnorigin.ccs.neu.edu"

# Request paths that CdnHttpHandler treats specially instead of looking them up in the cache.
GRADING_BEACON_PATH = "/grading/beacon"
DEBUG_CACHE = "/debug/cache"
DEBUG_LOGS = "/debug/logs"

# Flip to True to construct the cache in its test mode.
cache_test_mode = False
if cache_test_mode:
    print("Cache running in test mode")

# Single cache instance shared by every request handler.
repli_cache = RepliCache(test_mode=cache_test_mode)
class CdnHttpHandler(BaseHTTPRequestHandler):
    """
    This class represents a CDN http handler.

    GET serves the grading beacon, the two debug endpoints and cached articles.
    POST /measure runs active measurements against the supplied client IP addresses.
    """

    def _send_body(self, body: bytes, content_type: str, extra_headers=()) -> None:
        """
        Sends a 200 response carrying the given payload.
        :param body: the already-encoded response payload
        :param content_type: the value of the Content-Type header
        :param extra_headers: additional (name, value) header pairs sent after Content-Type
        """
        self.send_response(200)
        self.send_header("Content-Type", content_type)
        for name, value in extra_headers:
            self.send_header(name, value)
        # Content-Length counts bytes on the wire, so it is taken from the encoded payload
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Host", socket.gethostname())
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self) -> None:
        """
        Fulfills a GET request.
        """
        if self.path == GRADING_BEACON_PATH:
            # the beacon has no body, only a 204 status
            self.send_response(204)
            self.send_header("Host", socket.gethostname())
            self.end_headers()
        elif self.path == DEBUG_CACHE:
            # objects json cannot serialize natively are dumped through their __dict__
            resp = json.dumps(repli_cache.articles, default=lambda obj: obj.__dict__)
            self._send_body(resp.encode(), "application/json; charset=utf-8")
        elif self.path == DEBUG_LOGS:
            try:
                with open("logs.txt") as fd:
                    logs = fd.read()
            except FileNotFoundError:
                # no log file has been written, so there is nothing to serve
                self.send_error(code=http.HTTPStatus.NOT_FOUND)
                return
            self._send_body(logs.encode(), "text/plain; charset=utf-8")
        else:
            found, data = repli_cache.get(self.path)
            if not found:  # the article object doesn't exist
                self.send_error(code=http.HTTPStatus.NOT_FOUND)
                return
            # the payload is sent as-is and labelled as gzip-encoded html
            self._send_body(
                data,
                "text/html; charset=utf-8",
                extra_headers=(("Content-Encoding", "gzip"),),
            )

    def do_POST(self) -> None:
        """
        Fulfills a POST request.
        """
        if self.path != "/measure":  # the endpoint doesn't exist
            self.send_error(code=http.HTTPStatus.NOT_FOUND)
            return

        try:
            content_length = int(self.headers["Content-Length"])
            if content_length < 0:
                # a negative size would make rfile.read() wait for end of stream
                raise ValueError("negative Content-Length")
            post_data = self.rfile.read(content_length).decode()  # unpack the client's IP addresses
            ips_to_measure = json.loads(post_data)  # extract the client's IP addresses
        except (TypeError, ValueError):
            # missing/non-numeric Content-Length, undecodable bytes or malformed JSON
            self.send_error(code=http.HTTPStatus.BAD_REQUEST)
            return

        measurements = {}
        # measure the RTTs in a separate thread so the CPU sampling below runs at the same time
        measure_thread = threading.Thread(
            target=measure, args=(ips_to_measure, measurements)
        )
        measure_thread.start()
        avg_cpu_usage = utils.get_avg_cpu_percent()  # get the current http replica server's CPU usage
        measure_thread.join()

        response = json.dumps({"rtts": measurements, "cpu": avg_cpu_usage})
        # pack and send the active measurements back to the requester
        self._send_body(response.encode(), "application/json; charset=utf-8")
class ThreadingSimpleServer(ThreadingMixIn, HTTPServer):
    """
    A multi-threaded http server: HTTPServer combined with ThreadingMixIn, with no further customization.
    """
def measure(client_ips: list, measurements: dict) -> None:
    """
    Runs scamper to get RTTs active measurements by pinging the client servers.
    The results are written into ``measurements`` in place so the function can be used as a thread target.
    :param client_ips: the list of client's IP addresses
    :param measurements: the active measurements, filled with a mapping of IP address to average RTT
    :raises FileNotFoundError: if the scamper executable cannot be found
    :raises subprocess.CalledProcessError: if scamper exits with a non-zero status
    """
    if not client_ips:
        # nothing to ping; do not invoke scamper with an empty address list
        return
    scamper_output = subprocess.check_output(
        ["scamper", "-p", "10", "-c", "ping", "-i", *client_ips, "-O", "json"]
    ).decode()  # run scamper as a subprocess with a rate of 10 packets per second
    measurements.update(_parse_scamper_rtts(scamper_output))


def _parse_scamper_rtts(scamper_output: str) -> dict:
    """
    Maps every destination found in scamper's line-delimited JSON output to its average RTT.
    :param scamper_output: the decoded standard output of scamper, one JSON object per line
    :return: a mapping of destination IP address to average RTT, or sys.maxsize when no average was reported
    """
    rtts = {}
    for line in scamper_output.splitlines():
        line = line.strip()
        if not line:
            continue
        record = json.loads(line)
        if "dst" not in record:
            # framing records around the results carry no destination
            continue
        # a missing average is reported as sys.maxsize, standing in for "infinity"
        rtts[record["dst"]] = record.get("statistics", {}).get("avg", maxsize)
    return rtts
def parse_args(argv=None) -> Namespace:
    """
    Parses the command line arguments.
    :param argv: the argument strings to parse; when None the process's own command line is used
    :return: the command line arguments, with the port in ``p`` and the origin server name in ``o``
    """
    parser = ArgumentParser()
    # the server cannot bind without a port, so reject the command line up front when -p is missing
    parser.add_argument(
        "-p", type=int, required=True, help="the port number the server will bind to"
    )
    parser.add_argument("-o", type=str, help="the name of the origin server")
    return parser.parse_args(argv)
def main() -> None:
    """
    Starts the replica http server and serves requests until the process is interrupted.
    """
    global ORIGIN_SERVER
    args = parse_args()
    port = args.p
    if args.o is not None:
        # keep the built-in default origin when -o is omitted
        ORIGIN_SERVER = args.o
    # resolve the address once so the printed URL always matches the bound socket
    local_ip = get_local_ip()
    web_server = ThreadingSimpleServer((local_ip, port), CdnHttpHandler)
    try:
        print(f"Starting replica at http://{local_ip}:{port}")
        web_server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the server; fall through to the cleanup
        pass
    finally:
        # release the listening socket however the serve loop ended
        web_server.server_close()
        print("Server stopped")


if __name__ == "__main__":
    main()