forked from BurnySc2/python-sc2
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbot_ai.py
More file actions
2040 lines (1768 loc) · 92.3 KB
/
bot_ai.py
File metadata and controls
2040 lines (1768 loc) · 92.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import annotations
import itertools
import math
import random
import time
import warnings
from collections import Counter
from typing import Any, Dict, List, Optional, Set, Tuple, Union, TYPE_CHECKING
from contextlib import suppress
from s2clientprotocol import sc2api_pb2 as sc_pb
from .cache import property_cache_forever, property_cache_once_per_frame, property_cache_once_per_frame_no_copy
from .constants import (
FakeEffectID,
abilityid_to_unittypeid,
geyser_ids,
mineral_ids,
TERRAN_TECH_REQUIREMENT,
PROTOSS_TECH_REQUIREMENT,
ZERG_TECH_REQUIREMENT,
ALL_GAS,
EQUIVALENTS_FOR_TECH_PROGRESS,
TERRAN_STRUCTURES_REQUIRE_SCV,
IS_PLACEHOLDER,
)
from .data import ActionResult, Alert, Race, Result, Target, race_gas, race_townhalls, race_worker
from .distances import DistanceCalculation
from .game_data import AbilityData, GameData
from .dicts.unit_trained_from import UNIT_TRAINED_FROM
from .dicts.unit_train_build_abilities import TRAIN_INFO
from .dicts.upgrade_researched_from import UPGRADE_RESEARCHED_FROM
from .dicts.unit_research_abilities import RESEARCH_INFO
# Imports for mypy and pycharm autocomplete as well as sphinx autodocumentation
from .game_state import Blip, EffectData, GameState
from .ids.ability_id import AbilityId
from .ids.unit_typeid import UnitTypeId
from .ids.upgrade_id import UpgradeId
from .pixel_map import PixelMap
from .position import Point2
from .unit import Unit
from .units import Units
from .game_data import Cost
from .unit_command import UnitCommand
from loguru import logger
if TYPE_CHECKING:
from .game_info import GameInfo, Ramp
from .client import Client
class BotAI(DistanceCalculation):
"""Base class for bots."""
EXPANSION_GAP_THRESHOLD = 15
def _initialize_variables(self):
    """Reset every piece of per-game bot state to its starting value (called internally from main.py)."""
    DistanceCalculation.__init__(self)

    def empty_units() -> Units:
        # Every unit collection starts out as an empty Units container bound to this bot
        return Units([], self)

    # Opponent bot ID used in ladder games (http://sc2ai.net/ and https://aiarena.net).
    # It stays the same each game so the bot can "adapt" to the opponent.
    # A ladder wrapper may already have set it (see
    # https://github.com/Hannessa/python-sc2-ladderbot/blob/master/__init__.py#L40), so only default it when missing.
    if not hasattr(self, "opponent_id"):
        self.opponent_id: Optional[str] = None
    # Distance calculation method, see distances.py: _distances_override_functions function
    if not hasattr(self, "distance_calculation_method"):
        self.distance_calculation_method: int = 2
    # Whether Unit.command should return UnitCommand objects.
    # Set this to True if your bot uses 'self.do(unit(ability, target))'
    if not hasattr(self, "unit_command_uses_self_do"):
        self.unit_command_uses_self_do: bool = False

    # Set to True by main.py in self._prepare_start if the game is played in realtime
    # (if true, the bot will have limited time per step)
    self.realtime: bool = False
    self.base_build: int = -1

    # Unit collections
    self.all_units: Units = empty_units()
    self.units: Units = empty_units()
    self.workers: Units = empty_units()
    self.larva: Units = empty_units()
    self.structures: Units = empty_units()
    self.townhalls: Units = empty_units()
    self.gas_buildings: Units = empty_units()
    self.all_own_units: Units = empty_units()
    self.enemy_units: Units = empty_units()
    self.enemy_structures: Units = empty_units()
    self.all_enemy_units: Units = empty_units()
    self.resources: Units = empty_units()
    self.destructables: Units = empty_units()
    self.watchtowers: Units = empty_units()
    self.mineral_field: Units = empty_units()
    self.vespene_geyser: Units = empty_units()
    self.placeholders: Units = empty_units()
    self.techlab_tags: Set[int] = set()
    self.reactor_tags: Set[int] = set()

    # Resources and supply
    self.minerals: int = 50
    self.vespene: int = 0
    self.supply_army: float = 0
    self.supply_workers: float = 12  # Doesn't include workers in production
    self.supply_cap: float = 15
    self.supply_used: float = 12
    self.supply_left: float = 3
    self.idle_worker_count: int = 0
    self.army_count: int = 0
    self.warp_gate_count: int = 0

    self.actions: List[UnitCommand] = []
    self.blips: Set[Blip] = set()
    self.race: Optional[Race] = None
    self.enemy_race: Optional[Race] = None

    # Internal bookkeeping: creation counters and previous-frame snapshots keyed by unit tag
    self._units_created: Counter = Counter()
    self._unit_tags_seen_this_game: Set[int] = set()
    self._units_previous_map: Dict[int, Unit] = {}
    self._structures_previous_map: Dict[int, Unit] = {}
    self._enemy_units_previous_map: Dict[int, Unit] = {}
    self._enemy_structures_previous_map: Dict[int, Unit] = {}
    self._all_units_previous_map: Dict[int, Unit] = {}
    self._previous_upgrades: Set[UpgradeId] = set()

    # Filled by self._find_expansion_locations()
    self._expansion_positions_list: List[Point2] = []
    self._resource_location_to_expansion_position_dict: Dict[Point2, Point2] = {}

    # Step timing statistics, reported by the 'step_time' property
    self._time_before_step: Optional[float] = None
    self._time_after_step: Optional[float] = None
    self._min_step_time: float = math.inf
    self._max_step_time: float = 0
    self._last_step_step_time: float = 0
    self._total_time_in_on_step: float = 0
    self._total_steps_iterations: int = 0

    # Tags of units that received an action in this frame, so that self.train() does not
    # give the same larva two orders - cleared every frame
    self.unit_tags_received_action: Set[int] = set()
