This commit is contained in:
Vasil Dimov 2025-04-29 14:00:46 -04:00 committed by GitHub
commit 969b796abf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
33 changed files with 1625 additions and 170 deletions

View file

@ -287,6 +287,7 @@ add_library(bitcoin_node STATIC EXCLUDE_FROM_ALL
policy/rbf.cpp
policy/settings.cpp
policy/truc_policy.cpp
private_broadcast.cpp
rest.cpp
rpc/blockchain.cpp
rpc/external_signer.cpp

View file

@ -452,6 +452,7 @@ private:
if (conn_type == "block-relay-only") return "block";
if (conn_type == "manual" || conn_type == "feeler") return conn_type;
if (conn_type == "addr-fetch") return "addr";
if (conn_type == "private-broadcast") return "priv";
return "";
}
std::string FormatServices(const UniValue& services)
@ -687,6 +688,7 @@ public:
" \"manual\" - peer we manually added using RPC addnode or the -addnode/-connect config options\n"
" \"feeler\" - short-lived connection for testing addresses\n"
" \"addr\" - address fetch; short-lived connection for requesting addresses\n"
" \"priv\" - private broadcast; short-lived connection for broadcasting our transactions\n"
" net Network the peer connected through (\"ipv4\", \"ipv6\", \"onion\", \"i2p\", \"cjdns\", or \"npr\" (not publicly routable))\n"
" serv Services offered by the peer\n"
" \"n\" - NETWORK: peer can serve the full block chain\n"

View file

