2016-03-19 20:58:06 +01:00
#!/usr/bin/env python3
# Copyright (c) 2010 ArtForz -- public domain half-a-node
# Copyright (c) 2012 Jeff Garzik
2022-12-24 23:49:50 +00:00
# Copyright (c) 2010-2022 The Bitcoin Core developers
2016-03-19 20:58:06 +01:00
# Distributed under the MIT software license, see the accompanying
2015-04-28 12:36:15 -04:00
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
2020-08-17 10:10:44 +01:00
"""Test objects for interacting with a bitcoind node over the p2p protocol.
2017-01-17 18:34:40 -05:00
2020-08-17 10:10:44 +01:00
The P2PInterface objects interact with the bitcoind nodes under test using the
node's p2p interface. They can be used to send messages to the node, and
callbacks can be registered that execute when messages are received from the
node. Messages are sent to/received from the node on an asyncio event loop.
State held inside the objects must be guarded by the p2p_lock to avoid data
races between the main testing thread and the event loop.
2017-01-17 18:34:40 -05:00
2017-10-17 16:16:39 -04:00
P2PConnection: A low-level connection object to a node's P2P interface
2017-11-22 11:45:14 -05:00
P2PInterface: A high-level interface object for communicating to a node over P2P
P2PDataStore: A p2p interface class that keeps a store of transactions and blocks
2020-01-30 18:52:25 -08:00
    and can respond correctly to getdata and getheaders messages
P2PTxInvStore: A p2p interface class that inherits from P2PDataStore, and keeps
    a count of how many times each txid has been announced."""
2018-06-18 17:28:37 -04:00
import asyncio
2017-03-29 14:07:39 -04:00
from collections import defaultdict
from io import BytesIO
2015-04-28 12:36:15 -04:00
import logging
2023-12-08 17:30:19 +01:00
import platform
2017-03-29 14:07:39 -04:00
import struct
import sys
2017-12-08 10:50:24 -05:00
import threading
2017-03-29 14:07:39 -04:00
2018-09-15 20:01:20 -04:00
from test_framework . messages import (
CBlockHeader ,
2020-06-14 12:54:37 +02:00
MAX_HEADERS_RESULTS ,
2018-09-15 20:01:20 -04:00
msg_addr ,
2020-05-20 12:05:18 +02:00
msg_addrv2 ,
2018-09-15 20:01:20 -04:00
msg_block ,
MSG_BLOCK ,
msg_blocktxn ,
2020-05-04 14:10:18 -04:00
msg_cfcheckpt ,
2020-05-04 14:27:29 -04:00
msg_cfheaders ,
msg_cfilter ,
2018-09-15 20:01:20 -04:00
msg_cmpctblock ,
msg_feefilter ,
2020-04-03 15:54:14 +02:00
msg_filteradd ,
2020-03-31 00:10:32 +02:00
msg_filterclear ,
2020-03-12 13:36:46 -04:00
msg_filterload ,
2018-09-15 20:01:20 -04:00
msg_getaddr ,
msg_getblocks ,
msg_getblocktxn ,
2022-05-13 13:35:46 +02:00
msg_getcfcheckpt ,
msg_getcfheaders ,
msg_getcfilters ,
2018-09-15 20:01:20 -04:00
msg_getdata ,
msg_getheaders ,
msg_headers ,
msg_inv ,
msg_mempool ,
2020-03-12 13:36:46 -04:00
msg_merkleblock ,
2018-09-15 20:01:20 -04:00
msg_notfound ,
msg_ping ,
msg_pong ,
2020-05-20 12:05:18 +02:00
msg_sendaddrv2 ,
2018-09-15 20:01:20 -04:00
msg_sendcmpct ,
msg_sendheaders ,
2022-03-30 17:21:56 +03:00
msg_sendtxrcncl ,
2018-09-15 20:01:20 -04:00
msg_tx ,
MSG_TX ,
MSG_TYPE_MASK ,
msg_verack ,
msg_version ,
2020-03-27 02:12:47 +01:00
MSG_WTX ,
msg_wtxidrelay ,
2018-09-15 20:01:20 -04:00
NODE_NETWORK ,
NODE_WITNESS ,
2023-11-15 10:38:40 +05:30
MAGIC_BYTES ,
2018-09-15 20:01:20 -04:00
sha256 ,
)
2020-06-10 13:29:07 -07:00
from test_framework . util import (
MAX_NODES ,
p2p_port ,
2023-10-03 18:34:20 +02:00
wait_until_helper_internal ,
2020-06-10 13:29:07 -07:00
)
2022-02-05 22:11:02 +05:30
from test_framework . v2_p2p import (
EncryptedP2PState ,
2022-02-05 23:28:02 +05:30
MSGTYPE_TO_SHORTID ,
2022-12-11 00:22:20 +05:30
SHORTID ,
2022-02-05 22:11:02 +05:30
)
2016-04-08 21:02:24 -04:00
2020-07-19 14:47:05 +07:00
# Module-level logger shared by every class in this file.
logger = logging.getLogger("TestFramework.p2p")

