Combinatoric approach revised (KLA1)

Backported some fixes
This commit is contained in:
aerinon
2021-06-29 16:34:28 -06:00
parent 62fff1f6e1
commit b21564d5aa
8 changed files with 490 additions and 158 deletions

View File

@@ -458,17 +458,26 @@ class World(object):
class CollectionState(object):
def __init__(self, parent):
self.prog_items = Counter()
def __init__(self, parent, skip_init=False):
self.world = parent
self.reachable_regions = {player: dict() for player in range(1, parent.players + 1)}
self.blocked_connections = {player: dict() for player in range(1, parent.players + 1)}
self.events = []
self.path = {}
self.locations_checked = set()
self.stale = {player: True for player in range(1, parent.players + 1)}
for item in parent.precollected_items:
self.collect(item, True)
if not skip_init:
self.prog_items = Counter()
self.reachable_regions = {player: dict() for player in range(1, parent.players + 1)}
self.blocked_connections = {player: dict() for player in range(1, parent.players + 1)}
self.events = []
self.path = {}
self.locations_checked = set()
self.stale = {player: True for player in range(1, parent.players + 1)}
for item in parent.precollected_items:
self.collect(item, True)
# reached vs. opened in the counter
self.door_counter = {player: (Counter(), Counter()) for player in range(1, parent.players + 1)}
self.reached_doors = {player: set() for player in range(1, parent.players + 1)}
self.opened_doors = {player: set() for player in range(1, parent.players + 1)}
self.dungeons_to_check = {player: defaultdict(dict) for player in range(1, parent.players + 1)}
self.ghost_keys = Counter()
self.dungeon_limits = None
def update_reachable_regions(self, player):
self.stale[player] = False
@@ -479,66 +488,261 @@ class CollectionState(object):
start = self.world.get_region('Menu', player)
if not start in rrp:
rrp[start] = CrystalBarrier.Orange
for exit in start.exits:
bc[exit] = CrystalBarrier.Orange
for conn in start.exits:
bc[conn] = CrystalBarrier.Orange
queue = deque(self.blocked_connections[player].items())
self.traverse_world(queue, rrp, bc, player)
unresolved_events = [x for y in self.reachable_regions[player] for x in y.locations
if x.event and x.item and (x.item.smallkey or x.item.bigkey or x.item.advancement)
and x not in self.locations_checked and x.can_reach(self)]
if len(unresolved_events) == 0:
self.check_key_doors_in_dungeons(rrp, player)
def traverse_world(self, queue, rrp, bc, player):
# run BFS on all connections, and keep track of those blocked by missing items
while True:
try:
connection, crystal_state = queue.popleft()
new_region = connection.connected_region
if new_region is None or new_region in rrp and (new_region.type != RegionType.Dungeon or (rrp[new_region] & crystal_state) == crystal_state):
bc.pop(connection, None)
elif connection.can_reach(self):
if new_region.type == RegionType.Dungeon:
new_crystal_state = crystal_state
for exit in new_region.exits:
door = exit.door
if door is not None and door.crystal == CrystalBarrier.Either and door.entrance.can_reach(self):
new_crystal_state = CrystalBarrier.Either
break
if new_region in rrp:
new_crystal_state |= rrp[new_region]
while len(queue) > 0:
connection, crystal_state = queue.popleft()
new_region = connection.connected_region
if not self.should_visit(new_region, rrp, crystal_state, player):
bc.pop(connection, None)
elif connection.can_reach(self):
bc.pop(connection, None)
if new_region.type == RegionType.Dungeon:
new_crystal_state = crystal_state
if new_region in rrp:
new_crystal_state |= rrp[new_region]
rrp[new_region] = new_crystal_state
for exit in new_region.exits:
door = exit.door
if door is not None and not door.blocked:
rrp[new_region] = new_crystal_state
for conn in new_region.exits:
door = conn.door
if door is not None and not door.blocked:
if self.valid_crystal(door, new_crystal_state):
door_crystal_state = door.crystal if door.crystal else new_crystal_state
bc[exit] = door_crystal_state
queue.append((exit, door_crystal_state))
elif door is None:
queue.append((exit, new_crystal_state))
bc[conn] = door_crystal_state
queue.append((conn, door_crystal_state))
elif door is None:
# note: no door in dungeon indicates what exactly? (always traversable)?
queue.append((conn, new_crystal_state))
else:
new_crystal_state = CrystalBarrier.Orange
rrp[new_region] = new_crystal_state
for conn in new_region.exits:
bc[conn] = new_crystal_state
queue.append((conn, new_crystal_state))
self.path[new_region] = (new_region.name, self.path.get(connection, None))
# Retry connections if the new region can unblock them
if new_region.name in indirect_connections:
new_entrance = self.world.get_entrance(indirect_connections[new_region.name], player)
if new_entrance in bc and new_entrance.parent_region in rrp:
new_crystal_state = rrp[new_entrance.parent_region]
if (new_entrance, new_crystal_state) not in queue:
queue.append((new_entrance, new_crystal_state))
# else those connections that are not accessible yet
if self.is_small_door(connection) and not self.world.retro[player]: # todo: retro
door = connection.door
dungeon_name = connection.parent_region.dungeon.name # todo: universal
key_logic = self.world.key_logic[player][dungeon_name]
if door.name not in self.reached_doors[player]:
self.door_counter[player][0][dungeon_name] += 1
self.reached_doors[player].add(door.name)
if key_logic.sm_doors[door]:
self.reached_doors[player].add(key_logic.sm_doors[door].name)
if not connection.can_reach(self):
checklist = self.dungeons_to_check[player][dungeon_name]
checklist[connection.name] = (connection, crystal_state)
elif door.name not in self.opened_doors[player]:
opened_doors = self.opened_doors[player]
door = connection.door
if door.name not in opened_doors:
self.door_counter[player][1][dungeon_name] += 1
opened_doors.add(door.name)
key_logic = self.world.key_logic[player][dungeon_name]
if key_logic.sm_doors[door]:
opened_doors.add(key_logic.sm_doors[door].name)
def should_visit(self, new_region, rrp, crystal_state, player):
if not new_region:
return False
if self.dungeon_limits and not self.possibly_connected_to_dungeon(new_region, player):
return False
if new_region not in rrp:
return True
if new_region.type != RegionType.Dungeon:
return False
return (rrp[new_region] & crystal_state) != crystal_state
def possibly_connected_to_dungeon(self, new_region, player):
if new_region.dungeon:
return new_region.dungeon.name in self.dungeon_limits
else:
return new_region.name in self.world.inaccessible_regions[player]
@staticmethod
def valid_crystal(door, new_crystal_state):
return (not door.crystal or door.crystal == CrystalBarrier.Either or new_crystal_state == CrystalBarrier.Either
or new_crystal_state == door.crystal)
def check_key_doors_in_dungeons(self, rrp, player):
for dungeon_name, checklist in self.dungeons_to_check[player].items():
init_door_candidates = self.should_explore_child_state(self, dungeon_name, player)
key_total = self.prog_items[(dungeon_keys[dungeon_name], player)] # todo: universal
remaining_keys = key_total - self.door_counter[player][1][dungeon_name]
if not init_door_candidates or remaining_keys == 0:
continue
dungeon_doors = {x.name for x in self.world.key_logic[player][dungeon_name].sm_doors.keys()}
def valid_d_door(x):
return x in dungeon_doors
child_states = deque()
child_states.append(self)
visited_opened_doors = set()
visited_opened_doors.add(frozenset(self.opened_doors[player]))
terminal_states, done, common_regions, common_bc, common_doors = [], False, {}, {}, set()
while not done:
terminal_states.clear()
while len(child_states) > 0:
next_child = child_states.popleft()
door_candidates = CollectionState.should_explore_child_state(next_child, dungeon_name, player)
if door_candidates:
for chosen_door in door_candidates:
child_state = next_child.copy()
child_queue = deque()
child_state.door_counter[player][1][dungeon_name] += 1
if isinstance(chosen_door, tuple):
child_state.opened_doors[player].add(chosen_door[0])
child_state.opened_doors[player].add(chosen_door[1])
if chosen_door[0] in checklist:
child_queue.append(checklist[chosen_door[0]])
if chosen_door[1] in checklist:
child_queue.append(checklist[chosen_door[1]])
else:
child_state.opened_doors[player].add(chosen_door)
if chosen_door in checklist:
child_queue.append(checklist[chosen_door])
if child_state.opened_doors[player] not in visited_opened_doors:
done = False
while not done:
rrp_ = child_state.reachable_regions[player]
bc_ = child_state.blocked_connections[player]
self.dungeon_limits = [dungeon_name]
child_state.traverse_world(child_queue, rrp_, bc_, player)
new_events = child_state.sweep_for_events_once()
child_state.stale[player] = False
if new_events:
for conn in bc_:
if conn.parent_region.dungeon and conn.parent_region.dungeon.name == dungeon_name:
child_queue.append((conn, bc_[conn]))
done = not new_events
visited_opened_doors.add(frozenset(child_state.opened_doors[player]))
child_states.append(child_state)
else:
new_crystal_state = CrystalBarrier.Orange
rrp[new_region] = new_crystal_state
bc.pop(connection, None)
for exit in new_region.exits:
bc[exit] = new_crystal_state
queue.append((exit, new_crystal_state))
terminal_states.append(next_child)
common_regions, common_doors, first = {}, set(), True
for term_state in terminal_states:
t_rrp = term_state.reachable_regions[player]
if first:
first = False
common_regions = {x: y for x, y in t_rrp.items() if x not in rrp or y != rrp[x]}
common_doors = {x for x in term_state.opened_doors[player] - self.opened_doors[player]
if valid_d_door(x)}
else:
cm_rrp = {x: y for x, y in t_rrp.items() if x not in rrp or y != rrp[x]}
common_regions = {k: self.comb_crys(v, cm_rrp[k]) for k, v in common_regions.items()
if k in cm_rrp and self.crys_agree(v, cm_rrp[k])}
common_doors &= {x for x in term_state.opened_doors[player] - self.opened_doors[player]
if valid_d_door(x)}
done = len(child_states) == 0
self.path[new_region] = (new_region.name, self.path.get(connection, None))
terminal_queue = deque()
for door in common_doors:
self.opened_doors[player].add(door)
if door in checklist:
terminal_queue.append(checklist[door])
if self.find_door_pair(player, dungeon_name, door) not in self.opened_doors[player]:
self.door_counter[player][1][dungeon_name] += 1
# Retry connections if the new region can unblock them
if new_region.name in indirect_connections:
new_entrance = self.world.get_entrance(indirect_connections[new_region.name], player)
if new_entrance in bc and new_entrance not in queue and new_entrance.parent_region in rrp:
queue.append((new_entrance, rrp[new_entrance.parent_region]))
except IndexError:
break
self.dungeon_limits = [dungeon_name]
rrp_ = self.reachable_regions[player]
bc_ = self.blocked_connections[player]
self.traverse_world(terminal_queue, rrp_, bc_, player)
self.dungeon_limits = None
rrp = self.reachable_regions[player]
missing_regions = {x: y for x, y in common_regions.items() if x not in rrp}
for k in missing_regions:
rrp[k] = missing_regions[k]
checklist.clear()
@staticmethod
def comb_crys(a, b):
return a if a == b or a != CrystalBarrier.Either else b
@staticmethod
def crys_agree(a, b):
return a == b or a == CrystalBarrier.Either or b == CrystalBarrier.Either
def find_door_pair(self, player, dungeon_name, name):
for door in self.world.key_logic[player][dungeon_name].sm_doors.keys():
if door.name == name:
paired_door = self.world.key_logic[player][dungeon_name].sm_doors[door]
return paired_door.name if paired_door else None
return None
@staticmethod
def should_explore_child_state(state, dungeon_name, player):
small_key_name = dungeon_keys[dungeon_name] # todo: universal
key_total = state.prog_items[(small_key_name, player)] + state.ghost_keys[(small_key_name, player)]
remaining_keys = key_total - state.door_counter[player][1][dungeon_name]
unopened_doors = state.door_counter[player][0][dungeon_name] - state.door_counter[player][1][dungeon_name]
if remaining_keys > 0 and unopened_doors > 0:
key_logic = state.world.key_logic[player][dungeon_name] # todo: universal
door_candidates, skip = [], set()
for door, paired in key_logic.sm_doors.items():
if door.name in state.reached_doors[player] and door.name not in state.opened_doors[player]:
if door.name not in skip:
if paired:
door_candidates.append((door.name, paired.name))
skip.add(paired.name)
else:
door_candidates.append(door.name)
return door_candidates
return None
@staticmethod
def print_rrp(rrp):
logger = logging.getLogger('')
logger.debug('RRP Checking')
for region, packet in rrp.items():
new_crystal_state, logic, path = packet
logger.debug(f'\nRegion: {region.name} (CS: {str(new_crystal_state)})')
for i in range(0, len(logic)):
logger.debug(f'{logic[i]}')
logger.debug(f'{",".join(str(x) for x in path[i])}')
def copy(self):
ret = CollectionState(self.world)
ret = CollectionState(self.world, skip_init=True)
ret.prog_items = self.prog_items.copy()
ret.reachable_regions = {player: copy.copy(self.reachable_regions[player]) for player in range(1, self.world.players + 1)}
ret.blocked_connections = {player: copy.copy(self.blocked_connections[player]) for player in range(1, self.world.players + 1)}
ret.events = copy.copy(self.events)
ret.path = copy.copy(self.path)
ret.locations_checked = copy.copy(self.locations_checked)
ret.stale = {player: self.stale[player] for player in range(1, self.world.players + 1)}
ret.door_counter = {player: (copy.copy(self.door_counter[player][0]), copy.copy(self.door_counter[player][1]))
for player in range(1, self.world.players + 1)}
ret.reached_doors = {player: copy.copy(self.reached_doors[player]) for player in range(1, self.world.players + 1)}
ret.opened_doors = {player: copy.copy(self.opened_doors[player]) for player in range(1, self.world.players + 1)}
# todo: verify if this isn't copied deep enough
ret.dungeons_to_check = {
player: defaultdict(dict, {name: copy.copy(checklist)
for name, checklist in self.dungeons_to_check[player].items()})
for player in range(1, self.world.players + 1)}
ret.ghost_keys = self.ghost_keys.copy()
return ret
def can_reach(self, spot, resolution_hint=None, player=None):
@@ -556,6 +760,19 @@ class CollectionState(object):
return spot.can_reach(self)
def sweep_for_events_once(self, key_only=False, locations=None):
if locations is None:
locations = self.world.get_filled_locations()
checked_locations = set([l for l in locations if l in self.locations_checked])
reachable_events = [location for location in locations if location.event and
(not key_only or (not self.world.keyshuffle[location.item.player] and location.item.smallkey) or (not self.world.bigkeyshuffle[location.item.player] and location.item.bigkey))
and location.can_reach(self)]
reachable_events = self._do_not_flood_the_keys(reachable_events)
for event in reachable_events:
if event not in checked_locations:
self.events.append((event.name, event.player))
self.collect(event.item, True, event)
return len(reachable_events) > len(checked_locations)
def sweep_for_events(self, key_only=False, locations=None):
# this may need improvement
@@ -603,6 +820,13 @@ class CollectionState(object):
or not self.location_can_be_flooded(flood_location))
return True
@staticmethod
def is_small_door(connection):
return connection and connection.door and connection.door.smallKey
def is_door_open(self, door_name, player):
return door_name in self.opened_doors[player]
@staticmethod
def location_can_be_flooded(location):
return location.parent_region.name in ['Swamp Trench 1 Alcove', 'Swamp Trench 2 Alcove']
@@ -1806,6 +2030,15 @@ class Item(object):
def compass(self):
return self.type == 'Compass'
@property
def dungeon(self):
if not self.smallkey and not self.bigkey and not self.map and not self.compass:
return None
item_dungeon = self.name.split('(')[1][:-1]
if item_dungeon == 'Escape':
item_dungeon = 'Hyrule Castle'
return item_dungeon
def __str__(self):
return str(self.__unicode__())
@@ -2196,6 +2429,21 @@ dungeon_names = [
'Swamp Palace', 'Skull Woods', 'Thieves Town', 'Ice Palace', 'Misery Mire', 'Turtle Rock', 'Ganons Tower'
]
dungeon_keys = {
'Hyrule Castle': 'Small Key (Escape)',
'Eastern Palace': 'Small Key (Eastern Palace)',
'Desert Palace': 'Small Key (Desert Palace)',
'Tower of Hera': 'Small Key (Tower of Hera)',
'Agahnims Tower': 'Small Key (Agahnims Tower)',
'Palace of Darkness': 'Small Key (Palace of Darkness)',
'Swamp Palace': 'Small Key (Swamp Palace)',
'Skull Woods': 'Small Key (Skull Woods)',
'Thieves Town': 'Small Key (Thieves Town)',
'Ice Palace': 'Small Key (Ice Palace)',
'Misery Mire': 'Small Key (Misery Mire)',
'Turtle Rock': 'Small Key (Turtle Rock)',
'Ganons Tower': 'Small Key (Ganons Tower)'
}
class PotItem(FastEnum):
Nothing = 0x0
@@ -2346,3 +2594,10 @@ class Settings(object):
args.enemy_health[p] = r(e_health)[(settings[7] & 0xE0) >> 5]
args.enemy_damage[p] = r(e_dmg)[(settings[7] & 0x18) >> 3]
args.shufflepots[p] = True if settings[7] & 0x4 else False
@unique
class KeyRuleType(Enum):
WorstCase = 0
AllowSmall = 1
Lock = 2