@property
def time(self) -> float:
    """ Returns the elapsed game time in seconds, assuming the game is played on 'faster' """
    # 22.4 game loops per second on 'faster': 16 loops per game second * 1.4 speed factor
    game_loops_per_second = 22.4
    return self.state.game_loop / game_loops_per_second
@property
def time_formatted(self) -> str:
    """ Returns the elapsed game time as a zero-padded 'min:sec' string """
    minutes, seconds = divmod(self.time, 60)
    return f"{int(minutes):02}:{int(seconds):02}"
@property
def step_time(self) -> Tuple[float, float, float, float]:
    """Returns the bot's step durations as a 4-tuple in milliseconds.

    The entries are, in order: the shortest step so far, the average step, the longest step so far
    and the duration of the last step.
    The average is 0 as long as no step iteration has been counted.
    """
    iterations = self._total_steps_iterations
    if iterations:
        average = self._total_time_in_on_step / iterations
    else:
        average = 0
    # Internal timings are kept in seconds, convert to milliseconds
    durations_in_seconds = (self._min_step_time, average, self._max_step_time, self._last_step_step_time)
    return tuple(seconds * 1000 for seconds in durations_in_seconds)
@property
def game_info(self) -> GameInfo:
    """ Read-only access to the stored GameInfo object, see game_info.py """
    return self._game_info
@property
def game_data(self) -> GameData:
    """ Read-only access to the stored GameData object, see game_data.py """
    return self._game_data
@property
def client(self) -> Client:
    """ Read-only access to the stored Client object, see client.py """
    return self._client
@property
def larva_count(self):
    """Deprecated: number of larva the bot currently has.

    Replacement for self.state.common.larva_count
    https://github.com/Blizzard/s2client-proto/blob/d3d18392f9d7c646067d447df0c936a8ca57d587/s2clientprotocol/sc2api.proto#L614

    Emits a DeprecationWarning on every access; use len(self.larva) or self.larva.amount instead.
    """
    deprecation_message = (
        "self.larva_count will be removed soon, please use len(self.larva) or self.larva.amount instead"
    )
    # stacklevel=2 attributes the warning to the code accessing the property
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
    return len(self.larva)
def alert(self, alert_code: Alert) -> bool:
    """
    Tell whether the given alert is part of the alerts of the current game state.
    Possible alerts are listed here https://github.com/Blizzard/s2client-proto/blob/e38efed74c03bec90f74b330ea1adda9215e655f/s2clientprotocol/sc2api.proto#L679-L702

    Example use::

        from sc2.data import Alert
        if self.alert(Alert.AddOnComplete):
            print("Addon Complete")

    Alert codes::

        AlertError
        AddOnComplete
        BuildingComplete
        BuildingUnderAttack
        LarvaHatched
        MergeComplete
        MineralsExhausted
        MorphComplete
        MothershipComplete
        MULEExpired
        NuclearLaunchDetected
        NukeComplete
        NydusWormDetected
        ResearchComplete
        TrainError
        TrainUnitComplete
        TrainWorkerComplete
        TransformationComplete
        UnitUnderAttack
        UpgradeComplete
        VespeneExhausted
        WarpInComplete

    :param alert_code: member of the Alert enum to look for
    """
    assert isinstance(alert_code, Alert), f"alert_code {alert_code} is no Alert"
    # The state stores alerts by their raw value, so compare against the enum's value
    wanted_value = alert_code.value
    return wanted_value in self.state.alerts
@property
def start_location(self) -> Point2:
    """
    Returns the bot's spawn location as stored in the game info ('player_start_location').
    Per the original notes this is derived from the first created townhall and will be None on an
    arcade or custom map that does not feature townhalls at game start.
    """
    game_info = self._game_info
    return game_info.player_start_location
@property
def enemy_start_locations(self) -> List[Point2]:
    """Possible start locations for enemies, as stored in the game info ('start_locations')."""
    game_info = self._game_info
    return game_info.start_locations
@property
def main_base_ramp(self) -> Ramp:
    """Returns the Ramp instance of the main-ramp closest to the start location.

    The result is computed once and stored on the instance as 'cached_main_base_ramp'.
    Look in game_info.py for more information about the Ramp class.

    Example: See terran ramp wall bot
    """
    if not hasattr(self, "cached_main_base_ramp"):

        def closest_ramp_with(upper_point_counts):
            # min() raises ValueError if no ramp has a matching number of upper points
            candidates = (ramp for ramp in self.game_info.map_ramps if len(ramp.upper) in upper_point_counts)
            return min(candidates, key=lambda ramp: self.start_location.distance_to(ramp.top_center))

        try:
            # The reason for len(ramp.upper) in {2, 5} is:
            # ParaSite map has 5 upper points, and most other maps have 2 upper points at the main ramp.
            # The map Acolyte has 4 upper points at the wrong ramp (which is closest to the start position).
            self.cached_main_base_ramp = closest_ramp_with({2, 5})
        except ValueError:
            # Hardcoded hotfix for Honorgrounds LE map, as that map has a large main base ramp with inbase natural
            self.cached_main_base_ramp = closest_ramp_with({4, 9})
    return self.cached_main_base_ramp
