mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-02-02 14:37:42 -03:00
d464b2af30
Due to an oversight, we cannot currently migrate encrypted wallets,
regardless of whether they are unlocked. Migrating such wallets will
trigger an error, and result in the cleanup being run. This conveniently
allows us to check some parts of the cleanup code.
Github-Pull: #26594
Rebased-From: 88afc73ae0
418 lines
19 KiB
Python
Executable file
418 lines
19 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# Copyright (c) 2020 The Bitcoin Core developers
|
|
# Distributed under the MIT software license, see the accompanying
|
|
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
|
"""Test Migrating a wallet from legacy to descriptor."""
|
|
|
|
import os
|
|
import random
|
|
from test_framework.descriptors import descsum_create
|
|
from test_framework.test_framework import BitcoinTestFramework
|
|
from test_framework.util import (
|
|
assert_equal,
|
|
assert_raises_rpc_error,
|
|
find_vout_for_address,
|
|
)
|
|
from test_framework.wallet_util import (
|
|
get_generate_key,
|
|
)
|
|
|
|
|
|
class WalletMigrationTest(BitcoinTestFramework):
|
|
def set_test_params(self):
|
|
self.setup_clean_chain = True
|
|
self.num_nodes = 1
|
|
self.extra_args = [[]]
|
|
self.supports_cli = False
|
|
|
|
def skip_test_if_missing_module(self):
|
|
self.skip_if_no_wallet()
|
|
self.skip_if_no_sqlite()
|
|
self.skip_if_no_bdb()
|
|
|
|
def assert_is_sqlite(self, wallet_name):
|
|
wallet_file_path = os.path.join(self.nodes[0].datadir, "regtest/wallets", wallet_name, self.wallet_data_filename)
|
|
with open(wallet_file_path, 'rb') as f:
|
|
file_magic = f.read(16)
|
|
assert_equal(file_magic, b'SQLite format 3\x00')
|
|
assert_equal(self.nodes[0].get_wallet_rpc(wallet_name).getwalletinfo()["format"], "sqlite")
|
|
|
|
def create_legacy_wallet(self, wallet_name):
|
|
self.nodes[0].createwallet(wallet_name=wallet_name)
|
|
wallet = self.nodes[0].get_wallet_rpc(wallet_name)
|
|
assert_equal(wallet.getwalletinfo()["descriptors"], False)
|
|
assert_equal(wallet.getwalletinfo()["format"], "bdb")
|
|
return wallet
|
|
|
|
def assert_addr_info_equal(self, addr_info, addr_info_old):
|
|
assert_equal(addr_info["address"], addr_info_old["address"])
|
|
assert_equal(addr_info["scriptPubKey"], addr_info_old["scriptPubKey"])
|
|
assert_equal(addr_info["ismine"], addr_info_old["ismine"])
|
|
assert_equal(addr_info["hdkeypath"], addr_info_old["hdkeypath"])
|
|
assert_equal(addr_info["solvable"], addr_info_old["solvable"])
|
|
assert_equal(addr_info["ischange"], addr_info_old["ischange"])
|
|
assert_equal(addr_info["hdmasterfingerprint"], addr_info_old["hdmasterfingerprint"])
|
|
|
|
def assert_list_txs_equal(self, received_list_txs, expected_list_txs):
|
|
for d in received_list_txs:
|
|
if "parent_descs" in d:
|
|
del d["parent_descs"]
|
|
for d in expected_list_txs:
|
|
if "parent_descs" in d:
|
|
del d["parent_descs"]
|
|
assert_equal(received_list_txs, expected_list_txs)
|
|
|
|
def test_basic(self):
|
|
default = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
|
|
|
|
self.log.info("Test migration of a basic keys only wallet without balance")
|
|
basic0 = self.create_legacy_wallet("basic0")
|
|
|
|
addr = basic0.getnewaddress()
|
|
change = basic0.getrawchangeaddress()
|
|
|
|
old_addr_info = basic0.getaddressinfo(addr)
|
|
old_change_addr_info = basic0.getaddressinfo(change)
|
|
assert_equal(old_addr_info["ismine"], True)
|
|
assert_equal(old_addr_info["hdkeypath"], "m/0'/0'/0'")
|
|
assert_equal(old_change_addr_info["ismine"], True)
|
|
assert_equal(old_change_addr_info["hdkeypath"], "m/0'/1'/0'")
|
|
|
|
# Note: migration could take a while.
|
|
basic0.migratewallet()
|
|
|
|
# Verify created descriptors
|
|
assert_equal(basic0.getwalletinfo()["descriptors"], True)
|
|
self.assert_is_sqlite("basic0")
|
|
|
|
# The wallet should create the following descriptors:
|
|
# * BIP32 descriptors in the form of "0'/0'/*" and "0'/1'/*" (2 descriptors)
|
|
# * BIP44 descriptors in the form of "44'/1'/0'/0/*" and "44'/1'/0'/1/*" (2 descriptors)
|
|
# * BIP49 descriptors, P2SH(P2WPKH), in the form of "86'/1'/0'/0/*" and "86'/1'/0'/1/*" (2 descriptors)
|
|
# * BIP84 descriptors, P2WPKH, in the form of "84'/1'/0'/1/*" and "84'/1'/0'/1/*" (2 descriptors)
|
|
# * BIP86 descriptors, P2TR, in the form of "86'/1'/0'/0/*" and "86'/1'/0'/1/*" (2 descriptors)
|
|
# * A combo(PK) descriptor for the wallet master key.
|
|
# So, should have a total of 11 descriptors on it.
|
|
assert_equal(len(basic0.listdescriptors()["descriptors"]), 11)
|
|
|
|
# Compare addresses info
|
|
addr_info = basic0.getaddressinfo(addr)
|
|
change_addr_info = basic0.getaddressinfo(change)
|
|
self.assert_addr_info_equal(addr_info, old_addr_info)
|
|
self.assert_addr_info_equal(change_addr_info, old_change_addr_info)
|
|
|
|
addr_info = basic0.getaddressinfo(basic0.getnewaddress("", "bech32"))
|
|
assert_equal(addr_info["hdkeypath"], "m/84'/1'/0'/0/0")
|
|
|
|
self.log.info("Test migration of a basic keys only wallet with a balance")
|
|
basic1 = self.create_legacy_wallet("basic1")
|
|
|
|
for _ in range(0, 10):
|
|
default.sendtoaddress(basic1.getnewaddress(), 1)
|
|
|
|
self.generate(self.nodes[0], 1)
|
|
|
|
for _ in range(0, 5):
|
|
basic1.sendtoaddress(default.getnewaddress(), 0.5)
|
|
|
|
self.generate(self.nodes[0], 1)
|
|
bal = basic1.getbalance()
|
|
txs = basic1.listtransactions()
|
|
|
|
basic1.migratewallet()
|
|
assert_equal(basic1.getwalletinfo()["descriptors"], True)
|
|
self.assert_is_sqlite("basic1")
|
|
assert_equal(basic1.getbalance(), bal)
|
|
self.assert_list_txs_equal(basic1.listtransactions(), txs)
|
|
|
|
# restart node and verify that everything is still there
|
|
self.restart_node(0)
|
|
default = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
|
|
self.nodes[0].loadwallet("basic1")
|
|
basic1 = self.nodes[0].get_wallet_rpc("basic1")
|
|
assert_equal(basic1.getwalletinfo()["descriptors"], True)
|
|
self.assert_is_sqlite("basic1")
|
|
assert_equal(basic1.getbalance(), bal)
|
|
self.assert_list_txs_equal(basic1.listtransactions(), txs)
|
|
|
|
self.log.info("Test migration of a wallet with balance received on the seed")
|
|
basic2 = self.create_legacy_wallet("basic2")
|
|
basic2_seed = get_generate_key()
|
|
basic2.sethdseed(True, basic2_seed.privkey)
|
|
assert_equal(basic2.getbalance(), 0)
|
|
|
|
# Receive coins on different output types for the same seed
|
|
basic2_balance = 0
|
|
for addr in [basic2_seed.p2pkh_addr, basic2_seed.p2wpkh_addr, basic2_seed.p2sh_p2wpkh_addr]:
|
|
send_value = random.randint(1, 4)
|
|
default.sendtoaddress(addr, send_value)
|
|
basic2_balance += send_value
|
|
self.generate(self.nodes[0], 1)
|
|
assert_equal(basic2.getbalance(), basic2_balance)
|
|
basic2_txs = basic2.listtransactions()
|
|
|
|
# Now migrate and test that we still see have the same balance/transactions
|
|
basic2.migratewallet()
|
|
assert_equal(basic2.getwalletinfo()["descriptors"], True)
|
|
self.assert_is_sqlite("basic2")
|
|
assert_equal(basic2.getbalance(), basic2_balance)
|
|
self.assert_list_txs_equal(basic2.listtransactions(), basic2_txs)
|
|
|
|
def test_multisig(self):
|
|
default = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
|
|
|
|
# Contrived case where all the multisig keys are in a single wallet
|
|
self.log.info("Test migration of a wallet with all keys for a multisig")
|
|
multisig0 = self.create_legacy_wallet("multisig0")
|
|
addr1 = multisig0.getnewaddress()
|
|
addr2 = multisig0.getnewaddress()
|
|
addr3 = multisig0.getnewaddress()
|
|
|
|
ms_info = multisig0.addmultisigaddress(2, [addr1, addr2, addr3])
|
|
|
|
multisig0.migratewallet()
|
|
assert_equal(multisig0.getwalletinfo()["descriptors"], True)
|
|
self.assert_is_sqlite("multisig0")
|
|
ms_addr_info = multisig0.getaddressinfo(ms_info["address"])
|
|
assert_equal(ms_addr_info["ismine"], True)
|
|
assert_equal(ms_addr_info["desc"], ms_info["descriptor"])
|
|
assert_equal("multisig0_watchonly" in self.nodes[0].listwallets(), False)
|
|
assert_equal("multisig0_solvables" in self.nodes[0].listwallets(), False)
|
|
|
|
pub1 = multisig0.getaddressinfo(addr1)["pubkey"]
|
|
pub2 = multisig0.getaddressinfo(addr2)["pubkey"]
|
|
|
|
# Some keys in multisig do not belong to this wallet
|
|
self.log.info("Test migration of a wallet that has some keys in a multisig")
|
|
self.nodes[0].createwallet(wallet_name="multisig1")
|
|
multisig1 = self.nodes[0].get_wallet_rpc("multisig1")
|
|
ms_info = multisig1.addmultisigaddress(2, [multisig1.getnewaddress(), pub1, pub2])
|
|
ms_info2 = multisig1.addmultisigaddress(2, [multisig1.getnewaddress(), pub1, pub2])
|
|
assert_equal(multisig1.getwalletinfo()["descriptors"], False)
|
|
|
|
addr1 = ms_info["address"]
|
|
addr2 = ms_info2["address"]
|
|
txid = default.sendtoaddress(addr1, 10)
|
|
multisig1.importaddress(addr1)
|
|
assert_equal(multisig1.getaddressinfo(addr1)["ismine"], False)
|
|
assert_equal(multisig1.getaddressinfo(addr1)["iswatchonly"], True)
|
|
assert_equal(multisig1.getaddressinfo(addr1)["solvable"], True)
|
|
self.generate(self.nodes[0], 1)
|
|
multisig1.gettransaction(txid)
|
|
assert_equal(multisig1.getbalances()["watchonly"]["trusted"], 10)
|
|
assert_equal(multisig1.getaddressinfo(addr2)["ismine"], False)
|
|
assert_equal(multisig1.getaddressinfo(addr2)["iswatchonly"], False)
|
|
assert_equal(multisig1.getaddressinfo(addr2)["solvable"], True)
|
|
|
|
# Migrating multisig1 should see the multisig is no longer part of multisig1
|
|
# A new wallet multisig1_watchonly is created which has the multisig address
|
|
# Transaction to multisig is in multisig1_watchonly and not multisig1
|
|
multisig1.migratewallet()
|
|
assert_equal(multisig1.getwalletinfo()["descriptors"], True)
|
|
self.assert_is_sqlite("multisig1")
|
|
assert_equal(multisig1.getaddressinfo(addr1)["ismine"], False)
|
|
assert_equal(multisig1.getaddressinfo(addr1)["iswatchonly"], False)
|
|
assert_equal(multisig1.getaddressinfo(addr1)["solvable"], False)
|
|
assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", multisig1.gettransaction, txid)
|
|
assert_equal(multisig1.getbalance(), 0)
|
|
assert_equal(multisig1.listtransactions(), [])
|
|
|
|
assert_equal("multisig1_watchonly" in self.nodes[0].listwallets(), True)
|
|
ms1_watchonly = self.nodes[0].get_wallet_rpc("multisig1_watchonly")
|
|
ms1_wallet_info = ms1_watchonly.getwalletinfo()
|
|
assert_equal(ms1_wallet_info['descriptors'], True)
|
|
assert_equal(ms1_wallet_info['private_keys_enabled'], False)
|
|
self.assert_is_sqlite("multisig1_watchonly")
|
|
assert_equal(ms1_watchonly.getaddressinfo(addr1)["ismine"], True)
|
|
assert_equal(ms1_watchonly.getaddressinfo(addr1)["solvable"], True)
|
|
# Because addr2 was not being watched, it isn't in multisig1_watchonly but rather multisig1_solvables
|
|
assert_equal(ms1_watchonly.getaddressinfo(addr2)["ismine"], False)
|
|
assert_equal(ms1_watchonly.getaddressinfo(addr2)["solvable"], False)
|
|
ms1_watchonly.gettransaction(txid)
|
|
assert_equal(ms1_watchonly.getbalance(), 10)
|
|
|
|
# Migrating multisig1 should see the second multisig is no longer part of multisig1
|
|
# A new wallet multisig1_solvables is created which has the second address
|
|
# This should have no transactions
|
|
assert_equal("multisig1_solvables" in self.nodes[0].listwallets(), True)
|
|
ms1_solvable = self.nodes[0].get_wallet_rpc("multisig1_solvables")
|
|
ms1_wallet_info = ms1_solvable.getwalletinfo()
|
|
assert_equal(ms1_wallet_info['descriptors'], True)
|
|
assert_equal(ms1_wallet_info['private_keys_enabled'], False)
|
|
self.assert_is_sqlite("multisig1_solvables")
|
|
assert_equal(ms1_solvable.getaddressinfo(addr1)["ismine"], False)
|
|
assert_equal(ms1_solvable.getaddressinfo(addr1)["solvable"], False)
|
|
assert_equal(ms1_solvable.getaddressinfo(addr2)["ismine"], True)
|
|
assert_equal(ms1_solvable.getaddressinfo(addr2)["solvable"], True)
|
|
assert_equal(ms1_solvable.getbalance(), 0)
|
|
assert_equal(ms1_solvable.listtransactions(), [])
|
|
|
|
|
|
def test_other_watchonly(self):
|
|
default = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
|
|
|
|
# Wallet with an imported address. Should be the same thing as the multisig test
|
|
self.log.info("Test migration of a wallet with watchonly imports")
|
|
self.nodes[0].createwallet(wallet_name="imports0")
|
|
imports0 = self.nodes[0].get_wallet_rpc("imports0")
|
|
assert_equal(imports0.getwalletinfo()["descriptors"], False)
|
|
|
|
# Exteranl address label
|
|
imports0.setlabel(default.getnewaddress(), "external")
|
|
|
|
# Normal non-watchonly tx
|
|
received_addr = imports0.getnewaddress()
|
|
imports0.setlabel(received_addr, "Receiving")
|
|
received_txid = default.sendtoaddress(received_addr, 10)
|
|
|
|
# Watchonly tx
|
|
import_addr = default.getnewaddress()
|
|
imports0.importaddress(import_addr)
|
|
imports0.setlabel(import_addr, "imported")
|
|
received_watchonly_txid = default.sendtoaddress(import_addr, 10)
|
|
|
|
# Received watchonly tx that is then spent
|
|
import_sent_addr = default.getnewaddress()
|
|
imports0.importaddress(import_sent_addr)
|
|
received_sent_watchonly_txid = default.sendtoaddress(import_sent_addr, 10)
|
|
received_sent_watchonly_vout = find_vout_for_address(self.nodes[0], received_sent_watchonly_txid, import_sent_addr)
|
|
send = default.sendall(recipients=[default.getnewaddress()], options={"inputs": [{"txid": received_sent_watchonly_txid, "vout": received_sent_watchonly_vout}]})
|
|
sent_watchonly_txid = send["txid"]
|
|
|
|
self.generate(self.nodes[0], 1)
|
|
|
|
balances = imports0.getbalances()
|
|
spendable_bal = balances["mine"]["trusted"]
|
|
watchonly_bal = balances["watchonly"]["trusted"]
|
|
assert_equal(len(imports0.listtransactions(include_watchonly=True)), 4)
|
|
|
|
# Migrate
|
|
imports0.migratewallet()
|
|
assert_equal(imports0.getwalletinfo()["descriptors"], True)
|
|
self.assert_is_sqlite("imports0")
|
|
assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", imports0.gettransaction, received_watchonly_txid)
|
|
assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", imports0.gettransaction, received_sent_watchonly_txid)
|
|
assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", imports0.gettransaction, sent_watchonly_txid)
|
|
assert_equal(len(imports0.listtransactions(include_watchonly=True)), 1)
|
|
imports0.gettransaction(received_txid)
|
|
assert_equal(imports0.getbalance(), spendable_bal)
|
|
|
|
assert_equal("imports0_watchonly" in self.nodes[0].listwallets(), True)
|
|
watchonly = self.nodes[0].get_wallet_rpc("imports0_watchonly")
|
|
watchonly_info = watchonly.getwalletinfo()
|
|
assert_equal(watchonly_info["descriptors"], True)
|
|
self.assert_is_sqlite("imports0_watchonly")
|
|
assert_equal(watchonly_info["private_keys_enabled"], False)
|
|
watchonly.gettransaction(received_watchonly_txid)
|
|
watchonly.gettransaction(received_sent_watchonly_txid)
|
|
watchonly.gettransaction(sent_watchonly_txid)
|
|
assert_equal(watchonly.getbalance(), watchonly_bal)
|
|
assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", watchonly.gettransaction, received_txid)
|
|
assert_equal(len(watchonly.listtransactions(include_watchonly=True)), 3)
|
|
|
|
def test_no_privkeys(self):
|
|
default = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
|
|
|
|
# Migrating an actual watchonly wallet should not create a new watchonly wallet
|
|
self.log.info("Test migration of a pure watchonly wallet")
|
|
self.nodes[0].createwallet(wallet_name="watchonly0", disable_private_keys=True)
|
|
watchonly0 = self.nodes[0].get_wallet_rpc("watchonly0")
|
|
info = watchonly0.getwalletinfo()
|
|
assert_equal(info["descriptors"], False)
|
|
assert_equal(info["private_keys_enabled"], False)
|
|
|
|
addr = default.getnewaddress()
|
|
desc = default.getaddressinfo(addr)["desc"]
|
|
res = watchonly0.importmulti([
|
|
{
|
|
"desc": desc,
|
|
"watchonly": True,
|
|
"timestamp": "now",
|
|
}])
|
|
assert_equal(res[0]['success'], True)
|
|
default.sendtoaddress(addr, 10)
|
|
self.generate(self.nodes[0], 1)
|
|
|
|
watchonly0.migratewallet()
|
|
assert_equal("watchonly0_watchonly" in self.nodes[0].listwallets(), False)
|
|
info = watchonly0.getwalletinfo()
|
|
assert_equal(info["descriptors"], True)
|
|
assert_equal(info["private_keys_enabled"], False)
|
|
self.assert_is_sqlite("watchonly0")
|
|
|
|
# Migrating a wallet with pubkeys added to the keypool
|
|
self.log.info("Test migration of a pure watchonly wallet with pubkeys in keypool")
|
|
self.nodes[0].createwallet(wallet_name="watchonly1", disable_private_keys=True)
|
|
watchonly1 = self.nodes[0].get_wallet_rpc("watchonly1")
|
|
info = watchonly1.getwalletinfo()
|
|
assert_equal(info["descriptors"], False)
|
|
assert_equal(info["private_keys_enabled"], False)
|
|
|
|
addr1 = default.getnewaddress(address_type="bech32")
|
|
addr2 = default.getnewaddress(address_type="bech32")
|
|
desc1 = default.getaddressinfo(addr1)["desc"]
|
|
desc2 = default.getaddressinfo(addr2)["desc"]
|
|
res = watchonly1.importmulti([
|
|
{
|
|
"desc": desc1,
|
|
"keypool": True,
|
|
"timestamp": "now",
|
|
},
|
|
{
|
|
"desc": desc2,
|
|
"keypool": True,
|
|
"timestamp": "now",
|
|
}
|
|
])
|
|
assert_equal(res[0]["success"], True)
|
|
assert_equal(res[1]["success"], True)
|
|
# Before migrating, we can fetch addr1 from the keypool
|
|
assert_equal(watchonly1.getnewaddress(address_type="bech32"), addr1)
|
|
|
|
watchonly1.migratewallet()
|
|
info = watchonly1.getwalletinfo()
|
|
assert_equal(info["descriptors"], True)
|
|
assert_equal(info["private_keys_enabled"], False)
|
|
self.assert_is_sqlite("watchonly1")
|
|
# After migrating, the "keypool" is empty
|
|
assert_raises_rpc_error(-4, "Error: This wallet has no available keys", watchonly1.getnewaddress)
|
|
|
|
def test_pk_coinbases(self):
|
|
self.log.info("Test migration of a wallet using old pk() coinbases")
|
|
wallet = self.create_legacy_wallet("pkcb")
|
|
|
|
addr = wallet.getnewaddress()
|
|
addr_info = wallet.getaddressinfo(addr)
|
|
desc = descsum_create("pk(" + addr_info["pubkey"] + ")")
|
|
|
|
self.nodes[0].generatetodescriptor(1, desc, invalid_call=False)
|
|
|
|
bals = wallet.getbalances()
|
|
|
|
wallet.migratewallet()
|
|
|
|
assert_equal(bals, wallet.getbalances())
|
|
|
|
def test_encrypted(self):
|
|
self.log.info("Test migration of an encrypted wallet")
|
|
wallet = self.create_legacy_wallet("encrypted")
|
|
|
|
wallet.encryptwallet("pass")
|
|
|
|
wallet.walletpassphrase("pass", 10)
|
|
assert_raises_rpc_error(-4, "Error: Unable to produce descriptors for this legacy wallet. Make sure the wallet is unlocked first", wallet.migratewallet)
|
|
# TODO: Fix migratewallet so that we can actually migrate encrypted wallets
|
|
|
|
def run_test(self):
|
|
self.generate(self.nodes[0], 101)
|
|
|
|
# TODO: Test the actual records in the wallet for these tests too. The behavior may be correct, but the data written may not be what we actually want
|
|
self.test_basic()
|
|
self.test_multisig()
|
|
self.test_other_watchonly()
|
|
self.test_no_privkeys()
|
|
self.test_pk_coinbases()
|
|
self.test_encrypted()
|
|
|
|
if __name__ == '__main__':
|
|
WalletMigrationTest().main()
|