Merge pull request #3180 from pstratem/processgetdata

Reduce latency in network processing
This commit is contained in:
Gavin Andresen 2013-11-03 21:52:25 -08:00
commit 97f844dd95
2 changed files with 25 additions and 3 deletions

View file

@ -3155,6 +3155,9 @@ void static ProcessGetData(CNode* pfrom)
// Track requests for our stuff. // Track requests for our stuff.
g_signals.Inventory(inv.hash); g_signals.Inventory(inv.hash);
if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK)
break;
} }
} }
@ -3841,7 +3844,10 @@ bool ProcessMessages(CNode* pfrom)
if (!pfrom->vRecvGetData.empty()) if (!pfrom->vRecvGetData.empty())
ProcessGetData(pfrom); ProcessGetData(pfrom);
// this maintains the order of responses
if (!pfrom->vRecvGetData.empty()) return fOk;
std::deque<CNetMessage>::iterator it = pfrom->vRecvMsg.begin(); std::deque<CNetMessage>::iterator it = pfrom->vRecvMsg.begin();
while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) { while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) {
// Don't bother if send buffer is too full to respond anyway // Don't bother if send buffer is too full to respond anyway
@ -3929,6 +3935,8 @@ bool ProcessMessages(CNode* pfrom)
if (!fRet) if (!fRet)
LogPrintf("ProcessMessage(%s, %u bytes) FAILED\n", strCommand.c_str(), nMessageSize); LogPrintf("ProcessMessage(%s, %u bytes) FAILED\n", strCommand.c_str(), nMessageSize);
break;
} }
// In case the connection got shut down, its receive buffer was wiped // In case the connection got shut down, its receive buffer was wiped

View file

@ -1540,6 +1540,9 @@ void ThreadMessageHandler()
CNode* pnodeTrickle = NULL; CNode* pnodeTrickle = NULL;
if (!vNodesCopy.empty()) if (!vNodesCopy.empty())
pnodeTrickle = vNodesCopy[GetRand(vNodesCopy.size())]; pnodeTrickle = vNodesCopy[GetRand(vNodesCopy.size())];
bool fSleep = true;
BOOST_FOREACH(CNode* pnode, vNodesCopy) BOOST_FOREACH(CNode* pnode, vNodesCopy)
{ {
if (pnode->fDisconnect) if (pnode->fDisconnect)
@ -1549,8 +1552,18 @@ void ThreadMessageHandler()
{ {
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv) if (lockRecv)
{
if (!g_signals.ProcessMessages(pnode)) if (!g_signals.ProcessMessages(pnode))
pnode->CloseSocketDisconnect(); pnode->CloseSocketDisconnect();
if (pnode->nSendSize < SendBufferSize())
{
if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg[0].complete()))
{
fSleep = false;
}
}
}
} }
boost::this_thread::interruption_point(); boost::this_thread::interruption_point();
@ -1568,8 +1581,9 @@ void ThreadMessageHandler()
BOOST_FOREACH(CNode* pnode, vNodesCopy) BOOST_FOREACH(CNode* pnode, vNodesCopy)
pnode->Release(); pnode->Release();
} }
MilliSleep(100); if (fSleep)
MilliSleep(100);
} }
} }