This commit is contained in:
Vasil Dimov 2025-04-29 12:01:43 +02:00 committed by GitHub
commit e83066c94a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 1429 additions and 742 deletions

View file

@ -149,6 +149,7 @@ add_library(bitcoin_common STATIC EXCLUDE_FROM_ALL
common/run_command.cpp
common/settings.cpp
common/signmessage.cpp
common/sockman.cpp
common/system.cpp
common/url.cpp
compressor.cpp
@ -156,6 +157,7 @@ add_library(bitcoin_common STATIC EXCLUDE_FROM_ALL
core_write.cpp
deploymentinfo.cpp
external_signer.cpp
i2p.cpp
init/common.cpp
kernel/chainparams.cpp
key.cpp
@ -233,7 +235,6 @@ add_library(bitcoin_node STATIC EXCLUDE_FROM_ALL
headerssync.cpp
httprpc.cpp
httpserver.cpp
i2p.cpp
index/base.cpp
index/blockfilterindex.cpp
index/coinstatsindex.cpp

535
src/common/sockman.cpp Normal file
View file

@ -0,0 +1,535 @@
// Copyright (c) 2024-present The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or https://opensource.org/license/mit/.
#include <bitcoin-build-config.h> // IWYU pragma: keep
#include <common/sockman.h>
#include <logging.h>
#include <netbase.h>
#include <util/sock.h>
#include <util/thread.h>
#include <cassert>
// The set of sockets cannot be modified while waiting
// The sleep time needs to be small to avoid new sockets stalling
static constexpr auto SELECT_TIMEOUT{50ms};
/** Get the bind address for a socket as CService. */
static CService GetBindAddress(const Sock& sock)
{
CService addr_bind;
struct sockaddr_storage sockaddr_bind;
socklen_t sockaddr_bind_len = sizeof(sockaddr_bind);
if (!sock.GetSockName((struct sockaddr*)&sockaddr_bind, &sockaddr_bind_len)) {
addr_bind.SetSockAddr((const struct sockaddr*)&sockaddr_bind, sockaddr_bind_len);
} else {
LogPrintLevel(BCLog::NET, BCLog::Level::Warning, "getsockname failed\n");
}
return addr_bind;
}
bool SockMan::BindAndStartListening(const CService& to, bilingual_str& err_msg)
{
// Create socket for listening for incoming connections
sockaddr_storage storage;
socklen_t len{sizeof(storage)};
if (!to.GetSockAddr(reinterpret_cast<sockaddr*>(&storage), &len)) {
err_msg = Untranslated(strprintf("Bind address family for %s not supported", to.ToStringAddrPort()));
return false;
}
std::unique_ptr<Sock> sock{CreateSock(to.GetSAFamily(), SOCK_STREAM, IPPROTO_TCP)};
if (!sock) {
err_msg = Untranslated(strprintf("Cannot create %s listen socket: %s",
to.ToStringAddrPort(),
NetworkErrorString(WSAGetLastError())));
return false;
}
int one{1};
// Allow binding if the port is still in TIME_WAIT state after
// the program was closed and restarted.
if (sock->SetSockOpt(SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<sockopt_arg_type>(&one), sizeof(one)) == SOCKET_ERROR) {
LogPrintLevel(BCLog::NET,
BCLog::Level::Info,
"Cannot set SO_REUSEADDR on %s listen socket: %s, continuing anyway\n",
to.ToStringAddrPort(),
NetworkErrorString(WSAGetLastError()));
}
// some systems don't have IPV6_V6ONLY but are always v6only; others do have the option
// and enable it by default or not. Try to enable it, if possible.
if (to.IsIPv6()) {
#ifdef IPV6_V6ONLY
if (sock->SetSockOpt(IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast<sockopt_arg_type>(&one), sizeof(one)) == SOCKET_ERROR) {
LogPrintLevel(BCLog::NET,
BCLog::Level::Info,
"Cannot set IPV6_V6ONLY on %s listen socket: %s, continuing anyway\n",
to.ToStringAddrPort(),
NetworkErrorString(WSAGetLastError()));
}
#endif
#ifdef WIN32
int prot_level{PROTECTION_LEVEL_UNRESTRICTED};
if (sock->SetSockOpt(IPPROTO_IPV6,
IPV6_PROTECTION_LEVEL,
reinterpret_cast<const char*>(&prot_level),
sizeof(prot_level)) == SOCKET_ERROR) {
LogPrintLevel(BCLog::NET,
BCLog::Level::Info,
"Cannot set IPV6_PROTECTION_LEVEL on %s listen socket: %s, continuing anyway\n",
to.ToStringAddrPort(),
NetworkErrorString(WSAGetLastError()));
}
#endif
}
if (sock->Bind(reinterpret_cast<sockaddr*>(&storage), len) == SOCKET_ERROR) {
const int err{WSAGetLastError()};
if (err == WSAEADDRINUSE) {
err_msg = strprintf(_("Unable to bind to %s on this computer. %s is probably already running."),
to.ToStringAddrPort(),
CLIENT_NAME);
} else {
err_msg = strprintf(_("Unable to bind to %s on this computer (bind returned error %s)"),
to.ToStringAddrPort(),
NetworkErrorString(err));
}
return false;
}
// Listen for incoming connections
if (sock->Listen(SOMAXCONN) == SOCKET_ERROR) {
err_msg = strprintf(_("Cannot listen on %s: %s"), to.ToStringAddrPort(), NetworkErrorString(WSAGetLastError()));
return false;
}
m_listen.emplace_back(std::move(sock));
return true;
}
void SockMan::StartSocketsThreads(const Options& options)
{
m_thread_socket_handler = std::thread(
&util::TraceThread, options.socket_handler_thread_name, [this] { ThreadSocketHandler(); });
if (options.i2p.has_value()) {
m_i2p_sam_session = std::make_unique<i2p::sam::Session>(
options.i2p->private_key_file, options.i2p->sam_proxy, &interruptNet);
m_thread_i2p_accept =
std::thread(&util::TraceThread, options.i2p->accept_thread_name, [this] { ThreadI2PAccept(); });
}
}
void SockMan::JoinSocketsThreads()
{
if (m_thread_i2p_accept.joinable()) {
m_thread_i2p_accept.join();
}
if (m_thread_socket_handler.joinable()) {
m_thread_socket_handler.join();
}
}
std::optional<SockMan::Id>
SockMan::ConnectAndMakeId(const std::variant<CService, StringHostIntPort>& to,
bool is_important,
std::optional<Proxy> proxy,
bool& proxy_failed,
CService& me)
{
AssertLockNotHeld(m_connected_mutex);
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
std::unique_ptr<Sock> sock;
std::unique_ptr<i2p::sam::Session> i2p_transient_session;
Assume(!me.IsValid());
if (std::holds_alternative<CService>(to)) {
const CService& addr_to{std::get<CService>(to)};
if (addr_to.IsI2P()) {
if (!Assume(proxy.has_value())) {
return std::nullopt;
}
i2p::Connection conn;
bool connected{false};
if (m_i2p_sam_session) {
connected = m_i2p_sam_session->Connect(addr_to, conn, proxy_failed);
} else {
{
LOCK(m_unused_i2p_sessions_mutex);
if (m_unused_i2p_sessions.empty()) {
i2p_transient_session = std::make_unique<i2p::sam::Session>(proxy.value(), &interruptNet);
} else {
i2p_transient_session.swap(m_unused_i2p_sessions.front());
m_unused_i2p_sessions.pop();
}
}
connected = i2p_transient_session->Connect(addr_to, conn, proxy_failed);
if (!connected) {
LOCK(m_unused_i2p_sessions_mutex);
if (m_unused_i2p_sessions.size() < MAX_UNUSED_I2P_SESSIONS_SIZE) {
m_unused_i2p_sessions.emplace(i2p_transient_session.release());
}
}
}
if (connected) {
sock = std::move(conn.sock);
me = conn.me;
}
} else if (proxy.has_value()) {
sock = ConnectThroughProxy(proxy.value(), addr_to.ToStringAddr(), addr_to.GetPort(), proxy_failed);
} else {
sock = ConnectDirectly(addr_to, is_important);
}
} else {
if (!Assume(proxy.has_value())) {
return std::nullopt;
}
const auto& hostport{std::get<StringHostIntPort>(to)};
bool dummy_proxy_failed;
sock = ConnectThroughProxy(proxy.value(), hostport.host, hostport.port, dummy_proxy_failed);
}
if (!sock) {
return std::nullopt;
}
if (!me.IsValid()) {
me = GetBindAddress(*sock);
}
const Id id{GetNewId()};
{
LOCK(m_connected_mutex);
m_connected.emplace(id, std::make_shared<ConnectionSockets>(std::move(sock),
std::move(i2p_transient_session)));
}
return id;
}
bool SockMan::CloseConnection(Id id)
{
LOCK(m_connected_mutex);
return m_connected.erase(id) > 0;
}
ssize_t SockMan::SendBytes(Id id,
std::span<const unsigned char> data,
bool will_send_more,
std::string& errmsg) const
{
AssertLockNotHeld(m_connected_mutex);
if (data.empty()) {
return 0;
}
auto sockets{GetConnectionSockets(id)};
if (!sockets) {
// Bail out immediately and just leave things in the caller's send queue.
return 0;
}
int flags{MSG_NOSIGNAL | MSG_DONTWAIT};
#ifdef MSG_MORE
if (will_send_more) {
flags |= MSG_MORE;
}
#endif
const ssize_t sent{WITH_LOCK(
sockets->mutex,
return sockets->sock->Send(reinterpret_cast<const char*>(data.data()), data.size(), flags);)};
if (sent >= 0) {
return sent;
}
const int err{WSAGetLastError()};
if (err == WSAEWOULDBLOCK || err == WSAEMSGSIZE || err == WSAEINTR || err == WSAEINPROGRESS) {
return 0;
}
errmsg = NetworkErrorString(err);
return -1;
}
void SockMan::StopListening()
{
m_listen.clear();
}
bool SockMan::ShouldTryToSend(Id id) const { return true; }
bool SockMan::ShouldTryToRecv(Id id) const { return true; }
void SockMan::EventIOLoopCompletedForOne(Id id) {}
void SockMan::EventIOLoopCompletedForAll() {}
void SockMan::EventI2PStatus(const CService&, I2PStatus) {}
void SockMan::TestOnlyAddExistentConnection(Id id, std::unique_ptr<Sock>&& sock)
{
LOCK(m_connected_mutex);
const auto result{m_connected.emplace(id, std::make_shared<ConnectionSockets>(std::move(sock)))};
assert(result.second);
}
void SockMan::ThreadI2PAccept()
{
AssertLockNotHeld(m_connected_mutex);
static constexpr auto err_wait_begin = 1s;
static constexpr auto err_wait_cap = 5min;
auto err_wait = err_wait_begin;
i2p::Connection conn;
auto SleepOnFailure = [&]() {
interruptNet.sleep_for(err_wait);
if (err_wait < err_wait_cap) {
err_wait += 1s;
}
};
while (!interruptNet) {
if (!m_i2p_sam_session->Listen(conn)) {
EventI2PStatus(conn.me, SockMan::I2PStatus::STOP_LISTENING);
SleepOnFailure();
continue;
}
EventI2PStatus(conn.me, SockMan::I2PStatus::START_LISTENING);
if (!m_i2p_sam_session->Accept(conn)) {
SleepOnFailure();
continue;
}
Assume(conn.me.IsI2P());
Assume(conn.peer.IsI2P());
NewSockAccepted(std::move(conn.sock), conn.me, conn.peer);
err_wait = err_wait_begin;
}
}
void SockMan::ThreadSocketHandler()
{
AssertLockNotHeld(m_connected_mutex);
while (!interruptNet) {
EventIOLoopCompletedForAll();
// Check for the readiness of the already connected sockets and the
// listening sockets in one call ("readiness" as in poll(2) or
// select(2)). If none are ready, wait for a short while and return
// empty sets.
auto io_readiness{GenerateWaitSockets()};
if (io_readiness.events_per_sock.empty() ||
// WaitMany() may as well be a static method, the context of the first Sock in the vector is not relevant.
!io_readiness.events_per_sock.begin()->first->WaitMany(SELECT_TIMEOUT,
io_readiness.events_per_sock)) {
interruptNet.sleep_for(SELECT_TIMEOUT);
}
// Service (send/receive) each of the already connected sockets.
SocketHandlerConnected(io_readiness);
// Accept new connections from listening sockets.
SocketHandlerListening(io_readiness.events_per_sock);
}
}
std::unique_ptr<Sock> SockMan::AcceptConnection(const Sock& listen_sock, CService& addr)
{
sockaddr_storage storage;
socklen_t len{sizeof(storage)};
auto sock{listen_sock.Accept(reinterpret_cast<sockaddr*>(&storage), &len)};
if (!sock) {
const int err{WSAGetLastError()};
if (err != WSAEWOULDBLOCK) {
LogPrintLevel(BCLog::NET,
BCLog::Level::Error,
"Cannot accept new connection: %s\n",
NetworkErrorString(err));
}
return {};
}
if (!addr.SetSockAddr(reinterpret_cast<sockaddr*>(&storage), len)) {
LogPrintLevel(BCLog::NET, BCLog::Level::Warning, "Unknown socket family\n");
}
return sock;
}
void SockMan::NewSockAccepted(std::unique_ptr<Sock>&& sock, const CService& me, const CService& them)
{
AssertLockNotHeld(m_connected_mutex);
if (!sock->IsSelectable()) {
LogPrintf("connection from %s dropped: non-selectable socket\n", them.ToStringAddrPort());
return;
}
// According to the internet TCP_NODELAY is not carried into accepted sockets
// on all platforms. Set it again here just to be sure.
const int on{1};
if (sock->SetSockOpt(IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)) == SOCKET_ERROR) {
LogDebug(BCLog::NET, "connection from %s: unable to set TCP_NODELAY, continuing anyway\n",
them.ToStringAddrPort());
}
const Id id{GetNewId()};
{
LOCK(m_connected_mutex);
m_connected.emplace(id, std::make_shared<ConnectionSockets>(std::move(sock)));
}
if (!EventNewConnectionAccepted(id, me, them)) {
CloseConnection(id);
}
}
SockMan::Id SockMan::GetNewId()
{
return m_next_id.fetch_add(1, std::memory_order_relaxed);
}
SockMan::IOReadiness SockMan::GenerateWaitSockets()
{
AssertLockNotHeld(m_connected_mutex);
IOReadiness io_readiness;
for (const auto& sock : m_listen) {
io_readiness.events_per_sock.emplace(sock, Sock::Events{Sock::RECV});
}
auto connected_snapshot{WITH_LOCK(m_connected_mutex, return m_connected;)};
for (const auto& [id, sockets] : connected_snapshot) {
const bool select_recv{ShouldTryToRecv(id)};
const bool select_send{ShouldTryToSend(id)};
if (!select_recv && !select_send) continue;
Sock::Event event = (select_send ? Sock::SEND : 0) | (select_recv ? Sock::RECV : 0);
io_readiness.events_per_sock.emplace(sockets->sock, Sock::Events{event});
io_readiness.ids_per_sock.emplace(sockets->sock, id);
}
return io_readiness;
}
void SockMan::SocketHandlerConnected(const IOReadiness& io_readiness)
{
AssertLockNotHeld(m_connected_mutex);
for (const auto& [sock, events] : io_readiness.events_per_sock) {
if (interruptNet) {
return;
}
auto it{io_readiness.ids_per_sock.find(sock)};
if (it == io_readiness.ids_per_sock.end()) {
continue;
}
const Id id{it->second};
bool send_ready = events.occurred & Sock::SEND; // Sock::SEND could only be set if ShouldTryToSend() has returned true in GenerateWaitSockets().
bool recv_ready = events.occurred & Sock::RECV; // Sock::RECV could only be set if ShouldTryToRecv() has returned true in GenerateWaitSockets().
bool err_ready = events.occurred & Sock::ERR;
if (send_ready) {
bool cancel_recv;
EventReadyToSend(id, cancel_recv);
if (cancel_recv) {
recv_ready = false;
}
}
if (recv_ready || err_ready) {
uint8_t buf[0x10000]; // typical socket buffer is 8K-64K
auto sockets{GetConnectionSockets(id)};
if (!sockets) {
continue;
}
const ssize_t nrecv{WITH_LOCK(
sockets->mutex,
return sockets->sock->Recv(buf, sizeof(buf), MSG_DONTWAIT);)};
if (nrecv < 0) { // In all cases (including -1 and 0) EventIOLoopCompletedForOne() should be executed after this, don't change the code to skip it.
const int err = WSAGetLastError();
if (err != WSAEWOULDBLOCK && err != WSAEMSGSIZE && err != WSAEINTR && err != WSAEINPROGRESS) {
EventGotPermanentReadError(id, NetworkErrorString(err));
}
} else if (nrecv == 0) {
EventGotEOF(id);
} else {
EventGotData(id, {buf, static_cast<size_t>(nrecv)});
}
}
EventIOLoopCompletedForOne(id);
}
}
void SockMan::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock)
{
AssertLockNotHeld(m_connected_mutex);
for (const auto& sock : m_listen) {
if (interruptNet) {
return;
}
const auto it = events_per_sock.find(sock);
if (it != events_per_sock.end() && it->second.occurred & Sock::RECV) {
CService addr_accepted;
auto sock_accepted{AcceptConnection(*sock, addr_accepted)};
if (sock_accepted) {
NewSockAccepted(std::move(sock_accepted), GetBindAddress(*sock), addr_accepted);
}
}
}
}
std::shared_ptr<SockMan::ConnectionSockets> SockMan::GetConnectionSockets(Id id) const
{
LOCK(m_connected_mutex);
auto it{m_connected.find(id)};
if (it == m_connected.end()) {
// There is no socket in case we've already disconnected, or in test cases without
// real connections.
return {};
}
return it->second;
}

463
src/common/sockman.h Normal file
View file

@ -0,0 +1,463 @@
// Copyright (c) 2024-present The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or https://opensource.org/license/mit/.
#ifndef BITCOIN_COMMON_SOCKMAN_H
#define BITCOIN_COMMON_SOCKMAN_H
#include <i2p.h>
#include <netaddress.h>
#include <netbase.h>
#include <util/fs.h>
#include <util/sock.h>
#include <util/translation.h>
#include <atomic>
#include <memory>
#include <optional>
#include <queue>
#include <span>
#include <thread>
#include <variant>
#include <vector>
/**
* A socket manager class which handles socket operations.
* To use this class, inherit from it and implement the pure virtual methods.
* Handled operations:
* - binding and listening on sockets
* - starting of necessary threads to process socket operations
* - accepting incoming connections
* - making outbound connections
* - closing connections
* - waiting for IO readiness on sockets and doing send/recv accordingly
*/
class SockMan
{
public:
/**
* Each connection is assigned an unique id of this type.
*/
using Id = int64_t;
/**
* Possible status changes that can be passed to `EventI2PStatus()`.
*/
enum class I2PStatus : uint8_t {
/// The listen succeeded and we are now listening for incoming I2P connections.
START_LISTENING,
/// The listen failed and now we are not listening (even if START_LISTENING was signaled before).
STOP_LISTENING,
};
virtual ~SockMan() = default;
//
// Non-virtual functions, to be reused by children classes.
//
/**
* Bind to a new address:port, start listening and add the listen socket to `m_listen`.
* Should be called before `StartSocketsThreads()`.
* @param[in] to Where to bind.
* @param[out] err_msg Error string if an error occurs.
* @retval true Success.
* @retval false Failure, `err_msg` will be set.
*/
bool BindAndStartListening(const CService& to, bilingual_str& err_msg);
/**
* Options to influence `StartSocketsThreads()`.
*/
struct Options {
std::string_view socket_handler_thread_name;
struct I2P {
explicit I2P(const fs::path& file, const Proxy& proxy, std::string_view accept_thread_name)
: private_key_file{file},
sam_proxy{proxy},
accept_thread_name{accept_thread_name}
{}
const fs::path private_key_file;
const Proxy sam_proxy;
const std::string_view accept_thread_name;
};
/**
* I2P options. If set then a thread will be started that will accept incoming I2P connections.
*/
std::optional<I2P> i2p;
};
/**
* Start the necessary threads for sockets IO.
*/
void StartSocketsThreads(const Options& options);
/**
* Join (wait for) the threads started by `StartSocketsThreads()` to exit.
*/
void JoinSocketsThreads();
/**
* A more readable std::tuple<std::string, uint16_t> for host and port.
*/
struct StringHostIntPort {
const std::string& host;
uint16_t port;
};
/**
* Make an outbound connection, save the socket internally and return a newly generated connection id.
* @param[in] to The address to connect to, either as CService or a host as string and port as
* an integer, if the later is used, then `proxy` must be valid.
* @param[in] is_important If true, then log failures with higher severity.
* @param[in] proxy Proxy to connect through, if set.
* @param[out] proxy_failed If `proxy` is valid and the connection failed because of the
* proxy, then it will be set to true.
* @param[out] me If the connection was successful then this is set to the address on the
* local side of the socket.
* @return Newly generated id, or std::nullopt if the operation fails.
*/
std::optional<SockMan::Id> ConnectAndMakeId(const std::variant<CService, StringHostIntPort>& to,
bool is_important,
std::optional<Proxy> proxy,
bool& proxy_failed,
CService& me)
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex, !m_unused_i2p_sessions_mutex);
/**
* Destroy a given connection by closing its socket and release resources occupied by it.
* @param[in] id Connection to destroy.
* @return Whether the connection existed and its socket was closed by this call.
*/
bool CloseConnection(Id id)
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
/**
* Try to send some data over the given connection.
* @param[in] id Identifier of the connection.
* @param[in] data The data to send, it might happen that only a prefix of this is sent.
* @param[in] will_send_more Used as an optimization if the caller knows that they will
* be sending more data soon after this call.
* @param[out] errmsg If <0 is returned then this will contain a human readable message
* explaining the error.
* @retval >=0 The number of bytes actually sent.
* @retval <0 A permanent error has occurred.
*/
ssize_t SendBytes(Id id,
std::span<const unsigned char> data,
bool will_send_more,
std::string& errmsg) const
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
/**
* Stop listening by closing all listening sockets.
*/
void StopListening();
/**
* This is signaled when network activity should cease.
* A pointer to it is saved in `m_i2p_sam_session`, so make sure that
* the lifetime of `interruptNet` is not shorter than
* the lifetime of `m_i2p_sam_session`.
*/
CThreadInterrupt interruptNet;
protected:
/**
* During some tests mocked sockets are created outside of `SockMan`, make it
* possible to add those so that send/recv can be exercised.
* @param[in] id Connection id to add.
* @param[in,out] sock Socket to associate with the added connection.
*/
void TestOnlyAddExistentConnection(Id id, std::unique_ptr<Sock>&& sock)
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
private:
/**
* Cap on the size of `m_unused_i2p_sessions`, to ensure it does not
* unexpectedly use too much memory.
*/
static constexpr size_t MAX_UNUSED_I2P_SESSIONS_SIZE{10};
//
// Pure virtual functions must be implemented by children classes.
//
/**
* Be notified when a new connection has been accepted.
* @param[in] id Id of the newly accepted connection.
* @param[in] me The address and port at our side of the connection.
* @param[in] them The address and port at the peer's side of the connection.
* @retval true The new connection was accepted at the higher level.
* @retval false The connection was refused at the higher level, so the
* associated socket and id should be discarded by `SockMan`.
*/
virtual bool EventNewConnectionAccepted(Id id,
const CService& me,
const CService& them) = 0;
/**
* Called when the socket is ready to send data and `ShouldTryToSend()` has
* returned true. This is where the higher level code serializes its messages
* and calls `SockMan::SendBytes()`.
* @param[in] id Id of the connection whose socket is ready to send.
* @param[out] cancel_recv Should always be set upon return and if it is true,
* then the next attempt to receive data from that connection will be omitted.
*/
virtual void EventReadyToSend(Id id, bool& cancel_recv) = 0;
/**
* Called when new data has been received.
* @param[in] id Connection for which the data arrived.
* @param[in] data Received data.
*/
virtual void EventGotData(Id id, std::span<const uint8_t> data) = 0;
/**
* Called when the remote peer has sent an EOF on the socket. This is a graceful
* close of their writing side, we can still send and they will receive, if it
* makes sense at the application level.
* @param[in] id Connection whose socket got EOF.
*/
virtual void EventGotEOF(Id id) = 0;
/**
* Called when we get an irrecoverable error trying to read from a socket.
* @param[in] id Connection whose socket got an error.
* @param[in] errmsg Message describing the error.
*/
virtual void EventGotPermanentReadError(Id id, const std::string& errmsg) = 0;
//
// Non-pure virtual functions can be overridden by children classes or left
// alone to use the default implementation from SockMan.
//
/**
* Can be used to temporarily pause sends on a connection.
* SockMan would only call EventReadyToSend() if this returns true.
* The implementation in SockMan always returns true.
* @param[in] id Connection for which to confirm or omit the next call to EventReadyToSend().
*/
virtual bool ShouldTryToSend(Id id) const;
/**
* SockMan would only call Recv() on a connection's socket if this returns true.
* Can be used to temporarily pause receives on a connection.
* The implementation in SockMan always returns true.
* @param[in] id Connection for which to confirm or omit the next receive.
*/
virtual bool ShouldTryToRecv(Id id) const;
/**
* SockMan has completed the current send+recv iteration for a given connection.
* It will do another send+recv for this connection after processing all other connections.
* Can be used to execute periodic tasks for a given connection.
* The implementation in SockMan does nothing.
* @param[in] id Connection for which send+recv has been done.
*/
virtual void EventIOLoopCompletedForOne(Id id);
/**
* SockMan has completed send+recv for all connections.
* Can be used to execute periodic tasks for all connections, like closing
* connections due to higher level logic.
* The implementation in SockMan does nothing.
*/
virtual void EventIOLoopCompletedForAll();
/**
* Be notified of a change in the state of the I2P connectivity.
* The default behavior, implemented by `SockMan`, is to ignore this event.
* @param[in] addr The address we started or stopped listening on.
* @param[in] new_status New status.
*/
virtual void EventI2PStatus(const CService& addr, I2PStatus new_status);
/**
* The sockets used by a connection - a data socket and an optional I2P session socket.
*/
struct ConnectionSockets {
explicit ConnectionSockets(std::unique_ptr<Sock>&& s)
: sock{std::move(s)}
{
}
explicit ConnectionSockets(std::shared_ptr<Sock>&& s, std::unique_ptr<i2p::sam::Session>&& sess)
: sock{std::move(s)},
i2p_transient_session{std::move(sess)}
{
}
/**
* Mutex that serializes the Send() and Recv() calls on `sock`.
*/
Mutex mutex;
/**
* Underlying socket.
* `shared_ptr` (instead of `unique_ptr`) is used to avoid premature close of the
* underlying file descriptor by one thread while another thread is poll(2)-ing
* it for activity.
* @see https://github.com/bitcoin/bitcoin/issues/21744 for details.
*/
std::shared_ptr<Sock> sock;
/**
* When transient I2P sessions are used, then each connection has its own session, otherwise
* all connections use the session from `m_i2p_sam_session` and share the same I2P address.
* I2P sessions involve a data/transport socket (in `sock`) and a control socket
* (in `i2p_transient_session`). For transient sessions, once the data socket `sock` is
* closed, the control socket is not going to be used anymore and would be just taking
* resources. Storing it here makes its deletion together with `sock` automatic.
*/
std::unique_ptr<i2p::sam::Session> i2p_transient_session;
};
/**
* Info about which socket has which event ready and its connection id.
*/
struct IOReadiness {
/**
* Map of socket -> socket events. For example:
* socket1 -> { requested = SEND|RECV, occurred = RECV }
* socket2 -> { requested = SEND, occurred = SEND }
*/
Sock::EventsPerSock events_per_sock;
/**
* Map of socket -> connection id (in `m_connected`). For example
* socket1 -> id=23
* socket2 -> id=56
*/
std::unordered_map<Sock::EventsPerSock::key_type,
SockMan::Id,
Sock::HashSharedPtrSock,
Sock::EqualSharedPtrSock>
ids_per_sock;
};
/**
* Accept incoming I2P connections in a loop and call
* `EventNewConnectionAccepted()` for each new connection.
*/
void ThreadI2PAccept()
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
/**
* Check connected and listening sockets for IO readiness and process them accordingly.
*/
void ThreadSocketHandler()
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
/**
* Accept a connection.
* @param[in] listen_sock Socket on which to accept the connection.
* @param[out] addr Address of the peer that was accepted.
* @return Newly created socket for the accepted connection.
*/
std::unique_ptr<Sock> AcceptConnection(const Sock& listen_sock, CService& addr);
/**
* After a new socket with a peer has been created, configure its flags,
* make a new connection id and call `EventNewConnectionAccepted()`.
* @param[in] sock The newly created socket.
* @param[in] me Address at our end of the connection.
* @param[in] them Address of the new peer.
*/
void NewSockAccepted(std::unique_ptr<Sock>&& sock, const CService& me, const CService& them)
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
/**
* Generate an id for a newly created connection.
*/
Id GetNewId();
/**
* Generate a collection of sockets to check for IO readiness.
* @return Sockets to check for readiness plus an aux map to find the
* corresponding connection id given a socket.
*/
IOReadiness GenerateWaitSockets()
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
/**
* Do the read/write for connected sockets that are ready for IO.
* @param[in] io_readiness Which sockets are ready and their connection ids.
*/
void SocketHandlerConnected(const IOReadiness& io_readiness)
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
/**
* Accept incoming connections, one from each read-ready listening socket.
* @param[in] events_per_sock Sockets that are ready for IO.
*/
void SocketHandlerListening(const Sock::EventsPerSock& events_per_sock)
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
/**
* Retrieve an entry from m_connected.
* @param[in] id Connection id to search for.
* @return ConnectionSockets for the given connection id or empty shared_ptr if not found.
*/
std::shared_ptr<ConnectionSockets> GetConnectionSockets(Id id) const
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
/**
* The id to assign to the next created connection. Used to generate ids of connections.
*/
std::atomic<Id> m_next_id{0};
/**
* Thread that sends to and receives from sockets and accepts connections.
*/
std::thread m_thread_socket_handler;
/**
* Thread that accepts incoming I2P connections in a loop, can be stopped via `interruptNet`.
*/
std::thread m_thread_i2p_accept;
/**
* Mutex protecting m_i2p_sam_sessions.
*/
Mutex m_unused_i2p_sessions_mutex;
/**
* A pool of created I2P SAM transient sessions that should be used instead
* of creating new ones in order to reduce the load on the I2P network.
* Creating a session in I2P is not cheap, thus if this is not empty, then
* pick an entry from it instead of creating a new session. If connecting to
* a host fails, then the created session is put to this pool for reuse.
*/
std::queue<std::unique_ptr<i2p::sam::Session>> m_unused_i2p_sessions GUARDED_BY(m_unused_i2p_sessions_mutex);
/**
* I2P SAM session.
* Used to accept incoming and make outgoing I2P connections from a persistent
* address.
*/
std::unique_ptr<i2p::sam::Session> m_i2p_sam_session;
/**
* List of listening sockets.
*/
std::vector<std::shared_ptr<Sock>> m_listen;
mutable Mutex m_connected_mutex;
/**
* Sockets for existent connections.
* The `shared_ptr` makes it possible to create a snapshot of this by simply copying
* it (under `m_connected_mutex`).
*/
std::unordered_map<Id, std::shared_ptr<ConnectionSockets>> m_connected GUARDED_BY(m_connected_mutex);
};
#endif // BITCOIN_COMMON_SOCKMAN_H

