bitcoin/test/functional/interface_zmq.py
Ava Chow 2666d83da5
Some checks are pending
CI / test each commit (push) Waiting to run
CI / macOS 14 native, arm64, no depends, sqlite only, gui (push) Waiting to run
CI / Win64 native, VS 2022 (push) Waiting to run
CI / ASan + LSan + UBSan + integer, no depends, USDT (push) Waiting to run
Merge bitcoin/bitcoin#30893: test: Introduce ensure_for helper
111465d72d test: Remove unused attempts parameter from wait_until (Fabian Jahr)
5468a23eb9 test: Add check_interval parameter to wait_until (Fabian Jahr)
16c87d91fd test: Introduce ensure_for helper (Fabian Jahr)

Pull request description:

  A repeating pattern in the functional tests is that the test sleeps for a while to ensure that a certain condition is still true after some amount of time has elapsed. Most recently a new case of this was added in #30807. This PR here introduces an `ensure` helper to streamline this functionality.

  Some approach considerations:
  - It is possible to construct this by reusing `wait_until` and wrapping it in `try` internally. However, the logger output of the failing wait would still be printed which seems irritating. So I opted for simplified but similar internals to `wait_until`.
  - This implementation starts for a failure in the condition right away which has the nice side-effect that it might give feedback on a failure earlier than is currently the case. However, in some cases, it may be expected that the condition may still be false at the beginning and then turns true until time has run out, something that would work when the test sleeps without checking in a loop. I decided against this design (and even against adding it as an option) because such a test design seems like it would be racy either way.
  - I have also been going back and forth on naming. To me `ensure` works well but I am also not a native speaker, happy consider a different name if others don't think it's clear enough.

ACKs for top commit:
  maflcko:
    re-ACK 111465d72d 🍋
  achow101:
    ACK 111465d72d
  tdb3:
    code review re ACK 111465d72d
  furszy:
    utACK 111465d72d

Tree-SHA512: ce01a4f3531995375a6fbf01b27d51daa9d4c3d7cd10381be6e86ec5925d2965861000f7cb4796b8d40aabe3b64c4c27e2811270e4e3c9916689575b8ba4a2aa
2024-11-19 13:56:20 -05:00

594 lines
26 KiB
Python
Executable file

