bitcoin/test/functional/wallet_migration.py
Andrew Chow ccc72fecd7
wallet: Be able to unlock the wallet for migration
Since migration reloads the wallet, the wallet will always be locked
unless the passphrase is given. migratewallet can now take the
passphrase in order to unlock the wallet for migration.

Github-Pull: #26595
Rebased-From: 7fd125b27d
2023-02-27 14:14:14 +00:00

423 lines
20 KiB
Python
Executable file

#!/usr/bin/env python3
# Copyright (c) 2020 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test Migrating a wallet from legacy to descriptor."""
import os
import random
from test_framework.descriptors import descsum_create
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
assert_raises_rpc_error,
find_vout_for_address,
)
from test_framework.wallet_util import (
get_generate_key,
)
class WalletMigrationTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 1
self.extra_args = [[]]
self.supports_cli = False
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
self.skip_if_no_sqlite()
self.skip_if_no_bdb()
def assert_is_sqlite(self, wallet_name):
wallet_file_path = os.path.join(self.nodes[0].datadir, "regtest/wallets", wallet_name, self.wallet_data_filename)
with open(wallet_file_path, 'rb') as f:
file_magic = f.read(16)
assert_equal(file_magic, b'SQLite format 3\x00')
assert_equal(self.nodes[0].get_wallet_rpc(wallet_name).getwalletinfo()["format"], "sqlite")
def create_legacy_wallet(self, wallet_name):
self.nodes[0].createwallet(wallet_name=wallet_name)
wallet = self.nodes[0].get_wallet_rpc(wallet_name)
assert_equal(wallet.getwalletinfo()["descriptors"], False)
assert_equal(wallet.getwalletinfo()["format"], "bdb")
return wallet
def assert_addr_info_equal(self, addr_info, addr_info_old):
assert_equal(addr_info["address"], addr_info_old["address"])
assert_equal(addr_info["scriptPubKey"], addr_info_old["scriptPubKey"])
assert_equal(addr_info["ismine"], addr_info_old["ismine"])
assert_equal(addr_info["hdkeypath"], addr_info_old["hdkeypath"])
assert_equal(addr_info["solvable"], addr_info_old["solvable"])
assert_equal(addr_info["ischange"], addr_info_old["ischange"])
assert_equal(addr_info["hdmasterfingerprint"], addr_info_old["hdmasterfingerprint"])
def assert_list_txs_equal(self, received_list_txs, expected_list_txs):
for d in received_list_txs:
if "parent_descs" in d:
del d["parent_descs"]
for d in expected_list_txs:
if "parent_descs" in d:
del d["parent_descs"]
assert_equal(received_list_txs, expected_list_txs)
def test_basic(self):
default = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
self.log.info("Test migration of a basic keys only wallet without balance")
basic0 = self.create_legacy_wallet("basic0")
addr = basic0.getnewaddress()
change = basic0.getrawchangeaddress()
old_addr_info = basic0.getaddressinfo(addr)
old_change_addr_info = basic0.getaddressinfo(change)
assert_equal(old_addr_info["ismine"], True)
assert_equal(old_addr_info["hdkeypath"], "m/0'/0'/0'")
assert_equal(old_change_addr_info["ismine"], True)
assert_equal(old_change_addr_info["hdkeypath"], "m/0'/1'/0'")
# Note: migration could take a while.
basic0.migratewallet()
# Verify created descriptors
assert_equal(basic0.getwalletinfo()["descriptors"], True)
self.assert_is_sqlite("basic0")
# The wallet should create the following descriptors:
# * BIP32 descriptors in the form of "0'/0'/*" and "0'/1'/*" (2 descriptors)
# * BIP44 descriptors in the form of "44'/1'/0'/0/*" and "44'/1'/0'/1/*" (2 descriptors)
# * BIP49 descriptors, P2SH(P2WPKH), in the form of "86'/1'/0'/0/*" and "86'/1'/0'/1/*" (2 descriptors)
# * BIP84 descriptors, P2WPKH, in the form of "84'/1'/0'/1/*" and "84'/1'/0'/1/*" (2 descriptors)
# * BIP86 descriptors, P2TR, in the form of "86'/1'/0'/0/*" and "86'/1'/0'/1/*" (2 descriptors)
# * A combo(PK) descriptor for the wallet master key.
# So, should have a total of 11 descriptors on it.
assert_equal(len(basic0.listdescriptors()["descriptors"]), 11)
# Compare addresses info
addr_info = basic0.getaddressinfo(addr)
change_addr_info = basic0.getaddressinfo(change)
self.assert_addr_info_equal(addr_info, old_addr_info)
self.assert_addr_info_equal(change_addr_info, old_change_addr_info)
addr_info = basic0.getaddressinfo(basic0.getnewaddress("", "bech32"))
assert_equal(addr_info["hdkeypath"], "m/84'/1'/0'/0/0")
self.log.info("Test migration of a basic keys only wallet with a balance")
basic1 = self.create_legacy_wallet("basic1")
for _ in range(0, 10):
default.sendtoaddress(basic1.getnewaddress(), 1)
self.generate(self.nodes[0], 1)
for _ in range(0, 5):
basic1.sendtoaddress(default.getnewaddress(), 0.5)
self.generate(self.nodes[0], 1)
bal = basic1.getbalance()
txs = basic1.listtransactions()
basic1.migratewallet()
assert_equal(basic1.getwalletinfo()["descriptors"], True)
self.assert_is_sqlite("basic1")
assert_equal(basic1.getbalance(), bal)
self.assert_list_txs_equal(basic1.listtransactions(), txs)
# restart node and verify that everything is still there
self.restart_node(0)
default = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
self.nodes[0].loadwallet("basic1")
basic1 = self.nodes[0].get_wallet_rpc("basic1")
assert_equal(basic1.getwalletinfo()["descriptors"], True)
self.assert_is_sqlite("basic1")
assert_equal(basic1.getbalance(), bal)
self.assert_list_txs_equal(basic1.listtransactions(), txs)
self.log.info("Test migration of a wallet with balance received on the seed")
basic2 = self.create_legacy_wallet("basic2")
basic2_seed = get_generate_key()
basic2.sethdseed(True, basic2_seed.privkey)
assert_equal(basic2.getbalance(), 0)
# Receive coins on different output types for the same seed
basic2_balance = 0
for addr in [basic2_seed.p2pkh_addr, basic2_seed.p2wpkh_addr, basic2_seed.p2sh_p2wpkh_addr]:
send_value = random.randint(1, 4)
default.sendtoaddress(addr, send_value)
basic2_balance += send_value
self.generate(self.nodes[0], 1)
assert_equal(basic2.getbalance(), basic2_balance)
basic2_txs = basic2.listtransactions()
# Now migrate and test that we still see have the same balance/transactions
basic2.migratewallet()
assert_equal(basic2.getwalletinfo()["descriptors"], True)
self.assert_is_sqlite("basic2")
assert_equal(basic2.getbalance(), basic2_balance)
self.assert_list_txs_equal(basic2.listtransactions(), basic2_txs)
def test_multisig(self):
default = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
# Contrived case where all the multisig keys are in a single wallet
self.log.info("Test migration of a wallet with all keys for a multisig")
multisig0 = self.create_legacy_wallet("multisig0")
addr1 = multisig0.getnewaddress()
addr2 = multisig0.getnewaddress()
addr3 = multisig0.getnewaddress()
ms_info = multisig0.addmultisigaddress(2, [addr1, addr2, addr3])
multisig0.migratewallet()
assert_equal(multisig0.getwalletinfo()["descriptors"], True)
self.assert_is_sqlite("multisig0")
ms_addr_info = multisig0.getaddressinfo(ms_info["address"])
assert_equal(ms_addr_info["ismine"], True)
assert_equal(ms_addr_info["desc"], ms_info["descriptor"])
assert_equal("multisig0_watchonly" in self.nodes[0].listwallets(), False)
assert_equal("multisig0_solvables" in self.nodes[0].listwallets(), False)
pub1 = multisig0.getaddressinfo(addr1)["pubkey"]
pub2 = multisig0.getaddressinfo(addr2)["pubkey"]
# Some keys in multisig do not belong to this wallet
self.log.info("Test migration of a wallet that has some keys in a multisig")
self.nodes[0].createwallet(wallet_name="multisig1")
multisig1 = self.nodes[0].get_wallet_rpc("multisig1")
ms_info = multisig1.addmultisigaddress(2, [multisig1.getnewaddress(), pub1, pub2])
ms_info2 = multisig1.addmultisigaddress(2, [multisig1.getnewaddress(), pub1, pub2])
assert_equal(multisig1.getwalletinfo()["descriptors"], False)
addr1 = ms_info["address"]
addr2 = ms_info2["address"]
txid = default.sendtoaddress(addr1, 10)
multisig1.importaddress(addr1)
assert_equal(multisig1.getaddressinfo(addr1)["ismine"], False)
assert_equal(multisig1.getaddressinfo(addr1)["iswatchonly"], True)
assert_equal(multisig1.getaddressinfo(addr1)["solvable"], True)
self.generate(self.nodes[0], 1)
multisig1.gettransaction(txid)
assert_equal(multisig1.getbalances()["watchonly"]["trusted"], 10)
assert_equal(multisig1.getaddressinfo(addr2)["ismine"], False)
assert_equal(multisig1.getaddressinfo(addr2)["iswatchonly"], False)
assert_equal(multisig1.getaddressinfo(addr2)["solvable"], True)
# Migrating multisig1 should see the multisig is no longer part of multisig1
# A new wallet multisig1_watchonly is created which has the multisig address
# Transaction to multisig is in multisig1_watchonly and not multisig1
multisig1.migratewallet()
assert_equal(multisig1.getwalletinfo()["descriptors"], True)
self.assert_is_sqlite("multisig1")
assert_equal(multisig1.getaddressinfo(addr1)["ismine"], False)
assert_equal(multisig1.getaddressinfo(addr1)["iswatchonly"], False)
assert_equal(multisig1.getaddressinfo(addr1)["solvable"], False)
assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", multisig1.gettransaction, txid)
assert_equal(multisig1.getbalance(), 0)
assert_equal(multisig1.listtransactions(), [])
assert_equal("multisig1_watchonly" in self.nodes[0].listwallets(), True)
ms1_watchonly = self.nodes[0].get_wallet_rpc("multisig1_watchonly")
ms1_wallet_info = ms1_watchonly.getwalletinfo()
assert_equal(ms1_wallet_info['descriptors'], True)
assert_equal(ms1_wallet_info['private_keys_enabled'], False)
self.assert_is_sqlite("multisig1_watchonly")
assert_equal(ms1_watchonly.getaddressinfo(addr1)["ismine"], True)
assert_equal(ms1_watchonly.getaddressinfo(addr1)["solvable"], True)
# Because addr2 was not being watched, it isn't in multisig1_watchonly but rather multisig1_solvables
assert_equal(ms1_watchonly.getaddressinfo(addr2)["ismine"], False)
assert_equal(ms1_watchonly.getaddressinfo(addr2)["solvable"], False)
ms1_watchonly.gettransaction(txid)
assert_equal(ms1_watchonly.getbalance(), 10)
# Migrating multisig1 should see the second multisig is no longer part of multisig1
# A new wallet multisig1_solvables is created which has the second address
# This should have no transactions
assert_equal("multisig1_solvables" in self.nodes[0].listwallets(), True)
ms1_solvable = self.nodes[0].get_wallet_rpc("multisig1_solvables")
ms1_wallet_info = ms1_solvable.getwalletinfo()
assert_equal(ms1_wallet_info['descriptors'], True)
assert_equal(ms1_wallet_info['private_keys_enabled'], False)
self.assert_is_sqlite("multisig1_solvables")
assert_equal(ms1_solvable.getaddressinfo(addr1)["ismine"], False)
assert_equal(ms1_solvable.getaddressinfo(addr1)["solvable"], False)
assert_equal(ms1_solvable.getaddressinfo(addr2)["ismine"], True)
assert_equal(ms1_solvable.getaddressinfo(addr2)["solvable"], True)
assert_equal(ms1_solvable.getbalance(), 0)
assert_equal(ms1_solvable.listtransactions(), [])
def test_other_watchonly(self):
default = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
# Wallet with an imported address. Should be the same thing as the multisig test
self.log.info("Test migration of a wallet with watchonly imports")
self.nodes[0].createwallet(wallet_name="imports0")
imports0 = self.nodes[0].get_wallet_rpc("imports0")
assert_equal(imports0.getwalletinfo()["descriptors"], False)
# External address label
imports0.setlabel(default.getnewaddress(), "external")
# Normal non-watchonly tx
received_addr = imports0.getnewaddress()
imports0.setlabel(received_addr, "Receiving")
received_txid = default.sendtoaddress(received_addr, 10)
# Watchonly tx
import_addr = default.getnewaddress()
imports0.importaddress(import_addr)
imports0.setlabel(import_addr, "imported")
received_watchonly_txid = default.sendtoaddress(import_addr, 10)
# Received watchonly tx that is then spent
import_sent_addr = default.getnewaddress()
imports0.importaddress(import_sent_addr)
received_sent_watchonly_txid = default.sendtoaddress(import_sent_addr, 10)
received_sent_watchonly_vout = find_vout_for_address(self.nodes[0], received_sent_watchonly_txid, import_sent_addr)
send = default.sendall(recipients=[default.getnewaddress()], options={"inputs": [{"txid": received_sent_watchonly_txid, "vout": received_sent_watchonly_vout}]})
sent_watchonly_txid = send["txid"]
self.generate(self.nodes[0], 1)
balances = imports0.getbalances()
spendable_bal = balances["mine"]["trusted"]
watchonly_bal = balances["watchonly"]["trusted"]
assert_equal(len(imports0.listtransactions(include_watchonly=True)), 4)
# Migrate
imports0.migratewallet()
assert_equal(imports0.getwalletinfo()["descriptors"], True)
self.assert_is_sqlite("imports0")
assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", imports0.gettransaction, received_watchonly_txid)
assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", imports0.gettransaction, received_sent_watchonly_txid)
assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", imports0.gettransaction, sent_watchonly_txid)
assert_equal(len(imports0.listtransactions(include_watchonly=True)), 1)
imports0.gettransaction(received_txid)
assert_equal(imports0.getbalance(), spendable_bal)
assert_equal("imports0_watchonly" in self.nodes[0].listwallets(), True)
watchonly = self.nodes[0].get_wallet_rpc("imports0_watchonly")
watchonly_info = watchonly.getwalletinfo()
assert_equal(watchonly_info["descriptors"], True)
self.assert_is_sqlite("imports0_watchonly")
assert_equal(watchonly_info["private_keys_enabled"], False)
watchonly.gettransaction(received_watchonly_txid)
watchonly.gettransaction(received_sent_watchonly_txid)
watchonly.gettransaction(sent_watchonly_txid)
assert_equal(watchonly.getbalance(), watchonly_bal)
assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", watchonly.gettransaction, received_txid)
assert_equal(len(watchonly.listtransactions(include_watchonly=True)), 3)
# Check that labels were migrated and persisted to watchonly wallet
self.nodes[0].unloadwallet("imports0_watchonly")
self.nodes[0].loadwallet("imports0_watchonly")
labels = watchonly.listlabels()
assert "external" in labels
assert "imported" in labels
def test_no_privkeys(self):
default = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
# Migrating an actual watchonly wallet should not create a new watchonly wallet
self.log.info("Test migration of a pure watchonly wallet")
self.nodes[0].createwallet(wallet_name="watchonly0", disable_private_keys=True)
watchonly0 = self.nodes[0].get_wallet_rpc("watchonly0")
info = watchonly0.getwalletinfo()
assert_equal(info["descriptors"], False)
assert_equal(info["private_keys_enabled"], False)
addr = default.getnewaddress()
desc = default.getaddressinfo(addr)["desc"]
res = watchonly0.importmulti([
{
"desc": desc,
"watchonly": True,
"timestamp": "now",
}])
assert_equal(res[0]['success'], True)
default.sendtoaddress(addr, 10)
self.generate(self.nodes[0], 1)
watchonly0.migratewallet()
assert_equal("watchonly0_watchonly" in self.nodes[0].listwallets(), False)
info = watchonly0.getwalletinfo()
assert_equal(info["descriptors"], True)
assert_equal(info["private_keys_enabled"], False)
self.assert_is_sqlite("watchonly0")
# Migrating a wallet with pubkeys added to the keypool
self.log.info("Test migration of a pure watchonly wallet with pubkeys in keypool")
self.nodes[0].createwallet(wallet_name="watchonly1", disable_private_keys=True)
watchonly1 = self.nodes[0].get_wallet_rpc("watchonly1")
info = watchonly1.getwalletinfo()
assert_equal(info["descriptors"], False)
assert_equal(info["private_keys_enabled"], False)
addr1 = default.getnewaddress(address_type="bech32")
addr2 = default.getnewaddress(address_type="bech32")
desc1 = default.getaddressinfo(addr1)["desc"]
desc2 = default.getaddressinfo(addr2)["desc"]
res = watchonly1.importmulti([
{
"desc": desc1,
"keypool": True,
"timestamp": "now",
},
{
"desc": desc2,
"keypool": True,
"timestamp": "now",
}
])
assert_equal(res[0]["success"], True)
assert_equal(res[1]["success"], True)
# Before migrating, we can fetch addr1 from the keypool
assert_equal(watchonly1.getnewaddress(address_type="bech32"), addr1)
watchonly1.migratewallet()
info = watchonly1.getwalletinfo()
assert_equal(info["descriptors"], True)
assert_equal(info["private_keys_enabled"], False)
self.assert_is_sqlite("watchonly1")
# After migrating, the "keypool" is empty
assert_raises_rpc_error(-4, "Error: This wallet has no available keys", watchonly1.getnewaddress)
def test_pk_coinbases(self):
self.log.info("Test migration of a wallet using old pk() coinbases")
wallet = self.create_legacy_wallet("pkcb")
addr = wallet.getnewaddress()
addr_info = wallet.getaddressinfo(addr)
desc = descsum_create("pk(" + addr_info["pubkey"] + ")")
self.nodes[0].generatetodescriptor(1, desc, invalid_call=False)
bals = wallet.getbalances()
wallet.migratewallet()
assert_equal(bals, wallet.getbalances())
def test_encrypted(self):
self.log.info("Test migration of an encrypted wallet")
wallet = self.create_legacy_wallet("encrypted")
wallet.encryptwallet("pass")
# TODO: Fix migratewallet so that we can actually migrate encrypted wallets
def run_test(self):
self.generate(self.nodes[0], 101)
# TODO: Test the actual records in the wallet for these tests too. The behavior may be correct, but the data written may not be what we actually want
self.test_basic()
self.test_multisig()
self.test_other_watchonly()
self.test_no_privkeys()
self.test_pk_coinbases()
self.test_encrypted()
if __name__ == '__main__':
WalletMigrationTest().main()