i2p: make a time gap between creating transient sessions and using them

Connecting to an I2P peer consists of creating a session (the
`SESSION CREATE` command) and then connecting to the peer using that
session (`STREAM CONNECT ID=session_id ...`).

This change is only relevant for transient sessions because when a
persistent session is used it is created once and used for all
connections.

Before this change Bitcoin Core would create the session and use it in
quick succession. That is, the `SESSION CREATE` command would be
immediately followed by `STREAM CONNECT`. This could ease network
activity monitoring by an adversary.

To mitigate that, this change creates a transient session upfront
without an immediate demand for new sessions and later uses it. This
creates a time gap between `SESSION CREATE` and `STREAM CONNECT`.
Note that there is always some demand for new I2P connections due to
disconnects.

---

Summary of the changes in the code:

* Create the session from the `Session` constructor (send `SESSION CREATE`
  to the I2P SAM proxy). This constructor was only called when transient
  sessions were needed and was immediately followed by `Connect()` which
  would have created the session. So this is a noop change if viewed
  in isolation.

* Change `CConnman::m_unused_i2p_sessions` from a queue to a single
  entity. Given that normally `CConnman::ConnectNode()` is not executed
  concurrently by multiple threads, the queue could have had either 0 or
  1 entry. Simplify the code by replacing the queue with a single
  session.

* Every time we try to connect to any peer (not just I2P) pre-create a
  new spare I2P session. This way session creation is decoupled from the
  time when the session will be used (`STREAM CONNECT`).
This commit is contained in:
Vasil Dimov 2025-03-13 09:29:19 +01:00
parent aa87e0b446
commit 859c259092
No known key found for this signature in database
GPG key ID: 54DF06F64B55CBBF
4 changed files with 90 additions and 43 deletions

View file

@ -132,6 +132,13 @@ Session::Session(const Proxy& control_host, CThreadInterrupt* interrupt)
m_interrupt{interrupt}, m_interrupt{interrupt},
m_transient{true} m_transient{true}
{ {
try {
LOCK(m_mutex);
CreateIfNotCreatedAlready();
} catch (const std::runtime_error&) {
// This was just an eager optimistic attempt to create the session.
// If it fails, then it will be reattempted again when `Connect()` is called.
}
} }
Session::~Session() Session::~Session()

View file

