This commit is contained in:
Ava Chow 2025-04-29 12:03:36 +02:00 committed by GitHub
commit a4fee72d03
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 81 additions and 101 deletions

View file

@ -28,10 +28,13 @@ static void WalletMigration(benchmark::Bench& bench)
// Setup legacy wallet
std::unique_ptr<CWallet> wallet = std::make_unique<CWallet>(test_setup->m_node.chain.get(), "", CreateMockableWalletDatabase());
wallet->chainStateFlushed(ChainstateRole::NORMAL, CBlockLocator{});
LegacyDataSPKM* legacy_spkm = wallet->GetOrCreateLegacyDataSPKM();
WalletBatch batch{wallet->GetDatabase()};
// Write a best block record as migration expects one to exist
CBlockLocator loc;
batch.WriteBestBlock(loc);
// Add watch-only addresses
std::vector<CScript> scripts_watch_only;
for (int w = 0; w < NUM_WATCH_ONLY_ADDR; ++w) {

View file

@ -572,7 +572,7 @@ public:
m_context.chain = &chain;
m_context.args = &args;
}
~WalletLoaderImpl() override { UnloadWallets(m_context); }
~WalletLoaderImpl() override { stop(); }
//! ChainClient methods
void registerRpcs() override
@ -594,7 +594,7 @@ public:
return StartWallets(m_context);
}
void flush() override { return FlushWallets(m_context); }
void stop() override { return StopWallets(m_context); }
void stop() override { return UnloadWallets(m_context); }
void setMockTime(int64_t time) override { return SetMockTime(time); }
void schedulerMockForward(std::chrono::seconds delta) override { Assert(m_context.scheduler)->MockForward(delta); }

View file

@ -173,13 +173,6 @@ void FlushWallets(WalletContext& context)
}
}
void StopWallets(WalletContext& context)
{
for (const std::shared_ptr<CWallet>& pwallet : GetWallets(context)) {
pwallet->Close();
}
}
void UnloadWallets(WalletContext& context)
{
auto wallets = GetWallets(context);

View file

@ -31,10 +31,7 @@ void StartWallets(WalletContext& context);
//! Flush all wallets in preparation for shutdown.
void FlushWallets(WalletContext& context);
//! Stop all wallets. Wallets will be flushed first.
void StopWallets(WalletContext& context);
//! Close all wallets.
//! Flush and unload all wallets.
void UnloadWallets(WalletContext& context);
} // namespace wallet

View file

@ -109,7 +109,7 @@ BOOST_FIXTURE_TEST_CASE(scan_for_wallet_transactions, TestChain100Setup)
LOCK(wallet.cs_wallet);
LOCK(Assert(m_node.chainman)->GetMutex());
wallet.SetWalletFlag(WALLET_FLAG_DESCRIPTORS);
wallet.SetLastBlockProcessed(m_node.chainman->ActiveChain().Height(), m_node.chainman->ActiveChain().Tip()->GetBlockHash());
wallet.SetLastBlockProcessed(newTip->nHeight, newTip->GetBlockHash());
}
AddKey(wallet, coinbaseKey);
WalletRescanReserver reserver(wallet);
@ -119,8 +119,8 @@ BOOST_FIXTURE_TEST_CASE(scan_for_wallet_transactions, TestChain100Setup)
{
CBlockLocator locator;
BOOST_CHECK(!WalletBatch{wallet.GetDatabase()}.ReadBestBlock(locator));
BOOST_CHECK(locator.IsNull());
BOOST_CHECK(WalletBatch{wallet.GetDatabase()}.ReadBestBlock(locator));
BOOST_CHECK(!locator.IsNull() && locator.vHave.front() == newTip->GetBlockHash());
}
CWallet::ScanResult result = wallet.ScanForWalletTransactions(/*start_block=*/oldTip->GetBlockHash(), /*start_height=*/oldTip->nHeight, /*max_height=*/{}, reserver, /*fUpdate=*/false, /*save_progress=*/true);
@ -133,7 +133,7 @@ BOOST_FIXTURE_TEST_CASE(scan_for_wallet_transactions, TestChain100Setup)
{
CBlockLocator locator;
BOOST_CHECK(WalletBatch{wallet.GetDatabase()}.ReadBestBlock(locator));
BOOST_CHECK(!locator.IsNull());
BOOST_CHECK(!locator.IsNull() && locator.vHave.front() == newTip->GetBlockHash());
}
}

View file

