mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-01-25 02:33:24 -03:00
198 lines
7.7 KiB
Python
198 lines
7.7 KiB
Python
|
#!/usr/bin/env python3
|
||
|
# Copyright (c) 2024-present The Bitcoin Core developers
|
||
|
# Distributed under the MIT software license, see the accompanying
|
||
|
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||
|
|
||
|
from decimal import Decimal
|
||
|
|
||
|
from test_framework.test_framework import BitcoinTestFramework
|
||
|
from test_framework.util import assert_equal, assert_raises_rpc_error
|
||
|
from test_framework.messages import COIN
|
||
|
from test_framework.wallet import MiniWallet, getnewdestination
|
||
|
|
||
|
|
||
|
class GetBlocksActivityTest(BitcoinTestFramework):
|
||
|
def set_test_params(self):
|
||
|
self.num_nodes = 1
|
||
|
self.setup_clean_chain = True
|
||
|
|
||
|
def run_test(self):
|
||
|
node = self.nodes[0]
|
||
|
wallet = MiniWallet(node)
|
||
|
node.setmocktime(node.getblockheader(node.getbestblockhash())['time'])
|
||
|
wallet.generate(200, invalid_call=False)
|
||
|
|
||
|
self.test_no_activity(node)
|
||
|
self.test_activity_in_block(node, wallet)
|
||
|
self.test_no_mempool_inclusion(node, wallet)
|
||
|
self.test_multiple_addresses(node, wallet)
|
||
|
self.test_invalid_blockhash(node, wallet)
|
||
|
self.test_invalid_descriptor(node, wallet)
|
||
|
self.test_confirmed_and_unconfirmed(node, wallet)
|
||
|
self.test_receive_then_spend(node, wallet)
|
||
|
|
||
|
def test_no_activity(self, node):
|
||
|
_, _, addr_1 = getnewdestination()
|
||
|
result = node.getdescriptoractivity([], [f"addr({addr_1})"], True)
|
||
|
assert_equal(len(result['activity']), 0)
|
||
|
|
||
|
def test_activity_in_block(self, node, wallet):
|
||
|
_, spk_1, addr_1 = getnewdestination(address_type='bech32m')
|
||
|
txid = wallet.send_to(from_node=node, scriptPubKey=spk_1, amount=1 * COIN)['txid']
|
||
|
blockhash = self.generate(node, 1)[0]
|
||
|
|
||
|
# Test getdescriptoractivity with the specific blockhash
|
||
|
result = node.getdescriptoractivity([blockhash], [f"addr({addr_1})"], True)
|
||
|
assert_equal(list(result.keys()), ['activity'])
|
||
|
[activity] = result['activity']
|
||
|
|
||
|
for k, v in {
|
||
|
'amount': Decimal('1.00000000'),
|
||
|
'blockhash': blockhash,
|
||
|
'height': 201,
|
||
|
'txid': txid,
|
||
|
'type': 'receive',
|
||
|
'vout': 1,
|
||
|
}.items():
|
||
|
assert_equal(activity[k], v)
|
||
|
|
||
|
outspk = activity['output_spk']
|
||
|
|
||
|
assert_equal(outspk['asm'][:2], '1 ')
|
||
|
assert_equal(outspk['desc'].split('(')[0], 'rawtr')
|
||
|
assert_equal(outspk['hex'], spk_1.hex())
|
||
|
assert_equal(outspk['address'], addr_1)
|
||
|
assert_equal(outspk['type'], 'witness_v1_taproot')
|
||
|
|
||
|
|
||
|
def test_no_mempool_inclusion(self, node, wallet):
|
||
|
_, spk_1, addr_1 = getnewdestination()
|
||
|
wallet.send_to(from_node=node, scriptPubKey=spk_1, amount=1 * COIN)
|
||
|
|
||
|
_, spk_2, addr_2 = getnewdestination()
|
||
|
wallet.send_to(
|
||
|
from_node=node, scriptPubKey=spk_2, amount=1 * COIN)
|
||
|
|
||
|
# Do not generate a block to keep the transaction in the mempool
|
||
|
|
||
|
result = node.getdescriptoractivity([], [f"addr({addr_1})", f"addr({addr_2})"], False)
|
||
|
|
||
|
assert_equal(len(result['activity']), 0)
|
||
|
|
||
|
def test_multiple_addresses(self, node, wallet):
|
||
|
_, spk_1, addr_1 = getnewdestination()
|
||
|
_, spk_2, addr_2 = getnewdestination()
|
||
|
wallet.send_to(from_node=node, scriptPubKey=spk_1, amount=1 * COIN)
|
||
|
wallet.send_to(from_node=node, scriptPubKey=spk_2, amount=2 * COIN)
|
||
|
|
||
|
blockhash = self.generate(node, 1)[0]
|
||
|
|
||
|
result = node.getdescriptoractivity([blockhash], [f"addr({addr_1})", f"addr({addr_2})"], True)
|
||
|
|
||
|
assert_equal(len(result['activity']), 2)
|
||
|
|
||
|
# Duplicate address specification is fine.
|
||
|
assert_equal(
|
||
|
result,
|
||
|
node.getdescriptoractivity([blockhash], [
|
||
|
f"addr({addr_1})", f"addr({addr_1})", f"addr({addr_2})"], True))
|
||
|
|
||
|
# Flipping descriptor order doesn't affect results.
|
||
|
result_flipped = node.getdescriptoractivity(
|
||
|
[blockhash], [f"addr({addr_2})", f"addr({addr_1})"], True)
|
||
|
assert_equal(result, result_flipped)
|
||
|
|
||
|
[a1] = [a for a in result['activity'] if a['output_spk']['address'] == addr_1]
|
||
|
[a2] = [a for a in result['activity'] if a['output_spk']['address'] == addr_2]
|
||
|
|
||
|
assert a1['blockhash'] == blockhash
|
||
|
assert a1['amount'] == 1.0
|
||
|
|
||
|
assert a2['blockhash'] == blockhash
|
||
|
assert a2['amount'] == 2.0
|
||
|
|
||
|
def test_invalid_blockhash(self, node, wallet):
|
||
|
self.generate(node, 20) # Generate to get more fees
|
||
|
|
||
|
_, spk_1, addr_1 = getnewdestination()
|
||
|
wallet.send_to(from_node=node, scriptPubKey=spk_1, amount=1 * COIN)
|
||
|
|
||
|
invalid_blockhash = "0000000000000000000000000000000000000000000000000000000000000000"
|
||
|
|
||
|
assert_raises_rpc_error(
|
||
|
-5, "Block not found",
|
||
|
node.getdescriptoractivity, [invalid_blockhash], [f"addr({addr_1})"], True)
|
||
|
|
||
|
def test_invalid_descriptor(self, node, wallet):
|
||
|
blockhash = self.generate(node, 1)[0]
|
||
|
_, _, addr_1 = getnewdestination()
|
||
|
|
||
|
assert_raises_rpc_error(
|
||
|
-5, "is not a valid descriptor",
|
||
|
node.getdescriptoractivity, [blockhash], [f"addrx({addr_1})"], True)
|
||
|
|
||
|
def test_confirmed_and_unconfirmed(self, node, wallet):
|
||
|
self.generate(node, 20) # Generate to get more fees
|
||
|
|
||
|
_, spk_1, addr_1 = getnewdestination()
|
||
|
txid_1 = wallet.send_to(
|
||
|
from_node=node, scriptPubKey=spk_1, amount=1 * COIN)['txid']
|
||
|
blockhash = self.generate(node, 1)[0]
|
||
|
|
||
|
_, spk_2, to_addr = getnewdestination()
|
||
|
txid_2 = wallet.send_to(
|
||
|
from_node=node, scriptPubKey=spk_2, amount=1 * COIN)['txid']
|
||
|
|
||
|
result = node.getdescriptoractivity(
|
||
|
[blockhash], [f"addr({addr_1})", f"addr({to_addr})"], True)
|
||
|
|
||
|
activity = result['activity']
|
||
|
assert_equal(len(activity), 2)
|
||
|
|
||
|
[confirmed] = [a for a in activity if a.get('blockhash') == blockhash]
|
||
|
assert confirmed['txid'] == txid_1
|
||
|
assert confirmed['height'] == node.getblockchaininfo()['blocks']
|
||
|
|
||
|
[unconfirmed] = [a for a in activity if not a.get('blockhash')]
|
||
|
assert 'blockhash' not in unconfirmed
|
||
|
assert 'height' not in unconfirmed
|
||
|
|
||
|
assert any(a['txid'] == txid_2 for a in activity if not a.get('blockhash'))
|
||
|
|
||
|
def test_receive_then_spend(self, node, wallet):
|
||
|
"""Also important because this tests multiple blockhashes."""
|
||
|
self.generate(node, 20) # Generate to get more fees
|
||
|
|
||
|
sent1 = wallet.send_self_transfer(from_node=node)
|
||
|
utxo = sent1['new_utxo']
|
||
|
blockhash_1 = self.generate(node, 1)[0]
|
||
|
|
||
|
sent2 = wallet.send_self_transfer(from_node=node, utxo_to_spend=utxo)
|
||
|
blockhash_2 = self.generate(node, 1)[0]
|
||
|
|
||
|
result = node.getdescriptoractivity(
|
||
|
[blockhash_1, blockhash_2], [wallet.get_descriptor()], True)
|
||
|
|
||
|
assert_equal(len(result['activity']), 4)
|
||
|
|
||
|
assert result['activity'][1]['type'] == 'receive'
|
||
|
assert result['activity'][1]['txid'] == sent1['txid']
|
||
|
assert result['activity'][1]['blockhash'] == blockhash_1
|
||
|
|
||
|
assert result['activity'][2]['type'] == 'spend'
|
||
|
assert result['activity'][2]['spend_txid'] == sent2['txid']
|
||
|
assert result['activity'][2]['prevout_txid'] == sent1['txid']
|
||
|
assert result['activity'][2]['blockhash'] == blockhash_2
|
||
|
|
||
|
# Test that reversing the blockorder yields the same result.
|
||
|
assert_equal(result, node.getdescriptoractivity(
|
||
|
[blockhash_1, blockhash_2], [wallet.get_descriptor()], True))
|
||
|
|
||
|
# Test that duplicating a blockhash yields the same result.
|
||
|
assert_equal(result, node.getdescriptoractivity(
|
||
|
[blockhash_1, blockhash_2, blockhash_2], [wallet.get_descriptor()], True))
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
GetBlocksActivityTest(__file__).main()
|