@ -535,7 +535,7 @@ void SetupServerArgs(ArgsManager& argsman, bool can_listen_ipc)
argsman.AddArg("-forcednsseed", strprintf("Always query for peer addresses via DNS lookup (default: %u)", DEFAULT_FORCEDNSSEED), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
argsman.AddArg("-listen", strprintf("Accept connections from outside (default: %u if no -proxy, -connect or -maxconnections=0)", DEFAULT_LISTEN), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
argsman.AddArg("-listenonion", strprintf("Automatically create Tor onion service (default: %d)", DEFAULT_LISTEN_ONION), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
argsman.AddArg("-maxconnections=<n>", strprintf("Maintain at most <n> automatic connections to peers (default: %u). This limit does not apply to connections manually added via -addnode or the addnode RPC, which have a separate limit of %u.", DEFAULT_MAX_PEER_CONNECTIONS, MAX_ADDNODE_CONNECTIONS), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
argsman.AddArg("-maxconnections=<n>", strprintf("Maintain at most <n> automatic connections to peers (default: %u). This limit does not apply to connections manually added via -addnode or the addnode RPC, which have a separate limit of %u. It does not apply to short-lived private broadcast connections either, which have a separate limit of %u.", DEFAULT_MAX_PEER_CONNECTIONS, MAX_ADDNODE_CONNECTIONS, MAX_PRIVATE_BROADCAST_CONNECTIONS), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
argsman.AddArg("-maxreceivebuffer=<n>", strprintf("Maximum per-connection receive buffer, <n>*1000 bytes (default: %u)", DEFAULT_MAXRECEIVEBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
argsman.AddArg("-maxsendbuffer=<n>", strprintf("Maximum per-connection memory usage for the send buffer, <n>*1000 bytes (default: %u)", DEFAULT_MAXSENDBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
argsman.AddArg("-maxuploadtarget=<n>", strprintf("Tries to keep outbound traffic under the given target per 24h. Limit does not apply to peers with 'download' permission or blocks created within past week. 0 = no limit (default: %s). Optional suffix units [k|K|m|M|g|G|t|T] (default: M). Lowercase is 1000 base while uppercase is 1024 base", DEFAULT_MAX_UPLOAD_TARGET), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
@ -645,6 +645,15 @@ void SetupServerArgs(ArgsManager& argsman, bool can_listen_ipc)
OptionsCategory::NODE_RELAY);
argsman.AddArg("-minrelaytxfee=<amt>", strprintf("Fees (in %s/kvB) smaller than this are considered zero fee for relaying, mining and transaction creation (default: %s)",
CURRENCY_UNIT, FormatMoney(DEFAULT_MIN_RELAY_TX_FEE)), ArgsManager::ALLOW_ANY, OptionsCategory::NODE_RELAY);
argsman.AddArg("-privatebroadcast",
strprintf(
"Broadcast transactions submitted via sendrawtransaction RPC using short lived "
"connections through the Tor or I2P networks without putting them in the mempool first. "
"Transactions submitted through the wallet are not affected by this option "
"(default: %u)",
DEFAULT_PRIVATE_BROADCAST),
ArgsManager::ALLOW_ANY,
OptionsCategory::NODE_RELAY);
argsman.AddArg("-whitelistforcerelay", strprintf("Add 'forcerelay' permission to whitelisted peers with default permissions. This will relay transactions even if the transactions were already in the mempool. (default: %d)", DEFAULT_WHITELISTFORCERELAY), ArgsManager::ALLOW_ANY, OptionsCategory::NODE_RELAY);
argsman.AddArg("-whitelistrelay", strprintf("Add 'relay' permission to whitelisted peers with default permissions. This will accept relayed transactions even when not relaying transactions (default: %d)", DEFAULT_WHITELISTRELAY), ArgsManager::ALLOW_ANY, OptionsCategory::NODE_RELAY);
@ -1004,11 +1013,14 @@ bool AppInitParameterInteraction(const ArgsManager& args)
if (user_max_connection < 0) {
return InitError(Untranslated("-maxconnections must be greater or equal than zero"));
}
const size_t max_private{args.GetBoolArg("-privatebroadcast", DEFAULT_PRIVATE_BROADCAST)
? MAX_PRIVATE_BROADCAST_CONNECTIONS
: 0};
// Reserve enough FDs to account for the bare minimum, plus any manual connections, plus the bound interfaces
int min_required_fds = MIN_CORE_FDS + MAX_ADDNODE_CONNECTIONS + nBind;
// Try raising the FD limit to what we need (available_fds may be smaller than the requested amount if this fails)
available_fds = RaiseFileDescriptorLimit(user_max_connection + min_required_fds);
available_fds = RaiseFileDescriptorLimit(user_max_connection + max_private + min_required_fds);
// If we are using select instead of poll, our actual limit may be even smaller
#ifndef USE_POLL
available_fds = std::min(FD_SETSIZE, available_fds);
@ -1626,13 +1638,13 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
}
}
const bool listenonion{args.GetBoolArg("-listenonion", DEFAULT_LISTEN_ONION)};
if (onion_proxy.IsValid()) {
SetProxy(NET_ONION, onion_proxy);
} else {
// If -listenonion is set, then we will (try to) connect to the Tor control port
// later from the torcontrol thread and may retrieve the onion proxy from there.
const bool listenonion_disabled{!args.GetBoolArg("-listenonion", DEFAULT_LISTEN_ONION)};
if (onlynet_used_with_onion && listenonion_disabled) {
if (onlynet_used_with_onion && !listenonion) {
return InitError(
_("Outbound connections restricted to Tor (-onlynet=onion) but the proxy for "
"reaching the Tor network is not provided: none of -proxy, -onion or "
@ -1989,7 +2001,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
connOptions.onion_binds.push_back(onion_service_target);
}
if (args.GetBoolArg("-listenonion", DEFAULT_LISTEN_ONION)) {
if (listenonion) {
if (connOptions.onion_binds.size() > 1) {
InitWarning(strprintf(_("More than one onion bind address is provided. Using %s "
"for the automatically created Tor onion service."),
@ -2055,6 +2067,32 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
connOptions.m_i2p_accept_incoming = args.GetBoolArg("-i2pacceptincoming", DEFAULT_I2P_ACCEPT_INCOMING);
if (args.GetBoolArg("-privatebroadcast", DEFAULT_PRIVATE_BROADCAST)) {
// If -listenonion is set, then NET_ONION may not be reachable now
// but may become reachable later, thus only error here if it is not
// reachable and will not become reachable for sure.
const bool onion_may_become_reachable{listenonion && (!args.IsArgSet("-onlynet") || onlynet_used_with_onion)};
if (!g_reachable_nets.Contains(NET_I2P) &&
!g_reachable_nets.Contains(NET_ONION) &&
!onion_may_become_reachable) {
return InitError(_("Private broadcast of own transactions requested (-privatebroadcast), "
"but none of Tor or I2P networks is reachable"));
}
if (!connOptions.m_use_addrman_outgoing) {
return InitError(_("Private broadcast of own transactions requested (-privatebroadcast), "
"but -connect is also configured. They are incompatible because the "
"private broadcast needs to open new connections to randomly "
"chosen Tor or I2P peers. Consider using -maxconnections=0 -addnode=... "
"instead"));
}
if (!proxyRandomize && (g_reachable_nets.Contains(NET_ONION) || onion_may_become_reachable)) {
InitWarning(_("Private broadcast of own transactions requested (-privatebroadcast) and "
"-proxyrandomize is disabled. Tor circuits for private broadcast connections "
"may be correlated to other connections over Tor. For maximum privacy set "
"-proxyrandomize=1."));
}
}
if (!node.connman->Start(scheduler, connOptions)) {
return false;
}

View file

@ -7,6 +7,7 @@
#include <blockfilter.h>
#include <common/settings.h>
#include <node/types.h>
#include <primitives/transaction.h> // For CTransactionRef
#include <util/result.h>
@ -212,13 +213,19 @@ public:
//! Check if transaction has descendants in mempool.
virtual bool hasDescendantsInMempool(const uint256& txid) = 0;
//! Transaction is added to memory pool, if the transaction fee is below the
//! amount specified by max_tx_fee, and broadcast to all peers if relay is set to true.
//! Return false if the transaction could not be added due to the fee or for another reason.
//! Consume a local transaction, optionally adding it to the mempool and
//! optionally broadcasting it to the network.
//! @param[in] tx Transaction to process.
//! @param[in] max_tx_fee Don't add the transaction to the mempool or
//! broadcast it if its fee is higher than this.
//! @param[in] broadcast_method Whether to add the transaction to the
//! mempool and how/whether to broadcast it.
//! @param[out] err_string Set if an error occurs.
//! @return False if the transaction could not be added due to the fee or for another reason.
virtual bool broadcastTransaction(const CTransactionRef& tx,
const CAmount& max_tx_fee,
bool relay,
std::string& err_string) = 0;
const CAmount& max_tx_fee,
node::TxBroadcastMethod broadcast_method,
std::string& err_string) = 0;
//! Calculate mempool ancestor and descendant counts for the given transaction.
virtual void getTransactionAncestry(const uint256& txid, size_t& ancestors, size_t& descendants, size_t* ancestorsize = nullptr, CAmount* ancestorfees = nullptr) = 0;

View file

@ -199,6 +199,7 @@ static const std::map<std::string, BCLog::LogFlags, std::less<>> LOG_CATEGORIES_
{"txreconciliation", BCLog::TXRECONCILIATION},
{"scan", BCLog::SCAN},
{"txpackages", BCLog::TXPACKAGES},
{"privatebroadcast", BCLog::PRIVATE_BROADCAST},
};
static const std::unordered_map<BCLog::LogFlags, std::string> LOG_CATEGORIES_BY_FLAG{

View file

@ -71,6 +71,7 @@ namespace BCLog {
TXRECONCILIATION = (CategoryMask{1} << 26),
SCAN = (CategoryMask{1} << 27),
TXPACKAGES = (CategoryMask{1} << 28),
PRIVATE_BROADCAST = (CategoryMask{1} << 29),
ALL = ~NONE,
};
enum class Level {

View file

@ -374,7 +374,16 @@ bool CConnman::CheckIncomingNonce(uint64_t nonce)
{
LOCK(m_nodes_mutex);
for (const CNode* pnode : m_nodes) {
if (!pnode->fSuccessfullyConnected && !pnode->IsInboundConn() && pnode->GetLocalNonce() == nonce)
// Omit private broadcast connections from this check to prevent this privacy attack:
// - We connect to a peer in an attempt to privately broadcast a transaction. From our
// VERSION message the peer deducts that this is a short-lived connection for
// broadcasting a transaction, takes our nonce and delays their VERACK.
// - The peer starts connecting to (clearnet) nodes and sends them a VERSION message
// which contains our nonce. If the peer manages to connect to us we would disconnect.
// - Upon a disconnect, the peer knows our clearnet address. They go back to the short
// lived privacy broadcast connection and continue with VERACK.
if (!pnode->fSuccessfullyConnected && !pnode->IsInboundConn() && !pnode->IsPrivateBroadcastConn() &&
pnode->GetLocalNonce() == nonce)
return false;
}
return true;
@ -394,7 +403,12 @@ static CService GetBindAddress(const Sock& sock)
return addr_bind;
}
CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport)
CNode* CConnman::ConnectNode(CAddress addrConnect,
const char* pszDest,
bool fCountFailure,
ConnectionType conn_type,
bool use_v2transport,
std::optional<Proxy> proxy_arg)
{
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
assert(conn_type != ConnectionType::INBOUND);
@ -464,14 +478,23 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
for (auto& target_addr: connect_to) {
if (target_addr.IsValid()) {
const bool use_proxy{GetProxy(target_addr.GetNetwork(), proxy)};
bool use_proxy;
if (proxy_arg.has_value()) {
use_proxy = true;
proxy = proxy_arg.value();
} else {
use_proxy = GetProxy(addrConnect.GetNetwork(), proxy);
}
bool proxyConnectionFailed = false;
if (target_addr.IsI2P() && use_proxy) {
i2p::Connection conn;
bool connected{false};
if (m_i2p_sam_session) {
// If an I2P SAM session already exists, normally we would re-use it. But in the case of
// private broadcast we force a new transient session. A Connect() using m_i2p_sam_session
// would use our permanent I2P address as a source address.
if (m_i2p_sam_session && conn_type != ConnectionType::PRIVATE_BROADCAST) {
connected = m_i2p_sam_session->Connect(target_addr, conn, proxyConnectionFailed);
} else {
{
@ -1870,6 +1893,7 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
switch (conn_type) {
case ConnectionType::INBOUND:
case ConnectionType::MANUAL:
case ConnectionType::PRIVATE_BROADCAST:
return false;
case ConnectionType::OUTBOUND_FULL_RELAY:
max_connections = m_max_outbound_full_relay;
@ -2655,6 +2679,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, std
// peers from addrman.
case ConnectionType::ADDR_FETCH:
case ConnectionType::FEELER:
case ConnectionType::PRIVATE_BROADCAST:
break;
case ConnectionType::MANUAL:
case ConnectionType::OUTBOUND_FULL_RELAY:
@ -2985,7 +3010,13 @@ void CConnman::ThreadOpenAddedConnections()
}
// if successful, this moves the passed grant to the constructed node
void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant&& grant_outbound, const char *pszDest, ConnectionType conn_type, bool use_v2transport)
bool CConnman::OpenNetworkConnection(const CAddress& addrConnect,
bool fCountFailure,
CSemaphoreGrant&& grant_outbound,
const char* pszDest,
ConnectionType conn_type,
bool use_v2transport,
std::optional<Proxy> proxy)
{
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
assert(conn_type != ConnectionType::INBOUND);
@ -2994,23 +3025,23 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
// Initiate outbound network connection
//
if (interruptNet) {
return;
return false;
}
if (!fNetworkActive) {
return;
return false;
}
if (!pszDest) {
bool banned_or_discouraged = m_banman && (m_banman->IsDiscouraged(addrConnect) || m_banman->IsBanned(addrConnect));
if (IsLocal(addrConnect) || banned_or_discouraged || AlreadyConnectedToAddress(addrConnect)) {
return;
return false;
}
} else if (FindNode(std::string(pszDest)))
return;
return false;
CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure, conn_type, use_v2transport);
CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure, conn_type, use_v2transport, proxy);
if (!pnode)
return;
return false;
pnode->grantOutbound = std::move(grant_outbound);
m_msgproc->InitializeNode(*pnode, m_local_services);
@ -3028,6 +3059,78 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
pnode->ConnectionTypeAsString().c_str(),
pnode->ConnectedThroughNetwork(),
GetNodeCount(ConnectionDirection::Out));
return true;
}
std::optional<Network> CConnman::PrivateBroadcast::PickNetwork(std::optional<Proxy>& proxy) const
{
prevector<4, Network> nets;
std::optional<Proxy> clearnet_proxy;
if (g_reachable_nets.Contains(NET_ONION)) {
nets.push_back(NET_ONION);
clearnet_proxy = ProxyForIPv4or6();
if (clearnet_proxy.has_value()) {
if (g_reachable_nets.Contains(NET_IPV4)) {
nets.push_back(NET_IPV4);
}
if (g_reachable_nets.Contains(NET_IPV6)) {
nets.push_back(NET_IPV6);
}
}
}
if (g_reachable_nets.Contains(NET_I2P)) {
nets.push_back(NET_I2P);
}
if (nets.empty()) {
return std::nullopt;
}
const Network net{nets[FastRandomContext{}.randrange(nets.size())]};
if (net == NET_IPV4 || net == NET_IPV6) {
proxy = clearnet_proxy;
}
return net;
}
size_t CConnman::PrivateBroadcast::NumToOpen() const
{
LOCK(m_num_to_open_mutex);
return m_num_to_open;
}
void CConnman::PrivateBroadcast::NumToOpenAdd(size_t n)
{
WITH_LOCK(m_num_to_open_mutex, m_num_to_open += n);
m_num_to_open_cond.notify_all();
}
size_t CConnman::PrivateBroadcast::NumToOpenSub(size_t n)
{
LOCK(m_num_to_open_mutex);
if (m_num_to_open > n) {
m_num_to_open -= n;
} else {
m_num_to_open = 0;
}
return m_num_to_open;
}
void CConnman::PrivateBroadcast::NumToOpenWait() const
{
WAIT_LOCK(m_num_to_open_mutex, lock);
m_num_to_open_cond.wait(lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_num_to_open_mutex) { return m_num_to_open > 0; });
}
std::optional<Proxy> CConnman::PrivateBroadcast::ProxyForIPv4or6() const
{
Proxy tor_proxy;
if (m_outbound_tor_ok_at_least_once.load() && GetProxy(NET_ONION, tor_proxy)) {
return tor_proxy;
}
return std::nullopt;
}
Mutex NetEventsInterface::g_msgproc_mutex;
@ -3114,6 +3217,83 @@ void CConnman::ThreadI2PAcceptIncoming()
}
}
void CConnman::ThreadPrivateBroadcast()
{
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
size_t addrman_num_bad_addresses{0};
while (!interruptNet) {
if (!fNetworkActive) {
interruptNet.sleep_for(5s);
continue;
}
CSemaphoreGrant conn_max_grant{m_private_broadcast.m_sem_conn_max}; // Would block if too many are opened.
m_private_broadcast.NumToOpenWait();
if (interruptNet) {
break;
}
std::optional<Proxy> proxy;
const std::optional<Network> net{m_private_broadcast.PickNetwork(proxy)};
if (!net.has_value()) {
LogPrintLevel(BCLog::PRIVATE_BROADCAST,
BCLog::Level::Warning,
"Connections needed but none of the Tor or I2P networks is reachable");
interruptNet.sleep_for(5s);
continue;
}
const auto [addr, _] = addrman.Select(/*new_only=*/false, {net.value()});
if (!addr.IsValid() || IsLocal(addr)) {
++addrman_num_bad_addresses;
if (addrman_num_bad_addresses > 100) {
interruptNet.sleep_for(500ms);
}
continue;
}
addrman_num_bad_addresses = 0;
auto target_str{addr.ToStringAddrPort()};
if (proxy.has_value()) {
target_str += " through the proxy at " + proxy->ToString();
}
const bool use_v2transport(addr.nServices & GetLocalServices() & NODE_P2P_V2);
if (OpenNetworkConnection(addr,
/*fCountFailure=*/true,
std::move(conn_max_grant),
/*strDest=*/nullptr,
ConnectionType::PRIVATE_BROADCAST,
use_v2transport,
proxy)) {
const size_t remaining{m_private_broadcast.NumToOpenSub(1)};
LogDebug(BCLog::PRIVATE_BROADCAST,
"Socket connected to %s; remaining connections to open: %d",
target_str,
remaining);
} else {
const size_t remaining{m_private_broadcast.NumToOpen()};
if (remaining == 0) {
LogDebug(BCLog::PRIVATE_BROADCAST,
"Failed to connect to %s, will not retry, no more connections needed",
target_str);
} else {
LogDebug(BCLog::PRIVATE_BROADCAST,
"Failed to connect to %s, will retry to a different address; remaining connections "
"to open: %d",
target_str,
remaining);
}
}
}
}
bool CConnman::BindListenPort(const CService& addrBind, bilingual_str& strError, NetPermissionFlags permissions)
{
int nOne = 1;
@ -3388,6 +3568,11 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
std::thread(&util::TraceThread, "i2paccept", [this] { ThreadI2PAcceptIncoming(); });
}
if (gArgs.GetBoolArg("-privatebroadcast", DEFAULT_PRIVATE_BROADCAST)) {
threadPrivateBroadcast =
std::thread(&util::TraceThread, "privbcast", [this] { ThreadPrivateBroadcast(); });
}
// Dump network addresses
scheduler.scheduleEvery([this] { DumpAddresses(); }, DUMP_PEERS_INTERVAL);
@ -3437,10 +3622,16 @@ void CConnman::Interrupt()
semAddnode->post();
}
}
m_private_broadcast.m_sem_conn_max.post();
m_private_broadcast.NumToOpenAdd(1);
}
void CConnman::StopThreads()
{
if (threadPrivateBroadcast.joinable()) {
threadPrivateBroadcast.join();
}
if (threadI2PAcceptIncoming.joinable()) {
threadI2PAcceptIncoming.join();
}
@ -3873,6 +4064,30 @@ bool CConnman::NodeFullyConnected(const CNode* pnode)
void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
{
AssertLockNotHeld(m_total_bytes_sent_mutex);
if (pnode->IsPrivateBroadcastConn() &&
msg.m_type != NetMsgType::VERSION &&
msg.m_type != NetMsgType::VERACK &&
msg.m_type != NetMsgType::INV &&
msg.m_type != NetMsgType::TX &&
msg.m_type != NetMsgType::PING) {
// Ensure private broadcast connections only send the above message types.
// Others are not needed and may degrade privacy.
LogDebug(BCLog::PRIVATE_BROADCAST,
"Omitting send of message '%s', peer=%d%s",
msg.m_type,
pnode->GetId(),
pnode->LogIP(fLogIPs));
return;
}
if (!m_private_broadcast.m_outbound_tor_ok_at_least_once.load() && !pnode->IsInboundConn() &&
pnode->addr.IsTor() && msg.m_type == NetMsgType::VERACK) {
// If we are sending the peer VERACK that means we successfully sent
// and received another message to/from that peer (VERSION).
m_private_broadcast.m_outbound_tor_ok_at_least_once.store(true);
}
size_t nMessageSize = msg.data.size();
LogDebug(BCLog::NET, "sending %s (%d bytes) peer=%d\n", msg.m_type, nMessageSize, pnode->GetId());
if (gArgs.GetBoolArg("-capturemessages", false)) {

124
src/net.h
View file

@ -71,6 +71,8 @@ static const int MAX_ADDNODE_CONNECTIONS = 8;
static const int MAX_BLOCK_RELAY_ONLY_CONNECTIONS = 2;
/** Maximum number of feeler connections */
static const int MAX_FEELER_CONNECTIONS = 1;
/** Maximum number of private broadcast connections */
static constexpr size_t MAX_PRIVATE_BROADCAST_CONNECTIONS{64};
/** -listen default */
static const bool DEFAULT_LISTEN = true;
/** The maximum number of peer connections to maintain. */
@ -81,6 +83,8 @@ static const std::string DEFAULT_MAX_UPLOAD_TARGET{"0M"};
static const bool DEFAULT_BLOCKSONLY = false;
/** -peertimeout default */
static const int64_t DEFAULT_PEER_CONNECT_TIMEOUT = 60;
/** Default for -privatebroadcast. */
static constexpr bool DEFAULT_PRIVATE_BROADCAST{false};
/** Number of file descriptors required for message capture **/
static const int NUM_FDS_MESSAGE_CAPTURE = 1;
/** Interval for ASMap Health Check **/
@ -766,6 +770,7 @@ public:
case ConnectionType::MANUAL:
case ConnectionType::ADDR_FETCH:
case ConnectionType::FEELER:
case ConnectionType::PRIVATE_BROADCAST:
return false;
} // no default case, so the compiler can warn about missing cases
@ -787,6 +792,7 @@ public:
case ConnectionType::FEELER:
case ConnectionType::BLOCK_RELAY:
case ConnectionType::ADDR_FETCH:
case ConnectionType::PRIVATE_BROADCAST:
return false;
case ConnectionType::OUTBOUND_FULL_RELAY:
case ConnectionType::MANUAL:
@ -808,6 +814,10 @@ public:
return m_conn_type == ConnectionType::ADDR_FETCH;
}
bool IsPrivateBroadcastConn() const {
return m_conn_type == ConnectionType::PRIVATE_BROADCAST;
}
bool IsInboundConn() const {
return m_conn_type == ConnectionType::INBOUND;
}
@ -821,6 +831,7 @@ public:
case ConnectionType::OUTBOUND_FULL_RELAY:
case ConnectionType::BLOCK_RELAY:
case ConnectionType::ADDR_FETCH:
case ConnectionType::PRIVATE_BROADCAST:
return true;
} // no default case, so the compiler can warn about missing cases
@ -1136,7 +1147,96 @@ public:
bool GetNetworkActive() const { return fNetworkActive; };
bool GetUseAddrmanOutgoing() const { return m_use_addrman_outgoing; };
void SetNetworkActive(bool active);
void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant&& grant_outbound, const char* strDest, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
/**
* Open a new P2P connection and initialize it with the PeerManager at `m_msgproc`.
* @param[in] addrConnect Address to connect to, if `strDest` is `nullptr`.
* @param[in] fCountFailure Increment the number of connection attempts to this address in Addrman.
* @param[in] grant_outbound Take ownership of this grant, to be released later when the connection is closed.
* @param[in] strDest Address to resolve and connect to.
* @param[in] conn_type Type of the connection to open, must not be `ConnectionType::INBOUND`.
* @param[in] use_v2transport Use P2P encryption, (aka V2 transport, BIP324).
* @param[in] proxy Optional proxy to use and override normal proxy selection.
* @retval true The connection was opened successfully.
* @retval false The connection attempt failed.
*/
bool OpenNetworkConnection(const CAddress& addrConnect,
bool fCountFailure,
CSemaphoreGrant&& grant_outbound,
const char* strDest,
ConnectionType conn_type,
bool use_v2transport,
std::optional<Proxy> proxy = std::nullopt)
EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
/// Group of private broadcast related members.
class PrivateBroadcast
{
public:
/**
* Remember if we ever established at least one outbound connection to a
* Tor peer, including sending and receiving P2P messages. If this is
* true then the Tor proxy indeed works and is a proxy to the Tor network,
* not a misconfigured ordinary SOCKS5 proxy as -proxy or -onion. If that
* is the case, then we assume that connecting to an IPv4 or IPv6 address
* via that proxy will be done through the Tor network and a Tor exit node.
*/
std::atomic_bool m_outbound_tor_ok_at_least_once{false};
/**
* Semaphore used to guard against opening too many connections.
* Opening private broadcast connections will be paused if this is equal to 0.
*/
CSemaphore m_sem_conn_max{MAX_PRIVATE_BROADCAST_CONNECTIONS};
/**
* Choose a network to open a connection to.
* @param[out] proxy Optional proxy to override the normal proxy selection.
* Will be set if !std::nullopt is returned. Could be set to `std::nullopt`
* if there is no need to override the proxy that would be used for connecting
* to the returned network.
* @retval std::nullopt No network could be selected.
* @retval !std::nullopt The network was selected and `proxy` is set (maybe to `std::nullopt`).
*/
std::optional<Network> PickNetwork(std::optional<Proxy>& proxy) const;
/// Get the pending number of connections to open.
size_t NumToOpen() const EXCLUSIVE_LOCKS_REQUIRED(!m_num_to_open_mutex);
/**
* Increment the number of new connections of type `ConnectionType::PRIVATE_BROADCAST`
* to be opened by `CConnman::ThreadPrivateBroadcast()`.
* @param[in] n Increment by this number.
*/
void NumToOpenAdd(size_t n) EXCLUSIVE_LOCKS_REQUIRED(!m_num_to_open_mutex);
/**
* Decrement the number of new connections of type `ConnectionType::PRIVATE_BROADCAST`
* to be opened by `CConnman::ThreadPrivateBroadcast()`.
* @param[in] n Decrement by this number.
* @return The number of connections that remain to be opened after the operation.
*/
size_t NumToOpenSub(size_t n) EXCLUSIVE_LOCKS_REQUIRED(!m_num_to_open_mutex);
/// Wait for the number of needed connections to become greater than 0.
void NumToOpenWait() const EXCLUSIVE_LOCKS_REQUIRED(!m_num_to_open_mutex);
private:
/**
* Check if private broadcast can be done to IPv4 or IPv6 peers and if so via which proxy.
* If private broadcast connections should not be opened to IPv4 or IPv6, then this will
* return an empty optional.
*/
std::optional<Proxy> ProxyForIPv4or6() const;
/// Condition variable to wait for `m_num_to_open` to change.
mutable std::condition_variable m_num_to_open_cond;
/// Mutex protecting `m_num_to_open`.
mutable Mutex m_num_to_open_mutex;
/// Number of `ConnectionType::PRIVATE_BROADCAST` connections to open.
size_t m_num_to_open GUARDED_BY(m_num_to_open_mutex){0};
} m_private_broadcast;
bool CheckIncomingNonce(uint64_t nonce);
void ASMapHealthCheck();
@ -1300,6 +1400,7 @@ private:
void ThreadOpenConnections(std::vector<std::string> connect, std::span<const std::string> seed_nodes) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex, !m_reconnections_mutex);
void ThreadMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
void ThreadI2PAcceptIncoming();
void ThreadPrivateBroadcast() EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
void AcceptConnection(const ListenSocket& hListenSocket);
/**
@ -1363,7 +1464,25 @@ private:
bool AlreadyConnectedToAddress(const CAddress& addr);
bool AttemptToEvictConnection();
CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
/**
* Open a new P2P connection.
* @param[in] addrConnect Address to connect to, if `strDest` is `nullptr`.
* @param[in] pszDest Address to resolve and connect to.
* @param[in] fCountFailure Increment the number of connection attempts to this address in Addrman.
* @param[in] conn_type Type of the connection to open, must not be `ConnectionType::INBOUND`.
* @param[in] use_v2transport Use P2P encryption, (aka V2 transport, BIP324).
* @param[in] proxy Optional proxy to use and override normal proxy selection.
* @return Newly created CNode object or nullptr if the connection failed.
*/
CNode* ConnectNode(CAddress addrConnect,
const char* pszDest,
bool fCountFailure,
ConnectionType conn_type,
bool use_v2transport,
std::optional<Proxy> proxy)
EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
void AddWhitelistPermissionFlags(NetPermissionFlags& flags, const CNetAddr &addr, const std::vector<NetWhitelistPermissions>& ranges) const;
void DeleteNode(CNode* pnode);
@ -1561,6 +1680,7 @@ private:
std::thread threadOpenConnections;
std::thread threadMessageHandler;
std::thread threadI2PAcceptIncoming;
std::thread threadPrivateBroadcast;
/** flag for deciding to connect to an extra outbound peer,
* in excess of m_max_outbound_full_relay

View file

@ -43,6 +43,7 @@
#include <policy/policy.h>
#include <primitives/block.h>
#include <primitives/transaction.h>
#include <private_broadcast.h>
#include <protocol.h>
#include <random.h>
#include <scheduler.h>
@ -195,6 +196,10 @@ static constexpr double MAX_ADDR_RATE_PER_SECOND{0.1};
static constexpr size_t MAX_ADDR_PROCESSING_TOKEN_BUCKET{MAX_ADDR_TO_SEND};
/** The compactblocks version we support. See BIP 152. */
static constexpr uint64_t CMPCTBLOCKS_VERSION{2};
/** For private broadcast, send a transaction to this many peers. */
static constexpr size_t NUM_PRIVATE_BROADCAST_PER_TX{3};
/** Private broadcast connections must complete within this time. Disconnect the peer if it takes longer. */
static constexpr auto PRIVATE_BROADCAST_MAX_CONNECTION_LIFETIME{3min};
// Internal stuff
namespace {
@ -536,7 +541,8 @@ public:
std::vector<TxOrphanage::OrphanTxBase> GetOrphanTransactions() override EXCLUSIVE_LOCKS_REQUIRED(!m_tx_download_mutex);
PeerManagerInfo GetInfo() const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void SendPings() override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void RelayTransaction(const uint256& txid, const uint256& wtxid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void ScheduleTxForBroadcastToAll(const uint256& txid, const uint256& wtxid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void ScheduleTxForPrivateBroadcast(const CTransactionRef& tx) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void SetBestBlock(int height, std::chrono::seconds time) override
{
m_best_height = height;
@ -559,6 +565,9 @@ private:
/** Retrieve unbroadcast transactions from the mempool and reattempt sending to peers */
void ReattemptInitialBroadcast(CScheduler& scheduler) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
/** Rebroadcast stale private transactions (already broadcast but not received back from the network). */
void ReattemptPrivateBroadcast(CScheduler& scheduler);
/** Get a shared pointer to the Peer object.
* May return an empty shared_ptr if the Peer object can't be found. */
PeerRef GetPeerRef(NodeId id) const EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
@ -724,8 +733,8 @@ private:
/** Send a version message to a peer */
void PushNodeVersion(CNode& pnode, const Peer& peer);
/** Send a ping message every PING_INTERVAL or if requested via RPC. May
* mark the peer to be disconnected if a ping has timed out.
/** Send a ping message every PING_INTERVAL or if requested via RPC (peer.m_ping_queued is true).
* May mark the peer to be disconnected if a ping has timed out.
* We use mockable time for ping timeouts, so setmocktime may cause pings
* to time out. */
void MaybeSendPing(CNode& node_to, Peer& peer, std::chrono::microseconds now);
@ -961,6 +970,14 @@ private:
void ProcessCompactBlockTxns(CNode& pfrom, Peer& peer, const BlockTransactions& block_transactions)
EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex, !m_most_recent_block_mutex);
/**
* Schedule an INV for a transaction be sent to the given peer (via `PushMessage()`).
* The transaction is picked from the list of transactions for private broadcast.
* It is assumed that the connection to the peer is `ConnectionType::PRIVATE_BROADCAST`.
* Calling this for other peers will degrade privacy. Don't do that.
*/
void PushPrivateBroadcastTx(CNode& node) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex, !m_most_recent_block_mutex);
/**
* When a peer sends us a valid block, instruct it to announce blocks to us
* using CMPCTBLOCK if possible by adding its nodeid to the end of
@ -1071,6 +1088,9 @@ private:
void AddAddressKnown(Peer& peer, const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
void PushAddress(Peer& peer, const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
/// A list of transactions to be broadcast privately.
PrivateBroadcast m_tx_for_private_broadcast;
};
const CNodeState* PeerManagerImpl::State(NodeId pnode) const
@ -1523,26 +1543,64 @@ void PeerManagerImpl::FindNextBlocks(std::vector<const CBlockIndex*>& vBlocks, c
void PeerManagerImpl::PushNodeVersion(CNode& pnode, const Peer& peer)
{
uint64_t my_services{peer.m_our_services};
const int64_t nTime{count_seconds(GetTime<std::chrono::seconds>())};
uint64_t nonce = pnode.GetLocalNonce();
const int nNodeStartingHeight{m_best_height};
NodeId nodeid = pnode.GetId();
CAddress addr = pnode.addr;
CService addr_you = addr.IsRoutable() && !IsProxy(addr) && addr.IsAddrV1Compatible() ? addr : CService();
uint64_t your_services{addr.nServices};
const bool tx_relay{!RejectIncomingTxs(pnode)};
MakeAndPushMessage(pnode, NetMsgType::VERSION, PROTOCOL_VERSION, my_services, nTime,
your_services, CNetAddr::V1(addr_you), // Together the pre-version-31402 serialization of CAddress "addrYou" (without nTime)
my_services, CNetAddr::V1(CService{}), // Together the pre-version-31402 serialization of CAddress "addrMe" (without nTime)
nonce, strSubVersion, nNodeStartingHeight, tx_relay);
if (fLogIPs) {
LogDebug(BCLog::NET, "send version message: version %d, blocks=%d, them=%s, txrelay=%d, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addr_you.ToStringAddrPort(), tx_relay, nodeid);
uint64_t my_services;
int64_t my_time;
uint64_t your_services;
CService your_addr;
std::string my_user_agent;
int my_height;
bool my_tx_relay;
if (pnode.IsPrivateBroadcastConn()) {
my_services = NODE_NONE;
my_time = 0;
your_services = NODE_NONE;
your_addr = CService{};
my_user_agent = "/pynode:0.0.1/"; // Use a constant other than the default (or user-configured). See https://github.com/bitcoin/bitcoin/pull/27509#discussion_r1214671917
my_height = 0;
my_tx_relay = false;
} else {
LogDebug(BCLog::NET, "send version message: version %d, blocks=%d, txrelay=%d, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, tx_relay, nodeid);
CAddress addr{pnode.addr};
my_services = peer.m_our_services;
my_time = count_seconds(GetTime<std::chrono::seconds>());
your_services = addr.nServices;
your_addr = addr.IsRoutable() && !IsProxy(addr) && addr.IsAddrV1Compatible() ? addr : CService{};
my_user_agent = strSubVersion;
my_height = m_best_height;
my_tx_relay = !RejectIncomingTxs(pnode);
}
MakeAndPushMessage(
pnode,
NetMsgType::VERSION,
PROTOCOL_VERSION,
my_services,
my_time,
// your_services + CNetAddr::V1(your_addr) is the pre-version-31402 serialization of your_addr (without nTime)
your_services, CNetAddr::V1(your_addr),
// same, for a dummy address
my_services, CNetAddr::V1(CService{}),
pnode.GetLocalNonce(),
my_user_agent,
my_height,
my_tx_relay);
const NodeId nodeid{pnode.GetId()};
if (fLogIPs) {
LogDebug(BCLog::NET,
"send version message: version %d, blocks=%d, them=%s, txrelay=%d, peer=%d",
PROTOCOL_VERSION,
my_height,
your_addr.ToStringAddrPort(),
my_tx_relay,
nodeid);
} else {
LogDebug(BCLog::NET,
"send version message: version %d, blocks=%d, txrelay=%d, peer=%d",
PROTOCOL_VERSION,
my_height,
my_tx_relay,
nodeid);
}
}
@ -1573,6 +1631,13 @@ void PeerManagerImpl::InitializeNode(const CNode& node, ServiceFlags our_service
}
}
/** Calculate the delta time after which to run the next transactions broadcast. */
static std::chrono::milliseconds NextTxBroadcast()
{
// We add randomness on every cycle to avoid the possibility of P2P fingerprinting.
return 10min + FastRandomContext().randrange<std::chrono::milliseconds>(5min);
}
void PeerManagerImpl::ReattemptInitialBroadcast(CScheduler& scheduler)
{
std::set<uint256> unbroadcast_txids = m_mempool.GetUnbroadcastTxs();
@ -1581,16 +1646,61 @@ void PeerManagerImpl::ReattemptInitialBroadcast(CScheduler& scheduler)
CTransactionRef tx = m_mempool.get(txid);
if (tx != nullptr) {
RelayTransaction(txid, tx->GetWitnessHash());
ScheduleTxForBroadcastToAll(txid, tx->GetWitnessHash());
} else {
m_mempool.RemoveUnbroadcastTx(txid, true);
}
}
// Schedule next run for 10-15 minutes in the future.
// We add randomness on every cycle to avoid the possibility of P2P fingerprinting.
const auto delta = 10min + FastRandomContext().randrange<std::chrono::milliseconds>(5min);
scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta);
scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, NextTxBroadcast());
}
void PeerManagerImpl::ReattemptPrivateBroadcast(CScheduler& scheduler)
{
// The following heuristic is subject to races, but that is ok: if it overshoots,
// we will open some private connections in vain, if it undershoots, the stale
// transactions will be picked on the next run.
size_t active_connections{0};
m_connman.ForEachNode([&active_connections](const CNode* node) {
if (node->IsPrivateBroadcastConn()) {
++active_connections;
}
});
const size_t to_open_connections{m_connman.m_private_broadcast.NumToOpen()};
// Remove stale transactions that are no longer relevant (e.g. already in
// the mempool or mined) and count the remaining ones.
size_t num_for_rebroadcast{0};
const auto stale_txs = m_tx_for_private_broadcast.GetStale();
{
LOCK(cs_main);
for (const auto& stale_tx : stale_txs) {
auto mempool_acceptable = m_chainman.ProcessTransaction(stale_tx, /*test_accept=*/true);
if (mempool_acceptable.m_result_type == MempoolAcceptResult::ResultType::VALID) {
LogDebug(BCLog::PRIVATE_BROADCAST,
"Reattempting broadcast of stale txid=%s wtxid=%s",
stale_tx->GetHash().ToString(),
stale_tx->GetWitnessHash().ToString());
++num_for_rebroadcast;
} else {
LogPrintLevel(BCLog::PRIVATE_BROADCAST,
BCLog::Level::Info,
"Giving up broadcast attempts for txid=%s wtxid=%s: %s",
stale_tx->GetHash().ToString(),
stale_tx->GetWitnessHash().ToString(),
mempool_acceptable.m_state.ToString());
m_tx_for_private_broadcast.Remove(stale_tx);
}
}
}
if (num_for_rebroadcast > active_connections + to_open_connections) {
m_connman.m_private_broadcast.NumToOpenAdd(num_for_rebroadcast - active_connections - to_open_connections);
}
scheduler.scheduleFromNow([&] { ReattemptPrivateBroadcast(scheduler); }, NextTxBroadcast());
}
void PeerManagerImpl::FinalizeNode(const CNode& node)
@ -1650,16 +1760,25 @@ void PeerManagerImpl::FinalizeNode(const CNode& node)
}
} // cs_main
if (node.fSuccessfullyConnected &&
!node.IsBlockOnlyConn() && !node.IsInboundConn()) {
!node.IsBlockOnlyConn() && !node.IsPrivateBroadcastConn() && !node.IsInboundConn()) {
// Only change visible addrman state for full outbound peers. We don't
// call Connected() for feeler connections since they don't have
// fSuccessfullyConnected set.
// fSuccessfullyConnected set. Also don't call Connected() for private broadcast
// connections since they could leak information in addrman.
m_addrman.Connected(node.addr);
}
{
LOCK(m_headers_presync_mutex);
m_headers_presync_stats.erase(nodeid);
}
if (node.IsPrivateBroadcastConn()) {
// FinishBroadcast() is called when we get a PONG from the peer which means that the send
// has concluded successfully. Call FinishBroadcast() here as well in case we did not call
// it before (unsuccessful, never concluded with the reception of a PONG).
if (m_tx_for_private_broadcast.FinishBroadcast(nodeid, /*confirmed_by_node=*/false)) {
m_connman.m_private_broadcast.NumToOpenAdd(1);
}
}
LogDebug(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid);
}
@ -1943,9 +2062,9 @@ void PeerManagerImpl::StartScheduledTasks(CScheduler& scheduler)
static_assert(EXTRA_PEER_CHECK_INTERVAL < STALE_CHECK_INTERVAL, "peer eviction timer should be less than stale tip check timer");
scheduler.scheduleEvery([this] { this->CheckForStaleTipAndEvictPeers(); }, std::chrono::seconds{EXTRA_PEER_CHECK_INTERVAL});
// schedule next run for 10-15 minutes in the future
const auto delta = 10min + FastRandomContext().randrange<std::chrono::milliseconds>(5min);
scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta);
scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, NextTxBroadcast());
scheduler.scheduleFromNow([&] { ReattemptPrivateBroadcast(scheduler); }, NextTxBroadcast());
}
void PeerManagerImpl::ActiveTipChange(const CBlockIndex& new_tip, bool is_ibd)
@ -2148,7 +2267,7 @@ void PeerManagerImpl::SendPings()
for(auto& it : m_peer_map) it.second->m_ping_queued = true;
}
void PeerManagerImpl::RelayTransaction(const uint256& txid, const uint256& wtxid)
void PeerManagerImpl::ScheduleTxForBroadcastToAll(const uint256& txid, const uint256& wtxid)
{
LOCK(m_peer_mutex);
for(auto& it : m_peer_map) {
@ -2171,6 +2290,25 @@ void PeerManagerImpl::RelayTransaction(const uint256& txid, const uint256& wtxid
};
}
void PeerManagerImpl::ScheduleTxForPrivateBroadcast(const CTransactionRef& tx)
{
if (m_tx_for_private_broadcast.Add(tx)) {
LogDebug(BCLog::PRIVATE_BROADCAST,
"Requesting %d new connections due to txid=%s, wtxid=%s",
NUM_PRIVATE_BROADCAST_PER_TX,
tx->GetHash().ToString(),
tx->GetWitnessHash().ToString());
m_connman.m_private_broadcast.NumToOpenAdd(NUM_PRIVATE_BROADCAST_PER_TX);
} else {
LogDebug(
BCLog::PRIVATE_BROADCAST,
"Ignoring unnecessary request to schedule an already scheduled transaction: txid=%s, wtxid=%s",
tx->GetHash().ToString(),
tx->GetWitnessHash().ToString());
}
}
void PeerManagerImpl::RelayAddress(NodeId originator,
const CAddress& addr,
bool fReachable)
@ -3038,7 +3176,7 @@ void PeerManagerImpl::ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, c
tx->GetWitnessHash().ToString(),
m_mempool.size(), m_mempool.DynamicMemoryUsage() / 1000);
RelayTransaction(tx->GetHash(), tx->GetWitnessHash());
ScheduleTxForBroadcastToAll(tx->GetHash(), tx->GetWitnessHash());
for (const CTransactionRef& removedTx : replaced_transactions) {
AddToCompactExtraTransactions(removedTx);
@ -3405,6 +3543,34 @@ void PeerManagerImpl::ProcessCompactBlockTxns(CNode& pfrom, Peer& peer, const Bl
return;
}
void PeerManagerImpl::PushPrivateBroadcastTx(CNode& node)
{
Assume(node.IsPrivateBroadcastConn());
auto opt_tx = m_tx_for_private_broadcast.GetTxForBroadcast();
if (!opt_tx) {
LogDebug(BCLog::PRIVATE_BROADCAST,
"Disconnecting: no more transactions for private broadcast (connected in vain), peer=%d%s",
node.GetId(),
node.LogIP(fLogIPs));
node.fDisconnect = true;
return;
}
const CTransactionRef& tx{*opt_tx};
LogPrintLevel(BCLog::PRIVATE_BROADCAST,
BCLog::Level::Info,
"P2P handshake completed, sending INV for txid=%s%s, peer=%d%s",
tx->GetHash().ToString(),
tx->HasWitness() ? strprintf(", wtxid=%s", tx->GetWitnessHash().ToString()) : "",
node.GetId(),
node.LogIP(fLogIPs));
MakeAndPushMessage(node, NetMsgType::INV, std::vector<CInv>{{CInv{MSG_TX, tx->GetHash().ToUint256()}}});
m_tx_for_private_broadcast.PushedToNode(node.GetId(), tx->GetHash());
}
void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, DataStream& vRecv,
const std::chrono::microseconds time_received,
const std::atomic<bool>& interruptMsgProc)
@ -3503,19 +3669,6 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
pfrom.SetCommonVersion(greatest_common_version);
pfrom.nVersion = nVersion;
if (greatest_common_version >= WTXID_RELAY_VERSION) {
MakeAndPushMessage(pfrom, NetMsgType::WTXIDRELAY);
}
// Signal ADDRv2 support (BIP155).
if (greatest_common_version >= 70016) {
// BIP155 defines addrv2 and sendaddrv2 for all protocol versions, but some
// implementations reject messages they don't know. As a courtesy, don't send
// it to nodes with a version before 70016, as no software is known to support
// BIP155 that doesn't announce at least that protocol version number.
MakeAndPushMessage(pfrom, NetMsgType::SENDADDRV2);
}
pfrom.m_has_all_wanted_services = HasAllDesirableServiceFlags(nServices);
peer->m_their_services = nServices;
pfrom.SetAddrLocal(addrMe);
@ -3542,6 +3695,61 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
if (fRelay) pfrom.m_relays_txs = true;
}
if (!pfrom.IsInboundConn() && !pfrom.IsPrivateBroadcastConn()) {
// For non-inbound connections, we update the addrman to record
// connection success so that addrman will have an up-to-date
// notion of which peers are online and available.
//
// While we strive to not leak information about block-relay-only
// connections via the addrman, not moving an address to the tried
// table is also potentially detrimental because new-table entries
// are subject to eviction in the event of addrman collisions. We
// mitigate the information-leak by never calling
// AddrMan::Connected() on block-relay-only peers; see
// FinalizeNode().
//
// We don't want to modify the addrman based on private broadcast
// connections in case they leak information about our node
// connecting to peers via those connections.
//
// This moves an address from New to Tried table in Addrman,
// resolves tried-table collisions, etc.
m_addrman.Good(pfrom.addr);
}
const auto mapped_as{m_connman.GetMappedAS(pfrom.addr)};
LogDebug(BCLog::NET, "receive version message: %s: version %d, blocks=%d, us=%s, txrelay=%d, peer=%d%s%s\n",
cleanSubVer, pfrom.nVersion,
peer->m_starting_height, addrMe.ToStringAddrPort(), fRelay, pfrom.GetId(),
pfrom.LogIP(fLogIPs), (mapped_as ? strprintf(", mapped_as=%d", mapped_as) : ""));
if (pfrom.IsPrivateBroadcastConn()) {
if (fRelay) {
MakeAndPushMessage(pfrom, NetMsgType::VERACK);
} else {
LogPrintLevel(BCLog::PRIVATE_BROADCAST,
BCLog::Level::Info,
"Disconnecting: does not support transactions relay (connected in vain), peer=%d%s",
pfrom.GetId(),
pfrom.LogIP(fLogIPs));
pfrom.fDisconnect = true;
}
return;
}
if (greatest_common_version >= WTXID_RELAY_VERSION) {
MakeAndPushMessage(pfrom, NetMsgType::WTXIDRELAY);
}
// Signal ADDRv2 support (BIP155).
if (greatest_common_version >= 70016) {
// BIP155 defines addrv2 and sendaddrv2 for all protocol versions, but some
// implementations reject messages they don't know. As a courtesy, don't send
// it to nodes with a version before 70016, as no software is known to support
// BIP155 that doesn't announce at least that protocol version number.
MakeAndPushMessage(pfrom, NetMsgType::SENDADDRV2);
}
if (greatest_common_version >= WTXID_RELAY_VERSION && m_txreconciliation) {
// Per BIP-330, we announce txreconciliation support if:
// - protocol version per the peer's VERSION message supports WTXID_RELAY;
@ -3589,30 +3797,6 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
peer->m_addr_token_bucket += MAX_ADDR_TO_SEND;
}
if (!pfrom.IsInboundConn()) {
// For non-inbound connections, we update the addrman to record
// connection success so that addrman will have an up-to-date
// notion of which peers are online and available.
//
// While we strive to not leak information about block-relay-only
// connections via the addrman, not moving an address to the tried
// table is also potentially detrimental because new-table entries
// are subject to eviction in the event of addrman collisions. We
// mitigate the information-leak by never calling
// AddrMan::Connected() on block-relay-only peers; see
// FinalizeNode().
//
// This moves an address from New to Tried table in Addrman,
// resolves tried-table collisions, etc.
m_addrman.Good(pfrom.addr);
}
const auto mapped_as{m_connman.GetMappedAS(pfrom.addr)};
LogDebug(BCLog::NET, "receive version message: %s: version %d, blocks=%d, us=%s, txrelay=%d, peer=%d%s%s\n",
cleanSubVer, pfrom.nVersion,
peer->m_starting_height, addrMe.ToStringAddrPort(), fRelay, pfrom.GetId(),
pfrom.LogIP(fLogIPs), (mapped_as ? strprintf(", mapped_as=%d", mapped_as) : ""));
peer->m_time_offset = NodeSeconds{std::chrono::seconds{nTime}} - Now<NodeSeconds>();
if (!pfrom.IsInboundConn()) {
// Don't use timedata samples from inbound peers to make it
@ -3659,7 +3843,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
(mapped_as ? strprintf(", mapped_as=%d", mapped_as) : ""));
}
if (pfrom.GetCommonVersion() >= SHORT_IDS_BLOCKS_VERSION) {
if (pfrom.GetCommonVersion() >= SHORT_IDS_BLOCKS_VERSION && !pfrom.IsPrivateBroadcastConn()) {
// Tell our peer we are willing to provide version 2 cmpctblocks.
// However, we do not request new block announcements using
// cmpctblock messages.
@ -3702,6 +3886,16 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
}
pfrom.fSuccessfullyConnected = true;
if (pfrom.IsPrivateBroadcastConn()) {
// The peer may intend to later send us NetMsgType::FEEFILTER limiting
// cheap transactions, but we don't wait for that and thus we may send
// them a transaction below their threshold. This is ok because this
// relay logic is designed to work even in cases when the peer drops
// the transaction (due to it being too cheap, or for other reasons).
PushPrivateBroadcastTx(pfrom);
}
return;
}
@ -3824,6 +4018,17 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
return;
}
if (pfrom.IsPrivateBroadcastConn()) {
if (msg_type != NetMsgType::PONG && msg_type != NetMsgType::GETDATA) {
LogDebug(BCLog::PRIVATE_BROADCAST,
"Ignoring incoming message '%s', peer=%d%s",
msg_type,
pfrom.GetId(),
pfrom.LogIP(fLogIPs));
return;
}
}
if (msg_type == NetMsgType::ADDR || msg_type == NetMsgType::ADDRV2) {
const auto ser_params{
msg_type == NetMsgType::ADDRV2 ?
@ -4027,6 +4232,39 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
LogDebug(BCLog::NET, "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom.GetId());
}
if (pfrom.IsPrivateBroadcastConn()) {
const auto pushed_tx_opt = m_tx_for_private_broadcast.GetTxPushedToNode(pfrom.GetId());
if (!pushed_tx_opt) {
LogPrintLevel(BCLog::PRIVATE_BROADCAST,
BCLog::Level::Info,
"Disconnecting: got GETDATA without sending an INV, peer=%d%s\n",
pfrom.GetId(),
fLogIPs ? strprintf(", peeraddr=%s", pfrom.addr.ToStringAddrPort()) : "");
pfrom.fDisconnect = true;
return;
}
const CTransactionRef& pushed_tx{*pushed_tx_opt};
// The GETDATA request must contain exactly one inv and it must be for the transaction
// that we INVed to the peer earlier.
if (vInv.size() == 1 && vInv[0].IsMsgTx() && vInv[0].hash == pushed_tx->GetHash()) {
MakeAndPushMessage(pfrom, NetMsgType::TX, TX_WITH_WITNESS(*pushed_tx));
peer->m_ping_queued = true; // Ensure a ping will be sent: mimick a request via RPC.
MaybeSendPing(pfrom, *peer, GetTime<std::chrono::microseconds>());
} else {
LogPrintLevel(BCLog::PRIVATE_BROADCAST,
BCLog::Level::Info,
"Disconnecting: got an unexpected GETDATA message, peer=%d%s\n",
pfrom.GetId(),
fLogIPs ? strprintf(", peeraddr=%s", pfrom.addr.ToStringAddrPort()) : "");
pfrom.fDisconnect = true;
}
return;
}
{
LOCK(peer->m_getdata_requests_mutex);
peer->m_getdata_requests.insert(peer->m_getdata_requests.end(), vInv.begin(), vInv.end());
@ -4260,6 +4498,21 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
const uint256& hash = peer->m_wtxid_relay ? wtxid : txid;
AddKnownTx(*peer, hash);
if (auto num_broadcasted = m_tx_for_private_broadcast.Remove(ptx)) {
LogPrintLevel(BCLog::PRIVATE_BROADCAST,
BCLog::Level::Info,
"Received our privately broadcast transaction (txid=%s) from the "
"network from peer=%d%s; stopping private broadcast attempts",
txid.ToString(),
pfrom.GetId(),
pfrom.LogIP(fLogIPs));
if (NUM_PRIVATE_BROADCAST_PER_TX > num_broadcasted.value()) {
// Not all of the initial NUM_PRIVATE_BROADCAST_PER_TX connections were needed.
// Tell CConnman it does not need to start the remaining ones.
m_connman.m_private_broadcast.NumToOpenSub(NUM_PRIVATE_BROADCAST_PER_TX - num_broadcasted.value());
}
}
LOCK2(cs_main, m_tx_download_mutex);
const auto& [should_validate, package_to_validate] = m_txdownloadman.ReceivedTx(pfrom.GetId(), ptx);
@ -4274,7 +4527,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
} else {
LogPrintf("Force relaying tx %s (wtxid=%s) from peer=%d\n",
tx.GetHash().ToString(), tx.GetWitnessHash().ToString(), pfrom.GetId());
RelayTransaction(tx.GetHash(), tx.GetWitnessHash());
ScheduleTxForBroadcastToAll(tx.GetHash(), tx.GetWitnessHash());
}
}
@ -4763,6 +5016,17 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
if (ping_time.count() >= 0) {
// Let connman know about this successful ping-pong
pfrom.PongReceived(ping_time);
if (pfrom.IsPrivateBroadcastConn()) {
m_tx_for_private_broadcast.FinishBroadcast(pfrom.GetId(), /*confirmed_by_node=*/true);
LogPrintLevel(
BCLog::PRIVATE_BROADCAST,
BCLog::Level::Info,
"Got a PONG (the transaction will probably reach the network), "
"marking for disconnect, peer=%d%s",
pfrom.GetId(),
pfrom.LogIP(fLogIPs));
pfrom.fDisconnect = true;
}
} else {
// This should never happen
sProblem = "Timing mishap";
@ -5475,6 +5739,23 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
const auto current_time{GetTime<std::chrono::microseconds>()};
// The logic below does not apply to private broadcast peers, so skip it.
// Also in CConnman::PushMessage() we make sure that unwanted messages are
// not sent. This here is just an optimization.
if (pto->IsPrivateBroadcastConn()) {
if (pto->m_connected + PRIVATE_BROADCAST_MAX_CONNECTION_LIFETIME < current_time) {
LogPrintLevel(
BCLog::PRIVATE_BROADCAST,
BCLog::Level::Info,
"Disconnecting: did not complete the transaction send within %d seconds, peer=%d%s",
std::chrono::duration_cast<std::chrono::seconds>(PRIVATE_BROADCAST_MAX_CONNECTION_LIFETIME).count(),
pto->GetId(),
pto->LogIP(fLogIPs));
pto->fDisconnect = true;
}
return true;
}
if (pto->IsAddrFetchConn() && current_time - pto->m_connected > 10 * AVG_ADDRESS_BROADCAST_INTERVAL) {
LogDebug(BCLog::NET, "addrfetch connection timeout, %s\n", pto->DisconnectMsg(fLogIPs));
pto->fDisconnect = true;

View file

@ -118,8 +118,14 @@ public:
/** Get peer manager info. */
virtual PeerManagerInfo GetInfo() const = 0;
/** Relay transaction to all peers. */
virtual void RelayTransaction(const uint256& txid, const uint256& wtxid) = 0;
/** Schedule a transaction to be broadcast to all peers at a later time. */
virtual void ScheduleTxForBroadcastToAll(const uint256& txid, const uint256& wtxid) = 0;
/**
* Schedule a transaction to be privately broadcasted. This is done
* asynchronously via short-lived connections to peers on privacy networks.
*/
virtual void ScheduleTxForPrivateBroadcast(const CTransactionRef& tx) = 0;
/** Send ping message to all peers */
virtual void SendPings() = 0;

View file

@ -20,6 +20,8 @@ std::string ConnectionTypeAsString(ConnectionType conn_type)
return "block-relay-only";
case ConnectionType::ADDR_FETCH:
return "addr-fetch";
case ConnectionType::PRIVATE_BROADCAST:
return "private-broadcast";
} // no default case, so the compiler can warn about missing cases
assert(false);

View file

@ -75,6 +75,13 @@ enum class ConnectionType {
* AddrMan is empty.
*/
ADDR_FETCH,
/**
* Private broadcast connections are short-lived and only opened to
* privacy networks (Tor, I2P) for relaying privacy-sensitive data (like
* our own transactions) and closed afterwards.
*/
PRIVATE_BROADCAST,
};
/** Convert ConnectionType enum to a string value */

View file

@ -364,7 +364,12 @@ public:
}
TransactionError broadcastTransaction(CTransactionRef tx, CAmount max_tx_fee, std::string& err_string) override
{
return BroadcastTransaction(*m_context, std::move(tx), err_string, max_tx_fee, /*relay=*/ true, /*wait_callback=*/ false);
return BroadcastTransaction(*m_context,
std::move(tx),
err_string,
max_tx_fee,
ADD_TO_MEMPOOL_AND_BROADCAST_TO_ALL,
/*wait_callback=*/false);
}
WalletLoader& walletLoader() override
{
@ -685,10 +690,10 @@ public:
}
bool broadcastTransaction(const CTransactionRef& tx,
const CAmount& max_tx_fee,
bool relay,
TxBroadcastMethod broadcast_method,
std::string& err_string) override
{
const TransactionError err = BroadcastTransaction(m_node, tx, err_string, max_tx_fee, relay, /*wait_callback=*/false);
const TransactionError err = BroadcastTransaction(m_node, tx, err_string, max_tx_fee, broadcast_method, /*wait_callback=*/false);
// Chain clients only care about failures to accept the tx to the mempool. Disregard non-mempool related failures.
// Note: this will need to be updated if BroadcastTransactions() is updated to return other non-mempool failures
// that Chain clients do not need to know about.

View file

@ -31,7 +31,12 @@ static TransactionError HandleATMPError(const TxValidationState& state, std::str
}
}
TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef tx, std::string& err_string, const CAmount& max_tx_fee, bool relay, bool wait_callback)
TransactionError BroadcastTransaction(NodeContext& node,
const CTransactionRef tx,
std::string& err_string,
const CAmount& max_tx_fee,
TxBroadcastMethod broadcast_method,
bool wait_callback)
{
// BroadcastTransaction can be called by RPC or by the wallet.
// chainman, mempool and peerman are initialized before the RPC server and wallet are started
@ -62,14 +67,14 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t
// There's already a transaction in the mempool with this txid. Don't
// try to submit this transaction to the mempool (since it'll be
// rejected as a TX_CONFLICT), but do attempt to reannounce the mempool
// transaction if relay=true.
// transaction if broadcast_method is not ADD_TO_MEMPOOL_NO_BROADCAST.
//
// The mempool transaction may have the same or different witness (and
// wtxid) as this transaction. Use the mempool's wtxid for reannouncement.
wtxid = mempool_tx->GetWitnessHash();
} else {
// Transaction is not already in the mempool.
if (max_tx_fee > 0) {
if (max_tx_fee > 0 || broadcast_method == NO_MEMPOOL_PRIVATE_BROADCAST) {
// First, call ATMP with test_accept and check the fee. If ATMP
// fails here, return error immediately.
const MempoolAcceptResult result = node.chainman->ProcessTransaction(tx, /*test_accept=*/ true);
@ -79,18 +84,27 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t
return TransactionError::MAX_FEE_EXCEEDED;
}
}
// Try to submit the transaction to the mempool.
const MempoolAcceptResult result = node.chainman->ProcessTransaction(tx, /*test_accept=*/ false);
if (result.m_result_type != MempoolAcceptResult::ResultType::VALID) {
return HandleATMPError(result.m_state, err_string);
}
// Transaction was accepted to the mempool.
switch (broadcast_method) {
case ADD_TO_MEMPOOL_NO_BROADCAST:
case ADD_TO_MEMPOOL_AND_BROADCAST_TO_ALL:
// Try to submit the transaction to the mempool.
{
const MempoolAcceptResult result =
node.chainman->ProcessTransaction(tx, /*test_accept=*/false);
if (result.m_result_type != MempoolAcceptResult::ResultType::VALID) {
return HandleATMPError(result.m_state, err_string);
}
}
// Transaction was accepted to the mempool.
if (relay) {
// the mempool tracks locally submitted transactions to make a
// best-effort of initial broadcast
node.mempool->AddUnbroadcastTx(txid);
if (broadcast_method == ADD_TO_MEMPOOL_AND_BROADCAST_TO_ALL) {
// the mempool tracks locally submitted transactions to make a
// best-effort of initial broadcast
node.mempool->AddUnbroadcastTx(txid);
}
break;
case NO_MEMPOOL_PRIVATE_BROADCAST: break;
}
if (wait_callback && node.validation_signals) {
@ -116,8 +130,15 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t
promise.get_future().wait();
}
if (relay) {
node.peerman->RelayTransaction(txid, wtxid);
switch (broadcast_method) {
case ADD_TO_MEMPOOL_NO_BROADCAST:
break;
case ADD_TO_MEMPOOL_AND_BROADCAST_TO_ALL:
node.peerman->ScheduleTxForBroadcastToAll(txid, wtxid);
break;
case NO_MEMPOOL_PRIVATE_BROADCAST:
node.peerman->ScheduleTxForPrivateBroadcast(tx);
break;
}
return TransactionError::OK;

View file

@ -6,6 +6,7 @@
#define BITCOIN_NODE_TRANSACTION_H
#include <common/messages.h>
#include <node/types.h>
#include <policy/feerate.h>
#include <primitives/transaction.h>
@ -45,11 +46,16 @@ static const CAmount DEFAULT_MAX_BURN_AMOUNT{0};
* @param[in] tx the transaction to broadcast
* @param[out] err_string reference to std::string to fill with error string if available
* @param[in] max_tx_fee reject txs with fees higher than this (if 0, accept any fee)
* @param[in] relay flag if both mempool insertion and p2p relay are requested
* @param[in] broadcast_method whether to add the transaction to the mempool and/if how to broadcast it
* @param[in] wait_callback wait until callbacks have been processed to avoid stale result due to a sequentially RPC.
* return error
*/
[[nodiscard]] TransactionError BroadcastTransaction(NodeContext& node, CTransactionRef tx, std::string& err_string, const CAmount& max_tx_fee, bool relay, bool wait_callback);
[[nodiscard]] TransactionError BroadcastTransaction(NodeContext& node,
CTransactionRef tx,
std::string& err_string,
const CAmount& max_tx_fee,
TxBroadcastMethod broadcast_method,
bool wait_callback);
/**
* Return transaction with a given hash.

View file

@ -15,6 +15,7 @@
#include <consensus/amount.h>
#include <cstddef>
#include <cstdint>
#include <policy/policy.h>
#include <script/script.h>
#include <util/time.h>
@ -85,6 +86,19 @@ struct BlockWaitOptions {
CAmount fee_threshold{MAX_MONEY};
};
/**
* Methods to broadcast a local transaction.
* Used to influence `BroadcastTransaction()` and its callers.
*/
enum TxBroadcastMethod : uint8_t {
/// Add the transaction to the mempool and broadcast to all peers for which tx relay is enabled.
ADD_TO_MEMPOOL_AND_BROADCAST_TO_ALL,
/// Add the transaction to the mempool, but don't broadcast to anybody.
ADD_TO_MEMPOOL_NO_BROADCAST,
/// Omit the mempool and directly send the transaction via a few dedicated connections to
/// peers on privacy networks.
NO_MEMPOOL_PRIVATE_BROADCAST,
};
} // namespace node
#endif // BITCOIN_NODE_TYPES_H

140
src/private_broadcast.cpp Normal file
View file

@ -0,0 +1,140 @@
// Copyright (c) 2023-present The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or https://opensource.org/license/mit/.
#include <private_broadcast.h>
#include <util/check.h>
/// If a transaction is not received back from the network for this duration
/// after it is broadcast, then we consider it stale / for rebroadcasting.
static constexpr auto STALE_DURATION{1min};
bool PrivateBroadcast::Add(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
const Txid& txid = tx->GetHash();
LOCK(m_mutex);
auto [pos, inserted] = m_by_txid.emplace(txid, TxWithPriority{.tx = tx, .priority = Priority{}});
if (inserted) {
m_by_priority.emplace(Priority{}, txid);
}
return inserted;
}
std::optional<size_t> PrivateBroadcast::Remove(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
LOCK(m_mutex);
auto iters = Find(tx->GetHash());
if (!iters || iters->by_txid->second.tx->GetWitnessHash() != tx->GetWitnessHash()) {
return std::nullopt;
}
const size_t num_broadcasted{iters->by_priority->first.num_broadcasted};
m_by_priority.erase(iters->by_priority);
m_by_txid.erase(iters->by_txid);
return num_broadcasted;
}
std::optional<CTransactionRef> PrivateBroadcast::GetTxForBroadcast() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
LOCK(m_mutex);
if (m_by_priority.empty()) {
return std::nullopt;
}
const Txid& txid = m_by_priority.begin()->second;
auto it = m_by_txid.find(txid);
if (Assume(it != m_by_txid.end())) {
return it->second.tx;
}
m_by_priority.erase(m_by_priority.begin());
return std::nullopt;
}
void PrivateBroadcast::PushedToNode(const NodeId& nodeid, const Txid& txid) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
LOCK(m_mutex);
m_by_nodeid.emplace(nodeid, txid);
}
std::optional<CTransactionRef> PrivateBroadcast::GetTxPushedToNode(const NodeId& nodeid) const
EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
LOCK(m_mutex);
auto it_by_node = m_by_nodeid.find(nodeid);
if (it_by_node == m_by_nodeid.end()) {
return std::nullopt;
}
const Txid txid{it_by_node->second};
auto it_by_txid = m_by_txid.find(txid);
if (it_by_txid == m_by_txid.end()) {
return std::nullopt;
}
return it_by_txid->second.tx;
}
bool PrivateBroadcast::FinishBroadcast(const NodeId& nodeid, bool confirmed_by_node) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
LOCK(m_mutex);
auto it = m_by_nodeid.find(nodeid);
if (it == m_by_nodeid.end()) {
return false;
}
const Txid txid{it->second};
m_by_nodeid.erase(it);
auto iters = Find(txid);
if (!confirmed_by_node || !iters.has_value()) {
return iters.has_value();
}
// Update broadcast stats.
Priority& priority = iters->by_txid->second.priority;
++priority.num_broadcasted;
priority.last_broadcasted = NodeClock::now();
// Remove and re-add the entry in the m_by_priority map because we have changed the key.
m_by_priority.erase(iters->by_priority);
m_by_priority.emplace(priority, txid);
return true;
}
std::vector<CTransactionRef> PrivateBroadcast::GetStale() const EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
LOCK(m_mutex);
const auto stale_time = NodeClock::now() - STALE_DURATION;
std::vector<CTransactionRef> stale;
for (const auto& [txid, tx_with_priority] : m_by_txid) {
if (tx_with_priority.priority.last_broadcasted < stale_time) {
stale.push_back(tx_with_priority.tx);
}
}
return stale;
}
bool PrivateBroadcast::Priority::operator<(const Priority& other) const
{
if (num_broadcasted < other.num_broadcasted) {
return true;
}
return last_broadcasted < other.last_broadcasted;
}
std::optional<PrivateBroadcast::Iterators> PrivateBroadcast::Find(const Txid& txid) EXCLUSIVE_LOCKS_REQUIRED(m_mutex)
{
AssertLockHeld(m_mutex);
auto i = m_by_txid.find(txid);
if (i == m_by_txid.end()) {
return std::nullopt;
}
const Priority& priority = i->second.priority;
for (auto j = m_by_priority.lower_bound(priority); j != m_by_priority.end(); ++j) {
if (j->second == txid) {
return Iterators{.by_txid = i, .by_priority = j};
}
}
return std::nullopt;
}