# The minimum P2P version that this test framework supports
MIN_P2P_VERSION_SUPPORTED = 60001
# The P2P version that this test framework implements and sends in its `version` message
# Version 70016 supports wtxid relay
P2P_VERSION = 70016
# The services that this test framework offers in its `version` message
P2P_SERVICES = NODE_NETWORK | NODE_WITNESS
# The P2P user agent string that this test framework sends in its `version` message
P2P_SUBVERSION = "/python-p2p-tester:0.0.3/"
# Value for relay that this test framework sends in its `version` message
P2P_VERSION_RELAY = 1
# Delay after receiving a tx inv before requesting transactions from non-preferred peers, in seconds
NONPREF_PEER_TX_DELAY = 2
# Delay for requesting transactions via txids if we have wtxid-relaying peers, in seconds
TXID_RELAY_DELAY = 2
# Delay for requesting transactions if the peer has MAX_PEER_TX_REQUEST_IN_FLIGHT or more requests
OVERLOADED_PEER_TX_DELAY = 2
# How long to wait before downloading a transaction from an additional peer
# NOTE(review): presumably in seconds like the delays above -- confirm.
GETDATA_TX_INTERVAL = 60
2020-11-28 11:41:25 +00:00
2017-10-17 07:51:50 -04:00
# Maps the message type found in a received P2P header (bytes, NUL padding
# already stripped) to the class that is instantiated to deserialize its payload.
MESSAGEMAP = {
    b"addr": msg_addr,
    b"addrv2": msg_addrv2,
    b"block": msg_block,
    b"blocktxn": msg_blocktxn,
    b"cfcheckpt": msg_cfcheckpt,
    b"cfheaders": msg_cfheaders,
    b"cfilter": msg_cfilter,
    b"cmpctblock": msg_cmpctblock,
    b"feefilter": msg_feefilter,
    b"filteradd": msg_filteradd,
    b"filterclear": msg_filterclear,
    b"filterload": msg_filterload,
    b"getaddr": msg_getaddr,
    b"getblocks": msg_getblocks,
    b"getblocktxn": msg_getblocktxn,
    b"getcfcheckpt": msg_getcfcheckpt,
    b"getcfheaders": msg_getcfheaders,
    b"getcfilters": msg_getcfilters,
    b"getdata": msg_getdata,
    b"getheaders": msg_getheaders,
    b"headers": msg_headers,
    b"inv": msg_inv,
    b"mempool": msg_mempool,
    b"merkleblock": msg_merkleblock,
    b"notfound": msg_notfound,
    b"ping": msg_ping,
    b"pong": msg_pong,
    b"sendaddrv2": msg_sendaddrv2,
    b"sendcmpct": msg_sendcmpct,
    b"sendheaders": msg_sendheaders,
    b"sendtxrcncl": msg_sendtxrcncl,
    b"tx": msg_tx,
    b"verack": msg_verack,
    b"version": msg_version,
    b"wtxidrelay": msg_wtxidrelay,
}
2018-06-18 17:28:37 -04:00
class P2PConnection(asyncio.Protocol):
    """A low-level connection object to a node's P2P interface.

    This class is responsible for:

    - opening and closing the TCP connection to the node
    - reading bytes from and writing bytes to the socket
    - deserializing and serializing the P2P message header
    - logging messages as they are sent and received

    This class contains no logic for handling the P2P message payloads. It must be
    sub-classed and the on_message() callback overridden.

    Besides on_message(), the methods below call on_open(), on_close() and
    send_version() and read the attribute p2p_connected_to_node. None of these
    is defined in this class, so they must be supplied elsewhere (for example
    by a subclass) before a connection is used."""

    def __init__(self):
        # The underlying transport of the connection.
        # Should only call methods on this from the NetworkThread, c.f. call_soon_threadsafe
        self._transport = None
        # This lock is acquired before sending messages over the socket. There's an implied lock order and
        # p2p_lock must not be acquired after _send_lock as it could result in deadlocks.
        self._send_lock = threading.Lock()
        self.v2_state = None  # EncryptedP2PState object needed for v2 p2p connections
        self.reconnect = False  # set if reconnection needs to happen

    @property
    def is_connected(self):
        """True while a transport is attached (between connection_made and connection_lost)."""
        return self._transport is not None

    @property
    def supports_v2_p2p(self):
        """True when a v2 (encrypted) transport state object is present."""
        return self.v2_state is not None

    def peer_connect_helper(self, dstaddr, dstport, net, timeout_factor):
        """Record the connection parameters and reset the receive state.

        Must only be called while disconnected. `net` selects the entry of
        MAGIC_BYTES used to frame v1 messages."""
        assert not self.is_connected
        self.timeout_factor = timeout_factor
        self.dstaddr = dstaddr
        self.dstport = dstport
        # The initial message to send after the connection was made:
        self.on_connection_send_msg = None
        self.recvbuf = b""
        self.magic_bytes = MAGIC_BYTES[net]

    def peer_connect(self, dstaddr, dstport, *, net, timeout_factor, supports_v2_p2p):
        """Prepare a connection to dstaddr:dstport.

        Returns a zero-argument callable; calling it schedules the connection
        attempt on the network event loop."""
        self.peer_connect_helper(dstaddr, dstport, net, timeout_factor)
        if supports_v2_p2p:
            self.v2_state = EncryptedP2PState(initiating=True, net=net)

        loop = NetworkThread.network_event_loop
        logger.debug('Connecting to Bitcoin Node: %s:%d' % (self.dstaddr, self.dstport))
        coroutine = loop.create_connection(lambda: self, host=self.dstaddr, port=self.dstport)
        return lambda: loop.call_soon_threadsafe(loop.create_task, coroutine)

    def peer_accept_connection(self, connect_id, connect_cb=lambda: None, *, net, timeout_factor, supports_v2_p2p, reconnect):
        """Prepare to accept a connection identified by connect_id.

        Returns a zero-argument callable; calling it invokes
        NetworkThread.listen() for this object."""
        self.peer_connect_helper('0', 0, net, timeout_factor)
        self.reconnect = reconnect
        if supports_v2_p2p:
            self.v2_state = EncryptedP2PState(initiating=False, net=net)

        logger.debug('Listening for Bitcoin Node with id: {}'.format(connect_id))
        return lambda: NetworkThread.listen(self, connect_cb, idx=connect_id)

    def peer_disconnect(self):
        """Abort the transport from the network event loop, if one is still attached."""
        # Connection could have already been closed by other end.
        NetworkThread.network_event_loop.call_soon_threadsafe(lambda: self._transport and self._transport.abort())

    # Connection and disconnection methods

    def connection_made(self, transport):
        """asyncio callback when a connection is opened."""
        assert not self._transport
        logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport))
        self._transport = transport
        # in an inbound connection to the TestNode with P2PConnection as the initiator, [TestNode <---- P2PConnection]
        # send the initial handshake immediately
        if self.supports_v2_p2p and self.v2_state.initiating and not self.v2_state.tried_v2_handshake:
            send_handshake_bytes = self.v2_state.initiate_v2_handshake()
            logger.debug(f"sending {len(self.v2_state.sent_garbage)} bytes of garbage data")
            self.send_raw_message(send_handshake_bytes)
        # for v1 outbound connections, send version message immediately after opening
        # (for v2 outbound connections, send it after the initial v2 handshake)
        if self.p2p_connected_to_node and not self.supports_v2_p2p:
            self.send_version()
        self.on_open()

    def connection_lost(self, exc):
        """asyncio callback when a connection is closed."""
        # don't display warning if reconnection needs to be attempted using v1 P2P
        if exc and not self.reconnect:
            logger.warning("Connection lost to {}:{} due to {}".format(self.dstaddr, self.dstport, exc))
        else:
            logger.debug("Closed connection to: %s:%d" % (self.dstaddr, self.dstport))
        self._transport = None
        self.recvbuf = b""
        self.on_close()

    # v2 handshake method
    def _on_data_v2_handshake(self):
        """v2 handshake performed before P2P messages are exchanged (see BIP324). P2PConnection is the initiator
        (in inbound connections to TestNode) and the responder (in outbound connections from TestNode).
        Performed by:
            * initiator using `initiate_v2_handshake()`, `complete_handshake()` and `authenticate_handshake()`
            * responder using `respond_v2_handshake()`, `complete_handshake()` and `authenticate_handshake()`

        `initiate_v2_handshake()` is immediately done by the initiator when the connection is established in
        `connection_made()`. The rest of the initial v2 handshake functions are handled here.
        """
        if not self.v2_state.peer:
            if not self.v2_state.initiating and not self.v2_state.sent_garbage:
                # if the responder hasn't sent garbage yet, the responder is still reading ellswift bytes
                # reads ellswift bytes till the first mismatch from 12 bytes V1_PREFIX
                length, send_handshake_bytes = self.v2_state.respond_v2_handshake(BytesIO(self.recvbuf))
                self.recvbuf = self.recvbuf[length:]
                if send_handshake_bytes == -1:
                    # Dropping v2_state makes supports_v2_p2p False from here on.
                    self.v2_state = None
                    return
                elif send_handshake_bytes:
                    logger.debug(f"sending {len(self.v2_state.sent_garbage)} bytes of garbage data")
                    self.send_raw_message(send_handshake_bytes)
                elif send_handshake_bytes == b"":
                    return  # only after send_handshake_bytes are sent can `complete_handshake()` be done

            # `complete_handshake()` reads the remaining ellswift bytes from recvbuf
            # and sends response after deriving shared ECDH secret using received ellswift bytes
            length, response = self.v2_state.complete_handshake(BytesIO(self.recvbuf))
            self.recvbuf = self.recvbuf[length:]
            if response:
                self.send_raw_message(response)
            else:
                return  # only after response is sent can `authenticate_handshake()` be done

        # `self.v2_state.peer` is instantiated only after shared ECDH secret/BIP324 derived keys and ciphers
        # is derived in `complete_handshake()`.
        # so `authenticate_handshake()` which uses the BIP324 derived ciphers gets called after `complete_handshake()`.
        assert self.v2_state.peer
        length, is_mac_auth = self.v2_state.authenticate_handshake(self.recvbuf)
        if not is_mac_auth:
            raise ValueError("invalid v2 mac tag in handshake authentication")
        self.recvbuf = self.recvbuf[length:]
        if self.v2_state.tried_v2_handshake:
            # for v2 outbound connections, send version message immediately after v2 handshake
            if self.p2p_connected_to_node:
                self.send_version()
            # process post-v2-handshake data immediately, if available
            if len(self.recvbuf) > 0:
                self._on_data()

    # Socket read methods

    def data_received(self, t):
        """asyncio callback when data is read from the socket."""
        if len(t) > 0:
            self.recvbuf += t
            if self.supports_v2_p2p and not self.v2_state.tried_v2_handshake:
                self._on_data_v2_handshake()
            else:
                self._on_data()

    def _on_data(self):
        """Try to read P2P messages from the recv buffer.

        This method reads data from the buffer in a loop. It deserializes,
        parses and verifies the P2P header, then passes the P2P payload to
        the on_message callback for processing.

        Any exception is logged (unless a reconnect is pending) and re-raised."""
        # v1 header layout: 4 magic bytes, 12-byte message type, 4-byte length, 4-byte checksum
        header_len = 4 + 12 + 4 + 4
        try:
            while True:
                if self.supports_v2_p2p:
                    # v2 P2P messages are read
                    msglen, msg = self.v2_state.v2_receive_packet(self.recvbuf)
                    if msglen == -1:
                        raise ValueError("invalid v2 mac tag " + repr(self.recvbuf))
                    elif msglen == 0:  # need to receive more bytes in recvbuf
                        return
                    self.recvbuf = self.recvbuf[msglen:]

                    if msg is None:  # ignore decoy messages
                        # NOTE(review): returning here leaves any bytes that follow the decoy
                        # unprocessed until the next data_received() call -- confirm intended.
                        return
                    assert msg  # application layer messages (which aren't decoy messages) are non-empty
                    shortid = msg[0]  # 1-byte short message type ID
                    if shortid == 0:
                        # next 12 bytes are interpreted as ASCII message type if shortid is b'\x00'
                        if len(msg) < 13:
                            raise IndexError("msg needs minimum required length of 13 bytes")
                        msgtype = msg[1:13].rstrip(b'\x00')
                        msg = msg[13:]  # msg is set to be payload
                    else:
                        # a 1-byte short message type ID
                        msgtype = SHORTID.get(shortid, f"unknown-{shortid}")
                        msg = msg[1:]
                else:
                    # v1 P2P messages are read
                    if len(self.recvbuf) < 4:
                        return
                    if self.recvbuf[:4] != self.magic_bytes:
                        raise ValueError("magic bytes mismatch: {} != {}".format(repr(self.magic_bytes), repr(self.recvbuf)))
                    if len(self.recvbuf) < header_len:
                        return
                    msgtype = self.recvbuf[4:4+12].split(b"\x00", 1)[0]
                    # The length is unsigned, matching the "<I" used by build_message().
                    msglen = struct.unpack("<I", self.recvbuf[4+12:4+12+4])[0]
                    checksum = self.recvbuf[4+12+4:header_len]
                    if len(self.recvbuf) < header_len + msglen:
                        return
                    msg = self.recvbuf[header_len:header_len+msglen]
                    # checksum is the first four bytes of sha256 applied twice to the payload
                    th = sha256(msg)
                    h = sha256(th)
                    if checksum != h[:4]:
                        raise ValueError("got bad checksum " + repr(self.recvbuf))
                    self.recvbuf = self.recvbuf[header_len+msglen:]
                if msgtype not in MESSAGEMAP:
                    raise ValueError("Received unknown msgtype from %s:%d: '%s' %s" % (self.dstaddr, self.dstport, msgtype, repr(msg)))
                f = BytesIO(msg)
                t = MESSAGEMAP[msgtype]()
                t.deserialize(f)
                self._log_message("receive", t)
                self.on_message(t)
        except Exception as e:
            if not self.reconnect:
                # The placeholder is required: extra args without one make logging
                # emit a formatting error instead of the intended message.
                logger.exception('Error reading message: %s', repr(e))
            raise

    def on_message(self, message):
        """Callback for processing a P2P payload. Must be overridden by derived class."""
        raise NotImplementedError

    # Socket write methods

    def send_message(self, message, is_decoy=False):
        """Send a P2P message over the socket.

        This method takes a P2P payload, builds the P2P header and adds
        the message to the send buffer to be sent over the socket."""
        with self._send_lock:
            tmsg = self.build_message(message, is_decoy)
            self._log_message("send", message)
            return self.send_raw_message(tmsg)

    def send_raw_message(self, raw_message_bytes):
        """Schedule raw bytes to be written to the transport on the network event loop.

        Raises IOError if there is no connection at call time. The write is
        skipped if the transport is gone or closing by the time it runs."""
        if not self.is_connected:
            raise IOError('Not connected')

        def maybe_write():
            if not self._transport:
                return
            if self._transport.is_closing():
                return
            self._transport.write(raw_message_bytes)
        NetworkThread.network_event_loop.call_soon_threadsafe(maybe_write)

    # Class utility methods

    def build_message(self, message, is_decoy=False):
        """Build a serialized P2P message"""
        msgtype = message.msgtype
        data = message.serialize()
        if self.supports_v2_p2p:
            if msgtype in SHORTID.values():
                tmsg = MSGTYPE_TO_SHORTID.get(msgtype).to_bytes(1, 'big')
            else:
                # no short ID: a zero byte followed by the NUL-padded 12-byte message type
                tmsg = b"\x00"
                tmsg += msgtype
                tmsg += b"\x00" * (12 - len(msgtype))
            tmsg += data
            return self.v2_state.v2_enc_packet(tmsg, ignore=is_decoy)
        else:
            tmsg = self.magic_bytes
            tmsg += msgtype
            tmsg += b"\x00" * (12 - len(msgtype))
            tmsg += struct.pack("<I", len(data))
            th = sha256(data)
            h = sha256(th)
            tmsg += h[:4]
            tmsg += data
            return tmsg

    def _log_message(self, direction, msg):
        """Logs a message being sent or received over the connection.

        direction must be "send" or "receive"; anything else raises ValueError.
        At most the first 500 characters of repr(msg) are logged."""
        if direction == "send":
            log_message = "Send message to "
        elif direction == "receive":
            log_message = "Received message from "
        else:
            raise ValueError("unknown message direction: %r" % (direction,))
        msg_repr = repr(msg)
        log_message += "%s:%d: %s" % (self.dstaddr, self.dstport, msg_repr[:500])
        # Only flag truncation when the repr itself was actually cut.
        if len(msg_repr) > 500:
            log_message += "... (msg truncated)"
        logger.debug(log_message)
