-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathrun_parallel.py
More file actions
117 lines (92 loc) · 3.71 KB
/
run_parallel.py
File metadata and controls
117 lines (92 loc) · 3.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import sys
from logger import get_logger, init as logger_init, shutdown as logger_shutdown
from tsp import *
from pycsp import *
from timer import *
from broadcast_router import *
@multiprocess
def Worker(id, init_chan, task_chan, result_chan, shortest_route_subscribe_chan):
    """Search batches of sub-routes and report every improvement found.

    The worker hands the writer end of a private channel to
    ``shortest_route_subscribe_chan``, reads the distance matrix from
    ``init_chan`` and then loops: it checks for a new distance bound,
    asks for a task batch when it has none left, and runs
    ``find_shortest_route`` on one task per iteration.  A falsy batch
    received on ``task_chan`` makes the worker retire that channel and stop.

    Args:
        id: Worker identifier; it is only logged.
        init_chan: Channel input end that yields the distance matrix.
        task_chan: Channel input end that yields task batches.
        result_chan: Channel output end used to publish improved routes.
        shortest_route_subscribe_chan: Channel output end that receives the
            writer end of this worker's private bound channel.
    """
    logger = get_logger('worker')
    logger.log(id)

    # Give the writer end away and keep only the reader end for ourselves.
    shortest_route_chan = Channel()
    shortest_route_subscribe_chan(shortest_route_chan.writer())
    shortest_route_chan = shortest_route_chan.reader()

    distance_matrix = init_chan()
    shortest_distance = sys.maxsize
    tasks = []

    while True:
        # Priority order: a new bound first, then (only when idle) new tasks.
        guards = [InputGuard(shortest_route_chan)]
        if not tasks:
            guards.append(InputGuard(task_chan))
        # SkipGuard goes last so the select does not wait when neither
        # channel has a message and the worker can go on with pending tasks.
        guards.append(SkipGuard())

        (chan, msg) = PriSelect(guards)
        if chan == shortest_route_chan:
            logger.log('New shortest route: ', msg)
            # The local bound may already be tighter than a broadcast that
            # was sent earlier, so never let an incoming value loosen it.
            shortest_distance = min(shortest_distance, msg)
        elif chan == task_chan:
            tasks = msg
            if not tasks:
                # An empty batch means there is no work left.
                task_chan.retire()
                break

        if tasks:
            shortest_route = find_shortest_route(distance_matrix, tasks.pop(), shortest_distance)
            if shortest_route and shortest_route.distance < shortest_distance:
                result_chan(shortest_route)
                # Tighten the bound right away instead of waiting for the
                # broadcast to come back; otherwise later tasks are searched
                # with a stale bound and worse routes get re-sent.
                shortest_distance = shortest_route.distance

    logger.log('Terminating')
@process
def Master(init_chan, task_chan, result_chan, shortest_route_chan, num_cities, task_depth):
    """Generate the problem, distribute task batches and track the best route.

    On every iteration the master offers the distance matrix on
    ``init_chan``, offers the next batch of tasks (or ``None`` once the
    task list is empty) on ``task_chan`` and accepts results from
    ``result_chan``.  Each strictly shorter result is remembered and its
    distance is written to ``shortest_route_chan``.  The loop ends when a
    ``ChannelRetireException`` is raised; ``shortest_route_chan`` is then
    poisoned and the best path and distance are logged.

    Args:
        init_chan: Channel output end for the distance matrix.
        task_chan: Channel output end for task batches.
        result_chan: Channel input end delivering candidate routes.
        shortest_route_chan: Channel output end for new best distances.
        num_cities: Passed to ``generate_distance_matrix``.
        task_depth: Passed to ``get_sub_routes``.
    """
    batch_size = 5  # number of tasks offered per hand-out

    log = get_logger('master')
    matrix = generate_distance_matrix(num_cities)

    # Generate tasks
    pending = get_sub_routes(matrix, task_depth)
    log.log('Number of tasks: ', len(pending))

    best = Route(distance=sys.maxsize)

    try:
        while True:
            # None tells the receiver that no tasks are left.
            next_batch = pending[0:batch_size] if pending else None
            (ready, payload) = AltSelect([
                OutputGuard(init_chan, msg=matrix),
                InputGuard(result_chan),
                OutputGuard(task_chan, msg=next_batch),
            ])

            if ready == task_chan:
                # The offered batch was taken; drop it from the backlog.
                pending = pending[batch_size:]
            elif ready == result_chan:
                if payload.distance < best.distance:
                    best = payload
                    log.log('Broadcasting new shortest route')
                    shortest_route_chan(best.distance)
    except ChannelRetireException:
        # A retired channel ends the distribution loop.
        pass

    poison(shortest_route_chan)
    log.log('Shortest path: ', best.path)
    log.log('Distance: ', best.distance)
def main(num_cities, task_depth, num_workers):
    """Create the channels and run the master, workers and broadcast router.

    Args:
        num_cities: Number of cities, forwarded to ``Master``.
        task_depth: Task depth, forwarded to ``Master``.
        num_workers: Number of ``Worker`` processes to start; must be >= 1.

    Raises:
        ValueError: If ``num_workers`` is less than 1.
    """
    # Only a Worker ever retires the task channel, and that is the only way
    # Master's loop ends, so running without workers would never finish.
    if num_workers < 1:
        raise ValueError('num_workers must be at least 1, got %r' % (num_workers,))

    init_channel = Channel()
    task_channel = Channel()
    result_channel = Channel()
    broadcast_publish_chan = Channel()
    broadcast_subscribe_chan = Channel()

    logger_init()

    # Every worker gets its own channel ends.
    workers = [
        Worker(i,
               init_channel.reader(),
               task_channel.reader(),
               result_channel.writer(),
               broadcast_subscribe_chan.writer())
        for i in range(num_workers)
    ]

    Parallel(Master(init_channel.writer(), task_channel.writer(), result_channel.reader(),
                    broadcast_publish_chan.writer(), num_cities, task_depth),
             workers,
             BroadcastRouter(broadcast_publish_chan.reader(), broadcast_subscribe_chan.reader()))

    logger_shutdown()
    shutdown()
if __name__ == "__main__":
    # Command-line entry point: run_parallel.py <num cities> <task depth> <num workers>
    import multiprocessing

    # Select the 'fork' start method before any process is created.
    multiprocessing.set_start_method('fork')

    usage = 'usage <num cities> <task depth> <num workers>'

    # A wrong invocation is an error: report it on stderr and exit with a
    # non-zero status so that calling scripts can detect the failure.
    if len(sys.argv) != 4:
        print(usage, file=sys.stderr)
        sys.exit(2)

    try:
        num_cities = int(sys.argv[1])
        task_depth = int(sys.argv[2])
        num_workers = int(sys.argv[3])
    except ValueError:
        # Non-integer arguments get the same usage message, not a traceback.
        print(usage, file=sys.stderr)
        sys.exit(2)

    timer = Timer()
    with timer:
        main(num_cities, task_depth, num_workers)

    print('Execution time in seconds: ', timer.duration_in_seconds())