116
src/private_broadcast.h Normal file
View file

@ -0,0 +1,116 @@
// Copyright (c) 2023-present The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or https://opensource.org/license/mit/.
#ifndef BITCOIN_PRIVATE_BROADCAST_H
#define BITCOIN_PRIVATE_BROADCAST_H
#include <net.h>
#include <primitives/transaction.h>
#include <sync.h>
#include <threadsafety.h>
#include <util/hasher.h>
#include <util/time.h>
#include <util/transaction_identifier.h>
#include <map>
#include <optional>
#include <unordered_map>
#include <vector>
/**
* Store a list of transactions to be broadcast privately. Supports the following operations:
* - Add a new transaction
* - Remove a transaction, after it has been seen by the network
* - Mark a broadcast of a transaction (remember when and how many times)
* - Get a transaction for broadcast, the one that has been broadcast fewer times and least recently
*/
class PrivateBroadcast
{
public:
/**
* Add a transaction to the storage.
* @param[in] tx The transaction to add.
* @retval true The transaction was added.
* @retval false The transaction was already present.
*/
bool Add(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);
/**
* Forget a transaction.
* @return the number of times the transaction was broadcast if the transaction existed and was removed,
* otherwise empty optional (the transaction was not in the storage).
*/
std::optional<size_t> Remove(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);
/**
* Get the transaction that has been broadcast fewest times and least recently.
*/
std::optional<CTransactionRef> GetTxForBroadcast() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);
/**
* Mark a transaction as pushed to a given node. This is an intermediate state before
* we get a PONG from the node which would confirm that the transaction has been received.
* At the time we get the PONG we need to know which transaction we sent to that node,
* so that we can account how many times we broadcast each transaction.
*/
void PushedToNode(const NodeId& nodeid, const Txid& txid) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);
/**
* Get the transaction that was pushed to a given node by PushedToNode().
*/
std::optional<CTransactionRef> GetTxPushedToNode(const NodeId& nodeid) const
EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);
/**
* Mark the end of a broadcast of a transaction. Either successful by receiving a PONG,
* or unsuccessful by closing the connection to the node without getting PONG.
* @return true if the reference by the given node id was removed and the transaction
* we tried to send to this node is still in the private broadcast pool.
*/
bool FinishBroadcast(const NodeId& nodeid, bool confirmed_by_node) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);
/**
* Get the transactions that have not been broadcast recently.
*/
std::vector<CTransactionRef> GetStale() const EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);
private:
struct Priority {
size_t num_broadcasted{0};
NodeClock::time_point last_broadcasted{};
bool operator<(const Priority& other) const;
};
struct TxWithPriority {
CTransactionRef tx;
Priority priority;
};
using ByTxid = std::unordered_map<Txid, TxWithPriority, SaltedTxidHasher>;
using ByPriority = std::multimap<Priority, Txid>;
using ByNodeId = std::unordered_map<NodeId, Txid>;
struct Iterators {
ByTxid::iterator by_txid;
ByPriority::iterator by_priority;
};
/**
* Get iterators in `m_by_txid` and `m_by_priority` for a given transaction.
*/
std::optional<Iterators> Find(const Txid& txid) EXCLUSIVE_LOCKS_REQUIRED(m_mutex);
mutable Mutex m_mutex;
ByTxid m_by_txid GUARDED_BY(m_mutex);
ByPriority m_by_priority GUARDED_BY(m_mutex);
/**
* Remember which transaction was sent to which node, so that when we get the PONG
* from that node we can mark the transaction as broadcast.
*/
ByNodeId m_by_nodeid GUARDED_BY(m_mutex);
};
#endif // BITCOIN_PRIVATE_BROADCAST_H