2015-04-28 12:36:15 -04:00
2017-10-17 16:16:39 -04:00
class P2PInterface ( P2PConnection ) :
2017-11-17 15:01:24 -05:00
""" A high-level P2P interface class for communicating with a Bitcoin node.
This class provides high - level callbacks for processing P2P message
payloads , as well as convenience methods for interacting with the
node over P2P .
2017-11-23 10:17:50 -05:00
Individual testcases should subclass this and override the on_ * methods
2017-10-17 16:16:39 -04:00
if they want to alter message handling behaviour . """
2020-10-17 11:20:43 -04:00
def __init__(self, support_addrv2=False, wtxidrelay=True):
    """Set up per-connection bookkeeping on top of the base connection state.

    support_addrv2 and wtxidrelay are stored as-is; on_version() consults them
    to decide whether to send `sendaddrv2` and `wtxidrelay`."""
    super().__init__()

    # Per-message-type receive counter. Tests should treat it as read-only.
    self.message_count = defaultdict(int)

    # Latest message received for each type. To wait for a message, pop its
    # entry from this dict and then use self.wait_until.
    self.last_message = {}

    # Counts the ping messages we have sent to the node.
    self.ping_counter = 1

    # Network services advertised by the peer (filled in by on_version).
    self.nServices = 0

    self.support_addrv2 = support_addrv2

    # Whether the peer supports wtxid-relay.
    self.wtxidrelay = wtxidrelay
2020-06-10 13:29:07 -07:00
def peer_connect_send_version(self, services):
    """Build this peer's `version` message and store it in on_connection_send_msg.

    services is copied into the message's nServices field. The destination
    address and port come from the values recorded for this connection; the
    source address is reported as 0.0.0.0:0."""
    # Send a version msg
    vt = msg_version()
    vt.nVersion = P2P_VERSION
    vt.strSubVer = P2P_SUBVERSION
    vt.relay = P2P_VERSION_RELAY
    vt.nServices = services
    vt.addrTo.ip = self.dstaddr
    vt.addrTo.port = self.dstport
    vt.addrFrom.ip = "0.0.0.0"
    vt.addrFrom.port = 0
    # NOTE(review): nothing visible here sends this attribute directly; presumably
    # send_version() (defined outside this view) consumes it -- confirm.
    self.on_connection_send_msg = vt  # Will be sent in connection_made callback
2023-11-03 13:30:54 +01:00
def peer_connect(self, *, services=P2P_SERVICES, send_version, **kwargs):
    """Prepare an outgoing connection, optionally queuing a `version` message.

    All remaining keyword arguments are forwarded to the base class. The
    callable produced by the base class is returned unchanged."""
    start_connection = super().peer_connect(**kwargs)

    if not send_version:
        return start_connection

    self.peer_connect_send_version(services)
    return start_connection
2021-09-19 14:11:09 +02:00
def peer_accept_connection(self, *args, services=P2P_SERVICES, **kwargs):
    """Prepare to accept a connection and always queue a `version` message.

    Positional and remaining keyword arguments go to the base class; its
    returned callable is handed back to the caller."""
    start_listening = super().peer_accept_connection(*args, **kwargs)
    self.peer_connect_send_version(services)
    return start_listening
2017-10-17 15:56:12 -04:00
2017-11-23 10:17:50 -05:00
# Message receiving methods
2017-11-17 15:01:24 -05:00
def on_message(self, message):
    """Receive message and dispatch message to appropriate callback.

    We keep a count of how many of each message type has been received
    and the most recent message of each type.

    The callback is looked up by name as `on_<msgtype>`. Any exception raised
    while recording or dispatching is reported on stdout and re-raised."""
    with p2p_lock:
        try:
            msgtype = message.msgtype.decode('ascii')
            self.message_count[msgtype] += 1
            self.last_message[msgtype] = message
            getattr(self, 'on_' + msgtype)(message)
        except Exception:
            print("ERROR delivering %s (%s)" % (repr(message), sys.exc_info()[0]))
            raise
# Callback methods. Can be overridden by subclasses in individual test
# cases to provide custom message handling behaviour.
2017-11-17 15:01:24 -05:00
def on_open(self):
    """Hook run when the connection opens; does nothing by default."""

def on_close(self):
    """Hook run when the connection closes; does nothing by default."""

def on_addr(self, message):
    """Default `addr` handler: ignore the message."""

def on_addrv2(self, message):
    """Default `addrv2` handler: ignore the message."""

def on_block(self, message):
    """Default `block` handler: ignore the message."""

def on_blocktxn(self, message):
    """Default `blocktxn` handler: ignore the message."""

def on_cfcheckpt(self, message):
    """Default `cfcheckpt` handler: ignore the message."""

def on_cfheaders(self, message):
    """Default `cfheaders` handler: ignore the message."""

def on_cfilter(self, message):
    """Default `cfilter` handler: ignore the message."""

def on_cmpctblock(self, message):
    """Default `cmpctblock` handler: ignore the message."""

def on_feefilter(self, message):
    """Default `feefilter` handler: ignore the message."""

def on_filteradd(self, message):
    """Default `filteradd` handler: ignore the message."""

def on_filterclear(self, message):
    """Default `filterclear` handler: ignore the message."""

def on_filterload(self, message):
    """Default `filterload` handler: ignore the message."""

def on_getaddr(self, message):
    """Default `getaddr` handler: ignore the message."""

def on_getblocks(self, message):
    """Default `getblocks` handler: ignore the message."""

def on_getblocktxn(self, message):
    """Default `getblocktxn` handler: ignore the message."""

def on_getdata(self, message):
    """Default `getdata` handler: ignore the message."""

def on_getheaders(self, message):
    """Default `getheaders` handler: ignore the message."""

def on_headers(self, message):
    """Default `headers` handler: ignore the message."""

def on_mempool(self, message):
    """Default `mempool` handler: ignore the message."""

def on_merkleblock(self, message):
    """Default `merkleblock` handler: ignore the message."""

def on_notfound(self, message):
    """Default `notfound` handler: ignore the message."""

def on_pong(self, message):
    """Default `pong` handler: ignore the message."""

def on_sendaddrv2(self, message):
    """Default `sendaddrv2` handler: ignore the message."""

def on_sendcmpct(self, message):
    """Default `sendcmpct` handler: ignore the message."""

def on_sendheaders(self, message):
    """Default `sendheaders` handler: ignore the message."""

def on_sendtxrcncl(self, message):
    """Default `sendtxrcncl` handler: ignore the message."""

def on_tx(self, message):
    """Default `tx` handler: ignore the message."""

def on_wtxidrelay(self, message):
    """Default `wtxidrelay` handler: ignore the message."""
2017-11-17 15:01:24 -05:00
def on_inv(self, message):
    """Answer an `inv` with a `getdata` for every entry whose type is non-zero.

    Nothing is sent when no entry qualifies."""
    request = msg_getdata()
    request.inv.extend(entry for entry in message.inv if entry.type != 0)
    if len(request.inv) > 0:
        self.send_message(request)
2017-11-23 10:17:50 -05:00
2017-11-17 15:01:24 -05:00
def on_ping(self, message):
    """Reply to a `ping` with a `pong` carrying the same nonce."""
    reply = msg_pong(message.nonce)
    self.send_message(reply)
2017-11-23 10:17:50 -05:00
2017-11-17 15:01:24 -05:00
def on_verack(self, message):
    """Default `verack` handler: ignore the message."""
2017-11-23 10:17:50 -05:00
2017-11-17 15:01:24 -05:00
def on_version(self, message):
    """Handle the peer's `version` message and complete our side of the handshake.

    Asserts that the peer's version is supported, then sends (in order) our own
    version when the connection is inbound, the optional `wtxidrelay` and
    `sendaddrv2` announcements, and `verack`. The peer's nServices and relay
    values are recorded. On outbound connections a `getaddr` is sent last."""
    assert message.nVersion >= MIN_P2P_VERSION_SUPPORTED, "Version {} received. Test framework only supports versions greater than or equal to {}".format(message.nVersion, MIN_P2P_VERSION_SUPPORTED)
    # for inbound connections, reply to version with own version message
    # (could be due to v1 reconnect after a failed v2 handshake)
    if not self.p2p_connected_to_node:
        self.send_version()
        self.reconnect = False
    # 70016 is the first version supporting wtxid relay
    if message.nVersion >= 70016 and self.wtxidrelay:
        self.send_message(msg_wtxidrelay())
    if self.support_addrv2:
        self.send_message(msg_sendaddrv2())
    self.send_message(msg_verack())
    self.nServices = message.nServices
    self.relay = message.relay
    if self.p2p_connected_to_node:
        self.send_message(msg_getaddr())