#!/usr/bin/env python3
# Copyright (c) 2015-2022 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test the ZMQ notification interface."""
import os
import struct
import tempfile
from io import BytesIO
from test_framework.address import (
ADDRESS_BCRT1_P2WSH_OP_TRUE,
ADDRESS_BCRT1_UNSPENDABLE,
)
from test_framework.blocktools import (
add_witness_commitment,
create_block,
create_coinbase,
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.messages import (
CBlock,
hash256,
tx_from_hex,
)
from test_framework.util import (
assert_equal,
assert_raises_rpc_error,
ensure_for,
p2p_port,
)
from test_framework.wallet import (
MiniWallet,
)
from test_framework.netutil import test_ipv6_local, test_unix_socket
# Test may be skipped and not have zmq installed
try:
import zmq
except ImportError:
pass
def hash256_reversed(byte_str):
return hash256(byte_str)[::-1]
class ZMQSubscriber:
def __init__(self, socket, topic):
self.sequence = None # no sequence number received yet
self.socket = socket
self.topic = topic
self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
# Receive message from publisher and verify that topic and sequence match
def _receive_from_publisher_and_check(self):
topic, body, seq = self.socket.recv_multipart()
# Topic should match the subscriber topic.
assert_equal(topic, self.topic)
# Sequence should be incremental.
received_seq = struct.unpack('<I', seq)[-1]
if self.sequence is None:
self.sequence = received_seq
else:
assert_equal(received_seq, self.sequence)
self.sequence += 1
return body
def receive(self):
return self._receive_from_publisher_and_check()
def receive_sequence(self):
body = self._receive_from_publisher_and_check()
hash = body[:32].hex()
label = chr(body[32])
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
if mempool_sequence is not None:
assert label == "A" or label == "R"
else:
assert label == "D" or label == "C"
return (hash, label, mempool_sequence)
class ZMQTestSetupBlock:
"""Helper class for setting up a ZMQ test via the "sync up" procedure.
Generates a block on the specified node on instantiation and provides a
method to check whether a ZMQ notification matches, i.e. the event was
caused by this generated block. Assumes that a notification either contains
the generated block's hash, it's (coinbase) transaction id, the raw block or
raw transaction data.
"""
def __init__(self, test_framework, node):
self.block_hash = test_framework.generate(node, 1, sync_fun=test_framework.no_op)[0]
coinbase = node.getblock(self.block_hash, 2)['tx'][0]
self.tx_hash = coinbase['txid']
self.raw_tx = coinbase['hex']
self.raw_block = node.getblock(self.block_hash, 0)
def caused_notification(self, notification):
return (
self.block_hash in notification
or self.tx_hash in notification
or self.raw_block in notification
or self.raw_tx in notification
)
class ZMQTest (BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
# whitelist peers to speed up tx relay / mempool sync
self.noban_tx_relay = True
self.zmq_port_base = p2p_port(self.num_nodes + 1)
def skip_test_if_missing_module(self):
self.skip_if_no_py3_zmq()
self.skip_if_no_bitcoind_zmq()
def run_test(self):
self.wallet = MiniWallet(self.nodes[0])
self.ctx = zmq.Context()
try:
self.test_basic()
if test_unix_socket():
self.test_basic(unix=True)
else:
self.log.info("Skipping ipc test, because UNIX sockets are not supported.")
self.test_sequence()
self.test_mempool_sync()
self.test_reorg()
self.test_multiple_interfaces()
self.test_ipv6()
finally:
# Destroy the ZMQ context.
self.log.debug("Destroying ZMQ context")
self.ctx.destroy(linger=None)
# Restart node with the specified zmq notifications enabled, subscribe to
# all of them and return the corresponding ZMQSubscriber objects.
def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True, ipv6=False):
subscribers = []
for topic, address in services:
socket = self.ctx.socket(zmq.SUB)
if ipv6:
socket.setsockopt(zmq.IPV6, 1)
subscribers.append(ZMQSubscriber(socket, topic.encode()))
self.restart_node(0, [f"-zmqpub{topic}={address.replace('ipc://', 'unix:')}" for topic, address in services])
for i, sub in enumerate(subscribers):
sub.socket.connect(services[i][1])
# Ensure that all zmq publisher notification interfaces are ready by
# running the following "sync up" procedure:
# 1. Generate a block on the node
# 2. Try to receive the corresponding notification on all subscribers
# 3. If all subscribers get the message within the timeout (1 second),
# we are done, otherwise repeat starting from step 1
for sub in subscribers:
sub.socket.set(zmq.RCVTIMEO, 1000)
while True:
test_block = ZMQTestSetupBlock(self, self.nodes[0])
recv_failed = False
for sub in subscribers:
try:
while not test_block.caused_notification(sub.receive().hex()):
self.log.debug("Ignoring sync-up notification for previously generated block.")
except zmq.error.Again:
self.log.debug("Didn't receive sync-up notification, trying again.")
recv_failed = True
if not recv_failed:
self.log.debug("ZMQ sync-up completed, all subscribers are ready.")
break
# set subscriber's desired timeout for the test
for sub in subscribers:
sub.socket.set(zmq.RCVTIMEO, recv_timeout*1000)
self.connect_nodes(0, 1)
if sync_blocks:
self.sync_blocks()
return subscribers
def test_basic(self, unix = False):
self.log.info(f"Running basic test with {'ipc' if unix else 'tcp'} protocol")
# Invalid zmq arguments don't take down the node, see #17185.
self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
address = f"tcp://127.0.0.1:{self.zmq_port_base}"
if unix:
# Use the shortest temp path possible since paths may have as little as 92-char limit
socket_path = tempfile.NamedTemporaryFile().name
address = f"ipc://{socket_path}"
subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]])
hashblock = subs[0]
hashtx = subs[1]
rawblock = subs[2]
rawtx = subs[3]
num_blocks = 5
self.log.info(f"Generate {num_blocks} blocks (and {num_blocks} coinbase txes)")
genhashes = self.generatetoaddress(self.nodes[0], num_blocks, ADDRESS_BCRT1_UNSPENDABLE)
for x in range(num_blocks):
# Should receive the coinbase txid.
txid = hashtx.receive()
# Should receive the coinbase raw transaction.
tx = tx_from_hex(rawtx.receive().hex())
tx.calc_sha256()
assert_equal(tx.hash, txid.hex())
# Should receive the generated raw block.
hex = rawblock.receive()
block = CBlock()
block.deserialize(BytesIO(hex))
assert block.is_valid()
assert_equal(block.vtx[0].hash, tx.hash)
assert_equal(len(block.vtx), 1)
assert_equal(genhashes[x], hash256_reversed(hex[:80]).hex())
# Should receive the generated block hash.
hash = hashblock.receive().hex()
assert_equal(genhashes[x], hash)
# The block should only have the coinbase txid.
assert_equal([txid.hex()], self.nodes[1].getblock(hash)["tx"])
self.log.info("Wait for tx from second node")
payment_tx = self.wallet.send_self_transfer(from_node=self.nodes[1])
payment_txid = payment_tx['txid']
self.sync_all()
# Should receive the broadcasted txid.
txid = hashtx.receive()
assert_equal(payment_txid, txid.hex())
# Should receive the broadcasted raw transaction.
hex = rawtx.receive()
assert_equal(payment_tx['wtxid'], hash256_reversed(hex).hex())
# Mining the block with this tx should result in second notification
# after coinbase tx notification
self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
hashtx.receive()
txid = hashtx.receive()
assert_equal(payment_txid, txid.hex())
self.log.info("Test the getzmqnotifications RPC")
assert_equal(self.nodes[0].getzmqnotifications(), [
{"type": "pubhashblock", "address": address, "hwm": 1000},
{"type": "pubhashtx", "address": address, "hwm": 1000},
{"type": "pubrawblock", "address": address, "hwm": 1000},
{"type": "pubrawtx", "address": address, "hwm": 1000},
])
assert_equal(self.nodes[1].getzmqnotifications(), [])
if unix:
os.unlink(socket_path)
def test_reorg(self):
address = f"tcp://127.0.0.1:{self.zmq_port_base}"
# Should only notify the tip if a reorg occurs
hashblock, hashtx = self.setup_zmq_test(
[(topic, address) for topic in ["hashblock", "hashtx"]],
recv_timeout=2) # 2 second timeout to check end of notifications
self.disconnect_nodes(0, 1)
# Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
payment_txid = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid']
disconnect_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)[0]
disconnect_cb = self.nodes[0].getblock(disconnect_block)["tx"][0]
assert_equal(self.nodes[0].getbestblockhash(), hashblock.receive().hex())
assert_equal(hashtx.receive().hex(), payment_txid)
assert_equal(hashtx.receive().hex(), disconnect_cb)
# Generate 2 blocks in nodes[1] to a different address to ensure split
connect_blocks = self.generatetoaddress(self.nodes[1], 2, ADDRESS_BCRT1_P2WSH_OP_TRUE, sync_fun=self.no_op)
# nodes[0] will reorg chain after connecting back nodes[1]
self.connect_nodes(0, 1)
self.sync_blocks() # tx in mempool valid but not advertised
# Should receive nodes[1] tip
assert_equal(self.nodes[1].getbestblockhash(), hashblock.receive().hex())
# During reorg:
# Get old payment transaction notification from disconnect and disconnected cb
assert_equal(hashtx.receive().hex(), payment_txid)
assert_equal(hashtx.receive().hex(), disconnect_cb)
# And the payment transaction again due to mempool entry
assert_equal(hashtx.receive().hex(), payment_txid)
assert_equal(hashtx.receive().hex(), payment_txid)
# And the new connected coinbases
for i in [0, 1]:
assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[i])["tx"][0])
# If we do a simple invalidate we announce the disconnected coinbase
self.nodes[0].invalidateblock(connect_blocks[1])
assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[1])["tx"][0])
# And the current tip
assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0])
def test_sequence(self):
"""
Sequence zmq notifications give every blockhash and txhash in order
of processing, regardless of IBD, re-orgs, etc.
Format of messages:
<32-byte hash>C : Blockhash connected
<32-byte hash>D : Blockhash disconnected
<32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
<32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
"""
self.log.info("Testing 'sequence' publisher")
[seq] = self.setup_zmq_test([("sequence", f"tcp://127.0.0.1:{self.zmq_port_base}")])
self.disconnect_nodes(0, 1)
# Mempool sequence number starts at 1
seq_num = 1
# Generate 1 block in nodes[0] and receive all notifications
dc_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)[0]
# Note: We are not notified of any block transactions, coinbase or mined
assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence())
# Generate 2 blocks in nodes[1] to a different address to ensure a chain split
self.generatetoaddress(self.nodes[1], 2, ADDRESS_BCRT1_P2WSH_OP_TRUE, sync_fun=self.no_op)
# nodes[0] will reorg chain after connecting back nodes[1]
self.connect_nodes(0, 1)
# Then we receive all block (dis)connect notifications for the 2 block reorg
assert_equal((dc_block, "D", None), seq.receive_sequence())
block_count = self.nodes[1].getblockcount()
assert_equal((self.nodes[1].getblockhash(block_count-1), "C", None), seq.receive_sequence())
assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence())
self.log.info("Wait for tx from second node")
payment_tx = self.wallet.send_self_transfer(from_node=self.nodes[1])
payment_txid = payment_tx['txid']
self.sync_all()
self.log.info("Testing sequence notifications with mempool sequence values")
# Should receive the broadcasted txid.
assert_equal((payment_txid, "A", seq_num), seq.receive_sequence())
seq_num += 1
self.log.info("Testing RBF notification")
# Replace it to test eviction/addition notification
payment_tx['tx'].vout[0].nValue -= 1000
rbf_txid = self.nodes[1].sendrawtransaction(payment_tx['tx'].serialize().hex())
self.sync_all()
assert_equal((payment_txid, "R", seq_num), seq.receive_sequence())
seq_num += 1
assert_equal((rbf_txid, "A", seq_num), seq.receive_sequence())
seq_num += 1
# Doesn't get published when mined, make a block and tx to "flush" the possibility
# though the mempool sequence number does go up by the number of transactions
# removed from the mempool by the block mining it.
mempool_size = len(self.nodes[0].getrawmempool())
c_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)[0]
# Make sure the number of mined transactions matches the number of txs out of mempool
mempool_size_delta = mempool_size - len(self.nodes[0].getrawmempool())
assert_equal(len(self.nodes[0].getblock(c_block)["tx"])-1, mempool_size_delta)
seq_num += mempool_size_delta
payment_txid_2 = self.wallet.send_self_transfer(from_node=self.nodes[1])['txid']
self.sync_all()
assert_equal((c_block, "C", None), seq.receive_sequence())
assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence())
seq_num += 1
# Spot check getrawmempool results that they only show up when asked for
assert type(self.nodes[0].getrawmempool()) is list
assert type(self.nodes[0].getrawmempool(mempool_sequence=False)) is list
assert "mempool_sequence" not in self.nodes[0].getrawmempool(verbose=True)
assert_raises_rpc_error(-8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True)
assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], seq_num)
self.log.info("Testing reorg notifications")
# Manually invalidate the last block to test mempool re-entry
# N.B. This part could be made more lenient in exact ordering
# since it greatly depends on inner-workings of blocks/mempool
# during "deep" re-orgs. Probably should "re-construct"
# blockchain/mempool state from notifications instead.
block_count = self.nodes[0].getblockcount()
best_hash = self.nodes[0].getbestblockhash()
self.nodes[0].invalidateblock(best_hash)
# Make sure getrawmempool mempool_sequence results aren't "queued" but immediately reflective
# of the time they were gathered.
ensure_for(duration=2, f=lambda: self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] > seq_num)
assert_equal((best_hash, "D", None), seq.receive_sequence())
assert_equal((rbf_txid, "A", seq_num), seq.receive_sequence())
seq_num += 1
# Other things may happen but aren't wallet-deterministic so we don't test for them currently
self.nodes[0].reconsiderblock(best_hash)
self.generatetoaddress(self.nodes[1], 1, ADDRESS_BCRT1_UNSPENDABLE)
self.log.info("Evict mempool transaction by block conflict")
orig_tx = self.wallet.send_self_transfer(from_node=self.nodes[0])
orig_txid = orig_tx['txid']
# More to be simply mined
more_tx = []
for _ in range(5):
more_tx.append(self.wallet.send_self_transfer(from_node=self.nodes[0]))
orig_tx['tx'].vout[0].nValue -= 1000
bump_txid = self.nodes[0].sendrawtransaction(orig_tx['tx'].serialize().hex())
# Mine the pre-bump tx
txs_to_add = [orig_tx['hex']] + [tx['hex'] for tx in more_tx]
block = create_block(int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount()+1), txlist=txs_to_add)
add_witness_commitment(block)
block.solve()
assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None)
tip = self.nodes[0].getbestblockhash()
assert_equal(int(tip, 16), block.sha256)
orig_txid_2 = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid']
# Flush old notifications until evicted tx original entry
(hash_str, label, mempool_seq) = seq.receive_sequence()
while hash_str != orig_txid:
(hash_str, label, mempool_seq) = seq.receive_sequence()
mempool_seq += 1
# Added original tx
assert_equal(label, "A")
# More transactions to be simply mined
for i in range(len(more_tx)):
assert_equal((more_tx[i]['txid'], "A", mempool_seq), seq.receive_sequence())
mempool_seq += 1
# Bumped by rbf
assert_equal((orig_txid, "R", mempool_seq), seq.receive_sequence())
mempool_seq += 1
assert_equal((bump_txid, "A", mempool_seq), seq.receive_sequence())
mempool_seq += 1
# Conflict announced first, then block
assert_equal((bump_txid, "R", mempool_seq), seq.receive_sequence())
mempool_seq += 1
assert_equal((tip, "C", None), seq.receive_sequence())
mempool_seq += len(more_tx)
# Last tx
assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence())
mempool_seq += 1
self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
self.sync_all() # want to make sure we didn't break "consensus" for other tests
def test_mempool_sync(self):
"""
Use sequence notification plus getrawmempool sequence results to "sync mempool"
"""
self.log.info("Testing 'mempool sync' usage of sequence notifier")
[seq] = self.setup_zmq_test([("sequence", f"tcp://127.0.0.1:{self.zmq_port_base}")])
# In-memory counter, should always start at 1
next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
assert_equal(next_mempool_seq, 1)
# Some transactions have been happening but we aren't consuming zmq notifications yet
# or we lost a ZMQ message somehow and want to start over
txs = []
num_txs = 5
for _ in range(num_txs):
txs.append(self.wallet.send_self_transfer(from_node=self.nodes[1]))
self.sync_all()
# 1) Consume backlog until we get a mempool sequence number
(hash_str, label, zmq_mem_seq) = seq.receive_sequence()
while zmq_mem_seq is None:
(hash_str, label, zmq_mem_seq) = seq.receive_sequence()
assert label == "A" or label == "R"
assert hash_str is not None
# 2) We need to "seed" our view of the mempool
mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
mempool_view = set(mempool_snapshot["txids"])
get_raw_seq = mempool_snapshot["mempool_sequence"]
assert_equal(get_raw_seq, num_txs + 1)
assert zmq_mem_seq < get_raw_seq
# Things continue to happen in the "interim" while waiting for snapshot results
# We have node 0 do all these to avoid p2p races with RBF announcements
for _ in range(num_txs):
txs.append(self.wallet.send_self_transfer(from_node=self.nodes[0]))
txs[-1]['tx'].vout[0].nValue -= 1000
self.nodes[0].sendrawtransaction(txs[-1]['tx'].serialize().hex())
self.sync_all()
self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
final_txid = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid']
# 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot
while True:
if zmq_mem_seq == get_raw_seq - 1:
break
(hash_str, label, mempool_sequence) = seq.receive_sequence()
if mempool_sequence is not None:
zmq_mem_seq = mempool_sequence
if zmq_mem_seq > get_raw_seq:
raise Exception(f"We somehow jumped mempool sequence numbers! zmq_mem_seq: {zmq_mem_seq} > get_raw_seq: {get_raw_seq}")
# 4) Moving forward, we apply the delta to our local view
# remaining txs(5) + 1 rbf(A+R) + 1 block connect + 1 final tx
expected_sequence = get_raw_seq
r_gap = 0
for _ in range(num_txs + 2 + 1 + 1):
(hash_str, label, mempool_sequence) = seq.receive_sequence()
if mempool_sequence is not None:
if mempool_sequence != expected_sequence:
# Detected "R" gap, means this a conflict eviction, and mempool tx are being evicted before its
# position in the incoming block message "C"
if label == "R":
assert mempool_sequence > expected_sequence
r_gap += mempool_sequence - expected_sequence
else:
raise Exception(f"WARNING: txhash has unexpected mempool sequence value: {mempool_sequence} vs expected {expected_sequence}")
if label == "A":
assert hash_str not in mempool_view
mempool_view.add(hash_str)
expected_sequence = mempool_sequence + 1
elif label == "R":
assert hash_str in mempool_view
mempool_view.remove(hash_str)
expected_sequence = mempool_sequence + 1
elif label == "C":
# (Attempt to) remove all txids from known block connects
block_txids = self.nodes[0].getblock(hash_str)["tx"][1:]
for txid in block_txids:
if txid in mempool_view:
expected_sequence += 1
mempool_view.remove(txid)
expected_sequence -= r_gap
r_gap = 0
elif label == "D":
# Not useful for mempool tracking per se
continue
else:
raise Exception("Unexpected ZMQ sequence label!")
assert_equal(self.nodes[0].getrawmempool(), [final_txid])
assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], expected_sequence)
# 5) If you miss a zmq/mempool sequence number, go back to step (2)
self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
def test_multiple_interfaces(self):
# Set up two subscribers with different addresses
# (note that after the reorg test, syncing would fail due to different
# chain lengths on node0 and node1; for this test we only need node0, so
# we can disable syncing blocks on the setup)
subscribers = self.setup_zmq_test([
("hashblock", f"tcp://127.0.0.1:{self.zmq_port_base + 1}"),
("hashblock", f"tcp://127.0.0.1:{self.zmq_port_base + 2}"),
], sync_blocks=False)
# Generate 1 block in nodes[0] and receive all notifications
self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)
# Should receive the same block hash on both subscribers
assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex())
assert_equal(self.nodes[0].getbestblockhash(), subscribers[1].receive().hex())
def test_ipv6(self):
if not test_ipv6_local():
self.log.info("Skipping IPv6 test, because IPv6 is not supported.")
return
self.log.info("Testing IPv6")
# Set up subscriber using IPv6 loopback address
subscribers = self.setup_zmq_test([
("hashblock", f"tcp://[::1]:{self.zmq_port_base}")
], ipv6=True)
# Generate 1 block in nodes[0]
self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
# Should receive the same block hash
assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex())
if __name__ == '__main__':
ZMQTest(__file__).main()