@ -164,6 +164,7 @@ bool RemoveWallet(WalletContext& context, const std::shared_ptr<CWallet>& wallet
interfaces::Chain& chain = wallet->chain();
std::string name = wallet->GetName();
wallet->WriteBestBlock();
// Unregister with the validation interface which also drops shared pointers.
wallet->m_chain_notifications_handler.reset();
@ -648,13 +649,23 @@ bool CWallet::ChangeWalletPassphrase(const SecureString& strOldWalletPassphrase,
return false;
}
void CWallet::chainStateFlushed(ChainstateRole role, const CBlockLocator& loc)
void CWallet::SetLastBlockProcessedInMem(int block_height, uint256 block_hash)
{
// Don't update the best block until the chain is attached so that in case of a shutdown,
// the rescan will be restarted at next startup.
if (m_attaching_chain || role == ChainstateRole::BACKGROUND) {
return;
}
AssertLockHeld(cs_wallet);
m_last_block_processed = block_hash;
m_last_block_processed_height = block_height;
}
void CWallet::SetLastBlockProcessed(int block_height, uint256 block_hash)
{
AssertLockHeld(cs_wallet);
SetLastBlockProcessedInMem(block_height, block_hash);
CBlockLocator loc;
chain().findBlock(m_last_block_processed, FoundBlock().locator(loc));
WalletBatch batch(GetDatabase());
batch.WriteBestBlock(loc);
}
@ -1425,15 +1436,16 @@ void CWallet::RecursiveUpdateTxState(WalletBatch* batch, const uint256& tx_hash,
}
}
void CWallet::SyncTransaction(const CTransactionRef& ptx, const SyncTxState& state, bool update_tx, bool rescanning_old_block)
bool CWallet::SyncTransaction(const CTransactionRef& ptx, const SyncTxState& state, bool update_tx, bool rescanning_old_block)
{
if (!AddToWalletIfInvolvingMe(ptx, state, update_tx, rescanning_old_block))
return; // Not one of ours
return false; // Not one of ours
// If a transaction changes 'conflicted' state, that changes the balance
// available of the outputs it spends. So force those to be
// recomputed, also:
MarkInputsDirty(ptx);
return true;
}
void CWallet::transactionAddedToMempool(const CTransactionRef& tx) {
@ -1520,18 +1532,25 @@ void CWallet::blockConnected(ChainstateRole role, const interfaces::BlockInfo& b
assert(block.data);
LOCK(cs_wallet);
m_last_block_processed_height = block.height;
m_last_block_processed = block.hash;
// Update the best block in memory first. This will set the best block's height, which is
// needed by MarkConflicted.
SetLastBlockProcessedInMem(block.height, block.hash);
// No need to scan block if it was created before the wallet birthday.
// Uses chain max time and twice the grace period to adjust time for block time variability.
if (block.chain_time_max < m_birth_time.load() - (TIMESTAMP_WINDOW * 2)) return;
// Scan block
bool wallet_updated = false;
for (size_t index = 0; index < block.data->vtx.size(); index++) {
SyncTransaction(block.data->vtx[index], TxStateConfirmed{block.hash, block.height, static_cast<int>(index)});
wallet_updated |= SyncTransaction(block.data->vtx[index], TxStateConfirmed{block.hash, block.height, static_cast<int>(index)});
transactionRemovedFromMempool(block.data->vtx[index], MemPoolRemovalReason::BLOCK);
}
// Update on disk if this block resulted in us updating a tx, or periodically every 144 blocks (~1 day)
if (wallet_updated || block.height % 144 == 0) {
SetLastBlockProcessed(block.height, block.hash);
}
}
void CWallet::blockDisconnected(const interfaces::BlockInfo& block)
@ -1543,9 +1562,6 @@ void CWallet::blockDisconnected(const interfaces::BlockInfo& block)
// be unconfirmed, whether or not the transaction is added back to the mempool.
// User may have to call abandontransaction again. It may be addressed in the
// future with a stickier abandoned state or even removing abandontransaction call.
m_last_block_processed_height = block.height - 1;
m_last_block_processed = *Assert(block.prev_hash);
int disconnect_height = block.height;
for (size_t index = 0; index < block.data->vtx.size(); index++) {
@ -1579,6 +1595,9 @@ void CWallet::blockDisconnected(const interfaces::BlockInfo& block)
}
}
}
// Update the best block
SetLastBlockProcessed(block.height - 1, *Assert(block.prev_hash));
}
void CWallet::updatedBlockTip()
@ -3092,7 +3111,8 @@ std::shared_ptr<CWallet> CWallet::Create(WalletContext& context, const std::stri
}
if (chain) {
walletInstance->chainStateFlushed(ChainstateRole::NORMAL, chain->getTipLocator());
WalletBatch batch(walletInstance->GetDatabase());
batch.WriteBestBlock(chain->getTipLocator());
}
} else if (wallet_creation_flags & WALLET_FLAG_DISABLE_PRIVATE_KEYS) {
// Make it impossible to disable private keys after creation
@ -3294,11 +3314,6 @@ bool CWallet::AttachChain(const std::shared_ptr<CWallet>& walletInstance, interf
// be pending on the validation-side until lock release. Blocks that are connected while the
// rescan is ongoing will not be processed in the rescan but with the block connected notifications,
// so the wallet will only be completeley synced after the notifications delivery.
// chainStateFlushed notifications are ignored until the rescan is finished
// so that in case of a shutdown event, the rescan will be repeated at the next start.
// This is temporary until rescan and notifications delivery are unified under same
// interface.
walletInstance->m_attaching_chain = true; //ignores chainStateFlushed notifications
walletInstance->m_chain_notifications_handler = walletInstance->chain().handleNotifications(walletInstance);
// If rescan_required = true, rescan_height remains equal to 0
@ -3375,16 +3390,22 @@ bool CWallet::AttachChain(const std::shared_ptr<CWallet>& walletInstance, interf
{
WalletRescanReserver reserver(*walletInstance);
if (!reserver.reserve() || (ScanResult::SUCCESS != walletInstance->ScanForWalletTransactions(chain.getBlockHash(rescan_height), rescan_height, /*max_height=*/{}, reserver, /*fUpdate=*/true, /*save_progress=*/true).status)) {
if (!reserver.reserve()) {
error = _("Failed to acquire rescan reserver during wallet initialization");
return false;
}
ScanResult scan_res = walletInstance->ScanForWalletTransactions(chain.getBlockHash(rescan_height), rescan_height, /*max_height=*/{}, reserver, /*fUpdate=*/true, /*save_progress=*/true);
if (ScanResult::SUCCESS != scan_res.status) {
error = _("Failed to rescan the wallet during initialization");
return false;
}
// Set and update the best block record
// Although ScanForWalletTransactions will have stopped at the best block that was set prior to the rescan,
// we still need to make sure that the best block on disk is set correctly as rescanning may overwrite it.
walletInstance->SetLastBlockProcessed(*scan_res.last_scanned_height, scan_res.last_scanned_block);
}
walletInstance->m_attaching_chain = false;
walletInstance->chainStateFlushed(ChainstateRole::NORMAL, chain.getTipLocator());
walletInstance->GetDatabase().IncrementUpdateCounter();
}
walletInstance->m_attaching_chain = false;
return true;
}
@ -4433,11 +4454,6 @@ util::Result<MigrationResult> MigrateLegacyToDescriptor(const std::string& walle
return util::Error{_("Error: This wallet is already a descriptor wallet")};
}
// Flush chain state before unloading wallet
CBlockLocator locator;
WITH_LOCK(wallet->cs_wallet, context.chain->findBlock(wallet->GetLastBlockHash(), FoundBlock().locator(locator)));
if (!locator.IsNull()) wallet->chainStateFlushed(ChainstateRole::NORMAL, locator);
if (!RemoveWallet(context, wallet, /*load_on_start=*/std::nullopt, warnings)) {
return util::Error{_("Unable to unload the wallet before migrating")};
}
@ -4686,4 +4702,17 @@ std::optional<CKey> CWallet::GetKey(const CKeyID& keyid) const
}
return std::nullopt;
}
void CWallet::WriteBestBlock() const
{
LOCK(cs_wallet);
if (!m_last_block_processed.IsNull()) {
CBlockLocator loc;
chain().findBlock(m_last_block_processed, FoundBlock().locator(loc));
WalletBatch batch(GetDatabase());
batch.WriteBestBlock(loc);
}
}
} // namespace wallet

