diff --git a/src/net.cpp b/src/net.cpp index 70c04d7a0e..8ae2bebd32 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -761,7 +761,7 @@ const uint256& CNetMessage::GetMessageHash() const // requires LOCK(cs_vSend) -size_t SocketSendData(CNode *pnode) +size_t CConnman::SocketSendData(CNode *pnode) { auto it = pnode->vSendMsg.begin(); size_t nSentSize = 0; @@ -778,6 +778,7 @@ size_t SocketSendData(CNode *pnode) if (pnode->nSendOffset == data.size()) { pnode->nSendOffset = 0; pnode->nSendSize -= data.size(); + pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize; it++; } else { // could not send full message; stop sending more @@ -1286,8 +1287,9 @@ void CConnman::ThreadSocketHandler() TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { size_t nBytes = SocketSendData(pnode); - if (nBytes) + if (nBytes) { RecordBytesSent(nBytes); + } } } @@ -1868,7 +1870,7 @@ void CConnman::ThreadMessageHandler() if (lockRecv) { bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc); - fMoreWork |= (fMoreNodeWork && pnode->nSendSize < GetSendBufferSize()); + fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); } } if (flagInterruptMsgProc) @@ -2595,6 +2597,7 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn lastSentFeeFilter = 0; nextSendTimeFeeFilter = 0; fPauseRecv = false; + fPauseSend = false; nProcessQueueSize = 0; BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes()) @@ -2675,6 +2678,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize; pnode->nSendSize += nTotalSize; + if (pnode->nSendSize > nSendBufferMaxSize) + pnode->fPauseSend = true; pnode->vSendMsg.push_back(std::move(serializedHeader)); if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data)); diff --git a/src/net.h b/src/net.h index 0eb430a8bf..db73be477f 100644 --- a/src/net.h +++ b/src/net.h @@ -358,6 +358,7 @@ private: NodeId GetNewNodeId(); + size_t SocketSendData(CNode *pnode); //!check is the banlist has unwritten changes bool BannedSetIsDirty(); //!set the "dirty" flag for the banlist @@ -444,7 +445,6 @@ void Discover(boost::thread_group& threadGroup); void MapPort(bool fUseUPnP); unsigned short GetListenPort(); bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false); -size_t SocketSendData(CNode *pnode); struct CombinerAll { @@ -652,6 +652,7 @@ public: const uint64_t nKeyedNetGroup; std::atomic_bool fPauseRecv; + std::atomic_bool fPauseSend; protected: mapMsgCmdSize mapSendBytesPerMsgCmd; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 93b6e2ec01..185ab980fe 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -889,14 +889,13 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic& interruptMsgProc) { std::deque::iterator it = pfrom->vRecvGetData.begin(); - unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); vector vNotFound; CNetMsgMaker msgMaker(pfrom->GetSendVersion()); LOCK(cs_main); while (it != pfrom->vRecvGetData.end()) { // Don't bother if send buffer is too full to respond anyway - if (pfrom->nSendSize >= nMaxSendBufferSize) + if (pfrom->fPauseSend) break; const CInv &inv = *it; @@ -2444,7 +2443,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interruptMsgProc) { const CChainParams& chainparams = Params(); - unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); // // Message format // (4) message start @@ -2465,7 +2463,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru if (!pfrom->vRecvGetData.empty()) return true; // Don't bother if send buffer is too full to respond anyway - if (pfrom->nSendSize >= nMaxSendBufferSize) + if (pfrom->fPauseSend) return false; std::list msgs;