-
Notifications
You must be signed in to change notification settings - Fork 0
/
process_replication.py
133 lines (110 loc) · 4.51 KB
/
process_replication.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import

import glob
import json
import logging.handlers
import os
import time

from event_bus import Process
# Module-level logger: every record is written both to a file that rotates at
# midnight (60 rotated files are kept) and to a stream handler, with one format.
PYTHON_LOGGER = logging.getLogger(__name__)
# exist_ok=True removes the check-then-create race: when several processes
# import this module at the same time, a separate "exists?" test followed by
# mkdir can raise FileExistsError in the process that loses the race.
os.makedirs("log", exist_ok=True)
HDLR = logging.handlers.TimedRotatingFileHandler("log/process_replication.log",
                                                 when="midnight", backupCount=60)
STREAM_HDLR = logging.StreamHandler()
FORMATTER = logging.Formatter("%(asctime)s %(filename)s [%(levelname)s] %(message)s")
HDLR.setFormatter(FORMATTER)
STREAM_HDLR.setFormatter(FORMATTER)
PYTHON_LOGGER.addHandler(HDLR)
PYTHON_LOGGER.addHandler(STREAM_HDLR)
PYTHON_LOGGER.setLevel(logging.DEBUG)
# Absolute path to the folder location of this python file
FOLDER_ABSOLUTE_PATH = os.path.normpath(os.path.dirname(os.path.abspath(__file__)))


class FileStorage:
    """
    Key/value store of one process, mirrored to a JSON file on disk.

    Every process dumps its whole dictionary to ``<FOLDER>/<process_id>``, so a
    process can find out which other processes hold a key by reading the files
    written by the others.
    """

    # Storage folder, resolved against the folder of this python file
    FOLDER = "data"

    def __init__(self, process_id):
        """
        :param process_id: Identifier of the owning process, also used as file name
        """
        self.process_id = process_id
        self.local_data = {}
        # Create the folder the files are really written to. Testing and
        # creating "data" relative to the current working directory made
        # set_value fail whenever the program was started from another folder.
        os.makedirs(self._folder_path(), exist_ok=True)

    def _folder_path(self):
        """
        :return: (string) Absolute path of the folder holding the process files
        """
        return os.path.join(FOLDER_ABSOLUTE_PATH, self.FOLDER)

    def set_value(self, data, value):
        """
        Store a value in memory and rewrite the file of this process
        :param data: Key to store
        :param value: Value bound to the key, must be JSON serializable
        """
        self.local_data[data] = value
        with open(os.path.join(self._folder_path(), str(self.process_id)), 'w') as f:
            json.dump(self.local_data, f)

    def get_value(self, data):
        """
        :param data: Key to read
        :return: Value held in memory for the key
        :raise KeyError: If the key was never set on this instance
        """
        return self.local_data[data]

    def get_replica(self, data):
        """
        Find the other processes whose file contains the key
        :param data: Key to look for
        :return: (list of int) Ids of the other processes holding the key
        """
        own_name = str(self.process_id)
        ids = []
        for path in glob.glob(os.path.join(self._folder_path(), '*')):
            name = os.path.basename(path)
            if name == own_name:
                continue
            try:
                replica_id = int(name)
            except ValueError:
                # Process files are named after an integer id; anything else
                # (notes, editor backups, ...) is not a replica and is ignored
                # instead of crashing the lookup.
                continue
            with open(path) as f:
                stored = json.load(f)
            if data in stored:
                ids.append(replica_id)
        return ids
