Merge bitcoin/bitcoin#21943: Dedup and RAII-fy the creation of a copy of CConnman::vNodes

f52b6b2d9f net: split CConnman::SocketHandler() (Vasil Dimov)
c7eb19ec83 style: remove unnecessary braces (Vasil Dimov)
664ac22c53 net: keep reference to each node during socket wait (Vasil Dimov)
75e8bf55f5 net: dedup and RAII-fy the creation of a copy of CConnman::vNodes (Vasil Dimov)

Pull request description:

  _This is a piece of https://github.com/bitcoin/bitcoin/pull/21878, chopped off to ease review._

  The following pattern was duplicated in CConnman:

  ```cpp
  lock
  create a copy of vNodes, add a reference to each one
  unlock
  ... use the copy ...
  lock
  release each node from the copy
  unlock
  ```

  Put that code in a RAII helper that reduces it to:

  ```cpp
  create snapshot "snap"
  ... use the copy ...
  // release happens when "snap" goes out of scope

ACKs for top commit:
  jonatack:
    ACK  f52b6b2d9f changes since last review are reordered commits, removing an unneeded local variable, and code formatting and documentation improvements
  LarryRuane:
    code review ACK f52b6b2d9f
  promag:
    Code review ACK f52b6b2d9f, only format changes and comment tweaks since last review.

Tree-SHA512: 5ead7b4c641ebe5b215e7baeb7bc0cdab2a588b2871d9a343a1d518535c55c0353d4e46de663f41513cdcc79262938ccea3232f6d5166570fc2230286c985f68
This commit is contained in:
W. J. van der Laan 2021-11-24 16:36:39 +01:00
commit 64059b78f5
No known key found for this signature in database
GPG key ID: 1E4AED62986CD25D
2 changed files with 183 additions and 102 deletions

View file

@ -1353,46 +1353,45 @@ bool CConnman::InactivityCheck(const CNode& node) const
return false;
}
bool CConnman::GenerateSelectSet(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set)
bool CConnman::GenerateSelectSet(const std::vector<CNode*>& nodes,
std::set<SOCKET>& recv_set,
std::set<SOCKET>& send_set,
std::set<SOCKET>& error_set)
{
for (const ListenSocket& hListenSocket : vhListenSocket) {
recv_set.insert(hListenSocket.socket);
}
{
LOCK(cs_vNodes);
for (CNode* pnode : vNodes)
for (CNode* pnode : nodes) {
// Implement the following logic:
// * If there is data to send, select() for sending data. As this only
// happens when optimistic write failed, we choose to first drain the
// write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling.
// * Otherwise, if there is space left in the receive buffer, select() for
// receiving data.
// * Hand off all complete messages to the processor, to be handled without
// blocking here.
bool select_recv = !pnode->fPauseRecv;
bool select_send;
{
// Implement the following logic:
// * If there is data to send, select() for sending data. As this only
// happens when optimistic write failed, we choose to first drain the
// write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling.
// * Otherwise, if there is space left in the receive buffer, select() for
// receiving data.
// * Hand off all complete messages to the processor, to be handled without
// blocking here.
LOCK(pnode->cs_vSend);
select_send = !pnode->vSendMsg.empty();
}
bool select_recv = !pnode->fPauseRecv;
bool select_send;
{
LOCK(pnode->cs_vSend);
select_send = !pnode->vSendMsg.empty();
}
LOCK(pnode->cs_hSocket);
if (pnode->hSocket == INVALID_SOCKET)
continue;
LOCK(pnode->cs_hSocket);
if (pnode->hSocket == INVALID_SOCKET)
continue;
error_set.insert(pnode->hSocket);
if (select_send) {
send_set.insert(pnode->hSocket);
continue;
}
if (select_recv) {
recv_set.insert(pnode->hSocket);
}
error_set.insert(pnode->hSocket);
if (select_send) {
send_set.insert(pnode->hSocket);
continue;
}
if (select_recv) {
recv_set.insert(pnode->hSocket);
}
}
@ -1400,10 +1399,13 @@ bool CConnman::GenerateSelectSet(std::set<SOCKET> &recv_set, std::set<SOCKET> &s
}
#ifdef USE_POLL
void CConnman::SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set)
void CConnman::SocketEvents(const std::vector<CNode*>& nodes,
std::set<SOCKET>& recv_set,
std::set<SOCKET>& send_set,
std::set<SOCKET>& error_set)
{
std::set<SOCKET> recv_select_set, send_select_set, error_select_set;
if (!GenerateSelectSet(recv_select_set, send_select_set, error_select_set)) {
if (!GenerateSelectSet(nodes, recv_select_set, send_select_set, error_select_set)) {
interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS));
return;
}
@ -1442,10 +1444,13 @@ void CConnman::SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_s
}
}
#else
void CConnman::SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set)
void CConnman::SocketEvents(const std::vector<CNode*>& nodes,
std::set<SOCKET>& recv_set,
std::set<SOCKET>& send_set,
std::set<SOCKET>& error_set)
{
std::set<SOCKET> recv_select_set, send_select_set, error_select_set;
if (!GenerateSelectSet(recv_select_set, send_select_set, error_select_set)) {
if (!GenerateSelectSet(nodes, recv_select_set, send_select_set, error_select_set)) {
interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS));
return;
}
@ -1519,34 +1524,33 @@ void CConnman::SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_s
void CConnman::SocketHandler()
{
std::set<SOCKET> recv_set, send_set, error_set;
SocketEvents(recv_set, send_set, error_set);
std::set<SOCKET> recv_set;
std::set<SOCKET> send_set;
std::set<SOCKET> error_set;
if (interruptNet) return;
//
// Accept new connections
//
for (const ListenSocket& hListenSocket : vhListenSocket)
{
if (hListenSocket.socket != INVALID_SOCKET && recv_set.count(hListenSocket.socket) > 0)
{
AcceptConnection(hListenSocket);
}
const NodesSnapshot snap{*this, /*shuffle=*/false};
// Check for the readiness of the already connected sockets and the
// listening sockets in one call ("readiness" as in poll(2) or
// select(2)). If none are ready, wait for a short while and return
// empty sets.
SocketEvents(snap.Nodes(), recv_set, send_set, error_set);
// Service (send/receive) each of the already connected nodes.
SocketHandlerConnected(snap.Nodes(), recv_set, send_set, error_set);
}
//
// Service each socket
//
std::vector<CNode*> vNodesCopy;
{
LOCK(cs_vNodes);
vNodesCopy = vNodes;
for (CNode* pnode : vNodesCopy)
pnode->AddRef();
}
for (CNode* pnode : vNodesCopy)
{
// Accept new connections from listening sockets.
SocketHandlerListening(recv_set);
}
void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
const std::set<SOCKET>& recv_set,
const std::set<SOCKET>& send_set,
const std::set<SOCKET>& error_set)
{
for (CNode* pnode : nodes) {
if (interruptNet)
return;
@ -1628,10 +1632,17 @@ void CConnman::SocketHandler()
if (InactivityCheck(*pnode)) pnode->fDisconnect = true;
}
{
LOCK(cs_vNodes);
for (CNode* pnode : vNodesCopy)
pnode->Release();
}
void CConnman::SocketHandlerListening(const std::set<SOCKET>& recv_set)
{
for (const ListenSocket& listen_socket : vhListenSocket) {
if (interruptNet) {
return;
}
if (listen_socket.socket != INVALID_SOCKET && recv_set.count(listen_socket.socket) > 0) {
AcceptConnection(listen_socket);
}
}
}
@ -2246,49 +2257,34 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
void CConnman::ThreadMessageHandler()
{
SetSyscallSandboxPolicy(SyscallSandboxPolicy::MESSAGE_HANDLER);
FastRandomContext rng;
while (!flagInterruptMsgProc)
{
std::vector<CNode*> vNodesCopy;
{
LOCK(cs_vNodes);
vNodesCopy = vNodes;
for (CNode* pnode : vNodesCopy) {
pnode->AddRef();
}
}
bool fMoreWork = false;
// Randomize the order in which we process messages from/to our peers.
// This prevents attacks in which an attacker exploits having multiple
// consecutive connections in the vNodes list.
Shuffle(vNodesCopy.begin(), vNodesCopy.end(), rng);
for (CNode* pnode : vNodesCopy)
{
if (pnode->fDisconnect)
continue;
// Randomize the order in which we process messages from/to our peers.
// This prevents attacks in which an attacker exploits having multiple
// consecutive connections in the vNodes list.
const NodesSnapshot snap{*this, /*shuffle=*/true};
// Receive messages
bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc);
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
if (flagInterruptMsgProc)
return;
// Send messages
{
LOCK(pnode->cs_sendProcessing);
m_msgproc->SendMessages(pnode);
for (CNode* pnode : snap.Nodes()) {
if (pnode->fDisconnect)
continue;
// Receive messages
bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc);
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
if (flagInterruptMsgProc)
return;
// Send messages
{
LOCK(pnode->cs_sendProcessing);
m_msgproc->SendMessages(pnode);
}
if (flagInterruptMsgProc)
return;
}
if (flagInterruptMsgProc)
return;
}
{
LOCK(cs_vNodes);
for (CNode* pnode : vNodesCopy)
pnode->Release();
}
WAIT_LOCK(mutexMsgProc, lock);

View file

@ -983,9 +983,57 @@ private:
void NotifyNumConnectionsChanged();
/** Return true if the peer is inactive and should be disconnected. */
bool InactivityCheck(const CNode& node) const;
bool GenerateSelectSet(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set);
void SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set);
/**
* Generate a collection of sockets to check for IO readiness.
* @param[in] nodes Select from these nodes' sockets.
* @param[out] recv_set Sockets to check for read readiness.
* @param[out] send_set Sockets to check for write readiness.
* @param[out] error_set Sockets to check for errors.
* @return true if at least one socket is to be checked (the returned set is not empty)
*/
bool GenerateSelectSet(const std::vector<CNode*>& nodes,
std::set<SOCKET>& recv_set,
std::set<SOCKET>& send_set,
std::set<SOCKET>& error_set);
/**
* Check which sockets are ready for IO.
* @param[in] nodes Select from these nodes' sockets.
* @param[out] recv_set Sockets which are ready for read.
* @param[out] send_set Sockets which are ready for write.
* @param[out] error_set Sockets which have errors.
* This calls `GenerateSelectSet()` to gather a list of sockets to check.
*/
void SocketEvents(const std::vector<CNode*>& nodes,
std::set<SOCKET>& recv_set,
std::set<SOCKET>& send_set,
std::set<SOCKET>& error_set);
/**
* Check connected and listening sockets for IO readiness and process them accordingly.
*/
void SocketHandler();
/**
* Do the read/write for connected sockets that are ready for IO.
* @param[in] nodes Nodes to process. The socket of each node is checked against
* `recv_set`, `send_set` and `error_set`.
* @param[in] recv_set Sockets that are ready for read.
* @param[in] send_set Sockets that are ready for send.
* @param[in] error_set Sockets that have an exceptional condition (error).
*/
void SocketHandlerConnected(const std::vector<CNode*>& nodes,
const std::set<SOCKET>& recv_set,
const std::set<SOCKET>& send_set,
const std::set<SOCKET>& error_set);
/**
* Accept incoming connections, one from each read-ready listening socket.
* @param[in] recv_set Sockets that are ready for read.
*/
void SocketHandlerListening(const std::set<SOCKET>& recv_set);
void ThreadSocketHandler();
void ThreadDNSAddressSeed();
@ -1177,6 +1225,43 @@ private:
*/
std::vector<CService> m_onion_binds;
/**
* RAII helper to atomically create a copy of `vNodes` and add a reference
* to each of the nodes. The nodes are released when this object is destroyed.
*/
class NodesSnapshot
{
public:
explicit NodesSnapshot(const CConnman& connman, bool shuffle)
{
{
LOCK(connman.cs_vNodes);
m_nodes_copy = connman.vNodes;
for (auto& node : m_nodes_copy) {
node->AddRef();
}
}
if (shuffle) {
Shuffle(m_nodes_copy.begin(), m_nodes_copy.end(), FastRandomContext{});
}
}
~NodesSnapshot()
{
for (auto& node : m_nodes_copy) {
node->Release();
}
}
const std::vector<CNode*>& Nodes() const
{
return m_nodes_copy;
}
private:
std::vector<CNode*> m_nodes_copy;
};
friend struct CConnmanTest;
friend struct ConnmanTestMsg;
};