@property_cache_once_per_frame
def expansion_locations_list(self) -> List[Point2]:
    """ Returns the list of expansion positions in the order they were stored, not sorted in any way. """
    positions = self._expansion_positions_list
    # An empty list means the expansion locations were never calculated
    assert (
        positions
    ), "self._find_expansion_locations() has not been run yet, so accessing the list of expansion locations is pointless."
    return positions
@property_cache_once_per_frame
def expansion_locations_dict(self) -> Dict[Point2, Units]:
    """
    Returns a dict that maps every expansion position (Point2) to the resources
    (mineral fields and vespene geysers, as Units) that belong to it.

    Caution: This function is slow. If you only need the expansion locations, use 'expansion_locations_list'.
    """
    assert (
        self._expansion_positions_list
    ), "self._find_expansion_locations() has not been run yet, so accessing the list of expansion locations is pointless."
    resources_by_expansion: Dict[Point2, Units] = {
        position: Units([], self) for position in self._expansion_positions_list
    }
    position_lookup = self._resource_location_to_expansion_position_dict
    for resource in self.resources:
        owning_position: Point2 = position_lookup.get(resource.position)
        if not owning_position:
            # It may be that some resources are not mapped to an expansion location
            continue
        assert owning_position in resources_by_expansion
        resources_by_expansion[owning_position].append(resource)
    return resources_by_expansion
# Deprecated
@property_cache_once_per_frame
def expansion_locations(self) -> Dict[Point2, Units]:
    """ Deprecated alias of 'expansion_locations_dict'; emits a DeprecationWarning when evaluated. """
    assert (
        self._expansion_positions_list
    ), "self._find_expansion_locations() has not been run yet, so accessing the list of expansion locations is pointless."
    deprecation_message = "You are using 'self.expansion_locations', please use 'self.expansion_locations_list' (fast) or 'self.expansion_locations_dict' (slow) instead."
    # stacklevel=2 attributes the warning to the calling frame instead of this property
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
    return self.expansion_locations_dict
def _find_expansion_locations(self):
    """Ran once at the start of the game to calculate expansion locations.

    Resources are clustered into groups, and for every group the buildable point with the smallest
    summed distance to the group's resources is chosen as expansion position. The results are stored in
    'self._expansion_positions_list' and 'self._resource_location_to_expansion_position_dict'.

    A resource group for which no valid position exists is skipped with a warning instead of
    aborting the whole calculation with a ValueError from min().
    """
    # Idea: create a group for every resource, then merge these groups if
    # any resource in a group is closer than a threshold to any resource of another group

    # Distance we group resources by
    resource_spread_threshold: float = 8.5
    # Create a group for every resource
    resource_groups: List[List[Unit]] = [
        [resource]
        for resource in self.resources
        if resource.name != "MineralField450"  # dont use low mineral count patches
    ]
    # Loop the merging process as long as we change something
    merged_group = True
    while merged_group:
        merged_group = False
        # Check every combination of two groups
        for group_a, group_b in itertools.combinations(resource_groups, 2):
            # Check if any pair of resource of these groups is closer than threshold together
            if any(
                resource_a.distance_to(resource_b) <= resource_spread_threshold
                for resource_a, resource_b in itertools.product(group_a, group_b)
            ):
                # Remove the single groups and add the merged group
                resource_groups.remove(group_a)
                resource_groups.remove(group_b)
                resource_groups.append(group_a + group_b)
                merged_group = True
                # The list was modified, so restart iterating over the combinations
                break

    # Distance offsets we apply to center of each resource group to find expansion position
    offset_range = 7
    offsets = [
        (x, y)
        for x, y in itertools.product(range(-offset_range, offset_range + 1), repeat=2)
        if 4 < math.hypot(x, y) <= 8
    ]

    for resources in resource_groups:
        amount = len(resources)
        # Calculate center, round and add 0.5 because expansion location will have (x.5, y.5)
        # coordinates because bases have size 5.
        center_x = int(sum(resource.position.x for resource in resources) / amount) + 0.5
        center_y = int(sum(resource.position.y for resource in resources) / amount) + 0.5
        possible_points = (Point2((offset[0] + center_x, offset[1] + center_y)) for offset in offsets)
        # Filter out points that are too near
        possible_points = (
            point
            for point in possible_points
            # Check if point can be built on
            if self._game_info.placement_grid[point.rounded] == 1
            # Check if all resources have enough space to point
            and all(
                point.distance_to(resource) >= (7 if resource._proto.unit_type in geyser_ids else 6)
                for resource in resources
            )
        )
        # Choose best fitting point: the one with the smallest summed distance to all resources of the group
        result: Optional[Point2] = min(
            possible_points,
            key=lambda point: sum(point.distance_to(resource) for resource in resources),
            default=None,
        )
        if result is None:
            # No buildable point keeps enough distance to every resource of this group
            logger.warning(
                f"Could not find a valid expansion position for the resource group around ({center_x}, {center_y}), skipping it"
            )
            continue
        # Put all expansion locations in a list
        self._expansion_positions_list.append(result)
        # Maps all resource positions to the expansion position
        for resource in resources:
            self._resource_location_to_expansion_position_dict[resource.position] = result
