forked from harrischristiansen/generals-bot
-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathDistanceMapperImpl.py
More file actions
146 lines (119 loc) · 5.49 KB
/
DistanceMapperImpl.py
File metadata and controls
146 lines (119 loc) · 5.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import time
import typing
from collections import deque
import logbook
import SearchUtils
from Interfaces import MapMatrixInterface
from MapMatrix import MapMatrix
from base.client.map import DistanceMapper, MapBase, Tile
# Sentinel stored in a distance matrix for every tile the search has not reached.
UNREACHABLE: int = 1000
"""The placeholder distance value for unreachable land"""
class DistanceMapperImpl(DistanceMapper):
    """Lazily built, cached tile-to-tile distance lookups for a map.

    A breadth-first distance matrix is computed for a source tile the first time it is
    needed and kept in ``self._dists`` (indexed by ``tile.tile_index``) until
    :meth:`recalculate` throws the whole cache away.

    THIS IS NOT THREAD SAFE.

    Timing counters (``time_total``, ``time_building_distmaps``) and ``resets_total`` are
    accumulated across calls; see :meth:`dump_times` / :meth:`reset_times`.
    """

    # THIS IS NOT THREAD SAFE

    def __init__(self, map: MapBase):
        self.map: MapBase = map
        # One entry per tile index: the distance matrix from that tile, or None if not built yet.
        self._dists: MapMatrixInterface[MapMatrixInterface[int] | None] = MapMatrix(map)
        self.time_total: float = 0.0
        self.time_building_distmaps: float = 0.0
        self.resets_total: int = 0

    def get_distance_between_or_none(self, tileA: Tile, tileB: Tile) -> int | None:
        """Performs worse than the dual cache version.

        Returns the distance from ``tileA`` to ``tileB``, or ``None`` when the cached
        distance is the ``UNREACHABLE`` sentinel.
        """
        dist = self.get_distance_between(tileA, tileB)
        return self._none_if_unreachable(tileA, tileB, dist)

    def get_distance_between_or_none_dual_cache(self, tileA: Tile, tileB: Tile) -> int | None:
        """Like :meth:`get_distance_between_or_none`, but uses the dual-cache lookup.

        Returns the distance between the tiles, or ``None`` when the cached distance is
        the ``UNREACHABLE`` sentinel.
        """
        dist = self.get_distance_between_dual_cache(tileA, tileB)
        return self._none_if_unreachable(tileA, tileB, dist)

    def _none_if_unreachable(self, tileA: Tile, tileB: Tile, dist: int) -> int | None:
        """Map an ``UNREACHABLE`` distance to ``None``, self-healing a suspicious cache.

        If ``dist`` is the sentinel but both tiles are in ``self.map.reachable_tiles``, the
        cache is considered bad: an error is logged, every cached matrix is discarded and
        the distance is looked up again. A distance that is still the sentinel after that
        is reported as ``None`` rather than leaking the sentinel value to the caller.
        """
        # The lookup that produced ``dist`` is already timed by the caller's helper.
        start = time.perf_counter()

        if dist < UNREACHABLE:
            self.time_total += time.perf_counter() - start
            return dist

        # TODO this is a debug assert setup...
        reachable = self.map.reachable_tiles
        if tileB not in reachable or tileA not in reachable:
            self.time_total += time.perf_counter() - start
            return None

        logbook.error(f'tileA {str(tileA)} and tileB {str(tileB)} both in reachable, but had bad distance. Force recalculating all distances...')
        self.recalculate()
        self.time_total += time.perf_counter() - start

        # get_distance_between accounts for its own time.
        recalculated = self.get_distance_between(tileA, tileB)
        if recalculated >= UNREACHABLE:
            return None
        return recalculated

    def _get_or_build_dists(self, tile: Tile) -> MapMatrixInterface[int]:
        """Return the cached distance matrix for ``tile``, building and caching it if absent.

        Does not touch ``time_total``; callers do their own timing.
        """
        tileDists = self._dists.raw[tile.tile_index]
        if tileDists is None:
            tileDists = self._build_distance_map_matrix_fast(tile)
            self._dists.raw[tile.tile_index] = tileDists
        return tileDists

    def get_distance_between(self, tileA: Tile, tileB: Tile) -> int:
        """Performs worse than the dual cache version.

        Returns the distance stored for ``tileB`` in ``tileA``'s distance matrix (building
        that matrix on first use). The value is ``UNREACHABLE`` if the search from
        ``tileA`` never reached ``tileB``.
        """
        start = time.perf_counter()
        tileDists = self._get_or_build_dists(tileA)
        self.time_total += time.perf_counter() - start
        return tileDists.raw[tileB.tile_index]

    def get_distance_between_dual_cache(self, tileA: Tile, tileB: Tile) -> int:
        """Return the distance between the tiles using whichever endpoint is already cached.

        Prefers ``tileA``'s matrix; if only ``tileB``'s matrix exists it is read at
        ``tileA`` instead of building a new one. A matrix for ``tileA`` is built only when
        neither is cached. The value is ``UNREACHABLE`` when no path was found.
        """
        start = time.perf_counter()

        tileDists = self._dists.raw[tileA.tile_index]
        if tileDists is not None:
            dist = tileDists.raw[tileB.tile_index]
        else:
            bDists = self._dists.raw[tileB.tile_index]
            if bDists is not None:
                # Reuse the reverse direction rather than running another search.
                dist = bDists.raw[tileA.tile_index]
            else:
                tileDists = self._build_distance_map_matrix_fast(tileA)
                self._dists.raw[tileA.tile_index] = tileDists
                dist = tileDists.raw[tileB.tile_index]

        # Every path, including the reverse-cache hit, is counted in the total.
        self.time_total += time.perf_counter() - start
        return dist

    def get_tile_dist_matrix(self, tile: Tile) -> MapMatrixInterface[int]:
        """Return the full distance matrix from ``tile``, building and caching it on first use."""
        start = time.perf_counter()
        tileDists = self._get_or_build_dists(tile)
        self.time_total += time.perf_counter() - start
        return tileDists

    def _build_distance_map_matrix_fast(self, startTile: Tile) -> MapMatrixInterface[int]:
        """Breadth-first search from ``startTile`` over each tile's ``movable`` list.

        Every tile starts at ``UNREACHABLE``. Obstacle tiles that are encountered receive a
        distance but are not expanded, so paths never continue through them. The start
        tile itself is always expanded. Time spent here is added to
        ``time_building_distmaps``.
        """
        start = time.perf_counter()

        distanceMap = MapMatrix(self.map, UNREACHABLE)
        raw = distanceMap.raw
        raw[startTile.tile_index] = 0

        frontier = deque()
        frontier.append((startTile, 0))

        dist: int
        current: Tile
        while frontier:
            (current, dist) = frontier.popleft()
            newDist = dist + 1
            for n in current.movable:  # new spots to try
                # Anything that is no longer the sentinel has already been assigned a distance.
                # NOTE(review): a true distance equal to UNREACHABLE would be indistinguishable
                # from "unvisited" here - presumably maps are never that large; confirm.
                if raw[n.tile_index] != UNREACHABLE:
                    continue

                raw[n.tile_index] = newDist

                if n.isObstacle:
                    continue

                frontier.append((n, newDist))

        self.time_building_distmaps += time.perf_counter() - start
        return distanceMap

    def recalculate(self):
        """Discard every cached distance matrix so they are rebuilt lazily on next use."""
        logbook.info('RESETTING CACHED DISTANCE MAPS IN DistanceMapperImpl')
        self.resets_total += 1
        self._dists = MapMatrix(self.map, None)

    def dump_times(self):
        """Log the accumulated timing counters and the number of cache resets."""
        logbook.info(f'OVERALL TIME SPENT IN DISTANCE MAPPER:\r\n'
                     f'         Distmaps: {self.time_building_distmaps:.5f}s\r\n'
                     f'         Time total: {self.time_total:.5f}s\r\n'
                     f'         Num resets: {self.resets_total}\r\n')

    def reset_times(self):
        """Zero the timing counters and the reset counter."""
        self.time_total = 0.0
        self.time_building_distmaps = 0.0
        self.resets_total = 0