contrib: dedup deserialization routines in utxo-to-sqlite script

These routines already exist in the functional test framework
(with the advantage of having proper unit tests executed in CI), so we
can just import them from there instead of leaving duplicated code
in the repository.
This commit is contained in:
Sebastian Falbesoner 2025-03-21 17:59:45 +01:00
parent 8046759305
commit f535b2fe63

View file

@ -16,6 +16,11 @@ import sqlite3
import sys
import time
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../test/functional'))
from test_framework.compressor import decompress_amount # noqa: E402
from test_framework.messages import deser_compact_size, deser_varint # noqa: E402
UTXO_DUMP_MAGIC = b'utxo\xff'
UTXO_DUMP_VERSION = 2
@ -28,53 +33,9 @@ NET_MAGIC_BYTES = {
}
def read_varint(f):
"""Equivalent of `ReadVarInt()` (see serialization module)."""
n = 0
while True:
dat = f.read(1)[0]
n = (n << 7) | (dat & 0x7f)
if (dat & 0x80) > 0:
n += 1
else:
return n
def read_compactsize(f):
"""Equivalent of `ReadCompactSize()` (see serialization module)."""
n = f.read(1)[0]
if n == 253:
n = int.from_bytes(f.read(2), "little")
elif n == 254:
n = int.from_bytes(f.read(4), "little")
elif n == 255:
n = int.from_bytes(f.read(8), "little")
return n
def decompress_amount(x):
"""Equivalent of `DecompressAmount()` (see compressor module)."""
if x == 0:
return 0
x -= 1
e = x % 10
x //= 10
n = 0
if e < 9:
d = (x % 9) + 1
x //= 9
n = x * 10 + d
else:
n = x + 1
while e > 0:
n *= 10
e -= 1
return n
def decompress_script(f):
"""Equivalent of `DecompressScript()` (see compressor module)."""
size = read_varint(f) # sizes 0-5 encode compressed script types
size = deser_varint(f) # sizes 0-5 encode compressed script types
if size == 0: # P2PKH
return bytes([0x76, 0xa9, 20]) + f.read(20) + bytes([0x88, 0xac])
elif size == 1: # P2SH
@ -154,13 +115,13 @@ def main():
# read key (COutPoint)
if coins_per_hash_left == 0: # read next prevout hash
prevout_hash = f.read(32)[::-1].hex()
coins_per_hash_left = read_compactsize(f)
prevout_index = read_compactsize(f)
coins_per_hash_left = deser_compact_size(f)
prevout_index = deser_compact_size(f)
# read value (Coin)
code = read_varint(f)
code = deser_varint(f)
height = code >> 1
is_coinbase = code & 1
amount = decompress_amount(read_varint(f))
amount = decompress_amount(deser_varint(f))
scriptpubkey = decompress_script(f).hex()
write_batch.append((prevout_hash, prevout_index, amount, is_coinbase, height, scriptpubkey))
if height > max_height: