Merge bitcoin/bitcoin#27981: Fix potential network stalling bug

3388e523a1 Rework receive buffer pushback (Pieter Wuille)

Pull request description:

  See https://github.com/ElementsProject/elements/issues/1233. There, it has been observed that if both sides of a P2P connection have a significant amount of data to send, a stall can occur, where both try to drain their own send queue before trying to receive. The same issue seems to apply to the current Bitcoin Core codebase, though I don't know whether it's a frequent issue for us.

  The core issue is that whenever our optimistic send fails to fully send a message, we do subsequently not even select() for receiving; if it then turns out that sending is not possible either, no progress is made at all. To address this, the solution used in this PR is to still select() for both sending and receiving when an optimistic send fails, but skip receiving if sending succeeded, and (still) doesn't fully drain the send queue.

  This is a significant reduction in how aggressive the "receive pushback" mechanism is, because now it will only mildly push back while sending progress is made; if the other side stops receiving entirely, the pushback disappears. I don't think that's a serious problem though:
  * We still have a pushback mechanism at the application buffer level (when the application receive buffer overflows, receiving is paused until messages in the buffer get processed; waiting on our own net_processing thread, not on the remote party).
  * There are cases where the existing mechanism is too aggressive; e.g. when the send queue is non-empty, but tiny, and can be sent with a single send() call. In that case, I think we'd prefer to still receive within the same processing loop of the network thread.

ACKs for top commit:
  ajtowns:
    ACK 3388e523a1
  naumenkogs:
    ACK 3388e523a1
  mzumsande:
    Tested ACK 3388e523a1

Tree-SHA512: 28960feb3cd2ff3dfb39622510da62472612f88165ea98fc9fb844bfcb8fa3ed3633f83e7bd72bdbbbd37993ef10181b2e1b34836ebb8f0d83fd1c558921ec17
This commit is contained in:
fanquake 2023-08-17 13:01:17 +01:00
commit 0a55bcd299
No known key found for this signature in database
GPG key ID: 2EEB9F5CC09526C1
2 changed files with 30 additions and 37 deletions

View file

@ -833,7 +833,7 @@ void V1TransportSerializer::prepareForTransport(CSerializedNetMsg& msg, std::vec
CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr};
}
size_t CConnman::SocketSendData(CNode& node) const
std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
{
auto it = node.vSendMsg.begin();
size_t nSentSize = 0;
@ -888,7 +888,7 @@ size_t CConnman::SocketSendData(CNode& node) const
assert(node.nSendSize == 0);
}
node.vSendMsg.erase(node.vSendMsg.begin(), it);
return nSentSize;
return {nSentSize, !node.vSendMsg.empty()};
}
/** Try to find a connection to evict when the node is full.
@ -1226,37 +1226,15 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
}
for (CNode* pnode : nodes) {
// Implement the following logic:
// * If there is data to send, select() for sending data. As this only
// happens when optimistic write failed, we choose to first drain the
// write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling.
// * Otherwise, if there is space left in the receive buffer, select() for
// receiving data.
// * Hand off all complete messages to the processor, to be handled without
// blocking here.
bool select_recv = !pnode->fPauseRecv;
bool select_send;
{
LOCK(pnode->cs_vSend);
select_send = !pnode->vSendMsg.empty();
}
bool select_send = WITH_LOCK(pnode->cs_vSend, return !pnode->vSendMsg.empty());
if (!select_recv && !select_send) continue;
LOCK(pnode->m_sock_mutex);
if (!pnode->m_sock) {
continue;
if (pnode->m_sock) {
Sock::Event event = (select_send ? Sock::SEND : 0) | (select_recv ? Sock::RECV : 0);
events_per_sock.emplace(pnode->m_sock, Sock::Events{event});
}
Sock::Event requested{0};
if (select_send) {
requested = Sock::SEND;
} else if (select_recv) {
requested = Sock::RECV;
}
events_per_sock.emplace(pnode->m_sock, Sock::Events{requested});
}
return events_per_sock;
@ -1317,6 +1295,24 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
errorSet = it->second.occurred & Sock::ERR;
}
}
if (sendSet) {
// Send data
auto [bytes_sent, data_left] = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode));
if (bytes_sent) {
RecordBytesSent(bytes_sent);
// If both receiving and (non-optimistic) sending were possible, we first attempt
// sending. If that succeeds, but does not fully drain the send queue, do not
// attempt to receive. This avoids needlessly queueing data if the remote peer
// is slow at receiving data, by means of TCP flow control. We only do this when
// sending actually succeeded to make sure progress is always made; otherwise a
// deadlock would be possible when both sides have data to send, but neither is
// receiving.
if (data_left) recvSet = false;
}
}
if (recvSet || errorSet)
{
// typical socket buffer is 8K-64K
@ -1363,12 +1359,6 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
}
}
if (sendSet) {
// Send data
size_t bytes_sent = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode));
if (bytes_sent) RecordBytesSent(bytes_sent);
}
if (InactivityCheck(*pnode)) pnode->fDisconnect = true;
}
}
@ -2935,7 +2925,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data));
// If write queue empty, attempt "optimistic write"
if (optimisticSend) nBytesSent = SocketSendData(*pnode);
bool data_left;
if (optimisticSend) std::tie(nBytesSent, data_left) = SocketSendData(*pnode);
}
if (nBytesSent) RecordBytesSent(nBytesSent);
}

View file

@ -1013,7 +1013,9 @@ private:
NodeId GetNewNodeId();
size_t SocketSendData(CNode& node) const EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend);
/** (Try to) send data from node's vSendMsg. Returns (bytes_sent, data_left). */
std::pair<size_t, bool> SocketSendData(CNode& node) const EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend);
void DumpAddresses();
// Network stats