View file

@ -305,7 +305,6 @@ private:
std::atomic<bool> fAbortRescan{false};
std::atomic<bool> fScanningWallet{false}; // controlled by WalletRescanReserver
std::atomic<bool> m_attaching_chain{false};
std::atomic<bool> m_scanning_with_passphrase{false};
std::atomic<SteadyClock::time_point> m_scanning_start{SteadyClock::time_point{}};
std::atomic<double> m_scanning_progress{0};
@ -370,7 +369,7 @@ private:
void SyncMetaData(std::pair<TxSpends::iterator, TxSpends::iterator>) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
void SyncTransaction(const CTransactionRef& tx, const SyncTxState& state, bool update_tx = true, bool rescanning_old_block = false) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
bool SyncTransaction(const CTransactionRef& tx, const SyncTxState& state, bool update_tx = true, bool rescanning_old_block = false) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
/** WalletFlags set on this wallet. */
std::atomic<uint64_t> m_wallet_flags{0};
@ -437,6 +436,9 @@ private:
static NodeClock::time_point GetDefaultNextResend();
// Update last block processed in memory only
void SetLastBlockProcessedInMem(int block_height, uint256 block_hash) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
public:
/**
* Main wallet lock.
@ -788,7 +790,6 @@ public:
/** should probably be renamed to IsRelevantToMe */
bool IsFromMe(const CTransaction& tx) const;
CAmount GetDebit(const CTransaction& tx, const isminefilter& filter) const;
void chainStateFlushed(ChainstateRole role, const CBlockLocator& loc) override;
DBErrors LoadWallet();
@ -989,13 +990,10 @@ public:
assert(m_last_block_processed_height >= 0);
return m_last_block_processed;
}
/** Set last block processed height, currently only use in unit test */
void SetLastBlockProcessed(int block_height, uint256 block_hash) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet)
{
AssertLockHeld(cs_wallet);
m_last_block_processed_height = block_height;
m_last_block_processed = block_hash;
};
/** Set last block processed height, and write to database */
void SetLastBlockProcessed(int block_height, uint256 block_hash) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
/** Write the current best block to database */
void WriteBestBlock() const;
//! Connect the signals from ScriptPubKeyMans to the signals in CWallet
void ConnectScriptPubKeyManNotifiers();