@property
def units_created(self) -> Counter:
    """Returns the Counter that tracks all units and buildings you have created so far.

    This may be used for statistics (at the end of the game) or for strategic decision making.

    CAUTION: This does not properly work at the moment for morphing units and structures. Please use the
    'on_unit_type_changed' event to add these morphing unit types manually to 'self._units_created'.
    Issues would arise in e.g. siege tank morphing to sieged tank, and then morphing back
    (suddenly the counter counts 2 tanks have been created).

    Examples::

        # Give attack command to enemy base every time 10 marines have been trained
        async def on_unit_created(self, unit: Unit):
            if unit.type_id == UnitTypeId.MARINE:
                if self.units_created[UnitTypeId.MARINE] % 10 == 0:
                    for marine in self.units(UnitTypeId.MARINE):
                        marine.attack(self.enemy_start_locations[0])
    """
    created_counter = self._units_created
    return created_counter
def _correct_zerg_supply(self):
    """Adjusts the supply values for the half-supply zerg units.

    The client incorrectly rounds zerg supply down instead of up (see
    https://github.com/Blizzard/s2client-proto/issues/123), so self.supply_used
    and friends return the wrong value when there are an odd number of zerglings
    and banelings. This function corrects the bad values."""
    # TODO: remove when Blizzard/sc2client-proto#123 gets fixed.
    half_supply_units = {
        UnitTypeId.ZERGLING,
        UnitTypeId.ZERGLINGBURROWED,
        UnitTypeId.BANELING,
        UnitTypeId.BANELINGBURROWED,
        UnitTypeId.BANELINGCOCOON,
    }
    half_supply_unit_count = self.units(half_supply_units).amount
    # 1 if there is an odd number of half-supply units, else 0
    missing_supply = half_supply_unit_count % 2
    self.supply_used += missing_supply
    self.supply_army += missing_supply
    self.supply_left -= missing_supply
async def get_available_abilities(
    self, units: Union[List[Unit], Units], ignore_resource_requirements: bool = False
) -> List[List[AbilityId]]:
    """Queries the client for the available abilities of one or more units.
    Right now only checks cooldown, energy cost, and whether the ability has been researched.

    Examples::

        units_abilities = await self.get_available_abilities(self.units)

    or::

        units_abilities = await self.get_available_abilities([self.units.random])

    :param units:
    :param ignore_resource_requirements:"""
    abilities_per_unit = await self._client.query_available_abilities(units, ignore_resource_requirements)
    return abilities_per_unit
async def expand_now(
    self, building: UnitTypeId = None, max_distance: float = 10, location: Optional[Point2] = None
):
    """Builds a townhall at the given location, or at the next possible expansion found via
    'self.get_next_expansion()'. If the target expansion is blocked (e.g. an enemy unit), it will misplace the expansion.

    :param building: structure type to build, defaults to the basic townhall of the bot's race
    :param max_distance:
    :param location: where to expand to, looked up automatically if omitted"""
    if not building:
        # self.race is never Race.Random
        townhall_by_race = {
            Race.Protoss: UnitTypeId.NEXUS,
            Race.Terran: UnitTypeId.COMMANDCENTER,
            Race.Zerg: UnitTypeId.HATCHERY,
        }
        building = townhall_by_race[self.race]
    assert isinstance(building, UnitTypeId), f"{building} is no UnitTypeId"

    if not location:
        location = await self.get_next_expansion()
        if not location:
            # All expansions are used up or mined out
            logger.warning("Trying to expand_now() but bot is out of locations to expand to")
            return

    await self.build(building, near=location, max_distance=max_distance, random_alternative=False, placement_step=1)
async def get_next_expansion(self) -> Optional[Point2]:
    """Find the free expansion location with the shortest pathing distance from the start location.

    Returns None if every location is already taken or no path to any of them is found."""
    best_location = None
    best_distance = math.inf
    for candidate in self.expansion_locations_list:
        # A townhall closer than the gap threshold means the location is already taken
        if any(townhall.distance_to(candidate) < self.EXPANSION_GAP_THRESHOLD for townhall in self.townhalls):
            continue

        start_position = self._game_info.player_start_location
        path_length = await self._client.query_pathing(start_position, candidate)
        # No pathing result for this location, ignore it
        if path_length is not None and path_length < best_distance:
            best_distance = path_length
            best_location = candidate
    return best_location
