This commit is contained in:
Gavin Andresen 2012-02-17 13:40:42 -05:00
commit 25bc37f8a2
4 changed files with 81 additions and 69 deletions

View file

@ -2355,15 +2355,15 @@ void ThreadRPCServer(void* parg)
IMPLEMENT_RANDOMIZE_STACK(ThreadRPCServer(parg)); IMPLEMENT_RANDOMIZE_STACK(ThreadRPCServer(parg));
try try
{ {
vnThreadsRunning[4]++; vnThreadsRunning[THREAD_RPCSERVER]++;
ThreadRPCServer2(parg); ThreadRPCServer2(parg);
vnThreadsRunning[4]--; vnThreadsRunning[THREAD_RPCSERVER]--;
} }
catch (std::exception& e) { catch (std::exception& e) {
vnThreadsRunning[4]--; vnThreadsRunning[THREAD_RPCSERVER]--;
PrintException(&e, "ThreadRPCServer()"); PrintException(&e, "ThreadRPCServer()");
} catch (...) { } catch (...) {
vnThreadsRunning[4]--; vnThreadsRunning[THREAD_RPCSERVER]--;
PrintException(NULL, "ThreadRPCServer()"); PrintException(NULL, "ThreadRPCServer()");
} }
printf("ThreadRPCServer exiting\n"); printf("ThreadRPCServer exiting\n");
@ -2443,7 +2443,7 @@ void ThreadRPCServer2(void* parg)
#endif #endif
ip::tcp::endpoint peer; ip::tcp::endpoint peer;
vnThreadsRunning[4]--; vnThreadsRunning[THREAD_RPCSERVER]--;
#ifdef USE_SSL #ifdef USE_SSL
acceptor.accept(sslStream.lowest_layer(), peer); acceptor.accept(sslStream.lowest_layer(), peer);
#else #else

View file

