diff --git a/src/i2p.cpp b/src/i2p.cpp index 156c8024339..53f4c5f43b3 100644 --- a/src/i2p.cpp +++ b/src/i2p.cpp @@ -119,7 +119,7 @@ namespace sam { Session::Session(const fs::path& private_key_file, const Proxy& control_host, - CThreadInterrupt* interrupt) + std::shared_ptr interrupt) : m_private_key_file{private_key_file}, m_control_host{control_host}, m_interrupt{interrupt}, @@ -127,7 +127,7 @@ Session::Session(const fs::path& private_key_file, { } -Session::Session(const Proxy& control_host, CThreadInterrupt* interrupt) +Session::Session(const Proxy& control_host, std::shared_ptr interrupt) : m_control_host{control_host}, m_interrupt{interrupt}, m_transient{true} @@ -162,7 +162,7 @@ bool Session::Accept(Connection& conn) std::string errmsg; bool disconnect{false}; - while (!*m_interrupt) { + while (!m_interrupt->interrupted()) { Sock::Event occurred; if (!conn.sock->Wait(MAX_WAIT_FOR_IO, Sock::RECV, &occurred)) { errmsg = "wait on socket failed"; @@ -205,7 +205,7 @@ bool Session::Accept(Connection& conn) return true; } - if (*m_interrupt) { + if (m_interrupt->interrupted()) { LogPrintLevel(BCLog::I2P, BCLog::Level::Debug, "Accept was interrupted\n"); } else { LogPrintLevel(BCLog::I2P, BCLog::Level::Debug, "Error accepting%s: %s\n", disconnect ? " (will close the session)" : "", errmsg); diff --git a/src/i2p.h b/src/i2p.h index 153263399df..a41406f380f 100644 --- a/src/i2p.h +++ b/src/i2p.h @@ -63,13 +63,11 @@ public: * private key will be generated and saved into the file. * @param[in] control_host Location of the SAM proxy. * @param[in,out] interrupt If this is signaled then all operations are canceled as soon as - * possible and executing methods throw an exception. Notice: only a pointer to the - * `CThreadInterrupt` object is saved, so it must not be destroyed earlier than this - * `Session` object. + * possible and executing methods throw an exception. */ Session(const fs::path& private_key_file, const Proxy& control_host, - CThreadInterrupt* interrupt); + std::shared_ptr interrupt); /** * Construct a transient session which will generate its own I2P private key @@ -78,11 +76,9 @@ public: * the session will be lazily created later when first used. * @param[in] control_host Location of the SAM proxy. * @param[in,out] interrupt If this is signaled then all operations are canceled as soon as - * possible and executing methods throw an exception. Notice: only a pointer to the - * `CThreadInterrupt` object is saved, so it must not be destroyed earlier than this - * `Session` object. + * possible and executing methods throw an exception. */ - Session(const Proxy& control_host, CThreadInterrupt* interrupt); + Session(const Proxy& control_host, std::shared_ptr interrupt); /** * Destroy the session, closing the internally used sockets. The sockets that have been @@ -235,7 +231,7 @@ private: /** * Cease network activity when this is signaled. */ - CThreadInterrupt* const m_interrupt; + const std::shared_ptr m_interrupt; /** * Mutex protecting the members that can be concurrently accessed. diff --git a/src/net.cpp b/src/net.cpp index fc0edc1a5c1..f2f53def8af 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -478,7 +478,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo LOCK(m_unused_i2p_sessions_mutex); if (m_unused_i2p_sessions.empty()) { i2p_transient_session = - std::make_unique(proxy, &interruptNet); + std::make_unique(proxy, m_interrupt_net); } else { i2p_transient_session.swap(m_unused_i2p_sessions.front()); m_unused_i2p_sessions.pop(); @@ -2101,7 +2101,7 @@ void CConnman::SocketHandler() // empty sets. events_per_sock = GenerateWaitSockets(snap.Nodes()); if (events_per_sock.empty() || !events_per_sock.begin()->first->WaitMany(timeout, events_per_sock)) { - interruptNet.sleep_for(timeout); + m_interrupt_net->sleep_for(timeout); } // Service (send/receive) each of the already connected nodes. @@ -2118,8 +2118,9 @@ void CConnman::SocketHandlerConnected(const std::vector& nodes, AssertLockNotHeld(m_total_bytes_sent_mutex); for (CNode* pnode : nodes) { - if (interruptNet) + if (m_interrupt_net->interrupted()) { return; + } // // Receive @@ -2214,7 +2215,7 @@ void CConnman::SocketHandlerConnected(const std::vector& nodes, void CConnman::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock) { for (const ListenSocket& listen_socket : vhListenSocket) { - if (interruptNet) { + if (m_interrupt_net->interrupted()) { return; } const auto it = events_per_sock.find(listen_socket.sock); @@ -2228,8 +2229,7 @@ void CConnman::ThreadSocketHandler() { AssertLockNotHeld(m_total_bytes_sent_mutex); - while (!interruptNet) - { + while (!m_interrupt_net->interrupted()) { DisconnectNodes(); NotifyNumConnectionsChanged(); SocketHandler(); @@ -2253,9 +2253,10 @@ void CConnman::ThreadDNSAddressSeed() auto start = NodeClock::now(); constexpr std::chrono::seconds SEEDNODE_TIMEOUT = 30s; LogPrintf("-seednode enabled. Trying the provided seeds for %d seconds before defaulting to the dnsseeds.\n", SEEDNODE_TIMEOUT.count()); - while (!interruptNet) { - if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + while (!m_interrupt_net->interrupted()) { + if (!m_interrupt_net->sleep_for(500ms)) { return; + } // Abort if we have spent enough time without reaching our target. // Giving seed nodes 30 seconds so this does not become a race against fixedseeds (which triggers after 1 min) @@ -2316,7 +2317,7 @@ void CConnman::ThreadDNSAddressSeed() // early to see if we have enough peers and can stop // this thread entirely freeing up its resources std::chrono::seconds w = std::min(DNSSEEDS_DELAY_FEW_PEERS, to_wait); - if (!interruptNet.sleep_for(w)) return; + if (!m_interrupt_net->sleep_for(w)) return; to_wait -= w; if (GetFullOutboundConnCount() >= SEED_OUTBOUND_CONNECTION_THRESHOLD) { @@ -2332,13 +2333,13 @@ void CConnman::ThreadDNSAddressSeed() } } - if (interruptNet) return; + if (m_interrupt_net->interrupted()) return; // hold off on querying seeds if P2P network deactivated if (!fNetworkActive) { LogPrintf("Waiting for network to be reactivated before querying DNS seeds.\n"); do { - if (!interruptNet.sleep_for(std::chrono::seconds{1})) return; + if (!m_interrupt_net->sleep_for(1s)) return; } while (!fNetworkActive); } @@ -2533,12 +2534,14 @@ void CConnman::ThreadOpenConnections(const std::vector connect, std OpenNetworkConnection(addr, false, {}, strAddr.c_str(), ConnectionType::MANUAL, /*use_v2transport=*/use_v2transport); for (int i = 0; i < 10 && i < nLoop; i++) { - if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + if (!m_interrupt_net->sleep_for(500ms)) { return; + } } } - if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + if (!m_interrupt_net->sleep_for(500ms)) { return; + } PerformReconnections(); } } @@ -2562,8 +2565,7 @@ void CConnman::ThreadOpenConnections(const std::vector connect, std LogPrintf("Fixed seeds are disabled\n"); } - while (!interruptNet) - { + while (!m_interrupt_net->interrupted()) { if (add_addr_fetch) { add_addr_fetch = false; const auto& seed{SpanPopBack(seed_nodes)}; @@ -2578,14 +2580,16 @@ void CConnman::ThreadOpenConnections(const std::vector connect, std ProcessAddrFetch(); - if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + if (!m_interrupt_net->sleep_for(500ms)) { return; + } PerformReconnections(); CSemaphoreGrant grant(*semOutbound); - if (interruptNet) + if (m_interrupt_net->interrupted()) { return; + } const std::unordered_set fixed_seed_networks{GetReachableEmptyNetworks()}; if (add_fixed_seeds && !fixed_seed_networks.empty()) { @@ -2759,8 +2763,7 @@ void CConnman::ThreadOpenConnections(const std::vector connect, std int nTries = 0; const auto reachable_nets{g_reachable_nets.All()}; - while (!interruptNet) - { + while (!m_interrupt_net->interrupted()) { if (anchor && !m_anchors.empty()) { const CAddress addr = m_anchors.back(); m_anchors.pop_back(); @@ -2862,7 +2865,7 @@ void CConnman::ThreadOpenConnections(const std::vector connect, std if (addrConnect.IsValid()) { if (fFeeler) { // Add small amount of random noise before connection to avoid synchronization. - if (!interruptNet.sleep_for(rng.rand_uniform_duration(FEELER_SLEEP_WINDOW))) { + if (!m_interrupt_net->sleep_for(rng.rand_uniform_duration(FEELER_SLEEP_WINDOW))) { return; } LogDebug(BCLog::NET, "Making feeler connection to %s\n", addrConnect.ToStringAddrPort()); @@ -2973,14 +2976,15 @@ void CConnman::ThreadOpenAddedConnections() tried = true; CAddress addr(CService(), NODE_NONE); OpenNetworkConnection(addr, false, std::move(grant), info.m_params.m_added_node.c_str(), ConnectionType::MANUAL, info.m_params.m_use_v2transport); - if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) return; + if (!m_interrupt_net->sleep_for(500ms)) return; grant = CSemaphoreGrant(*semAddnode, /*fTry=*/true); } // See if any reconnections are desired. PerformReconnections(); // Retry every 60 seconds if a connection was attempted, otherwise two seconds - if (!interruptNet.sleep_for(std::chrono::seconds(tried ? 60 : 2))) + if (!m_interrupt_net->sleep_for(tried ? 60s : 2s)) { return; + } } } @@ -2993,7 +2997,7 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai // // Initiate outbound network connection // - if (interruptNet) { + if (m_interrupt_net->interrupted()) { return; } if (!fNetworkActive) { @@ -3081,13 +3085,13 @@ void CConnman::ThreadI2PAcceptIncoming() i2p::Connection conn; auto SleepOnFailure = [&]() { - interruptNet.sleep_for(err_wait); + m_interrupt_net->sleep_for(err_wait); if (err_wait < err_wait_cap) { err_wait += 1s; } }; - while (!interruptNet) { + while (!m_interrupt_net->interrupted()) { if (!m_i2p_sam_session->Listen(conn)) { if (advertising_listen_addr && conn.me.IsValid()) { @@ -3209,12 +3213,18 @@ void CConnman::SetNetworkActive(bool active) } } -CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In, AddrMan& addrman_in, - const NetGroupManager& netgroupman, const CChainParams& params, bool network_active) +CConnman::CConnman(uint64_t nSeed0In, + uint64_t nSeed1In, + AddrMan& addrman_in, + const NetGroupManager& netgroupman, + const CChainParams& params, + bool network_active, + std::shared_ptr interrupt_net) : addrman(addrman_in) , m_netgroupman{netgroupman} , nSeed0(nSeed0In) , nSeed1(nSeed1In) + , m_interrupt_net{interrupt_net} , m_params(params) { SetTryNewOutboundPeer(false); @@ -3310,7 +3320,7 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) Proxy i2p_sam; if (GetProxy(NET_I2P, i2p_sam) && connOptions.m_i2p_accept_incoming) { m_i2p_sam_session = std::make_unique(gArgs.GetDataDirNet() / "i2p_private_key", - i2p_sam, &interruptNet); + i2p_sam, m_interrupt_net); } // Randomize the order in which we may query seednode to potentially prevent connecting to the same one every restart (and signal that we have restarted) @@ -3347,7 +3357,7 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) // Start threads // assert(m_msgproc); - interruptNet.reset(); + m_interrupt_net->reset(); flagInterruptMsgProc = false; { @@ -3423,7 +3433,7 @@ void CConnman::Interrupt() } condMsgProc.notify_all(); - interruptNet(); + (*m_interrupt_net)(); g_socks5_interrupt(); if (semOutbound) { diff --git a/src/net.h b/src/net.h index 9fdec52115e..aab4dd1f8ca 100644 --- a/src/net.h +++ b/src/net.h @@ -1117,8 +1117,13 @@ public: whitelist_relay = connOptions.whitelist_relay; } - CConnman(uint64_t seed0, uint64_t seed1, AddrMan& addrman, const NetGroupManager& netgroupman, - const CChainParams& params, bool network_active = true); + CConnman(uint64_t seed0, + uint64_t seed1, + AddrMan& addrman, + const NetGroupManager& netgroupman, + const CChainParams& params, + bool network_active = true, + std::shared_ptr interrupt_net = std::make_shared()); ~CConnman(); @@ -1542,11 +1547,9 @@ private: /** * This is signaled when network activity should cease. - * A pointer to it is saved in `m_i2p_sam_session`, so make sure that - * the lifetime of `interruptNet` is not shorter than - * the lifetime of `m_i2p_sam_session`. + * A copy of this is saved in `m_i2p_sam_session`. */ - CThreadInterrupt interruptNet; + const std::shared_ptr m_interrupt_net; /** * I2P SAM session. diff --git a/src/test/fuzz/connman.cpp b/src/test/fuzz/connman.cpp index 17fac1d4f12..e2257d97d3d 100644 --- a/src/test/fuzz/connman.cpp +++ b/src/test/fuzz/connman.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -70,7 +71,8 @@ FUZZ_TARGET(connman, .init = initialize_connman) addr_man, netgroupman, Params(), - fuzzed_data_provider.ConsumeBool()}; + fuzzed_data_provider.ConsumeBool(), + ConsumeThreadInterrupt(fuzzed_data_provider)}; const uint64_t max_outbound_limit{fuzzed_data_provider.ConsumeIntegral()}; CConnman::Options options; diff --git a/src/test/fuzz/i2p.cpp b/src/test/fuzz/i2p.cpp index 29a11123d9a..64f30719c58 100644 --- a/src/test/fuzz/i2p.cpp +++ b/src/test/fuzz/i2p.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -35,15 +36,15 @@ FUZZ_TARGET(i2p, .init = initialize_i2p) const fs::path private_key_path = gArgs.GetDataDirNet() / "fuzzed_i2p_private_key"; const CService addr{in6_addr(IN6ADDR_LOOPBACK_INIT), 7656}; const Proxy sam_proxy{addr, /*tor_stream_isolation=*/false}; - CThreadInterrupt interrupt; + auto interrupt{ConsumeThreadInterrupt(fuzzed_data_provider)}; - i2p::sam::Session session{private_key_path, sam_proxy, &interrupt}; + i2p::sam::Session session{private_key_path, sam_proxy, interrupt}; i2p::Connection conn; if (session.Listen(conn)) { if (session.Accept(conn)) { try { - (void)conn.sock->RecvUntilTerminator('\n', 10ms, interrupt, i2p::sam::MAX_MSG_SIZE); + (void)conn.sock->RecvUntilTerminator('\n', 10ms, *interrupt, i2p::sam::MAX_MSG_SIZE); } catch (const std::runtime_error&) { } } @@ -53,7 +54,7 @@ FUZZ_TARGET(i2p, .init = initialize_i2p) if (session.Connect(CService{}, conn, proxy_error)) { try { - conn.sock->SendComplete("verack\n", 10ms, interrupt); + conn.sock->SendComplete("verack\n", 10ms, *interrupt); } catch (const std::runtime_error&) { } } diff --git a/src/test/fuzz/util/CMakeLists.txt b/src/test/fuzz/util/CMakeLists.txt index 878286b0f44..282de3d708f 100644 --- a/src/test/fuzz/util/CMakeLists.txt +++ b/src/test/fuzz/util/CMakeLists.txt @@ -7,6 +7,7 @@ add_library(test_fuzz STATIC EXCLUDE_FROM_ALL descriptor.cpp mempool.cpp net.cpp + threadinterrupt.cpp ../fuzz.cpp ../util.cpp ) diff --git a/src/test/fuzz/util/threadinterrupt.cpp b/src/test/fuzz/util/threadinterrupt.cpp new file mode 100644 index 00000000000..5dd87e05888 --- /dev/null +++ b/src/test/fuzz/util/threadinterrupt.cpp @@ -0,0 +1,22 @@ +// Copyright (c) 2024-present The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include +#include + +FuzzedThreadInterrupt::FuzzedThreadInterrupt(FuzzedDataProvider& fuzzed_data_provider) + : m_fuzzed_data_provider{fuzzed_data_provider} +{ +} + +bool FuzzedThreadInterrupt::interrupted() const +{ + return m_fuzzed_data_provider.ConsumeBool(); +} + +bool FuzzedThreadInterrupt::sleep_for(Clock::duration) +{ + SetMockTime(ConsumeTime(m_fuzzed_data_provider)); // Time could go backwards. + return m_fuzzed_data_provider.ConsumeBool(); +} diff --git a/src/test/fuzz/util/threadinterrupt.h b/src/test/fuzz/util/threadinterrupt.h new file mode 100644 index 00000000000..d56aefd919f --- /dev/null +++ b/src/test/fuzz/util/threadinterrupt.h @@ -0,0 +1,33 @@ +// Copyright (c) 2024-present The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_TEST_FUZZ_UTIL_THREADINTERRUPT_H +#define BITCOIN_TEST_FUZZ_UTIL_THREADINTERRUPT_H + +#include +#include + +#include + +/** + * Mocked CThreadInterrupt that returns "randomly" whether it is interrupted and never sleeps. + */ +class FuzzedThreadInterrupt : public CThreadInterrupt +{ +public: + explicit FuzzedThreadInterrupt(FuzzedDataProvider& fuzzed_data_provider); + + virtual bool interrupted() const override; + virtual bool sleep_for(Clock::duration) override; + +private: + FuzzedDataProvider& m_fuzzed_data_provider; +}; + +[[nodiscard]] inline std::shared_ptr ConsumeThreadInterrupt(FuzzedDataProvider& fuzzed_data_provider) +{ + return std::make_shared(fuzzed_data_provider); +} + +#endif // BITCOIN_TEST_FUZZ_UTIL_THREADINTERRUPT_H diff --git a/src/test/i2p_tests.cpp b/src/test/i2p_tests.cpp index bdb3408b66c..da9cbb5d228 100644 --- a/src/test/i2p_tests.cpp +++ b/src/test/i2p_tests.cpp @@ -50,10 +50,10 @@ BOOST_AUTO_TEST_CASE(unlimited_recv) return std::make_unique(std::string(i2p::sam::MAX_MSG_SIZE + 1, 'a')); }; - CThreadInterrupt interrupt; + auto interrupt{std::make_shared()}; const std::optional addr{Lookup("127.0.0.1", 9000, false)}; const Proxy sam_proxy(addr.value(), /*tor_stream_isolation=*/false); - i2p::sam::Session session(gArgs.GetDataDirNet() / "test_i2p_private_key", sam_proxy, &interrupt); + i2p::sam::Session session(gArgs.GetDataDirNet() / "test_i2p_private_key", sam_proxy, interrupt); { ASSERT_DEBUG_LOG("Creating persistent SAM session"); @@ -112,12 +112,12 @@ BOOST_AUTO_TEST_CASE(listen_ok_accept_fail) // clang-format on }; - CThreadInterrupt interrupt; + auto interrupt{std::make_shared()}; const CService addr{in6_addr(IN6ADDR_LOOPBACK_INIT), /*port=*/7656}; const Proxy sam_proxy(addr, /*tor_stream_isolation=*/false); i2p::sam::Session session(gArgs.GetDataDirNet() / "test_i2p_private_key", sam_proxy, - &interrupt); + interrupt); i2p::Connection conn; for (size_t i = 0; i < 5; ++i) { @@ -155,10 +155,10 @@ BOOST_AUTO_TEST_CASE(damaged_private_key) "391 bytes"}}) { BOOST_REQUIRE(WriteBinaryFile(i2p_private_key_file, file_contents)); - CThreadInterrupt interrupt; + auto interrupt{std::make_shared()}; const CService addr{in6_addr(IN6ADDR_LOOPBACK_INIT), /*port=*/7656}; const Proxy sam_proxy{addr, /*tor_stream_isolation=*/false}; - i2p::sam::Session session(i2p_private_key_file, sam_proxy, &interrupt); + i2p::sam::Session session(i2p_private_key_file, sam_proxy, interrupt); { ASSERT_DEBUG_LOG("Creating persistent SAM session"); diff --git a/src/util/threadinterrupt.cpp b/src/util/threadinterrupt.cpp index 3ea406d4a8e..aaa9e831a91 100644 --- a/src/util/threadinterrupt.cpp +++ b/src/util/threadinterrupt.cpp @@ -9,11 +9,16 @@ CThreadInterrupt::CThreadInterrupt() : flag(false) {} -CThreadInterrupt::operator bool() const +bool CThreadInterrupt::interrupted() const { return flag.load(std::memory_order_acquire); } +CThreadInterrupt::operator bool() const +{ + return interrupted(); +} + void CThreadInterrupt::reset() { flag.store(false, std::memory_order_release); diff --git a/src/util/threadinterrupt.h b/src/util/threadinterrupt.h index 0b79b382763..8b393c26dfc 100644 --- a/src/util/threadinterrupt.h +++ b/src/util/threadinterrupt.h @@ -27,11 +27,27 @@ class CThreadInterrupt { public: using Clock = std::chrono::steady_clock; + CThreadInterrupt(); - explicit operator bool() const; - void operator()() EXCLUSIVE_LOCKS_REQUIRED(!mut); - void reset(); - bool sleep_for(Clock::duration rel_time) EXCLUSIVE_LOCKS_REQUIRED(!mut); + + virtual ~CThreadInterrupt() = default; + + /// Return true if `operator()()` has been called. + virtual bool interrupted() const; + + /// An alias for `interrupted()`. + virtual explicit operator bool() const; + + /// Interrupt any sleeps. After this `interrupted()` will return `true`. + virtual void operator()() EXCLUSIVE_LOCKS_REQUIRED(!mut); + + /// Reset to an non-interrupted state. + virtual void reset(); + + /// Sleep for the given duration. + /// @retval true The time passed. + /// @retval false The sleep was interrupted. + virtual bool sleep_for(Clock::duration rel_time) EXCLUSIVE_LOCKS_REQUIRED(!mut); private: std::condition_variable cond;