mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-01-24 10:17:45 -03:00
164 lines
6.9 KiB
Python
Executable file
164 lines
6.9 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# Copyright (c) 2014-2024 The Bitcoin Core developers
|
|
# Distributed under the MIT software license, see the accompanying
|
|
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
|
"""Tests for orphan related RPCs."""
|
|
|
|
import time
|
|
|
|
from test_framework.mempool_util import (
|
|
ORPHAN_TX_EXPIRE_TIME,
|
|
tx_in_orphanage,
|
|
)
|
|
from test_framework.messages import (
|
|
CInv,
|
|
msg_inv,
|
|
msg_tx,
|
|
MSG_WTX,
|
|
)
|
|
from test_framework.p2p import P2PInterface
|
|
from test_framework.util import (
|
|
assert_equal,
|
|
assert_raises_rpc_error,
|
|
)
|
|
from test_framework.test_framework import BitcoinTestFramework
|
|
from test_framework.wallet import MiniWallet
|
|
|
|
|
|
class OrphanRPCsTest(BitcoinTestFramework):
    """Functional test for the hidden ``getorphantxs`` RPC.

    Three sub-tests run in sequence against a single node:

    * ``test_orphan_activity``: orphans enter and leave the orphanage as
      their parents are broadcast.
    * ``test_orphan_details``: the per-orphan fields reported by the RPC.
    * ``test_misc``: argument type validation and help visibility.
    """

    def set_test_params(self):
        """Configure the framework to run with a single node."""
        self.num_nodes = 1

    def run_test(self):
        """Create the shared MiniWallet and run every sub-test in order."""
        self.wallet = MiniWallet(self.nodes[0])
        self.test_orphan_activity()
        self.test_orphan_details()
        self.test_misc()

    def test_orphan_activity(self):
        """Check orphanage membership as parents of orphaned children arrive.

        Two parent/child pairs are created but only the children are sent, so
        both children must show up in ``getorphantxs``. Each parent is then
        broadcast in turn, and the matching child is expected to move from the
        orphanage to the mempool.
        """
        self.log.info("Check that orphaned transactions are returned with getorphantxs")
        node = self.nodes[0]

        self.log.info("Create two 1P1C packages, but only broadcast the children")
        tx_parent_1 = self.wallet.create_self_transfer()
        tx_child_1 = self.wallet.create_self_transfer(utxo_to_spend=tx_parent_1["new_utxo"])
        tx_parent_2 = self.wallet.create_self_transfer()
        tx_child_2 = self.wallet.create_self_transfer(utxo_to_spend=tx_parent_2["new_utxo"])
        peer = node.add_p2p_connection(P2PInterface())
        peer.send_and_ping(msg_tx(tx_child_1["tx"]))
        peer.send_and_ping(msg_tx(tx_child_2["tx"]))

        self.log.info("Check that neither parent is in the mempool")
        assert_equal(node.getmempoolinfo()["size"], 0)

        orphanage = node.getorphantxs(verbosity=0)
        self.log.info("Check the size of the orphanage")
        assert_equal(len(orphanage), 2)
        self.log.info("Check that undefined verbosity is disallowed")
        assert_raises_rpc_error(-8, "Invalid verbosity value -1", node.getorphantxs, verbosity=-1)
        assert_raises_rpc_error(-8, "Invalid verbosity value 3", node.getorphantxs, verbosity=3)
        self.log.info("Check that both children are in the orphanage")
        assert tx_in_orphanage(node, tx_child_1["tx"])
        assert tx_in_orphanage(node, tx_child_2["tx"])

        self.log.info("Broadcast parent 1")
        peer.send_and_ping(msg_tx(tx_parent_1["tx"]))
        self.log.info("Check that parent 1 and child 1 are in the mempool")
        raw_mempool = node.getrawmempool()
        assert_equal(len(raw_mempool), 2)
        assert tx_parent_1["txid"] in raw_mempool
        assert tx_child_1["txid"] in raw_mempool

        self.log.info("Check that orphanage only contains child 2")
        orphanage = node.getorphantxs()
        assert_equal(len(orphanage), 1)
        assert tx_in_orphanage(node, tx_child_2["tx"])

        peer.send_and_ping(msg_tx(tx_parent_2["tx"]))
        self.log.info("Check that all parents and children are now in the mempool")
        raw_mempool = node.getrawmempool()
        assert_equal(len(raw_mempool), 4)
        assert tx_parent_1["txid"] in raw_mempool
        assert tx_child_1["txid"] in raw_mempool
        assert tx_parent_2["txid"] in raw_mempool
        assert tx_child_2["txid"] in raw_mempool
        self.log.info("Check that the orphanage is empty")
        assert_equal(len(node.getorphantxs()), 0)

        self.log.info("Confirm the transactions (clears mempool)")
        self.generate(node, 1)
        assert_equal(node.getmempoolinfo()["size"], 0)

    def test_orphan_details(self):
        """Check the per-orphan details returned by ``getorphantxs``.

        Two orphans are sent from two different peers while the node's mock
        time is pinned, so the reported entry/expiration times can be compared
        exactly. One orphan is then resolved and the remaining one is checked
        at verbosity 1 and at verbosity 2.
        """
        self.log.info("Check the transaction details returned from getorphantxs")
        node = self.nodes[0]

        self.log.info("Create two orphans, from different peers")
        tx_parent_1 = self.wallet.create_self_transfer()
        tx_child_1 = self.wallet.create_self_transfer(utxo_to_spend=tx_parent_1["new_utxo"])
        tx_parent_2 = self.wallet.create_self_transfer()
        tx_child_2 = self.wallet.create_self_transfer(utxo_to_spend=tx_parent_2["new_utxo"])
        peer_1 = node.add_p2p_connection(P2PInterface())
        peer_2 = node.add_p2p_connection(P2PInterface())
        # Pin the node clock before the orphans are sent so that the "entry"
        # and "expiration" values asserted below are deterministic.
        entry_time = int(time.time())
        node.setmocktime(entry_time)
        peer_1.send_and_ping(msg_tx(tx_child_1["tx"]))
        peer_2.send_and_ping(msg_tx(tx_child_2["tx"]))

        orphanage = node.getorphantxs(verbosity=2)
        assert tx_in_orphanage(node, tx_child_1["tx"])
        assert tx_in_orphanage(node, tx_child_2["tx"])

        self.log.info("Check that orphan 1 and 2 were from different peers")
        assert orphanage[0]["from"][0] != orphanage[1]["from"][0]
        peer_ids = [orphanage[0]["from"][0], orphanage[1]["from"][0]]

        self.log.info("Unorphan child 2")
        peer_2.send_and_ping(msg_tx(tx_parent_2["tx"]))
        assert not tx_in_orphanage(node, tx_child_2["tx"])

        self.log.info("Check that additional announcers are reflected in RPC result")
        peer_2.send_and_ping(msg_inv([CInv(t=MSG_WTX, h=int(tx_child_1["wtxid"], 16))]))

        orphanage = node.getorphantxs(verbosity=2)
        assert_equal(set(orphanage[0]["from"]), set(peer_ids))

        self.log.info("Checking orphan details")
        assert_equal(len(node.getorphantxs()), 1)
        # Fetch the verbosity=1 view explicitly: the level-1 checks must run
        # against level-1 output, not against the verbosity=2 result above.
        orphanage_v1 = node.getorphantxs(verbosity=1)
        assert_equal(len(orphanage_v1), 1)
        self.orphan_details_match(orphanage_v1[0], tx_child_1, verbosity=1)
        orphan_1 = orphanage[0]
        self.log.info("Checking orphan entry/expiration times")
        assert_equal(orphan_1["entry"], entry_time)
        assert_equal(orphan_1["expiration"], entry_time + ORPHAN_TX_EXPIRE_TIME)

        self.log.info("Checking orphan details (verbosity 2)")
        orphanage = node.getorphantxs(verbosity=2)
        orphan_1 = orphanage[0]
        self.orphan_details_match(orphan_1, tx_child_1, verbosity=2)

    def orphan_details_match(self, orphan, tx, verbosity):
        """Assert that one ``getorphantxs`` entry describes the given transaction.

        Args:
            orphan: A single entry from a verbose ``getorphantxs`` result; its
                "txid", "wtxid", "bytes", "vsize" and "weight" keys are read
                (plus "hex" when ``verbosity`` is 2).
            tx: The wallet-created transaction to compare against; its "txid",
                "wtxid", "tx" and (for verbosity 2) "hex" keys are read.
            verbosity: Verbosity level the entry was requested with. The
                transaction hex is only compared when this is 2.
        """
        self.log.info("Check txid/wtxid of orphan")
        assert_equal(orphan["txid"], tx["txid"])
        assert_equal(orphan["wtxid"], tx["wtxid"])

        self.log.info("Check the sizes of orphan")
        assert_equal(orphan["bytes"], len(tx["tx"].serialize()))
        assert_equal(orphan["vsize"], tx["tx"].get_vsize())
        assert_equal(orphan["weight"], tx["tx"].get_weight())

        if verbosity == 2:
            self.log.info("Check the transaction hex of orphan")
            assert_equal(orphan["hex"], tx["hex"])

    def test_misc(self):
        """Check boolean-verbosity rejection and that the RPC is hidden from help."""
        node = self.nodes[0]
        assert_raises_rpc_error(-3, "Verbosity was boolean but only integer allowed", node.getorphantxs, verbosity=True)
        assert_raises_rpc_error(-3, "Verbosity was boolean but only integer allowed", node.getorphantxs, verbosity=False)
        help_output = node.help()
        self.log.info("Check that getorphantxs is a hidden RPC")
        # Hidden means absent from the general listing, yet still known when
        # help is requested for the command by name.
        assert "getorphantxs" not in help_output
        assert "unknown command: getorphantxs" not in node.help("getorphantxs")
|
|
|
|
|
|
# Script entry point: run the orphan RPC test when this file is executed directly.
if __name__ == "__main__":
    OrphanRPCsTest(__file__).main()
|