@ -3390,7 +3390,7 @@ void static BitcoinMiner(CWallet *pwallet)
{ {
nLogTime = GetTime(); nLogTime = GetTime();
printf("%s ", DateTimeStrFormat("%x %H:%M", GetTime()).c_str()); printf("%s ", DateTimeStrFormat("%x %H:%M", GetTime()).c_str());
printf("hashmeter %3d CPUs %6.0f khash/s\n", vnThreadsRunning[3], dHashesPerSec/1000.0); printf("hashmeter %3d CPUs %6.0f khash/s\n", vnThreadsRunning[THREAD_MINER], dHashesPerSec/1000.0);
} }
} }
} }
@ -3401,7 +3401,7 @@ void static BitcoinMiner(CWallet *pwallet)
return; return;
if (!fGenerateBitcoins) if (!fGenerateBitcoins)
return; return;
if (fLimitProcessors && vnThreadsRunning[3] > nLimitProcessors) if (fLimitProcessors && vnThreadsRunning[THREAD_MINER] > nLimitProcessors)
return; return;
if (vNodes.empty()) if (vNodes.empty())
break; break;
@ -3424,22 +3424,22 @@ void static ThreadBitcoinMiner(void* parg)
CWallet* pwallet = (CWallet*)parg; CWallet* pwallet = (CWallet*)parg;
try try
{ {
vnThreadsRunning[3]++; vnThreadsRunning[THREAD_MINER]++;
BitcoinMiner(pwallet); BitcoinMiner(pwallet);
vnThreadsRunning[3]--; vnThreadsRunning[THREAD_MINER]--;
} }
catch (std::exception& e) { catch (std::exception& e) {
vnThreadsRunning[3]--; vnThreadsRunning[THREAD_MINER]--;
PrintException(&e, "ThreadBitcoinMiner()"); PrintException(&e, "ThreadBitcoinMiner()");
} catch (...) { } catch (...) {
vnThreadsRunning[3]--; vnThreadsRunning[THREAD_MINER]--;
PrintException(NULL, "ThreadBitcoinMiner()"); PrintException(NULL, "ThreadBitcoinMiner()");
} }
UIThreadCall(boost::bind(CalledSetStatusBar, "", 0)); UIThreadCall(boost::bind(CalledSetStatusBar, "", 0));
nHPSTimerStart = 0; nHPSTimerStart = 0;
if (vnThreadsRunning[3] == 0) if (vnThreadsRunning[THREAD_MINER] == 0)
dHashesPerSec = 0; dHashesPerSec = 0;
printf("ThreadBitcoinMiner exiting, %d threads remaining\n", vnThreadsRunning[3]); printf("ThreadBitcoinMiner exiting, %d threads remaining\n", vnThreadsRunning[THREAD_MINER]);
} }
@ -3459,7 +3459,7 @@ void GenerateBitcoins(bool fGenerate, CWallet* pwallet)
nProcessors = 1; nProcessors = 1;
if (fLimitProcessors && nProcessors > nLimitProcessors) if (fLimitProcessors && nProcessors > nLimitProcessors)
nProcessors = nLimitProcessors; nProcessors = nLimitProcessors;
int nAddThreads = nProcessors - vnThreadsRunning[3]; int nAddThreads = nProcessors - vnThreadsRunning[THREAD_MINER];
printf("Starting %d BitcoinMiner threads\n", nAddThreads); printf("Starting %d BitcoinMiner threads\n", nAddThreads);
for (int i = 0; i < nAddThreads; i++) for (int i = 0; i < nAddThreads; i++)
{ {

View file

@ -38,8 +38,6 @@ bool OpenNetworkConnection(const CAddress& addrConnect);
// //
// Global state variables // Global state variables
// //
@ -49,7 +47,7 @@ uint64 nLocalServices = (fClient ? 0 : NODE_NETWORK);
CAddress addrLocalHost(CService("0.0.0.0", 0), nLocalServices); CAddress addrLocalHost(CService("0.0.0.0", 0), nLocalServices);
static CNode* pnodeLocalHost = NULL; static CNode* pnodeLocalHost = NULL;
uint64 nLocalHostNonce = 0; uint64 nLocalHostNonce = 0;
array<int, 10> vnThreadsRunning; array<int, THREAD_MAX> vnThreadsRunning;
static SOCKET hListenSocket = INVALID_SOCKET; static SOCKET hListenSocket = INVALID_SOCKET;
vector<CNode*> vNodes; vector<CNode*> vNodes;
@ -67,7 +65,6 @@ CCriticalSection cs_setservAddNodeAddresses;
unsigned short GetListenPort() unsigned short GetListenPort()
{ {
return (unsigned short)(GetArg("-port", GetDefaultPort())); return (unsigned short)(GetArg("-port", GetDefaultPort()));
@ -602,15 +599,15 @@ void ThreadSocketHandler(void* parg)
IMPLEMENT_RANDOMIZE_STACK(ThreadSocketHandler(parg)); IMPLEMENT_RANDOMIZE_STACK(ThreadSocketHandler(parg));
try try
{ {
vnThreadsRunning[0]++; vnThreadsRunning[THREAD_SOCKETHANDLER]++;
ThreadSocketHandler2(parg); ThreadSocketHandler2(parg);
vnThreadsRunning[0]--; vnThreadsRunning[THREAD_SOCKETHANDLER]--;
} }
catch (std::exception& e) { catch (std::exception& e) {
vnThreadsRunning[0]--; vnThreadsRunning[THREAD_SOCKETHANDLER]--;
PrintException(&e, "ThreadSocketHandler()"); PrintException(&e, "ThreadSocketHandler()");
} catch (...) { } catch (...) {
vnThreadsRunning[0]--; vnThreadsRunning[THREAD_SOCKETHANDLER]--;
throw; // support pthread_cancel() throw; // support pthread_cancel()
} }
printf("ThreadSocketHandler exiting\n"); printf("ThreadSocketHandler exiting\n");
@ -712,9 +709,9 @@ void ThreadSocketHandler2(void* parg)
} }
} }
vnThreadsRunning[0]--; vnThreadsRunning[THREAD_SOCKETHANDLER]--;
int nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, &fdsetError, &timeout); int nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, &fdsetError, &timeout);
vnThreadsRunning[0]++; vnThreadsRunning[THREAD_SOCKETHANDLER]++;
if (fShutdown) if (fShutdown)
return; return;
if (nSelect == SOCKET_ERROR) if (nSelect == SOCKET_ERROR)
@ -927,15 +924,15 @@ void ThreadMapPort(void* parg)
IMPLEMENT_RANDOMIZE_STACK(ThreadMapPort(parg)); IMPLEMENT_RANDOMIZE_STACK(ThreadMapPort(parg));
try try
{ {
vnThreadsRunning[5]++; vnThreadsRunning[THREAD_UPNP]++;
ThreadMapPort2(parg); ThreadMapPort2(parg);
vnThreadsRunning[5]--; vnThreadsRunning[THREAD_UPNP]--;
} }
catch (std::exception& e) { catch (std::exception& e) {
vnThreadsRunning[5]--; vnThreadsRunning[THREAD_UPNP]--;
PrintException(&e, "ThreadMapPort()"); PrintException(&e, "ThreadMapPort()");
} catch (...) { } catch (...) {
vnThreadsRunning[5]--; vnThreadsRunning[THREAD_UPNP]--;
PrintException(NULL, "ThreadMapPort()"); PrintException(NULL, "ThreadMapPort()");
} }
printf("ThreadMapPort exiting\n"); printf("ThreadMapPort exiting\n");
@ -1056,7 +1053,7 @@ void MapPort(bool fMapPort)
fUseUPnP = fMapPort; fUseUPnP = fMapPort;
WriteSetting("fUseUPnP", fUseUPnP); WriteSetting("fUseUPnP", fUseUPnP);
} }
if (fUseUPnP && vnThreadsRunning[5] < 1) if (fUseUPnP && vnThreadsRunning[THREAD_UPNP] < 1)
{ {
if (!CreateThread(ThreadMapPort, NULL)) if (!CreateThread(ThreadMapPort, NULL))
printf("Error: ThreadMapPort(ThreadMapPort) failed\n"); printf("Error: ThreadMapPort(ThreadMapPort) failed\n");
@ -1090,15 +1087,15 @@ void ThreadDNSAddressSeed(void* parg)
IMPLEMENT_RANDOMIZE_STACK(ThreadDNSAddressSeed(parg)); IMPLEMENT_RANDOMIZE_STACK(ThreadDNSAddressSeed(parg));
try try
{ {
vnThreadsRunning[6]++; vnThreadsRunning[THREAD_DNSSEED]++;
ThreadDNSAddressSeed2(parg); ThreadDNSAddressSeed2(parg);
vnThreadsRunning[6]--; vnThreadsRunning[THREAD_DNSSEED]--;
} }
catch (std::exception& e) { catch (std::exception& e) {
vnThreadsRunning[6]--; vnThreadsRunning[THREAD_DNSSEED]--;
PrintException(&e, "ThreadDNSAddressSeed()"); PrintException(&e, "ThreadDNSAddressSeed()");
} catch (...) { } catch (...) {
vnThreadsRunning[6]--; vnThreadsRunning[THREAD_DNSSEED]--;
throw; // support pthread_cancel() throw; // support pthread_cancel()
} }
printf("ThreadDNSAddressSeed exiting\n"); printf("ThreadDNSAddressSeed exiting\n");
@ -1236,15 +1233,15 @@ void ThreadOpenConnections(void* parg)
IMPLEMENT_RANDOMIZE_STACK(ThreadOpenConnections(parg)); IMPLEMENT_RANDOMIZE_STACK(ThreadOpenConnections(parg));
try try
{ {
vnThreadsRunning[1]++; vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
ThreadOpenConnections2(parg); ThreadOpenConnections2(parg);
vnThreadsRunning[1]--; vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
} }
catch (std::exception& e) { catch (std::exception& e) {
vnThreadsRunning[1]--; vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
PrintException(&e, "ThreadOpenConnections()"); PrintException(&e, "ThreadOpenConnections()");
} catch (...) { } catch (...) {
vnThreadsRunning[1]--; vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
PrintException(NULL, "ThreadOpenConnections()"); PrintException(NULL, "ThreadOpenConnections()");
} }
printf("ThreadOpenConnections exiting\n"); printf("ThreadOpenConnections exiting\n");
@ -1278,9 +1275,9 @@ void ThreadOpenConnections2(void* parg)
int64 nStart = GetTime(); int64 nStart = GetTime();
loop loop
{ {
vnThreadsRunning[1]--; vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
Sleep(500); Sleep(500);
vnThreadsRunning[1]++; vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
if (fShutdown) if (fShutdown)
return; return;
@ -1296,9 +1293,9 @@ void ThreadOpenConnections2(void* parg)
nMaxOutboundConnections = min(nMaxOutboundConnections, (int)GetArg("-maxconnections", 125)); nMaxOutboundConnections = min(nMaxOutboundConnections, (int)GetArg("-maxconnections", 125));
if (nOutbound < nMaxOutboundConnections) if (nOutbound < nMaxOutboundConnections)
break; break;
vnThreadsRunning[1]--; vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
Sleep(2000); Sleep(2000);
vnThreadsRunning[1]++; vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
if (fShutdown) if (fShutdown)
return; return;
} }
@ -1410,15 +1407,15 @@ void ThreadOpenAddedConnections(void* parg)
IMPLEMENT_RANDOMIZE_STACK(ThreadOpenAddedConnections(parg)); IMPLEMENT_RANDOMIZE_STACK(ThreadOpenAddedConnections(parg));
try try
{ {
vnThreadsRunning[7]++; vnThreadsRunning[THREAD_ADDEDCONNECTIONS]++;
ThreadOpenAddedConnections2(parg); ThreadOpenAddedConnections2(parg);
vnThreadsRunning[7]--; vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--;
} }
catch (std::exception& e) { catch (std::exception& e) {
vnThreadsRunning[7]--; vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--;
PrintException(&e, "ThreadOpenAddedConnections()"); PrintException(&e, "ThreadOpenAddedConnections()");
} catch (...) { } catch (...) {
vnThreadsRunning[7]--; vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--;
PrintException(NULL, "ThreadOpenAddedConnections()"); PrintException(NULL, "ThreadOpenAddedConnections()");
} }
printf("ThreadOpenAddedConnections exiting\n"); printf("ThreadOpenAddedConnections exiting\n");
@ -1467,9 +1464,9 @@ void ThreadOpenAddedConnections2(void* parg)
} }
if (fShutdown) if (fShutdown)
return; return;
vnThreadsRunning[7]--; vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--;
Sleep(120000); // Retry every 2 minutes Sleep(120000); // Retry every 2 minutes
vnThreadsRunning[7]++; vnThreadsRunning[THREAD_ADDEDCONNECTIONS]++;
if (fShutdown) if (fShutdown)
return; return;
} }
@ -1486,9 +1483,9 @@ bool OpenNetworkConnection(const CAddress& addrConnect)
FindNode((CNetAddr)addrConnect) || CNode::IsBanned(addrConnect)) FindNode((CNetAddr)addrConnect) || CNode::IsBanned(addrConnect))
return false; return false;
vnThreadsRunning[1]--; vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
CNode* pnode = ConnectNode(addrConnect); CNode* pnode = ConnectNode(addrConnect);
vnThreadsRunning[1]++; vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
if (fShutdown) if (fShutdown)
return false; return false;
if (!pnode) if (!pnode)
@ -1510,15 +1507,15 @@ void ThreadMessageHandler(void* parg)
IMPLEMENT_RANDOMIZE_STACK(ThreadMessageHandler(parg)); IMPLEMENT_RANDOMIZE_STACK(ThreadMessageHandler(parg));
try try
{ {
vnThreadsRunning[2]++; vnThreadsRunning[THREAD_MESSAGEHANDLER]++;
ThreadMessageHandler2(parg); ThreadMessageHandler2(parg);
vnThreadsRunning[2]--; vnThreadsRunning[THREAD_MESSAGEHANDLER]--;
} }
catch (std::exception& e) { catch (std::exception& e) {
vnThreadsRunning[2]--; vnThreadsRunning[THREAD_MESSAGEHANDLER]--;
PrintException(&e, "ThreadMessageHandler()"); PrintException(&e, "ThreadMessageHandler()");
} catch (...) { } catch (...) {
vnThreadsRunning[2]--; vnThreadsRunning[THREAD_MESSAGEHANDLER]--;
PrintException(NULL, "ThreadMessageHandler()"); PrintException(NULL, "ThreadMessageHandler()");
} }
printf("ThreadMessageHandler exiting\n"); printf("ThreadMessageHandler exiting\n");
@ -1566,11 +1563,11 @@ void ThreadMessageHandler2(void* parg)
// Wait and allow messages to bunch up. // Wait and allow messages to bunch up.
// Reduce vnThreadsRunning so StopNode has permission to exit while // Reduce vnThreadsRunning so StopNode has permission to exit while
// we're sleeping, but we must always check fShutdown after doing this. // we're sleeping, but we must always check fShutdown after doing this.
vnThreadsRunning[2]--; vnThreadsRunning[THREAD_MESSAGEHANDLER]--;
Sleep(100); Sleep(100);
if (fRequestShutdown) if (fRequestShutdown)
Shutdown(NULL); Shutdown(NULL);
vnThreadsRunning[2]++; vnThreadsRunning[THREAD_MESSAGEHANDLER]++;
if (fShutdown) if (fShutdown)
return; return;
} }
@ -1773,23 +1770,26 @@ bool StopNode()
fShutdown = true; fShutdown = true;
nTransactionsUpdated++; nTransactionsUpdated++;
int64 nStart = GetTime(); int64 nStart = GetTime();
while (vnThreadsRunning[0] > 0 || vnThreadsRunning[1] > 0 || vnThreadsRunning[2] > 0 || vnThreadsRunning[3] > 0 || vnThreadsRunning[4] > 0 do
|| (fHaveUPnP && vnThreadsRunning[5] > 0) || vnThreadsRunning[6] > 0 || vnThreadsRunning[7] > 0
)
{ {
int nThreadsRunning = 0;
for (int n = 0; n < THREAD_MAX; n++)
nThreadsRunning += vnThreadsRunning[n];
if (nThreadsRunning == 0)
break;
if (GetTime() - nStart > 20) if (GetTime() - nStart > 20)
break; break;
Sleep(20); Sleep(20);
} } while(true);
if (vnThreadsRunning[0] > 0) printf("ThreadSocketHandler still running\n"); if (vnThreadsRunning[THREAD_SOCKETHANDLER] > 0) printf("ThreadSocketHandler still running\n");
if (vnThreadsRunning[1] > 0) printf("ThreadOpenConnections still running\n"); if (vnThreadsRunning[THREAD_OPENCONNECTIONS] > 0) printf("ThreadOpenConnections still running\n");
if (vnThreadsRunning[2] > 0) printf("ThreadMessageHandler still running\n"); if (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0) printf("ThreadMessageHandler still running\n");
if (vnThreadsRunning[3] > 0) printf("ThreadBitcoinMiner still running\n"); if (vnThreadsRunning[THREAD_MINER] > 0) printf("ThreadBitcoinMiner still running\n");
if (vnThreadsRunning[4] > 0) printf("ThreadRPCServer still running\n"); if (vnThreadsRunning[THREAD_RPCSERVER] > 0) printf("ThreadRPCServer still running\n");
if (fHaveUPnP && vnThreadsRunning[5] > 0) printf("ThreadMapPort still running\n"); if (fHaveUPnP && vnThreadsRunning[THREAD_UPNP] > 0) printf("ThreadMapPort still running\n");
if (vnThreadsRunning[6] > 0) printf("ThreadDNSAddressSeed still running\n"); if (vnThreadsRunning[THREAD_DNSSEED] > 0) printf("ThreadDNSAddressSeed still running\n");
if (vnThreadsRunning[7] > 0) printf("ThreadOpenAddedConnections still running\n"); if (vnThreadsRunning[THREAD_ADDEDCONNECTIONS] > 0) printf("ThreadOpenAddedConnections still running\n");
while (vnThreadsRunning[2] > 0 || vnThreadsRunning[4] > 0) while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCSERVER] > 0)
Sleep(20); Sleep(20);
Sleep(50); Sleep(50);

View file

@ -68,14 +68,26 @@ public:
enum threadId
{
THREAD_SOCKETHANDLER,
THREAD_OPENCONNECTIONS,
THREAD_MESSAGEHANDLER,
THREAD_MINER,
THREAD_RPCSERVER,
THREAD_UPNP,
THREAD_DNSSEED,
THREAD_ADDEDCONNECTIONS,
THREAD_MAX
};
extern bool fClient; extern bool fClient;
extern bool fAllowDNS; extern bool fAllowDNS;
extern uint64 nLocalServices; extern uint64 nLocalServices;
extern CAddress addrLocalHost; extern CAddress addrLocalHost;
extern uint64 nLocalHostNonce; extern uint64 nLocalHostNonce;
extern boost::array<int, 10> vnThreadsRunning; extern boost::array<int, THREAD_MAX> vnThreadsRunning;
extern std::vector<CNode*> vNodes; extern std::vector<CNode*> vNodes;
extern CCriticalSection cs_vNodes; extern CCriticalSection cs_vNodes;