async def distribute_workers(self, resource_ratio: float = 2):
    """
    Distributes workers across all the bases taken.

    Idle workers and workers taken from oversaturated mining places are sent to mining places
    (ready townhalls and gas buildings) that lack workers.

    Keyword `resource_ratio` takes a float. If the current minerals to gas
    ratio is bigger than `resource_ratio`, this function prefer filling gas_buildings
    first, if it is lower, it will prefer sending workers to minerals first.

    NOTE: This function is far from optimal, if you really want to have
    refined worker control, you should write your own distribution function.
    For example long distance mining control and moving workers if a base was killed
    are not being handled.

    WARNING: This is quite slow when there are lots of workers or multiple bases.

    :param resource_ratio:"""
    if not self.mineral_field or not self.workers or not self.townhalls.ready:
        return
    worker_pool = list(self.workers.idle)
    bases = self.townhalls.ready
    gas_buildings = self.gas_buildings.ready

    # list of places that need more workers
    deficit_mining_places = []

    for mining_place in bases | gas_buildings:
        difference = mining_place.surplus_harvesters
        # perfect amount of workers, skip mining place
        if not difference:
            continue
        if mining_place.has_vespene:
            # get all workers that target the gas extraction site
            # or are on their way back from it
            local_workers = self.workers.filter(
                lambda unit: unit.order_target == mining_place.tag
                or (unit.is_carrying_vespene and unit.order_target == bases.closest_to(mining_place).tag)
            )
        else:
            # get tags of minerals around expansion
            local_minerals_tags = {
                mineral.tag for mineral in self.mineral_field if mineral.distance_to(mining_place) <= 8
            }
            # get all target tags a worker can have
            # tags of the minerals he could mine at that base
            # get workers that work at that gather site
            local_workers = self.workers.filter(
                lambda unit: unit.order_target in local_minerals_tags
                or (unit.is_carrying_minerals and unit.order_target == mining_place.tag)
            )
        # too many workers
        if difference > 0:
            worker_pool.extend(local_workers[:difference])
        # too few workers
        # add mining place to deficit bases for every missing worker
        else:
            deficit_mining_places.extend([mining_place] * -difference)

    # prepare all minerals near a base if we have too many workers
    # and need to send them to the closest patch
    all_minerals_near_base = []
    if len(worker_pool) > len(deficit_mining_places):
        all_minerals_near_base = [
            mineral
            for mineral in self.mineral_field
            if any(mineral.distance_to(base) <= 8 for base in self.townhalls.ready)
        ]
    # distribute every worker in the pool
    for worker in worker_pool:
        # as long as have workers and mining places
        if deficit_mining_places:
            # choose only mineral fields first if current mineral to gas ratio is less than target ratio
            if self.vespene and self.minerals / self.vespene < resource_ratio:
                possible_mining_places = [place for place in deficit_mining_places if not place.vespene_contents]
            # else prefer gas
            else:
                possible_mining_places = [place for place in deficit_mining_places if place.vespene_contents]
            # if preferred type is not available any more, get all other places
            if not possible_mining_places:
                possible_mining_places = deficit_mining_places
            # find closest mining place among the preferred ones, so that 'resource_ratio' takes effect
            current_place = min(possible_mining_places, key=lambda place: place.distance_to(worker))
            # remove it from the list
            deficit_mining_places.remove(current_place)
            # if current place is a gas extraction site, go there
            if current_place.vespene_contents:
                worker.gather(current_place)
            # if current place is a townhall,
            # go to the mineral field that is near and has the most minerals left
            else:
                local_minerals = (
                    mineral for mineral in self.mineral_field if mineral.distance_to(current_place) <= 8
                )
                # local_minerals can be empty if townhall is misplaced
                target_mineral = max(local_minerals, key=lambda mineral: mineral.mineral_contents, default=None)
                if target_mineral:
                    worker.gather(target_mineral)
        # more workers to distribute than free mining spots
        # send to closest if worker is doing nothing
        elif worker.is_idle and all_minerals_near_base:
            target_mineral = min(all_minerals_near_base, key=lambda mineral: mineral.distance_to(worker))
            worker.gather(target_mineral)
        # otherwise there are no deficit mining places and the worker is not idle, so dont move him
@property
def owned_expansions(self) -> Dict[Point2, Unit]:
    """Dict of expansions owned by the player: expansion position as key, the townhall found there as value.

    An expansion counts as owned if a townhall is closer to it than EXPANSION_GAP_THRESHOLD."""
    owned_by_position = {}
    for position in self.expansion_locations_list:
        for townhall in self.townhalls:
            if townhall.distance_to(position) < self.EXPANSION_GAP_THRESHOLD:
                # Only the first townhall in range is considered for this position
                if townhall:
                    owned_by_position[position] = townhall
                break
    return owned_by_position
def calculate_supply_cost(self, unit_type: UnitTypeId) -> float:
    """
    This function calculates the required supply to train or morph a unit.
    The total supply of a baneling is 0.5, but a zergling already uses up 0.5 supply, so the morph supply cost is 0.
    The total supply of a ravager is 3, but a roach already uses up 2 supply, so the morph supply cost is 1.
    The required supply to build zerglings is 1 because they pop in pairs, so this function returns 1 because the larva morph command requires 1 free supply.

    Example::

        roach_supply_cost = self.calculate_supply_cost(UnitTypeId.ROACH) # Is 2
        ravager_supply_cost = self.calculate_supply_cost(UnitTypeId.RAVAGER) # Is 1
        baneling_supply_cost = self.calculate_supply_cost(UnitTypeId.BANELING) # Is 0

    :param unit_type:"""
    if unit_type == UnitTypeId.ZERGLING:
        return 1
    supply_cost = self._game_data.units[unit_type.value]._proto.food_required
    if supply_cost <= 0 or unit_type not in UNIT_TRAINED_FROM:
        return supply_cost
    producers = UNIT_TRAINED_FROM[unit_type]
    # Only a unit with exactly one possible producer can be a morph of that producer
    if len(producers) != 1:
        return supply_cost
    producer: UnitTypeId = next(iter(producers))
    producer_supply = self.game_data.units[producer.value]._proto.food_required
    # Subtract the supply the producer already occupies, unless it is bigger than the unit's own supply
    if producer_supply <= supply_cost:
        supply_cost -= producer_supply
    return supply_cost
def can_feed(self, unit_type: UnitTypeId) -> bool:
    """Checks if you have enough free supply to build the unit

    Example::

        cc = self.townhalls.idle.random_or(None)
        # self.townhalls can be empty or there are no idle townhalls
        if cc and self.can_feed(UnitTypeId.SCV):
            cc.train(UnitTypeId.SCV)

    :param unit_type:"""
    needed_supply = self.calculate_supply_cost(unit_type)
    if needed_supply <= 0:
        # Always possible, even in case self.supply_left is negative
        return True
    return self.supply_left >= needed_supply
