mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-04-29 14:59:39 -04:00
Introduce a new low-level socket managing class SockMan
This commit is contained in:
parent
c66f7dab33
commit
cfe5eba446
3 changed files with 1000 additions and 1 deletions
|
@ -145,6 +145,7 @@ add_library(bitcoin_common STATIC EXCLUDE_FROM_ALL
|
|||
common/run_command.cpp
|
||||
common/settings.cpp
|
||||
common/signmessage.cpp
|
||||
common/sockman.cpp
|
||||
common/system.cpp
|
||||
common/url.cpp
|
||||
compressor.cpp
|
||||
|
@ -152,6 +153,7 @@ add_library(bitcoin_common STATIC EXCLUDE_FROM_ALL
|
|||
core_write.cpp
|
||||
deploymentinfo.cpp
|
||||
external_signer.cpp
|
||||
i2p.cpp
|
||||
init/common.cpp
|
||||
kernel/chainparams.cpp
|
||||
key.cpp
|
||||
|
@ -229,7 +231,6 @@ add_library(bitcoin_node STATIC EXCLUDE_FROM_ALL
|
|||
headerssync.cpp
|
||||
httprpc.cpp
|
||||
httpserver.cpp
|
||||
i2p.cpp
|
||||
index/base.cpp
|
||||
index/blockfilterindex.cpp
|
||||
index/coinstatsindex.cpp
|
||||
|
|
535
src/common/sockman.cpp
Normal file
535
src/common/sockman.cpp
Normal file
|
@ -0,0 +1,535 @@
|
|||
// Copyright (c) 2024-present The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or https://opensource.org/license/mit/.
|
||||
|
||||
#include <bitcoin-build-config.h> // IWYU pragma: keep
|
||||
|
||||
#include <common/sockman.h>
|
||||
#include <logging.h>
|
||||
#include <netbase.h>
|
||||
#include <util/sock.h>
|
||||
#include <util/thread.h>
|
||||
|
||||
#include <cassert>
|
||||
|
||||
// The set of sockets cannot be modified while waiting
|
||||
// The sleep time needs to be small to avoid new sockets stalling
|
||||
static constexpr auto SELECT_TIMEOUT{50ms};
|
||||
|
||||
/** Get the bind address for a socket as CService. */
|
||||
static CService GetBindAddress(const Sock& sock)
|
||||
{
|
||||
CService addr_bind;
|
||||
struct sockaddr_storage sockaddr_bind;
|
||||
socklen_t sockaddr_bind_len = sizeof(sockaddr_bind);
|
||||
if (!sock.GetSockName((struct sockaddr*)&sockaddr_bind, &sockaddr_bind_len)) {
|
||||
addr_bind.SetSockAddr((const struct sockaddr*)&sockaddr_bind, sockaddr_bind_len);
|
||||
} else {
|
||||
LogPrintLevel(BCLog::NET, BCLog::Level::Warning, "getsockname failed\n");
|
||||
}
|
||||
return addr_bind;
|
||||
}
|
||||
|
||||
bool SockMan::BindAndStartListening(const CService& to, bilingual_str& err_msg)
|
||||
{
|
||||
// Create socket for listening for incoming connections
|
||||
sockaddr_storage storage;
|
||||
socklen_t len{sizeof(storage)};
|
||||
if (!to.GetSockAddr(reinterpret_cast<sockaddr*>(&storage), &len)) {
|
||||
err_msg = Untranslated(strprintf("Bind address family for %s not supported", to.ToStringAddrPort()));
|
||||
return false;
|
||||
}
|
||||
|
||||
std::unique_ptr<Sock> sock{CreateSock(to.GetSAFamily(), SOCK_STREAM, IPPROTO_TCP)};
|
||||
if (!sock) {
|
||||
err_msg = Untranslated(strprintf("Cannot create %s listen socket: %s",
|
||||
to.ToStringAddrPort(),
|
||||
NetworkErrorString(WSAGetLastError())));
|
||||
return false;
|
||||
}
|
||||
|
||||
int one{1};
|
||||
|
||||
// Allow binding if the port is still in TIME_WAIT state after
|
||||
// the program was closed and restarted.
|
||||
if (sock->SetSockOpt(SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<sockopt_arg_type>(&one), sizeof(one)) == SOCKET_ERROR) {
|
||||
LogPrintLevel(BCLog::NET,
|
||||
BCLog::Level::Info,
|
||||
"Cannot set SO_REUSEADDR on %s listen socket: %s, continuing anyway\n",
|
||||
to.ToStringAddrPort(),
|
||||
NetworkErrorString(WSAGetLastError()));
|
||||
}
|
||||
|
||||
// some systems don't have IPV6_V6ONLY but are always v6only; others do have the option
|
||||
// and enable it by default or not. Try to enable it, if possible.
|
||||
if (to.IsIPv6()) {
|
||||
#ifdef IPV6_V6ONLY
|
||||
if (sock->SetSockOpt(IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast<sockopt_arg_type>(&one), sizeof(one)) == SOCKET_ERROR) {
|
||||
LogPrintLevel(BCLog::NET,
|
||||
BCLog::Level::Info,
|
||||
"Cannot set IPV6_V6ONLY on %s listen socket: %s, continuing anyway\n",
|
||||
to.ToStringAddrPort(),
|
||||
NetworkErrorString(WSAGetLastError()));
|
||||
}
|
||||
#endif
|
||||
#ifdef WIN32
|
||||
int prot_level{PROTECTION_LEVEL_UNRESTRICTED};
|
||||
if (sock->SetSockOpt(IPPROTO_IPV6,
|
||||
IPV6_PROTECTION_LEVEL,
|
||||
reinterpret_cast<const char*>(&prot_level),
|
||||
sizeof(prot_level)) == SOCKET_ERROR) {
|
||||
LogPrintLevel(BCLog::NET,
|
||||
BCLog::Level::Info,
|
||||
"Cannot set IPV6_PROTECTION_LEVEL on %s listen socket: %s, continuing anyway\n",
|
||||
to.ToStringAddrPort(),
|
||||
NetworkErrorString(WSAGetLastError()));
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
if (sock->Bind(reinterpret_cast<sockaddr*>(&storage), len) == SOCKET_ERROR) {
|
||||
const int err{WSAGetLastError()};
|
||||
if (err == WSAEADDRINUSE) {
|
||||
err_msg = strprintf(_("Unable to bind to %s on this computer. %s is probably already running."),
|
||||
to.ToStringAddrPort(),
|
||||
CLIENT_NAME);
|
||||
} else {
|
||||
err_msg = strprintf(_("Unable to bind to %s on this computer (bind returned error %s)"),
|
||||
to.ToStringAddrPort(),
|
||||
NetworkErrorString(err));
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Listen for incoming connections
|
||||
if (sock->Listen(SOMAXCONN) == SOCKET_ERROR) {
|
||||
err_msg = strprintf(_("Cannot listen on %s: %s"), to.ToStringAddrPort(), NetworkErrorString(WSAGetLastError()));
|
||||
return false;
|
||||
}
|
||||
|
||||
m_listen.emplace_back(std::move(sock));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void SockMan::StartSocketsThreads(const Options& options)
|
||||
{
|
||||
m_thread_socket_handler = std::thread(
|
||||
&util::TraceThread, options.socket_handler_thread_name, [this] { ThreadSocketHandler(); });
|
||||
|
||||
if (options.i2p.has_value()) {
|
||||
m_i2p_sam_session = std::make_unique<i2p::sam::Session>(
|
||||
options.i2p->private_key_file, options.i2p->sam_proxy, &interruptNet);
|
||||
|
||||
m_thread_i2p_accept =
|
||||
std::thread(&util::TraceThread, options.i2p->accept_thread_name, [this] { ThreadI2PAccept(); });
|
||||
}
|
||||
}
|
||||
|
||||
void SockMan::JoinSocketsThreads()
|
||||
{
|
||||
if (m_thread_i2p_accept.joinable()) {
|
||||
m_thread_i2p_accept.join();
|
||||
}
|
||||
|
||||
if (m_thread_socket_handler.joinable()) {
|
||||
m_thread_socket_handler.join();
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<SockMan::Id>
|
||||
SockMan::ConnectAndMakeId(const std::variant<CService, StringHostIntPort>& to,
|
||||
bool is_important,
|
||||
std::optional<Proxy> proxy,
|
||||
bool& proxy_failed,
|
||||
CService& me)
|
||||
{
|
||||
AssertLockNotHeld(m_connected_mutex);
|
||||
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
|
||||
|
||||
std::unique_ptr<Sock> sock;
|
||||
std::unique_ptr<i2p::sam::Session> i2p_transient_session;
|
||||
|
||||
Assume(!me.IsValid());
|
||||
|
||||
if (std::holds_alternative<CService>(to)) {
|
||||
const CService& addr_to{std::get<CService>(to)};
|
||||
if (addr_to.IsI2P()) {
|
||||
if (!Assume(proxy.has_value())) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
i2p::Connection conn;
|
||||
bool connected{false};
|
||||
|
||||
if (m_i2p_sam_session) {
|
||||
connected = m_i2p_sam_session->Connect(addr_to, conn, proxy_failed);
|
||||
} else {
|
||||
{
|
||||
LOCK(m_unused_i2p_sessions_mutex);
|
||||
if (m_unused_i2p_sessions.empty()) {
|
||||
i2p_transient_session = std::make_unique<i2p::sam::Session>(proxy.value(), &interruptNet);
|
||||
} else {
|
||||
i2p_transient_session.swap(m_unused_i2p_sessions.front());
|
||||
m_unused_i2p_sessions.pop();
|
||||
}
|
||||
}
|
||||
connected = i2p_transient_session->Connect(addr_to, conn, proxy_failed);
|
||||
if (!connected) {
|
||||
LOCK(m_unused_i2p_sessions_mutex);
|
||||
if (m_unused_i2p_sessions.size() < MAX_UNUSED_I2P_SESSIONS_SIZE) {
|
||||
m_unused_i2p_sessions.emplace(i2p_transient_session.release());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (connected) {
|
||||
sock = std::move(conn.sock);
|
||||
me = conn.me;
|
||||
}
|
||||
} else if (proxy.has_value()) {
|
||||
sock = ConnectThroughProxy(proxy.value(), addr_to.ToStringAddr(), addr_to.GetPort(), proxy_failed);
|
||||
} else {
|
||||
sock = ConnectDirectly(addr_to, is_important);
|
||||
}
|
||||
} else {
|
||||
if (!Assume(proxy.has_value())) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
const auto& hostport{std::get<StringHostIntPort>(to)};
|
||||
|
||||
bool dummy_proxy_failed;
|
||||
sock = ConnectThroughProxy(proxy.value(), hostport.host, hostport.port, dummy_proxy_failed);
|
||||
}
|
||||
|
||||
if (!sock) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
if (!me.IsValid()) {
|
||||
me = GetBindAddress(*sock);
|
||||
}
|
||||
|
||||
const Id id{GetNewId()};
|
||||
|
||||
{
|
||||
LOCK(m_connected_mutex);
|
||||
m_connected.emplace(id, std::make_shared<ConnectionSockets>(std::move(sock),
|
||||
std::move(i2p_transient_session)));
|
||||
}
|
||||
|
||||
return id;
|
||||
}
|
||||
|
||||
bool SockMan::CloseConnection(Id id)
|
||||
{
|
||||
LOCK(m_connected_mutex);
|
||||
return m_connected.erase(id) > 0;
|
||||
}
|
||||
|
||||
ssize_t SockMan::SendBytes(Id id,
|
||||
std::span<const unsigned char> data,
|
||||
bool will_send_more,
|
||||
std::string& errmsg) const
|
||||
{
|
||||
AssertLockNotHeld(m_connected_mutex);
|
||||
|
||||
if (data.empty()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
auto sockets{GetConnectionSockets(id)};
|
||||
if (!sockets) {
|
||||
// Bail out immediately and just leave things in the caller's send queue.
|
||||
return 0;
|
||||
}
|
||||
|
||||
int flags{MSG_NOSIGNAL | MSG_DONTWAIT};
|
||||
#ifdef MSG_MORE
|
||||
if (will_send_more) {
|
||||
flags |= MSG_MORE;
|
||||
}
|
||||
#endif
|
||||
|
||||
const ssize_t sent{WITH_LOCK(
|
||||
sockets->mutex,
|
||||
return sockets->sock->Send(reinterpret_cast<const char*>(data.data()), data.size(), flags);)};
|
||||
|
||||
if (sent >= 0) {
|
||||
return sent;
|
||||
}
|
||||
|
||||
const int err{WSAGetLastError()};
|
||||
if (err == WSAEWOULDBLOCK || err == WSAEMSGSIZE || err == WSAEINTR || err == WSAEINPROGRESS) {
|
||||
return 0;
|
||||
}
|
||||
errmsg = NetworkErrorString(err);
|
||||
return -1;
|
||||
}
|
||||
|
||||
void SockMan::StopListening()
|
||||
{
|
||||
m_listen.clear();
|
||||
}
|
||||
|
||||
bool SockMan::ShouldTryToSend(Id id) const { return true; }
|
||||
|
||||
bool SockMan::ShouldTryToRecv(Id id) const { return true; }
|
||||
|
||||
void SockMan::EventIOLoopCompletedForOne(Id id) {}
|
||||
|
||||
void SockMan::EventIOLoopCompletedForAll() {}
|
||||
|
||||
void SockMan::EventI2PStatus(const CService&, I2PStatus) {}
|
||||
|
||||
void SockMan::TestOnlyAddExistentConnection(Id id, std::unique_ptr<Sock>&& sock)
|
||||
{
|
||||
LOCK(m_connected_mutex);
|
||||
const auto result{m_connected.emplace(id, std::make_shared<ConnectionSockets>(std::move(sock)))};
|
||||
assert(result.second);
|
||||
}
|
||||
|
||||
void SockMan::ThreadI2PAccept()
|
||||
{
|
||||
AssertLockNotHeld(m_connected_mutex);
|
||||
|
||||
static constexpr auto err_wait_begin = 1s;
|
||||
static constexpr auto err_wait_cap = 5min;
|
||||
auto err_wait = err_wait_begin;
|
||||
|
||||
i2p::Connection conn;
|
||||
|
||||
auto SleepOnFailure = [&]() {
|
||||
interruptNet.sleep_for(err_wait);
|
||||
if (err_wait < err_wait_cap) {
|
||||
err_wait += 1s;
|
||||
}
|
||||
};
|
||||
|
||||
while (!interruptNet) {
|
||||
|
||||
if (!m_i2p_sam_session->Listen(conn)) {
|
||||
EventI2PStatus(conn.me, SockMan::I2PStatus::STOP_LISTENING);
|
||||
SleepOnFailure();
|
||||
continue;
|
||||
}
|
||||
|
||||
EventI2PStatus(conn.me, SockMan::I2PStatus::START_LISTENING);
|
||||
|
||||
if (!m_i2p_sam_session->Accept(conn)) {
|
||||
SleepOnFailure();
|
||||
continue;
|
||||
}
|
||||
|
||||
Assume(conn.me.IsI2P());
|
||||
Assume(conn.peer.IsI2P());
|
||||
|
||||
NewSockAccepted(std::move(conn.sock), conn.me, conn.peer);
|
||||
|
||||
err_wait = err_wait_begin;
|
||||
}
|
||||
}
|
||||
|
||||
void SockMan::ThreadSocketHandler()
|
||||
{
|
||||
AssertLockNotHeld(m_connected_mutex);
|
||||
|
||||
while (!interruptNet) {
|
||||
EventIOLoopCompletedForAll();
|
||||
|
||||
// Check for the readiness of the already connected sockets and the
|
||||
// listening sockets in one call ("readiness" as in poll(2) or
|
||||
// select(2)). If none are ready, wait for a short while and return
|
||||
// empty sets.
|
||||
auto io_readiness{GenerateWaitSockets()};
|
||||
if (io_readiness.events_per_sock.empty() ||
|
||||
// WaitMany() may as well be a static method, the context of the first Sock in the vector is not relevant.
|
||||
!io_readiness.events_per_sock.begin()->first->WaitMany(SELECT_TIMEOUT,
|
||||
io_readiness.events_per_sock)) {
|
||||
interruptNet.sleep_for(SELECT_TIMEOUT);
|
||||
}
|
||||
|
||||
// Service (send/receive) each of the already connected sockets.
|
||||
SocketHandlerConnected(io_readiness);
|
||||
|
||||
// Accept new connections from listening sockets.
|
||||
SocketHandlerListening(io_readiness.events_per_sock);
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<Sock> SockMan::AcceptConnection(const Sock& listen_sock, CService& addr)
|
||||
{
|
||||
sockaddr_storage storage;
|
||||
socklen_t len{sizeof(storage)};
|
||||
|
||||
auto sock{listen_sock.Accept(reinterpret_cast<sockaddr*>(&storage), &len)};
|
||||
|
||||
if (!sock) {
|
||||
const int err{WSAGetLastError()};
|
||||
if (err != WSAEWOULDBLOCK) {
|
||||
LogPrintLevel(BCLog::NET,
|
||||
BCLog::Level::Error,
|
||||
"Cannot accept new connection: %s\n",
|
||||
NetworkErrorString(err));
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
if (!addr.SetSockAddr(reinterpret_cast<sockaddr*>(&storage), len)) {
|
||||
LogPrintLevel(BCLog::NET, BCLog::Level::Warning, "Unknown socket family\n");
|
||||
}
|
||||
|
||||
return sock;
|
||||
}
|
||||
|
||||
void SockMan::NewSockAccepted(std::unique_ptr<Sock>&& sock, const CService& me, const CService& them)
|
||||
{
|
||||
AssertLockNotHeld(m_connected_mutex);
|
||||
|
||||
if (!sock->IsSelectable()) {
|
||||
LogPrintf("connection from %s dropped: non-selectable socket\n", them.ToStringAddrPort());
|
||||
return;
|
||||
}
|
||||
|
||||
// According to the internet TCP_NODELAY is not carried into accepted sockets
|
||||
// on all platforms. Set it again here just to be sure.
|
||||
const int on{1};
|
||||
if (sock->SetSockOpt(IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)) == SOCKET_ERROR) {
|
||||
LogDebug(BCLog::NET, "connection from %s: unable to set TCP_NODELAY, continuing anyway\n",
|
||||
them.ToStringAddrPort());
|
||||
}
|
||||
|
||||
const Id id{GetNewId()};
|
||||
|
||||
{
|
||||
LOCK(m_connected_mutex);
|
||||
m_connected.emplace(id, std::make_shared<ConnectionSockets>(std::move(sock)));
|
||||
}
|
||||
|
||||
if (!EventNewConnectionAccepted(id, me, them)) {
|
||||
CloseConnection(id);
|
||||
}
|
||||
}
|
||||
|
||||
SockMan::Id SockMan::GetNewId()
|
||||
{
|
||||
return m_next_id.fetch_add(1, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
SockMan::IOReadiness SockMan::GenerateWaitSockets()
|
||||
{
|
||||
AssertLockNotHeld(m_connected_mutex);
|
||||
|
||||
IOReadiness io_readiness;
|
||||
|
||||
for (const auto& sock : m_listen) {
|
||||
io_readiness.events_per_sock.emplace(sock, Sock::Events{Sock::RECV});
|
||||
}
|
||||
|
||||
auto connected_snapshot{WITH_LOCK(m_connected_mutex, return m_connected;)};
|
||||
|
||||
for (const auto& [id, sockets] : connected_snapshot) {
|
||||
const bool select_recv{ShouldTryToRecv(id)};
|
||||
const bool select_send{ShouldTryToSend(id)};
|
||||
if (!select_recv && !select_send) continue;
|
||||
|
||||
Sock::Event event = (select_send ? Sock::SEND : 0) | (select_recv ? Sock::RECV : 0);
|
||||
io_readiness.events_per_sock.emplace(sockets->sock, Sock::Events{event});
|
||||
io_readiness.ids_per_sock.emplace(sockets->sock, id);
|
||||
}
|
||||
|
||||
return io_readiness;
|
||||
}
|
||||
|
||||
void SockMan::SocketHandlerConnected(const IOReadiness& io_readiness)
|
||||
{
|
||||
AssertLockNotHeld(m_connected_mutex);
|
||||
|
||||
for (const auto& [sock, events] : io_readiness.events_per_sock) {
|
||||
if (interruptNet) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto it{io_readiness.ids_per_sock.find(sock)};
|
||||
if (it == io_readiness.ids_per_sock.end()) {
|
||||
continue;
|
||||
}
|
||||
const Id id{it->second};
|
||||
|
||||
bool send_ready = events.occurred & Sock::SEND; // Sock::SEND could only be set if ShouldTryToSend() has returned true in GenerateWaitSockets().
|
||||
bool recv_ready = events.occurred & Sock::RECV; // Sock::RECV could only be set if ShouldTryToRecv() has returned true in GenerateWaitSockets().
|
||||
bool err_ready = events.occurred & Sock::ERR;
|
||||
|
||||
if (send_ready) {
|
||||
bool cancel_recv;
|
||||
|
||||
EventReadyToSend(id, cancel_recv);
|
||||
|
||||
if (cancel_recv) {
|
||||
recv_ready = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (recv_ready || err_ready) {
|
||||
uint8_t buf[0x10000]; // typical socket buffer is 8K-64K
|
||||
|
||||
auto sockets{GetConnectionSockets(id)};
|
||||
if (!sockets) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const ssize_t nrecv{WITH_LOCK(
|
||||
sockets->mutex,
|
||||
return sockets->sock->Recv(buf, sizeof(buf), MSG_DONTWAIT);)};
|
||||
|
||||
if (nrecv < 0) { // In all cases (including -1 and 0) EventIOLoopCompletedForOne() should be executed after this, don't change the code to skip it.
|
||||
const int err = WSAGetLastError();
|
||||
if (err != WSAEWOULDBLOCK && err != WSAEMSGSIZE && err != WSAEINTR && err != WSAEINPROGRESS) {
|
||||
EventGotPermanentReadError(id, NetworkErrorString(err));
|
||||
}
|
||||
} else if (nrecv == 0) {
|
||||
EventGotEOF(id);
|
||||
} else {
|
||||
EventGotData(id, {buf, static_cast<size_t>(nrecv)});
|
||||
}
|
||||
}
|
||||
|
||||
EventIOLoopCompletedForOne(id);
|
||||
}
|
||||
}
|
||||
|
||||
void SockMan::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock)
|
||||
{
|
||||
AssertLockNotHeld(m_connected_mutex);
|
||||
|
||||
for (const auto& sock : m_listen) {
|
||||
if (interruptNet) {
|
||||
return;
|
||||
}
|
||||
const auto it = events_per_sock.find(sock);
|
||||
if (it != events_per_sock.end() && it->second.occurred & Sock::RECV) {
|
||||
CService addr_accepted;
|
||||
|
||||
auto sock_accepted{AcceptConnection(*sock, addr_accepted)};
|
||||
|
||||
if (sock_accepted) {
|
||||
NewSockAccepted(std::move(sock_accepted), GetBindAddress(*sock), addr_accepted);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<SockMan::ConnectionSockets> SockMan::GetConnectionSockets(Id id) const
|
||||
{
|
||||
LOCK(m_connected_mutex);
|
||||
|
||||
auto it{m_connected.find(id)};
|
||||
if (it == m_connected.end()) {
|
||||
// There is no socket in case we've already disconnected, or in test cases without
|
||||
// real connections.
|
||||
return {};
|
||||
}
|
||||
|
||||
return it->second;
|
||||
}
|
463
src/common/sockman.h
Normal file
463
src/common/sockman.h
Normal file
|
@ -0,0 +1,463 @@
|
|||
// Copyright (c) 2024-present The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or https://opensource.org/license/mit/.
|
||||
|
||||
#ifndef BITCOIN_COMMON_SOCKMAN_H
|
||||
#define BITCOIN_COMMON_SOCKMAN_H
|
||||
|
||||
#include <i2p.h>
|
||||
#include <netaddress.h>
|
||||
#include <netbase.h>
|
||||
#include <util/fs.h>
|
||||
#include <util/sock.h>
|
||||
#include <util/translation.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <queue>
|
||||
#include <span>
|
||||
#include <thread>
|
||||
#include <variant>
|
||||
#include <vector>
|
||||
|
||||
/**
|
||||
* A socket manager class which handles socket operations.
|
||||
* To use this class, inherit from it and implement the pure virtual methods.
|
||||
* Handled operations:
|
||||
* - binding and listening on sockets
|
||||
* - starting of necessary threads to process socket operations
|
||||
* - accepting incoming connections
|
||||
* - making outbound connections
|
||||
* - closing connections
|
||||
* - waiting for IO readiness on sockets and doing send/recv accordingly
|
||||
*/
|
||||
class SockMan
|
||||
{
|
||||
public:
|
||||
/**
|
||||
* Each connection is assigned an unique id of this type.
|
||||
*/
|
||||
using Id = int64_t;
|
||||
|
||||
/**
|
||||
* Possible status changes that can be passed to `EventI2PStatus()`.
|
||||
*/
|
||||
enum class I2PStatus : uint8_t {
|
||||
/// The listen succeeded and we are now listening for incoming I2P connections.
|
||||
START_LISTENING,
|
||||
|
||||
/// The listen failed and now we are not listening (even if START_LISTENING was signaled before).
|
||||
STOP_LISTENING,
|
||||
};
|
||||
|
||||
virtual ~SockMan() = default;
|
||||
|
||||
//
|
||||
// Non-virtual functions, to be reused by children classes.
|
||||
//
|
||||
|
||||
/**
|
||||
* Bind to a new address:port, start listening and add the listen socket to `m_listen`.
|
||||
* Should be called before `StartSocketsThreads()`.
|
||||
* @param[in] to Where to bind.
|
||||
* @param[out] err_msg Error string if an error occurs.
|
||||
* @retval true Success.
|
||||
* @retval false Failure, `err_msg` will be set.
|
||||
*/
|
||||
bool BindAndStartListening(const CService& to, bilingual_str& err_msg);
|
||||
|
||||
/**
|
||||
* Options to influence `StartSocketsThreads()`.
|
||||
*/
|
||||
struct Options {
|
||||
std::string_view socket_handler_thread_name;
|
||||
|
||||
struct I2P {
|
||||
explicit I2P(const fs::path& file, const Proxy& proxy, std::string_view accept_thread_name)
|
||||
: private_key_file{file},
|
||||
sam_proxy{proxy},
|
||||
accept_thread_name{accept_thread_name}
|
||||
{}
|
||||
|
||||
const fs::path private_key_file;
|
||||
const Proxy sam_proxy;
|
||||
const std::string_view accept_thread_name;
|
||||
};
|
||||
|
||||
/**
|
||||
* I2P options. If set then a thread will be started that will accept incoming I2P connections.
|
||||
*/
|
||||
std::optional<I2P> i2p;
|
||||
};
|
||||
|
||||
/**
|
||||
* Start the necessary threads for sockets IO.
|
||||
*/
|
||||
void StartSocketsThreads(const Options& options);
|
||||
|
||||
/**
|
||||
* Join (wait for) the threads started by `StartSocketsThreads()` to exit.
|
||||
*/
|
||||
void JoinSocketsThreads();
|
||||
|
||||
/**
|
||||
* A more readable std::tuple<std::string, uint16_t> for host and port.
|
||||
*/
|
||||
struct StringHostIntPort {
|
||||
const std::string& host;
|
||||
uint16_t port;
|
||||
};
|
||||
|
||||
/**
|
||||
* Make an outbound connection, save the socket internally and return a newly generated connection id.
|
||||
* @param[in] to The address to connect to, either as CService or a host as string and port as
|
||||
* an integer, if the later is used, then `proxy` must be valid.
|
||||
* @param[in] is_important If true, then log failures with higher severity.
|
||||
* @param[in] proxy Proxy to connect through, if set.
|
||||
* @param[out] proxy_failed If `proxy` is valid and the connection failed because of the
|
||||
* proxy, then it will be set to true.
|
||||
* @param[out] me If the connection was successful then this is set to the address on the
|
||||
* local side of the socket.
|
||||
* @return Newly generated id, or std::nullopt if the operation fails.
|
||||
*/
|
||||
std::optional<SockMan::Id> ConnectAndMakeId(const std::variant<CService, StringHostIntPort>& to,
|
||||
bool is_important,
|
||||
std::optional<Proxy> proxy,
|
||||
bool& proxy_failed,
|
||||
CService& me)
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex, !m_unused_i2p_sessions_mutex);
|
||||
|
||||
/**
|
||||
* Destroy a given connection by closing its socket and release resources occupied by it.
|
||||
* @param[in] id Connection to destroy.
|
||||
* @return Whether the connection existed and its socket was closed by this call.
|
||||
*/
|
||||
bool CloseConnection(Id id)
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
|
||||
|
||||
/**
|
||||
* Try to send some data over the given connection.
|
||||
* @param[in] id Identifier of the connection.
|
||||
* @param[in] data The data to send, it might happen that only a prefix of this is sent.
|
||||
* @param[in] will_send_more Used as an optimization if the caller knows that they will
|
||||
* be sending more data soon after this call.
|
||||
* @param[out] errmsg If <0 is returned then this will contain a human readable message
|
||||
* explaining the error.
|
||||
* @retval >=0 The number of bytes actually sent.
|
||||
* @retval <0 A permanent error has occurred.
|
||||
*/
|
||||
ssize_t SendBytes(Id id,
|
||||
std::span<const unsigned char> data,
|
||||
bool will_send_more,
|
||||
std::string& errmsg) const
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
|
||||
|
||||
/**
|
||||
* Stop listening by closing all listening sockets.
|
||||
*/
|
||||
void StopListening();
|
||||
|
||||
/**
|
||||
* This is signaled when network activity should cease.
|
||||
* A pointer to it is saved in `m_i2p_sam_session`, so make sure that
|
||||
* the lifetime of `interruptNet` is not shorter than
|
||||
* the lifetime of `m_i2p_sam_session`.
|
||||
*/
|
||||
CThreadInterrupt interruptNet;
|
||||
|
||||
protected:
|
||||
|
||||
/**
|
||||
* During some tests mocked sockets are created outside of `SockMan`, make it
|
||||
* possible to add those so that send/recv can be exercised.
|
||||
* @param[in] id Connection id to add.
|
||||
* @param[in,out] sock Socket to associate with the added connection.
|
||||
*/
|
||||
void TestOnlyAddExistentConnection(Id id, std::unique_ptr<Sock>&& sock)
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
|
||||
|
||||
private:
|
||||
|
||||
/**
|
||||
* Cap on the size of `m_unused_i2p_sessions`, to ensure it does not
|
||||
* unexpectedly use too much memory.
|
||||
*/
|
||||
static constexpr size_t MAX_UNUSED_I2P_SESSIONS_SIZE{10};
|
||||
|
||||
//
|
||||
// Pure virtual functions must be implemented by children classes.
|
||||
//
|
||||
|
||||
/**
|
||||
* Be notified when a new connection has been accepted.
|
||||
* @param[in] id Id of the newly accepted connection.
|
||||
* @param[in] me The address and port at our side of the connection.
|
||||
* @param[in] them The address and port at the peer's side of the connection.
|
||||
* @retval true The new connection was accepted at the higher level.
|
||||
* @retval false The connection was refused at the higher level, so the
|
||||
* associated socket and id should be discarded by `SockMan`.
|
||||
*/
|
||||
virtual bool EventNewConnectionAccepted(Id id,
|
||||
const CService& me,
|
||||
const CService& them) = 0;
|
||||
|
||||
/**
|
||||
* Called when the socket is ready to send data and `ShouldTryToSend()` has
|
||||
* returned true. This is where the higher level code serializes its messages
|
||||
* and calls `SockMan::SendBytes()`.
|
||||
* @param[in] id Id of the connection whose socket is ready to send.
|
||||
* @param[out] cancel_recv Should always be set upon return and if it is true,
|
||||
* then the next attempt to receive data from that connection will be omitted.
|
||||
*/
|
||||
virtual void EventReadyToSend(Id id, bool& cancel_recv) = 0;
|
||||
|
||||
/**
|
||||
* Called when new data has been received.
|
||||
* @param[in] id Connection for which the data arrived.
|
||||
* @param[in] data Received data.
|
||||
*/
|
||||
virtual void EventGotData(Id id, std::span<const uint8_t> data) = 0;
|
||||
|
||||
/**
|
||||
* Called when the remote peer has sent an EOF on the socket. This is a graceful
|
||||
* close of their writing side, we can still send and they will receive, if it
|
||||
* makes sense at the application level.
|
||||
* @param[in] id Connection whose socket got EOF.
|
||||
*/
|
||||
virtual void EventGotEOF(Id id) = 0;
|
||||
|
||||
/**
|
||||
* Called when we get an irrecoverable error trying to read from a socket.
|
||||
* @param[in] id Connection whose socket got an error.
|
||||
* @param[in] errmsg Message describing the error.
|
||||
*/
|
||||
virtual void EventGotPermanentReadError(Id id, const std::string& errmsg) = 0;
|
||||
|
||||
//
|
||||
// Non-pure virtual functions can be overridden by children classes or left
|
||||
// alone to use the default implementation from SockMan.
|
||||
//
|
||||
|
||||
/**
|
||||
* Can be used to temporarily pause sends on a connection.
|
||||
* SockMan would only call EventReadyToSend() if this returns true.
|
||||
* The implementation in SockMan always returns true.
|
||||
* @param[in] id Connection for which to confirm or omit the next call to EventReadyToSend().
|
||||
*/
|
||||
virtual bool ShouldTryToSend(Id id) const;
|
||||
|
||||
/**
|
||||
* SockMan would only call Recv() on a connection's socket if this returns true.
|
||||
* Can be used to temporarily pause receives on a connection.
|
||||
* The implementation in SockMan always returns true.
|
||||
* @param[in] id Connection for which to confirm or omit the next receive.
|
||||
*/
|
||||
virtual bool ShouldTryToRecv(Id id) const;
|
||||
|
||||
/**
|
||||
* SockMan has completed the current send+recv iteration for a given connection.
|
||||
* It will do another send+recv for this connection after processing all other connections.
|
||||
* Can be used to execute periodic tasks for a given connection.
|
||||
* The implementation in SockMan does nothing.
|
||||
* @param[in] id Connection for which send+recv has been done.
|
||||
*/
|
||||
virtual void EventIOLoopCompletedForOne(Id id);
|
||||
|
||||
/**
|
||||
* SockMan has completed send+recv for all connections.
|
||||
* Can be used to execute periodic tasks for all connections, like closing
|
||||
* connections due to higher level logic.
|
||||
* The implementation in SockMan does nothing.
|
||||
*/
|
||||
virtual void EventIOLoopCompletedForAll();
|
||||
|
||||
/**
|
||||
* Be notified of a change in the state of the I2P connectivity.
|
||||
* The default behavior, implemented by `SockMan`, is to ignore this event.
|
||||
* @param[in] addr The address we started or stopped listening on.
|
||||
* @param[in] new_status New status.
|
||||
*/
|
||||
virtual void EventI2PStatus(const CService& addr, I2PStatus new_status);
|
||||
|
||||
/**
|
||||
* The sockets used by a connection - a data socket and an optional I2P session socket.
|
||||
*/
|
||||
struct ConnectionSockets {
|
||||
explicit ConnectionSockets(std::unique_ptr<Sock>&& s)
|
||||
: sock{std::move(s)}
|
||||
{
|
||||
}
|
||||
|
||||
explicit ConnectionSockets(std::shared_ptr<Sock>&& s, std::unique_ptr<i2p::sam::Session>&& sess)
|
||||
: sock{std::move(s)},
|
||||
i2p_transient_session{std::move(sess)}
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Mutex that serializes the Send() and Recv() calls on `sock`.
|
||||
*/
|
||||
Mutex mutex;
|
||||
|
||||
/**
|
||||
* Underlying socket.
|
||||
* `shared_ptr` (instead of `unique_ptr`) is used to avoid premature close of the
|
||||
* underlying file descriptor by one thread while another thread is poll(2)-ing
|
||||
* it for activity.
|
||||
* @see https://github.com/bitcoin/bitcoin/issues/21744 for details.
|
||||
*/
|
||||
std::shared_ptr<Sock> sock;
|
||||
|
||||
/**
|
||||
* When transient I2P sessions are used, then each connection has its own session, otherwise
|
||||
* all connections use the session from `m_i2p_sam_session` and share the same I2P address.
|
||||
* I2P sessions involve a data/transport socket (in `sock`) and a control socket
|
||||
* (in `i2p_transient_session`). For transient sessions, once the data socket `sock` is
|
||||
* closed, the control socket is not going to be used anymore and would be just taking
|
||||
* resources. Storing it here makes its deletion together with `sock` automatic.
|
||||
*/
|
||||
std::unique_ptr<i2p::sam::Session> i2p_transient_session;
|
||||
};
|
||||
|
||||
/**
|
||||
* Info about which socket has which event ready and its connection id.
|
||||
*/
|
||||
struct IOReadiness {
|
||||
/**
|
||||
* Map of socket -> socket events. For example:
|
||||
* socket1 -> { requested = SEND|RECV, occurred = RECV }
|
||||
* socket2 -> { requested = SEND, occurred = SEND }
|
||||
*/
|
||||
Sock::EventsPerSock events_per_sock;
|
||||
|
||||
/**
|
||||
* Map of socket -> connection id (in `m_connected`). For example
|
||||
* socket1 -> id=23
|
||||
* socket2 -> id=56
|
||||
*/
|
||||
std::unordered_map<Sock::EventsPerSock::key_type,
|
||||
SockMan::Id,
|
||||
Sock::HashSharedPtrSock,
|
||||
Sock::EqualSharedPtrSock>
|
||||
ids_per_sock;
|
||||
};
|
||||
|
||||
/**
|
||||
* Accept incoming I2P connections in a loop and call
|
||||
* `EventNewConnectionAccepted()` for each new connection.
|
||||
*/
|
||||
void ThreadI2PAccept()
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
|
||||
|
||||
/**
|
||||
* Check connected and listening sockets for IO readiness and process them accordingly.
|
||||
*/
|
||||
void ThreadSocketHandler()
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
|
||||
|
||||
/**
|
||||
* Accept a connection.
|
||||
* @param[in] listen_sock Socket on which to accept the connection.
|
||||
* @param[out] addr Address of the peer that was accepted.
|
||||
* @return Newly created socket for the accepted connection.
|
||||
*/
|
||||
std::unique_ptr<Sock> AcceptConnection(const Sock& listen_sock, CService& addr);
|
||||
|
||||
/**
|
||||
* After a new socket with a peer has been created, configure its flags,
|
||||
* make a new connection id and call `EventNewConnectionAccepted()`.
|
||||
* @param[in] sock The newly created socket.
|
||||
* @param[in] me Address at our end of the connection.
|
||||
* @param[in] them Address of the new peer.
|
||||
*/
|
||||
void NewSockAccepted(std::unique_ptr<Sock>&& sock, const CService& me, const CService& them)
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
|
||||
|
||||
/**
|
||||
* Generate an id for a newly created connection.
|
||||
*/
|
||||
Id GetNewId();
|
||||
|
||||
/**
|
||||
* Generate a collection of sockets to check for IO readiness.
|
||||
* @return Sockets to check for readiness plus an aux map to find the
|
||||
* corresponding connection id given a socket.
|
||||
*/
|
||||
IOReadiness GenerateWaitSockets()
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
|
||||
|
||||
/**
|
||||
* Do the read/write for connected sockets that are ready for IO.
|
||||
* @param[in] io_readiness Which sockets are ready and their connection ids.
|
||||
*/
|
||||
void SocketHandlerConnected(const IOReadiness& io_readiness)
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
|
||||
|
||||
/**
|
||||
* Accept incoming connections, one from each read-ready listening socket.
|
||||
* @param[in] events_per_sock Sockets that are ready for IO.
|
||||
*/
|
||||
void SocketHandlerListening(const Sock::EventsPerSock& events_per_sock)
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
|
||||
|
||||
/**
|
||||
* Retrieve an entry from m_connected.
|
||||
* @param[in] id Connection id to search for.
|
||||
* @return ConnectionSockets for the given connection id or empty shared_ptr if not found.
|
||||
*/
|
||||
std::shared_ptr<ConnectionSockets> GetConnectionSockets(Id id) const
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
|
||||
|
||||
/**
|
||||
* The id to assign to the next created connection. Used to generate ids of connections.
|
||||
*/
|
||||
std::atomic<Id> m_next_id{0};
|
||||
|
||||
/**
|
||||
* Thread that sends to and receives from sockets and accepts connections.
|
||||
*/
|
||||
std::thread m_thread_socket_handler;
|
||||
|
||||
/**
|
||||
* Thread that accepts incoming I2P connections in a loop, can be stopped via `interruptNet`.
|
||||
*/
|
||||
std::thread m_thread_i2p_accept;
|
||||
|
||||
/**
|
||||
* Mutex protecting m_i2p_sam_sessions.
|
||||
*/
|
||||
Mutex m_unused_i2p_sessions_mutex;
|
||||
|
||||
/**
|
||||
* A pool of created I2P SAM transient sessions that should be used instead
|
||||
* of creating new ones in order to reduce the load on the I2P network.
|
||||
* Creating a session in I2P is not cheap, thus if this is not empty, then
|
||||
* pick an entry from it instead of creating a new session. If connecting to
|
||||
* a host fails, then the created session is put to this pool for reuse.
|
||||
*/
|
||||
std::queue<std::unique_ptr<i2p::sam::Session>> m_unused_i2p_sessions GUARDED_BY(m_unused_i2p_sessions_mutex);
|
||||
|
||||
/**
|
||||
* I2P SAM session.
|
||||
* Used to accept incoming and make outgoing I2P connections from a persistent
|
||||
* address.
|
||||
*/
|
||||
std::unique_ptr<i2p::sam::Session> m_i2p_sam_session;
|
||||
|
||||
/**
|
||||
* List of listening sockets.
|
||||
*/
|
||||
std::vector<std::shared_ptr<Sock>> m_listen;
|
||||
|
||||
mutable Mutex m_connected_mutex;
|
||||
|
||||
/**
|
||||
* Sockets for existent connections.
|
||||
* The `shared_ptr` makes it possible to create a snapshot of this by simply copying
|
||||
* it (under `m_connected_mutex`).
|
||||
*/
|
||||
std::unordered_map<Id, std::shared_ptr<ConnectionSockets>> m_connected GUARDED_BY(m_connected_mutex);
|
||||
};
|
||||
|
||||
#endif // BITCOIN_COMMON_SOCKMAN_H
|
Loading…
Add table
Reference in a new issue