class ProcessReplication(Process):
    """
    Process that keeps the values it shares with other processes in sync.

    Three payloads are understood by ``process``:
    - ``"section"``: another process asks for the critical section
    - ``"ok"``: another process acknowledges a request sent by ``edit``
    - ``[key, value]``: new value for a key held locally

    NOTE(review): ``self.communicator`` and ``self.process_id`` are presumably
    provided by the ``Process`` base class - confirm against event_bus.
    """

    # Pause (in seconds) between two checks of a flag in ``_wait_until``
    _POLL_INTERVAL = 0.001

    def __init__(self, process_id, bus_size):
        """
        :param process_id: Identifier of this process, forwarded to ``Process``
        :param bus_size: Forwarded unchanged to ``Process``
        """
        super().__init__(process_id, bus_size)
        self.file_storage = FileStorage(self.process_id)
        # True while a critical section is held or granted; also reused by
        # ``edit`` as the "all acknowledgements received" signal
        self.in_critical = False
        # Number of "ok" messages still expected by ``edit``
        self.acc = -1

    def process(self, message_box):
        """
        Handle the messages received by this process
        :param message_box: (list of Message) All received messages
        """
        for msg in message_box:
            data = msg.payload
            try:
                # If a process asks for the critical section
                if data == "section":
                    print("section")
                    self.in_critical = True
                    self.communicator.send_to("ok", msg.sender)
                # When we get ok from another process
                elif data == "ok":
                    self.acc -= 1
                    print("acc = {}".format(self.acc))
                    if self.acc == 0:
                        self.in_critical = True
                # When we need to edit the local value
                # NOTE(review): an update for a key missing from local_data is
                # dropped without sending "ok", which leaves the sender waiting
                # in edit - confirm this is intended.
                elif data[0] in self.file_storage.local_data:
                    print("{} get: {}, {}".format(self.process_id, data[0], data[1]))
                    self.file_storage.set_value(data[0], data[1])
                    self.in_critical = False
                    # send ok
                    self.communicator.send_to("ok", msg.sender)
            except Exception:
                # Keep serving the remaining messages, but leave a trace:
                # silently discarding the error hid malformed payloads and
                # storage failures.
                PYTHON_LOGGER.exception("%s failed to handle payload %r",
                                        self.process_id, data)

    def _wait_until(self, condition):
        """
        Block the caller until the condition holds
        :param condition: (callable) Returns a true value once the wait is over
        """
        while not condition():
            # Sleep instead of spinning so the wait does not burn a full core
            # while the flags are updated elsewhere
            time.sleep(self._POLL_INTERVAL)

    def edit(self, data, value):
        """
        Change a value on every process holding it, then locally
        :param data: Key to edit
        :param value: New value of the key
        """
        # 1- Wait while a process is already in the critical section
        print("{} wait critical".format(self.process_id))
        self._wait_until(lambda: not self.in_critical)
        print("{} done...".format(self.process_id))

        ids = self.file_storage.get_replica(data)
        # Without any replica no "ok" can ever arrive, so waiting for one
        # would block forever: go straight to the local write.
        if ids:
            # 2- Ask every replica for the critical section
            self.acc = len(ids)
            for id_send in ids:
                print("send to: {} the section".format(id_send))
                self.communicator.send_to("section", id_send)

            # Wait until all processes say ok
            print("{} wait for all ok".format(self.process_id))
            self._wait_until(lambda: self.in_critical)
            print("{} ok".format(self.process_id))

            # 3- Send the new value to the same replicas: the number of
            # expected acknowledgements must match the number of recipients
            self.acc = len(ids)
            self.in_critical = False
            for id_send in ids:
                print("send to: {}".format(id_send))
                self.communicator.send_to([data, value], id_send)

            # Wait until all processes received the new value
            print("{} wait for all ok".format(self.process_id))
            self._wait_until(lambda: self.in_critical)
            print("{} ok".format(self.process_id))

        self.in_critical = False
        self.file_storage.set_value(data, value)

    def set(self, data, value):
        """
        Store a value locally without telling the other processes
        :param data: Key to store
        :param value: Value bound to the key
        """
        # format() accepts any value; "+" raised TypeError on non-strings
        # before the value was stored
        print("Add {} {}".format(data, value))
        self.file_storage.set_value(data, value)