Files
alttpr-python/source/tools/BPS.py
2022-02-17 14:00:00 -07:00

325 lines
10 KiB
Python

# Code derived from https://github.com/marcrobledo/RomPatcher.js (MIT License)
import sys
from time import perf_counter
from collections import defaultdict
from binascii import crc32
try:
from fast_enum import FastEnum
except ImportError:
from enum import IntFlag as FastEnum
def bps_get_vlv_len(data):
length = 0
while True:
x = data & 0x7f
data >>= 7
if data == 0:
length += 1
break
length += 1
data -= 1
return length
def bps_read_vlv(stream):
data, shift = 0, 1
while True:
x = stream.read(1)[0]
data += (x & 0x7f) * shift
if x & 0x80:
return data
shift <<= 7
data += shift
class Bps:
def __init__(self):
self.source_size = 0
self.target_size = 0
self.metadata = ''
self.actions = []
self.source_checksum = 0
self.target_checksum = 0
self.patch_checksum = 0
self.binary_ba = bytearray()
self.offset = 0
def write_to_binary(self):
patch_size = 4
patch_size += bps_get_vlv_len(self.source_size)
patch_size += bps_get_vlv_len(self.target_size)
patch_size += bps_get_vlv_len(len(self.metadata))
patch_size += len(self.metadata)
for action in self.actions:
mode, length, data = action
patch_size += bps_get_vlv_len(((length - 1) << 2) + mode)
if mode == BpsMode.BPS_ACTION_TARGET_READ:
patch_size += length
elif mode == BpsMode.BPS_ACTION_SOURCE_COPY or mode == BpsMode.BPS_ACTION_TARGET_COPY:
patch_size += bps_get_vlv_len((abs(data) << 1) + (1 if data < 0 else 0))
patch_size += 12
self.binary_ba = bytearray(patch_size)
self.write_string('BPS1')
self.bps_write_vlv(self.source_size)
self.bps_write_vlv(self.target_size)
self.bps_write_vlv(len(self.metadata))
self.write_string(self.metadata)
for action in self.actions:
mode, length, data = action
self.bps_write_vlv(((length - 1) << 2) + mode)
if mode == BpsMode.BPS_ACTION_TARGET_READ:
self.write_bytes(data)
elif mode == BpsMode.BPS_ACTION_SOURCE_COPY or mode == BpsMode.BPS_ACTION_TARGET_COPY:
self.bps_write_vlv((abs(data) << 1) + (1 if data < 0 else 0))
self.write_u32(self.source_checksum)
self.write_u32(self.target_checksum)
self.write_u32(self.patch_checksum)
def write_string(self, string):
for ch in string:
self.binary_ba[self.offset] = ord(ch)
self.offset += 1
def write_byte(self, byte):
self.binary_ba[self.offset] = byte
self.offset += 1
def write_bytes(self, m_bytes):
for byte in m_bytes:
self.binary_ba[self.offset] = byte
self.offset += 1
def write_u32(self, data):
self.binary_ba[self.offset] = data & 0x000000ff
self.binary_ba[self.offset+1] = (data & 0x0000ff00) >> 8
self.binary_ba[self.offset+2] = (data & 0x00ff0000) >> 16
self.binary_ba[self.offset+3] = (data & 0xff000000) >> 24
self.offset += 4
def bps_write_vlv(self, data):
while True:
x = data & 0x7f
data >>= 7
if data == 0:
self.write_byte(0x80 | x)
break
self.write_byte(x)
data -= 1
class BpsMode(FastEnum):
BPS_ACTION_SOURCE_READ = 0
BPS_ACTION_TARGET_READ = 1
BPS_ACTION_SOURCE_COPY = 2
BPS_ACTION_TARGET_COPY = 3
def create_bps_from_data(original, modified):
patch = Bps()
patch.source_size = len(original)
patch.target_size = len(modified)
patch.actions = create_bps_linear(original, modified)
patch.source_checksum = crc32(original)
patch.target_checksum = crc32(modified)
patch.write_to_binary()
patch.patch_checksum = crc32(patch.binary_ba[:-4])
patch.offset = len(patch.binary_ba) - 4
patch.write_u32(patch.patch_checksum)
return patch
def create_bps_delta(original, modified):
patch_actions = []
source_data = original
target_data = modified
source_size = len(original)
target_size = len(modified)
source_relative_offset = 0
target_relative_offset = 0
output_offset = 0
source_tree = defaultdict(list)
source_tree_2 = defaultdict(list)
target_tree = defaultdict(list)
t1_start = perf_counter()
for offset in range(0, source_size):
symbol = source_data[offset]
if offset < source_size - 1:
symbol |= source_data[offset + 1] << 8
source_tree[symbol].append(offset)
print(f'Elasped Time 1: {perf_counter()-t1_start}')
source_array = list(source_data)
t2_start = perf_counter()
for offset in range(0, source_size):
symbol = source_array[offset]
if offset < source_size - 1:
symbol |= source_array[offset + 1] << 8
source_tree_2[symbol].append(offset)
print(f'Elasped Time 2: {perf_counter()-t2_start}')
trl = {'target_read_length': 0}
def target_read_flush(buffer):
if buffer['target_read_length']:
action = (BpsMode.BPS_ACTION_TARGET_READ, buffer['target_read_length'], [])
patch_actions.append(action)
offset = output_offset - buffer['target_read_length']
while buffer['target_read_length']:
action[2].append(target_data[offset])
offset += 1
buffer['target_read_length'] -= 1
while output_offset < target_size:
max_length, max_offset, mode = 0, 0, BpsMode.BPS_ACTION_TARGET_READ
symbol = target_data[output_offset]
if output_offset < target_size - 1:
symbol |= target_data[output_offset + 1] << 8
# source read
length, offset = 0, output_offset
while offset < source_size and offset < target_size and source_data[offset] == target_data[offset]:
length += 1
offset += 1
if length > max_length:
max_length, mode = length, BpsMode.BPS_ACTION_SOURCE_READ
# source copy
for node in source_tree[symbol]:
length, x, y = 0, node, output_offset
while x < source_size and y < target_size and source_data[x] == target_data[y]:
length += 1
x += 1
y += 1
if length > max_length:
max_length, max_offset, mode = length, node, BpsMode.BPS_ACTION_SOURCE_COPY
# target copy
for node in target_tree[symbol]:
length, x, y = 0, node, output_offset
while y < target_size and target_data[x] == target_data[y]:
length += 1
x += 1
y += 1
if length > max_length:
max_length, max_offset, mode = length, node, BpsMode.BPS_ACTION_TARGET_COPY
target_tree[symbol].append(output_offset)
# target read
if max_length < 4:
max_length = min(1, target_size - output_offset)
mode = BpsMode.BPS_ACTION_TARGET_READ
if mode != BpsMode.BPS_ACTION_TARGET_READ:
target_read_flush(trl)
if mode == BpsMode.BPS_ACTION_SOURCE_READ:
patch_actions.append((mode, max_length, None))
elif mode == BpsMode.BPS_ACTION_TARGET_READ:
trl['target_read_length'] += max_length
else:
if mode == BpsMode.BPS_ACTION_SOURCE_COPY:
relative_offset = max_offset - source_relative_offset
source_relative_offset = max_offset + max_length
else:
relative_offset = max_offset - target_relative_offset
target_relative_offset = max_offset + max_length
patch_actions.append((mode, max_length, relative_offset))
output_offset += max_length
target_read_flush(trl)
return patch_actions
def create_bps_linear(original, modified):
patch_actions = []
source_data = original
target_data = modified
source_size = len(original)
target_size = len(modified)
target_relative_offset = 0
output_offset = 0
trl = {'target_read_length': 0}
def target_read_flush(buffer):
if buffer['target_read_length']:
action = (BpsMode.BPS_ACTION_TARGET_READ, buffer['target_read_length'], [])
patch_actions.append(action)
offset = output_offset - buffer['target_read_length']
while buffer['target_read_length']:
action[2].append(target_data[offset])
offset += 1
buffer['target_read_length'] -= 1
eof = min(source_size, target_size)
while output_offset < target_size:
src_length, n = 0, 0
while output_offset + n < eof:
if source_data[output_offset + n] != target_data[output_offset + n]:
break
src_length += 1
n += 1
rle_length, n = 0, 1
while output_offset + n < target_size:
if target_data[output_offset] != target_data[output_offset + n]:
break
rle_length += 1
n += 1
if rle_length >= 4:
trl['target_read_length'] += 1
output_offset += 1
target_read_flush(trl)
relative_offset = (output_offset - 1) - target_relative_offset
patch_actions.append((BpsMode.BPS_ACTION_TARGET_COPY, rle_length, relative_offset))
output_offset += rle_length
target_relative_offset = output_offset - 1
elif src_length >= 4:
target_read_flush(trl)
patch_actions.append((BpsMode.BPS_ACTION_SOURCE_READ, src_length, None))
output_offset += src_length
else:
trl['target_read_length'] += 1
output_offset += 1
target_read_flush(trl)
return patch_actions
if __name__ == '__main__':
with open(sys.argv[1], 'rb') as source:
sourcedata = source.read()
with open(sys.argv[2], 'rb') as target:
targetdata = target.read()
patch = create_bps_from_data(sourcedata, targetdata)
with open(sys.argv[3], 'wb') as patchfile:
patchfile.write(patch.binary_ba)