net_processing: add thread safety annotations for Peer members accessed only via the msgproc thread

This commit is contained in:
Anthony Towns 2022-09-13 12:24:45 +10:00
parent bf12abe454
commit a66a7ccb82

View file

@ -264,10 +264,10 @@ struct Peer {
/** The feerate in the most recent BIP133 `feefilter` message sent to the peer. /** The feerate in the most recent BIP133 `feefilter` message sent to the peer.
* It is *not* a p2p protocol violation for the peer to send us * It is *not* a p2p protocol violation for the peer to send us
* transactions with a lower fee rate than this. See BIP133. */ * transactions with a lower fee rate than this. See BIP133. */
CAmount m_fee_filter_sent{0}; CAmount m_fee_filter_sent GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0};
/** Timestamp after which we will send the next BIP133 `feefilter` message /** Timestamp after which we will send the next BIP133 `feefilter` message
* to the peer. */ * to the peer. */
std::chrono::microseconds m_next_send_feefilter{0}; std::chrono::microseconds m_next_send_feefilter GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0};
struct TxRelay { struct TxRelay {
mutable RecursiveMutex m_bloom_filter_mutex; mutable RecursiveMutex m_bloom_filter_mutex;
@ -298,7 +298,7 @@ struct Peer {
std::atomic<std::chrono::seconds> m_last_mempool_req{0s}; std::atomic<std::chrono::seconds> m_last_mempool_req{0s};
/** The next time after which we will send an `inv` message containing /** The next time after which we will send an `inv` message containing
* transaction announcements to this peer. */ * transaction announcements to this peer. */
std::chrono::microseconds m_next_inv_send_time{0}; std::chrono::microseconds m_next_inv_send_time GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0};
/** Minimum fee rate with which to filter transaction announcements to this node. See BIP133. */ /** Minimum fee rate with which to filter transaction announcements to this node. See BIP133. */
std::atomic<CAmount> m_fee_filter_received{0}; std::atomic<CAmount> m_fee_filter_received{0};
@ -319,7 +319,7 @@ struct Peer {
}; };
/** A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. */ /** A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. */
std::vector<CAddress> m_addrs_to_send; std::vector<CAddress> m_addrs_to_send GUARDED_BY(NetEventsInterface::g_msgproc_mutex);
/** Probabilistic filter to track recent addr messages relayed with this /** Probabilistic filter to track recent addr messages relayed with this
* peer. Used to avoid relaying redundant addresses to this peer. * peer. Used to avoid relaying redundant addresses to this peer.
* *
@ -329,7 +329,7 @@ struct Peer {
* *
* Presence of this filter must correlate with m_addr_relay_enabled. * Presence of this filter must correlate with m_addr_relay_enabled.
**/ **/
std::unique_ptr<CRollingBloomFilter> m_addr_known; std::unique_ptr<CRollingBloomFilter> m_addr_known GUARDED_BY(NetEventsInterface::g_msgproc_mutex);
/** Whether we are participating in address relay with this connection. /** Whether we are participating in address relay with this connection.
* *
* We set this bool to true for outbound peers (other than * We set this bool to true for outbound peers (other than
@ -346,7 +346,7 @@ struct Peer {
* initialized.*/ * initialized.*/
std::atomic_bool m_addr_relay_enabled{false}; std::atomic_bool m_addr_relay_enabled{false};
/** Whether a getaddr request to this peer is outstanding. */ /** Whether a getaddr request to this peer is outstanding. */
bool m_getaddr_sent{false}; bool m_getaddr_sent GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false};
/** Guards address sending timers. */ /** Guards address sending timers. */
mutable Mutex m_addr_send_times_mutex; mutable Mutex m_addr_send_times_mutex;
/** Time point to send the next ADDR message to this peer. */ /** Time point to send the next ADDR message to this peer. */
@ -357,12 +357,12 @@ struct Peer {
* messages, indicating a preference to receive ADDRv2 instead of ADDR ones. */ * messages, indicating a preference to receive ADDRv2 instead of ADDR ones. */
std::atomic_bool m_wants_addrv2{false}; std::atomic_bool m_wants_addrv2{false};
/** Whether this peer has already sent us a getaddr message. */ /** Whether this peer has already sent us a getaddr message. */
bool m_getaddr_recvd{false}; bool m_getaddr_recvd GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false};
/** Number of addresses that can be processed from this peer. Start at 1 to /** Number of addresses that can be processed from this peer. Start at 1 to
* permit self-announcement. */ * permit self-announcement. */
double m_addr_token_bucket{1.0}; double m_addr_token_bucket GUARDED_BY(NetEventsInterface::g_msgproc_mutex){1.0};
/** When m_addr_token_bucket was last updated */ /** When m_addr_token_bucket was last updated */
std::chrono::microseconds m_addr_token_timestamp{GetTime<std::chrono::microseconds>()}; std::chrono::microseconds m_addr_token_timestamp GUARDED_BY(NetEventsInterface::g_msgproc_mutex){GetTime<std::chrono::microseconds>()};
/** Total number of addresses that were dropped due to rate limiting. */ /** Total number of addresses that were dropped due to rate limiting. */
std::atomic<uint64_t> m_addr_rate_limited{0}; std::atomic<uint64_t> m_addr_rate_limited{0};
/** Total number of addresses that were processed (excludes rate-limited ones). */ /** Total number of addresses that were processed (excludes rate-limited ones). */
@ -372,7 +372,7 @@ struct Peer {
std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans); std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans);
/** Whether we've sent this peer a getheaders in response to an inv prior to initial-headers-sync completing */ /** Whether we've sent this peer a getheaders in response to an inv prior to initial-headers-sync completing */
bool m_inv_triggered_getheaders_before_sync{false}; bool m_inv_triggered_getheaders_before_sync GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false};
/** Protects m_getdata_requests **/ /** Protects m_getdata_requests **/
Mutex m_getdata_requests_mutex; Mutex m_getdata_requests_mutex;
@ -380,7 +380,7 @@ struct Peer {
std::deque<CInv> m_getdata_requests GUARDED_BY(m_getdata_requests_mutex); std::deque<CInv> m_getdata_requests GUARDED_BY(m_getdata_requests_mutex);
/** Time of the last getheaders message to this peer */ /** Time of the last getheaders message to this peer */
NodeClock::time_point m_last_getheaders_timestamp{}; NodeClock::time_point m_last_getheaders_timestamp GUARDED_BY(NetEventsInterface::g_msgproc_mutex){};
/** Protects m_headers_sync **/ /** Protects m_headers_sync **/
Mutex m_headers_sync_mutex; Mutex m_headers_sync_mutex;
@ -537,7 +537,7 @@ public:
private: private:
/** Consider evicting an outbound peer based on the amount of time they've been behind our tip */ /** Consider evicting an outbound peer based on the amount of time they've been behind our tip */
void ConsiderEviction(CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main); void ConsiderEviction(CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_msgproc_mutex);
/** If we have extra outbound peers, try to disconnect the one with the oldest block announcement */ /** If we have extra outbound peers, try to disconnect the one with the oldest block announcement */
void EvictExtraOutboundPeers(std::chrono::seconds now) EXCLUSIVE_LOCKS_REQUIRED(cs_main); void EvictExtraOutboundPeers(std::chrono::seconds now) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
@ -601,7 +601,7 @@ private:
void ProcessHeadersMessage(CNode& pfrom, Peer& peer, void ProcessHeadersMessage(CNode& pfrom, Peer& peer,
std::vector<CBlockHeader>&& headers, std::vector<CBlockHeader>&& headers,
bool via_compact_block) bool via_compact_block)
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_headers_presync_mutex); EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_headers_presync_mutex, g_msgproc_mutex);
/** Various helpers for headers processing, invoked by ProcessHeadersMessage() */ /** Various helpers for headers processing, invoked by ProcessHeadersMessage() */
/** Return true if headers are continuous and have valid proof-of-work (DoS points assigned on failure) */ /** Return true if headers are continuous and have valid proof-of-work (DoS points assigned on failure) */
bool CheckHeadersPoW(const std::vector<CBlockHeader>& headers, const Consensus::Params& consensusParams, Peer& peer); bool CheckHeadersPoW(const std::vector<CBlockHeader>& headers, const Consensus::Params& consensusParams, Peer& peer);
@ -610,7 +610,7 @@ private:
/** Deal with state tracking and headers sync for peers that send the /** Deal with state tracking and headers sync for peers that send the
* occasional non-connecting header (this can happen due to BIP 130 headers * occasional non-connecting header (this can happen due to BIP 130 headers
* announcements for blocks interacting with the 2hr (MAX_FUTURE_BLOCK_TIME) rule). */ * announcements for blocks interacting with the 2hr (MAX_FUTURE_BLOCK_TIME) rule). */
void HandleFewUnconnectingHeaders(CNode& pfrom, Peer& peer, const std::vector<CBlockHeader>& headers); void HandleFewUnconnectingHeaders(CNode& pfrom, Peer& peer, const std::vector<CBlockHeader>& headers) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
/** Return true if the headers connect to each other, false otherwise */ /** Return true if the headers connect to each other, false otherwise */
bool CheckHeadersAreContinuous(const std::vector<CBlockHeader>& headers) const; bool CheckHeadersAreContinuous(const std::vector<CBlockHeader>& headers) const;
/** Try to continue a low-work headers sync that has already begun. /** Try to continue a low-work headers sync that has already begun.
@ -633,7 +633,7 @@ private:
*/ */
bool IsContinuationOfLowWorkHeadersSync(Peer& peer, CNode& pfrom, bool IsContinuationOfLowWorkHeadersSync(Peer& peer, CNode& pfrom,
std::vector<CBlockHeader>& headers) std::vector<CBlockHeader>& headers)
EXCLUSIVE_LOCKS_REQUIRED(peer.m_headers_sync_mutex, !m_headers_presync_mutex); EXCLUSIVE_LOCKS_REQUIRED(peer.m_headers_sync_mutex, !m_headers_presync_mutex, g_msgproc_mutex);
/** Check work on a headers chain to be processed, and if insufficient, /** Check work on a headers chain to be processed, and if insufficient,
* initiate our anti-DoS headers sync mechanism. * initiate our anti-DoS headers sync mechanism.
* *
@ -649,7 +649,7 @@ private:
bool TryLowWorkHeadersSync(Peer& peer, CNode& pfrom, bool TryLowWorkHeadersSync(Peer& peer, CNode& pfrom,
const CBlockIndex* chain_start_header, const CBlockIndex* chain_start_header,
std::vector<CBlockHeader>& headers) std::vector<CBlockHeader>& headers)
EXCLUSIVE_LOCKS_REQUIRED(!peer.m_headers_sync_mutex, !m_peer_mutex, !m_headers_presync_mutex); EXCLUSIVE_LOCKS_REQUIRED(!peer.m_headers_sync_mutex, !m_peer_mutex, !m_headers_presync_mutex, g_msgproc_mutex);
/** Return true if the given header is an ancestor of /** Return true if the given header is an ancestor of
* m_chainman.m_best_header or our current tip */ * m_chainman.m_best_header or our current tip */
@ -659,7 +659,7 @@ private:
* We don't issue a getheaders message if we have a recent one outstanding. * We don't issue a getheaders message if we have a recent one outstanding.
* This returns true if a getheaders is actually sent, and false otherwise. * This returns true if a getheaders is actually sent, and false otherwise.
*/ */
bool MaybeSendGetHeaders(CNode& pfrom, const CBlockLocator& locator, Peer& peer); bool MaybeSendGetHeaders(CNode& pfrom, const CBlockLocator& locator, Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
/** Potentially fetch blocks from this peer upon receipt of a new headers tip */ /** Potentially fetch blocks from this peer upon receipt of a new headers tip */
void HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, const CBlockIndex* pindexLast); void HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, const CBlockIndex* pindexLast);
/** Update peer state based on received headers message */ /** Update peer state based on received headers message */
@ -683,10 +683,10 @@ private:
void MaybeSendPing(CNode& node_to, Peer& peer, std::chrono::microseconds now); void MaybeSendPing(CNode& node_to, Peer& peer, std::chrono::microseconds now);
/** Send `addr` messages on a regular schedule. */ /** Send `addr` messages on a regular schedule. */
void MaybeSendAddr(CNode& node, Peer& peer, std::chrono::microseconds current_time); void MaybeSendAddr(CNode& node, Peer& peer, std::chrono::microseconds current_time) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
/** Send a single `sendheaders` message, after we have completed headers sync with a peer. */ /** Send a single `sendheaders` message, after we have completed headers sync with a peer. */
void MaybeSendSendHeaders(CNode& node, Peer& peer); void MaybeSendSendHeaders(CNode& node, Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
/** Relay (gossip) an address to a few randomly chosen nodes. /** Relay (gossip) an address to a few randomly chosen nodes.
* *
@ -695,10 +695,10 @@ private:
* @param[in] fReachable Whether the address' network is reachable. We relay unreachable * @param[in] fReachable Whether the address' network is reachable. We relay unreachable
* addresses less. * addresses less.
*/ */
void RelayAddress(NodeId originator, const CAddress& addr, bool fReachable) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void RelayAddress(NodeId originator, const CAddress& addr, bool fReachable) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex);
/** Send `feefilter` message. */ /** Send `feefilter` message. */
void MaybeSendFeefilter(CNode& node, Peer& peer, std::chrono::microseconds current_time); void MaybeSendFeefilter(CNode& node, Peer& peer, std::chrono::microseconds current_time) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
const CChainParams& m_chainparams; const CChainParams& m_chainparams;
CConnman& m_connman; CConnman& m_connman;
@ -1010,7 +1010,10 @@ private:
* @return True if address relay is enabled with peer * @return True if address relay is enabled with peer
* False if address relay is disallowed * False if address relay is disallowed
*/ */
bool SetupAddressRelay(const CNode& node, Peer& peer); bool SetupAddressRelay(const CNode& node, Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
void AddAddressKnown(Peer& peer, const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
void PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
}; };
const CNodeState* PeerManagerImpl::State(NodeId pnode) const EXCLUSIVE_LOCKS_REQUIRED(cs_main) const CNodeState* PeerManagerImpl::State(NodeId pnode) const EXCLUSIVE_LOCKS_REQUIRED(cs_main)
@ -1036,13 +1039,13 @@ static bool IsAddrCompatible(const Peer& peer, const CAddress& addr)
return peer.m_wants_addrv2 || addr.IsAddrV1Compatible(); return peer.m_wants_addrv2 || addr.IsAddrV1Compatible();
} }
static void AddAddressKnown(Peer& peer, const CAddress& addr) void PeerManagerImpl::AddAddressKnown(Peer& peer, const CAddress& addr)
{ {
assert(peer.m_addr_known); assert(peer.m_addr_known);
peer.m_addr_known->insert(addr.GetKey()); peer.m_addr_known->insert(addr.GetKey());
} }
static void PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand) void PeerManagerImpl::PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand)
{ {
// Known checking here is only to save space from duplicates. // Known checking here is only to save space from duplicates.
// Before sending, we'll filter it again for known addresses that were // Before sending, we'll filter it again for known addresses that were
@ -5103,7 +5106,7 @@ void PeerManagerImpl::MaybeSendAddr(CNode& node, Peer& peer, std::chrono::micros
// Remove addr records that the peer already knows about, and add new // Remove addr records that the peer already knows about, and add new
// addrs to the m_addr_known filter on the same pass. // addrs to the m_addr_known filter on the same pass.
auto addr_already_known = [&peer](const CAddress& addr) { auto addr_already_known = [&peer](const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) {
bool ret = peer.m_addr_known->contains(addr.GetKey()); bool ret = peer.m_addr_known->contains(addr.GetKey());
if (!ret) peer.m_addr_known->insert(addr.GetKey()); if (!ret) peer.m_addr_known->insert(addr.GetKey());
return ret; return ret;