diff --git a/src/main.cpp b/src/main.cpp index 7f408c4d1b..06957ba526 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -3235,18 +3235,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) } } - // Ask the first connected node for block updates - static int nAskedForBlocks = 0; - if (!pfrom->fClient && !pfrom->fOneShot && !fImporting && !fReindex && - (pfrom->nStartingHeight > (nBestHeight - 144)) && - (pfrom->nVersion < NOBLKS_VERSION_START || - pfrom->nVersion >= NOBLKS_VERSION_END) && - (nAskedForBlocks < 1 || vNodes.size() <= 1)) - { - nAskedForBlocks++; - pfrom->PushGetBlocks(pindexBest, uint256(0)); - } - // Relay alerts { LOCK(cs_mapAlerts); @@ -3855,6 +3843,12 @@ bool SendMessages(CNode* pto, bool fSendTrickle) pto->PushMessage("ping"); } + // Start block sync + if (pto->fStartSync && !fImporting && !fReindex) { + pto->fStartSync = false; + pto->PushGetBlocks(pindexBest, uint256(0)); + } + // Resend wallet transactions that haven't gotten in a block yet // Except during reindex, importing and IBD, when old wallet // transactions become unconfirmed and spams other nodes. diff --git a/src/net.cpp b/src/net.cpp index 069f8b5cf6..755312682b 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -44,6 +44,7 @@ static map mapLocalHost; static bool vfReachable[NET_MAX] = {}; static bool vfLimited[NET_MAX] = {}; static CNode* pnodeLocalHost = NULL; +static CNode* pnodeSync = NULL; uint64 nLocalHostNonce = 0; static std::vector vhListenSocket; CAddrMan addrman; @@ -515,12 +516,16 @@ void CNode::CloseSocketDisconnect() printf("disconnecting node %s\n", addrName.c_str()); closesocket(hSocket); hSocket = INVALID_SOCKET; - - // in case this fails, we'll empty the recv buffer when the CNode is deleted - TRY_LOCK(cs_vRecvMsg, lockRecv); - if (lockRecv) - vRecvMsg.clear(); } + + // in case this fails, we'll empty the recv buffer when the CNode is deleted + TRY_LOCK(cs_vRecvMsg, lockRecv); + if (lockRecv) + vRecvMsg.clear(); + + // if this was the sync node, we'll need a new one + if (this == pnodeSync) + pnodeSync = NULL; } void CNode::Cleanup() @@ -607,6 +612,9 @@ void CNode::copyStats(CNodeStats &stats) X(fInbound); X(nStartingHeight); X(nMisbehavior); + X(nSendBytes); + X(nRecvBytes); + stats.fSyncNode = (this == pnodeSync); } #undef X @@ -701,6 +709,7 @@ void SocketSendData(CNode *pnode) int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT); if (nBytes > 0) { pnode->nLastSend = GetTime(); + pnode->nSendBytes += nBytes; pnode->nSendOffset += nBytes; if (pnode->nSendOffset == data.size()) { pnode->nSendOffset = 0; @@ -963,6 +972,7 @@ void ThreadSocketHandler() if (!pnode->ReceiveMsgBytes(pchBuf, nBytes)) pnode->CloseSocketDisconnect(); pnode->nLastRecv = GetTime(); + pnode->nRecvBytes += nBytes; } else if (nBytes == 0) { @@ -1538,24 +1548,64 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu } +// for now, use a very simple selection metric: the node from which we received +// most recently +double static NodeSyncScore(const CNode *pnode) { + return -pnode->nLastRecv; +} +void static StartSync(const vector &vNodes) { + CNode *pnodeNewSync = NULL; + double dBestScore = 0; + // fImporting and fReindex are accessed out of cs_main here, but only + // as an optimization - they are checked again in SendMessages. + if (fImporting || fReindex) + return; - + // Iterate over all nodes + BOOST_FOREACH(CNode* pnode, vNodes) { + // check preconditions for allowing a sync + if (!pnode->fClient && !pnode->fOneShot && + !pnode->fDisconnect && pnode->fSuccessfullyConnected && + (pnode->nStartingHeight > (nBestHeight - 144)) && + (pnode->nVersion < NOBLKS_VERSION_START || pnode->nVersion >= NOBLKS_VERSION_END)) { + // if ok, compare node's score with the best so far + double dScore = NodeSyncScore(pnode); + if (pnodeNewSync == NULL || dScore > dBestScore) { + pnodeNewSync = pnode; + dBestScore = dScore; + } + } + } + // if a new sync candidate was found, start sync! + if (pnodeNewSync) { + pnodeNewSync->fStartSync = true; + pnodeSync = pnodeNewSync; + } +} void ThreadMessageHandler() { SetThreadPriority(THREAD_PRIORITY_BELOW_NORMAL); while (true) { + bool fHaveSyncNode = false; + vector vNodesCopy; { LOCK(cs_vNodes); vNodesCopy = vNodes; - BOOST_FOREACH(CNode* pnode, vNodesCopy) + BOOST_FOREACH(CNode* pnode, vNodesCopy) { pnode->AddRef(); + if (pnode == pnodeSync) + fHaveSyncNode = true; + } } + if (!fHaveSyncNode) + StartSync(vNodesCopy); + // Poll the connected nodes for messages CNode* pnodeTrickle = NULL; if (!vNodesCopy.empty()) diff --git a/src/net.h b/src/net.h index 80773e3f19..32bbbca6dc 100644 --- a/src/net.h +++ b/src/net.h @@ -101,6 +101,9 @@ public: bool fInbound; int nStartingHeight; int nMisbehavior; + uint64 nSendBytes; + uint64 nRecvBytes; + bool fSyncNode; }; @@ -155,12 +158,14 @@ public: CDataStream ssSend; size_t nSendSize; // total size of all vSendMsg entries size_t nSendOffset; // offset inside the first vSendMsg already sent + uint64 nSendBytes; std::deque vSendMsg; CCriticalSection cs_vSend; std::deque vRecvGetData; std::deque vRecvMsg; CCriticalSection cs_vRecvMsg; + uint64 nRecvBytes; int nRecvVersion; int64 nLastSend; @@ -200,6 +205,7 @@ public: CBlockIndex* pindexLastGetBlocksBegin; uint256 hashLastGetBlocksEnd; int nStartingHeight; + bool fStartSync; // flood relay std::vector vAddrToSend; @@ -220,6 +226,8 @@ public: nRecvVersion = MIN_PROTO_VERSION; nLastSend = 0; nLastRecv = 0; + nSendBytes = 0; + nRecvBytes = 0; nLastSendEmpty = GetTime(); nTimeConnected = GetTime(); addr = addrIn; @@ -239,6 +247,7 @@ public: pindexLastGetBlocksBegin = 0; hashLastGetBlocksEnd = 0; nStartingHeight = -1; + fStartSync = false; fGetAddr = false; nMisbehavior = 0; fRelayTxes = false; diff --git a/src/rpcnet.cpp b/src/rpcnet.cpp index e37b3009dd..644c2675ae 100644 --- a/src/rpcnet.cpp +++ b/src/rpcnet.cpp @@ -51,12 +51,16 @@ Value getpeerinfo(const Array& params, bool fHelp) obj.push_back(Pair("services", strprintf("%08"PRI64x, stats.nServices))); obj.push_back(Pair("lastsend", (boost::int64_t)stats.nLastSend)); obj.push_back(Pair("lastrecv", (boost::int64_t)stats.nLastRecv)); + obj.push_back(Pair("bytessent", (boost::int64_t)stats.nSendBytes)); + obj.push_back(Pair("bytesrecv", (boost::int64_t)stats.nRecvBytes)); obj.push_back(Pair("conntime", (boost::int64_t)stats.nTimeConnected)); obj.push_back(Pair("version", stats.nVersion)); obj.push_back(Pair("subver", stats.strSubVer)); obj.push_back(Pair("inbound", stats.fInbound)); obj.push_back(Pair("startingheight", stats.nStartingHeight)); obj.push_back(Pair("banscore", stats.nMisbehavior)); + if (stats.fSyncNode) + obj.push_back(Pair("syncnode", true)); ret.push_back(obj); }