File diff suppressed because it is too large Load diff

200
src/net.h
View file

@ -9,6 +9,7 @@
#include <bip324.h>
#include <chainparams.h>
#include <common/bloom.h>
#include <common/sockman.h>
#include <compat/compat.h>
#include <consensus/amount.h>
#include <crypto/siphash.h>
@ -41,8 +42,8 @@
#include <map>
#include <memory>
#include <optional>
#include <queue>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>
@ -94,7 +95,7 @@ static const size_t DEFAULT_MAXSENDBUFFER = 1 * 1000;
static constexpr bool DEFAULT_V2_TRANSPORT{true};
typedef int64_t NodeId;
using NodeId = SockMan::Id;
struct AddedNodeParams {
std::string m_added_node;
@ -662,7 +663,6 @@ public:
struct CNodeOptions
{
NetPermissionFlags permission_flags = NetPermissionFlags::None;
std::unique_ptr<i2p::sam::Session> i2p_sam_session = nullptr;
bool prefer_evict = false;
size_t recv_flood_size{DEFAULT_MAXRECEIVEBUFFER * 1000};
bool use_v2transport = false;
@ -678,16 +678,6 @@ public:
const NetPermissionFlags m_permission_flags;
/**
* Socket used for communication with the node.
* May not own a Sock object (after `CloseSocketDisconnect()` or during tests).
* `shared_ptr` (instead of `unique_ptr`) is used to avoid premature close of
* the underlying file descriptor by one thread while another thread is
* poll(2)-ing it for activity.
* @see https://github.com/bitcoin/bitcoin/issues/21744 for details.
*/
std::shared_ptr<Sock> m_sock GUARDED_BY(m_sock_mutex);
/** Sum of GetMemoryUsage of all vSendMsg entries. */
size_t m_send_memusage GUARDED_BY(cs_vSend){0};
/** Total number of bytes sent on the wire to this peer. */
@ -695,7 +685,6 @@ public:
/** Messages still to be fed to m_transport->SetMessageToSend. */
std::deque<CSerializedNetMsg> vSendMsg GUARDED_BY(cs_vSend);
Mutex cs_vSend;
Mutex m_sock_mutex;
Mutex cs_vRecv;
uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0};
@ -879,7 +868,6 @@ public:
std::atomic<std::chrono::microseconds> m_min_ping_time{std::chrono::microseconds::max()};
CNode(NodeId id,
std::shared_ptr<Sock> sock,
const CAddress& addrIn,
uint64_t nKeyedNetGroupIn,
uint64_t nLocalHostNonceIn,
@ -941,8 +929,6 @@ public:
nRefCount--;
}
void CloseSocketDisconnect() EXCLUSIVE_LOCKS_REQUIRED(!m_sock_mutex);
void CopyStats(CNodeStats& stats) EXCLUSIVE_LOCKS_REQUIRED(!m_subver_mutex, !m_addr_local_mutex, !cs_vSend, !cs_vRecv);
std::string ConnectionTypeAsString() const { return ::ConnectionTypeAsString(m_conn_type); }
@ -987,18 +973,6 @@ private:
mapMsgTypeSize mapSendBytesPerMsgType GUARDED_BY(cs_vSend);
mapMsgTypeSize mapRecvBytesPerMsgType GUARDED_BY(cs_vRecv);
/**
* If an I2P session is created per connection (for outbound transient I2P
* connections) then it is stored here so that it can be destroyed when the
* socket is closed. I2P sessions involve a data/transport socket (in `m_sock`)
* and a control socket (in `m_i2p_sam_session`). For transient sessions, once
* the data socket is closed, the control socket is not going to be used anymore
* and is just taking up resources. So better close it as soon as `m_sock` is
* closed.
* Otherwise this unique_ptr is empty.
*/
std::unique_ptr<i2p::sam::Session> m_i2p_sam_session GUARDED_BY(m_sock_mutex);
};
/**
@ -1048,7 +1022,7 @@ protected:
~NetEventsInterface() = default;
};
class CConnman
class CConnman : private SockMan
{
public:
@ -1136,7 +1110,7 @@ public:
bool GetNetworkActive() const { return fNetworkActive; };
bool GetUseAddrmanOutgoing() const { return m_use_addrman_outgoing; };
void SetNetworkActive(bool active);
void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant&& grant_outbound, const char* strDest, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant&& grant_outbound, const char* strDest, ConnectionType conn_type, bool use_v2transport);
bool CheckIncomingNonce(uint64_t nonce);
void ASMapHealthCheck();
@ -1151,7 +1125,7 @@ public:
void ForEachNode(const NodeFn& func)
{
LOCK(m_nodes_mutex);
for (auto&& node : m_nodes) {
for (auto& [_, node] : m_nodes) {
if (NodeFullyConnected(node))
func(node);
}
@ -1160,7 +1134,7 @@ public:
void ForEachNode(const NodeFn& func) const
{
LOCK(m_nodes_mutex);
for (auto&& node : m_nodes) {
for (auto& [_, node] : m_nodes) {
if (NodeFullyConnected(node))
func(node);
}
@ -1221,7 +1195,7 @@ public:
* - Max total outbound connection capacity filled
* - Max connection capacity for type is filled
*/
bool AddConnection(const std::string& address, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
bool AddConnection(const std::string& address, ConnectionType conn_type, bool use_v2transport);
size_t GetNodeCount(ConnectionDirection) const;
std::map<CNetAddr, LocalServiceInfo> getNetLocalAddresses() const;
@ -1273,81 +1247,71 @@ public:
bool MultipleManualOrFullOutboundConns(Network net) const EXCLUSIVE_LOCKS_REQUIRED(m_nodes_mutex);
private:
struct ListenSocket {
public:
std::shared_ptr<Sock> sock;
inline void AddSocketPermissionFlags(NetPermissionFlags& flags) const { NetPermissions::AddFlag(flags, m_permissions); }
ListenSocket(std::shared_ptr<Sock> sock_, NetPermissionFlags permissions_)
: sock{sock_}, m_permissions{permissions_}
{
}
private:
NetPermissionFlags m_permissions;
};
//! returns the time left in the current max outbound cycle
//! in case of no limit, it will always return 0
std::chrono::seconds GetMaxOutboundTimeLeftInCycle_() const EXCLUSIVE_LOCKS_REQUIRED(m_total_bytes_sent_mutex);
bool BindListenPort(const CService& bindAddr, bilingual_str& strError, NetPermissionFlags permissions);
bool Bind(const CService& addr, unsigned int flags, NetPermissionFlags permissions);
bool InitBinds(const Options& options);
void ThreadOpenAddedConnections() EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex, !m_unused_i2p_sessions_mutex, !m_reconnections_mutex);
void ThreadOpenAddedConnections() EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex, !m_reconnections_mutex);
void AddAddrFetch(const std::string& strDest) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex);
void ProcessAddrFetch() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_unused_i2p_sessions_mutex);
void ThreadOpenConnections(std::vector<std::string> connect, std::span<const std::string> seed_nodes) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex, !m_reconnections_mutex);
void ProcessAddrFetch() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex);
void ThreadOpenConnections(std::vector<std::string> connect, std::span<const std::string> seed_nodes) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_reconnections_mutex);
void ThreadMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
void ThreadI2PAcceptIncoming();
void AcceptConnection(const ListenSocket& hListenSocket);
/// Whether we are currently advertising our I2P address (via `AddLocal()`).
bool m_i2p_advertising_listen_addr{false};
virtual void EventI2PStatus(const CService& addr, SockMan::I2PStatus new_status) override;
/**
* Create a `CNode` object from a socket that has just been accepted and add the node to
* the `m_nodes` member.
* @param[in] sock Connected socket to communicate with the peer.
* @param[in] permission_flags The peer's permissions.
* @param[in] addr_bind The address and port at our side of the connection.
* @param[in] addr The address and port at the peer's side of the connection.
* Create a `CNode` object and add it to the `m_nodes` member.
* @param[in] id Id of the newly accepted connection.
* @param[in] me The address and port at our side of the connection.
* @param[in] them The address and port at the peer's side of the connection.
* @retval true on success
* @retval false on failure, meaning that the associated socket and node_id should be discarded
*/
void CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
NetPermissionFlags permission_flags,
const CService& addr_bind,
const CService& addr);
virtual bool EventNewConnectionAccepted(SockMan::Id id,
const CService& me,
const CService& them) override;
/**
* Mark a node as disconnected and close its connection with the peer.
* @param[in] node Node to disconnect.
*/
void MarkAsDisconnectAndCloseConnection(CNode& node);
void DisconnectNodes() EXCLUSIVE_LOCKS_REQUIRED(!m_reconnections_mutex, !m_nodes_mutex);
void NotifyNumConnectionsChanged();
/** Return true if the peer is inactive and should be disconnected. */
bool InactivityCheck(const CNode& node) const;
/**
* Generate a collection of sockets to check for IO readiness.
* @param[in] nodes Select from these nodes' sockets.
* @return sockets to check for readiness
*/
Sock::EventsPerSock GenerateWaitSockets(std::span<CNode* const> nodes);
void EventReadyToSend(SockMan::Id id, bool& cancel_recv) override
EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_mutex);
/**
* Check connected and listening sockets for IO readiness and process them accordingly.
*/
void SocketHandler() EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc);
virtual void EventGotData(SockMan::Id id, std::span<const uint8_t> data) override
EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc, !m_nodes_mutex);
/**
* Do the read/write for connected sockets that are ready for IO.
* @param[in] nodes Nodes to process. The socket of each node is checked against `what`.
* @param[in] events_per_sock Sockets that are ready for IO.
*/
void SocketHandlerConnected(const std::vector<CNode*>& nodes,
const Sock::EventsPerSock& events_per_sock)
EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc);
virtual void EventGotEOF(SockMan::Id id) override
EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_mutex);
/**
* Accept incoming connections, one from each read-ready listening socket.
* @param[in] events_per_sock Sockets that are ready for IO.
*/
void SocketHandlerListening(const Sock::EventsPerSock& events_per_sock);
virtual void EventGotPermanentReadError(SockMan::Id id, const std::string& errmsg) override
EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_mutex);
virtual bool ShouldTryToSend(SockMan::Id id) const override
EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_mutex);
virtual bool ShouldTryToRecv(SockMan::Id id) const override
EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_mutex);
virtual void EventIOLoopCompletedForOne(SockMan::Id id) override
EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_mutex);
virtual void EventIOLoopCompletedForAll() override
EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_mutex, !m_reconnections_mutex);
void ThreadSocketHandler() EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc, !m_nodes_mutex, !m_reconnections_mutex);
void ThreadDNSAddressSeed() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_nodes_mutex);
uint64_t CalculateKeyedNetGroup(const CNetAddr& ad) const;
@ -1363,15 +1327,14 @@ private:
bool AlreadyConnectedToAddress(const CAddress& addr);
bool AttemptToEvictConnection();
CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport);
void AddWhitelistPermissionFlags(NetPermissionFlags& flags, const CNetAddr &addr, const std::vector<NetWhitelistPermissions>& ranges) const;
void DeleteNode(CNode* pnode);
NodeId GetNewNodeId();
/** (Try to) send data from node's vSendMsg. Returns (bytes_sent, data_left). */
std::pair<size_t, bool> SocketSendData(CNode& node) const EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend);
std::pair<size_t, bool> SendMessagesAsBytes(CNode& node) EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend)
EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex);
void DumpAddresses();
@ -1430,7 +1393,11 @@ private:
unsigned int nSendBufferMaxSize{0};
unsigned int nReceiveFloodSize{0};
std::vector<ListenSocket> vhListenSocket;
/**
* Permissions that incoming peers get based on our listening address they connected to.
*/
std::unordered_map<CService, NetPermissionFlags, CServiceHash> m_listen_permissions;
std::atomic<bool> fNetworkActive{true};
bool fAddressesInitialized{false};
AddrMan& addrman;
@ -1441,11 +1408,12 @@ private:
// connection string and whether to use v2 p2p
std::vector<AddedNodeParams> m_added_node_params GUARDED_BY(m_added_nodes_mutex);
CNode* GetNodeById(NodeId node_id) const EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_mutex);
mutable Mutex m_added_nodes_mutex;
std::vector<CNode*> m_nodes GUARDED_BY(m_nodes_mutex);
std::unordered_map<NodeId, CNode*> m_nodes GUARDED_BY(m_nodes_mutex);
std::list<CNode*> m_nodes_disconnected;
mutable RecursiveMutex m_nodes_mutex;
std::atomic<NodeId> nLastNodeId{0};
unsigned int nPrevNodeCount{0};
// Stores number of full-tx connections (outbound and manual) per network
@ -1540,27 +1508,10 @@ private:
Mutex mutexMsgProc;
std::atomic<bool> flagInterruptMsgProc{false};
/**
* This is signaled when network activity should cease.
* A pointer to it is saved in `m_i2p_sam_session`, so make sure that
* the lifetime of `interruptNet` is not shorter than
* the lifetime of `m_i2p_sam_session`.
*/
CThreadInterrupt interruptNet;
/**
* I2P SAM session.
* Used to accept incoming and make outgoing I2P connections from a persistent
* address.
*/
std::unique_ptr<i2p::sam::Session> m_i2p_sam_session;
std::thread threadDNSAddressSeed;
std::thread threadSocketHandler;
std::thread threadOpenAddedConnections;
std::thread threadOpenConnections;
std::thread threadMessageHandler;
std::thread threadI2PAcceptIncoming;
/** flag for deciding to connect to an extra outbound peer,
* in excess of m_max_outbound_full_relay
@ -1591,20 +1542,6 @@ private:
*/
bool whitelist_relay;
/**
* Mutex protecting m_i2p_sam_sessions.
*/
Mutex m_unused_i2p_sessions_mutex;
/**
* A pool of created I2P SAM transient sessions that should be used instead
* of creating new ones in order to reduce the load on the I2P network.
* Creating a session in I2P is not cheap, thus if this is not empty, then
* pick an entry from it instead of creating a new session. If connecting to
* a host fails, then the created session is put to this pool for reuse.
*/
std::queue<std::unique_ptr<i2p::sam::Session>> m_unused_i2p_sessions GUARDED_BY(m_unused_i2p_sessions_mutex);
/**
* Mutex protecting m_reconnections.
*/
@ -1626,13 +1563,7 @@ private:
std::list<ReconnectionInfo> m_reconnections GUARDED_BY(m_reconnections_mutex);
/** Attempt reconnections, if m_reconnections non-empty. */
void PerformReconnections() EXCLUSIVE_LOCKS_REQUIRED(!m_reconnections_mutex, !m_unused_i2p_sessions_mutex);
/**
* Cap on the size of `m_unused_i2p_sessions`, to ensure it does not
* unexpectedly use too much memory.
*/
static constexpr size_t MAX_UNUSED_I2P_SESSIONS_SIZE{10};
void PerformReconnections() EXCLUSIVE_LOCKS_REQUIRED(!m_reconnections_mutex);
/**
* RAII helper to atomically create a copy of `m_nodes` and add a reference
@ -1645,8 +1576,9 @@ private:
{
{
LOCK(connman.m_nodes_mutex);
m_nodes_copy = connman.m_nodes;
for (auto& node : m_nodes_copy) {
m_nodes_copy.reserve(connman.m_nodes.size());
for (auto& [_, node] : connman.m_nodes) {
m_nodes_copy.push_back(node);
node->AddRef();
}
}

View file

@ -5112,10 +5112,15 @@ void PeerManagerImpl::EvictExtraOutboundPeers(std::chrono::seconds now)
m_connman.ForEachNode([&](CNode* pnode) {
if (!pnode->IsBlockOnlyConn() || pnode->fDisconnect) return;
if (pnode->GetId() > youngest_peer.first) {
next_youngest_peer = youngest_peer;
youngest_peer.first = pnode->GetId();
youngest_peer.second = pnode->m_last_block_time;
if (pnode->GetId() > next_youngest_peer.first) {
if (pnode->GetId() > youngest_peer.first) {
next_youngest_peer = youngest_peer;
youngest_peer.first = pnode->GetId();
youngest_peer.second = pnode->m_last_block_time;
} else {
next_youngest_peer.first = pnode->GetId();
next_youngest_peer.second = pnode->m_last_block_time;
}
}
});
NodeId to_disconnect = youngest_peer.first;

View file

@ -87,7 +87,7 @@ public:
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Pre-register peer=%d\n", peer_id);
const uint64_t local_salt{FastRandomContext().rand64()};
// We do this exactly once per peer (which are unique by NodeId, see GetNewNodeId) so it's
// We do this exactly once per peer (which are unique by id, see SockMan::GetNewId()) so it's
// safe to assume we don't have this record yet.
Assume(m_states.emplace(peer_id, local_salt).second);
return local_salt;

View file

@ -203,6 +203,10 @@ static RPCHelpMan getpeerinfo()
std::vector<CNodeStats> vstats;
connman.GetNodeStats(vstats);
std::sort(vstats.begin(), vstats.end(), [](const CNodeStats& a, const CNodeStats& b) {
return a.nodeid < b.nodeid;
});
UniValue ret(UniValue::VARR);
for (const CNodeStats& stats : vstats) {

View file

@ -55,7 +55,6 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)
CAddress addr1(ip(0xa0b0c001), NODE_NONE);
NodeId id{0};
CNode dummyNode1{id++,
/*sock=*/nullptr,
addr1,
/*nKeyedNetGroupIn=*/0,
/*nLocalHostNonceIn=*/0,
@ -121,7 +120,6 @@ void AddRandomOutboundPeer(NodeId& id, std::vector<CNode*>& vNodes, PeerManager&
}
vNodes.emplace_back(new CNode{id++,
/*sock=*/nullptr,
addr,
/*nKeyedNetGroupIn=*/0,
/*nLocalHostNonceIn=*/0,
@ -320,7 +318,6 @@ BOOST_AUTO_TEST_CASE(peer_discouragement)
banman->ClearBanned();
NodeId id{0};
nodes[0] = new CNode{id++,
/*sock=*/nullptr,
addr[0],
/*nKeyedNetGroupIn=*/0,
/*nLocalHostNonceIn=*/0,
@ -340,7 +337,6 @@ BOOST_AUTO_TEST_CASE(peer_discouragement)
BOOST_CHECK(!banman->IsDiscouraged(other_addr)); // Different address, not discouraged
nodes[1] = new CNode{id++,
/*sock=*/nullptr,
addr[1],
/*nKeyedNetGroupIn=*/1,
/*nLocalHostNonceIn=*/1,
@ -370,7 +366,6 @@ BOOST_AUTO_TEST_CASE(peer_discouragement)
// Make sure non-IP peers are discouraged and disconnected properly.
nodes[2] = new CNode{id++,
/*sock=*/nullptr,
addr[2],
/*nKeyedNetGroupIn=*/1,
/*nLocalHostNonceIn=*/1,
@ -412,7 +407,6 @@ BOOST_AUTO_TEST_CASE(DoS_bantime)
CAddress addr(ip(0xa0b0c001), NODE_NONE);
NodeId id{0};
CNode dummyNode{id++,
/*sock=*/nullptr,
addr,
/*nKeyedNetGroupIn=*/4,
/*nLocalHostNonceIn=*/4,

View file

@ -64,13 +64,15 @@ FUZZ_TARGET(connman, .init = initialize_connman)
connman.Init(options);
CNetAddr random_netaddr;
CNode random_node = ConsumeNode(fuzzed_data_provider);
NodeId node_id{0};
CNode& random_node{*ConsumeNodeAsUniquePtr(fuzzed_data_provider, node_id++).release()};
connman.AddTestNode(random_node, std::make_unique<FuzzedSock>(fuzzed_data_provider));
CSubNet random_subnet;
std::string random_string;
LIMITED_WHILE(fuzzed_data_provider.ConsumeBool(), 100) {
CNode& p2p_node{*ConsumeNodeAsUniquePtr(fuzzed_data_provider).release()};
connman.AddTestNode(p2p_node);
CNode& p2p_node{*ConsumeNodeAsUniquePtr(fuzzed_data_provider, node_id++).release()};
connman.AddTestNode(p2p_node, std::make_unique<FuzzedSock>(fuzzed_data_provider));
}
LIMITED_WHILE(fuzzed_data_provider.ConsumeBool(), 10000) {
@ -103,6 +105,15 @@ FUZZ_TARGET(connman, .init = initialize_connman)
[&] {
connman.DisconnectNode(random_subnet);
},
[&] {
if (fuzzed_data_provider.ConsumeBool()) {
auto nonexistent_node{ConsumeNodeAsUniquePtr(fuzzed_data_provider, node_id++)};
connman.MarkAsDisconnectAndCloseConnection(*nonexistent_node);
} else {
CNode& existent_node{*connman.TestNodes().begin()->second};
connman.MarkAsDisconnectAndCloseConnection(existent_node);
}
},
[&] {
connman.ForEachNode([](auto) {});
},

View file

@ -42,9 +42,6 @@ FUZZ_TARGET(net, .init = initialize_net)
LIMITED_WHILE(fuzzed_data_provider.ConsumeBool(), 10000) {
CallOneOf(
fuzzed_data_provider,
[&] {
node.CloseSocketDisconnect();
},
[&] {
CNodeStats stats;
node.CopyStats(stats);

View file

@ -65,7 +65,7 @@ FUZZ_TARGET(p2p_handshake, .init = ::initialize)
const auto num_peers_to_add = fuzzed_data_provider.ConsumeIntegralInRange(1, 3);
for (int i = 0; i < num_peers_to_add; ++i) {
peers.push_back(ConsumeNodeAsUniquePtr(fuzzed_data_provider, i).release());
connman.AddTestNode(*peers.back());
connman.AddTestNode(*peers.back(), std::make_unique<FuzzedSock>(fuzzed_data_provider));
peerman->InitializeNode(
*peers.back(),
static_cast<ServiceFlags>(fuzzed_data_provider.ConsumeIntegral<uint64_t>()));

View file

@ -70,7 +70,7 @@ void HeadersSyncSetup::ResetAndInitialize()
for (auto conn_type : conn_types) {
CAddress addr{};
m_connections.push_back(new CNode(id++, nullptr, addr, 0, 0, addr, "", conn_type, false));
m_connections.push_back(new CNode(id++, addr, 0, 0, addr, "", conn_type, false));
CNode& p2p_node = *m_connections.back();
connman.Handshake(

View file

@ -68,7 +68,7 @@ FUZZ_TARGET(process_message, .init = initialize_process_message)
}
CNode& p2p_node = *ConsumeNodeAsUniquePtr(fuzzed_data_provider).release();
connman.AddTestNode(p2p_node);
connman.AddTestNode(p2p_node, std::make_unique<FuzzedSock>(fuzzed_data_provider));
FillNode(fuzzed_data_provider, connman, p2p_node);
const auto mock_time = ConsumeTime(fuzzed_data_provider);

View file

@ -60,7 +60,7 @@ FUZZ_TARGET(process_messages, .init = initialize_process_messages)
FillNode(fuzzed_data_provider, connman, p2p_node);
connman.AddTestNode(p2p_node);
connman.AddTestNode(p2p_node, std::make_unique<FuzzedSock>(fuzzed_data_provider));
}
LIMITED_WHILE(fuzzed_data_provider.ConsumeBool(), 30)

View file

@ -231,7 +231,6 @@ template <bool ReturnUniquePtr = false>
auto ConsumeNode(FuzzedDataProvider& fuzzed_data_provider, const std::optional<NodeId>& node_id_in = std::nullopt) noexcept
{
const NodeId node_id = node_id_in.value_or(fuzzed_data_provider.ConsumeIntegralInRange<NodeId>(0, std::numeric_limits<NodeId>::max()));
const auto sock = std::make_shared<FuzzedSock>(fuzzed_data_provider);
const CAddress address = ConsumeAddress(fuzzed_data_provider);
const uint64_t keyed_net_group = fuzzed_data_provider.ConsumeIntegral<uint64_t>();
const uint64_t local_host_nonce = fuzzed_data_provider.ConsumeIntegral<uint64_t>();
@ -242,7 +241,6 @@ auto ConsumeNode(FuzzedDataProvider& fuzzed_data_provider, const std::optional<N
NetPermissionFlags permission_flags = ConsumeWeakEnum(fuzzed_data_provider, ALL_NET_PERMISSION_FLAGS);
if constexpr (ReturnUniquePtr) {
return std::make_unique<CNode>(node_id,
sock,
address,
keyed_net_group,
local_host_nonce,
@ -253,7 +251,6 @@ auto ConsumeNode(FuzzedDataProvider& fuzzed_data_provider, const std::optional<N
CNodeOptions{ .permission_flags = permission_flags });
} else {
return CNode{node_id,
sock,
address,
keyed_net_group,
local_host_nonce,

View file

@ -65,7 +65,6 @@ void AddPeer(NodeId& id, std::vector<CNode*>& nodes, PeerManager& peerman, Connm
const bool inbound_onion{onion_peer && conn_type == ConnectionType::INBOUND};
nodes.emplace_back(new CNode{++id,
/*sock=*/nullptr,
addr,
/*nKeyedNetGroupIn=*/0,
/*nLocalHostNonceIn=*/0,
@ -117,9 +116,9 @@ BOOST_FIXTURE_TEST_CASE(test_addnode_getaddednodeinfo_and_connection_detection,
BOOST_CHECK_EQUAL(nodes.back()->ConnectedThroughNetwork(), Network::NET_CJDNS);
BOOST_TEST_MESSAGE("Call AddNode() for all the peers");
for (auto node : connman->TestNodes()) {
for (const auto& [id, node] : connman->TestNodes()) {
BOOST_CHECK(connman->AddNode({/*m_added_node=*/node->addr.ToStringAddrPort(), /*m_use_v2transport=*/true}));
BOOST_TEST_MESSAGE(strprintf("peer id=%s addr=%s", node->GetId(), node->addr.ToStringAddrPort()));
BOOST_TEST_MESSAGE(strprintf("peer id=%s addr=%s", id, node->addr.ToStringAddrPort()));
}
BOOST_TEST_MESSAGE("\nCall AddNode() with 2 addrs resolving to existing localhost addnode entry; neither should be added");
@ -134,7 +133,7 @@ BOOST_FIXTURE_TEST_CASE(test_addnode_getaddednodeinfo_and_connection_detection,
BOOST_CHECK(connman->GetAddedNodeInfo(/*include_connected=*/false).empty());
// Test AddedNodesContain()
for (auto node : connman->TestNodes()) {
for (const auto& [_, node] : connman->TestNodes()) {
BOOST_CHECK(connman->AddedNodesContain(node->addr));
}
AddPeer(id, nodes, *peerman, *connman, ConnectionType::OUTBOUND_FULL_RELAY);
@ -151,12 +150,12 @@ BOOST_FIXTURE_TEST_CASE(test_addnode_getaddednodeinfo_and_connection_detection,
}
BOOST_TEST_MESSAGE("\nCheck that all connected peers are correctly detected as connected");
for (auto node : connman->TestNodes()) {
for (const auto& [_, node] : connman->TestNodes()) {
BOOST_CHECK(connman->AlreadyConnectedPublic(node->addr));
}
// Clean up
for (auto node : connman->TestNodes()) {
for (const auto& [_, node] : connman->TestNodes()) {
peerman->FinalizeNode(*node);
}
connman->ClearTestNodes();

View file

@ -60,7 +60,6 @@ BOOST_AUTO_TEST_CASE(cnode_simple_test)
std::string pszDest;
std::unique_ptr<CNode> pnode1 = std::make_unique<CNode>(id++,
/*sock=*/nullptr,
addr,
/*nKeyedNetGroupIn=*/0,
/*nLocalHostNonceIn=*/0,
@ -78,7 +77,6 @@ BOOST_AUTO_TEST_CASE(cnode_simple_test)
BOOST_CHECK_EQUAL(pnode1->ConnectedThroughNetwork(), Network::NET_IPV4);
std::unique_ptr<CNode> pnode2 = std::make_unique<CNode>(id++,
/*sock=*/nullptr,
addr,
/*nKeyedNetGroupIn=*/1,
/*nLocalHostNonceIn=*/1,
@ -96,7 +94,6 @@ BOOST_AUTO_TEST_CASE(cnode_simple_test)
BOOST_CHECK_EQUAL(pnode2->ConnectedThroughNetwork(), Network::NET_IPV4);
std::unique_ptr<CNode> pnode3 = std::make_unique<CNode>(id++,
/*sock=*/nullptr,
addr,
/*nKeyedNetGroupIn=*/0,
/*nLocalHostNonceIn=*/0,
@ -114,7 +111,6 @@ BOOST_AUTO_TEST_CASE(cnode_simple_test)
BOOST_CHECK_EQUAL(pnode3->ConnectedThroughNetwork(), Network::NET_IPV4);
std::unique_ptr<CNode> pnode4 = std::make_unique<CNode>(id++,
/*sock=*/nullptr,
addr,
/*nKeyedNetGroupIn=*/1,
/*nLocalHostNonceIn=*/1,
@ -606,7 +602,6 @@ BOOST_AUTO_TEST_CASE(ipv4_peer_with_ipv6_addrMe_test)
ipv4AddrPeer.s_addr = 0xa0b0c001;
CAddress addr = CAddress(CService(ipv4AddrPeer, 7777), NODE_NETWORK);
std::unique_ptr<CNode> pnode = std::make_unique<CNode>(/*id=*/0,
/*sock=*/nullptr,
addr,
/*nKeyedNetGroupIn=*/0,
/*nLocalHostNonceIn=*/0,
@ -660,7 +655,6 @@ BOOST_AUTO_TEST_CASE(get_local_addr_for_peer_port)
in_addr peer_out_in_addr;
peer_out_in_addr.s_addr = htonl(0x01020304);
CNode peer_out{/*id=*/0,
/*sock=*/nullptr,
/*addrIn=*/CAddress{CService{peer_out_in_addr, 8333}, NODE_NETWORK},
/*nKeyedNetGroupIn=*/0,
/*nLocalHostNonceIn=*/0,
@ -681,7 +675,6 @@ BOOST_AUTO_TEST_CASE(get_local_addr_for_peer_port)
in_addr peer_in_in_addr;
peer_in_in_addr.s_addr = htonl(0x05060708);
CNode peer_in{/*id=*/0,
/*sock=*/nullptr,
/*addrIn=*/CAddress{CService{peer_in_in_addr, 8333}, NODE_NETWORK},
/*nKeyedNetGroupIn=*/0,
/*nLocalHostNonceIn=*/0,
@ -818,7 +811,6 @@ BOOST_AUTO_TEST_CASE(initial_advertise_from_version_message)
in_addr peer_in_addr;
peer_in_addr.s_addr = htonl(0x01020304);
CNode peer{/*id=*/0,
/*sock=*/nullptr,
/*addrIn=*/CAddress{CService{peer_in_addr, 8333}, NODE_NETWORK},
/*nKeyedNetGroupIn=*/0,
/*nLocalHostNonceIn=*/0,
@ -893,7 +885,6 @@ BOOST_AUTO_TEST_CASE(advertise_local_address)
{
auto CreatePeer = [](const CAddress& addr) {
return std::make_unique<CNode>(/*id=*/0,
/*sock=*/nullptr,
addr,
/*nKeyedNetGroupIn=*/0,
/*nLocalHostNonceIn=*/0,

View file

@ -15,6 +15,7 @@
#include <node/eviction.h>
#include <span.h>
#include <sync.h>
#include <util/check.h>
#include <util/sock.h>
#include <algorithm>
@ -45,16 +46,23 @@ struct ConnmanTestMsg : public CConnman {
m_peer_connect_timeout = timeout;
}
std::vector<CNode*> TestNodes()
auto TestNodes()
{
LOCK(m_nodes_mutex);
return m_nodes;
}
void AddTestNode(CNode& node, std::unique_ptr<Sock>&& sock)
{
TestOnlyAddExistentConnection(node.GetId(), std::move(sock));
AddTestNode(node);
}
void AddTestNode(CNode& node)
{
LOCK(m_nodes_mutex);
m_nodes.push_back(&node);
auto [_, inserted] = m_nodes.emplace(node.GetId(), &node);
Assert(inserted);
if (node.IsManualOrFullOutboundConn()) ++m_network_conn_counts[node.addr.GetNetwork()];
}
@ -62,7 +70,7 @@ struct ConnmanTestMsg : public CConnman {
void ClearTestNodes()
{
LOCK(m_nodes_mutex);
for (CNode* node : m_nodes) {
for (auto& [_, node] : m_nodes) {
delete node;
}
m_nodes.clear();
@ -88,8 +96,9 @@ struct ConnmanTestMsg : public CConnman {
bool AlreadyConnectedPublic(const CAddress& addr) { return AlreadyConnectedToAddress(addr); };
CNode* ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type)
EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
CNode* ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type);
using CConnman::MarkAsDisconnectAndCloseConnection;
};
constexpr ServiceFlags ALL_SERVICE_FLAGS[]{

View file

@ -29,23 +29,23 @@ class PortTest(BitcoinTestFramework):
port2 = p2p_port(self.num_nodes + 5)
self.log.info("When starting with -port, bitcoind binds to it and uses port + 1 for an onion bind")
with node.assert_debug_log(expected_msgs=[f'Bound to 0.0.0.0:{port1}', f'Bound to 127.0.0.1:{port1 + 1}']):
with node.assert_debug_log(expected_msgs=[f'Bound to and listening on 0.0.0.0:{port1}', f'Bound to and listening on 127.0.0.1:{port1 + 1}']):
self.restart_node(0, extra_args=["-listen", f"-port={port1}"])
self.log.info("When specifying -port multiple times, only the last one is taken")
with node.assert_debug_log(expected_msgs=[f'Bound to 0.0.0.0:{port2}', f'Bound to 127.0.0.1:{port2 + 1}'], unexpected_msgs=[f'Bound to 0.0.0.0:{port1}']):
with node.assert_debug_log(expected_msgs=[f'Bound to and listening on 0.0.0.0:{port2}', f'Bound to and listening on 127.0.0.1:{port2 + 1}'], unexpected_msgs=[f'Bound to and listening on 0.0.0.0:{port1}']):
self.restart_node(0, extra_args=["-listen", f"-port={port1}", f"-port={port2}"])
self.log.info("When specifying ports with both -port and -bind, the one from -port is ignored")
with node.assert_debug_log(expected_msgs=[f'Bound to 0.0.0.0:{port2}'], unexpected_msgs=[f'Bound to 0.0.0.0:{port1}']):
with node.assert_debug_log(expected_msgs=[f'Bound to and listening on 0.0.0.0:{port2}'], unexpected_msgs=[f'Bound to and listening on 0.0.0.0:{port1}']):
self.restart_node(0, extra_args=["-listen", f"-port={port1}", f"-bind=0.0.0.0:{port2}"])
self.log.info("When -bind specifies no port, the values from -port and -bind are combined")
with self.nodes[0].assert_debug_log(expected_msgs=[f'Bound to 0.0.0.0:{port1}']):
with self.nodes[0].assert_debug_log(expected_msgs=[f'Bound to and listening on 0.0.0.0:{port1}']):
self.restart_node(0, extra_args=["-listen", f"-port={port1}", "-bind=0.0.0.0"])
self.log.info("When an onion bind specifies no port, the value from -port, incremented by 1, is taken")
with self.nodes[0].assert_debug_log(expected_msgs=[f'Bound to 127.0.0.1:{port1 + 1}']):
with self.nodes[0].assert_debug_log(expected_msgs=[f'Bound to and listening on 127.0.0.1:{port1 + 1}']):
self.restart_node(0, extra_args=["-listen", f"-port={port1}", "-bind=127.0.0.1=onion"])
self.log.info("Invalid values for -port raise errors")