def calculate_unit_value(self, unit_type: UnitTypeId) -> Cost:
    """
    Returns the mineral and vespene value of a unit as stored in the game data given by the API
    (e.g. the resources lost value on kill). Unlike 'calculate_cost', no morph cost correction is applied.

    Examples::

        self.calculate_unit_value(UnitTypeId.ORBITALCOMMAND) == Cost(550, 0)
        self.calculate_unit_value(UnitTypeId.RAVAGER) == Cost(100, 100)
        self.calculate_unit_value(UnitTypeId.ARCHON) == Cost(175, 275)

    :param unit_type:
    """
    unit_proto = self.game_data.units[unit_type.value]._proto
    return Cost(unit_proto.mineral_cost, unit_proto.vespene_cost)
def calculate_cost(self, item_id: Union[UnitTypeId, UpgradeId, AbilityId]) -> Cost:
    """
    Calculate the required build, train or morph cost of a unit. It is recommended to use the UnitTypeId instead of the ability to create the unit.
    The total cost to create a ravager is 100/100, but the actual morph cost from roach to ravager is only 25/75, so this function returns 25/75.

    It is advised to use the UnitTypeId instead of the AbilityId. Instead of::

        self.calculate_cost(AbilityId.UPGRADETOORBITAL_ORBITALCOMMAND)

    use::

        self.calculate_cost(UnitTypeId.ORBITALCOMMAND)

    More examples::

        from sc2.game_data import Cost

        self.calculate_cost(UnitTypeId.BROODLORD) == Cost(150, 150)
        self.calculate_cost(UnitTypeId.RAVAGER) == Cost(25, 75)
        self.calculate_cost(UnitTypeId.BANELING) == Cost(25, 25)
        self.calculate_cost(UnitTypeId.ORBITALCOMMAND) == Cost(150, 0)
        self.calculate_cost(UnitTypeId.REACTOR) == Cost(50, 50)
        self.calculate_cost(UnitTypeId.TECHLAB) == Cost(50, 25)
        self.calculate_cost(UnitTypeId.QUEEN) == Cost(150, 0)
        self.calculate_cost(UnitTypeId.HATCHERY) == Cost(300, 0)
        self.calculate_cost(UnitTypeId.LAIR) == Cost(150, 100)
        self.calculate_cost(UnitTypeId.HIVE) == Cost(200, 150)

    :param item_id:
    """
    if isinstance(item_id, UnitTypeId):
        # Reactor, techlab and archon are special-cased
        # (original note: the API returns 0 for reactor and techlab)
        if item_id == UnitTypeId.REACTOR:
            return Cost(50, 50)
        if item_id == UnitTypeId.TECHLAB:
            return Cost(50, 25)
        if item_id == UnitTypeId.ARCHON:
            return self.calculate_unit_value(UnitTypeId.ARCHON)
        creation_ability = self._game_data.units[item_id.value].creation_ability
        # Cost of morphs is automatically correctly calculated by 'calculate_ability_cost'
        return self._game_data.calculate_ability_cost(creation_ability)
    if isinstance(item_id, UpgradeId):
        return self._game_data.upgrades[item_id.value].cost
    # Is already AbilityId
    return self._game_data.calculate_ability_cost(item_id)
def can_afford(self, item_id: Union[UnitTypeId, UpgradeId, AbilityId], check_supply_cost: bool = True) -> bool:
    """Tests if the player has enough resources to build a unit or structure.

    Example::

        cc = self.townhalls.idle.random_or(None)
        # self.townhalls can be empty or there are no idle townhalls
        if cc and self.can_afford(UnitTypeId.SCV):
            cc.train(UnitTypeId.SCV)

    Example::

        # Current state: we have 150 minerals and one command center and a barracks
        can_afford_morph = self.can_afford(UnitTypeId.ORBITALCOMMAND, check_supply_cost=False)
        # Will be 'True' although the API reports that an orbital is worth 550 minerals, but the morph cost is only 150 minerals

    :param item_id:
    :param check_supply_cost: if True, free supply is checked as well (only for UnitTypeId items)"""
    price = self.calculate_cost(item_id)
    enough_minerals = price.minerals <= self.minerals
    if not enough_minerals or price.vespene > self.vespene:
        return False
    # Supply only matters for units, upgrades and abilities are not checked
    if check_supply_cost and isinstance(item_id, UnitTypeId):
        needed_supply = self.calculate_supply_cost(item_id)
        if needed_supply and needed_supply > self.supply_left:
            return False
    return True