2017-11-23 10:17:50 -05:00
# Connection helper methods
2020-07-11 10:41:23 +02:00
def wait_until(self, test_function_in, *, timeout=60, check_connected=True):
    """Wait until test_function_in() is truthy, evaluated under p2p_lock.

    When check_connected is set, each evaluation first asserts that the
    connection is still up. The wait is delegated to
    wait_until_helper_internal with this connection's timeout_factor."""
    def predicate():
        assert not check_connected or self.is_connected
        return test_function_in()

    wait_until_helper_internal(
        predicate,
        timeout=timeout,
        lock=p2p_lock,
        timeout_factor=self.timeout_factor,
    )
2020-05-03 01:42:40 +02:00
2020-06-10 13:29:07 -07:00
def wait_for_connect(self, timeout=60):
    """Wait until the connection is established."""
    def connected():
        return self.is_connected

    self.wait_until(connected, timeout=timeout, check_connected=False)
2020-06-10 13:29:07 -07:00
2017-11-23 10:17:50 -05:00
def wait_for_disconnect(self, timeout=60):
    """Wait until the connection is gone."""
    def disconnected():
        return not self.is_connected

    self.wait_until(disconnected, timeout=timeout, check_connected=False)
2017-11-23 10:17:50 -05:00
2023-09-08 19:14:22 +05:30
def wait_for_reconnect(self, timeout=60):
    """Wait until connected without v2 state and a `version` message has been received."""
    def test_function():
        return self.is_connected and self.last_message.get('version') and not self.supports_v2_p2p
    self.wait_until(test_function, timeout=timeout, check_connected=False)
2017-11-23 10:17:50 -05:00
# Message receiving helper methods
2019-05-09 10:42:56 -04:00
def wait_for_tx(self, txid, timeout=60):
    """Wait until the most recent `tx` message carries a transaction whose rehash() equals txid."""
    def test_function():
        if not self.last_message.get('tx'):
            return False
        return self.last_message['tx'].tx.rehash() == txid

    self.wait_until(test_function, timeout=timeout)
2019-05-09 10:42:56 -04:00
2017-11-23 10:17:50 -05:00
def wait_for_block(self, blockhash, timeout=60):
    """Wait until the most recent `block` message carries a block whose rehash() equals blockhash."""
    def test_function():
        return self.last_message.get("block") and self.last_message["block"].block.rehash() == blockhash

    self.wait_until(test_function, timeout=timeout)
2017-11-23 10:17:50 -05:00
2018-08-08 11:24:59 -04:00
def wait_for_header(self, blockhash, timeout=60):
    """Wait until the first header of the most recent `headers` message matches blockhash.

    blockhash is a hexadecimal string; it is compared as an integer with the
    header's rehash(). A `headers` message with no entries is treated as
    "not yet" rather than raising IndexError."""
    def test_function():
        last_headers = self.last_message.get('headers')
        if not last_headers or not last_headers.headers:
            return False
        return last_headers.headers[0].rehash() == int(blockhash, 16)

    self.wait_until(test_function, timeout=timeout)
2018-08-08 11:24:59 -04:00
2020-04-10 16:01:33 +02:00
def wait_for_merkleblock(self, blockhash, timeout=60):
    """Wait until the most recent `merkleblock` message has a header matching blockhash.

    blockhash is a hexadecimal string; it is compared as an integer with the
    header's rehash()."""
    def test_function():
        last_filtered_block = self.last_message.get('merkleblock')
        if not last_filtered_block:
            return False
        return last_filtered_block.merkleblock.header.rehash() == int(blockhash, 16)

    self.wait_until(test_function, timeout=timeout)
2020-03-12 13:36:46 -04:00
2020-04-17 02:23:02 -07:00
def wait_for_getdata(self, hash_list, timeout=60):
    """Waits for a getdata message.

    The object hashes in the inventory vector must match the provided hash_list."""
    def test_function():
        last_data = self.last_message.get("getdata")
        if not last_data:
            return False
        return [x.hash for x in last_data.inv] == hash_list

    self.wait_until(test_function, timeout=timeout)
2017-11-23 10:17:50 -05:00
def wait_for_getheaders(self, timeout=60):
    """Waits for a getheaders message.

    Any getheaders message satisfies the wait. Callers must explicitly clear
    last_message["getheaders"] before calling, otherwise a previously
    received message makes this return immediately with success.

    TODO: change this method to take a hash value and only return true if
    the correct block header has been requested."""
    # The stored message itself is used as the (truthy) wait condition.
    self.wait_until(lambda: self.last_message.get("getheaders"), timeout=timeout)
2017-11-23 10:17:50 -05:00
def wait_for_inv(self, expected_inv, timeout=60):
    """Waits for an INV message and checks that the first inv object in the message was as expected.

    Only the first entry of the received inventory is compared (type and
    hash) against expected_inv[0]. `timeout` is forwarded to wait_until().

    Raises NotImplementedError if `expected_inv` holds more than one entry."""
    if len(expected_inv) > 1:
        raise NotImplementedError("wait_for_inv() will only verify the first inv object")

    def test_function():
        inv_msg = self.last_message.get("inv")
        # Nothing received yet, or an inv message with an empty inventory:
        # neither can match, and indexing [0] on the latter would raise
        # IndexError from inside the polling loop.
        if not inv_msg or not inv_msg.inv:
            return False
        first = inv_msg.inv[0]
        wanted = expected_inv[0]
        return first.type == wanted.type and first.hash == wanted.hash

    self.wait_until(test_function, timeout=timeout)
2017-11-23 10:17:50 -05:00
def wait_for_verack(self, timeout=60):
    """Waits until a "verack" entry is present in last_message.

    `timeout` is forwarded to wait_until()."""
    # Presence of the key is enough; the stored value is not inspected.
    self.wait_until(lambda: "verack" in self.last_message, timeout=timeout)
2017-11-23 10:17:50 -05:00
# Message sending helper functions
2024-01-30 02:59:01 +01:00
def send_version(self):
    """Sends the message stored in on_connection_send_msg, at most once.

    Does nothing when no message is pending. After sending, the stored
    message is dropped so a later call cannot send it again."""
    pending = self.on_connection_send_msg
    if not pending:
        return
    self.send_message(pending)
    self.on_connection_send_msg = None  # Never used again
