net: fix a few races. Credit @TheBlueMatt

These are (afaik) all long-standing races or concurrent accesses. Going
forward, we can clean these up so that they're not all individual atomic
accesses.

- Reintroduce cs_vRecv to guard receive-specific vars
- Lock vRecv/vSend for CNodeStats
- Make some vars atomic.
- Only set the connection time in CNode's constructor so that it doesn't change
This commit is contained in:
Cory Fields 2017-02-06 02:34:57 -05:00 committed by Matt Corallo
parent 2447c1024e
commit 321d0fc6b6
3 changed files with 22 additions and 15 deletions

View file

@ -389,7 +389,6 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize(); uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, pszDest ? pszDest : "", false); CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, pszDest ? pszDest : "", false);
pnode->nServicesExpected = ServiceFlags(addrConnect.nServices & nRelevantServices); pnode->nServicesExpected = ServiceFlags(addrConnect.nServices & nRelevantServices);
pnode->nTimeConnected = GetSystemTimeInSeconds();
pnode->AddRef(); pnode->AddRef();
return pnode; return pnode;
@ -612,10 +611,16 @@ void CNode::copyStats(CNodeStats &stats)
X(fInbound); X(fInbound);
X(fAddnode); X(fAddnode);
X(nStartingHeight); X(nStartingHeight);
X(nSendBytes); {
X(mapSendBytesPerMsgCmd); LOCK(cs_vSend);
X(nRecvBytes); X(mapSendBytesPerMsgCmd);
X(mapRecvBytesPerMsgCmd); X(nSendBytes);
}
{
LOCK(cs_vRecv);
X(mapRecvBytesPerMsgCmd);
X(nRecvBytes);
}
X(fWhitelisted); X(fWhitelisted);
// It is common for nodes with good ping times to suddenly become lagged, // It is common for nodes with good ping times to suddenly become lagged,
@ -643,6 +648,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete
{ {
complete = false; complete = false;
int64_t nTimeMicros = GetTimeMicros(); int64_t nTimeMicros = GetTimeMicros();
LOCK(cs_vRecv);
nLastRecv = nTimeMicros / 1000000; nLastRecv = nTimeMicros / 1000000;
nRecvBytes += nBytes; nRecvBytes += nBytes;
while (nBytes > 0) { while (nBytes > 0) {

View file

@ -573,6 +573,7 @@ public:
std::deque<std::vector<unsigned char>> vSendMsg; std::deque<std::vector<unsigned char>> vSendMsg;
CCriticalSection cs_vSend; CCriticalSection cs_vSend;
CCriticalSection cs_hSocket; CCriticalSection cs_hSocket;
CCriticalSection cs_vRecv;
CCriticalSection cs_vProcessMsg; CCriticalSection cs_vProcessMsg;
std::list<CNetMessage> vProcessMsg; std::list<CNetMessage> vProcessMsg;
@ -584,10 +585,10 @@ public:
uint64_t nRecvBytes; uint64_t nRecvBytes;
std::atomic<int> nRecvVersion; std::atomic<int> nRecvVersion;
int64_t nLastSend; std::atomic<int64_t> nLastSend;
int64_t nLastRecv; std::atomic<int64_t> nLastRecv;
int64_t nTimeConnected; int64_t nTimeConnected;
int64_t nTimeOffset; std::atomic<int64_t> nTimeOffset;
const CAddress addr; const CAddress addr;
std::string addrName; std::string addrName;
CService addrLocal; CService addrLocal;
@ -614,7 +615,7 @@ public:
CSemaphoreGrant grantOutbound; CSemaphoreGrant grantOutbound;
CCriticalSection cs_filter; CCriticalSection cs_filter;
CBloomFilter* pfilter; CBloomFilter* pfilter;
int nRefCount; std::atomic<int> nRefCount;
const NodeId id; const NodeId id;
const uint64_t nKeyedNetGroup; const uint64_t nKeyedNetGroup;
@ -665,15 +666,15 @@ public:
// Ping time measurement: // Ping time measurement:
// The pong reply we're expecting, or 0 if no pong expected. // The pong reply we're expecting, or 0 if no pong expected.
uint64_t nPingNonceSent; std::atomic<uint64_t> nPingNonceSent;
// Time (in usec) the last ping was sent, or 0 if no ping was ever sent. // Time (in usec) the last ping was sent, or 0 if no ping was ever sent.
int64_t nPingUsecStart; std::atomic<int64_t> nPingUsecStart;
// Last measured round-trip time. // Last measured round-trip time.
int64_t nPingUsecTime; std::atomic<int64_t> nPingUsecTime;
// Best measured round-trip time. // Best measured round-trip time.
int64_t nMinPingUsecTime; std::atomic<int64_t> nMinPingUsecTime;
// Whether a ping is requested. // Whether a ping is requested.
bool fPingQueued; std::atomic<bool> fPingQueued;
// Minimum fee rate with which to filter inv's to this node // Minimum fee rate with which to filter inv's to this node
CAmount minFeeFilter; CAmount minFeeFilter;
CCriticalSection cs_feeFilter; CCriticalSection cs_feeFilter;

View file

@ -2450,7 +2450,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
if (pingUsecTime > 0) { if (pingUsecTime > 0) {
// Successful ping time measurement, replace previous // Successful ping time measurement, replace previous
pfrom->nPingUsecTime = pingUsecTime; pfrom->nPingUsecTime = pingUsecTime;
pfrom->nMinPingUsecTime = std::min(pfrom->nMinPingUsecTime, pingUsecTime); pfrom->nMinPingUsecTime = std::min(pfrom->nMinPingUsecTime.load(), pingUsecTime);
} else { } else {
// This should never happen // This should never happen
sProblem = "Timing mishap"; sProblem = "Timing mishap";