BPS support

This commit is contained in:
aerinon
2022-02-17 14:00:00 -07:00
parent ccb7ced735
commit b024311aaa
14 changed files with 451 additions and 9 deletions

View File

@@ -2,8 +2,15 @@ import os
import time
import logging
try:
import bps.apply
import bps.io
except ImportError:
raise Exception('Could not load BPS module')
from Utils import output_path
from Rom import LocalRom, apply_rom_settings
from source.tools.BPS import bps_read_vlv
def adjust(args):
@@ -34,3 +41,37 @@ def adjust(args):
logger.debug('Total Time: %s', time.process_time() - start)
return args
def patch(args):
start = time.process_time()
logger = logging.getLogger('')
logger.info('Patching ROM.')
outfile_base = os.path.basename(args.patch)[:-4]
rom = LocalRom(args.baserom, False)
if os.path.isfile(args.baserom):
rom.verify_base_rom()
orig_buffer = rom.buffer.copy()
with open(args.patch, 'rb') as stream:
stream.seek(4) # skip BPS1
bps_read_vlv(stream) # skip source size
target_length = bps_read_vlv(stream)
rom.buffer.extend(bytearray([0x00] * (target_length - len(rom.buffer))))
stream.seek(0)
bps.apply.apply_to_bytearrays(bps.io.read_bps(stream), orig_buffer, rom.buffer)
if not hasattr(args, "sprite"):
args.sprite = None
apply_rom_settings(rom, args.heartbeep, args.heartcolor, args.quickswap, args.fastmenu, args.disablemusic,
args.sprite, args.ow_palettes, args.uw_palettes, args.reduce_flashing, args.shuffle_sfx)
output_path.cached_path = args.outputpath
rom.write_to_file(output_path('%s.sfc' % outfile_base))
logger.info('Done. Enjoy.')
logger.debug('Total Time: %s', time.process_time() - start)
return args

2
CLI.py
View File

@@ -208,9 +208,11 @@ def parse_settings():
"create_spoiler": True,
"calc_playthrough": True,
"create_rom": True,
"bps": False,
"usestartinventory": False,
"custom": False,
"rom": os.path.join(".", "Zelda no Densetsu - Kamigami no Triforce (Japan).sfc"),
"patch": os.path.join(".", "Patch File.bps"),
"seed": "",
"count": 1,

2
Gui.py
View File

@@ -104,7 +104,7 @@ def guiMain(args=None):
self.pages["startinventory"] = ttk.Frame(self.notebook)
self.pages["custom"] = ttk.Frame(self.notebook)
self.notebook.add(self.pages["randomizer"], text='Randomize')
self.notebook.add(self.pages["adjust"], text='Adjust')
self.notebook.add(self.pages["adjust"], text='Adjust/Patch')
self.notebook.add(self.pages["startinventory"], text='Starting Inventory')
self.notebook.add(self.pages["custom"], text='Custom Item Pool')
self.notebook.pack()

11
Main.py
View File