2018-10-19 13:34:28 -04:00
def send_and_ping(self, message, timeout=60):
    """Sends `message`, then syncs with the node via sync_with_ping().

    `timeout` is forwarded to sync_with_ping()."""
    self.send_message(message)
    # Only return once the node has answered the follow-up ping.
    self.sync_with_ping(timeout=timeout)
2017-11-23 10:17:50 -05:00
2023-09-05 12:11:54 +02:00
def sync_with_ping(self, timeout=60):
    """Ensure ProcessMessages and SendMessages is called on this connection

    Sends a ping with nonce 0 followed by a ping with nonce ping_counter,
    waits for the pong carrying ping_counter, then increments ping_counter.
    `timeout` is forwarded to wait_until()."""
    # Sending two pings back-to-back, requires that the node calls
    # `ProcessMessage` twice, and thus ensures `SendMessages` must have
    # been called at least once
    for nonce in (0, self.ping_counter):
        self.send_message(msg_ping(nonce=nonce))

    def pong_received():
        pong = self.last_message.get("pong")
        return pong and pong.nonce == self.ping_counter

    self.wait_until(pong_received, timeout=timeout)
    # Use a fresh nonce for the next sync.
    self.ping_counter += 1
2018-06-18 17:28:37 -04:00
# One lock for synchronizing all data access between the network event loop (see
# NetworkThread below) and the thread running the test logic. For simplicity,
# P2PConnection acquires this lock whenever delivering a message to a P2PInterface.
# This lock should be acquired in the thread running the test logic to synchronize
# access to any data shared with the P2PInterface or P2PConnection.
#
# NOTE: this is a plain threading.Lock, which is not reentrant. Code that already
# holds it must not try to acquire it again.
p2p_lock = threading.Lock()
2017-12-08 10:50:24 -05:00
2018-06-18 17:28:37 -04:00
2017-12-08 10:50:24 -05:00
class NetworkThread(threading.Thread):
    """Thread that runs the single asyncio event loop used for P2P traffic.

    The event loop, the listening servers and the protocols waiting for an
    inbound connection are all stored on the class itself, so at most one
    NetworkThread may exist at any time."""

    # The shared event loop; None while no NetworkThread is alive.
    network_event_loop = None

    def __init__(self):
        super().__init__(name="NetworkThread")
        # There is only one event loop and no more than one thread must be created
        assert not self.network_event_loop

        # (addr, port) -> listening server / protocol for the next connection
        NetworkThread.listeners, NetworkThread.protos = {}, {}
        on_windows = platform.system() == 'Windows'
        if on_windows:
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        NetworkThread.network_event_loop = asyncio.new_event_loop()

    def run(self):
        """Start the network thread."""
        self.network_event_loop.run_forever()

    def close(self, timeout=10):
        """Close the connections and network event loop.

        `timeout` bounds both the wait for the loop to stop and the join of
        the thread."""
        loop = self.network_event_loop
        # stop() must be scheduled from inside the loop's own thread.
        loop.call_soon_threadsafe(loop.stop)
        wait_until_helper_internal(lambda: not loop.is_running(), timeout=timeout)
        loop.close()
        self.join(timeout)
        # Safe to remove event loop.
        NetworkThread.network_event_loop = None

    @classmethod
    def listen(cls, p2p, callback, port=None, addr=None, idx=1):
        """Ensure a listening server is running on the given port, and run the
        protocol specified by `p2p` on the next connection to it. Once ready
        for connections, call `callback`.

        When `port` is None it is derived from `idx` via p2p_port(); when
        `addr` is None, 127.0.0.1 is used."""
        if port is None:
            assert 0 < idx <= MAX_NODES
            port = p2p_port(MAX_NODES - idx)
        addr = '127.0.0.1' if addr is None else addr

        def exception_handler(loop, context):
            # Loop exceptions are swallowed for peers flagged as
            # reconnecting; all others go to the default handler.
            if p2p.reconnect:
                return
            loop.default_exception_handler(context)

        event_loop = cls.network_event_loop
        event_loop.set_exception_handler(exception_handler)
        server_setup = cls.create_listen_server(addr, port, callback, p2p)
        # Task creation has to happen on the event loop's thread.
        event_loop.call_soon_threadsafe(event_loop.create_task, server_setup)

    @classmethod
    async def create_listen_server(cls, addr, port, callback, proto):
        """Start (once) a server on (addr, port), register `proto` for the
        next connection to it, then invoke callback(addr, port)."""
        endpoint = (addr, port)

        def peer_protocol():
            """Returns the protocol object that handles a new connection.

            To allow different connections to have different behaviors, the
            protocol is first put in the cls.protos dict. When the connection
            is made, this function clears it from that dict (unless a
            reconnection is still expected) and returns it so the event loop
            can start executing it."""
            pending = cls.protos.get(endpoint)
            # remove protocol function from dict only when reconnection doesn't need to happen/already happened
            if not proto.reconnect:
                cls.protos[endpoint] = None
            return pending

        if endpoint not in cls.listeners:
            # When creating a listener on a given (addr, port) we only need to
            # do it once. If we want different behaviors for different
            # connections, we can accomplish this by providing different
            # `proto` functions
            server = await cls.network_event_loop.create_server(peer_protocol, addr, port)
            logger.debug("Listening server on %s:%d should be started" % (addr, port))
            cls.listeners[endpoint] = server

        cls.protos[endpoint] = proto
        callback(addr, port)
