forked from harrischristiansen/generals-bot
-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathArmyAnalyzer.py
486 lines (393 loc) · 20.8 KB
/
ArmyAnalyzer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
"""
@ Travis Drake (EklipZ) eklipz.io - tdrake0x45 at gmail)
July 2019
Generals.io Automated Client - https://github.com/harrischristiansen/generals-bot
EklipZ bot - Tries to play generals lol
"""
from __future__ import annotations
import itertools
import time
import typing
import logbook
from collections import deque
import SearchUtils
from Army import Army
from Interfaces import MapMatrixInterface
from Path import Path
from base.client.map import Tile, MapBase, TILE_OBSTACLE
from MapMatrix import MapMatrix, MapMatrixSet
class PathWay:
def __init__(self, distance):
self.distance: int = distance
self.tiles: typing.Set[Tile] = set()
self.seed_tile: Tile = None
def add_tile(self, tile):
self.tiles.add(tile)
#
# def __getstate__(self) -> typing.Dict[str, typing.Any]:
# s = self.s
SENTINAL = "~"
INF_PATH_WAY = PathWay(distance=1000)
class ArmyAnalyzer:
# __slots__ = (
# 'map',
# 'tileA',
# 'tileB',
# 'pathWayLookupMatrix',
# 'pathWays',
# 'shortestPathWay',
# 'chokeWidths',
# 'interceptChokes',
# 'interceptTurns',
# 'interceptDistances',
# 'tileDistancesLookup',
# 'aMap',
# 'bMap',
# 'tile',
# )
TimeSpentBuildingPathwaysChokeWidthsAndMinPath: float = 0.0
TimeSpentBuildingInterceptChokes: float = 0.0
TimeSpentInInit: float = 0.0
NumAnalysisBuilt: int = 0
def __init__(self, map: MapBase, armyA: Tile | Army, armyB: Tile | Army, bypassRetraverseThreshold: int = -1, bypassRetraverseThresholdPathTiles: typing.Iterable[Tile] | None = None, maxDist: int = 100):
"""
@param map:
@param armyA:
@param armyB:
@param bypassRetraverseThreshold: if set to a positive integer, will bypass including tiles where army at tile B would retraverse this many of its own friendly tiles with no adjacent A team tiles.
@param bypassRetraverseThresholdPathTiles: if included, these tiles will be excluded from the retraverse limitation
"""
startTime = time.perf_counter()
self.map: MapBase = map
if isinstance(armyA, Army):
self.tileA: Tile = armyA.tile
else:
self.tileA: Tile = armyA
if isinstance(armyB, Army):
self.tileB: Tile = armyB.tile
else:
self.tileB: Tile = armyB
# path chokes are relative to the paths between A and B
self.pathWayLookupMatrix: MapMatrixInterface[PathWay | None] = MapMatrix(map, initVal=None)
self.pathWays: typing.List[PathWay] = []
self.shortestPathWay: PathWay = INF_PATH_WAY
self.chokeWidths: MapMatrixInterface[int] = MapMatrix(map)
"""
If the army were to path through this tile, this is the number of alternate nearby tiles the army could also be at. So if the army has to make an extra wide path to get here, wasting moves, this will be chokeWidth1 even if the actual choke is 2 wide.
"""
self.interceptChokes: MapMatrixInterface[int] = MapMatrix(map)
"""The value in here for a tile represents the number of additional moves necessary for worst case intercept, for an army that reaches this tile on the earliest turn the bTile army could reach this. It is effectively the difference between the best case and worst case intercept turns for an intercept reaching this tile."""
self.interceptTurns: MapMatrixInterface[int] = MapMatrix(map)
"""This represents the raw turns into the intercept of an army that another army army must reach this tile to successfully achieve an intercept, ASSUMING the enemy army goes this way."""
self.interceptDistances: MapMatrixInterface[int] = MapMatrix(map)
"""The number of moves you will waste to achieve the intercept, worst case, regardless of which way the enemy army goes."""
self.tileDistancesLookup: typing.Dict[int, typing.List[Tile]] = {}
"""A lookup from the number of turns into the intercept, to which tiles the enemy army could have reached by that point (shorted path only)."""
logbook.info(f"ArmyAnalyzer analyzing {self.tileA} and {self.tileB}")
self.aMap: MapMatrixInterface[int]
self.bMap: MapMatrixInterface[int]
if bypassRetraverseThreshold <= 0:
self.aMap = map.distance_mapper.get_tile_dist_matrix(self.tileA)
self.bMap = map.distance_mapper.get_tile_dist_matrix(self.tileB)
else:
skip = {t for t in itertools.chain.from_iterable(map.players[p].tiles for p in map.get_teammates(self.tileB.player))}
if bypassRetraverseThresholdPathTiles:
skip.difference_update(bypassRetraverseThresholdPathTiles)
def foreachFunc(tile: Tile):
skip.discard(tile)
SearchUtils.breadth_first_foreach_fast_no_neut_cities(map, [t for t in map.pathable_tiles if map.is_tile_on_team_with(t, self.tileA.player) or t == self.tileB], maxDepth=bypassRetraverseThreshold, foreachFunc=foreachFunc)
logbook.info(f'building distance maps except skipping {len(skip)} tiles') # : {" | ".join([str(t) for t in skip])}
self.aMap = SearchUtils.build_distance_map_matrix_with_skip(map, [self.tileA], skip)
self.bMap = SearchUtils.build_distance_map_matrix_with_skip(map, [self.tileB], skip)
ArmyAnalyzer.TimeSpentInInit += time.perf_counter() - startTime
self.scan(maxDist)
ArmyAnalyzer.NumAnalysisBuilt += 1
def __getstate__(self):
state = self.__dict__.copy()
if "map" in state:
del state["map"]
return state
def __setstate__(self, state):
self.__dict__.update(state)
self.map = None
@classmethod
def dump_times(cls):
logbook.info(f'OVERALL TIME SPENT IN ARMY ANALYZER:\r\n'
f' TimeSpentBuildingPathwaysChokeWidthsAndMinPath: {cls.TimeSpentBuildingPathwaysChokeWidthsAndMinPath:.5f}s\r\n'
f' TimeSpentBuildingInterceptChokes: {cls.TimeSpentBuildingInterceptChokes:.5f}s\r\n'
f' TimeSpentInInit: {cls.TimeSpentInInit:.5f}s\r\n'
f' NumAnalysisBuilt: {cls.NumAnalysisBuilt}\r\n')
@classmethod
def reset_times(cls):
cls.TimeSpentBuildingPathwaysChokeWidthsAndMinPath = 0.0
cls.TimeSpentBuildingInterceptChokes = 0.0
cls.TimeSpentInInit = 0.0
cls.NumAnalysisBuilt = 0
def scan(self, maxDist: int = 100):
start = time.perf_counter()
self.build_chokes_and_pathways(maxDist)
ArmyAnalyzer.TimeSpentBuildingPathwaysChokeWidthsAndMinPath += time.perf_counter() - start
start = time.perf_counter()
self.build_intercept_chokes()
ArmyAnalyzer.TimeSpentBuildingInterceptChokes += time.perf_counter() - start
# This is heavily optimized at this point.
def build_chokes_and_pathways(self, maxDist: int = 100):
maxDistOffset = self.bMap.raw[self.tileA.tile_index] + maxDist
chokeCounterMap = {}
for tile in self.map.pathable_tiles:
# build the pathway
path = self.pathWayLookupMatrix.raw[tile.tile_index]
if path:
continue
path = self.build_pathway(tile, maxDistOffset)
if path is not INF_PATH_WAY:
self.pathWays.append(path)
chokeCounterMap.clear()
for pathTile in path.tiles:
if pathTile.isObstacle or (pathTile.isTempFogPrediction and pathTile.isCity and not pathTile.discovered):
continue
chokeKey = self._get_choke_key(pathTile)
chokeCounterMap[chokeKey] = chokeCounterMap.get(chokeKey, 0) + 1
# this HAS to be separate loop because that choke key is not unique to the tile, so the above loop needs to add all the chokes up first, THEN we need to assign the choke widths.
for pathTile in path.tiles:
if pathTile.isObstacle or (pathTile.isTempFogPrediction and pathTile.isCity and not pathTile.discovered):
continue
cw = chokeCounterMap[self._get_choke_key(pathTile)]
self.chokeWidths.raw[pathTile.tile_index] = cw
self.shortestPathWay = self.pathWayLookupMatrix.raw[self.tileA.tile_index]
# recurse
def build_pathway(self, tile, maxDist) -> PathWay:
distance = self.aMap.raw[tile.tile_index] + self.bMap.raw[tile.tile_index]
if distance > maxDist:
# TODO is this necessary?
self.pathWayLookupMatrix.raw[tile.tile_index] = INF_PATH_WAY
return INF_PATH_WAY
#logbook.info(" building pathway from tile {} distance {}".format(tile.toString(), distance))
path = PathWay(distance=distance)
path.seed_tile = tile
self._build_pathway_recurse(path, tile)
return path
# I hate all of these .grid hacks, but they are like 30% faster sadly..
def _build_pathway_recurse(self, path: PathWay, currentTile: Tile):
if self.pathWayLookupMatrix.raw[currentTile.tile_index]:
return
if self.aMap.raw[currentTile.tile_index] + self.bMap.raw[currentTile.tile_index] != path.distance:
return
#logbook.info(" adding tile {}".format(currentTile.toString()))
path.tiles.add(currentTile)
self.pathWayLookupMatrix.raw[currentTile.tile_index] = path
for adjacentTile in currentTile.movable:
if self.pathWayLookupMatrix.raw[adjacentTile.tile_index]:
continue
if adjacentTile.isMountain or (adjacentTile.tile == TILE_OBSTACLE and not adjacentTile.discovered and not adjacentTile.isCity) or (adjacentTile.isCity and adjacentTile.player == -1):
continue
self._build_pathway_recurse(path, adjacentTile)
def _get_choke_key(self, tile: Tile) -> typing.Tuple:
# including path in the key forces the 'chokes' to be part of the same pathway.
return self.aMap.raw[tile.tile_index], self.bMap.raw[tile.tile_index]
# return self.aMap[tile], self.bMap[tile]
# return path.distance, self.aMap[tile], self.bMap[tile]
def build_intercept_chokes(self):
q = deque()
# TODO this can just be array of arrays, just need to know min and max values...?
distancesLookup = {}
q.append(self.tileB)
# visited = MapMatrixSet(self.map)
visited = {self.tileB.tile_index}
pw = self.pathWayLookupMatrix.raw[self.tileA.tile_index]
if pw is None:
return
shortestDist = pw.distance
while q:
nextTile = q.popleft()
# we only care about the shortest pathway tiles for this?
if self.pathWayLookupMatrix.raw[nextTile.tile_index] != pw:
continue
nextBDist = self.bMap.raw[nextTile.tile_index]
pw = self.pathWayLookupMatrix.raw[nextTile.tile_index]
offsetDist = nextBDist + pw.distance - shortestDist
try:
curSet = distancesLookup[offsetDist]
except KeyError:
curSet = []
distancesLookup[offsetDist] = curSet
curSet.append(nextTile)
for t in nextTile.movable:
if t.isObstacle:
continue # TODO ??
if t.tile_index not in visited: # and t in self.shortestPathWay.tiles
visited.add(t.tile_index)
nPw = self.pathWayLookupMatrix.raw[t.tile_index]
if nPw is None:
continue
if nPw.distance >= pw.distance:
q.append(t)
# distMinMaxTable[curBDist] = (minX, minY, maxX, maxY)
# distMinMaxTable[curBDist] = (minRefDist, maxRefDist)
zeroChokes = []
oneChokes = set()
for r in range(self.shortestPathWay.distance + 1):
try:
curSet = distancesLookup[r]
except KeyError:
curSet = None
if not curSet:
continue
if len(curSet) == 1:
# then this is a primary choke
zeroChokes.append(curSet[0])
# for t in curSet[0].movable:
# if not t.isObstacle:
# oneChokes.add(t)
continue
if len(curSet) == 2:
# first = curSet[0]
# firstDist = self.bMap.raw[first.tile_index]
# then this is likely a 1-away choke, verify and find common one-choke tiles
anyOne = False
for t in curSet[0].movable:
# this forces us to be directional with the save, you can only claim this chase-intercept when arriving from the front, not the back, of a choke.
# TODO this breaks test_should_see_split_path_blocker_as_mid_choke, we need to intercept with a one-tile from behind, there.
# if self.bMap[t] < firstDist:
# continue
if t.isObstacle:
continue
if t in curSet[1].movable:
oneChokes.add(t)
anyOne = True
if anyOne:
continue
# TODO there is a split if we cannot build a chain of adjacents..?
# logbook.info(f'at dist {r}, found {len(curSet)} tiles')
#
# for tile in curSet:
# maxDist = 0
# for altTile in curSet:
# if tile == altTile:
# continue
#
# dist = self.map.distance_mapper.get_distance_between_or_none(tile, altTile)
# if dist is None:
# continue
# if dist > maxDist:
# maxDist = dist
# # logbook.info(f'ic {str(tile)} = {maxDist}')
# self.interceptChokes[tile] = maxDist
self.tileDistancesLookup = distancesLookup
shortestSet = self.shortestPathWay.tiles
def foreachFunc(tile: Tile, stateObj: typing.Tuple[int, int, Tile | None]) -> typing.Tuple[int, int, Tile | None]:
dist, interceptMoves, fromTile = stateObj
self.interceptChokes.raw[tile.tile_index] = dist
self.interceptDistances.raw[tile.tile_index] = interceptMoves
# This is what lets us include the tiles 1 away from shortest path, which is good for finding common one-tile-away shared split intercept points
if tile not in shortestSet:
return None
return dist + 1, interceptMoves + 1, tile
threatDist = self.shortestPathWay.distance
startTiles: typing.Dict[Tile, typing.Tuple[int, typing.Tuple[int, int, Tile | None]]] = {}
"""Tile -> (startDist, (prioStartDist, interceptMoves, fromTile))"""
# oneChokes come first because we must overwrite them with zeroChokes, if the zeroChoke is also a oneChoke
for oneChoke in oneChokes:
# oneChoke can guaranteed intercept
startTiles[oneChoke] = 1, (1, 0, None)
# for zeroChoke in zeroChokes:
# #... this does nothing except lie about the intercept moves
# for tile in zeroChoke.movable:
# if tile.isObstacle:
# continue
# startTiles[tile] = 0, (0, 0, None)
furthestZeroChoke = 0
for zeroChoke in zeroChokes:
if zeroChoke != self.tileA:
dist = self.bMap.raw[zeroChoke.tile_index]
if dist > furthestZeroChoke:
furthestZeroChoke = dist
# normally we can reach a zero choke one turn behind our opponent and be safe.
turn = -1
startTiles[zeroChoke] = turn, (turn, 0, None)
SearchUtils.breadth_first_foreach_with_state_and_start_dist(self.map, startTiles, maxDepth=20, foreachFunc=foreachFunc, noLog=True)
for zeroChoke in zeroChokes:
if zeroChoke.isGeneral:
# must get to general 1 ahead of opp to be safe
self.interceptChokes.raw[zeroChoke.tile_index] = 1
# we must get to a tile next to our general at the same time as opp, no chasing, or we lose on priority. TODO take into account priority?
for tile in zeroChoke.movable:
existingVal = self.interceptChokes.raw[tile.tile_index]
if existingVal is not None:
self.interceptChokes.raw[tile.tile_index] = existingVal + 1
for icTile in self.map.pathable_tiles:
chokeDist = self.interceptChokes.raw[icTile.tile_index]
if chokeDist is not None:
aDist = self.aMap.raw[icTile.tile_index]
bDist = self.bMap.raw[icTile.tile_index]
# anything above our closest choke, we can move in by 1
# if bDist < furthestZeroChoke:
# bDist += 1
interceptWorstCaseDist = self.interceptDistances.raw[icTile.tile_index]
interceptTurns = threatDist - aDist + 1 # - chokeDist - interceptWorstCaseDist
if bDist >= threatDist:
# we're behind our general
interceptTurns -= bDist - threatDist + 1
self.interceptTurns.raw[icTile.tile_index] = interceptTurns
self.fix_choke_widths(self.shortestPathWay.tiles)
def fix_choke_widths(self, shortestTiles: typing.Set[Tile]):
# TODO this isn't right either
def foreachFunc(tile: Tile, stateObj: typing.Tuple[int, int]) -> typing.Tuple[int, int] | None:
if tile in shortestTiles:
return None
prevCw, _ = stateObj
newCw = prevCw + 1
self.chokeWidths.raw[tile.tile_index] = newCw
# This is what lets us include the tiles 1 away from shortest path, which is good for finding common one-tile-away shared split intercept points
return newCw, 0
startTiles: typing.Dict[Tile, typing.Tuple[int, typing.Tuple[int, int]]] = {}
for tile in shortestTiles:
ourDist = self.chokeWidths.raw[tile.tile_index]
if ourDist is None:
continue
for adj in tile.movable:
try:
existing = startTiles[adj]
except KeyError:
existing = None
if not existing or existing[0] > ourDist:
startTiles[adj] = (ourDist, (ourDist, 0))
SearchUtils.breadth_first_foreach_with_state_and_start_dist(self.map, startTiles, maxDepth=20, foreachFunc=foreachFunc, noLog=True)
def is_choke(self, tile: Tile) -> bool:
chokeMoves = self.interceptDistances[tile]
return chokeMoves is not None and chokeMoves == 0
def is_one_behind_safe_choke(self, tile: Tile) -> bool:
chokeVal = self.interceptChokes[tile]
return chokeVal == -1 # TODO should this be only -1? not sure what its used for.
def is_two_move_capture_choke(self, tile: Tile) -> bool:
chokeMoves = self.interceptDistances[tile]
return chokeMoves is not None and chokeMoves <= 2
@classmethod
def build_from_path(cls, map: MapBase, path: Path, bypassRetraverse: bool = False, maxDist: int = 10000) -> ArmyAnalyzer:
"""
@param map:
@param path:
@param bypassRetraverse: if True, will make sure that the average captures of the path are matched by the army analysis board; tiles will be cut if it involves backtracking over friendly territory instead of capturing tiles. DO NOT use for kill threats.
@param maxDist: the maximum distance from the tiles to include in the analysis, beyond which chokes and stuff will be excluded.
@return:
"""
tileA = path.tail.tile
tileB = path.start.tile
if path.length != map.distance_mapper.get_distance_between(tileB, tileA):
# then we have a mid-point non-shortest-path scenario
i = 0
node = path.start
prev = None
while node is not None and map.distance_mapper.get_distance_between(tileB, node.tile) == i:
i += 1
prev = node
node = node.next
logbook.info(f'ArmyAnalyzer.build_from_path was non-shortest, shortest segment ending at {prev.tile} for path {tileB}-->{tileA} ({path})')
tileA = prev.tile
bypassRetraverseThreshold = -1
bypassRetraversePath = None
if bypassRetraverse:
bypassRetraverseThreshold = 2
bypassRetraversePath = path.tileList
analyzer = ArmyAnalyzer(map, tileA, tileB, bypassRetraverseThreshold=bypassRetraverseThreshold, bypassRetraverseThresholdPathTiles=bypassRetraversePath, maxDist=maxDist)
return analyzer