From cfe5eba4463ff289b46ba0b3f4708f7cd3f1e9fe Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Sat, 29 Mar 2025 10:03:49 -0400 Subject: [PATCH] Introduce a new low-level socket managing class `SockMan` --- src/CMakeLists.txt | 3 +- src/common/sockman.cpp | 535 +++++++++++++++++++++++++++++++++++++++++ src/common/sockman.h | 463 +++++++++++++++++++++++++++++++++++ 3 files changed, 1000 insertions(+), 1 deletion(-) create mode 100644 src/common/sockman.cpp create mode 100644 src/common/sockman.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e9a67faa51a..3cdba188fb4 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -145,6 +145,7 @@ add_library(bitcoin_common STATIC EXCLUDE_FROM_ALL common/run_command.cpp common/settings.cpp common/signmessage.cpp + common/sockman.cpp common/system.cpp common/url.cpp compressor.cpp @@ -152,6 +153,7 @@ add_library(bitcoin_common STATIC EXCLUDE_FROM_ALL core_write.cpp deploymentinfo.cpp external_signer.cpp + i2p.cpp init/common.cpp kernel/chainparams.cpp key.cpp @@ -229,7 +231,6 @@ add_library(bitcoin_node STATIC EXCLUDE_FROM_ALL headerssync.cpp httprpc.cpp httpserver.cpp - i2p.cpp index/base.cpp index/blockfilterindex.cpp index/coinstatsindex.cpp diff --git a/src/common/sockman.cpp b/src/common/sockman.cpp new file mode 100644 index 00000000000..ee5389cf47c --- /dev/null +++ b/src/common/sockman.cpp @@ -0,0 +1,535 @@ +// Copyright (c) 2024-present The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or https://opensource.org/license/mit/. + +#include // IWYU pragma: keep + +#include +#include +#include +#include +#include + +#include + +// The set of sockets cannot be modified while waiting +// The sleep time needs to be small to avoid new sockets stalling +static constexpr auto SELECT_TIMEOUT{50ms}; + +/** Get the bind address for a socket as CService. */ +static CService GetBindAddress(const Sock& sock) +{ + CService addr_bind; + struct sockaddr_storage sockaddr_bind; + socklen_t sockaddr_bind_len = sizeof(sockaddr_bind); + if (!sock.GetSockName((struct sockaddr*)&sockaddr_bind, &sockaddr_bind_len)) { + addr_bind.SetSockAddr((const struct sockaddr*)&sockaddr_bind, sockaddr_bind_len); + } else { + LogPrintLevel(BCLog::NET, BCLog::Level::Warning, "getsockname failed\n"); + } + return addr_bind; +} + +bool SockMan::BindAndStartListening(const CService& to, bilingual_str& err_msg) +{ + // Create socket for listening for incoming connections + sockaddr_storage storage; + socklen_t len{sizeof(storage)}; + if (!to.GetSockAddr(reinterpret_cast(&storage), &len)) { + err_msg = Untranslated(strprintf("Bind address family for %s not supported", to.ToStringAddrPort())); + return false; + } + + std::unique_ptr sock{CreateSock(to.GetSAFamily(), SOCK_STREAM, IPPROTO_TCP)}; + if (!sock) { + err_msg = Untranslated(strprintf("Cannot create %s listen socket: %s", + to.ToStringAddrPort(), + NetworkErrorString(WSAGetLastError()))); + return false; + } + + int one{1}; + + // Allow binding if the port is still in TIME_WAIT state after + // the program was closed and restarted. + if (sock->SetSockOpt(SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(&one), sizeof(one)) == SOCKET_ERROR) { + LogPrintLevel(BCLog::NET, + BCLog::Level::Info, + "Cannot set SO_REUSEADDR on %s listen socket: %s, continuing anyway\n", + to.ToStringAddrPort(), + NetworkErrorString(WSAGetLastError())); + } + + // some systems don't have IPV6_V6ONLY but are always v6only; others do have the option + // and enable it by default or not. Try to enable it, if possible. + if (to.IsIPv6()) { +#ifdef IPV6_V6ONLY + if (sock->SetSockOpt(IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast(&one), sizeof(one)) == SOCKET_ERROR) { + LogPrintLevel(BCLog::NET, + BCLog::Level::Info, + "Cannot set IPV6_V6ONLY on %s listen socket: %s, continuing anyway\n", + to.ToStringAddrPort(), + NetworkErrorString(WSAGetLastError())); + } +#endif +#ifdef WIN32 + int prot_level{PROTECTION_LEVEL_UNRESTRICTED}; + if (sock->SetSockOpt(IPPROTO_IPV6, + IPV6_PROTECTION_LEVEL, + reinterpret_cast(&prot_level), + sizeof(prot_level)) == SOCKET_ERROR) { + LogPrintLevel(BCLog::NET, + BCLog::Level::Info, + "Cannot set IPV6_PROTECTION_LEVEL on %s listen socket: %s, continuing anyway\n", + to.ToStringAddrPort(), + NetworkErrorString(WSAGetLastError())); + } +#endif + } + + if (sock->Bind(reinterpret_cast(&storage), len) == SOCKET_ERROR) { + const int err{WSAGetLastError()}; + if (err == WSAEADDRINUSE) { + err_msg = strprintf(_("Unable to bind to %s on this computer. %s is probably already running."), + to.ToStringAddrPort(), + CLIENT_NAME); + } else { + err_msg = strprintf(_("Unable to bind to %s on this computer (bind returned error %s)"), + to.ToStringAddrPort(), + NetworkErrorString(err)); + } + return false; + } + + // Listen for incoming connections + if (sock->Listen(SOMAXCONN) == SOCKET_ERROR) { + err_msg = strprintf(_("Cannot listen on %s: %s"), to.ToStringAddrPort(), NetworkErrorString(WSAGetLastError())); + return false; + } + + m_listen.emplace_back(std::move(sock)); + + return true; +} + +void SockMan::StartSocketsThreads(const Options& options) +{ + m_thread_socket_handler = std::thread( + &util::TraceThread, options.socket_handler_thread_name, [this] { ThreadSocketHandler(); }); + + if (options.i2p.has_value()) { + m_i2p_sam_session = std::make_unique( + options.i2p->private_key_file, options.i2p->sam_proxy, &interruptNet); + + m_thread_i2p_accept = + std::thread(&util::TraceThread, options.i2p->accept_thread_name, [this] { ThreadI2PAccept(); }); + } +} + +void SockMan::JoinSocketsThreads() +{ + if (m_thread_i2p_accept.joinable()) { + m_thread_i2p_accept.join(); + } + + if (m_thread_socket_handler.joinable()) { + m_thread_socket_handler.join(); + } +} + +std::optional +SockMan::ConnectAndMakeId(const std::variant& to, + bool is_important, + std::optional proxy, + bool& proxy_failed, + CService& me) +{ + AssertLockNotHeld(m_connected_mutex); + AssertLockNotHeld(m_unused_i2p_sessions_mutex); + + std::unique_ptr sock; + std::unique_ptr i2p_transient_session; + + Assume(!me.IsValid()); + + if (std::holds_alternative(to)) { + const CService& addr_to{std::get(to)}; + if (addr_to.IsI2P()) { + if (!Assume(proxy.has_value())) { + return std::nullopt; + } + + i2p::Connection conn; + bool connected{false}; + + if (m_i2p_sam_session) { + connected = m_i2p_sam_session->Connect(addr_to, conn, proxy_failed); + } else { + { + LOCK(m_unused_i2p_sessions_mutex); + if (m_unused_i2p_sessions.empty()) { + i2p_transient_session = std::make_unique(proxy.value(), &interruptNet); + } else { + i2p_transient_session.swap(m_unused_i2p_sessions.front()); + m_unused_i2p_sessions.pop(); + } + } + connected = i2p_transient_session->Connect(addr_to, conn, proxy_failed); + if (!connected) { + LOCK(m_unused_i2p_sessions_mutex); + if (m_unused_i2p_sessions.size() < MAX_UNUSED_I2P_SESSIONS_SIZE) { + m_unused_i2p_sessions.emplace(i2p_transient_session.release()); + } + } + } + + if (connected) { + sock = std::move(conn.sock); + me = conn.me; + } + } else if (proxy.has_value()) { + sock = ConnectThroughProxy(proxy.value(), addr_to.ToStringAddr(), addr_to.GetPort(), proxy_failed); + } else { + sock = ConnectDirectly(addr_to, is_important); + } + } else { + if (!Assume(proxy.has_value())) { + return std::nullopt; + } + + const auto& hostport{std::get(to)}; + + bool dummy_proxy_failed; + sock = ConnectThroughProxy(proxy.value(), hostport.host, hostport.port, dummy_proxy_failed); + } + + if (!sock) { + return std::nullopt; + } + + if (!me.IsValid()) { + me = GetBindAddress(*sock); + } + + const Id id{GetNewId()}; + + { + LOCK(m_connected_mutex); + m_connected.emplace(id, std::make_shared(std::move(sock), + std::move(i2p_transient_session))); + } + + return id; +} + +bool SockMan::CloseConnection(Id id) +{ + LOCK(m_connected_mutex); + return m_connected.erase(id) > 0; +} + +ssize_t SockMan::SendBytes(Id id, + std::span data, + bool will_send_more, + std::string& errmsg) const +{ + AssertLockNotHeld(m_connected_mutex); + + if (data.empty()) { + return 0; + } + + auto sockets{GetConnectionSockets(id)}; + if (!sockets) { + // Bail out immediately and just leave things in the caller's send queue. + return 0; + } + + int flags{MSG_NOSIGNAL | MSG_DONTWAIT}; +#ifdef MSG_MORE + if (will_send_more) { + flags |= MSG_MORE; + } +#endif + + const ssize_t sent{WITH_LOCK( + sockets->mutex, + return sockets->sock->Send(reinterpret_cast(data.data()), data.size(), flags);)}; + + if (sent >= 0) { + return sent; + } + + const int err{WSAGetLastError()}; + if (err == WSAEWOULDBLOCK || err == WSAEMSGSIZE || err == WSAEINTR || err == WSAEINPROGRESS) { + return 0; + } + errmsg = NetworkErrorString(err); + return -1; +} + +void SockMan::StopListening() +{ + m_listen.clear(); +} + +bool SockMan::ShouldTryToSend(Id id) const { return true; } + +bool SockMan::ShouldTryToRecv(Id id) const { return true; } + +void SockMan::EventIOLoopCompletedForOne(Id id) {} + +void SockMan::EventIOLoopCompletedForAll() {} + +void SockMan::EventI2PStatus(const CService&, I2PStatus) {} + +void SockMan::TestOnlyAddExistentConnection(Id id, std::unique_ptr&& sock) +{ + LOCK(m_connected_mutex); + const auto result{m_connected.emplace(id, std::make_shared(std::move(sock)))}; + assert(result.second); +} + +void SockMan::ThreadI2PAccept() +{ + AssertLockNotHeld(m_connected_mutex); + + static constexpr auto err_wait_begin = 1s; + static constexpr auto err_wait_cap = 5min; + auto err_wait = err_wait_begin; + + i2p::Connection conn; + + auto SleepOnFailure = [&]() { + interruptNet.sleep_for(err_wait); + if (err_wait < err_wait_cap) { + err_wait += 1s; + } + }; + + while (!interruptNet) { + + if (!m_i2p_sam_session->Listen(conn)) { + EventI2PStatus(conn.me, SockMan::I2PStatus::STOP_LISTENING); + SleepOnFailure(); + continue; + } + + EventI2PStatus(conn.me, SockMan::I2PStatus::START_LISTENING); + + if (!m_i2p_sam_session->Accept(conn)) { + SleepOnFailure(); + continue; + } + + Assume(conn.me.IsI2P()); + Assume(conn.peer.IsI2P()); + + NewSockAccepted(std::move(conn.sock), conn.me, conn.peer); + + err_wait = err_wait_begin; + } +} + +void SockMan::ThreadSocketHandler() +{ + AssertLockNotHeld(m_connected_mutex); + + while (!interruptNet) { + EventIOLoopCompletedForAll(); + + // Check for the readiness of the already connected sockets and the + // listening sockets in one call ("readiness" as in poll(2) or + // select(2)). If none are ready, wait for a short while and return + // empty sets. + auto io_readiness{GenerateWaitSockets()}; + if (io_readiness.events_per_sock.empty() || + // WaitMany() may as well be a static method, the context of the first Sock in the vector is not relevant. + !io_readiness.events_per_sock.begin()->first->WaitMany(SELECT_TIMEOUT, + io_readiness.events_per_sock)) { + interruptNet.sleep_for(SELECT_TIMEOUT); + } + + // Service (send/receive) each of the already connected sockets. + SocketHandlerConnected(io_readiness); + + // Accept new connections from listening sockets. + SocketHandlerListening(io_readiness.events_per_sock); + } +} + +std::unique_ptr SockMan::AcceptConnection(const Sock& listen_sock, CService& addr) +{ + sockaddr_storage storage; + socklen_t len{sizeof(storage)}; + + auto sock{listen_sock.Accept(reinterpret_cast(&storage), &len)}; + + if (!sock) { + const int err{WSAGetLastError()}; + if (err != WSAEWOULDBLOCK) { + LogPrintLevel(BCLog::NET, + BCLog::Level::Error, + "Cannot accept new connection: %s\n", + NetworkErrorString(err)); + } + return {}; + } + + if (!addr.SetSockAddr(reinterpret_cast(&storage), len)) { + LogPrintLevel(BCLog::NET, BCLog::Level::Warning, "Unknown socket family\n"); + } + + return sock; +} + +void SockMan::NewSockAccepted(std::unique_ptr&& sock, const CService& me, const CService& them) +{ + AssertLockNotHeld(m_connected_mutex); + + if (!sock->IsSelectable()) { + LogPrintf("connection from %s dropped: non-selectable socket\n", them.ToStringAddrPort()); + return; + } + + // According to the internet TCP_NODELAY is not carried into accepted sockets + // on all platforms. Set it again here just to be sure. + const int on{1}; + if (sock->SetSockOpt(IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)) == SOCKET_ERROR) { + LogDebug(BCLog::NET, "connection from %s: unable to set TCP_NODELAY, continuing anyway\n", + them.ToStringAddrPort()); + } + + const Id id{GetNewId()}; + + { + LOCK(m_connected_mutex); + m_connected.emplace(id, std::make_shared(std::move(sock))); + } + + if (!EventNewConnectionAccepted(id, me, them)) { + CloseConnection(id); + } +} + +SockMan::Id SockMan::GetNewId() +{ + return m_next_id.fetch_add(1, std::memory_order_relaxed); +} + +SockMan::IOReadiness SockMan::GenerateWaitSockets() +{ + AssertLockNotHeld(m_connected_mutex); + + IOReadiness io_readiness; + + for (const auto& sock : m_listen) { + io_readiness.events_per_sock.emplace(sock, Sock::Events{Sock::RECV}); + } + + auto connected_snapshot{WITH_LOCK(m_connected_mutex, return m_connected;)}; + + for (const auto& [id, sockets] : connected_snapshot) { + const bool select_recv{ShouldTryToRecv(id)}; + const bool select_send{ShouldTryToSend(id)}; + if (!select_recv && !select_send) continue; + + Sock::Event event = (select_send ? Sock::SEND : 0) | (select_recv ? Sock::RECV : 0); + io_readiness.events_per_sock.emplace(sockets->sock, Sock::Events{event}); + io_readiness.ids_per_sock.emplace(sockets->sock, id); + } + + return io_readiness; +} + +void SockMan::SocketHandlerConnected(const IOReadiness& io_readiness) +{ + AssertLockNotHeld(m_connected_mutex); + + for (const auto& [sock, events] : io_readiness.events_per_sock) { + if (interruptNet) { + return; + } + + auto it{io_readiness.ids_per_sock.find(sock)}; + if (it == io_readiness.ids_per_sock.end()) { + continue; + } + const Id id{it->second}; + + bool send_ready = events.occurred & Sock::SEND; // Sock::SEND could only be set if ShouldTryToSend() has returned true in GenerateWaitSockets(). + bool recv_ready = events.occurred & Sock::RECV; // Sock::RECV could only be set if ShouldTryToRecv() has returned true in GenerateWaitSockets(). + bool err_ready = events.occurred & Sock::ERR; + + if (send_ready) { + bool cancel_recv; + + EventReadyToSend(id, cancel_recv); + + if (cancel_recv) { + recv_ready = false; + } + } + + if (recv_ready || err_ready) { + uint8_t buf[0x10000]; // typical socket buffer is 8K-64K + + auto sockets{GetConnectionSockets(id)}; + if (!sockets) { + continue; + } + + const ssize_t nrecv{WITH_LOCK( + sockets->mutex, + return sockets->sock->Recv(buf, sizeof(buf), MSG_DONTWAIT);)}; + + if (nrecv < 0) { // In all cases (including -1 and 0) EventIOLoopCompletedForOne() should be executed after this, don't change the code to skip it. + const int err = WSAGetLastError(); + if (err != WSAEWOULDBLOCK && err != WSAEMSGSIZE && err != WSAEINTR && err != WSAEINPROGRESS) { + EventGotPermanentReadError(id, NetworkErrorString(err)); + } + } else if (nrecv == 0) { + EventGotEOF(id); + } else { + EventGotData(id, {buf, static_cast(nrecv)}); + } + } + + EventIOLoopCompletedForOne(id); + } +} + +void SockMan::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock) +{ + AssertLockNotHeld(m_connected_mutex); + + for (const auto& sock : m_listen) { + if (interruptNet) { + return; + } + const auto it = events_per_sock.find(sock); + if (it != events_per_sock.end() && it->second.occurred & Sock::RECV) { + CService addr_accepted; + + auto sock_accepted{AcceptConnection(*sock, addr_accepted)}; + + if (sock_accepted) { + NewSockAccepted(std::move(sock_accepted), GetBindAddress(*sock), addr_accepted); + } + } + } +} + +std::shared_ptr SockMan::GetConnectionSockets(Id id) const +{ + LOCK(m_connected_mutex); + + auto it{m_connected.find(id)}; + if (it == m_connected.end()) { + // There is no socket in case we've already disconnected, or in test cases without + // real connections. + return {}; + } + + return it->second; +} diff --git a/src/common/sockman.h b/src/common/sockman.h new file mode 100644 index 00000000000..7c4a61cb779 --- /dev/null +++ b/src/common/sockman.h @@ -0,0 +1,463 @@ +// Copyright (c) 2024-present The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or https://opensource.org/license/mit/. + +#ifndef BITCOIN_COMMON_SOCKMAN_H +#define BITCOIN_COMMON_SOCKMAN_H + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +/** + * A socket manager class which handles socket operations. + * To use this class, inherit from it and implement the pure virtual methods. + * Handled operations: + * - binding and listening on sockets + * - starting of necessary threads to process socket operations + * - accepting incoming connections + * - making outbound connections + * - closing connections + * - waiting for IO readiness on sockets and doing send/recv accordingly + */ +class SockMan +{ +public: + /** + * Each connection is assigned an unique id of this type. + */ + using Id = int64_t; + + /** + * Possible status changes that can be passed to `EventI2PStatus()`. + */ + enum class I2PStatus : uint8_t { + /// The listen succeeded and we are now listening for incoming I2P connections. + START_LISTENING, + + /// The listen failed and now we are not listening (even if START_LISTENING was signaled before). + STOP_LISTENING, + }; + + virtual ~SockMan() = default; + + // + // Non-virtual functions, to be reused by children classes. + // + + /** + * Bind to a new address:port, start listening and add the listen socket to `m_listen`. + * Should be called before `StartSocketsThreads()`. + * @param[in] to Where to bind. + * @param[out] err_msg Error string if an error occurs. + * @retval true Success. + * @retval false Failure, `err_msg` will be set. + */ + bool BindAndStartListening(const CService& to, bilingual_str& err_msg); + + /** + * Options to influence `StartSocketsThreads()`. + */ + struct Options { + std::string_view socket_handler_thread_name; + + struct I2P { + explicit I2P(const fs::path& file, const Proxy& proxy, std::string_view accept_thread_name) + : private_key_file{file}, + sam_proxy{proxy}, + accept_thread_name{accept_thread_name} + {} + + const fs::path private_key_file; + const Proxy sam_proxy; + const std::string_view accept_thread_name; + }; + + /** + * I2P options. If set then a thread will be started that will accept incoming I2P connections. + */ + std::optional i2p; + }; + + /** + * Start the necessary threads for sockets IO. + */ + void StartSocketsThreads(const Options& options); + + /** + * Join (wait for) the threads started by `StartSocketsThreads()` to exit. + */ + void JoinSocketsThreads(); + + /** + * A more readable std::tuple for host and port. + */ + struct StringHostIntPort { + const std::string& host; + uint16_t port; + }; + + /** + * Make an outbound connection, save the socket internally and return a newly generated connection id. + * @param[in] to The address to connect to, either as CService or a host as string and port as + * an integer, if the later is used, then `proxy` must be valid. + * @param[in] is_important If true, then log failures with higher severity. + * @param[in] proxy Proxy to connect through, if set. + * @param[out] proxy_failed If `proxy` is valid and the connection failed because of the + * proxy, then it will be set to true. + * @param[out] me If the connection was successful then this is set to the address on the + * local side of the socket. + * @return Newly generated id, or std::nullopt if the operation fails. + */ + std::optional ConnectAndMakeId(const std::variant& to, + bool is_important, + std::optional proxy, + bool& proxy_failed, + CService& me) + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex, !m_unused_i2p_sessions_mutex); + + /** + * Destroy a given connection by closing its socket and release resources occupied by it. + * @param[in] id Connection to destroy. + * @return Whether the connection existed and its socket was closed by this call. + */ + bool CloseConnection(Id id) + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + + /** + * Try to send some data over the given connection. + * @param[in] id Identifier of the connection. + * @param[in] data The data to send, it might happen that only a prefix of this is sent. + * @param[in] will_send_more Used as an optimization if the caller knows that they will + * be sending more data soon after this call. + * @param[out] errmsg If <0 is returned then this will contain a human readable message + * explaining the error. + * @retval >=0 The number of bytes actually sent. + * @retval <0 A permanent error has occurred. + */ + ssize_t SendBytes(Id id, + std::span data, + bool will_send_more, + std::string& errmsg) const + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + + /** + * Stop listening by closing all listening sockets. + */ + void StopListening(); + + /** + * This is signaled when network activity should cease. + * A pointer to it is saved in `m_i2p_sam_session`, so make sure that + * the lifetime of `interruptNet` is not shorter than + * the lifetime of `m_i2p_sam_session`. + */ + CThreadInterrupt interruptNet; + +protected: + + /** + * During some tests mocked sockets are created outside of `SockMan`, make it + * possible to add those so that send/recv can be exercised. + * @param[in] id Connection id to add. + * @param[in,out] sock Socket to associate with the added connection. + */ + void TestOnlyAddExistentConnection(Id id, std::unique_ptr&& sock) + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + +private: + + /** + * Cap on the size of `m_unused_i2p_sessions`, to ensure it does not + * unexpectedly use too much memory. + */ + static constexpr size_t MAX_UNUSED_I2P_SESSIONS_SIZE{10}; + + // + // Pure virtual functions must be implemented by children classes. + // + + /** + * Be notified when a new connection has been accepted. + * @param[in] id Id of the newly accepted connection. + * @param[in] me The address and port at our side of the connection. + * @param[in] them The address and port at the peer's side of the connection. + * @retval true The new connection was accepted at the higher level. + * @retval false The connection was refused at the higher level, so the + * associated socket and id should be discarded by `SockMan`. + */ + virtual bool EventNewConnectionAccepted(Id id, + const CService& me, + const CService& them) = 0; + + /** + * Called when the socket is ready to send data and `ShouldTryToSend()` has + * returned true. This is where the higher level code serializes its messages + * and calls `SockMan::SendBytes()`. + * @param[in] id Id of the connection whose socket is ready to send. + * @param[out] cancel_recv Should always be set upon return and if it is true, + * then the next attempt to receive data from that connection will be omitted. + */ + virtual void EventReadyToSend(Id id, bool& cancel_recv) = 0; + + /** + * Called when new data has been received. + * @param[in] id Connection for which the data arrived. + * @param[in] data Received data. + */ + virtual void EventGotData(Id id, std::span data) = 0; + + /** + * Called when the remote peer has sent an EOF on the socket. This is a graceful + * close of their writing side, we can still send and they will receive, if it + * makes sense at the application level. + * @param[in] id Connection whose socket got EOF. + */ + virtual void EventGotEOF(Id id) = 0; + + /** + * Called when we get an irrecoverable error trying to read from a socket. + * @param[in] id Connection whose socket got an error. + * @param[in] errmsg Message describing the error. + */ + virtual void EventGotPermanentReadError(Id id, const std::string& errmsg) = 0; + + // + // Non-pure virtual functions can be overridden by children classes or left + // alone to use the default implementation from SockMan. + // + + /** + * Can be used to temporarily pause sends on a connection. + * SockMan would only call EventReadyToSend() if this returns true. + * The implementation in SockMan always returns true. + * @param[in] id Connection for which to confirm or omit the next call to EventReadyToSend(). + */ + virtual bool ShouldTryToSend(Id id) const; + + /** + * SockMan would only call Recv() on a connection's socket if this returns true. + * Can be used to temporarily pause receives on a connection. + * The implementation in SockMan always returns true. + * @param[in] id Connection for which to confirm or omit the next receive. + */ + virtual bool ShouldTryToRecv(Id id) const; + + /** + * SockMan has completed the current send+recv iteration for a given connection. + * It will do another send+recv for this connection after processing all other connections. + * Can be used to execute periodic tasks for a given connection. + * The implementation in SockMan does nothing. + * @param[in] id Connection for which send+recv has been done. + */ + virtual void EventIOLoopCompletedForOne(Id id); + + /** + * SockMan has completed send+recv for all connections. + * Can be used to execute periodic tasks for all connections, like closing + * connections due to higher level logic. + * The implementation in SockMan does nothing. + */ + virtual void EventIOLoopCompletedForAll(); + + /** + * Be notified of a change in the state of the I2P connectivity. + * The default behavior, implemented by `SockMan`, is to ignore this event. + * @param[in] addr The address we started or stopped listening on. + * @param[in] new_status New status. + */ + virtual void EventI2PStatus(const CService& addr, I2PStatus new_status); + + /** + * The sockets used by a connection - a data socket and an optional I2P session socket. + */ + struct ConnectionSockets { + explicit ConnectionSockets(std::unique_ptr&& s) + : sock{std::move(s)} + { + } + + explicit ConnectionSockets(std::shared_ptr&& s, std::unique_ptr&& sess) + : sock{std::move(s)}, + i2p_transient_session{std::move(sess)} + { + } + + /** + * Mutex that serializes the Send() and Recv() calls on `sock`. + */ + Mutex mutex; + + /** + * Underlying socket. + * `shared_ptr` (instead of `unique_ptr`) is used to avoid premature close of the + * underlying file descriptor by one thread while another thread is poll(2)-ing + * it for activity. + * @see https://github.com/bitcoin/bitcoin/issues/21744 for details. + */ + std::shared_ptr sock; + + /** + * When transient I2P sessions are used, then each connection has its own session, otherwise + * all connections use the session from `m_i2p_sam_session` and share the same I2P address. + * I2P sessions involve a data/transport socket (in `sock`) and a control socket + * (in `i2p_transient_session`). For transient sessions, once the data socket `sock` is + * closed, the control socket is not going to be used anymore and would be just taking + * resources. Storing it here makes its deletion together with `sock` automatic. + */ + std::unique_ptr i2p_transient_session; + }; + + /** + * Info about which socket has which event ready and its connection id. + */ + struct IOReadiness { + /** + * Map of socket -> socket events. For example: + * socket1 -> { requested = SEND|RECV, occurred = RECV } + * socket2 -> { requested = SEND, occurred = SEND } + */ + Sock::EventsPerSock events_per_sock; + + /** + * Map of socket -> connection id (in `m_connected`). For example + * socket1 -> id=23 + * socket2 -> id=56 + */ + std::unordered_map + ids_per_sock; + }; + + /** + * Accept incoming I2P connections in a loop and call + * `EventNewConnectionAccepted()` for each new connection. + */ + void ThreadI2PAccept() + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + + /** + * Check connected and listening sockets for IO readiness and process them accordingly. + */ + void ThreadSocketHandler() + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + + /** + * Accept a connection. + * @param[in] listen_sock Socket on which to accept the connection. + * @param[out] addr Address of the peer that was accepted. + * @return Newly created socket for the accepted connection. + */ + std::unique_ptr AcceptConnection(const Sock& listen_sock, CService& addr); + + /** + * After a new socket with a peer has been created, configure its flags, + * make a new connection id and call `EventNewConnectionAccepted()`. + * @param[in] sock The newly created socket. + * @param[in] me Address at our end of the connection. + * @param[in] them Address of the new peer. + */ + void NewSockAccepted(std::unique_ptr&& sock, const CService& me, const CService& them) + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + + /** + * Generate an id for a newly created connection. + */ + Id GetNewId(); + + /** + * Generate a collection of sockets to check for IO readiness. + * @return Sockets to check for readiness plus an aux map to find the + * corresponding connection id given a socket. + */ + IOReadiness GenerateWaitSockets() + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + + /** + * Do the read/write for connected sockets that are ready for IO. + * @param[in] io_readiness Which sockets are ready and their connection ids. + */ + void SocketHandlerConnected(const IOReadiness& io_readiness) + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + + /** + * Accept incoming connections, one from each read-ready listening socket. + * @param[in] events_per_sock Sockets that are ready for IO. + */ + void SocketHandlerListening(const Sock::EventsPerSock& events_per_sock) + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + + /** + * Retrieve an entry from m_connected. + * @param[in] id Connection id to search for. + * @return ConnectionSockets for the given connection id or empty shared_ptr if not found. + */ + std::shared_ptr GetConnectionSockets(Id id) const + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + + /** + * The id to assign to the next created connection. Used to generate ids of connections. + */ + std::atomic m_next_id{0}; + + /** + * Thread that sends to and receives from sockets and accepts connections. + */ + std::thread m_thread_socket_handler; + + /** + * Thread that accepts incoming I2P connections in a loop, can be stopped via `interruptNet`. + */ + std::thread m_thread_i2p_accept; + + /** + * Mutex protecting m_i2p_sam_sessions. + */ + Mutex m_unused_i2p_sessions_mutex; + + /** + * A pool of created I2P SAM transient sessions that should be used instead + * of creating new ones in order to reduce the load on the I2P network. + * Creating a session in I2P is not cheap, thus if this is not empty, then + * pick an entry from it instead of creating a new session. If connecting to + * a host fails, then the created session is put to this pool for reuse. + */ + std::queue> m_unused_i2p_sessions GUARDED_BY(m_unused_i2p_sessions_mutex); + + /** + * I2P SAM session. + * Used to accept incoming and make outgoing I2P connections from a persistent + * address. + */ + std::unique_ptr m_i2p_sam_session; + + /** + * List of listening sockets. + */ + std::vector> m_listen; + + mutable Mutex m_connected_mutex; + + /** + * Sockets for existent connections. + * The `shared_ptr` makes it possible to create a snapshot of this by simply copying + * it (under `m_connected_mutex`). + */ + std::unordered_map> m_connected GUARDED_BY(m_connected_mutex); +}; + +#endif // BITCOIN_COMMON_SOCKMAN_H