2017-11-22 11:45:14 -05:00
class P2PDataStore(P2PInterface):
    """A P2P data store class.

    Remembers the blocks and transactions it is asked to send, and answers
    getdata and getheaders requests from those stores."""

    def __init__(self):
        super().__init__()
        # store of blocks. key is block hash, value is a CBlock object
        self.block_store = {}
        # hash of the block added most recently; used as our tip
        self.last_block_hash = ''
        # store of txs. key is txid, value is a CTransaction object
        self.tx_store = {}
        # hash of every object requested from us via getdata, in arrival order
        self.getdata_requests = []

    def on_getdata(self, message):
        """Record each requested hash and, if the tx/block is in our stores, reply with a tx/block message."""
        for inv in message.inv:
            self.getdata_requests.append(inv.hash)
            base_type = inv.type & MSG_TYPE_MASK
            if base_type == MSG_TX and inv.hash in self.tx_store:
                self.send_message(msg_tx(self.tx_store[inv.hash]))
            elif base_type == MSG_BLOCK and inv.hash in self.block_store:
                self.send_message(msg_block(self.block_store[inv.hash]))
            else:
                # Unknown type, or an object we do not hold: log only.
                logger.debug('getdata message type {} received.'.format(hex(inv.type)))

    def on_getheaders(self, message):
        """Search back through our block store for the locator, and reply with a headers message if found."""
        locator = message.locator
        hash_stop = message.hashstop

        # Assume that the most recent block added is the tip
        if not self.block_store:
            return

        # Built tip-first: walk back via hashPrevBlock until a block named in
        # the locator, the hashstop header, or a gap in the store is reached.
        chain = [self.block_store[self.last_block_hash]]
        while chain[-1].sha256 not in locator.vHave:
            parent_hash = chain[-1].hashPrevBlock
            if parent_hash not in self.block_store:
                logger.debug('block hash {} not found in block store'.format(hex(parent_hash)))
                break
            parent_header = CBlockHeader(self.block_store[parent_hash])
            chain.append(parent_header)
            if parent_header.sha256 == hash_stop:
                # if this is the hashstop header, stop here
                break

        # Reverse to oldest-first and truncate the list if there are too many headers
        oldest_first = chain[::-1][:MAX_HEADERS_RESULTS]
        response = msg_headers(oldest_first)

        if response is not None:
            self.send_message(response)

    def send_blocks_and_test(self, blocks, node, *, success=True, force_send=False, reject_reason=None, expect_disconnect=False, timeout=60, is_decoy=False):
        """Send blocks to test node and test whether the tip advances.

         - add all blocks to our block_store
         - if force_send is False: send a headers message for the blocks and wait until the
           last block has been requested via getdata. The on_getheaders and on_getdata
           handlers ensure that any getheaders/getdata messages are responded to.
           Otherwise send the full blocks unsolicited.
         - if is_decoy is True: blocks are always sent unsolicited, and is_decoy is passed
           on to send_message
         - if expect_disconnect is True: wait for the disconnect instead of syncing with a ping
         - if success is True: assert that the node's tip advances to the most recent block
         - if success is False: assert that the node's tip doesn't advance
         - if reject_reason is set: assert that the correct reject message is logged
         - timeout is forwarded to every wait performed here"""

        with p2p_lock:
            self.block_store.update((blk.sha256, blk) for blk in blocks)
            if blocks:
                # The last block handed to us becomes the tip we advertise.
                self.last_block_hash = blocks[-1].sha256

        expected_log = [reject_reason] if reject_reason else []

        with node.assert_debug_log(expected_msgs=expected_log):
            # since decoy messages are ignored by the recipient - no need to wait for response
            if force_send or is_decoy:
                for blk in blocks:
                    self.send_message(msg_block(block=blk), is_decoy)
            else:
                announcement = msg_headers([CBlockHeader(blk) for blk in blocks])
                self.send_message(announcement)
                self.wait_until(
                    lambda: blocks[-1].sha256 in self.getdata_requests,
                    timeout=timeout,
                    check_connected=success,
                )

            if expect_disconnect:
                self.wait_for_disconnect(timeout=timeout)
            else:
                self.sync_with_ping(timeout=timeout)

            if success:
                self.wait_until(lambda: node.getbestblockhash() == blocks[-1].hash, timeout=timeout)
            else:
                assert node.getbestblockhash() != blocks[-1].hash

    def send_txs_and_test(self, txs, node, *, success=True, expect_disconnect=False, reject_reason=None):
        """Send txs to test node and test whether they're accepted to the mempool.

         - add all txs to our tx_store
         - send tx messages for all txs
         - if success is True/False: assert that the txs are/are not accepted to the mempool
         - if expect_disconnect is True: wait for the disconnect instead of syncing with a ping
         - if reject_reason is set: assert that the correct reject message is logged."""

        with p2p_lock:
            self.tx_store.update((tx.sha256, tx) for tx in txs)

        expected_log = [reject_reason] if reject_reason else []

        with node.assert_debug_log(expected_msgs=expected_log):
            for tx in txs:
                self.send_message(msg_tx(tx))

            if expect_disconnect:
                self.wait_for_disconnect()
            else:
                self.sync_with_ping()

            raw_mempool = node.getrawmempool()
            for tx in txs:
                if success:
                    # Check that all txs are now in the mempool
                    assert tx.hash in raw_mempool, "{} not found in mempool".format(tx.hash)
                else:
                    # Check that none of the txs are now in the mempool
                    assert tx.hash not in raw_mempool, "{} tx found in mempool".format(tx.hash)
2020-01-30 18:52:25 -08:00
class P2PTxInvStore(P2PInterface):
    """A P2PInterface which stores a count of how many times each txid has been announced."""

    def __init__(self):
        super().__init__()
        # inv hash -> number of tx/wtx inv entries received for that hash
        self.tx_invs_received = defaultdict(int)

    def on_inv(self, message):
        """Count every tx/wtx announcement in `message` after the base class handled it."""
        super().on_inv(message)  # Send getdata in response.
        # Store how many times invs have been received for each tx.
        for inv in message.inv:
            if inv.type in (MSG_TX, MSG_WTX):
                # save txid
                self.tx_invs_received[inv.hash] += 1

    def get_invs(self):
        """Return a list of all hashes announced so far (snapshot taken under p2p_lock)."""
        with p2p_lock:
            return list(self.tx_invs_received)

    def wait_for_broadcast(self, txns, timeout=60):
        """Waits for the txns (list of txids) to complete initial broadcast.
        The mempool should mark unbroadcast=False for these transactions.

        `txns` holds hex-string txids. `timeout` applies to the wait for the
        announcements and to the final ping sync.
        """
        # Build the expected set once: it does not change between polls, and
        # rebuilding it inside the predicate would exhaust a one-shot iterable
        # on the first poll.
        expected = {int(txid, 16) for txid in txns}
        # Wait until invs have been received (and getdatas sent) for each txid.
        self.wait_until(lambda: set(self.tx_invs_received) == expected, timeout=timeout)
        # Flush messages and wait for the getdatas to be processed
        self.sync_with_ping(timeout=timeout)