-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmaster_node.py
More file actions
288 lines (241 loc) · 11.7 KB
/
master_node.py
File metadata and controls
288 lines (241 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
"""
Master Node implementation in Python3 for Space Opera
"""
import os
import random
from concurrent import futures

import grpc
import redis
from absl import app, flags, logging

import master_comm_pb2
import master_comm_pb2_grpc
import node_comm_pb2
import node_comm_pb2_grpc
# Redis configuration.
# For a local Redis instance use host "localhost", port 6379 and an empty
# password. Credentials must never be hard-coded here: the password is taken
# from the REDIS_PASSWORD environment variable and can be overridden with the
# --redis_password flag.
REDIS_HOST = "a5c2cf3b49b5646829aa1b82ee0d2611-1520476928.us-west-1.elb.amazonaws.com"
REDIS_PORT = 6379
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD", "")

# Redis keys.
# Set of IPs of every node currently in the network.
NETWORK_NODES = "network:nodes"
# Set of every filename known to the network.
NETWORK_DATA = "network:data"
# Per-node set of filenames recorded on that node (format with the node IP).
NETWORK_NODE_DATA = "network:data:node:%s"
# Per-file set of node IPs recorded as holding that file (format with the filename).
NETWORK_DATA_FILE = "network:data:file:%s"

# Constants.
# Total number of copies wanted per file: the uploaded copy plus
# REPLICATION_FACTOR - 1 replicas requested via GetNodeIpsForReplication.
REPLICATION_FACTOR = 3

# Absl flags.
FLAGS = flags.FLAGS
flags.DEFINE_string('redis_host', REDIS_HOST, 'Redis host')
flags.DEFINE_integer('redis_port', REDIS_PORT, 'Redis port')
flags.DEFINE_string('redis_password', REDIS_PASSWORD, 'Redis password')
flags.DEFINE_integer('replication_factor', REPLICATION_FACTOR, 'Replication factor for Space Opera network')

# Redis client that stores all network node and file information;
# assigned by setup_redis() before the gRPC server starts.
redis_client = None
class MasterComm(master_comm_pb2_grpc.ReplicationServicer):
    """gRPC servicer implementing the master node's Replication service.

    All network metadata lives in Redis under the NETWORK_* keys: the set of
    member node IPs, the set of known filenames, and the node -> files and
    file -> nodes mappings. Handlers are grouped by the component that
    invokes them (Gateway, Sentinel, Node, CLI).
    """

    # Port on which storage nodes expose the NodeReplication gRPC service.
    _NODE_PORT = 6080

    # Functions for Gateway

    def NewNodeUpdate(self, request, context):
        """Register a node that has just joined the network.

        Invoked by Gateway.

        Args:
            request: carries ``newnodeip``, the IP of the joining node.
            context: gRPC servicer context (unused).

        Returns:
            NewNodeUpdateResponse with status "SUCCESS", or "FAILURE" when
            ``newnodeip`` is empty.
        """
        logging.info(f"NewNodeUpdate invoked with request: {request}")
        if not request.newnodeip:
            logging.info("[ERROR]: NewNodeUpdate invoked with empty request")
            return master_comm_pb2.NewNodeUpdateResponse(status="FAILURE")
        redis_client.sadd(NETWORK_NODES, request.newnodeip)
        logging.info(f"New node {request.newnodeip} added to network")
        return master_comm_pb2.NewNodeUpdateResponse(status="SUCCESS")

    def GetNodeForDownload(self, request, context):
        """Pick a node from which a client can download ``request.filename``.

        Invoked by Gateway.

        Returns:
            GetNodeForDownloadResponse whose ``nodeip`` is a random node that
            holds the file and is still in the network, or an empty response
            when the filename is empty or no such node exists.
        """
        logging.info(f"GetNodeForDownload invoked with request: {request}")
        if not request.filename:
            logging.info("[ERROR]: GetNodeForDownload invoked with empty request")
            return master_comm_pb2.GetNodeForDownloadResponse()
        # Holder sets keep nodes that NodeDownUpdate removed from the network;
        # intersect with NETWORK_NODES so a dead node is never handed out.
        nodes = redis_client.sinter(NETWORK_DATA_FILE % request.filename, NETWORK_NODES)
        if not nodes:
            logging.info("Returning empty nodeip")
            return master_comm_pb2.GetNodeForDownloadResponse()
        # Random choice spreads download load across the holders.
        node = random.choice(list(nodes))
        logging.info(f"Returning node {node} for file {request.filename}")
        return master_comm_pb2.GetNodeForDownloadResponse(nodeip=node)

    def GetNodeForUpload(self, request, context):
        """Choose the node that should receive an upload of ``request.filename``.

        Invoked by Gateway. The least-loaded network node (fewest recorded
        files) is chosen and immediately recorded as holding the file.

        Returns:
            GetNodeForUploadResponse with the chosen ``nodeip``; ``nodeip`` is
            "" when the network has no nodes, and the response is empty when
            the filename is empty.
        """
        logging.info(f"GetNodeForUpload invoked with request: {request}")
        if not request.filename:
            logging.info("[ERROR]: GetNodeForUpload invoked with empty request")
            return master_comm_pb2.GetNodeForUploadResponse()
        load = self._file_counts(redis_client.smembers(NETWORK_NODES))
        if not load:
            logging.info(f"No nodes available on network to upload file {request.filename}")
            return master_comm_pb2.GetNodeForUploadResponse(nodeip="")
        # Node with the least number of files.
        node_ip = min(load, key=load.get)
        redis_client.sadd(NETWORK_DATA, request.filename)
        redis_client.sadd(NETWORK_NODE_DATA % node_ip, request.filename)
        redis_client.sadd(NETWORK_DATA_FILE % request.filename, node_ip)
        logging.info(f"Returning node {node_ip} for file upload of {request.filename}")
        return master_comm_pb2.GetNodeForUploadResponse(nodeip=node_ip)

    # Functions for Sentinel

    def NodeDownUpdate(self, request, context):
        """Remove an unresponsive node and re-replicate every file it held.

        Invoked by Sentinel. The node is removed from the network first, so
        upload placement, download routing and replica selection stop
        choosing it. Then each of its files is copied, on a best-effort
        basis, from a surviving holder to a node that lacks it. A file that
        cannot be replicated is logged and skipped; it does not stop the
        remaining files.

        Returns:
            NodeDownUpdateResponse with status "SUCCESS", or "FAILURE" when
            ``nodeip`` is empty.
        """
        logging.info(f"NodeDownUpdate invoked with request: {request}")
        if not request.nodeip:
            logging.info("[ERROR]: NodeDownUpdate invoked with empty request")
            return master_comm_pb2.NodeDownUpdateResponse(status="FAILURE")
        down_node = request.nodeip
        redis_client.srem(NETWORK_NODES, down_node)
        logging.info(f"Node {down_node} removed from network")
        for file in redis_client.smembers(NETWORK_NODE_DATA % down_node):
            self._replicate_file(file)
        return master_comm_pb2.NodeDownUpdateResponse(status="SUCCESS")

    def GetListOfNodes(self, request, context):
        """Return the IPs of every node currently in the network.

        Invoked by Sentinel.
        """
        logging.info(f"GetListOfNodes invoked with request: {request}")
        nodes = redis_client.smembers(NETWORK_NODES)
        logging.info(f"Returning list of nodes: {nodes}")
        return master_comm_pb2.GetListOfNodesResponse(nodeips=list(nodes))

    # Functions for Node

    def GetNodeIpsForReplication(self, request, context):
        """Choose replica targets for a freshly uploaded ``request.filename``.

        Invoked by Node. Candidates are network nodes that do not already
        hold the file, ordered least-loaded first. At most
        ``replication_factor - 1`` are returned, because one copy already
        exists on the node that received the upload.

        Returns:
            NodeIpsReply with the chosen ``nodeips`` (possibly empty), or an
            empty reply when the filename is empty.
        """
        logging.info(f"GetNodeIpsForReplication invoked with request: {request}")
        if not request.filename:
            logging.info("[ERROR]: GetNodeIpsForReplication invoked with empty request")
            return master_comm_pb2.NodeIpsReply()
        # Nodes that don't have the file yet, mapped to their file counts.
        load = self._file_counts(
            redis_client.sdiff(NETWORK_NODES, NETWORK_DATA_FILE % request.filename))
        # Clamp at 0: a replication factor of 1 or less needs no extra copies.
        replica_count = max(FLAGS.replication_factor - 1, 0)
        replication_nodes = sorted(load, key=load.get)[:replica_count]
        logging.info(f"Returning list of nodes to replicate: {replication_nodes}")
        return master_comm_pb2.NodeIpsReply(nodeips=replication_nodes)

    def UpdateReplicationStatus(self, request, context):
        """Record that ``request.filename`` now lives on each of ``request.nodeips``.

        Invoked by Node after it has finished replicating a file.

        Returns:
            StatusResponse with Status SUCCESS, or FAILURE when the filename
            or the node list is empty.
        """
        logging.info(f"UpdateReplicationStatus invoked with request: {request}")
        if not (request.filename and request.nodeips):
            logging.info("[ERROR]: UpdateReplicationStatus invoked with empty request")
            return master_comm_pb2.StatusResponse(status=master_comm_pb2.Status.Value("FAILURE"))
        for node in request.nodeips:
            # Keep both directions of the node <-> file mapping in sync.
            redis_client.sadd(NETWORK_NODE_DATA % node, request.filename)
            redis_client.sadd(NETWORK_DATA_FILE % request.filename, node)
        return master_comm_pb2.StatusResponse(status=master_comm_pb2.Status.Value("SUCCESS"))

    # Functions for CLI

    def GetListOfFiles(self, request, context):
        """List filenames recorded on ``request.nodeips``, or on the whole network.

        Invoked by CLI. With a non-empty ``nodeips`` the result is the union
        of the files recorded on those nodes; otherwise every filename known
        to the network is returned.
        """
        logging.info(f"GetListOfFiles invoked with request: {request}")
        if request.nodeips:
            # One SUNION round trip instead of one SMEMBERS per node.
            files = redis_client.sunion([NETWORK_NODE_DATA % node for node in request.nodeips])
            logging.info(f"Returning list of files: {files} for nodes: {request.nodeips}")
        else:
            files = redis_client.smembers(NETWORK_DATA)
            logging.info(f"Returning list of all files on network: {files}")
        # NOTE(review): response field assumed to be `filenames` (mirrors the
        # nodeip/nodeips naming); confirm against master_comm.proto.
        return master_comm_pb2.GetListOfFilesResponse(filenames=list(files))

    # Private helpers

    @staticmethod
    def _file_counts(nodes):
        """Map each node IP in *nodes* to the number of files recorded on it."""
        return {node: redis_client.scard(NETWORK_NODE_DATA % node) for node in nodes}

    def _replicate_file(self, filename):
        """Ask a live holder of *filename* to copy it to one live node lacking it.

        Holders are tried in turn until one reports SUCCESS. A holder whose
        RPC fails is logged and skipped.

        Returns:
            True if some holder reported SUCCESS, otherwise False.
        """
        file_key = NETWORK_DATA_FILE % filename
        # Network nodes that do not yet hold the file.
        candidates = redis_client.sdiff(NETWORK_NODES, file_key)
        if not candidates:
            logging.info(f"[ERROR]: No nodes available for replicating file {filename}")
            return False
        replica_node = random.choice(list(candidates))
        logging.info(f"Replicating file {filename} to {replica_node}")
        # Holders still in the network can act as the copy source.
        for node in redis_client.sinter(file_key, NETWORK_NODES):
            logging.info(f"Sending request to {node} to replicate file {filename}")
            try:
                with grpc.insecure_channel(f"{node}:{self._NODE_PORT}") as node_channel:
                    stub = node_comm_pb2_grpc.NodeReplicationStub(node_channel)
                    response = stub.ReplicateFile(
                        node_comm_pb2.ReplicateFileRequest(filename=filename, nodeips=[replica_node]))
            except grpc.RpcError as err:
                # One unreachable source must not abort the whole NodeDownUpdate.
                logging.info(f"[ERROR]: ReplicateFile request to {node} failed: {err}")
                continue
            logging.info(f"Response from ReplicateFile request: {response}")
            if response.status == "SUCCESS":
                return True
        logging.info(f"[ERROR]: We can't replicate file {filename} to node {replica_node}")
        return False
def serve(port=6090, max_workers=10):
    """Run the master gRPC server and block until it terminates.

    Args:
        port: TCP port to listen on, bound insecurely (no TLS) on "[::]".
        max_workers: size of the thread pool that executes RPC handlers.
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
    master_comm_pb2_grpc.add_ReplicationServicer_to_server(MasterComm(), server)
    server.add_insecure_port(f"[::]:{port}")
    server.start()
    logging.info(f"Master gRPC server listening on port {port}")
    server.wait_for_termination()
def setup_redis():
    """Create the module-level Redis client and verify the server is reachable.

    Uses the --redis_host, --redis_port and --redis_password flags.

    Raises:
        SystemExit: with status 1 if Redis cannot be reached or rejects the
            connection (for example, a wrong password).
    """
    global redis_client
    try:
        redis_client = redis.Redis(
            host=FLAGS.redis_host,
            port=FLAGS.redis_port,
            password=FLAGS.redis_password,
            decode_responses=True,
        )
        # redis.Redis connects lazily; ping() forces a round trip so a bad
        # host/port/password fails here instead of on the first RPC.
        redis_client.ping()
    except redis.exceptions.RedisError as e:
        logging.error("[ERROR]: Error while connecting to redis: %s", e)
        raise SystemExit(1) from e
def main(argv):
    """absl entry point: connect to Redis, then serve gRPC requests until shutdown."""
    del argv  # Unused; all configuration comes from absl flags.
    logging.info("Setting up redis client...")
    setup_redis()
    logging.info("Starting gRPC server...")
    serve()
if __name__ == "__main__":
    # Let absl parse the command-line flags before main() runs.
    app.run(main)