@@ -29,7 +29,7 @@ from ItemList import generate_itempool, difficulties, fill_prizes, customize_sho
from Utils import output_path, parse_player_names
from source.item.FillUtil import create_item_pool_config, massage_item_pool, district_item_pool_config
from source.tools.BPS import create_bps_from_data
__version__ = '1.0.1.5-v'
@@ -308,7 +308,14 @@ def main(args, seed=None, fish=None):
if world.players > 1 or world.teams > 1:
outfilepname += f"_{world.player_names[player][team].replace(' ', '_')}" if world.player_names[player][team] != 'Player %d' % player else ''
outfilesuffix = f'_{Settings.make_code(world, player)}' if not args.outputname else ''
rom.write_to_file(output_path(f'{outfilebase}{outfilepname}{outfilesuffix}.sfc'))
if args.bps:
patchfile = output_path(f'{outfilebase}{outfilepname}{outfilesuffix}.bps')
patch = create_bps_from_data(LocalRom(args.rom, patch=False).buffer, rom.buffer)
with open(patchfile, 'wb') as stream:
stream.write(patch.binary_ba)
else:
sfc_file = output_path(f'{outfilebase}{outfilepname}{outfilesuffix}.sfc')
rom.write_to_file(sfc_file)
if world.players > 1:
multidata = zlib.compress(json.dumps({"names": parsed_names,

View File

@@ -149,6 +149,7 @@ Same as above but both small keys and bigs keys of the dungeon are not allowed o
* 1.0.1.5
* A couple new options for lighter pottery modes
* New option for Boss Shuffle: Unique (Prize bosses will be one of each, but GT bosses can be anything)
* Support for BPS patch creation and applying patches during adjustment
* Fix for Hera Basement Cage item inheriting last pot checked
* Update indicators on keysanity menu for overworld map option
* Fix for Standard ER where locations in rain state could be in logic

8
Rom.py
View File

@@ -119,6 +119,13 @@ class LocalRom(object):
ret.write_bytes(int(address), values)
return ret
def verify_base_rom(self):
# verify correct checksum of baserom
basemd5 = hashlib.md5()
basemd5.update(self.buffer)
if JAP10HASH != basemd5.hexdigest():
raise RuntimeError('Supplied Base Rom does not match known MD5 for JAP(1.0) release.')
def patch_base_rom(self):
# verify correct checksum of baserom
basemd5 = hashlib.md5()
@@ -165,7 +172,6 @@ class LocalRom(object):
with open(local_path('data/base2current.json'), 'w') as fp:
json.dump(patches, fp, separators=(',', ':'))
def write_crc(self):
crc = (sum(self.buffer[:0x7FDC] + self.buffer[0x7FE0:]) + 0x01FE) & 0xFFFF
inv = crc ^ 0xFFFF

View File

@@ -366,6 +366,9 @@
"jsonout": {
"action": "store_true"
},
"bps": {
"action": "store_true"
},
"enemizercli": {
"setting": "enemizercli"
},

View File

@@ -59,6 +59,7 @@
"help": {
"lang": [ "App Language, if available, defaults to English" ],
"create_spoiler": [ "Output a Spoiler File" ],
"bps": [ "Output BPS patches instead of ROMs"],
"logic": [
"Select Enforcement of Item Requirements. (default: %(default)s)",
"No Glitches: No Glitch knowledge required.",

View File

@@ -53,8 +53,8 @@
"randomizer.dungeon.compassshuffle": "Compasses",
"randomizer.dungeon.smallkeyshuffle": "Small Keys",
"randomizer.dungeon.bigkeyshuffle": "Big Keys",
"randomizer.dungeon.keydropshuffle": "Drop and Pot Keys",
"randomizer.dungeon.dropshuffle": "Shuffle Key Drops",
"randomizer.dungeon.keydropshuffle": "Key Drop Shuffle (Legacy)",
"randomizer.dungeon.dropshuffle": "Shuffle Enemy Key Drops",
"randomizer.dungeon.potshuffle": "Pot Shuffle (Legacy)",
"randomizer.dungeon.pottery": "Pottery",
"randomizer.dungeon.pottery.none": "None",
@@ -185,6 +185,7 @@
"randomizer.gameoptions.sprite.unchanged": "(unchanged)",
"randomizer.generation.bps": "Create BPS Patches",
"randomizer.generation.createspoiler": "Create Spoiler Log",
"randomizer.generation.createrom": "Create Patched ROM",
"randomizer.generation.calcplaythrough": "Calculate Playthrough",

View File

@@ -15,6 +15,7 @@
"none",
"simple",
"full",
"unique",
"random"
]
}

View File

@@ -1,5 +1,6 @@
{
"checkboxes": {
"bps": { "type": "checkbox" },
"createspoiler": { "type": "checkbox" },
"createrom": { "type": "checkbox" },
"calcplaythrough": { "type": "checkbox" },

View File

@@ -116,6 +116,7 @@ SETTINGSTOPROCESS = {
"shuffle_sfx": "shuffle_sfx",
},
"generation": {
"bps": "bps",
"createspoiler": "create_spoiler",
"createrom": "create_rom",
"calcplaythrough": "calc_playthrough",

View File

@@ -1,5 +1,5 @@
from tkinter import ttk, filedialog, messagebox, StringVar, Button, Entry, Frame, Label, E, W, LEFT, RIGHT, X, BOTTOM
from AdjusterMain import adjust
from AdjusterMain import adjust, patch
from argparse import Namespace
from source.classes.SpriteSelector import SpriteSelector
import source.gui.widgets as widgets
@@ -79,7 +79,9 @@ def adjust_page(top, parent, settings):
romEntry2 = Entry(adjustRomFrame, textvariable=self.romVar2)
def RomSelect2():
rom = filedialog.askopenfilename(filetypes=[("Rom Files", (".sfc", ".smc")), ("All Files", "*")])
initdir = os.path.join(os.getcwd(), settings['outputpath']) if 'outputpath' in settings else os.getcwd()
rom = filedialog.askopenfilename(filetypes=[("Rom Files", (".sfc", ".smc")), ("All Files", "*")],
initialdir=initdir)
if rom:
settings["rom"] = rom
self.romVar2.set(rom)
@@ -122,6 +124,57 @@ def adjust_page(top, parent, settings):
messagebox.showinfo(title="Success", message="Rom patched successfully")
adjustButton = Button(self.frames["bottomAdjustFrame"], text='Adjust Rom', command=adjustRom)
adjustButton.pack(side=BOTTOM, padx=(5, 0))
adjustButton.pack(padx=(5, 0))
patchFileFrame = Frame(self.frames["bottomAdjustFrame"])
patchFileLabel = Label(patchFileFrame, text='BPS Patch: ')
self.patchVar = StringVar(value=settings["patch"])
patchEntry = Entry(patchFileFrame, textvariable=self.patchVar)
def PatchSelect():
initdir = os.path.join(os.getcwd(), settings['outputpath']) if 'outputpath' in settings else os.getcwd()
file = filedialog.askopenfilename(filetypes=[("BPS Files", ".bps"), ("All Files", "*")], initialdir=initdir)
if file:
settings["patch"] = file
self.patchVar.set(file)
patchSelectButton = Button(patchFileFrame, text='Select Patch', command=PatchSelect)
patchFileLabel.pack(side=LEFT)
patchEntry.pack(side=LEFT, fill=X, expand=True)
patchSelectButton.pack(side=LEFT)
patchFileFrame.pack(fill=X)
def patchRom():
if output_path.cached_path is None:
output_path.cached_path = top.settings["outputpath"]
options = {
"heartbeep": "heartbeep",
"heartcolor": "heartcolor",
"menuspeed": "fastmenu",
"owpalettes": "ow_palettes",
"uwpalettes": "uw_palettes",
"quickswap": "quickswap",
"nobgm": "disablemusic",
"reduce_flashing": "reduce_flashing",
"shuffle_sfx": "shuffle_sfx",
}
guiargs = Namespace()
for option in options:
arg = options[option]
setattr(guiargs, arg, self.widgets[option].storageVar.get())
guiargs.patch = self.patchVar.get()
guiargs.baserom = top.pages["randomizer"].pages["generation"].widgets["rom"].storageVar.get()
guiargs.sprite = self.sprite
guiargs.outputpath = os.path.dirname(guiargs.patch)
try:
patch(args=guiargs)
except Exception as e:
logging.exception(e)
messagebox.showerror(title="Error while creating seed", message=str(e))
else:
messagebox.showinfo(title="Success", message="Rom patched successfully")
patchButton = Button(self.frames["bottomAdjustFrame"], text='Patch Rom', command=patchRom)
patchButton.pack(side=BOTTOM, padx=(5, 0))
return self,settings

324
source/tools/BPS.py Normal file
View File

@@ -0,0 +1,324 @@
# Code derived from https://github.com/marcrobledo/RomPatcher.js (MIT License)
import sys
from time import perf_counter
from collections import defaultdict
from binascii import crc32
try:
from fast_enum import FastEnum
except ImportError:
from enum import IntFlag as FastEnum
def bps_get_vlv_len(data):
length = 0
while True:
x = data & 0x7f
data >>= 7
if data == 0:
length += 1
break
length += 1
data -= 1
return length
def bps_read_vlv(stream):
data, shift = 0, 1
while True:
x = stream.read(1)[0]
data += (x & 0x7f) * shift
if x & 0x80:
return data
shift <<= 7
data += shift
class Bps:
def __init__(self):
self.source_size = 0
self.target_size = 0
self.metadata = ''
self.actions = []
self.source_checksum = 0
self.target_checksum = 0
self.patch_checksum = 0
self.binary_ba = bytearray()
self.offset = 0
def write_to_binary(self):
patch_size = 4
patch_size += bps_get_vlv_len(self.source_size)
patch_size += bps_get_vlv_len(self.target_size)
patch_size += bps_get_vlv_len(len(self.metadata))
patch_size += len(self.metadata)
for action in self.actions:
mode, length, data = action
patch_size += bps_get_vlv_len(((length - 1) << 2) + mode)
if mode == BpsMode.BPS_ACTION_TARGET_READ:
patch_size += length
elif mode == BpsMode.BPS_ACTION_SOURCE_COPY or mode == BpsMode.BPS_ACTION_TARGET_COPY:
patch_size += bps_get_vlv_len((abs(data) << 1) + (1 if data < 0 else 0))
patch_size += 12
self.binary_ba = bytearray(patch_size)
self.write_string('BPS1')
self.bps_write_vlv(self.source_size)
self.bps_write_vlv(self.target_size)
self.bps_write_vlv(len(self.metadata))
self.write_string(self.metadata)
for action in self.actions:
mode, length, data = action
self.bps_write_vlv(((length - 1) << 2) + mode)
if mode == BpsMode.BPS_ACTION_TARGET_READ:
self.write_bytes(data)
elif mode == BpsMode.BPS_ACTION_SOURCE_COPY or mode == BpsMode.BPS_ACTION_TARGET_COPY:
self.bps_write_vlv((abs(data) << 1) + (1 if data < 0 else 0))
self.write_u32(self.source_checksum)
self.write_u32(self.target_checksum)
self.write_u32(self.patch_checksum)
def write_string(self, string):
for ch in string:
self.binary_ba[self.offset] = ord(ch)
self.offset += 1
def write_byte(self, byte):
self.binary_ba[self.offset] = byte
self.offset += 1
def write_bytes(self, m_bytes):
for byte in m_bytes:
self.binary_ba[self.offset] = byte
self.offset += 1
def write_u32(self, data):
self.binary_ba[self.offset] = data & 0x000000ff
self.binary_ba[self.offset+1] = (data & 0x0000ff00) >> 8
self.binary_ba[self.offset+2] = (data & 0x00ff0000) >> 16
self.binary_ba[self.offset+3] = (data & 0xff000000) >> 24
self.offset += 4
def bps_write_vlv(self, data):
while True:
x = data & 0x7f
data >>= 7
if data == 0:
self.write_byte(0x80 | x)
break
self.write_byte(x)
data -= 1
class BpsMode(FastEnum):
BPS_ACTION_SOURCE_READ = 0
BPS_ACTION_TARGET_READ = 1
BPS_ACTION_SOURCE_COPY = 2
BPS_ACTION_TARGET_COPY = 3
def create_bps_from_data(original, modified):
patch = Bps()
patch.source_size = len(original)
patch.target_size = len(modified)
patch.actions = create_bps_linear(original, modified)
patch.source_checksum = crc32(original)
patch.target_checksum = crc32(modified)
patch.write_to_binary()
patch.patch_checksum = crc32(patch.binary_ba[:-4])
patch.offset = len(patch.binary_ba) - 4
patch.write_u32(patch.patch_checksum)
return patch
def create_bps_delta(original, modified):
patch_actions = []
source_data = original
target_data = modified
source_size = len(original)
target_size = len(modified)
source_relative_offset = 0
target_relative_offset = 0
output_offset = 0
source_tree = defaultdict(list)
source_tree_2 = defaultdict(list)
target_tree = defaultdict(list)
t1_start = perf_counter()
for offset in range(0, source_size):
symbol = source_data[offset]
if offset < source_size - 1:
symbol |= source_data[offset + 1] << 8
source_tree[symbol].append(offset)
print(f'Elasped Time 1: {perf_counter()-t1_start}')
source_array = list(source_data)
t2_start = perf_counter()
for offset in range(0, source_size):
symbol = source_array[offset]
if offset < source_size - 1:
symbol |= source_array[offset + 1] << 8
source_tree_2[symbol].append(offset)
print(f'Elasped Time 2: {perf_counter()-t2_start}')
trl = {'target_read_length': 0}
def target_read_flush(buffer):
if buffer['target_read_length']:
action = (BpsMode.BPS_ACTION_TARGET_READ, buffer['target_read_length'], [])
patch_actions.append(action)
offset = output_offset - buffer['target_read_length']
while buffer['target_read_length']:
action[2].append(target_data[offset])
offset += 1
buffer['target_read_length'] -= 1
while output_offset < target_size:
max_length, max_offset, mode = 0, 0, BpsMode.BPS_ACTION_TARGET_READ
symbol = target_data[output_offset]
if output_offset < target_size - 1:
symbol |= target_data[output_offset + 1] << 8
# source read
length, offset = 0, output_offset
while offset < source_size and offset < target_size and source_data[offset] == target_data[offset]:
length += 1
offset += 1
if length > max_length:
max_length, mode = length, BpsMode.BPS_ACTION_SOURCE_READ
# source copy
for node in source_tree[symbol]:
length, x, y = 0, node, output_offset
while x < source_size and y < target_size and source_data[x] == target_data[y]:
length += 1
x += 1
y += 1
if length > max_length:
max_length, max_offset, mode = length, node, BpsMode.BPS_ACTION_SOURCE_COPY
# target copy
for node in target_tree[symbol]:
length, x, y = 0, node, output_offset
while y < target_size and target_data[x] == target_data[y]:
length += 1
x += 1
y += 1
if length > max_length:
max_length, max_offset, mode = length, node, BpsMode.BPS_ACTION_TARGET_COPY
target_tree[symbol].append(output_offset)
# target read
if max_length < 4:
max_length = min(1, target_size - output_offset)
mode = BpsMode.BPS_ACTION_TARGET_READ
if mode != BpsMode.BPS_ACTION_TARGET_READ:
target_read_flush(trl)
if mode == BpsMode.BPS_ACTION_SOURCE_READ:
patch_actions.append((mode, max_length, None))
elif mode == BpsMode.BPS_ACTION_TARGET_READ:
trl['target_read_length'] += max_length
else:
if mode == BpsMode.BPS_ACTION_SOURCE_COPY:
relative_offset = max_offset - source_relative_offset
source_relative_offset = max_offset + max_length
else:
relative_offset = max_offset - target_relative_offset
target_relative_offset = max_offset + max_length
patch_actions.append((mode, max_length, relative_offset))
output_offset += max_length
target_read_flush(trl)
return patch_actions
def create_bps_linear(original, modified):
patch_actions = []
source_data = original
target_data = modified
source_size = len(original)
target_size = len(modified)
target_relative_offset = 0
output_offset = 0
trl = {'target_read_length': 0}
def target_read_flush(buffer):
if buffer['target_read_length']:
action = (BpsMode.BPS_ACTION_TARGET_READ, buffer['target_read_length'], [])
patch_actions.append(action)
offset = output_offset - buffer['target_read_length']
while buffer['target_read_length']:
action[2].append(target_data[offset])
offset += 1
buffer['target_read_length'] -= 1
eof = min(source_size, target_size)
while output_offset < target_size:
src_length, n = 0, 0
while output_offset + n < eof:
if source_data[output_offset + n] != target_data[output_offset + n]:
break
src_length += 1
n += 1
rle_length, n = 0, 1
while output_offset + n < target_size:
if target_data[output_offset] != target_data[output_offset + n]:
break
rle_length += 1
n += 1
if rle_length >= 4:
trl['target_read_length'] += 1
output_offset += 1
target_read_flush(trl)
relative_offset = (output_offset - 1) - target_relative_offset
patch_actions.append((BpsMode.BPS_ACTION_TARGET_COPY, rle_length, relative_offset))
output_offset += rle_length
target_relative_offset = output_offset - 1
elif src_length >= 4:
target_read_flush(trl)
patch_actions.append((BpsMode.BPS_ACTION_SOURCE_READ, src_length, None))
output_offset += src_length
else:
trl['target_read_length'] += 1
output_offset += 1
target_read_flush(trl)
return patch_actions
if __name__ == '__main__':
with open(sys.argv[1], 'rb') as source:
sourcedata = source.read()
with open(sys.argv[2], 'rb') as target:
targetdata = target.read()
patch = create_bps_from_data(sourcedata, targetdata)
with open(sys.argv[3], 'wb') as patchfile:
patchfile.write(patch.binary_ba)