mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-01-10 11:57:28 -03:00
net: reconnect with V1Transport under certain conditions
When an outbound v2 connection is disconnected without receiving anything, but at least 24 bytes of our pubkey were sent out (enough to constitute an invalid v1 header), add them to a queue of reconnections to be tried. The reconnections are in a queue rather than performed immediately, because we should not block the socket handler thread with connection creation (a blocking operation that can take multiple seconds).
This commit is contained in:
parent
4d265d0342
commit
432a62c4dc
2 changed files with 119 additions and 4 deletions
79
src/net.cpp
79
src/net.cpp
|
@ -1546,6 +1546,9 @@ void V2Transport::MarkBytesSent(size_t bytes_sent) noexcept
|
|||
|
||||
m_send_pos += bytes_sent;
|
||||
Assume(m_send_pos <= m_send_buffer.size());
|
||||
if (m_send_pos >= CMessageHeader::HEADER_SIZE) {
|
||||
m_sent_v1_header_worth = true;
|
||||
}
|
||||
// Wipe the buffer when everything is sent.
|
||||
if (m_send_pos == m_send_buffer.size()) {
|
||||
m_send_pos = 0;
|
||||
|
@ -1553,6 +1556,23 @@ void V2Transport::MarkBytesSent(size_t bytes_sent) noexcept
|
|||
}
|
||||
}
|
||||
|
||||
bool V2Transport::ShouldReconnectV1() const noexcept
|
||||
{
|
||||
AssertLockNotHeld(m_send_mutex);
|
||||
AssertLockNotHeld(m_recv_mutex);
|
||||
// Only outgoing connections need reconnection.
|
||||
if (!m_initiating) return false;
|
||||
|
||||
LOCK(m_recv_mutex);
|
||||
// We only reconnect in the very first state and when the receive buffer is empty. Together
|
||||
// these conditions imply nothing has been received so far.
|
||||
if (m_recv_state != RecvState::KEY) return false;
|
||||
if (!m_recv_buffer.empty()) return false;
|
||||
// Check if we've sent enough for the other side to disconnect us (if it was V1).
|
||||
LOCK(m_send_mutex);
|
||||
return m_sent_v1_header_worth;
|
||||
}
|
||||
|
||||
size_t V2Transport::GetSendMemoryUsage() const noexcept
|
||||
{
|
||||
AssertLockNotHeld(m_send_mutex);
|
||||
|
@ -1868,6 +1888,13 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
|
|||
|
||||
void CConnman::DisconnectNodes()
|
||||
{
|
||||
AssertLockNotHeld(m_nodes_mutex);
|
||||
AssertLockNotHeld(m_reconnections_mutex);
|
||||
|
||||
// Use a temporary variable to accumulate desired reconnections, so we don't need
|
||||
// m_reconnections_mutex while holding m_nodes_mutex.
|
||||
decltype(m_reconnections) reconnections_to_add;
|
||||
|
||||
{
|
||||
LOCK(m_nodes_mutex);
|
||||
|
||||
|
@ -1890,6 +1917,19 @@ void CConnman::DisconnectNodes()
|
|||
// remove from m_nodes
|
||||
m_nodes.erase(remove(m_nodes.begin(), m_nodes.end(), pnode), m_nodes.end());
|
||||
|
||||
// Add to reconnection list if appropriate. We don't reconnect right here, because
|
||||
// the creation of a connection is a blocking operation (up to several seconds),
|
||||
// and we don't want to hold up the socket handler thread for that long.
|
||||
if (pnode->m_transport->ShouldReconnectV1()) {
|
||||
reconnections_to_add.push_back({
|
||||
.addr_connect = pnode->addr,
|
||||
.grant = std::move(pnode->grantOutbound),
|
||||
.destination = pnode->m_dest,
|
||||
.conn_type = pnode->m_conn_type,
|
||||
.use_v2transport = false});
|
||||
LogPrint(BCLog::NET, "retrying with v1 transport protocol for peer=%d\n", pnode->GetId());
|
||||
}
|
||||
|
||||
// release outbound grant (if any)
|
||||
pnode->grantOutbound.Release();
|
||||
|
||||
|
@ -1917,6 +1957,11 @@ void CConnman::DisconnectNodes()
|
|||
}
|
||||
}
|
||||
}
|
||||
{
|
||||
// Move entries from reconnections_to_add to m_reconnections.
|
||||
LOCK(m_reconnections_mutex);
|
||||
m_reconnections.splice(m_reconnections.end(), std::move(reconnections_to_add));
|
||||
}
|
||||
}
|
||||
|
||||
void CConnman::NotifyNumConnectionsChanged()
|
||||
|
@ -2389,6 +2434,7 @@ bool CConnman::MaybePickPreferredNetwork(std::optional<Network>& network)
|
|||
void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
|
||||
{
|
||||
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
|
||||
AssertLockNotHeld(m_reconnections_mutex);
|
||||
FastRandomContext rng;
|
||||
// Connect to specific addresses
|
||||
if (!connect.empty())
|
||||
|
@ -2432,6 +2478,8 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
|
|||
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
|
||||
return;
|
||||
|
||||
PerformReconnections();
|
||||
|
||||
CSemaphoreGrant grant(*semOutbound);
|
||||
if (interruptNet)
|
||||
return;
|
||||
|
@ -2778,6 +2826,7 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo() const
|
|||
void CConnman::ThreadOpenAddedConnections()
|
||||
{
|
||||
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
|
||||
AssertLockNotHeld(m_reconnections_mutex);
|
||||
while (true)
|
||||
{
|
||||
CSemaphoreGrant grant(*semAddnode);
|
||||
|
@ -2800,6 +2849,8 @@ void CConnman::ThreadOpenAddedConnections()
|
|||
// Retry every 60 seconds if a connection was attempted, otherwise two seconds
|
||||
if (!interruptNet.sleep_for(std::chrono::seconds(tried ? 60 : 2)))
|
||||
return;
|
||||
// See if any reconnections are desired.
|
||||
PerformReconnections();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3613,6 +3664,7 @@ CNode::CNode(NodeId idIn,
|
|||
addr{addrIn},
|
||||
addrBind{addrBindIn},
|
||||
m_addr_name{addrNameIn.empty() ? addr.ToStringAddrPort() : addrNameIn},
|
||||
m_dest(addrNameIn),
|
||||
m_inbound_onion{inbound_onion},
|
||||
m_prefer_evict{node_opts.prefer_evict},
|
||||
nKeyedNetGroup{nKeyedNetGroupIn},
|
||||
|
@ -3743,6 +3795,33 @@ uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& address) const
|
|||
return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup).Finalize();
|
||||
}
|
||||
|
||||
void CConnman::PerformReconnections()
|
||||
{
|
||||
AssertLockNotHeld(m_reconnections_mutex);
|
||||
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
|
||||
while (true) {
|
||||
// Move first element of m_reconnections to todo (avoiding an allocation inside the lock).
|
||||
decltype(m_reconnections) todo;
|
||||
{
|
||||
LOCK(m_reconnections_mutex);
|
||||
if (m_reconnections.empty()) break;
|
||||
todo.splice(todo.end(), m_reconnections, m_reconnections.begin());
|
||||
}
|
||||
|
||||
auto& item = *todo.begin();
|
||||
OpenNetworkConnection(item.addr_connect,
|
||||
// We only reconnect if the first attempt to connect succeeded at
|
||||
// connection time, but then failed after the CNode object was
|
||||
// created. Since we already know connecting is possible, do not
|
||||
// count failure to reconnect.
|
||||
/*fCountFailure=*/false,
|
||||
std::move(item.grant),
|
||||
item.destination.empty() ? nullptr : item.destination.c_str(),
|
||||
item.conn_type,
|
||||
item.use_v2transport);
|
||||
}
|
||||
}
|
||||
|
||||
// Dump binary message to file, with timestamp.
|
||||
static void CaptureMessageToFile(const CAddress& addr,
|
||||
const std::string& msg_type,
|
||||
|
|
44
src/net.h
44
src/net.h
|
@ -361,6 +361,11 @@ public:
|
|||
|
||||
/** Return the memory usage of this transport attributable to buffered data to send. */
|
||||
virtual size_t GetSendMemoryUsage() const noexcept = 0;
|
||||
|
||||
// 3. Miscellaneous functions.
|
||||
|
||||
/** Whether upon disconnections, a reconnect with V1 is warranted. */
|
||||
virtual bool ShouldReconnectV1() const noexcept = 0;
|
||||
};
|
||||
|
||||
class V1Transport final : public Transport
|
||||
|
@ -440,6 +445,7 @@ public:
|
|||
BytesToSend GetBytesToSend(bool have_next_message) const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
|
||||
void MarkBytesSent(size_t bytes_sent) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
|
||||
size_t GetSendMemoryUsage() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
|
||||
bool ShouldReconnectV1() const noexcept override { return false; }
|
||||
};
|
||||
|
||||
class V2Transport final : public Transport
|
||||
|
@ -608,6 +614,8 @@ private:
|
|||
std::string m_send_type GUARDED_BY(m_send_mutex);
|
||||
/** Current sender state. */
|
||||
SendState m_send_state GUARDED_BY(m_send_mutex);
|
||||
/** Whether we've sent at least 24 bytes (which would trigger disconnect for V1 peers). */
|
||||
bool m_sent_v1_header_worth GUARDED_BY(m_send_mutex) {false};
|
||||
|
||||
/** Change the receive state. */
|
||||
void SetReceiveState(RecvState recv_state) noexcept EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex);
|
||||
|
@ -653,6 +661,9 @@ public:
|
|||
BytesToSend GetBytesToSend(bool have_next_message) const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
|
||||
void MarkBytesSent(size_t bytes_sent) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
|
||||
size_t GetSendMemoryUsage() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
|
||||
|
||||
// Miscellaneous functions.
|
||||
bool ShouldReconnectV1() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex, !m_send_mutex);
|
||||
};
|
||||
|
||||
struct CNodeOptions
|
||||
|
@ -706,6 +717,8 @@ public:
|
|||
// Bind address of our side of the connection
|
||||
const CAddress addrBind;
|
||||
const std::string m_addr_name;
|
||||
/** The pszDest argument provided to ConnectNode(). Only used for reconnections. */
|
||||
const std::string m_dest;
|
||||
//! Whether this peer is an inbound onion, i.e. connected via our Tor onion service.
|
||||
const bool m_inbound_onion;
|
||||
std::atomic<int> nVersion{0};
|
||||
|
@ -1253,10 +1266,10 @@ private:
|
|||
bool Bind(const CService& addr, unsigned int flags, NetPermissionFlags permissions);
|
||||
bool InitBinds(const Options& options);
|
||||
|
||||
void ThreadOpenAddedConnections() EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex, !m_unused_i2p_sessions_mutex);
|
||||
void ThreadOpenAddedConnections() EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex, !m_unused_i2p_sessions_mutex, !m_reconnections_mutex);
|
||||
void AddAddrFetch(const std::string& strDest) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex);
|
||||
void ProcessAddrFetch() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_unused_i2p_sessions_mutex);
|
||||
void ThreadOpenConnections(std::vector<std::string> connect) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex);
|
||||
void ThreadOpenConnections(std::vector<std::string> connect) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex, !m_reconnections_mutex);
|
||||
void ThreadMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
|
||||
void ThreadI2PAcceptIncoming();
|
||||
void AcceptConnection(const ListenSocket& hListenSocket);
|
||||
|
@ -1274,7 +1287,7 @@ private:
|
|||
const CAddress& addr_bind,
|
||||
const CAddress& addr);
|
||||
|
||||
void DisconnectNodes();
|
||||
void DisconnectNodes() EXCLUSIVE_LOCKS_REQUIRED(!m_reconnections_mutex, !m_nodes_mutex);
|
||||
void NotifyNumConnectionsChanged();
|
||||
/** Return true if the peer is inactive and should be disconnected. */
|
||||
bool InactivityCheck(const CNode& node) const;
|
||||
|
@ -1306,7 +1319,7 @@ private:
|
|||
*/
|
||||
void SocketHandlerListening(const Sock::EventsPerSock& events_per_sock);
|
||||
|
||||
void ThreadSocketHandler() EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc);
|
||||
void ThreadSocketHandler() EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc, !m_nodes_mutex, !m_reconnections_mutex);
|
||||
void ThreadDNSAddressSeed() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_nodes_mutex);
|
||||
|
||||
uint64_t CalculateKeyedNetGroup(const CAddress& ad) const;
|
||||
|
@ -1537,6 +1550,29 @@ private:
|
|||
*/
|
||||
std::queue<std::unique_ptr<i2p::sam::Session>> m_unused_i2p_sessions GUARDED_BY(m_unused_i2p_sessions_mutex);
|
||||
|
||||
/**
|
||||
* Mutex protecting m_reconnections.
|
||||
*/
|
||||
Mutex m_reconnections_mutex;
|
||||
|
||||
/** Struct for entries in m_reconnections. */
|
||||
struct ReconnectionInfo
|
||||
{
|
||||
CAddress addr_connect;
|
||||
CSemaphoreGrant grant;
|
||||
std::string destination;
|
||||
ConnectionType conn_type;
|
||||
bool use_v2transport;
|
||||
};
|
||||
|
||||
/**
|
||||
* List of reconnections we have to make.
|
||||
*/
|
||||
std::list<ReconnectionInfo> m_reconnections GUARDED_BY(m_reconnections_mutex);
|
||||
|
||||
/** Attempt reconnections, if m_reconnections non-empty. */
|
||||
void PerformReconnections() EXCLUSIVE_LOCKS_REQUIRED(!m_reconnections_mutex, !m_unused_i2p_sessions_mutex);
|
||||
|
||||
/**
|
||||
* Cap on the size of `m_unused_i2p_sessions`, to ensure it does not
|
||||
* unexpectedly use too much memory.
|
||||
|
|
Loading…
Reference in a new issue