async def can_cast(
    self,
    unit: Unit,
    ability_id: AbilityId,
    target: Optional[Union[Unit, Point2]] = None,
    only_check_energy_and_cooldown: bool = False,
    cached_abilities_of_unit: List[AbilityId] = None,
) -> bool:
    """Tests if a unit has an ability available and enough energy to cast it.

    Example::

        for stalker in self.units(UnitTypeId.STALKER):
            if await self.can_cast(stalker, AbilityId.EFFECT_BLINK_STALKER, only_check_energy_and_cooldown=True):
                ...  # this stalker can blink

    See data_pb2.py (line 161) for the numbers 1-5 to make sense

    :param unit: the unit that should cast the ability
    :param ability_id: the ability to check
    :param target: optional target unit or position; used for the range check
    :param only_check_energy_and_cooldown: if True, skip the target type and range checks
    :param cached_abilities_of_unit: abilities already known to be available for ``unit``;
        when given (even if empty) no query is sent to the game
    :return: True if the ability is available and the target (if required) is valid and in range
    """
    assert isinstance(unit, Unit), f"{unit} is no Unit object"
    assert isinstance(ability_id, AbilityId), f"{ability_id} is no AbilityId"
    assert isinstance(target, (type(None), Unit, Point2))

    # Check if unit has enough energy to cast or if ability is on cooldown.
    # 'is not None' so that an empty cached list is respected instead of triggering a new query.
    if cached_abilities_of_unit is not None:
        abilities = cached_abilities_of_unit
    else:
        abilities = (await self.get_available_abilities([unit], ignore_resource_requirements=False))[0]

    if ability_id not in abilities:
        return False
    if only_check_energy_and_cooldown:
        return True

    ability_proto = self._game_data.abilities[ability_id.value]._proto
    cast_range = ability_proto.cast_range
    ability_target = ability_proto.target

    # Self cast like stimpack: no target required.
    # Cant replace 1 with "Target.None.value" because ".None" doesnt seem to be a valid enum name
    if ability_target == 1:
        return True

    # Check if able to use ability on a unit
    if isinstance(target, Unit):
        return (
            ability_target in {Target.Unit.value, Target.PointOrUnit.value}
            and unit.distance_to(target) <= unit.radius + target.radius + cast_range
        )

    # Check if able to use ability on a position.
    # Only the radius of the casting unit is added to the cast range here;
    # the previous PointOrNone branch accessed 'target.radius' on the position.
    if isinstance(target, Point2):
        return (
            ability_target in {Target.Point.value, Target.PointOrUnit.value, Target.PointOrNone.value}
            and unit.distance_to(target) <= unit.radius + cast_range
        )

    # No target given: only valid for abilities whose target is optional
    return ability_target == Target.PointOrNone.value
def select_build_worker(self, pos: Union[Unit, Point2], force: bool = False) -> Optional[Unit]:
    """Select a worker to build a building with.

    Workers that are gathering or idle within a distance of 20 of ``pos`` are preferred;
    if there are none, all workers are considered. Candidates are visited sorted by
    distance to ``pos`` with idle workers first.

    Example::

        barracks_placement_position = self.main_base_ramp.barracks_correct_placement
        worker = self.select_build_worker(barracks_placement_position)
        # Can return None
        if worker:
            worker.build(UnitTypeId.BARRACKS, barracks_placement_position)

    :param pos: the position (or unit) near which the worker should be
    :param force: if True and no suitable worker was found, return a random worker instead of None
    :return: a worker, or None if no workers exist or none is suitable and ``force`` is False
    """
    workers = (
        self.workers.filter(lambda w: (w.is_gathering or w.is_idle) and w.distance_to(pos) < 20) or self.workers
    )
    if not workers:
        return None

    for worker in workers.sorted_by_distance_to(pos).prefer_idle:
        # Skip workers that were already given an action. The collection is assumed to hold
        # unit tags (as its name says), so the tag is tested rather than the Unit object.
        if worker.tag in self.unit_tags_received_action:
            continue
        # Accept workers without orders, or with a single move / gather order
        if not worker.orders or (
            len(worker.orders) == 1
            and worker.orders[0].ability.id in {AbilityId.MOVE, AbilityId.HARVEST_GATHER}
        ):
            return worker

    return workers.random if force else None
async def can_place_single(self, building: Union[AbilityId, UnitTypeId], position: Point2) -> bool:
    """Query the placement of a building for exactly one position.

    :param building: ability used to place the building, or the unit type of the building
        (which is translated to the id of its creation ability)
    :param position: the position to test
    :return: the first (and only) entry of the placement query result
    """
    placement_ability = (
        self._game_data.units[building.value].creation_ability.id
        if isinstance(building, UnitTypeId)
        else building
    )
    results = await self._client._query_building_placement_fast(placement_ability, [position])
    return results[0]
