Re-worked settings for pottery lottery

Fixed multiworld support for lottery
This commit is contained in:
aerinon
2022-01-21 16:10:33 -07:00
parent d6e74c638f
commit 5211b305e6
22 changed files with 472 additions and 220 deletions

View File

@@ -8,8 +8,10 @@ import shlex
import urllib.parse
import websockets
from BaseClasses import PotItem, PotFlags
import Items
import Regions
import PotShuffle
class ReceivedItem:
@@ -57,8 +59,13 @@ class Context:
self.key_drop_mode = False
self.shop_mode = False
self.retro_mode = False
self.pottery_mode = False
self.mystery_mode = False
self.ignore_count = 0
self.lookup_name_to_id = {}
self.lookup_id_to_name = {}
def color_code(*args):
codes = {'reset': 0, 'bold': 1, 'underline': 4, 'black': 30, 'red': 31, 'green': 32, 'yellow': 33, 'blue': 34,
'magenta': 35, 'cyan': 36, 'white': 37 , 'black_bg': 40, 'red_bg': 41, 'green_bg': 42, 'yellow_bg': 43,
@@ -83,6 +90,10 @@ INGAME_MODES = {0x07, 0x09, 0x0b}
SAVEDATA_START = WRAM_START + 0xF000
SAVEDATA_SIZE = 0x500
POT_ITEMS_SRAM_START = WRAM_START + 0x016600
SPRITE_ITEMS_SRAM_START = WRAM_START + 0x016850
ITEM_SRAM_SIZE = 0x250
RECV_PROGRESS_ADDR = SAVEDATA_START + 0x4D0 # 2 bytes
RECV_ITEM_ADDR = SAVEDATA_START + 0x4D2 # 1 byte
RECV_ITEM_PLAYER_ADDR = SAVEDATA_START + 0x4D3 # 1 byte
@@ -349,6 +360,8 @@ location_table_misc = {'Bottle Merchant': (0x3c9, 0x2),
'Purple Chest': (0x3c9, 0x10),
"Link's Uncle": (0x3c6, 0x1),
'Hobo': (0x3c9, 0x1)}
location_table_pot_items = {}
location_table_sprite_items = {}
SNES_DISCONNECTED = 0
SNES_CONNECTING = 1
@@ -676,7 +689,7 @@ async def process_server_cmd(ctx : Context, cmd, args):
ctx.player_names = {p: n for p, n in args[1]}
msgs = []
if ctx.locations_checked:
msgs.append(['LocationChecks', [Regions.lookup_name_to_id[loc] for loc in ctx.locations_checked]])
msgs.append(['LocationChecks', [ctx.lookup_name_to_id[loc] for loc in ctx.locations_checked]])
if ctx.locations_scouted:
msgs.append(['LocationScouts', list(ctx.locations_scouted)])
if msgs:
@@ -689,7 +702,7 @@ async def process_server_cmd(ctx : Context, cmd, args):
elif start_index != len(ctx.items_received):
sync_msg = [['Sync']]
if ctx.locations_checked:
sync_msg.append(['LocationChecks', [Regions.lookup_name_to_id[loc] for loc in ctx.locations_checked]])
sync_msg.append(['LocationChecks', [ctx.lookup_name_to_id[loc] for loc in ctx.locations_checked]])
await send_msgs(ctx.socket, sync_msg)
if start_index == len(ctx.items_received):
for item in items:
@@ -701,7 +714,7 @@ async def process_server_cmd(ctx : Context, cmd, args):
if location not in ctx.locations_info:
replacements = {0xA2: 'Small Key', 0x9D: 'Big Key', 0x8D: 'Compass', 0x7D: 'Map'}
item_name = replacements.get(item, get_item_name_from_id(item))
logging.info(f"Saw {color(item_name, 'red', 'bold')} at {list(Regions.lookup_id_to_name.keys())[location - 1]}")
logging.info(f"Saw {color(item_name, 'red', 'bold')} at {list(ctx.lookup_id_to_name.keys())[location - 1]}")
ctx.locations_info[location] = (item, player)
ctx.watcher_event.set()
@@ -710,7 +723,8 @@ async def process_server_cmd(ctx : Context, cmd, args):
item = color(get_item_name_from_id(item), 'cyan' if player_sent != ctx.slot else 'green')
player_sent = color(ctx.player_names[player_sent], 'yellow' if player_sent != ctx.slot else 'magenta')
player_recvd = color(ctx.player_names[player_recvd], 'yellow' if player_recvd != ctx.slot else 'magenta')
logging.info('%s sent %s to %s (%s)' % (player_sent, item, player_recvd, get_location_name_from_address(location)))
location_name = get_location_name_from_address(ctx, location)
logging.info('%s sent %s to %s (%s)' % (player_sent, item, player_recvd, location_name))
if cmd == 'Print':
logging.info(args)
@@ -779,10 +793,10 @@ async def console_loop(ctx : Context):
for index, item in enumerate(ctx.items_received, 1):
logging.info('%s from %s (%s) (%d/%d in list)' % (
color(get_item_name_from_id(item.item), 'red', 'bold'), color(ctx.player_names[item.player], 'yellow'),
get_location_name_from_address(item.location), index, len(ctx.items_received)))
get_location_name_from_address(ctx, item.location), index, len(ctx.items_received)))
if command[0] == '/missing':
for location in [k for k, v in Regions.lookup_name_to_id.items()
for location in [k for k, v in ctx.lookup_name_to_id.items()
if type(v) is int and not filter_location(ctx, k)]:
if location not in ctx.locations_checked:
logging.info('Missing: ' + location)
@@ -804,15 +818,19 @@ def get_item_name_from_id(code):
return items[0] if items else f'Unknown item (ID:{code})'
def get_location_name_from_address(address):
def get_location_name_from_address(ctx, address):
if type(address) is str:
return address
return Regions.lookup_id_to_name.get(address, f'Unknown location (ID:{address})')
return ctx.lookup_id_to_name.get(address, f'Unknown location (ID:{address})')
def filter_location(ctx, location):
if not ctx.key_drop_mode and ('Key Drop' in location or 'Pot Key' in location):
if (not ctx.key_drop_mode and location in PotShuffle.key_drop_data
and PotShuffle.key_drop_data[location][0] == 'Drop'):
return True
if (not ctx.pottery_mode and location in PotShuffle.key_drop_data
and PotShuffle.key_drop_data[location][0] == 'Pot'):
return True
if not ctx.shop_mode and location in Regions.flat_normal_shops:
return True
@@ -821,6 +839,31 @@ def filter_location(ctx, location):
return False
def init_lookups(ctx):
ctx.lookup_id_to_name = {x: y for x, y in Regions.lookup_id_to_name.items()}
ctx.lookup_name_to_id = {x: y for x, y in Regions.lookup_name_to_id.items()}
for location, datum in PotShuffle.key_drop_data.items():
type = datum[0]
if type == 'Drop':
location_id, super_tile, sprite_index = datum[1]
location_table_sprite_items[location] = (2 * super_tile, 0x8000 >> sprite_index)
ctx.lookup_name_to_id[location] = location_id
ctx.lookup_id_to_name[location_id] = location
for super_tile, pot_list in PotShuffle.vanilla_pots.items():
for pot_index, pot in enumerate(pot_list):
if pot.item != PotItem.Hole:
if pot.item == PotItem.Key:
loc_name = next(loc for loc, datum in PotShuffle.key_drop_data.items()
if datum[1] == super_tile)
else:
descriptor = 'Large Block' if pot.flags & PotFlags.Block else f'Pot #{pot_index+1}'
loc_name = f'{pot.room} {descriptor}'
location_table_pot_items[loc_name] = (2 * super_tile, 0x8000 >> pot_index)
location_id = Regions.pot_address(pot_index, super_tile)
ctx.lookup_name_to_id[loc_name] = location_id
ctx.lookup_id_to_name[location_id] = loc_name
async def track_locations(ctx : Context, roomid, roomdata):
new_locations = []
@@ -832,9 +875,12 @@ async def track_locations(ctx : Context, roomid, roomdata):
if ctx.mode_flags is None:
flags = await snes_read(ctx, MODE_FLAGS, 1)
ctx.mode_flags = flags
ctx.key_drop_mode = flags[0] & 0x1
ctx.shop_mode = flags[0] & 0x2
ctx.retro_mode = flags[0] & 0x4
ctx.pottery_mode = flags[0] & 0x8
ctx.mystery_mode = flags[0] & 0x10
def new_check(location):
ctx.locations_checked.add(location)
@@ -842,8 +888,9 @@ async def track_locations(ctx : Context, roomid, roomdata):
if ignored:
ctx.ignore_count += 1
else:
logging.info(f"New check: {location} ({len(ctx.locations_checked)-ctx.ignore_count}/{ctx.total_locations})")
new_locations.append(Regions.lookup_name_to_id[location])
total = '???' if ctx.mystery_mode else ctx.total_locations
logging.info(f"New check: {location} ({len(ctx.locations_checked)-ctx.ignore_count}/{total})")
new_locations.append(ctx.lookup_name_to_id[location])
try:
if ctx.shop_mode or ctx.retro_mode:
@@ -908,6 +955,22 @@ async def track_locations(ctx : Context, roomid, roomdata):
if misc_data[offset - 0x3c6] & mask != 0 and location not in ctx.locations_checked:
new_check(location)
if not all([location in ctx.locations_checked for location in location_table_pot_items.keys()]):
pot_items_data = await snes_read(ctx, POT_ITEMS_SRAM_START, ITEM_SRAM_SIZE)
if pot_items_data is not None:
for location, (offset, mask) in location_table_pot_items.items():
pot_value = pot_items_data[offset] | (pot_items_data[offset + 1] << 8)
if pot_value & mask != 0 and location not in ctx.locations_checked:
new_check(location)
if not all([location in ctx.locations_checked for location in location_table_sprite_items.keys()]):
sprite_items_data = await snes_read(ctx, SPRITE_ITEMS_SRAM_START, ITEM_SRAM_SIZE)
if sprite_items_data is not None:
for location, (offset, mask) in location_table_sprite_items.items():
sprite_value = sprite_items_data[offset] | (sprite_items_data[offset + 1] << 8)
if sprite_value & mask != 0 and location not in ctx.locations_checked:
new_check(location)
await send_msgs(ctx.socket, [['LocationChecks', new_locations]])
async def game_watcher(ctx : Context):
@@ -955,7 +1018,7 @@ async def game_watcher(ctx : Context):
item = ctx.items_received[recv_index]
logging.info('Received %s from %s (%s) (%d/%d in list)' % (
color(get_item_name_from_id(item.item), 'red', 'bold'), color(ctx.player_names[item.player], 'yellow'),
get_location_name_from_address(item.location), recv_index + 1, len(ctx.items_received)))
get_location_name_from_address(ctx, item.location), recv_index + 1, len(ctx.items_received)))
recv_index += 1
snes_buffered_write(ctx, RECV_PROGRESS_ADDR, bytes([recv_index & 0xFF, (recv_index >> 8) & 0xFF]))
snes_buffered_write(ctx, RECV_ITEM_ADDR, bytes([item.item]))
@@ -969,7 +1032,7 @@ async def game_watcher(ctx : Context):
if scout_location > 0 and scout_location not in ctx.locations_scouted:
ctx.locations_scouted.add(scout_location)
logging.info(f'Scouting item at {list(Regions.lookup_id_to_name.keys())[scout_location - 1]}')
logging.info(f'Scouting item at {list(ctx.lookup_id_to_name.keys())[scout_location - 1]}')
await send_msgs(ctx.socket, [['LocationScouts', [scout_location]]])
await track_locations(ctx, roomid, roomdata)
@@ -984,6 +1047,7 @@ async def main():
logging.basicConfig(format='%(message)s', level=getattr(logging, args.loglevel.upper(), logging.INFO))
ctx = Context(args.snes, args.connect, args.password)
init_lookups(ctx)
input_task = asyncio.create_task(console_loop(ctx))