@ -396,7 +396,7 @@ static CService GetBindAddress(const Sock& sock)
CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport)
{ {
AssertLockNotHeld(m_unused_i2p_sessions_mutex); AssertLockNotHeld(m_unused_i2p_transient_session_mutex);
assert(conn_type != ConnectionType::INBOUND); assert(conn_type != ConnectionType::INBOUND);
if (pszDest == nullptr) { if (pszDest == nullptr) {
@ -455,6 +455,10 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
connect_to.push_back(addrConnect); connect_to.push_back(addrConnect);
} }
// Pre-create an I2P transient session if needed and store it for later.
// This makes a time gap between session creation and usage.
SaveI2PSession(GetI2PTransientSession());
// Connect // Connect
std::unique_ptr<Sock> sock; std::unique_ptr<Sock> sock;
Proxy proxy; Proxy proxy;
@ -474,22 +478,13 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
if (m_i2p_sam_session) { if (m_i2p_sam_session) {
connected = m_i2p_sam_session->Connect(target_addr, conn, proxyConnectionFailed); connected = m_i2p_sam_session->Connect(target_addr, conn, proxyConnectionFailed);
} else { } else {
{ i2p_transient_session = GetI2PTransientSession();
LOCK(m_unused_i2p_sessions_mutex); if (!i2p_transient_session) {
if (m_unused_i2p_sessions.empty()) { return nullptr;
i2p_transient_session =
std::make_unique<i2p::sam::Session>(proxy, &interruptNet);
} else {
i2p_transient_session.swap(m_unused_i2p_sessions.front());
m_unused_i2p_sessions.pop();
}
} }
connected = i2p_transient_session->Connect(target_addr, conn, proxyConnectionFailed); connected = i2p_transient_session->Connect(target_addr, conn, proxyConnectionFailed);
if (!connected) { if (!connected) {
LOCK(m_unused_i2p_sessions_mutex); SaveI2PSession(std::move(i2p_transient_session));
if (m_unused_i2p_sessions.size() < MAX_UNUSED_I2P_SESSIONS_SIZE) {
m_unused_i2p_sessions.emplace(i2p_transient_session.release());
}
} }
} }
@ -1863,7 +1858,7 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
bool CConnman::AddConnection(const std::string& address, ConnectionType conn_type, bool use_v2transport = false) bool CConnman::AddConnection(const std::string& address, ConnectionType conn_type, bool use_v2transport = false)
{ {
AssertLockNotHeld(m_unused_i2p_sessions_mutex); AssertLockNotHeld(m_unused_i2p_transient_session_mutex);
std::optional<int> max_connections; std::optional<int> max_connections;
switch (conn_type) { switch (conn_type) {
case ConnectionType::INBOUND: case ConnectionType::INBOUND:
@ -2393,7 +2388,7 @@ void CConnman::DumpAddresses()
void CConnman::ProcessAddrFetch() void CConnman::ProcessAddrFetch()
{ {
AssertLockNotHeld(m_unused_i2p_sessions_mutex); AssertLockNotHeld(m_unused_i2p_transient_session_mutex);
std::string strDest; std::string strDest;
{ {
LOCK(m_addr_fetches_mutex); LOCK(m_addr_fetches_mutex);
@ -2513,7 +2508,7 @@ bool CConnman::MaybePickPreferredNetwork(std::optional<Network>& network)
void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, std::span<const std::string> seed_nodes) void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, std::span<const std::string> seed_nodes)
{ {
AssertLockNotHeld(m_unused_i2p_sessions_mutex); AssertLockNotHeld(m_unused_i2p_transient_session_mutex);
AssertLockNotHeld(m_reconnections_mutex); AssertLockNotHeld(m_reconnections_mutex);
FastRandomContext rng; FastRandomContext rng;
// Connect to specific addresses // Connect to specific addresses
@ -2954,7 +2949,7 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo(bool include_connected) co
void CConnman::ThreadOpenAddedConnections() void CConnman::ThreadOpenAddedConnections()
{ {
AssertLockNotHeld(m_unused_i2p_sessions_mutex); AssertLockNotHeld(m_unused_i2p_transient_session_mutex);
AssertLockNotHeld(m_reconnections_mutex); AssertLockNotHeld(m_reconnections_mutex);
while (true) while (true)
{ {
@ -2984,7 +2979,7 @@ void CConnman::ThreadOpenAddedConnections()
// if successful, this moves the passed grant to the constructed node // if successful, this moves the passed grant to the constructed node
void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant&& grant_outbound, const char *pszDest, ConnectionType conn_type, bool use_v2transport) void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant&& grant_outbound, const char *pszDest, ConnectionType conn_type, bool use_v2transport)
{ {
AssertLockNotHeld(m_unused_i2p_sessions_mutex); AssertLockNotHeld(m_unused_i2p_transient_session_mutex);
assert(conn_type != ConnectionType::INBOUND); assert(conn_type != ConnectionType::INBOUND);
// //
@ -3237,6 +3232,38 @@ uint16_t CConnman::GetDefaultPort(const std::string& addr) const
return a.SetSpecial(addr) ? GetDefaultPort(a.GetNetwork()) : m_params.GetDefaultPort(); return a.SetSpecial(addr) ? GetDefaultPort(a.GetNetwork()) : m_params.GetDefaultPort();
} }
std::unique_ptr<i2p::sam::Session> CConnman::GetI2PTransientSession()
{
AssertLockNotHeld(m_unused_i2p_transient_session_mutex);
Proxy i2p_proxy;
if (!g_reachable_nets.Contains(NET_I2P) || // Not using I2P at all.
m_i2p_sam_session || // Using a single persistent session, transient sessions are not needed.
!GetProxy(NET_I2P, i2p_proxy)) {
return nullptr;
}
{
LOCK(m_unused_i2p_transient_session_mutex);
if (m_unused_i2p_transient_session) {
return std::move(m_unused_i2p_transient_session);
}
}
return std::make_unique<i2p::sam::Session>(i2p_proxy, &interruptNet);
}
void CConnman::SaveI2PSession(std::unique_ptr<i2p::sam::Session>&& session)
{
LOCK(m_unused_i2p_transient_session_mutex);
if (!m_unused_i2p_transient_session) {
m_unused_i2p_transient_session = std::move(session);
} else {
session.reset();
}
}
bool CConnman::Bind(const CService& addr_, unsigned int flags, NetPermissionFlags permissions) bool CConnman::Bind(const CService& addr_, unsigned int flags, NetPermissionFlags permissions)
{ {
const CService addr{MaybeFlipIPv6toCJDNS(addr_)}; const CService addr{MaybeFlipIPv6toCJDNS(addr_)};
@ -3942,7 +3969,7 @@ uint64_t CConnman::CalculateKeyedNetGroup(const CNetAddr& address) const
void CConnman::PerformReconnections() void CConnman::PerformReconnections()
{ {
AssertLockNotHeld(m_reconnections_mutex); AssertLockNotHeld(m_reconnections_mutex);
AssertLockNotHeld(m_unused_i2p_sessions_mutex); AssertLockNotHeld(m_unused_i2p_transient_session_mutex);
while (true) { while (true) {
// Move first element of m_reconnections to todo (avoiding an allocation inside the lock). // Move first element of m_reconnections to todo (avoiding an allocation inside the lock).
decltype(m_reconnections) todo; decltype(m_reconnections) todo;

View file

@ -41,7 +41,6 @@
#include <map> #include <map>
#include <memory> #include <memory>
#include <optional> #include <optional>
#include <queue>
#include <thread> #include <thread>
#include <unordered_set> #include <unordered_set>
#include <vector> #include <vector>
@ -1136,7 +1135,7 @@ public:
bool GetNetworkActive() const { return fNetworkActive; }; bool GetNetworkActive() const { return fNetworkActive; };
bool GetUseAddrmanOutgoing() const { return m_use_addrman_outgoing; }; bool GetUseAddrmanOutgoing() const { return m_use_addrman_outgoing; };
void SetNetworkActive(bool active); void SetNetworkActive(bool active);
void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant&& grant_outbound, const char* strDest, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant&& grant_outbound, const char* strDest, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_transient_session_mutex);
bool CheckIncomingNonce(uint64_t nonce); bool CheckIncomingNonce(uint64_t nonce);
void ASMapHealthCheck(); void ASMapHealthCheck();
@ -1221,7 +1220,7 @@ public:
* - Max total outbound connection capacity filled * - Max total outbound connection capacity filled
* - Max connection capacity for type is filled * - Max connection capacity for type is filled
*/ */
bool AddConnection(const std::string& address, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); bool AddConnection(const std::string& address, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_transient_session_mutex);
size_t GetNodeCount(ConnectionDirection) const; size_t GetNodeCount(ConnectionDirection) const;
std::map<CNetAddr, LocalServiceInfo> getNetLocalAddresses() const; std::map<CNetAddr, LocalServiceInfo> getNetLocalAddresses() const;
@ -1294,10 +1293,10 @@ private:
bool Bind(const CService& addr, unsigned int flags, NetPermissionFlags permissions); bool Bind(const CService& addr, unsigned int flags, NetPermissionFlags permissions);
bool InitBinds(const Options& options); bool InitBinds(const Options& options);
void ThreadOpenAddedConnections() EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex, !m_unused_i2p_sessions_mutex, !m_reconnections_mutex); void ThreadOpenAddedConnections() EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex, !m_unused_i2p_transient_session_mutex, !m_reconnections_mutex);
void AddAddrFetch(const std::string& strDest) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex); void AddAddrFetch(const std::string& strDest) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex);
void ProcessAddrFetch() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_unused_i2p_sessions_mutex); void ProcessAddrFetch() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_unused_i2p_transient_session_mutex);
void ThreadOpenConnections(std::vector<std::string> connect, std::span<const std::string> seed_nodes) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex, !m_reconnections_mutex); void ThreadOpenConnections(std::vector<std::string> connect, std::span<const std::string> seed_nodes) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_transient_session_mutex, !m_reconnections_mutex);
void ThreadMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); void ThreadMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
void ThreadI2PAcceptIncoming(); void ThreadI2PAcceptIncoming();
void AcceptConnection(const ListenSocket& hListenSocket); void AcceptConnection(const ListenSocket& hListenSocket);
@ -1363,7 +1362,7 @@ private:
bool AlreadyConnectedToAddress(const CAddress& addr); bool AlreadyConnectedToAddress(const CAddress& addr);
bool AttemptToEvictConnection(); bool AttemptToEvictConnection();
CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_transient_session_mutex);
void AddWhitelistPermissionFlags(NetPermissionFlags& flags, const CNetAddr &addr, const std::vector<NetWhitelistPermissions>& ranges) const; void AddWhitelistPermissionFlags(NetPermissionFlags& flags, const CNetAddr &addr, const std::vector<NetWhitelistPermissions>& ranges) const;
void DeleteNode(CNode* pnode); void DeleteNode(CNode* pnode);
@ -1408,6 +1407,20 @@ private:
uint16_t GetDefaultPort(Network net) const; uint16_t GetDefaultPort(Network net) const;
uint16_t GetDefaultPort(const std::string& addr) const; uint16_t GetDefaultPort(const std::string& addr) const;
/**
* Create an I2P transient session or get the one from
* `m_unused_i2p_transient_session` if it is not empty.
* @return session or empty unique_ptr if transient sessions are not needed.
*/
std::unique_ptr<i2p::sam::Session> GetI2PTransientSession()
EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_transient_session_mutex);
/**
* Store an I2P session in `m_unused_i2p_transient_session` if it is empty.
*/
void SaveI2PSession(std::unique_ptr<i2p::sam::Session>&& session)
EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_transient_session_mutex);
// Network usage totals // Network usage totals
mutable Mutex m_total_bytes_sent_mutex; mutable Mutex m_total_bytes_sent_mutex;
std::atomic<uint64_t> nTotalBytesRecv{0}; std::atomic<uint64_t> nTotalBytesRecv{0};
@ -1592,18 +1605,23 @@ private:
bool whitelist_relay; bool whitelist_relay;
/** /**
* Mutex protecting m_i2p_sam_sessions. * Mutex protecting `m_unused_i2p_transient_session`.
*/ */
Mutex m_unused_i2p_sessions_mutex; Mutex m_unused_i2p_transient_session_mutex;
/** /**
* A pool of created I2P SAM transient sessions that should be used instead * A pre-created I2P SAM transient session that should be used instead
* of creating new ones in order to reduce the load on the I2P network. * of creating new one. This has two purposes:
* Creating a session in I2P is not cheap, thus if this is not empty, then * 1. If we create a session, try to connect to a peer, but the connection
* pick an entry from it instead of creating a new session. If connecting to * fails then we store the session here to be used by the next connection
* a host fails, then the created session is put to this pool for reuse. * attempt. Creating a session in I2P is not cheap, so this reduces the
* load on the I2P network by avoiding useless session creation and
* destruction.
* 2. Pre-creating a session some time before using it makes it harder to
* correlate session creation and session usage.
*/ */
std::queue<std::unique_ptr<i2p::sam::Session>> m_unused_i2p_sessions GUARDED_BY(m_unused_i2p_sessions_mutex); std::unique_ptr<i2p::sam::Session> m_unused_i2p_transient_session
GUARDED_BY(m_unused_i2p_transient_session_mutex);
/** /**
* Mutex protecting m_reconnections. * Mutex protecting m_reconnections.
@ -1626,13 +1644,7 @@ private:
std::list<ReconnectionInfo> m_reconnections GUARDED_BY(m_reconnections_mutex); std::list<ReconnectionInfo> m_reconnections GUARDED_BY(m_reconnections_mutex);
/** Attempt reconnections, if m_reconnections non-empty. */ /** Attempt reconnections, if m_reconnections non-empty. */
void PerformReconnections() EXCLUSIVE_LOCKS_REQUIRED(!m_reconnections_mutex, !m_unused_i2p_sessions_mutex); void PerformReconnections() EXCLUSIVE_LOCKS_REQUIRED(!m_reconnections_mutex, !m_unused_i2p_transient_session_mutex);
/**
* Cap on the size of `m_unused_i2p_sessions`, to ensure it does not
* unexpectedly use too much memory.
*/
static constexpr size_t MAX_UNUSED_I2P_SESSIONS_SIZE{10};
/** /**
* RAII helper to atomically create a copy of `m_nodes` and add a reference * RAII helper to atomically create a copy of `m_nodes` and add a reference

View file

@ -26,6 +26,7 @@
#include <cstring> #include <cstring>
#include <memory> #include <memory>
#include <optional> #include <optional>
#include <queue>
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
@ -89,7 +90,7 @@ struct ConnmanTestMsg : public CConnman {
bool AlreadyConnectedPublic(const CAddress& addr) { return AlreadyConnectedToAddress(addr); }; bool AlreadyConnectedPublic(const CAddress& addr) { return AlreadyConnectedToAddress(addr); };
CNode* ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type) CNode* ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type)
EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_transient_session_mutex);
}; };
constexpr ServiceFlags ALL_SERVICE_FLAGS[]{ constexpr ServiceFlags ALL_SERVICE_FLAGS[]{