View file

@ -9,10 +9,7 @@ try:
except ImportError:
pass
import concurrent.futures
from test_framework.blocktools import COINBASE_MATURITY
from test_framework.descriptors import descsum_create
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_not_equal,
@ -33,41 +30,6 @@ class WalletDescriptorTest(BitcoinTestFramework):
self.skip_if_no_wallet()
self.skip_if_no_py_sqlite3()
def test_concurrent_writes(self):
self.log.info("Test sqlite concurrent writes are in the correct order")
self.restart_node(0, extra_args=["-unsafesqlitesync=0"])
self.nodes[0].createwallet(wallet_name="concurrency", blank=True)
wallet = self.nodes[0].get_wallet_rpc("concurrency")
# First import a descriptor that uses hardened dervation so that topping up
# Will require writing a ton to db
wallet.importdescriptors([{"desc":descsum_create("wpkh(tprv8ZgxMBicQKsPeuVhWwi6wuMQGfPKi9Li5GtX35jVNknACgqe3CY4g5xgkfDDJcmtF7o1QnxWDRYw4H5P26PXq7sbcUkEqeR4fg3Kxp2tigg/0h/0h/*h)"), "timestamp": "now", "active": True}])
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread:
topup = thread.submit(wallet.keypoolrefill, newsize=1000)
# Then while the topup is running, we need to do something that will call
# ChainStateFlushed which will trigger a write to the db, hopefully at the
# same time that the topup still has an open db transaction.
self.nodes[0].cli.gettxoutsetinfo()
assert_equal(topup.result(), None)
wallet.unloadwallet()
# Check that everything was written
wallet_db = self.nodes[0].wallets_path / "concurrency" / self.wallet_data_filename
conn = sqlite3.connect(wallet_db)
with conn:
# Retrieve the bestblock_nomerkle record
bestblock_rec = conn.execute("SELECT value FROM main WHERE hex(key) = '1262657374626C6F636B5F6E6F6D65726B6C65'").fetchone()[0]
# Retrieve the number of descriptor cache records
# Since we store binary data, sqlite's comparison operators don't work everywhere
# so just retrieve all records and process them ourselves.
db_keys = conn.execute("SELECT key FROM main").fetchall()
cache_records = len([k[0] for k in db_keys if b"walletdescriptorcache" in k[0]])
conn.close()
assert_equal(bestblock_rec[5:37][::-1].hex(), self.nodes[0].getbestblockhash())
assert_equal(cache_records, 1000)
def run_test(self):
# Make a descriptor wallet
self.log.info("Making a descriptor wallet")
@ -266,8 +228,6 @@ class WalletDescriptorTest(BitcoinTestFramework):
conn.close()
assert_raises_rpc_error(-4, "Unexpected legacy entry in descriptor wallet found.", self.nodes[0].loadwallet, "crashme")
self.test_concurrent_writes()
if __name__ == '__main__':
WalletDescriptorTest(__file__).main()

View file

@ -119,11 +119,11 @@ class ReorgsRestoreTest(BitcoinTestFramework):
self.start_node(0)
assert_equal(node.getbestblockhash(), tip)
# Due to an existing bug, the wallet incorrectly keeps the transaction in an abandoned state, even though that's
# no longer the case (after the unclean shutdown, the node's chain returned to the pre-invalidation tip).
# This issue blocks any future spending and results in an incorrect balance display.
# After disconnecting the block, the wallet should recorded the new best block.
# Upon reload after the crash, since the chainstate was not flushed, the tip contains the previously abandoned
# coinbase. This should be rescanned and now un-abandoned.
wallet = node.get_wallet_rpc("reorg_crash")
assert_equal(wallet.getwalletinfo()['immature_balance'], 0) # FIXME: #31824.
assert_equal(wallet.gettransaction(coinbase_tx_id)['details'][0]['abandoned'], False)
# Previously, a bug caused the node to crash if two block disconnection events occurred consecutively.
# Ensure this is no longer the case by simulating a new reorg.