View file

@ -729,6 +729,8 @@ QString ConnectionTypeToQString(ConnectionType conn_type, bool prepend_direction
case ConnectionType::FEELER: return prefix + QObject::tr("Feeler");
//: Short-lived peer connection type that solicits known addresses from a peer.
case ConnectionType::ADDR_FETCH: return prefix + QObject::tr("Address Fetch");
//: Short-lived peer connection type that is used for broadcasting privacy-sensitive data.
case ConnectionType::PRIVATE_BROADCAST: return prefix + QObject::tr("Private Broadcast");
} // no default case, so the compiler can warn about missing cases
assert(false);
}

View file

@ -517,7 +517,10 @@ RPCConsole::RPCConsole(interfaces::Node& node, const PlatformStyle *_platformSty
tr("Outbound Feeler: short-lived, for testing addresses"),
/*: Explanatory text for a short-lived outbound peer connection that is used
to request addresses from a peer. */
tr("Outbound Address Fetch: short-lived, for soliciting addresses")};
tr("Outbound Address Fetch: short-lived, for soliciting addresses"),
/*: Explanatory text for a short-lived outbound peer connection that is used
to broadcast privacy-sensitive data (like our transactions). */
tr("Private broadcast: short-lived, for broadcasting privacy-sensitive transactions")};
const QString connection_types_list{"<ul><li>" + Join(CONNECTION_TYPE_DOC, QString("</li><li>")) + "</li></ul>"};
ui->peerConnectionTypeLabel->setToolTip(ui->peerConnectionTypeLabel->toolTip().arg(connection_types_list));
const std::vector<QString> TRANSPORT_TYPE_DOC{

View file

@ -8,10 +8,12 @@
#include <node/mempool_persist.h>
#include <chainparams.h>
#include <common/args.h>
#include <consensus/validation.h>
#include <core_io.h>
#include <kernel/mempool_entry.h>
#include <net_processing.h>
#include <netbase.h> // for g_reachable_nets
#include <node/mempool_persist_args.h>
#include <node/types.h>
#include <policy/rbf.h>
@ -42,11 +44,21 @@ using util::ToString;
static RPCHelpMan sendrawtransaction()
{
return RPCHelpMan{"sendrawtransaction",
"\nSubmit a raw transaction (serialized, hex-encoded) to local node and network.\n"
"\nThe transaction will be sent unconditionally to all peers, so using sendrawtransaction\n"
"for manual rebroadcast may degrade privacy by leaking the transaction's origin, as\n"
"nodes will normally not rebroadcast non-wallet transactions already in their mempool.\n"
"\nSubmit a raw transaction (serialized, hex-encoded) to the network.\n"
"\nIf -privatebroadcast is disabled, then the transaction will be put into the\n"
"local mempool of the node and will be sent unconditionally to all currently\n"
"connected peers, so using sendrawtransaction for manual rebroadcast will degrade\n"
"privacy by leaking the transaction's origin, as nodes will normally not\n"
"rebroadcast non-wallet transactions already in their mempool.\n"
"\nIf -privatebroadcast is enabled, then the transaction will be sent only via\n"
"dedicated, short-lived connections to Tor or I2P peers or IPv4/IPv6 peers\n"
"through the Tor network. This conceals the transaction origin. The transaction\n"
"will only enter the local mempool when it is received back from the network.\n"
"\nA specific exception, RPC_TRANSACTION_ALREADY_IN_UTXO_SET, may throw if the transaction cannot be added to the mempool.\n"
"\nRelated RPCs: createrawtransaction, signrawtransactionwithkey\n",
{
{"hexstring", RPCArg::Type::STR_HEX, RPCArg::Optional::NO, "The hex string of the raw transaction"},
@ -96,7 +108,24 @@ static RPCHelpMan sendrawtransaction()
std::string err_string;
AssertLockNotHeld(cs_main);
NodeContext& node = EnsureAnyNodeContext(request.context);
const TransactionError err = BroadcastTransaction(node, tx, err_string, max_raw_tx_fee, /*relay=*/true, /*wait_callback=*/true);
const bool private_broadcast_enabled{gArgs.GetBoolArg("-privatebroadcast", DEFAULT_PRIVATE_BROADCAST)};
if (private_broadcast_enabled &&
!g_reachable_nets.Contains(NET_ONION) &&
!g_reachable_nets.Contains(NET_I2P)) {
throw JSONRPCError(RPC_MISC_ERROR,
"-privatebroadcast is enabled, but none of the Tor or I2P networks is "
"reachable. Maybe the location of the Tor proxy couldn't be retrieved "
"from the Tor daemon at startup. Check whether the Tor daemon is running "
"and that -torcontrol, -torpassword and -i2psam are configured properly.");
}
const auto method = private_broadcast_enabled ? node::NO_MEMPOOL_PRIVATE_BROADCAST
: node::ADD_TO_MEMPOOL_AND_BROADCAST_TO_ALL;
const TransactionError err = BroadcastTransaction(node,
tx,
err_string,
max_raw_tx_fee,
method,
/*wait_callback=*/true);
if (TransactionError::OK != err) {
throw JSONRPCTransactionError(err, err_string);
}
@ -1060,7 +1089,12 @@ static RPCHelpMan submitpackage()
// We do not expect an error here; we are only broadcasting things already/still in mempool
std::string err_string;
const auto err = BroadcastTransaction(node, tx, err_string, /*max_tx_fee=*/0, /*relay=*/true, /*wait_callback=*/true);
const auto err = BroadcastTransaction(node,
tx,
err_string,
/*max_tx_fee=*/0,
node::ADD_TO_MEMPOOL_AND_BROADCAST_TO_ALL,
/*wait_callback=*/true);
if (err != TransactionError::OK) {
throw JSONRPCTransactionError(err,
strprintf("transaction broadcast failed: %s (%d transactions were broadcast successfully)",

View file

@ -45,7 +45,8 @@ const std::vector<std::string> CONNECTION_TYPE_DOC{
"inbound (initiated by the peer)",
"manual (added via addnode RPC or -addnode/-connect configuration options)",
"addr-fetch (short-lived automatic connection for soliciting addresses)",
"feeler (short-lived automatic connection for testing addresses)"
"feeler (short-lived automatic connection for testing addresses)",
"private-broadcast (short-lived automatic connection for broadcasting privacy-sensitive transactions)"
};
const std::vector<std::string> TRANSPORT_TYPE_DOC{

View file

@ -107,7 +107,7 @@ bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) co
CNode* ConnmanTestMsg::ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type)
{
CNode* node = ConnectNode(CAddress{}, pszDest, /*fCountFailure=*/false, conn_type, /*use_v2transport=*/true);
CNode* node = ConnectNode(CAddress{}, pszDest, /*fCountFailure=*/false, conn_type, /*use_v2transport=*/true, /*proxy=*/std::nullopt);
if (!node) return nullptr;
node->SetCommonVersion(PROTOCOL_VERSION);
peerman.InitializeNode(*node, ServiceFlags(NODE_NETWORK | NODE_WITNESS));

View file

@ -122,6 +122,7 @@ constexpr ConnectionType ALL_CONNECTION_TYPES[]{
ConnectionType::FEELER,
ConnectionType::BLOCK_RELAY,
ConnectionType::ADDR_FETCH,
ConnectionType::PRIVATE_BROADCAST,
};
constexpr auto ALL_NETWORKS = std::array{

View file

@ -11,6 +11,7 @@
#include <interfaces/chain.h>
#include <key_io.h>
#include <merkleblock.h>
#include <node/types.h>
#include <rpc/util.h>
#include <script/descriptor.h>
#include <script/script.h>
@ -310,7 +311,7 @@ RPCHelpMan importaddress()
if (fRescan)
{
RescanWallet(*pwallet, reserver);
pwallet->ResubmitWalletTransactions(/*relay=*/false, /*force=*/true);
pwallet->ResubmitWalletTransactions(node::ADD_TO_MEMPOOL_NO_BROADCAST, /*force=*/true);
}
return UniValue::VNULL;
@ -476,7 +477,7 @@ RPCHelpMan importpubkey()
if (fRescan)
{
RescanWallet(*pwallet, reserver);
pwallet->ResubmitWalletTransactions(/*relay=*/false, /*force=*/true);
pwallet->ResubmitWalletTransactions(node::ADD_TO_MEMPOOL_NO_BROADCAST, /*force=*/true);
}
return UniValue::VNULL;
@ -1413,7 +1414,7 @@ RPCHelpMan importmulti()
}
if (fRescan && fRunScan && requests.size()) {
int64_t scannedTime = pwallet->RescanFromTime(nLowestTimestamp, reserver, /*update=*/true);
pwallet->ResubmitWalletTransactions(/*relay=*/false, /*force=*/true);
pwallet->ResubmitWalletTransactions(node::ADD_TO_MEMPOOL_NO_BROADCAST, /*force=*/true);
if (pwallet->IsAbortingRescan()) {
throw JSONRPCError(RPC_MISC_ERROR, "Rescan aborted by user.");
@ -1727,7 +1728,7 @@ RPCHelpMan importdescriptors()
// Rescan the blockchain using the lowest timestamp
if (rescan) {
int64_t scanned_time = pwallet->RescanFromTime(lowest_timestamp, reserver, /*update=*/true);
pwallet->ResubmitWalletTransactions(/*relay=*/false, /*force=*/true);
pwallet->ResubmitWalletTransactions(node::ADD_TO_MEMPOOL_NO_BROADCAST, /*force=*/true);
if (pwallet->IsAbortingRescan()) {
throw JSONRPCError(RPC_MISC_ERROR, "Rescan aborted by user.");

View file

@ -13,6 +13,7 @@
#include <interfaces/chain.h>
#include <key_io.h>
#include <node/blockstorage.h>
#include <node/types.h>
#include <policy/policy.h>
#include <rpc/server.h>
#include <script/solver.h>
@ -633,7 +634,7 @@ BOOST_FIXTURE_TEST_CASE(CreateWallet, TestChain100Setup)
auto block_tx = TestSimpleSpend(*m_coinbase_txns[0], 0, coinbaseKey, GetScriptForRawPubKey(key.GetPubKey()));
m_coinbase_txns.push_back(CreateAndProcessBlock({block_tx}, GetScriptForRawPubKey(coinbaseKey.GetPubKey())).vtx[0]);
auto mempool_tx = TestSimpleSpend(*m_coinbase_txns[1], 0, coinbaseKey, GetScriptForRawPubKey(key.GetPubKey()));
BOOST_CHECK(m_node.chain->broadcastTransaction(MakeTransactionRef(mempool_tx), DEFAULT_TRANSACTION_MAXFEE, false, error));
BOOST_CHECK(m_node.chain->broadcastTransaction(MakeTransactionRef(mempool_tx), DEFAULT_TRANSACTION_MAXFEE, node::ADD_TO_MEMPOOL_NO_BROADCAST, error));
// Reload wallet and make sure new transactions are detected despite events
@ -675,7 +676,7 @@ BOOST_FIXTURE_TEST_CASE(CreateWallet, TestChain100Setup)
block_tx = TestSimpleSpend(*m_coinbase_txns[2], 0, coinbaseKey, GetScriptForRawPubKey(key.GetPubKey()));
m_coinbase_txns.push_back(CreateAndProcessBlock({block_tx}, GetScriptForRawPubKey(coinbaseKey.GetPubKey())).vtx[0]);
mempool_tx = TestSimpleSpend(*m_coinbase_txns[3], 0, coinbaseKey, GetScriptForRawPubKey(key.GetPubKey()));
BOOST_CHECK(m_node.chain->broadcastTransaction(MakeTransactionRef(mempool_tx), DEFAULT_TRANSACTION_MAXFEE, false, error));
BOOST_CHECK(m_node.chain->broadcastTransaction(MakeTransactionRef(mempool_tx), DEFAULT_TRANSACTION_MAXFEE, node::ADD_TO_MEMPOOL_NO_BROADCAST, error));
m_node.validation_signals->SyncWithValidationInterfaceQueue();
});
wallet = TestLoadWallet(context);

View file

@ -2042,7 +2042,9 @@ CWallet::ScanResult CWallet::ScanForWalletTransactions(const uint256& start_bloc
return result;
}
bool CWallet::SubmitTxMemoryPoolAndRelay(CWalletTx& wtx, std::string& err_string, bool relay) const
bool CWallet::SubmitTxMemoryPoolAndRelay(CWalletTx& wtx,
std::string& err_string,
node::TxBroadcastMethod broadcast_method) const
{
AssertLockHeld(cs_wallet);
@ -2056,8 +2058,19 @@ bool CWallet::SubmitTxMemoryPoolAndRelay(CWalletTx& wtx, std::string& err_string
// Don't try to submit conflicted or confirmed transactions.
if (GetTxDepthInMainChain(wtx) != 0) return false;
// Submit transaction to mempool for relay
WalletLogPrintf("Submitting wtx %s to mempool for relay\n", wtx.GetHash().ToString());
const char* what{""};
switch (broadcast_method) {
case node::ADD_TO_MEMPOOL_AND_BROADCAST_TO_ALL:
what = "to mempool and for broadcast to peers";
break;
case node::ADD_TO_MEMPOOL_NO_BROADCAST:
what = "to mempool without broadcast";
break;
case node::NO_MEMPOOL_PRIVATE_BROADCAST:
what = "for private broadcast without adding to the mempool";
break;
}
WalletLogPrintf("Submitting wtx %s %s\n", wtx.GetHash().ToString(), what);
// We must set TxStateInMempool here. Even though it will also be set later by the
// entered-mempool callback, if we did not there would be a race where a
// user could call sendmoney in a loop and hit spurious out of funds errors
@ -2067,7 +2080,7 @@ bool CWallet::SubmitTxMemoryPoolAndRelay(CWalletTx& wtx, std::string& err_string
// If broadcast fails for any reason, trying to set wtx.m_state here would be incorrect.
// If transaction was previously in the mempool, it should be updated when
// TransactionRemovedFromMempool fires.
bool ret = chain().broadcastTransaction(wtx.tx, m_default_max_tx_fee, relay, err_string);
bool ret = chain().broadcastTransaction(wtx.tx, m_default_max_tx_fee, broadcast_method, err_string);
if (ret) wtx.m_state = TxStateInMempool{};
return ret;
}
@ -2122,10 +2135,11 @@ NodeClock::time_point CWallet::GetDefaultNextResend() { return FastRandomContext
//
// The `force` option results in all unconfirmed transactions being submitted to
// the mempool. This does not necessarily result in those transactions being relayed,
// that depends on the `relay` option. Periodic rebroadcast uses the pattern
// relay=true force=false, while loading into the mempool
// (on start, or after import) uses relay=false force=true.
void CWallet::ResubmitWalletTransactions(bool relay, bool force)
// that depends on the `broadcast_method` option. Periodic rebroadcast uses the pattern
// broadcast_method=ADD_TO_MEMPOOL_AND_BROADCAST_TO_ALL force=false, while loading into
// the mempool (on start, or after import) uses
// broadcast_method=ADD_TO_MEMPOOL_NO_BROADCAST force=true.
void CWallet::ResubmitWalletTransactions(node::TxBroadcastMethod broadcast_method, bool force)
{
// Don't attempt to resubmit if the wallet is configured to not broadcast,
// even if forcing.
@ -2151,7 +2165,7 @@ void CWallet::ResubmitWalletTransactions(bool relay, bool force)
// Now try submitting the transactions to the memory pool and (optionally) relay them.
for (auto wtx : to_submit) {
std::string unused_err_string;
if (SubmitTxMemoryPoolAndRelay(*wtx, unused_err_string, relay)) ++submitted_tx_count;
if (SubmitTxMemoryPoolAndRelay(*wtx, unused_err_string, broadcast_method)) ++submitted_tx_count;
}
} // cs_wallet
@ -2166,7 +2180,7 @@ void MaybeResendWalletTxs(WalletContext& context)
{
for (const std::shared_ptr<CWallet>& pwallet : GetWallets(context)) {
if (!pwallet->ShouldResend()) continue;
pwallet->ResubmitWalletTransactions(/*relay=*/true, /*force=*/false);
pwallet->ResubmitWalletTransactions(node::ADD_TO_MEMPOOL_AND_BROADCAST_TO_ALL, /*force=*/false);
pwallet->SetNextResend();
}
}
@ -2374,7 +2388,7 @@ void CWallet::CommitTransaction(CTransactionRef tx, mapValue_t mapValue, std::ve
}
std::string err_string;
if (!SubmitTxMemoryPoolAndRelay(*wtx, err_string, true)) {
if (!SubmitTxMemoryPoolAndRelay(*wtx, err_string, node::ADD_TO_MEMPOOL_AND_BROADCAST_TO_ALL)) {
WalletLogPrintf("CommitTransaction(): Transaction cannot be broadcast immediately, %s\n", err_string);
// TODO: if we expect the failure to be long term or permanent, instead delete wtx from the wallet and return failure.
}
@ -3436,7 +3450,7 @@ void CWallet::postInitProcess()
{
// Add wallet transactions that aren't already in a block to mempool
// Do this here as mempool requires genesis block to be loaded
ResubmitWalletTransactions(/*relay=*/false, /*force=*/true);
ResubmitWalletTransactions(node::ADD_TO_MEMPOOL_NO_BROADCAST, /*force=*/true);
// Update wallet transactions with current mempool transactions.
WITH_LOCK(cs_wallet, chain().requestMempoolTransactions(*this));

View file

@ -12,6 +12,7 @@
#include <interfaces/handler.h>
#include <kernel/cs_main.h>
#include <logging.h>
#include <node/types.h>
#include <outputtype.h>
#include <policy/feerate.h>
#include <primitives/transaction.h>
@ -636,7 +637,7 @@ public:
void SetNextResend() { m_next_resend = GetDefaultNextResend(); }
/** Return true if all conditions for periodically resending transactions are met. */
bool ShouldResend() const;
void ResubmitWalletTransactions(bool relay, bool force);
void ResubmitWalletTransactions(node::TxBroadcastMethod broadcast_method, bool force);
OutputType TransactionChangeType(const std::optional<OutputType>& change_type, const std::vector<CRecipient>& vecSend) const;
@ -680,8 +681,8 @@ public:
*/
void CommitTransaction(CTransactionRef tx, mapValue_t mapValue, std::vector<std::pair<std::string, std::string>> orderForm);
/** Pass this transaction to node for mempool insertion and relay to peers if flag set to true */
bool SubmitTxMemoryPoolAndRelay(CWalletTx& wtx, std::string& err_string, bool relay) const
/** Pass this transaction to node for optional mempool insertion and relay to peers. */
bool SubmitTxMemoryPoolAndRelay(CWalletTx& wtx, std::string& err_string, node::TxBroadcastMethod broadcast_method) const
EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
bool ImportScripts(const std::set<CScript> scripts, int64_t timestamp) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);

View file

@ -411,6 +411,24 @@ class ConfArgsTest(BitcoinTestFramework):
self.restart_node(0, extra_args=[connect_arg, '-dnsseed', '-proxy=localhost:1080'])
self.stop_node(0)
def test_privatebroadcast(self):
self.log.info("Test that an invalid usage of -privatebroadcast throws an init error")
self.stop_node(0)
args_errors = {
"Private broadcast of own transactions requested (-privatebroadcast), "
"but none of Tor or I2P networks is reachable":
["-privatebroadcast"],
"Private broadcast of own transactions requested (-privatebroadcast), "
"but -connect is also configured. They are incompatible because the "
"private broadcast needs to open new connections to randomly chosen "
"Tor or I2P peers. Consider using -maxconnections=0 -addnode=... instead" :
# -onion= makes the Tor network reachable
["-privatebroadcast", "-connect=127.0.0.1:8333", "-onion=127.0.0.1:9050"]
}
for msg, args in args_errors.items():
self.nodes[0].assert_start_raises_init_error(extra_args=args, expected_msg=f"Error: {msg}")
def test_ignored_conf(self):
self.log.info('Test error is triggered when the datadir in use contains a bitcoin.conf file that would be ignored '
'because a conflicting -conf file argument is passed.')
@ -496,6 +514,7 @@ class ConfArgsTest(BitcoinTestFramework):
self.test_seed_peers()
self.test_networkactive()
self.test_connect_with_seednode()
self.test_privatebroadcast()
self.test_dir_config()
self.test_negated_config()

View file

@ -8,10 +8,10 @@ import time
from test_framework.mempool_util import tx_in_orphanage
from test_framework.messages import (
CInv,
CTxInWitness,
MSG_TX,
MSG_WITNESS_TX,
MSG_WTX,
malleate_tx,
msg_getdata,
msg_inv,
msg_notfound,
@ -138,22 +138,6 @@ class OrphanHandlingTest(BitcoinTestFramework):
peer.wait_for_getdata([wtxid])
peer.send_and_ping(msg_tx(tx))
def create_malleated_version(self, tx):
"""
Create a malleated version of the tx where the witness is replaced with garbage data.
Returns a CTransaction object.
"""
tx_bad_wit = tx_from_hex(tx["hex"])
tx_bad_wit.wit.vtxinwit = [CTxInWitness()]
# Add garbage data to witness 0. We cannot simply strip the witness, as the node would
# classify it as a transaction in which the witness was missing rather than wrong.
tx_bad_wit.wit.vtxinwit[0].scriptWitness.stack = [b'garbage']
assert_equal(tx["txid"], tx_bad_wit.rehash())
assert_not_equal(tx["wtxid"], tx_bad_wit.getwtxid())
return tx_bad_wit
@cleanup
def test_arrival_timing_orphan(self):
self.log.info("Test missing parents that arrive during delay are not requested")
@ -449,7 +433,7 @@ class OrphanHandlingTest(BitcoinTestFramework):
tx_child = self.wallet.create_self_transfer(utxo_to_spend=tx_parent["new_utxo"])
# Create a fake version of the child
tx_orphan_bad_wit = self.create_malleated_version(tx_child)
tx_orphan_bad_wit = malleate_tx(tx_child)
bad_peer = node.add_p2p_connection(P2PInterface())
honest_peer = node.add_p2p_connection(P2PInterface())
@ -496,7 +480,7 @@ class OrphanHandlingTest(BitcoinTestFramework):
tx_middle = self.wallet.create_self_transfer(utxo_to_spend=tx_grandparent["new_utxo"])
# Create a fake version of the middle tx
tx_orphan_bad_wit = self.create_malleated_version(tx_middle)
tx_orphan_bad_wit = malleate_tx(tx_middle)
# Create grandchild spending from tx_middle (and spending from tx_orphan_bad_wit since they
# have the same txid).
@ -550,7 +534,7 @@ class OrphanHandlingTest(BitcoinTestFramework):
# Create the real child and fake version
tx_child = self.wallet.create_self_transfer(utxo_to_spend=tx_parent["new_utxo"])
tx_orphan_bad_wit = self.create_malleated_version(tx_child)
tx_orphan_bad_wit = malleate_tx(tx_child)
bad_peer = node.add_p2p_connection(PeerTxRelayer())
# Must not send wtxidrelay because otherwise the inv(TX) will be ignored later

View file

@ -0,0 +1,390 @@
#!/usr/bin/env python3
# Copyright (c) 2017-present The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""
Test how locally submitted transactions are sent to the network when private broadcast is used.
"""
import time
import threading
from test_framework.p2p import (
P2PDataStore,
P2PInterface,
P2P_SERVICES,
P2P_VERSION,
)
from test_framework.messages import (
CAddress,
CInv,
MSG_WTX,
malleate_tx,
msg_inv,
)
from test_framework.netutil import (
format_addr_port
)
from test_framework.socks5 import (
Socks5Configuration,
Socks5Server,
)
from test_framework.test_framework import (
BitcoinTestFramework,
)
from test_framework.util import (
MAX_NODES,
assert_equal,
assert_raises_rpc_error,
p2p_port,
tor_port,
)
from test_framework.wallet import (
MiniWallet,
)
MAX_OUTBOUND_FULL_RELAY_CONNECTIONS = 8
MAX_BLOCK_RELAY_ONLY_CONNECTIONS = 2
NUM_INITIAL_CONNECTIONS = MAX_OUTBOUND_FULL_RELAY_CONNECTIONS + MAX_BLOCK_RELAY_ONLY_CONNECTIONS
NUM_PRIVATE_BROADCAST_PER_TX = 3
# Fill addrman with these addresses. Must have enough Tor addresses, so that even
# if all 10 default connections are opened to a Tor address (!?) there must be more
# for private broadcast.
ADDRMAN_ADDRESSES = [
"1.65.195.98",
"2.59.236.56",
"2.83.114.20",
"2.248.194.16",
"5.2.154.6",
"5.101.140.30",
"5.128.87.126",
"5.144.21.49",
"5.172.132.104",
"5.188.62.18",
"5.200.2.180",
"8.129.184.255",
"8.209.105.138",
"12.34.98.148",
"14.199.102.151",
"18.27.79.17",
"18.27.124.231",
"18.216.249.151",
"23.88.155.58",
"23.93.101.158",
"[2001:19f0:1000:1db3:5400:4ff:fe56:5a8d]",
"[2001:19f0:5:24da:3eec:efff:feb9:f36e]",
"[2001:19f0:5:24da::]",
"[2001:19f0:5:4535:3eec:efff:feb9:87e4]",
"[2001:19f0:5:4535::]",
"[2001:1bc0:c1::2000]",
"[2001:1c04:4008:6300:8a5f:2678:114b:a660]",
"[2001:41d0:203:3739::]",
"[2001:41d0:203:8f49::]",
"[2001:41d0:203:bb0a::]",
"[2001:41d0:2:bf8f::]",
"[2001:41d0:303:de8b::]",
"[2001:41d0:403:3d61::]",
"[2001:41d0:405:9600::]",
"[2001:41d0:8:ed7f::1]",
"[2001:41d0:a:69a2::1]",
"[2001:41f0::62:6974:636f:696e]",
"[2001:470:1b62::]",
"[2001:470:1f05:43b:2831:8530:7179:5864]",
"[2001:470:1f09:b14::11]",
"2bqghnldu6mcug4pikzprwhtjjnsyederctvci6klcwzepnjd46ikjyd.onion",
"4lr3w2iyyl5u5l6tosizclykf5v3smqroqdn2i4h3kq6pfbbjb2xytad.onion",
"5g72ppm3krkorsfopcm2bi7wlv4ohhs4u4mlseymasn7g7zhdcyjpfid.onion",
"5sbmcl4m5api5tqafi4gcckrn3y52sz5mskxf3t6iw4bp7erwiptrgqd.onion",
"776aegl7tfhg6oiqqy76jnwrwbvcytsx2qegcgh2mjqujll4376ohlid.onion",
"77mdte42srl42shdh2mhtjr7nf7dmedqrw6bkcdekhdvmnld6ojyyiad.onion",
"azbpsh4arqlm6442wfimy7qr65bmha2zhgjg7wbaji6vvaug53hur2qd.onion",
"b64xcbleqmwgq2u46bh4hegnlrzzvxntyzbmucn3zt7cssm7y4ubv3id.onion",
"bsqbtcparrfihlwolt4xgjbf4cgqckvrvsfyvy6vhiqrnh4w6ghixoid.onion",
"bsqbtctulf2g4jtjsdfgl2ed7qs6zz5wqx27qnyiik7laockryvszqqd.onion",
"cwi3ekrwhig47dhhzfenr5hbvckj7fzaojygvazi2lucsenwbzwoyiqd.onion",
"devinbtcmwkuitvxl3tfi5of4zau46ymeannkjv6fpnylkgf3q5fa3id.onion",
"devinbtcyk643iruzfpaxw3on2jket7rbjmwygm42dmdyub3ietrbmid.onion",
"dtql5vci4iaml4anmueftqr7bfgzqlauzfy4rc2tfgulldd3ekyijjyd.onion",
"emzybtc25oddoa2prol2znpz2axnrg6k77xwgirmhv7igoiucddsxiad.onion",
"emzybtc3ewh7zihpkdvuwlgxrhzcxy2p5fvjggp7ngjbxcytxvt4rjid.onion",
"emzybtc454ewbviqnmgtgx3rgublsgkk23r4onbhidcv36wremue4kqd.onion",
"emzybtc5bnpb2o6gh54oquiox54o4r7yn4a2wiiwzrjonlouaibm2zid.onion",
"fpz6r5ppsakkwypjcglz6gcnwt7ytfhxskkfhzu62tnylcknh3eq6pad.onion",
"255fhcp6ajvftnyo7bwz3an3t4a4brhopm3bamyh2iu5r3gnr2rq.b32.i2p",
"27yrtht5b5bzom2w5ajb27najuqvuydtzb7bavlak25wkufec5mq.b32.i2p",
"3gocb7wc4zvbmmebktet7gujccuux4ifk3kqilnxnj5wpdpqx2hq.b32.i2p",
"4fcc23wt3hyjk3csfzcdyjz5pcwg5dzhdqgma6bch2qyiakcbboa.b32.i2p",
"4osyqeknhx5qf3a73jeimexwclmt42cju6xdp7icja4ixxguu2hq.b32.i2p",
"4umsi4nlmgyp4rckosg4vegd2ysljvid47zu7pqsollkaszcbpqq.b32.i2p",
"6j2ezegd3e2e2x3o3pox335f5vxfthrrigkdrbgfbdjchm5h4awa.b32.i2p",
"6n36ljyr55szci5ygidmxqer64qr24f4qmnymnbvgehz7qinxnla.b32.i2p",
"72yjs6mvlby3ky6mgpvvlemmwq5pfcznrzd34jkhclgrishqdxva.b32.i2p",
"a5qsnv3maw77mlmmzlcglu6twje6ttctd3fhpbfwcbpmewx6fczq.b32.i2p",
"aovep2pco7v2k4rheofrgytbgk23eg22dczpsjqgqtxcqqvmxk6a.b32.i2p",
"bitcoi656nll5hu6u7ddzrmzysdtwtnzcnrjd4rfdqbeey7dmn5a.b32.i2p",
"brifkruhlkgrj65hffybrjrjqcgdgqs2r7siizb5b2232nruik3a.b32.i2p",
"c4gfnttsuwqomiygupdqqqyy5y5emnk5c73hrfvatri67prd7vyq.b32.i2p",
"day3hgxyrtwjslt54sikevbhxxs4qzo7d6vi72ipmscqtq3qmijq.b32.i2p",
"du5kydummi23bjfp6bd7owsvrijgt7zhvxmz5h5f5spcioeoetwq.b32.i2p",
"e55k6wu46rzp4pg5pk5npgbr3zz45bc3ihtzu2xcye5vwnzdy7pq.b32.i2p",
"eciohu5nq7vsvwjjc52epskuk75d24iccgzmhbzrwonw6lx4gdva.b32.i2p",
"ejlnngarmhqvune74ko7kk55xtgbz5i5ncs4vmnvjpy3l7y63xaa.b32.i2p",
"fhzlp3xroabohnmjonu5iqazwhlbbwh5cpujvw2azcu3srqdceja.b32.i2p",
"[fc32:17ea:e415:c3bf:9808:149d:b5a2:c9aa]",
"[fcc7:be49:ccd1:dc91:3125:f0da:457d:8ce]",
"[fcdc:73ae:b1a9:1bf8:d4c2:811:a4c7:c34e]",
]
class P2PPrivateBroadcast(BitcoinTestFramework):
def set_test_params(self):
self.disable_autoconnect = False
self.num_nodes = 2
def setup_nodes(self):
# Start a SOCKS5 proxy server.
socks5_server_config = Socks5Configuration()
# self.nodes[0] listens on p2p_port(0),
# self.nodes[1] listens on p2p_port(1),
# thus we tell the SOCKS5 server to listen on p2p_port(self.num_nodes (which is 2))
socks5_server_config.addr = ("127.0.0.1", p2p_port(self.num_nodes))
socks5_server_config.unauth = True
socks5_server_config.auth = True
self.socks5_server = Socks5Server(socks5_server_config)
self.socks5_server.start()
# Tor ports are the highest among p2p/rpc/tor, so this should be the first available port.
ports_base = tor_port(MAX_NODES) + 1
self.destinations = []
self.destinations_lock = threading.Lock()
def destinations_factory(requested_to_addr, requested_to_port):
with self.destinations_lock:
i = len(self.destinations)
actual_to_addr = ""
actual_to_port = 0
listener = None
if i == NUM_INITIAL_CONNECTIONS:
# Instruct the SOCKS5 server to redirect the first private
# broadcast connection from nodes[0] to nodes[1]
actual_to_addr = "127.0.0.1" # nodes[1] listen address
actual_to_port = tor_port(1) # nodes[1] listen port for Tor
self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to "
f"{format_addr_port(actual_to_addr, actual_to_port)} (nodes[1])")
else:
# Create a Python P2P listening node and instruct the SOCKS5 proxy to
# redirect the connection to it. The first outbound connection is used
# later to serve GETDATA, thus make it P2PDataStore().
listener = P2PDataStore() if i == 0 else P2PInterface()
listener.peer_connect_helper(dstaddr="0.0.0.0", dstport=0, net=self.chain, timeout_factor=self.options.timeout_factor)
listener.peer_connect_send_version(services=P2P_SERVICES)
def on_listen_done(addr, port):
nonlocal actual_to_addr
nonlocal actual_to_port
actual_to_addr = addr
actual_to_port = port
self.network_thread.listen(
addr="127.0.0.1",
port=ports_base + i,
p2p=listener,
callback=on_listen_done)
# Wait until the callback has been called.
self.wait_until(lambda: actual_to_port != 0)
self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to "
f"{format_addr_port(actual_to_addr, actual_to_port)} (a Python node)")
self.destinations.append({
"requested_to": format_addr_port(requested_to_addr, requested_to_port),
"node": listener,
})
assert_equal(len(self.destinations), i + 1)
return {
"actual_to_addr": actual_to_addr,
"actual_to_port": actual_to_port,
}
self.socks5_server.conf.destinations_factory = destinations_factory
self.extra_args = [
[
# Needed to be able to add CJDNS addresses to addrman (otherwise they are unroutable).
"-cjdnsreachable",
# Connecting, sending garbage, being disconnected messes up with this test's
# check_broadcasts() which waits for a particular Python node to receive a connection.
"-v2transport=0",
"-test=addrman",
"-privatebroadcast",
f"-proxy={socks5_server_config.addr[0]}:{socks5_server_config.addr[1]}",
],
[
"-connect=0",
f"-bind=127.0.0.1:{tor_port(1)}=onion",
],
]
super().setup_nodes()
def setup_network(self):
self.setup_nodes()
def check_broadcasts(self, label, tx, broadcasts_to_expect, skip_destinations):
broadcasts_done = 0
i = skip_destinations - 1
while broadcasts_done < broadcasts_to_expect:
i += 1
self.log.debug(f"{label}: waiting for outbound connection i={i}")
# At this point the connection may not yet have been established (A),
# may be active (B), or may have already been closed (C).
self.wait_until(lambda: len(self.destinations) > i)
dest = self.destinations[i]
peer = dest["node"]
peer.wait_until(lambda: peer.message_count["version"] == 1, check_connected=False)
# Now it is either (B) or (C).
if peer.last_message["version"].nServices != 0:
self.log.debug(f"{label}: outbound connection i={i} to {dest['requested_to']} not a private broadcast, ignoring it (maybe feeler or extra block only)")
continue
self.log.debug(f"{label}: outbound connection i={i} to {dest['requested_to']} must be a private broadcast, checking it")
peer.wait_for_disconnect()
# Now it is (C).
assert_equal(peer.message_count, {
"version": 1,
"verack": 1,
"inv": 1,
"tx": 1,
"ping": 1
})
dummy_address = CAddress()
dummy_address.nServices = 0
assert_equal(peer.last_message["version"].nVersion, P2P_VERSION)
assert_equal(peer.last_message["version"].nServices, 0)
assert_equal(peer.last_message["version"].nTime, 0)
assert_equal(peer.last_message["version"].addrTo, dummy_address)
assert_equal(peer.last_message["version"].addrFrom, dummy_address)
assert_equal(peer.last_message["version"].strSubVer, "/pynode:0.0.1/")
assert_equal(peer.last_message["version"].nStartingHeight, 0)
assert_equal(peer.last_message["version"].relay, 0)
assert_equal(peer.last_message["tx"].tx.rehash(), tx["txid"])
self.log.info(f"{label}: ok: outbound connection i={i} is private broadcast of txid={tx['txid']}")
broadcasts_done += 1
def run_test(self):
tx_originator = self.nodes[0]
tx_receiver = self.nodes[1]
far_observer = tx_receiver.add_p2p_connection(P2PInterface())
wallet = MiniWallet(tx_originator)
# Fill tx_originator's addrman.
for addr in ADDRMAN_ADDRESSES:
res = tx_originator.addpeeraddress(address=addr, port=8333, tried=False)
if not res["success"]:
self.log.debug(f"Could not add {addr} to tx_originator's addrman (collision?)")
self.wait_until(lambda: len(self.destinations) == NUM_INITIAL_CONNECTIONS)
# The next opened connection by tx_originator should be "private broadcast"
# for sending the transaction. The SOCKS5 proxy should redirect it to tx_receiver.
txs = wallet.create_self_transfer_chain(chain_length=3)
self.log.info(f"Created txid={txs[0]['txid']}: for basic test")
self.log.info(f"Created txid={txs[1]['txid']}: for broadcast with dependency in mempool + rebroadcast")
self.log.info(f"Created txid={txs[2]['txid']}: for broadcast with dependency not in mempool")
tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0.1)
self.log.debug(f"Waiting for outbound connection i={NUM_INITIAL_CONNECTIONS}, "
"must be the first private broadcast connection")
self.wait_until(lambda: len(tx_receiver.getrawmempool()) > 0)
far_observer.wait_for_tx(txs[0]["txid"])
self.log.info(f"Outbound connection i={NUM_INITIAL_CONNECTIONS}: "
"the private broadcast target received and further relayed the transaction")
# One already checked above, check the other NUM_PRIVATE_BROADCAST_PER_TX - 1 broadcasts.
self.check_broadcasts("Basic", txs[0], NUM_PRIVATE_BROADCAST_PER_TX - 1, NUM_INITIAL_CONNECTIONS + 1)
self.log.info("Resending the same transaction via RPC again")
ignoring_msg = f"Ignoring unnecessary request to schedule an already scheduled transaction: txid={txs[0]['txid']}, wtxid={txs[0]['wtxid']}"
with tx_originator.busy_wait_for_debug_log(expected_msgs=[ignoring_msg.encode()]):
tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0.1)
# TODO: Create a malleated valid witness (how?) and substitute malleate_tx() below:
#self.log.info("Sending a malleated transaction with a valid witness via RPC")
#malleated_valid = malleate_tx(txs[0])
#ignoring_msg = f"Ignoring unnecessary request to schedule an already scheduled transaction: txid={malleated_valid.rehash()}, wtxid={malleated_valid.getwtxid()}"
#with tx_originator.busy_wait_for_debug_log(expected_msgs=[ignoring_msg.encode()]):
# tx_originator.sendrawtransaction(hexstring=malleated_valid.serialize_with_witness().hex(), maxfeerate=0.1)
self.log.info("Sending a malleated transaction with an invalid witness via RPC")
malleated_invalid = malleate_tx(txs[0])
assert_raises_rpc_error(-26, "mandatory-script-verify-flag-failed",
tx_originator.sendrawtransaction,
hexstring=malleated_invalid.serialize_with_witness().hex(),
maxfeerate=0.1)
self.log.info("Checking that the transaction is not in the originator node's mempool")
assert_equal(len(tx_originator.getrawmempool()), 0)
wtxid_int = int(txs[0]["wtxid"], 16)
inv = CInv(MSG_WTX, wtxid_int)
self.log.info("Sending INV and waiting for GETDATA from node")
tx_returner = self.destinations[0]["node"] # Will return the transaction back to the originator.
tx_returner.tx_store[wtxid_int] = txs[0]["tx"]
assert "getdata" not in tx_returner.last_message
received_back_msg = f"Received our privately broadcast transaction (txid={txs[0]['txid']}) from the network"
with tx_originator.assert_debug_log(expected_msgs=[received_back_msg]):
tx_returner.send_without_ping(msg_inv([inv]))
tx_returner.wait_until(lambda: "getdata" in tx_returner.last_message)
self.wait_until(lambda: len(tx_originator.getrawmempool()) > 0)
self.log.info("Waiting for normal broadcast to another peer")
self.destinations[1]["node"].wait_for_inv([inv])
self.log.info("Sending a transaction with a dependency in the mempool")
skip_destinations = len(self.destinations)
tx_originator.sendrawtransaction(hexstring=txs[1]["hex"], maxfeerate=0.1)
self.check_broadcasts("Dependency in mempool", txs[1], NUM_PRIVATE_BROADCAST_PER_TX, skip_destinations)
self.log.info("Sending a transaction with a dependency not in the mempool (should be rejected)")
assert_equal(len(tx_originator.getrawmempool()), 1)
assert_raises_rpc_error(-25, "bad-txns-inputs-missingorspent",
tx_originator.sendrawtransaction, hexstring=txs[2]["hex"], maxfeerate=0.1)
assert_raises_rpc_error(-25, "bad-txns-inputs-missingorspent",
tx_originator.sendrawtransaction, hexstring=txs[2]["hex"], maxfeerate=0)
# Since txs[1] has not been received back by tx_originator,
# it should be re-broadcast after a while. Advance tx_originator's clock
# to trigger a re-broadcast. Should be more than the maximum returned by
# NextTxBroadcast() in net_processing.cpp.
self.log.info("Checking that rebroadcast works")
delta = 20 * 60 # 20min
skip_destinations = len(self.destinations)
rebroadcast_msg = f"Reattempting broadcast of stale txid={txs[1]['txid']}"
with tx_originator.busy_wait_for_debug_log(expected_msgs=[rebroadcast_msg.encode()]):
tx_originator.setmocktime(int(time.time()) + delta)
tx_originator.mockscheduler(delta)
self.check_broadcasts("Rebroadcast", txs[1], 1, skip_destinations)
tx_originator.setmocktime(0) # Let the clock tick again (it will go backwards due to this).
self.log.info("Trying to send a transaction when none of Tor or I2P is reachable")
self.restart_node(0, extra_args=[
"-privatebroadcast",
"-v2transport=0",
# A location where definitely a Tor control is not listening. This would allow
# Bitcoin Core to start, hoping/assuming that the location of the Tor proxy
# may be retrieved after startup from the Tor control, but it will not be, so
# the RPC should throw.
"-torcontrol=127.0.0.1:1",
"-listenonion",
])
assert_raises_rpc_error(-1, "none of the Tor or I2P networks is reachable",
tx_originator.sendrawtransaction, hexstring=txs[0]["hex"], maxfeerate=0.1)
if __name__ == "__main__":
P2PPrivateBroadcast(__file__).main()

View file

@ -29,7 +29,10 @@ import time
import unittest
from test_framework.crypto.siphash import siphash256
from test_framework.util import assert_equal
from test_framework.util import (
assert_equal,
assert_not_equal,
)
MAX_LOCATOR_SZ = 101
MAX_BLOCK_WEIGHT = 4000000
@ -243,6 +246,23 @@ def tx_from_hex(hex_string):
return from_hex(CTransaction(), hex_string)
def malleate_tx(tx):
"""
Create a malleated version of the tx where the witness is replaced with garbage data.
Returns a CTransaction object.
"""
tx_bad_wit = tx_from_hex(tx["hex"])
tx_bad_wit.wit.vtxinwit = [CTxInWitness()]
# Add garbage data to witness 0. We cannot simply strip the witness, as the node would
# classify it as a transaction in which the witness was missing rather than wrong.
tx_bad_wit.wit.vtxinwit[0].scriptWitness.stack = [b'garbage']
assert_equal(tx["txid"], tx_bad_wit.rehash())
assert_not_equal(tx["wtxid"], tx_bad_wit.getwtxid())
return tx_bad_wit
# like from_hex, but without the hex part
def from_binary(cls, stream):
"""deserialize a binary stream (or bytes object) into an object"""

View file

@ -297,6 +297,7 @@ BASE_SCRIPTS = [
'rpc_dumptxoutset.py',
'feature_minchainwork.py',
'rpc_estimatefee.py',
'p2p_private_broadcast.py',
'rpc_getblockstats.py',
'feature_port.py',
'feature_bind_port_externalip.py',