async def can_place(
    self, building: Union[AbilityData, AbilityId, UnitTypeId], positions: List[Point2]
) -> List[bool]:
    """Tests if a building can be placed in the given locations.

    Example::

        barracks_placement_position = self.main_base_ramp.barracks_correct_placement
        worker = self.select_build_worker(barracks_placement_position)
        # Can return None
        if worker and (await self.can_place(UnitTypeId.BARRACKS, [barracks_placement_position]))[0]:
            worker.build(UnitTypeId.BARRACKS, barracks_placement_position)

    :param building: unit type of the building or the ability that places it
        (passing AbilityData is deprecated)
    :param positions: list of positions to test. Passing a single position is deprecated;
        in that case the result of ``can_place_single`` is returned instead of a list
    :return: one entry per position, in the same order; an empty list for an empty input
    """
    building_type = type(building)
    assert building_type in {AbilityData, AbilityId, UnitTypeId}, f"{building}, {building_type}"
    if building_type == UnitTypeId:
        building = self._game_data.units[building.value].creation_ability.id
    elif building_type == AbilityData:
        warnings.warn(
            "Using AbilityData is deprecated and may be removed soon. Please use AbilityId or UnitTypeId instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        # Read the id from the instance; 'building_type.id' would read it from the class itself
        building = building.id

    if isinstance(positions, (Point2, tuple)):
        warnings.warn(
            "The support for querying single entries will be removed soon. Please use either 'await self.can_place_single(building, position)' or 'await (self.can_place(building, [position]))[0]",
            DeprecationWarning,
            stacklevel=2,
        )
        return await self.can_place_single(building, positions)

    assert isinstance(positions, list), f"Expected an iterable (list, tuple), but was: {positions}"
    if not positions:
        # Nothing to query; also avoids an IndexError in the element type check below
        return []
    assert isinstance(
        positions[0], Point2
    ), f"List is expected to have Point2, but instead had: {positions[0]} {type(positions[0])}"
    return await self._client._query_building_placement_fast(building, positions)
async def find_placement(
    self,
    building: Union[UnitTypeId, AbilityId],
    near: Point2,
    max_distance: int = 20,
    random_alternative: bool = True,
    placement_step: int = 2,
    addon_place: bool = False,
) -> Optional[Point2]:
    """Finds a placement location for building.

    First ``near`` itself is tested. If it is not placeable, square rings around ``near``
    are tested with growing distance (``placement_step``, ``2 * placement_step``, ...
    strictly below ``max_distance``) and a position from the first ring that has at
    least one valid position is returned.

    Example::

        if self.townhalls:
            cc = self.townhalls[0]
            depot_position = await self.find_placement(UnitTypeId.SUPPLYDEPOT, near=cc.position)

    :param building: unit type of the building or the ability that places it
    :param near: the position around which to search; must be a Point2
    :param max_distance: exclusive upper bound of the ring distance; 0 tests only ``near``
    :param random_alternative: if True return a random valid position of the ring,
        otherwise the valid position closest to ``near``
    :param placement_step: distance between rings and between positions on a ring
    :param addon_place: if True, additionally require that a supply depot could be placed
        at offset (2.5, -0.5) from the position (space for an addon)
    :return: a placeable position or None if none was found
    """
    assert isinstance(building, (AbilityId, UnitTypeId))
    assert isinstance(near, Point2), f"{near} is no Point2 object"

    if isinstance(building, UnitTypeId):
        building = self._game_data.units[building.value].creation_ability.id

    if await self.can_place_single(building, near) and (
        not addon_place or await self.can_place_single(UnitTypeId.SUPPLYDEPOT, near.offset((2.5, -0.5)))
    ):
        return near

    if max_distance == 0:
        return None

    for distance in range(placement_step, max_distance, placement_step):
        # Coordinates along one side of the square ring
        side = range(-distance, distance + 1, placement_step)
        possible_positions = [
            Point2(p).offset(near).to2
            for p in (
                [(dx, -distance) for dx in side]
                + [(dx, distance) for dx in side]
                + [(-distance, dy) for dy in side]
                + [(distance, dy) for dy in side]
            )
        ]
        res = await self._client._query_building_placement_fast(building, possible_positions)
        # Filter all positions if building can be placed
        possible = [p for r, p in zip(res, possible_positions) if r]
        if not possible:
            # Nothing placeable on this ring: skip the addon query, which would be sent with no positions
            continue

        if addon_place:
            # Filter remaining positions if addon can be placed
            res = await self._client._query_building_placement_fast(
                AbilityId.TERRANBUILDDROP_SUPPLYDEPOTDROP,
                [p.offset((2.5, -0.5)) for p in possible],
            )
            possible = [p for r, p in zip(res, possible) if r]
            if not possible:
                continue

        if random_alternative:
            return random.choice(possible)
        return min(possible, key=lambda p: p.distance_to_point2(near))
    return None
# TODO: improve using cache per frame
def already_pending_upgrade(self, upgrade_type: UpgradeId) -> float:
    """Report how far the research of an upgrade has progressed.

    Returns values are::

        0 # not started
        0 < x < 1 # researching
        1 # completed

    Example::

        stim_completion_percentage = self.already_pending_upgrade(UpgradeId.STIMPACK)

    :param upgrade_type: the upgrade to look up
    :return: 1 if the upgrade is listed in ``self.state.upgrades``, otherwise the progress of the
        first matching research order found on a ready structure, or 0 if there is none
    """
    assert isinstance(upgrade_type, UpgradeId), f"{upgrade_type} is no UpgradeId"
    if upgrade_type in self.state.upgrades:
        return 1

    research_ability_id = self._game_data.upgrades[upgrade_type.value].research_ability.exact_id
    ready_structures = self.structures.filter(lambda unit: unit.is_ready)
    # Lazily walk all orders of ready structures; the first order researching this upgrade wins
    matching_progress = (
        order.progress
        for structure in ready_structures
        for order in structure.orders
        if order.ability.exact_id == research_ability_id
    )
    return next(matching_progress, 0)
@property_cache_once_per_frame_no_copy
def _abilities_all_units(self) -> Tuple[Counter, Dict[AbilityData, float]]:
    """Cache for the already_pending function, includes protoss units warping in,
    all units in production and all structures, and all morphs.

    :return: a tuple of two mappings, both keyed by ability data:
        how often the ability is currently ordered or pending (one count per order, plus one
        per unfinished unit via its creation ability), and the highest build progress of any
        unfinished unit created by that ability
    """
    abilities_amount = Counter()
    max_build_progress: Dict[AbilityData, float] = {}
    for unit in self.units + self.structures:  # type: Unit
        for order in unit.orders:
            abilities_amount[order.ability] += 1
        # If an SCV is constructing a building, already_pending would count this structure twice
        # (once from the SCV order, and once from "not structure.is_ready"),
        # so unfinished Terran structures are not counted here
        if not unit.is_ready and (self.race != Race.Terran or not unit.is_structure):
            creation_ability: AbilityData = self._game_data.units[unit.type_id.value].creation_ability
            abilities_amount[creation_ability] += 1
            max_build_progress[creation_ability] = max(
                max_build_progress.get(creation_ability, 0), unit.build_progress
            )
    return abilities_amount, max_build_progress
def structure_type_build_progress(self, structure_type: Union[UnitTypeId, int]) -> float:
"""
Returns the build progress of a structure type.
Return range: 0 <= x <= 1 where
0: no such structure exists
0 < x < 1: at least one structure is under construction, returns the progress of the one with the highest progress
1: we have at least one such structure complete
Example::
# Assuming you have one barracks building at 0.5 build progress: