Compare commits

...

32 commits

Author SHA1 Message Date
Matthew Zipkin
d94db10ab0
Merge 6a6285d268 into c5e44a0435 2025-04-29 11:54:16 +02:00
merge-script
c5e44a0435
Merge bitcoin/bitcoin#32369: test: Use the correct node for doubled keypath test
Some checks are pending
CI / macOS 14 native, arm64, fuzz (push) Waiting to run
CI / Windows native, VS 2022 (push) Waiting to run
CI / Windows native, fuzz, VS 2022 (push) Waiting to run
CI / Linux->Windows cross, no tests (push) Waiting to run
CI / Windows, test cross-built (push) Blocked by required conditions
CI / ASan + LSan + UBSan + integer, no depends, USDT (push) Waiting to run
CI / test each commit (push) Waiting to run
CI / macOS 14 native, arm64, no depends, sqlite only, gui (push) Waiting to run
32d55e28af test: Use the correct node for doubled keypath test (Ava Chow)

Pull request description:

  #29124 had a silent merge conflict with #32350 which resulted in it using the wrong node. Fix the test to use the correct v22 node.

ACKs for top commit:
  maflcko:
    lgtm ACK 32d55e28af
  rkrux:
    ACK 32d55e28af
  BrandonOdiwuor:
    Code Review ACK 32d55e28af

Tree-SHA512: 1e0231985beb382b16e1d608c874750423d0502388db0c8ad450b22d17f9d96f5e16a6b44948ebda5efc750f62b60d0de8dd20131f449427426a36caf374af92
2025-04-29 09:59:42 +01:00
Ava Chow
32d55e28af test: Use the correct node for doubled keypath test 2025-04-28 14:44:17 -07:00
Matthew Zipkin
6a6285d268
httpserver: delete libevent! 2025-04-03 15:17:35 -04:00
Matthew Zipkin
53ac965791
fuzz: switch http_libevent::HTTPRequest to http_bitcoin::HTTPRequest 2025-04-03 15:17:35 -04:00
Matthew Zipkin
8122a362bf
http: switch servers from libevent to bitcoin 2025-04-03 15:17:35 -04:00
Matthew Zipkin
a792549519
use CScheduler for HTTPRPCTimer
This removes the dependency on libevent for scheduled events,
like re-locking a wallet some time after decryption.
2025-04-03 15:17:35 -04:00
Matthew Zipkin
196698c43e
http: disconnect after idle timeout (-rpcservertimeout) 2025-04-03 15:17:34 -04:00
Matthew Zipkin
e3a94d93c4
http: implement new server control methods to match legacy API 2025-04-03 15:17:34 -04:00
Matthew Zipkin
7b205a21a1
refactor: split HTTPBindAddresses into config parse and libevent setup
The original function was already naturally split into two chunks:
First, we parse and validate the users' RPC configuration for IPs and
ports. Next we bind libevent's http server to the appropriate
endpoints.

This commit splits these chunks into two separate functions, leaving
the argument parsing in the common space of the module and moving the
libevent-specific binding into the http_libevent namespace.

A future commit will implement http_bitcoin::HTTPBindAddresses to
bind the validate list of endpoints by the new HTTP server.
2025-04-03 15:17:34 -04:00
Matthew Zipkin
3adcd9617e
refactor: split http_request_cb into libevent callback and dispatch
The original function is passed to libevent as a callback when HTTP
requests are received and processed. It wrapped the libevent request
object in a http_libevent::HTTPRequest and then handed that off to
bitcoin for basic checks and finally dispatch to worker threads.

In this commit we split the function after the
http_libevent::HTTPRequest is created, and pass that object to a new
function that maintains the logic of checking and dispatching.

This will be the merge point for http_libevent and http_bitcoin,
where HTTPRequest objects from either namespace have the same
downstream lifecycle.
2025-04-03 15:17:34 -04:00
Matthew Zipkin
b828fa1e29
Add helper methods to HTTPRequest to match original API
These methods are called by http_request_cb() and are present in the
original http_libevent::HTTPRequest.
2025-04-03 15:17:34 -04:00
Matthew Zipkin
90761d5026
define HTTP request methods at module level outside of class
This is a refactor to prepare for matching the API of HTTPRequest
definitions in both namespaces http_bitcoin and http_libevent. In
particular, to provide a consistent return type for GetRequestMethod()
in both classes.
2025-04-03 15:17:34 -04:00
Matthew Zipkin
cd059b6e14
http: use a queue to pipeline requests from each connected client
See https://www.rfc-editor.org/rfc/rfc7230#section-6.3.2

> A server MAY process a sequence of pipelined requests in
  parallel if they all have safe methods (Section 4.2.1 of [RFC7231]),
  but it MUST send the corresponding responses in the same order that
  the requests were received.

We choose NOT to process requests in parallel. They are executed in
the order recevied as well as responded to in the order received.
This prevents race conditions where old state may get sent in response
to requests that are very quick to process but were requested later on
in the queue.
2025-04-03 15:17:34 -04:00
Matthew Zipkin
d0224eecde
Allow http workers to send data optimistically as an optimization 2025-04-03 15:17:34 -04:00
Matthew Zipkin
e2b5a3fea5
http: disconnect clients 2025-04-03 15:17:34 -04:00
Matthew Zipkin
73c3c2e3d3
http: compose and send replies to connected clients 2025-04-03 15:17:34 -04:00
Matthew Zipkin
ace3e198d7
http: support "chunked" Transfer-Encoding 2025-04-03 15:17:34 -04:00
Matthew Zipkin
482382bd14
http: read requests from connected clients 2025-04-03 15:17:33 -04:00
Matthew Zipkin
8424daa101
http: Begin implementation of HTTPClient and HTTPServer 2025-04-03 15:17:33 -04:00
Matthew Zipkin
8a933646ac
http: Implement HTTPRequest class
HTTP Request message:
https://datatracker.ietf.org/doc/html/rfc1945#section-5

Request Line aka Control Line aka first line:
https://datatracker.ietf.org/doc/html/rfc1945#section-5.1

See message_read_status() in libevent http.c for how
`MORE_DATA_EXPECTED` is handled there
2025-04-03 15:17:33 -04:00
Matthew Zipkin
1b14d00a57
http: Implement HTTPResponse class
HTTP Response message:
https://datatracker.ietf.org/doc/html/rfc1945#section-6

Status line (first line of response):
https://datatracker.ietf.org/doc/html/rfc1945#section-6.1

Status code definitions:
https://datatracker.ietf.org/doc/html/rfc1945#section-9
2025-04-03 15:17:33 -04:00
Matthew Zipkin
70d003ca10
http: Implement HTTPHeaders class
see:
https://www.rfc-editor.org/rfc/rfc2616#section-4.2
https://www.rfc-editor.org/rfc/rfc7231#section-5
https://www.rfc-editor.org/rfc/rfc7231#section-7
https://httpwg.org/specs/rfc9111.html#header.field.definitions
2025-04-03 15:17:33 -04:00
Matthew Zipkin
12bd25e2b5
http: enclose libevent-dependent code in a namespace
This commit is a no-op to isolate HTTP methods and objects that
depend on libevent. Following commits will add replacement objects
and methods in a new namespace for testing and review before
switching over the server.
2025-04-03 15:17:33 -04:00
Matthew Zipkin
a1e151c774
string: add LineReader
This is a helper struct to parse HTTP messages from data in buffers
from sockets. HTTP messages begin with headers which are
CRLF-terminated lines (\n or \r\n) followed by an arbitrary amount of
body data. Whitespace is trimmed from the field lines but not the body.

https://httpwg.org/specs/rfc9110.html#rfc.section.5
2025-04-03 15:17:33 -04:00
Matthew Zipkin
aeb8352a9d
time: implement and test RFC7231 timestamp string
HTTP 1.1 responses require a timestamp header with a
specific format, specified in:
https://www.rfc-editor.org/rfc/rfc7231#section-7.1.1.1
2025-04-03 15:17:33 -04:00
Matthew Zipkin
144a777f86
string: add CaseInsensitiveComparator
https://httpwg.org/specs/rfc9110.html#rfc.section.5.1
Field names in HTTP headers are case-insensitive. This
comparator will be used in the headers map to search by key.
In libevent these are compared in lowercase:
  evhttp_find_header()
  evutil_ascii_strcasecmp()
  EVUTIL_TOLOWER_()
2025-04-03 15:17:33 -04:00
Matthew Zipkin
4c83785d1d
string: implement ParseUInt64Hex 2025-04-03 15:17:33 -04:00
Matthew Zipkin
f259121f30
test: cover "chunked" Transfer-Encoding 2025-04-03 15:17:33 -04:00
Matthew Zipkin
5ec06529bb
test: cover -rpcservertimeout
Testing this requires adding an option to TestNode to force
the test framework to establish a new HTTP connection for
every RPC. Otherwise, attempting to reuse a persistent connection
would cause framework RPCs during startup and shutdown to fail.
2025-04-03 15:17:33 -04:00
Matthew Zipkin
00d18bbe09
test: cover http pipelining 2025-04-03 15:17:32 -04:00
Vasil Dimov
cfe5eba446
Introduce a new low-level socket managing class SockMan 2025-04-03 15:17:28 -04:00
25 changed files with 2805 additions and 694 deletions

View file

@ -149,6 +149,7 @@ add_library(bitcoin_common STATIC EXCLUDE_FROM_ALL
common/run_command.cpp
common/settings.cpp
common/signmessage.cpp
common/sockman.cpp
common/system.cpp
common/url.cpp
compressor.cpp
@ -156,6 +157,7 @@ add_library(bitcoin_common STATIC EXCLUDE_FROM_ALL
core_write.cpp
deploymentinfo.cpp
external_signer.cpp
i2p.cpp
init/common.cpp
kernel/chainparams.cpp
key.cpp
@ -233,7 +235,6 @@ add_library(bitcoin_node STATIC EXCLUDE_FROM_ALL
headerssync.cpp
httprpc.cpp
httpserver.cpp
i2p.cpp
index/base.cpp
index/blockfilterindex.cpp
index/coinstatsindex.cpp

535
src/common/sockman.cpp Normal file
View file

@ -0,0 +1,535 @@
// Copyright (c) 2024-present The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or https://opensource.org/license/mit/.
#include <bitcoin-build-config.h> // IWYU pragma: keep
#include <common/sockman.h>
#include <logging.h>
#include <netbase.h>
#include <util/sock.h>
#include <util/thread.h>
#include <cassert>
// The set of sockets cannot be modified while waiting
// The sleep time needs to be small to avoid new sockets stalling
static constexpr auto SELECT_TIMEOUT{50ms};
/** Get the bind address for a socket as CService. */
static CService GetBindAddress(const Sock& sock)
{
CService addr_bind;
struct sockaddr_storage sockaddr_bind;
socklen_t sockaddr_bind_len = sizeof(sockaddr_bind);
if (!sock.GetSockName((struct sockaddr*)&sockaddr_bind, &sockaddr_bind_len)) {
addr_bind.SetSockAddr((const struct sockaddr*)&sockaddr_bind, sockaddr_bind_len);
} else {
LogPrintLevel(BCLog::NET, BCLog::Level::Warning, "getsockname failed\n");
}
return addr_bind;
}
bool SockMan::BindAndStartListening(const CService& to, bilingual_str& err_msg)
{
// Create socket for listening for incoming connections
sockaddr_storage storage;
socklen_t len{sizeof(storage)};
if (!to.GetSockAddr(reinterpret_cast<sockaddr*>(&storage), &len)) {
err_msg = Untranslated(strprintf("Bind address family for %s not supported", to.ToStringAddrPort()));
return false;
}
std::unique_ptr<Sock> sock{CreateSock(to.GetSAFamily(), SOCK_STREAM, IPPROTO_TCP)};
if (!sock) {
err_msg = Untranslated(strprintf("Cannot create %s listen socket: %s",
to.ToStringAddrPort(),
NetworkErrorString(WSAGetLastError())));
return false;
}
int one{1};
// Allow binding if the port is still in TIME_WAIT state after
// the program was closed and restarted.
if (sock->SetSockOpt(SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<sockopt_arg_type>(&one), sizeof(one)) == SOCKET_ERROR) {
LogPrintLevel(BCLog::NET,
BCLog::Level::Info,
"Cannot set SO_REUSEADDR on %s listen socket: %s, continuing anyway\n",
to.ToStringAddrPort(),
NetworkErrorString(WSAGetLastError()));
}
// some systems don't have IPV6_V6ONLY but are always v6only; others do have the option
// and enable it by default or not. Try to enable it, if possible.
if (to.IsIPv6()) {
#ifdef IPV6_V6ONLY
if (sock->SetSockOpt(IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast<sockopt_arg_type>(&one), sizeof(one)) == SOCKET_ERROR) {
LogPrintLevel(BCLog::NET,
BCLog::Level::Info,
"Cannot set IPV6_V6ONLY on %s listen socket: %s, continuing anyway\n",
to.ToStringAddrPort(),
NetworkErrorString(WSAGetLastError()));
}
#endif
#ifdef WIN32
int prot_level{PROTECTION_LEVEL_UNRESTRICTED};
if (sock->SetSockOpt(IPPROTO_IPV6,
IPV6_PROTECTION_LEVEL,
reinterpret_cast<const char*>(&prot_level),
sizeof(prot_level)) == SOCKET_ERROR) {
LogPrintLevel(BCLog::NET,
BCLog::Level::Info,
"Cannot set IPV6_PROTECTION_LEVEL on %s listen socket: %s, continuing anyway\n",
to.ToStringAddrPort(),
NetworkErrorString(WSAGetLastError()));
}
#endif
}
if (sock->Bind(reinterpret_cast<sockaddr*>(&storage), len) == SOCKET_ERROR) {
const int err{WSAGetLastError()};
if (err == WSAEADDRINUSE) {
err_msg = strprintf(_("Unable to bind to %s on this computer. %s is probably already running."),
to.ToStringAddrPort(),
CLIENT_NAME);
} else {
err_msg = strprintf(_("Unable to bind to %s on this computer (bind returned error %s)"),
to.ToStringAddrPort(),
NetworkErrorString(err));
}
return false;
}
// Listen for incoming connections
if (sock->Listen(SOMAXCONN) == SOCKET_ERROR) {
err_msg = strprintf(_("Cannot listen on %s: %s"), to.ToStringAddrPort(), NetworkErrorString(WSAGetLastError()));
return false;
}
m_listen.emplace_back(std::move(sock));
return true;
}
void SockMan::StartSocketsThreads(const Options& options)
{
m_thread_socket_handler = std::thread(
&util::TraceThread, options.socket_handler_thread_name, [this] { ThreadSocketHandler(); });
if (options.i2p.has_value()) {
m_i2p_sam_session = std::make_unique<i2p::sam::Session>(
options.i2p->private_key_file, options.i2p->sam_proxy, &interruptNet);
m_thread_i2p_accept =
std::thread(&util::TraceThread, options.i2p->accept_thread_name, [this] { ThreadI2PAccept(); });
}
}
void SockMan::JoinSocketsThreads()
{
if (m_thread_i2p_accept.joinable()) {
m_thread_i2p_accept.join();
}
if (m_thread_socket_handler.joinable()) {
m_thread_socket_handler.join();
}
}
std::optional<SockMan::Id>
SockMan::ConnectAndMakeId(const std::variant<CService, StringHostIntPort>& to,
bool is_important,
std::optional<Proxy> proxy,
bool& proxy_failed,
CService& me)
{
AssertLockNotHeld(m_connected_mutex);
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
std::unique_ptr<Sock> sock;
std::unique_ptr<i2p::sam::Session> i2p_transient_session;
Assume(!me.IsValid());
if (std::holds_alternative<CService>(to)) {
const CService& addr_to{std::get<CService>(to)};
if (addr_to.IsI2P()) {
if (!Assume(proxy.has_value())) {
return std::nullopt;
}
i2p::Connection conn;
bool connected{false};
if (m_i2p_sam_session) {
connected = m_i2p_sam_session->Connect(addr_to, conn, proxy_failed);
} else {
{
LOCK(m_unused_i2p_sessions_mutex);
if (m_unused_i2p_sessions.empty()) {
i2p_transient_session = std::make_unique<i2p::sam::Session>(proxy.value(), &interruptNet);
} else {
i2p_transient_session.swap(m_unused_i2p_sessions.front());
m_unused_i2p_sessions.pop();
}
}
connected = i2p_transient_session->Connect(addr_to, conn, proxy_failed);
if (!connected) {
LOCK(m_unused_i2p_sessions_mutex);
if (m_unused_i2p_sessions.size() < MAX_UNUSED_I2P_SESSIONS_SIZE) {
m_unused_i2p_sessions.emplace(i2p_transient_session.release());
}
}
}
if (connected) {
sock = std::move(conn.sock);
me = conn.me;
}
} else if (proxy.has_value()) {
sock = ConnectThroughProxy(proxy.value(), addr_to.ToStringAddr(), addr_to.GetPort(), proxy_failed);
} else {
sock = ConnectDirectly(addr_to, is_important);
}
} else {
if (!Assume(proxy.has_value())) {
return std::nullopt;
}
const auto& hostport{std::get<StringHostIntPort>(to)};
bool dummy_proxy_failed;
sock = ConnectThroughProxy(proxy.value(), hostport.host, hostport.port, dummy_proxy_failed);
}
if (!sock) {
return std::nullopt;
}
if (!me.IsValid()) {
me = GetBindAddress(*sock);
}
const Id id{GetNewId()};
{
LOCK(m_connected_mutex);
m_connected.emplace(id, std::make_shared<ConnectionSockets>(std::move(sock),
std::move(i2p_transient_session)));
}
return id;
}
bool SockMan::CloseConnection(Id id)
{
LOCK(m_connected_mutex);
return m_connected.erase(id) > 0;
}
ssize_t SockMan::SendBytes(Id id,
std::span<const unsigned char> data,
bool will_send_more,
std::string& errmsg) const
{
AssertLockNotHeld(m_connected_mutex);
if (data.empty()) {
return 0;
}
auto sockets{GetConnectionSockets(id)};
if (!sockets) {
// Bail out immediately and just leave things in the caller's send queue.
return 0;
}
int flags{MSG_NOSIGNAL | MSG_DONTWAIT};
#ifdef MSG_MORE
if (will_send_more) {
flags |= MSG_MORE;
}
#endif
const ssize_t sent{WITH_LOCK(
sockets->mutex,
return sockets->sock->Send(reinterpret_cast<const char*>(data.data()), data.size(), flags);)};
if (sent >= 0) {
return sent;
}
const int err{WSAGetLastError()};
if (err == WSAEWOULDBLOCK || err == WSAEMSGSIZE || err == WSAEINTR || err == WSAEINPROGRESS) {
return 0;
}
errmsg = NetworkErrorString(err);
return -1;
}
void SockMan::StopListening()
{
m_listen.clear();
}
bool SockMan::ShouldTryToSend(Id id) const { return true; }
bool SockMan::ShouldTryToRecv(Id id) const { return true; }
void SockMan::EventIOLoopCompletedForOne(Id id) {}
void SockMan::EventIOLoopCompletedForAll() {}
void SockMan::EventI2PStatus(const CService&, I2PStatus) {}
void SockMan::TestOnlyAddExistentConnection(Id id, std::unique_ptr<Sock>&& sock)
{
LOCK(m_connected_mutex);
const auto result{m_connected.emplace(id, std::make_shared<ConnectionSockets>(std::move(sock)))};
assert(result.second);
}
void SockMan::ThreadI2PAccept()
{
AssertLockNotHeld(m_connected_mutex);
static constexpr auto err_wait_begin = 1s;
static constexpr auto err_wait_cap = 5min;
auto err_wait = err_wait_begin;
i2p::Connection conn;
auto SleepOnFailure = [&]() {
interruptNet.sleep_for(err_wait);
if (err_wait < err_wait_cap) {
err_wait += 1s;
}
};
while (!interruptNet) {
if (!m_i2p_sam_session->Listen(conn)) {
EventI2PStatus(conn.me, SockMan::I2PStatus::STOP_LISTENING);
SleepOnFailure();
continue;
}
EventI2PStatus(conn.me, SockMan::I2PStatus::START_LISTENING);
if (!m_i2p_sam_session->Accept(conn)) {
SleepOnFailure();
continue;
}
Assume(conn.me.IsI2P());
Assume(conn.peer.IsI2P());
NewSockAccepted(std::move(conn.sock), conn.me, conn.peer);
err_wait = err_wait_begin;
}
}
void SockMan::ThreadSocketHandler()
{
AssertLockNotHeld(m_connected_mutex);
while (!interruptNet) {
EventIOLoopCompletedForAll();
// Check for the readiness of the already connected sockets and the
// listening sockets in one call ("readiness" as in poll(2) or
// select(2)). If none are ready, wait for a short while and return
// empty sets.
auto io_readiness{GenerateWaitSockets()};
if (io_readiness.events_per_sock.empty() ||
// WaitMany() may as well be a static method, the context of the first Sock in the vector is not relevant.
!io_readiness.events_per_sock.begin()->first->WaitMany(SELECT_TIMEOUT,
io_readiness.events_per_sock)) {
interruptNet.sleep_for(SELECT_TIMEOUT);
}
// Service (send/receive) each of the already connected sockets.
SocketHandlerConnected(io_readiness);
// Accept new connections from listening sockets.
SocketHandlerListening(io_readiness.events_per_sock);
}
}
std::unique_ptr<Sock> SockMan::AcceptConnection(const Sock& listen_sock, CService& addr)
{
sockaddr_storage storage;
socklen_t len{sizeof(storage)};
auto sock{listen_sock.Accept(reinterpret_cast<sockaddr*>(&storage), &len)};
if (!sock) {
const int err{WSAGetLastError()};
if (err != WSAEWOULDBLOCK) {
LogPrintLevel(BCLog::NET,
BCLog::Level::Error,
"Cannot accept new connection: %s\n",
NetworkErrorString(err));
}
return {};
}
if (!addr.SetSockAddr(reinterpret_cast<sockaddr*>(&storage), len)) {
LogPrintLevel(BCLog::NET, BCLog::Level::Warning, "Unknown socket family\n");
}
return sock;
}
void SockMan::NewSockAccepted(std::unique_ptr<Sock>&& sock, const CService& me, const CService& them)
{
AssertLockNotHeld(m_connected_mutex);
if (!sock->IsSelectable()) {
LogPrintf("connection from %s dropped: non-selectable socket\n", them.ToStringAddrPort());
return;
}
// According to the internet TCP_NODELAY is not carried into accepted sockets
// on all platforms. Set it again here just to be sure.
const int on{1};
if (sock->SetSockOpt(IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)) == SOCKET_ERROR) {
LogDebug(BCLog::NET, "connection from %s: unable to set TCP_NODELAY, continuing anyway\n",
them.ToStringAddrPort());
}
const Id id{GetNewId()};
{
LOCK(m_connected_mutex);
m_connected.emplace(id, std::make_shared<ConnectionSockets>(std::move(sock)));
}
if (!EventNewConnectionAccepted(id, me, them)) {
CloseConnection(id);
}
}
SockMan::Id SockMan::GetNewId()
{
return m_next_id.fetch_add(1, std::memory_order_relaxed);
}
SockMan::IOReadiness SockMan::GenerateWaitSockets()
{
AssertLockNotHeld(m_connected_mutex);
IOReadiness io_readiness;
for (const auto& sock : m_listen) {
io_readiness.events_per_sock.emplace(sock, Sock::Events{Sock::RECV});
}
auto connected_snapshot{WITH_LOCK(m_connected_mutex, return m_connected;)};
for (const auto& [id, sockets] : connected_snapshot) {
const bool select_recv{ShouldTryToRecv(id)};
const bool select_send{ShouldTryToSend(id)};
if (!select_recv && !select_send) continue;
Sock::Event event = (select_send ? Sock::SEND : 0) | (select_recv ? Sock::RECV : 0);
io_readiness.events_per_sock.emplace(sockets->sock, Sock::Events{event});
io_readiness.ids_per_sock.emplace(sockets->sock, id);
}
return io_readiness;
}
void SockMan::SocketHandlerConnected(const IOReadiness& io_readiness)
{
AssertLockNotHeld(m_connected_mutex);
for (const auto& [sock, events] : io_readiness.events_per_sock) {
if (interruptNet) {
return;
}
auto it{io_readiness.ids_per_sock.find(sock)};
if (it == io_readiness.ids_per_sock.end()) {
continue;
}
const Id id{it->second};
bool send_ready = events.occurred & Sock::SEND; // Sock::SEND could only be set if ShouldTryToSend() has returned true in GenerateWaitSockets().
bool recv_ready = events.occurred & Sock::RECV; // Sock::RECV could only be set if ShouldTryToRecv() has returned true in GenerateWaitSockets().
bool err_ready = events.occurred & Sock::ERR;
if (send_ready) {
bool cancel_recv;
EventReadyToSend(id, cancel_recv);
if (cancel_recv) {
recv_ready = false;
}
}
if (recv_ready || err_ready) {
uint8_t buf[0x10000]; // typical socket buffer is 8K-64K
auto sockets{GetConnectionSockets(id)};
if (!sockets) {
continue;
}
const ssize_t nrecv{WITH_LOCK(
sockets->mutex,
return sockets->sock->Recv(buf, sizeof(buf), MSG_DONTWAIT);)};
if (nrecv < 0) { // In all cases (including -1 and 0) EventIOLoopCompletedForOne() should be executed after this, don't change the code to skip it.
const int err = WSAGetLastError();
if (err != WSAEWOULDBLOCK && err != WSAEMSGSIZE && err != WSAEINTR && err != WSAEINPROGRESS) {
EventGotPermanentReadError(id, NetworkErrorString(err));
}
} else if (nrecv == 0) {
EventGotEOF(id);
} else {
EventGotData(id, {buf, static_cast<size_t>(nrecv)});
}
}
EventIOLoopCompletedForOne(id);
}
}
void SockMan::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock)
{
AssertLockNotHeld(m_connected_mutex);
for (const auto& sock : m_listen) {
if (interruptNet) {
return;
}
const auto it = events_per_sock.find(sock);
if (it != events_per_sock.end() && it->second.occurred & Sock::RECV) {
CService addr_accepted;
auto sock_accepted{AcceptConnection(*sock, addr_accepted)};
if (sock_accepted) {
NewSockAccepted(std::move(sock_accepted), GetBindAddress(*sock), addr_accepted);
}
}
}
}
std::shared_ptr<SockMan::ConnectionSockets> SockMan::GetConnectionSockets(Id id) const
{
LOCK(m_connected_mutex);
auto it{m_connected.find(id)};
if (it == m_connected.end()) {
// There is no socket in case we've already disconnected, or in test cases without
// real connections.
return {};
}
return it->second;
}

463
src/common/sockman.h Normal file
View file

@ -0,0 +1,463 @@
// Copyright (c) 2024-present The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or https://opensource.org/license/mit/.
#ifndef BITCOIN_COMMON_SOCKMAN_H
#define BITCOIN_COMMON_SOCKMAN_H
#include <i2p.h>
#include <netaddress.h>
#include <netbase.h>
#include <util/fs.h>
#include <util/sock.h>
#include <util/translation.h>
#include <atomic>
#include <memory>
#include <optional>
#include <queue>
#include <span>
#include <thread>
#include <variant>
#include <vector>
/**
* A socket manager class which handles socket operations.
* To use this class, inherit from it and implement the pure virtual methods.
* Handled operations:
* - binding and listening on sockets
* - starting of necessary threads to process socket operations
* - accepting incoming connections
* - making outbound connections
* - closing connections
* - waiting for IO readiness on sockets and doing send/recv accordingly
*/
class SockMan
{
public:
/**
* Each connection is assigned an unique id of this type.
*/
using Id = int64_t;
/**
* Possible status changes that can be passed to `EventI2PStatus()`.
*/
enum class I2PStatus : uint8_t {
/// The listen succeeded and we are now listening for incoming I2P connections.
START_LISTENING,
/// The listen failed and now we are not listening (even if START_LISTENING was signaled before).
STOP_LISTENING,
};
virtual ~SockMan() = default;
//
// Non-virtual functions, to be reused by children classes.
//
/**
* Bind to a new address:port, start listening and add the listen socket to `m_listen`.
* Should be called before `StartSocketsThreads()`.
* @param[in] to Where to bind.
* @param[out] err_msg Error string if an error occurs.
* @retval true Success.
* @retval false Failure, `err_msg` will be set.
*/
bool BindAndStartListening(const CService& to, bilingual_str& err_msg);
/**
* Options to influence `StartSocketsThreads()`.
*/
struct Options {
std::string_view socket_handler_thread_name;
struct I2P {
explicit I2P(const fs::path& file, const Proxy& proxy, std::string_view accept_thread_name)
: private_key_file{file},
sam_proxy{proxy},
accept_thread_name{accept_thread_name}
{}
const fs::path private_key_file;
const Proxy sam_proxy;
const std::string_view accept_thread_name;
};
/**
* I2P options. If set then a thread will be started that will accept incoming I2P connections.
*/
std::optional<I2P> i2p;
};
/**
* Start the necessary threads for sockets IO.
*/
void StartSocketsThreads(const Options& options);
/**
* Join (wait for) the threads started by `StartSocketsThreads()` to exit.
*/
void JoinSocketsThreads();
/**
* A more readable std::tuple<std::string, uint16_t> for host and port.
*/
struct StringHostIntPort {
const std::string& host;
uint16_t port;
};
/**
* Make an outbound connection, save the socket internally and return a newly generated connection id.
* @param[in] to The address to connect to, either as CService or a host as string and port as
* an integer, if the later is used, then `proxy` must be valid.
* @param[in] is_important If true, then log failures with higher severity.
* @param[in] proxy Proxy to connect through, if set.
* @param[out] proxy_failed If `proxy` is valid and the connection failed because of the
* proxy, then it will be set to true.
* @param[out] me If the connection was successful then this is set to the address on the
* local side of the socket.
* @return Newly generated id, or std::nullopt if the operation fails.
*/
std::optional<SockMan::Id> ConnectAndMakeId(const std::variant<CService, StringHostIntPort>& to,
bool is_important,
std::optional<Proxy> proxy,
bool& proxy_failed,
CService& me)
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex, !m_unused_i2p_sessions_mutex);
/**
* Destroy a given connection by closing its socket and release resources occupied by it.
* @param[in] id Connection to destroy.
* @return Whether the connection existed and its socket was closed by this call.
*/
bool CloseConnection(Id id)
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
/**
* Try to send some data over the given connection.
* @param[in] id Identifier of the connection.
* @param[in] data The data to send, it might happen that only a prefix of this is sent.
* @param[in] will_send_more Used as an optimization if the caller knows that they will
* be sending more data soon after this call.
* @param[out] errmsg If <0 is returned then this will contain a human readable message
* explaining the error.
* @retval >=0 The number of bytes actually sent.
* @retval <0 A permanent error has occurred.
*/
ssize_t SendBytes(Id id,
std::span<const unsigned char> data,
bool will_send_more,
std::string& errmsg) const
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
/**
* Stop listening by closing all listening sockets.
*/
void StopListening();
/**
* This is signaled when network activity should cease.
* A pointer to it is saved in `m_i2p_sam_session`, so make sure that
* the lifetime of `interruptNet` is not shorter than
* the lifetime of `m_i2p_sam_session`.
*/
CThreadInterrupt interruptNet;
protected:
/**
* During some tests mocked sockets are created outside of `SockMan`, make it
* possible to add those so that send/recv can be exercised.
* @param[in] id Connection id to add.
* @param[in,out] sock Socket to associate with the added connection.
*/
void TestOnlyAddExistentConnection(Id id, std::unique_ptr<Sock>&& sock)
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
private:
/**
* Cap on the size of `m_unused_i2p_sessions`, to ensure it does not
* unexpectedly use too much memory.
*/
static constexpr size_t MAX_UNUSED_I2P_SESSIONS_SIZE{10};
//
// Pure virtual functions must be implemented by children classes.
//
/**
* Be notified when a new connection has been accepted.
* @param[in] id Id of the newly accepted connection.
* @param[in] me The address and port at our side of the connection.
* @param[in] them The address and port at the peer's side of the connection.
* @retval true The new connection was accepted at the higher level.
* @retval false The connection was refused at the higher level, so the
* associated socket and id should be discarded by `SockMan`.
*/
virtual bool EventNewConnectionAccepted(Id id,
const CService& me,
const CService& them) = 0;
/**
* Called when the socket is ready to send data and `ShouldTryToSend()` has
* returned true. This is where the higher level code serializes its messages
* and calls `SockMan::SendBytes()`.
* @param[in] id Id of the connection whose socket is ready to send.
* @param[out] cancel_recv Should always be set upon return and if it is true,
* then the next attempt to receive data from that connection will be omitted.
*/
virtual void EventReadyToSend(Id id, bool& cancel_recv) = 0;
/**
* Called when new data has been received.
* @param[in] id Connection for which the data arrived.
* @param[in] data Received data.
*/
virtual void EventGotData(Id id, std::span<const uint8_t> data) = 0;
/**
* Called when the remote peer has sent an EOF on the socket. This is a graceful
* close of their writing side, we can still send and they will receive, if it
* makes sense at the application level.
* @param[in] id Connection whose socket got EOF.
*/
virtual void EventGotEOF(Id id) = 0;
/**
* Called when we get an irrecoverable error trying to read from a socket.
* @param[in] id Connection whose socket got an error.
* @param[in] errmsg Message describing the error.
*/
virtual void EventGotPermanentReadError(Id id, const std::string& errmsg) = 0;
//
// Non-pure virtual functions can be overridden by children classes or left
// alone to use the default implementation from SockMan.
//
/**
* Can be used to temporarily pause sends on a connection.
* SockMan would only call EventReadyToSend() if this returns true.
* The implementation in SockMan always returns true.
* @param[in] id Connection for which to confirm or omit the next call to EventReadyToSend().
*/
virtual bool ShouldTryToSend(Id id) const;
/**
* SockMan would only call Recv() on a connection's socket if this returns true.
* Can be used to temporarily pause receives on a connection.
* The implementation in SockMan always returns true.
* @param[in] id Connection for which to confirm or omit the next receive.
*/
virtual bool ShouldTryToRecv(Id id) const;
/**
* SockMan has completed the current send+recv iteration for a given connection.
* It will do another send+recv for this connection after processing all other connections.
* Can be used to execute periodic tasks for a given connection.
* The implementation in SockMan does nothing.
* @param[in] id Connection for which send+recv has been done.
*/
virtual void EventIOLoopCompletedForOne(Id id);
/**
* SockMan has completed send+recv for all connections.
* Can be used to execute periodic tasks for all connections, like closing
* connections due to higher level logic.
* The implementation in SockMan does nothing.
*/
virtual void EventIOLoopCompletedForAll();
/**
* Be notified of a change in the state of the I2P connectivity.
* The default behavior, implemented by `SockMan`, is to ignore this event.
* @param[in] addr The address we started or stopped listening on.
* @param[in] new_status New status.
*/
virtual void EventI2PStatus(const CService& addr, I2PStatus new_status);
/**
* The sockets used by a connection - a data socket and an optional I2P session socket.
*/
struct ConnectionSockets {
explicit ConnectionSockets(std::unique_ptr<Sock>&& s)
: sock{std::move(s)}
{
}
explicit ConnectionSockets(std::shared_ptr<Sock>&& s, std::unique_ptr<i2p::sam::Session>&& sess)
: sock{std::move(s)},
i2p_transient_session{std::move(sess)}
{
}
/**
* Mutex that serializes the Send() and Recv() calls on `sock`.
*/
Mutex mutex;
/**
* Underlying socket.
* `shared_ptr` (instead of `unique_ptr`) is used to avoid premature close of the
* underlying file descriptor by one thread while another thread is poll(2)-ing
* it for activity.
* @see https://github.com/bitcoin/bitcoin/issues/21744 for details.
*/
std::shared_ptr<Sock> sock;
/**
* When transient I2P sessions are used, then each connection has its own session, otherwise
* all connections use the session from `m_i2p_sam_session` and share the same I2P address.
* I2P sessions involve a data/transport socket (in `sock`) and a control socket
* (in `i2p_transient_session`). For transient sessions, once the data socket `sock` is
* closed, the control socket is not going to be used anymore and would be just taking
* resources. Storing it here makes its deletion together with `sock` automatic.
*/
std::unique_ptr<i2p::sam::Session> i2p_transient_session;
};
/**
* Info about which socket has which event ready and its connection id.
*/
struct IOReadiness {
/**
* Map of socket -> socket events. For example:
* socket1 -> { requested = SEND|RECV, occurred = RECV }
* socket2 -> { requested = SEND, occurred = SEND }
*/
Sock::EventsPerSock events_per_sock;
/**
* Map of socket -> connection id (in `m_connected`). For example
* socket1 -> id=23
* socket2 -> id=56
*/
std::unordered_map<Sock::EventsPerSock::key_type,
SockMan::Id,
Sock::HashSharedPtrSock,
Sock::EqualSharedPtrSock>
ids_per_sock;
};
/**
* Accept incoming I2P connections in a loop and call
* `EventNewConnectionAccepted()` for each new connection.
*/
void ThreadI2PAccept()
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
/**
* Check connected and listening sockets for IO readiness and process them accordingly.
*/
void ThreadSocketHandler()
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
/**
* Accept a connection.
* @param[in] listen_sock Socket on which to accept the connection.
* @param[out] addr Address of the peer that was accepted.
* @return Newly created socket for the accepted connection.
*/
std::unique_ptr<Sock> AcceptConnection(const Sock& listen_sock, CService& addr);
/**
* After a new socket with a peer has been created, configure its flags,
* make a new connection id and call `EventNewConnectionAccepted()`.
* @param[in] sock The newly created socket.
* @param[in] me Address at our end of the connection.
* @param[in] them Address of the new peer.
*/
void NewSockAccepted(std::unique_ptr<Sock>&& sock, const CService& me, const CService& them)
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
/**
* Generate an id for a newly created connection.
*/
Id GetNewId();
/**
* Generate a collection of sockets to check for IO readiness.
* @return Sockets to check for readiness plus an aux map to find the
* corresponding connection id given a socket.
*/
IOReadiness GenerateWaitSockets()
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
/**
* Do the read/write for connected sockets that are ready for IO.
* @param[in] io_readiness Which sockets are ready and their connection ids.
*/
void SocketHandlerConnected(const IOReadiness& io_readiness)
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
/**
* Accept incoming connections, one from each read-ready listening socket.
* @param[in] events_per_sock Sockets that are ready for IO.
*/
void SocketHandlerListening(const Sock::EventsPerSock& events_per_sock)
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
/**
* Retrieve an entry from m_connected.
* @param[in] id Connection id to search for.
* @return ConnectionSockets for the given connection id or empty shared_ptr if not found.
*/
std::shared_ptr<ConnectionSockets> GetConnectionSockets(Id id) const
EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex);
/**
* The id to assign to the next created connection. Used to generate ids of connections.
*/
std::atomic<Id> m_next_id{0};
/**
* Thread that sends to and receives from sockets and accepts connections.
*/
std::thread m_thread_socket_handler;
/**
* Thread that accepts incoming I2P connections in a loop, can be stopped via `interruptNet`.
*/
std::thread m_thread_i2p_accept;
/**
* Mutex protecting m_i2p_sam_sessions.
*/
Mutex m_unused_i2p_sessions_mutex;
/**
* A pool of created I2P SAM transient sessions that should be used instead
* of creating new ones in order to reduce the load on the I2P network.
* Creating a session in I2P is not cheap, thus if this is not empty, then
* pick an entry from it instead of creating a new session. If connecting to
* a host fails, then the created session is put to this pool for reuse.
*/
std::queue<std::unique_ptr<i2p::sam::Session>> m_unused_i2p_sessions GUARDED_BY(m_unused_i2p_sessions_mutex);
/**
* I2P SAM session.
* Used to accept incoming and make outgoing I2P connections from a persistent
* address.
*/
std::unique_ptr<i2p::sam::Session> m_i2p_sam_session;
/**
* List of listening sockets.
*/
std::vector<std::shared_ptr<Sock>> m_listen;
mutable Mutex m_connected_mutex;
/**
* Sockets for existent connections.
* The `shared_ptr` makes it possible to create a snapshot of this by simply copying
* it (under `m_connected_mutex`).
*/
std::unordered_map<Id, std::shared_ptr<ConnectionSockets>> m_connected GUARDED_BY(m_connected_mutex);
};
#endif // BITCOIN_COMMON_SOCKMAN_H

View file

@ -9,8 +9,10 @@
#include <httpserver.h>
#include <logging.h>
#include <netaddress.h>
#include <node/context.h>
#include <rpc/protocol.h>
#include <rpc/server.h>
#include <scheduler.h>
#include <util/fs.h>
#include <util/fs_helpers.h>
#include <util/strencodings.h>
@ -26,6 +28,8 @@
#include <string>
#include <vector>
using node::NodeContext;
using http_bitcoin::HTTPRequest;
using util::SplitString;
using util::TrimStringView;
@ -38,22 +42,16 @@ static const char* WWW_AUTH_HEADER_DATA = "Basic realm=\"jsonrpc\"";
class HTTPRPCTimer : public RPCTimerBase
{
public:
HTTPRPCTimer(struct event_base* eventBase, std::function<void()>& func, int64_t millis) :
ev(eventBase, false, func)
HTTPRPCTimer(NodeContext* context, std::function<void()>& func, int64_t millis)
{
struct timeval tv;
tv.tv_sec = millis/1000;
tv.tv_usec = (millis%1000)*1000;
ev.trigger(&tv);
context->scheduler->scheduleFromNow(func, std::chrono::milliseconds(millis));
}
private:
HTTPEvent ev;
};
class HTTPRPCTimerInterface : public RPCTimerInterface
{
public:
explicit HTTPRPCTimerInterface(struct event_base* _base) : base(_base)
explicit HTTPRPCTimerInterface(const std::any& context) : m_context(std::any_cast<NodeContext*>(context))
{
}
const char* Name() override
@ -62,10 +60,10 @@ public:
}
RPCTimerBase* NewTimer(std::function<void()>& func, int64_t millis) override
{
return new HTTPRPCTimer(base, func, millis);
return new HTTPRPCTimer(m_context, func, millis);
}
private:
struct event_base* base;
NodeContext* m_context;
};
@ -85,7 +83,7 @@ static void JSONErrorReply(HTTPRequest* req, UniValue objError, const JSONRPCReq
Assume(jreq.m_json_version != JSONRPCVersion::V2);
// Send error reply from json-rpc error object
int nStatus = HTTP_INTERNAL_SERVER_ERROR;
HTTPStatusCode nStatus = HTTP_INTERNAL_SERVER_ERROR;
int code = objError.find_value("code").getInt<int>();
if (code == RPC_INVALID_REQUEST)
@ -156,7 +154,7 @@ static bool RPCAuthorized(const std::string& strAuth, std::string& strAuthUserna
static bool HTTPReq_JSONRPC(const std::any& context, HTTPRequest* req)
{
// JSONRPC handles only POST
if (req->GetRequestMethod() != HTTPRequest::POST) {
if (req->GetRequestMethod() != HTTPRequestMethod::POST) {
req->WriteReply(HTTP_BAD_METHOD, "JSONRPC server handles only POST requests");
return false;
}
@ -370,9 +368,7 @@ bool StartHTTPRPC(const std::any& context)
if (g_wallet_init_interface.HasWalletSupport()) {
RegisterHTTPHandler("/wallet/", false, handle_rpc);
}
struct event_base* eventBase = EventBase();
assert(eventBase);
httpRPCTimerInterface = std::make_unique<HTTPRPCTimerInterface>(eventBase);
httpRPCTimerInterface = std::make_unique<HTTPRPCTimerInterface>(context);
RPCSetTimerInterface(httpRPCTimerInterface.get());
return true;
}

File diff suppressed because it is too large Load diff

View file

@ -6,10 +6,17 @@
#define BITCOIN_HTTPSERVER_H
#include <functional>
#include <map>
#include <optional>
#include <span>
#include <string>
#include <rpc/protocol.h>
#include <common/sockman.h>
#include <util/strencodings.h>
#include <util/string.h>
#include <util/time.h>
namespace util {
class SignalInterrupt;
} // namespace util
@ -27,10 +34,292 @@ static const int DEFAULT_HTTP_WORKQUEUE=64;
static const int DEFAULT_HTTP_SERVER_TIMEOUT=30;
struct evhttp_request;
struct event_base;
class CService;
class HTTPRequest;
enum HTTPRequestMethod {
UNKNOWN,
GET,
POST,
HEAD,
PUT
};
/** Event handler closure.
*/
class HTTPClosure
{
public:
virtual void operator()() = 0;
virtual ~HTTPClosure() = default;
};
namespace http_bitcoin {
using util::LineReader;
using NodeId = SockMan::Id;
// shortest valid request line, used by libevent in evhttp_parse_request_line()
static const size_t MIN_REQUEST_LINE_LENGTH{strlen("GET / HTTP/1.0")};
// maximum size of http request (request line + headers)
// see https://github.com/bitcoin/bitcoin/issues/6425
static const size_t MAX_HEADERS_SIZE{8192};
class HTTPHeaders
{
public:
std::optional<std::string> Find(const std::string key) const;
void Write(const std::string key, const std::string value);
void Remove(const std::string key);
bool Read(util::LineReader& reader);
std::string Stringify() const;
private:
std::map<std::string, std::string, util::CaseInsensitiveComparator> m_map;
};
class HTTPResponse
{
public:
int m_version_major;
int m_version_minor;
HTTPStatusCode m_status;
std::string m_reason;
HTTPHeaders m_headers;
std::vector<std::byte> m_body;
bool m_keep_alive{false};
std::string StringifyHeaders() const;
};
class HTTPClient;
class HTTPRequest
{
public:
std::string m_method;
std::string m_target;
// Default protocol version is used by error responses to unreadable requests
int m_version_major{1};
int m_version_minor{1};
HTTPHeaders m_headers;
std::string m_body;
// Keep a pointer to the client that made the request so
// we know who to respond to.
std::shared_ptr<HTTPClient> m_client;
explicit HTTPRequest(std::shared_ptr<HTTPClient> client) : m_client(client) {};
// Null client for unit tests
explicit HTTPRequest() : m_client(nullptr) {};
// Readers return false if they need more data from the
// socket to parse properly. They throw errors if
// the data is invalid.
bool LoadControlData(LineReader& reader);
bool LoadHeaders(LineReader& reader);
bool LoadBody(LineReader& reader);
// These methods reimplement the API from http_libevent::HTTPRequest
// for downstream JSONRPC and REST modules.
std::string GetURI() const {return m_target;};
CService GetPeer() const;
HTTPRequestMethod GetRequestMethod() const;
std::optional<std::string> GetQueryParameter(const std::string& key) const;
std::pair<bool, std::string> GetHeader(const std::string& hdr) const;
std::string ReadBody() const {return m_body;};
void WriteHeader(const std::string& hdr, const std::string& value);
// Response headers may be set in advance before response body is known
HTTPHeaders m_response_headers;
void WriteReply(HTTPStatusCode status, std::span<const std::byte> reply_body = {});
void WriteReply(HTTPStatusCode status, const char* reply_body)
{
auto reply_body_view = std::string_view(reply_body);
std::span<const std::byte> byte_span(reinterpret_cast<const std::byte*>(reply_body_view.data()), reply_body_view.size());
WriteReply(status, byte_span);
}
void WriteReply(HTTPStatusCode status, const std::string& reply_body)
{
std::span<const std::byte> byte_span{reinterpret_cast<const std::byte*>(reply_body.data()), reply_body.size()};
WriteReply(status, byte_span);
}
};
std::optional<std::string> GetQueryParameterFromUri(const std::string& uri, const std::string& key);
class HTTPServer;
class HTTPClient
{
public:
// ID provided by SockMan, inherited by HTTPServer
NodeId m_node_id;
// Remote address of connected client
CService m_addr;
// IP:port of connected client, cached for logging purposes
std::string m_origin;
// Pointer back to the server so we can call Sockman I/O methods from the client
// Ok to remain null for unit tests.
HTTPServer* m_server;
// In lieu of an intermediate transport class like p2p uses,
// we copy data from the socket buffer to the client object
// and attempt to read HTTP requests from here.
std::vector<std::byte> m_recv_buffer{};
// Requests from a client must be processed in the order in which
// they were received, blocking on a per-client basis. We won't
// process the next request in the queue if we are currently busy
// handling a previous request.
std::deque<std::unique_ptr<HTTPRequest>> m_req_queue;
// Set to true by the main thread when a request is popped off
// and passed to a worker, reset to false by the worker thread.
std::atomic_bool m_req_busy{false};
// Response data destined for this client.
// Written to directly by http worker threads, read and erased by Sockman I/O
Mutex m_send_mutex;
std::vector<std::byte> m_send_buffer GUARDED_BY(m_send_mutex);
// Set true by worker threads after writing a response to m_send_buffer.
// Set false by the Sockman I/O thread after flushing m_send_buffer.
// Checked in the Sockman I/O loop to avoid locking m_send_mutex if there's nothing to send.
std::atomic_bool m_send_ready{false};
// Set to true when we receive request data and set to false once m_send_buffer is cleared.
// Checked during DisconnectClients(). All of these operations take place in the Sockman I/O loop,
// however it may get set my a worker thread during an "optimistic send".
std::atomic_bool m_prevent_disconnect{false};
// Client request to keep connection open after all requests have been responded to.
// Set by (potentially multiple) worker threads and checked in the Sockman I/O loop.
std::atomic_bool m_keep_alive{false};
// Flag this client for disconnection on next loop.
// Checked at the end of every Sockman I/O loop, may be set a worker thread.
std::atomic_bool m_disconnect{false};
// Timestamp of last receive activity, used for -rpcservertimeout
SteadySeconds m_idle_since;
explicit HTTPClient(NodeId node_id, CService addr) : m_node_id(node_id), m_addr(addr)
{
m_origin = addr.ToStringAddrPort();
};
// Try to read an HTTP request from the receive buffer
bool ReadRequest(std::unique_ptr<HTTPRequest>& req);
// Push data from m_send_buffer to the connected socket via m_server
// Returns false if we are done with this client and Sockman can
// therefore skip the next read operation from it.
bool SendBytesFromBuffer() EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
// Disable copies (should only be used as shared pointers)
HTTPClient(const HTTPClient&) = delete;
HTTPClient& operator=(const HTTPClient&) = delete;
};
class HTTPServer : public SockMan
{
private:
void CloseConnectionInternal(std::shared_ptr<HTTPClient>& client);
public:
explicit HTTPServer(std::function<void(std::unique_ptr<HTTPRequest>)> func) : m_request_dispatcher(func) {};
// Set in the Sockman I/O loop and only checked by main thread when shutting
// down to wait for all clients to be disconnected.
std::atomic_bool m_no_clients{true};
//! Connected clients with live HTTP connections
std::unordered_map<NodeId, std::shared_ptr<HTTPClient>> m_connected_clients;
// What to do with HTTP requests once received, validated and parsed
std::function<void(std::unique_ptr<HTTPRequest>)> m_request_dispatcher;
std::shared_ptr<HTTPClient> GetClientById(NodeId node_id) const;
// Close underlying connections where flagged
void DisconnectClients();
// Flag used during shutdown to bypass keep-alive flag.
// Set by main thread and read by Sockman I/O thread
std::atomic_bool m_disconnect_all_clients{false};
// Idle timeout after which clients are disconnected
std::chrono::seconds m_rpcservertimeout{DEFAULT_HTTP_SERVER_TIMEOUT};
/**
* Be notified when a new connection has been accepted.
* @param[in] node_id Id of the newly accepted connection.
* @param[in] me The address and port at our side of the connection.
* @param[in] them The address and port at the peer's side of the connection.
* @retval true The new connection was accepted at the higher level.
* @retval false The connection was refused at the higher level, so the
* associated socket and node_id should be discarded by `SockMan`.
*/
virtual bool EventNewConnectionAccepted(NodeId node_id, const CService& me, const CService& them) override;
/**
* Called when the socket is ready to send data and `ShouldTryToSend()` has
* returned true. This is where the higher level code serializes its messages
* and calls `SockMan::SendBytes()`.
* @param[in] node_id Id of the node whose socket is ready to send.
* @param[out] cancel_recv Should always be set upon return and if it is true,
* then the next attempt to receive data from that node will be omitted.
*/
virtual void EventReadyToSend(NodeId node_id, bool& cancel_recv) override;
/**
* Called when new data has been received.
* @param[in] node_id Connection for which the data arrived.
* @param[in] data Received data.
*/
virtual void EventGotData(NodeId node_id, std::span<const uint8_t> data) override;
/**
* Called when the remote peer has sent an EOF on the socket. This is a graceful
* close of their writing side, we can still send and they will receive, if it
* makes sense at the application level.
* @param[in] node_id Node whose socket got EOF.
*/
virtual void EventGotEOF(NodeId node_id) override;
/**
* Called when we get an irrecoverable error trying to read from a socket.
* @param[in] node_id Node whose socket got an error.
* @param[in] errmsg Message describing the error.
*/
virtual void EventGotPermanentReadError(NodeId node_id, const std::string& errmsg) override;
/**
* SockMan has completed the current send+recv iteration for a given connection.
* It will do another send+recv for this connection after processing all other connections.
* Can be used to execute periodic tasks for a given connection.
* The implementation in SockMan does nothing.
* @param[in] node_id Connection for which send+recv has been done.
*/
virtual void EventIOLoopCompletedForOne(NodeId node_id) override;
/**
* SockMan has completed send+recv for all nodes.
* Can be used to execute periodic tasks for all nodes, like disconnecting
* nodes due to higher level logic.
* The implementation in SockMan does nothing.
*/
virtual void EventIOLoopCompletedForAll() override;
/**
* Can be used to temporarily pause sends on a connection.
* SockMan would only call EventReadyToSend() if this returns true.
* The implementation in SockMan always returns true.
* @param[in] node_id Connection for which to confirm or omit the next call to EventReadyToSend().
*/
virtual bool ShouldTryToSend(NodeId node_id) const override;
/**
* SockMan would only call Recv() on a connection's socket if this returns true.
* Can be used to temporarily pause receives on a connection.
* The implementation in SockMan always returns true.
* @param[in] node_id Connection for which to confirm or omit the next receive.
*/
virtual bool ShouldTryToRecv(NodeId node_id) const override;
};
/** Initialize HTTP server.
* Call this before RegisterHTTPHandler or EventBase().
@ -45,12 +334,10 @@ void StartHTTPServer();
void InterruptHTTPServer();
/** Stop HTTP server */
void StopHTTPServer();
/** Change logging level for libevent. */
void UpdateHTTPServerLogging(bool enable);
} // namespace http_bitcoin
/** Handler for requests to a certain HTTP path */
typedef std::function<bool(HTTPRequest* req, const std::string &)> HTTPRequestHandler;
typedef std::function<bool(http_bitcoin::HTTPRequest* req, const std::string&)> HTTPRequestHandler;
/** Register handler for prefix.
* If multiple handlers match a prefix, the first-registered one will
* be invoked.
@ -59,136 +346,4 @@ void RegisterHTTPHandler(const std::string &prefix, bool exactMatch, const HTTPR
/** Unregister handler for prefix */
void UnregisterHTTPHandler(const std::string &prefix, bool exactMatch);
/** Return evhttp event base. This can be used by submodules to
* queue timers or custom events.
*/
struct event_base* EventBase();
/** In-flight HTTP request.
* Thin C++ wrapper around evhttp_request.
*/
class HTTPRequest
{
private:
struct evhttp_request* req;
const util::SignalInterrupt& m_interrupt;
bool replySent;
public:
explicit HTTPRequest(struct evhttp_request* req, const util::SignalInterrupt& interrupt, bool replySent = false);
~HTTPRequest();
enum RequestMethod {
UNKNOWN,
GET,
POST,
HEAD,
PUT
};
/** Get requested URI.
*/
std::string GetURI() const;
/** Get CService (address:ip) for the origin of the http request.
*/
CService GetPeer() const;
/** Get request method.
*/
RequestMethod GetRequestMethod() const;
/** Get the query parameter value from request uri for a specified key, or std::nullopt if the
* key is not found.
*
* If the query string contains duplicate keys, the first value is returned. Many web frameworks
* would instead parse this as an array of values, but this is not (yet) implemented as it is
* currently not needed in any of the endpoints.
*
* @param[in] key represents the query parameter of which the value is returned
*/
std::optional<std::string> GetQueryParameter(const std::string& key) const;
/**
* Get the request header specified by hdr, or an empty string.
* Return a pair (isPresent,string).
*/
std::pair<bool, std::string> GetHeader(const std::string& hdr) const;
/**
* Read request body.
*
* @note As this consumes the underlying buffer, call this only once.
* Repeated calls will return an empty string.
*/
std::string ReadBody();
/**
* Write output header.
*
* @note call this before calling WriteErrorReply or Reply.
*/
void WriteHeader(const std::string& hdr, const std::string& value);
/**
* Write HTTP reply.
* nStatus is the HTTP status code to send.
* reply is the body of the reply. Keep it empty to send a standard message.
*
* @note Can be called only once. As this will give the request back to the
* main thread, do not call any other HTTPRequest methods after calling this.
*/
void WriteReply(int nStatus, std::string_view reply = "")
{
WriteReply(nStatus, std::as_bytes(std::span{reply}));
}
void WriteReply(int nStatus, std::span<const std::byte> reply);
};
/** Get the query parameter value from request uri for a specified key, or std::nullopt if the key
* is not found.
*
* If the query string contains duplicate keys, the first value is returned. Many web frameworks
* would instead parse this as an array of values, but this is not (yet) implemented as it is
* currently not needed in any of the endpoints.
*
* Helper function for HTTPRequest::GetQueryParameter.
*
* @param[in] uri is the entire request uri
* @param[in] key represents the query parameter of which the value is returned
*/
std::optional<std::string> GetQueryParameterFromUri(const char* uri, const std::string& key);
/** Event handler closure.
*/
class HTTPClosure
{
public:
virtual void operator()() = 0;
virtual ~HTTPClosure() = default;
};
/** Event class. This can be used either as a cross-thread trigger or as a timer.
*/
class HTTPEvent
{
public:
/** Create a new event.
* deleteWhenTriggered deletes this event object after the event is triggered (and the handler called)
* handler is the handler to call when the event is triggered.
*/
HTTPEvent(struct event_base* base, bool deleteWhenTriggered, const std::function<void()>& handler);
~HTTPEvent();
/** Trigger the event. If tv is 0, trigger it immediately. Otherwise trigger it after
* the given time has elapsed.
*/
void trigger(struct timeval* tv);
bool deleteWhenTriggered;
std::function<void()> handler;
private:
struct event* ev;
};
#endif // BITCOIN_HTTPSERVER_H

View file

@ -121,6 +121,10 @@ using common::AmountErrMsg;
using common::InvalidPortErrMsg;
using common::ResolveErrMsg;
using http_bitcoin::InitHTTPServer;
using http_bitcoin::InterruptHTTPServer;
using http_bitcoin::StartHTTPServer;
using http_bitcoin::StopHTTPServer;
using node::ApplyArgsManOptions;
using node::BlockManager;
using node::CalculateCacheSizes;

View file

@ -37,6 +37,7 @@
#include <univalue.h>
using http_bitcoin::HTTPRequest;
using node::GetTransaction;
using node::NodeContext;
using util::SplitString;

View file

@ -240,24 +240,16 @@ static RPCHelpMan logging()
},
RPCExamples{
HelpExampleCli("logging", "\"[\\\"all\\\"]\" \"[\\\"http\\\"]\"")
+ HelpExampleRpc("logging", "[\"all\"], [\"libevent\"]")
+ HelpExampleRpc("logging", "[\"all\"], [\"walletdb\"]")
},
[&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue
{
BCLog::CategoryMask original_log_categories = LogInstance().GetCategoryMask();
if (request.params[0].isArray()) {
EnableOrDisableLogCategories(request.params[0], true);
}
if (request.params[1].isArray()) {
EnableOrDisableLogCategories(request.params[1], false);
}
BCLog::CategoryMask updated_log_categories = LogInstance().GetCategoryMask();
BCLog::CategoryMask changed_log_categories = original_log_categories ^ updated_log_categories;
// Update libevent logging if BCLog::LIBEVENT has changed.
if (changed_log_categories & BCLog::LIBEVENT) {
UpdateHTTPServerLogging(LogInstance().WillLogCategory(BCLog::LIBEVENT));
}
UniValue result(UniValue::VOBJ);
for (const auto& logCatActive : LogInstance().LogCategoriesList()) {

View file

@ -20,6 +20,20 @@ enum HTTPStatusCode
HTTP_SERVICE_UNAVAILABLE = 503,
};
// Copied from libevent http.c success_phrases[] and client_error_phrases[]
// TODO: Should HTTPStatusCode and HTTPReason be moved since they are not RPC protocols?
const std::map<HTTPStatusCode, std::string> HTTPReason{
{HTTP_OK, "OK"},
{HTTP_NO_CONTENT, "No Content"},
{HTTP_BAD_REQUEST, "Bad Request"},
{HTTP_UNAUTHORIZED, "Unauthorized"},
{HTTP_FORBIDDEN, "Forbidden"},
{HTTP_NOT_FOUND, "Not Found"},
{HTTP_BAD_METHOD, "Method Not Allowed"},
{HTTP_INTERNAL_SERVER_ERROR, "Internal Server Error"},
{HTTP_SERVICE_UNAVAILABLE, "Service Unavailable"},
};
//! Bitcoin RPC error codes
enum RPCErrorCode
{

View file

@ -10,47 +10,35 @@
#include <util/signalinterrupt.h>
#include <util/strencodings.h>
#include <event2/buffer.h>
#include <event2/event.h>
#include <event2/http.h>
#include <event2/http_struct.h>
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>
extern "C" int evhttp_parse_firstline_(struct evhttp_request*, struct evbuffer*);
extern "C" int evhttp_parse_headers_(struct evhttp_request*, struct evbuffer*);
std::string RequestMethodString(HTTPRequest::RequestMethod m);
std::string RequestMethodString(HTTPRequestMethod m);
FUZZ_TARGET(http_request)
{
using http_bitcoin::HTTPRequest;
using http_bitcoin::MAX_HEADERS_SIZE;
using util::LineReader;
FuzzedDataProvider fuzzed_data_provider{buffer.data(), buffer.size()};
evhttp_request* evreq = evhttp_request_new(nullptr, nullptr);
assert(evreq != nullptr);
evreq->kind = EVHTTP_REQUEST;
evbuffer* evbuf = evbuffer_new();
assert(evbuf != nullptr);
const std::vector<uint8_t> http_buffer = ConsumeRandomLengthByteVector(fuzzed_data_provider, 4096);
evbuffer_add(evbuf, http_buffer.data(), http_buffer.size());
// Avoid constructing requests that will be interpreted by libevent as PROXY requests to avoid triggering
// a nullptr dereference. The dereference (req->evcon->http_server) takes place in evhttp_parse_request_line
// and is a consequence of our hacky but necessary use of the internal function evhttp_parse_firstline_ in
// this fuzzing harness. The workaround is not aesthetically pleasing, but it successfully avoids the troublesome
// code path. " http:// HTTP/1.1\n" was a crashing input prior to this workaround.
const std::string http_buffer_str = ToLower(std::string{http_buffer.begin(), http_buffer.end()});
if (http_buffer_str.find(" http://") != std::string::npos || http_buffer_str.find(" https://") != std::string::npos ||
evhttp_parse_firstline_(evreq, evbuf) != 1 || evhttp_parse_headers_(evreq, evbuf) != 1) {
evbuffer_free(evbuf);
evhttp_request_free(evreq);
const std::vector<std::byte> http_bytes_buffer(reinterpret_cast<const std::byte*>(http_buffer.data()),
reinterpret_cast<const std::byte*>(http_buffer.data()) + http_buffer.size());
HTTPRequest http_request;
LineReader reader(http_bytes_buffer, MAX_HEADERS_SIZE);
try {
if (!http_request.LoadControlData(reader)) return;
if (!http_request.LoadHeaders(reader)) return;
if (!http_request.LoadBody(reader)) return;
} catch (const std::runtime_error&) {
return;
}
util::SignalInterrupt interrupt;
HTTPRequest http_request{evreq, interrupt, true};
const HTTPRequest::RequestMethod request_method = http_request.GetRequestMethod();
const HTTPRequestMethod request_method = http_request.GetRequestMethod();
(void)RequestMethodString(request_method);
(void)http_request.GetURI();
(void)http_request.GetHeader("Host");
@ -60,9 +48,4 @@ FUZZ_TARGET(http_request)
(void)http_request.GetHeader(header);
const std::string body = http_request.ReadBody();
assert(body.empty());
const CService service = http_request.GetPeer();
assert(service.ToStringAddrPort() == "[::]:0");
evbuffer_free(evbuf);
evhttp_request_free(evreq);
}

View file

@ -3,14 +3,80 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <httpserver.h>
#include <time.h>
#include <rpc/protocol.h>
#include <test/util/net.h>
#include <test/util/setup_common.h>
#include <util/strencodings.h>
#include <boost/test/unit_test.hpp>
BOOST_FIXTURE_TEST_SUITE(httpserver_tests, BasicTestingSetup)
using http_bitcoin::HTTPHeaders;
using http_bitcoin::HTTPRequest;
using http_bitcoin::HTTPResponse;
using http_bitcoin::HTTPServer;
using http_bitcoin::MAX_HEADERS_SIZE;
using util::LineReader;
BOOST_AUTO_TEST_CASE(test_query_parameters)
// Reading request captured from bitcoin-cli
const std::string full_request =
"504f5354202f20485454502f312e310d0a486f73743a203132372e302e302e310d"
"0a436f6e6e656374696f6e3a20636c6f73650d0a436f6e74656e742d547970653a"
"206170706c69636174696f6e2f6a736f6e0d0a417574686f72697a6174696f6e3a"
"204261736963205831396a6232397261575666587a6f354f4751354f4451334d57"
"4e6d4e6a67304e7a417a59546b7a4e32457a4e7a6b305a44466c4f4451314e6a5a"
"6d5954526b5a6a4a694d7a466b596a68684f4449345a4759344d6a566a4f546735"
"5a4749344f54566c0d0a436f6e74656e742d4c656e6774683a2034360d0a0d0a7b"
"226d6574686f64223a22676574626c6f636b636f756e74222c22706172616d7322"
"3a5b5d2c226964223a317d0a";
/// Save the value of CreateSock and restore it when the test ends.
class HTTPTestingSetup : public BasicTestingSetup
{
public:
explicit HTTPTestingSetup() : m_create_sock_orig{CreateSock} {};
~HTTPTestingSetup()
{
CreateSock = m_create_sock_orig;
}
private:
const decltype(CreateSock) m_create_sock_orig;
};
BOOST_FIXTURE_TEST_SUITE(httpserver_tests, HTTPTestingSetup)
BOOST_AUTO_TEST_CASE(test_query_parameters_new_behavior)
{
// The legacy code that relied on libevent couldn't handle an invalid URI encoding.
// The new code is more tolerant and so we expect a difference in behavior.
// Re: libevent evhttp_uri_parse() see:
// "bugfix: rest: avoid segfault for invalid URI" https://github.com/bitcoin/bitcoin/pull/27468
// "httpserver, rest: improving URI validation" https://github.com/bitcoin/bitcoin/pull/27253
// Re: More tolerant URI decoding see:
// "refactor: Use our own implementation of urlDecode" https://github.com/bitcoin/bitcoin/pull/29904
std::string uri {};
// This is an invalid URI because it contains a % that is not followed by two hex digits
uri = "/rest/endpoint/someresource.json?p1=v1&p2=v2%";
// Old libevent behavior: URI with invalid characters (%) raised a runtime error regardless of which query parameter is queried
// New behavior: Tolerate as much as we can even
BOOST_CHECK_EQUAL(http_bitcoin::GetQueryParameterFromUri(uri.c_str(), "p1").value(), "v1");
BOOST_CHECK_EQUAL(http_bitcoin::GetQueryParameterFromUri(uri.c_str(), "p2").value(), "v2%");
// This is a valid URI because the %XX encoding is correct: `?p1=v1&p2=100%`
uri = "/rest/endpoint/someresource.json%3Fp1%3Dv1%26p2%3D100%25";
// Old behavior: libevent did not decode the URI before parsing, so it did not detect or return the query
// (libevent would parse the entire argument string as the uri path)
// New behavior: Decode before parsing the URI so reserved characters like ? & = are interpreted correctly
BOOST_CHECK_EQUAL(http_bitcoin::GetQueryParameterFromUri(uri.c_str(), "p1").value(), "v1");
BOOST_CHECK_EQUAL(http_bitcoin::GetQueryParameterFromUri(uri.c_str(), "p2").value(), "100%");
}
// Ensure new behavior matches old behavior
template <typename func>
void test_query_parameters(func GetQueryParameterFromUri) {
std::string uri {};
// No parameters
@ -35,8 +101,341 @@ BOOST_AUTO_TEST_CASE(test_query_parameters)
uri = "/rest/endpoint/someresource.json&p1=v1&p2=v2";
BOOST_CHECK(!GetQueryParameterFromUri(uri.c_str(), "p1").has_value());
// URI with invalid characters (%) raises a runtime error regardless of which query parameter is queried
uri = "/rest/endpoint/someresource.json&p1=v1&p2=v2%";
BOOST_CHECK_EXCEPTION(GetQueryParameterFromUri(uri.c_str(), "p1"), std::runtime_error, HasReason("URI parsing failed, it likely contained RFC 3986 invalid characters"));
// Multiple parameters, some characters encoded
uri = "/rest/endpoint/someresource.json?p1=v1%20&p2=100%25";
BOOST_CHECK_EQUAL(GetQueryParameterFromUri(uri.c_str(), "p1").value(), "v1 ");
BOOST_CHECK_EQUAL(GetQueryParameterFromUri(uri.c_str(), "p2").value(), "100%");
}
BOOST_AUTO_TEST_CASE(test_query_parameters_bitcoin)
{
test_query_parameters(http_bitcoin::GetQueryParameterFromUri);
}
BOOST_AUTO_TEST_CASE(http_headers_tests)
{
{
// Writing response headers
HTTPHeaders headers{};
BOOST_CHECK(!headers.Find("Cache-Control"));
headers.Write("Cache-Control", "no-cache");
// Check case-insensitive key matching
BOOST_CHECK_EQUAL(headers.Find("Cache-Control").value(), "no-cache");
BOOST_CHECK_EQUAL(headers.Find("cache-control").value(), "no-cache");
// Additional values are comma-separated and appended
headers.Write("Cache-Control", "no-store");
BOOST_CHECK_EQUAL(headers.Find("Cache-Control").value(), "no-cache, no-store");
// Add a few more
headers.Write("Pie", "apple");
headers.Write("Sandwich", "ham");
headers.Write("Coffee", "black");
BOOST_CHECK_EQUAL(headers.Find("Pie").value(), "apple");
// Remove
headers.Remove("Pie");
BOOST_CHECK(!headers.Find("Pie"));
// Combine for transmission
// std::map sorts alphabetically by key, no order is specified for HTTP
BOOST_CHECK_EQUAL(
headers.Stringify(),
"Cache-Control: no-cache, no-store\r\n"
"Coffee: black\r\n"
"Sandwich: ham\r\n\r\n");
}
{
// Reading request headers captured from bitcoin-cli
std::vector<std::byte> buffer{TryParseHex<std::byte>(
"486f73743a203132372e302e302e310d0a436f6e6e656374696f6e3a20636c6f73"
"650d0a436f6e74656e742d547970653a206170706c69636174696f6e2f6a736f6e"
"0d0a417574686f72697a6174696f6e3a204261736963205831396a623239726157"
"5666587a6f7a597a4a6b4e5441784e44466c4d474a69596d56684d5449354f4467"
"334e7a49354d544d334e54526d4e54686b4e6a63324f574d775a5459785a6a677a"
"4e5467794e7a4577595459314f47526b596a566d5a4751330d0a436f6e74656e74"
"2d4c656e6774683a2034360d0a0d0a").value()};
util::LineReader reader(buffer, /*max_read=*/1028);
HTTPHeaders headers{};
headers.Read(reader);
BOOST_CHECK_EQUAL(headers.Find("Host").value(), "127.0.0.1");
BOOST_CHECK_EQUAL(headers.Find("Connection").value(), "close");
BOOST_CHECK_EQUAL(headers.Find("Content-Type").value(), "application/json");
BOOST_CHECK_EQUAL(headers.Find("Authorization").value(), "Basic X19jb29raWVfXzozYzJkNTAxNDFlMGJiYmVhMTI5ODg3NzI5MTM3NTRmNThkNjc2OWMwZTYxZjgzNTgyNzEwYTY1OGRkYjVmZGQ3");
BOOST_CHECK_EQUAL(headers.Find("Content-Length").value(), "46");
BOOST_CHECK(!headers.Find("Pizza"));
}
}
BOOST_AUTO_TEST_CASE(http_response_tests)
{
// Typical HTTP 1.1 response headers
HTTPHeaders headers{};
headers.Write("Content-Type", "application/json");
headers.Write("Date", "Tue, 15 Oct 2024 17:54:12 GMT");
headers.Write("Content-Length", "41");
// Response points to headers which already exist because some of them
// are set before we even know what the response will be.
HTTPResponse res;
res.m_version_major = 1;
res.m_version_minor = 1;
res.m_status = HTTP_OK;
res.m_reason = HTTPReason.find(res.m_status)->second;
res.m_body = StringToBuffer("{\"result\":865793,\"error\":null,\"id\":null\"}");
// Everything except the body, which might be raw bytes instead of a string
res.m_headers = std::move(headers);
BOOST_CHECK_EQUAL(
res.StringifyHeaders(),
"HTTP/1.1 200 OK\r\n"
"Content-Length: 41\r\n"
"Content-Type: application/json\r\n"
"Date: Tue, 15 Oct 2024 17:54:12 GMT\r\n"
"\r\n");
}
BOOST_AUTO_TEST_CASE(http_request_tests)
{
{
HTTPRequest req;
std::vector<std::byte> buffer{TryParseHex<std::byte>(full_request).value()};
LineReader reader(buffer, MAX_HEADERS_SIZE);
BOOST_CHECK(req.LoadControlData(reader));
BOOST_CHECK(req.LoadHeaders(reader));
BOOST_CHECK(req.LoadBody(reader));
BOOST_CHECK_EQUAL(req.m_method, "POST");
BOOST_CHECK_EQUAL(req.GetRequestMethod(), HTTPRequestMethod::POST);
BOOST_CHECK_EQUAL(req.m_target, "/");
BOOST_CHECK_EQUAL(req.GetURI(), "/");
BOOST_CHECK_EQUAL(req.m_version_major, 1);
BOOST_CHECK_EQUAL(req.m_version_minor, 1);
BOOST_CHECK_EQUAL(req.m_headers.Find("Host").value(), "127.0.0.1");
BOOST_CHECK_EQUAL(req.m_headers.Find("Connection").value(), "close");
BOOST_CHECK_EQUAL(req.m_headers.Find("Content-Type").value(), "application/json");
BOOST_CHECK_EQUAL(req.m_headers.Find("Authorization").value(), "Basic X19jb29raWVfXzo5OGQ5ODQ3MWNmNjg0NzAzYTkzN2EzNzk0ZDFlODQ1NjZmYTRkZjJiMzFkYjhhODI4ZGY4MjVjOTg5ZGI4OTVl");
BOOST_CHECK_EQUAL(req.m_headers.Find("Content-Length").value(), "46");
BOOST_CHECK_EQUAL(req.m_body.size(), 46);
BOOST_CHECK_EQUAL(req.m_body, "{\"method\":\"getblockcount\",\"params\":[],\"id\":1}\n");
}
{
const std::string too_short_request_line = "GET/HTTP/1.0\r\nHost: 127.0.0.1\r\n\r\n";
HTTPRequest req;
std::vector<std::byte> buffer{StringToBuffer(too_short_request_line)};
LineReader reader(buffer, MAX_HEADERS_SIZE);
BOOST_CHECK_THROW(req.LoadControlData(reader), std::runtime_error);
}
{
const std::string malformed_request_line = "GET / HTTP / 1.0\r\nHost: 127.0.0.1\r\n\r\n";
HTTPRequest req;
std::vector<std::byte> buffer{StringToBuffer(malformed_request_line)};
LineReader reader(buffer, MAX_HEADERS_SIZE);
BOOST_CHECK_THROW(req.LoadControlData(reader), std::runtime_error);
}
{
const std::string malformed_request_line = "GET / HTTP1.0\r\nHost: 127.0.0.1\r\n\r\n";
HTTPRequest req;
std::vector<std::byte> buffer{StringToBuffer(malformed_request_line)};
LineReader reader(buffer, MAX_HEADERS_SIZE);
BOOST_CHECK_THROW(req.LoadControlData(reader), std::runtime_error);
}
{
const std::string malformed_request_line = "GET / HTTP/11\r\nHost: 127.0.0.1\r\n\r\n";
HTTPRequest req;
std::vector<std::byte> buffer{StringToBuffer(malformed_request_line)};
LineReader reader(buffer, MAX_HEADERS_SIZE);
BOOST_CHECK_THROW(req.LoadControlData(reader), std::runtime_error);
}
{
const std::string malformed_request_line = "GET / HTTP/1.x\r\nHost: 127.0.0.1\r\n\r\n";
HTTPRequest req;
std::vector<std::byte> buffer{StringToBuffer(malformed_request_line)};
LineReader reader(buffer, MAX_HEADERS_SIZE);
BOOST_CHECK_THROW(req.LoadControlData(reader), std::runtime_error);
}
{
const std::string ok_request_line = "GET / HTTP/1.0\r\nHost: 127.0.0.1\r\n\r\n";
HTTPRequest req;
std::vector<std::byte> buffer{StringToBuffer(ok_request_line)};
LineReader reader(buffer, MAX_HEADERS_SIZE);
BOOST_CHECK(req.LoadControlData(reader));
BOOST_CHECK(req.LoadHeaders(reader));
BOOST_CHECK(req.LoadBody(reader));
BOOST_CHECK_EQUAL(req.m_method, "GET");
BOOST_CHECK_EQUAL(req.m_target, "/");
BOOST_CHECK_EQUAL(req.m_version_major, 1);
BOOST_CHECK_EQUAL(req.m_version_minor, 0);
BOOST_CHECK_EQUAL(req.m_headers.Find("Host").value(), "127.0.0.1");
// no body is OK
BOOST_CHECK_EQUAL(req.m_body.size(), 0);
}
{
const std::string malformed_headers = "GET / HTTP/1.0\r\nHost=127.0.0.1\r\n\r\n";
HTTPRequest req;
std::vector<std::byte> buffer{StringToBuffer(malformed_headers)};
LineReader reader(buffer, MAX_HEADERS_SIZE);
BOOST_CHECK(req.LoadControlData(reader));
BOOST_CHECK_THROW(req.LoadHeaders(reader), std::runtime_error);
}
{
// We might not have received enough data from the client which is not
// an error. We return false so the caller can try again later when the
// buffer has more data.
const std::string incomplete_headers = "GET / HTTP/1.0\r\nHost: ";
HTTPRequest req;
std::vector<std::byte> buffer{StringToBuffer(incomplete_headers)};
LineReader reader(buffer, MAX_HEADERS_SIZE);
BOOST_CHECK(req.LoadControlData(reader));
BOOST_CHECK(!req.LoadHeaders(reader));
}
{
const std::string no_content_length = "GET / HTTP/1.0\r\n\r\n{\"method\":\"getblockcount\"}";
HTTPRequest req;
std::vector<std::byte> buffer{StringToBuffer(no_content_length)};
LineReader reader(buffer, MAX_HEADERS_SIZE);
BOOST_CHECK(req.LoadControlData(reader));
BOOST_CHECK(req.LoadHeaders(reader));
BOOST_CHECK(req.LoadBody(reader));
// Don't try to read request body if Content-Length is missing
BOOST_CHECK_EQUAL(req.m_body.size(), 0);
}
{
const std::string bad_content_length = "GET / HTTP/1.0\r\nContent-Length: eleven\r\n\r\n{\"method\":\"getblockcount\"}";
HTTPRequest req;
std::vector<std::byte> buffer{StringToBuffer(bad_content_length)};
LineReader reader(buffer, MAX_HEADERS_SIZE);
BOOST_CHECK(req.LoadControlData(reader));
BOOST_CHECK(req.LoadHeaders(reader));
BOOST_CHECK_THROW(req.LoadBody(reader), std::runtime_error);
}
{
// Content-Length indicates more data than we have in the buffer.
// Again, not an error just try again later.
const std::string excessive_content_length = "GET / HTTP/1.0\r\nContent-Length: 1024\r\n\r\n{\"method\":\"getblockcount\"}";
HTTPRequest req;
std::vector<std::byte> buffer{StringToBuffer(excessive_content_length)};
LineReader reader(buffer, MAX_HEADERS_SIZE);
BOOST_CHECK(req.LoadControlData(reader));
BOOST_CHECK(req.LoadHeaders(reader));
BOOST_CHECK(!req.LoadBody(reader));
}
}
BOOST_AUTO_TEST_CASE(http_client_server_tests)
{
// Hard code the timestamp for the Date header in the HTTP response
// Wed Dec 11 00:47:09 2024 UTC
SetMockTime(1733878029);
// Queue of connected sockets returned by listening socket (represents network interface)
std::shared_ptr<DynSock::Queue> accepted_sockets{std::make_shared<DynSock::Queue>()};
CreateSock = [&accepted_sockets](int, int, int) {
// This is a mock Listening Socket that the HTTP server will "bind" to and
// listen to for incoming connections. We won't need to access its I/O
// pipes because we don't read or write directly to it. It will return
// Connected Sockets from the queue via its Accept() method.
return std::make_unique<DynSock>(std::make_shared<DynSock::Pipes>(), accepted_sockets);
};
{
// I/O pipes of one mock Connected Socket we can read and write to.
std::shared_ptr<DynSock::Pipes> connected_socket_pipes(std::make_shared<DynSock::Pipes>());
// Insert the payload: a correctly formatted HTTP request
std::vector<std::byte> buffer{TryParseHex<std::byte>(full_request).value()};
connected_socket_pipes->recv.PushBytes(buffer.data(), buffer.size());
// Mock Connected Socket that represents a client.
// It needs I/O pipes but its queue can remain empty
std::unique_ptr<DynSock> connected_socket{std::make_unique<DynSock>(connected_socket_pipes, std::make_shared<DynSock::Queue>())};
// Prepare queue of accepted_sockets: just one connection with no data
accepted_sockets->Push(std::move(connected_socket));
// Prepare a request handler that just stores received requests so we can examine them
// Mutex is required to prevent a race between this test's main thread and the Sockman I/O loop.
Mutex requests_mutex;
std::deque<std::unique_ptr<HTTPRequest>> requests;
auto StoreRequest = [&](std::unique_ptr<HTTPRequest> req) {
LOCK(requests_mutex);
requests.push_back(std::move(req));
};
// Instantiate server with dead-end request handler
HTTPServer server = HTTPServer(StoreRequest);
BOOST_REQUIRE(server.m_no_clients);
// This address won't actually get used because we stubbed CreateSock()
const std::optional<CService> addr{Lookup("127.0.0.1", 8333, false)};
bilingual_str strError;
// Bind to mock Listening Socket
BOOST_REQUIRE(server.BindAndStartListening(addr.value(), strError));
// Start the I/O loop, accepting connections
SockMan::Options sockman_options;
server.StartSocketsThreads(sockman_options);
// Wait up to one minute for mock client to connect.
// Given that the mock client is itself a mock socket
// with hard-coded data it should only take a fraction of that.
int attempts{6000};
while (attempts > 0)
{
if (!server.m_no_clients) break;
std::this_thread::sleep_for(10ms);
--attempts;
}
BOOST_REQUIRE(!server.m_no_clients);
{
LOCK(requests_mutex);
// Connected client should have one request already from the static content.
BOOST_CHECK_EQUAL(requests.size(), 1);
// Check the received request
BOOST_CHECK_EQUAL(requests.front()->m_body, "{\"method\":\"getblockcount\",\"params\":[],\"id\":1}\n");
BOOST_CHECK_EQUAL(requests.front()->GetPeer().ToStringAddrPort(), "5.5.5.5:6789");
// Respond to request
requests.front()->WriteReply(HTTP_OK, "874140\n");
}
// Check the sent response from the mock client at the other end of the mock socket
std::string expected = "HTTP/1.1 200 OK\r\n"
"Connection: close\r\n"
"Content-Length: 7\r\n"
"Content-Type: text/html; charset=ISO-8859-1\r\n"
"Date: Wed, 11 Dec 2024 00:47:09 GMT\r\n"
"\r\n"
"874140\n";
std::string actual;
// Wait up to one minute for all the bytes to appear in the "send" pipe.
char buf[0x10000] = {};
attempts = 6000;
while (attempts > 0)
{
ssize_t bytes_read = connected_socket_pipes->send.GetBytes(buf, sizeof(buf), 0);
if (bytes_read > 0) {
actual.append(buf, bytes_read);
if (actual == expected) break;
}
std::this_thread::sleep_for(10ms);
--attempts;
}
BOOST_CHECK_EQUAL(actual, expected);
// Wait up to one minute for connection to be closed
attempts = 6000;
while (attempts > 0)
{
if (server.m_no_clients) break;
std::this_thread::sleep_for(10ms);
--attempts;
}
BOOST_REQUIRE(server.m_no_clients);
// Close server
server.interruptNet();
// Wait for I/O loop to finish, after all sockets are closed
server.JoinSocketsThreads();
}
}
BOOST_AUTO_TEST_SUITE_END()

View file

@ -2,6 +2,7 @@
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <util/strencodings.h>
#include <util/string.h>
#include <boost/test/unit_test.hpp>
@ -146,4 +147,61 @@ BOOST_AUTO_TEST_CASE(ConstevalFormatString_NumSpec)
HasReason{"tinyformat: Too many conversion specifiers in format string"});
}
BOOST_AUTO_TEST_CASE(case_insensitive_comparator_test)
{
CaseInsensitiveComparator cmp;
BOOST_CHECK(cmp("A", "B"));
BOOST_CHECK(cmp("A", "b"));
BOOST_CHECK(cmp("a", "B"));
BOOST_CHECK(!cmp("B", "A"));
BOOST_CHECK(!cmp("B", "a"));
BOOST_CHECK(!cmp("b", "A"));
// Use a character with value > 127
// to ensure we don't trigger implicit-integer-sign-change
BOOST_CHECK(cmp("a", "\xe4"));
}
BOOST_AUTO_TEST_CASE(line_reader_test)
{
{
// Check three lines terminated by \n, \r\n, and end of buffer, trimming whitespace
const std::vector<std::byte> input{StringToBuffer("once upon a time\n there was a dog \r\nwho liked food")};
LineReader reader(input, /*max_read=*/128);
std::optional<std::string> line1{reader.ReadLine()};
BOOST_CHECK_EQUAL(reader.Left(), 33);
std::optional<std::string> line2{reader.ReadLine()};
BOOST_CHECK_EQUAL(reader.Left(), 14);
std::optional<std::string> line3{reader.ReadLine()};
std::optional<std::string> line4{reader.ReadLine()};
BOOST_CHECK(line1);
BOOST_CHECK(line2);
BOOST_CHECK(line3);
BOOST_CHECK(!line4);
BOOST_CHECK_EQUAL(line1.value(), "once upon a time");
BOOST_CHECK_EQUAL(line2.value(), "there was a dog");
BOOST_CHECK_EQUAL(line3.value(), "who liked food");
}
{
// Do not exceed max_read while searching for EOL
const std::vector<std::byte> input1{StringToBuffer("once upon a time there was a dog\nwho liked food")};
LineReader reader1(input1, /*max_read=*/10);
BOOST_CHECK_THROW(reader1.ReadLine(), std::runtime_error);
const std::vector<std::byte> input2{StringToBuffer("once upon\n a time there was a dog who liked food")};
LineReader reader2(input2, /*max_read=*/10);
BOOST_CHECK_EQUAL(reader2.ReadLine(), "once upon");
BOOST_CHECK_THROW(reader2.ReadLine(), std::runtime_error);
}
{
// Read specific number of bytes regardless of max_read or \n unless buffer is too short
const std::vector<std::byte> input{StringToBuffer("once upon a time\n there was a dog \r\nwho liked food")};
LineReader reader(input, /*max_read=*/1);
BOOST_CHECK_EQUAL(reader.ReadLength(0), "");
BOOST_CHECK_EQUAL(reader.ReadLength(3), "onc");
BOOST_CHECK_EQUAL(reader.ReadLength(8), "e upon a");
BOOST_CHECK_EQUAL(reader.ReadLength(8), " time\n t");
BOOST_CHECK_THROW(reader.ReadLength(128), std::runtime_error);
}
}
BOOST_AUTO_TEST_SUITE_END()

View file

@ -385,6 +385,13 @@ BOOST_AUTO_TEST_CASE(util_FormatISO8601Date)
BOOST_CHECK_EQUAL(FormatISO8601Date(1317425777), "2011-09-30");
}
BOOST_AUTO_TEST_CASE(util_FormatRFC7231DateTime)
{
BOOST_CHECK_EQUAL(FormatRFC7231DateTime(253402214400), "Fri, 31 Dec 9999 00:00:00 GMT");
BOOST_CHECK_EQUAL(FormatRFC7231DateTime(1717429609), "Mon, 03 Jun 2024 15:46:49 GMT");
BOOST_CHECK_EQUAL(FormatRFC7231DateTime(0), "Thu, 01 Jan 1970 00:00:00 GMT");
}
BOOST_AUTO_TEST_CASE(util_FormatMoney)
{
BOOST_CHECK_EQUAL(FormatMoney(0), "0.00");
@ -1034,6 +1041,26 @@ BOOST_AUTO_TEST_CASE(test_ParseUInt64)
BOOST_CHECK(!ParseUInt64("-1234", &n));
}
BOOST_AUTO_TEST_CASE(test_ParseUInt64Hex)
{
uint64_t n;
// Valid values
BOOST_CHECK(ParseUInt64Hex("1234", nullptr));
BOOST_CHECK(ParseUInt64Hex("1234", &n) && n == 4660);
BOOST_CHECK(ParseUInt64Hex("a", &n) && n == 10);
BOOST_CHECK(ParseUInt64Hex("0000000a", &n) && n == 10);
BOOST_CHECK(ParseUInt64Hex("100", &n) && n == 256);
BOOST_CHECK(ParseUInt64Hex("DEADbeef", &n) && n == 3735928559);
BOOST_CHECK(ParseUInt64Hex("FfFfFfFf", &n) && n == 4294967295);
// Invalid values
BOOST_CHECK(!ParseUInt64Hex("123456789", &n));
BOOST_CHECK(!ParseUInt64Hex("", &n));
BOOST_CHECK(!ParseUInt64Hex("-1", &n));
BOOST_CHECK(!ParseUInt64Hex("10 00", &n));
BOOST_CHECK(!ParseUInt64Hex("1 ", &n));
BOOST_CHECK(!ParseUInt64Hex("0xAB", &n));
}
BOOST_AUTO_TEST_CASE(test_FormatParagraph)
{
BOOST_CHECK_EQUAL(FormatParagraph("", 79, 0), "");

View file

@ -251,6 +251,24 @@ bool ParseUInt64(std::string_view str, uint64_t* out)
return ParseIntegral<uint64_t>(str, out);
}
bool ParseUInt64Hex(std::string_view str, uint64_t* out)
{
if (str.size() > 8) return false;
if (str.size() < 1) return false;
uint64_t total{0};
auto it = str.begin();
while (it != str.end()) {
auto v = HexDigit(*(it++));
if (v < 0) return false;
total <<= 4;
total |= v;
}
if (out != nullptr) {
*out = total;
}
return true;
}
std::string FormatParagraph(std::string_view in, size_t width, size_t indent)
{
assert(width >= indent);
@ -479,3 +497,10 @@ std::optional<uint64_t> ParseByteUnits(std::string_view str, ByteUnit default_mu
}
return *parsed_num * unit_amount;
}
std::vector<std::byte> StringToBuffer(const std::string& str)
{
return std::vector<std::byte>(
reinterpret_cast<const std::byte*>(str.data()),
reinterpret_cast<const std::byte*>(str.data() + str.size()));
}

View file

@ -229,6 +229,14 @@ std::optional<T> ToIntegral(std::string_view str)
*/
[[nodiscard]] bool ParseUInt64(std::string_view str, uint64_t *out);
/**
* Convert hexadecimal string to unsigned 64-bit integer, with 4-bit
* resolution (odd length strings are acceptable without leading "0")
* @returns true if the entire string could be parsed as valid integer,
* false if not, or in case of overflow.
*/
[[nodiscard]] bool ParseUInt64Hex(std::string_view str, uint64_t *out);
/**
* Format a paragraph of text to a fixed width, adding spaces for
* indentation to any added line.
@ -367,6 +375,15 @@ std::string Capitalize(std::string str);
*/
std::optional<uint64_t> ParseByteUnits(std::string_view str, ByteUnit default_multiplier);
/**
* Returns a byte vector filled with data from a string. Used to test string-
* encoded data from a socket like HTTP headers.
*
* @param[in] str the string to convert into bytes
* @returns byte vector
*/
std::vector<std::byte> StringToBuffer(const std::string& str);
namespace util {
/** consteval version of HexDigit() without the lookup table. */
consteval uint8_t ConstevalHexDigit(const char c)
@ -395,6 +412,18 @@ struct Hex {
};
} // namespace detail
struct CaseInsensitiveComparator {
bool operator()(const std::string& s1, const std::string& s2) const
{
return std::lexicographical_compare(
s1.begin(), s1.end(),
s2.begin(), s2.end(),
[](char c1, char c2) {
return static_cast<uint8_t>(ToLower(c1)) < static_cast<uint8_t>(ToLower(c2));
});
}
};
/**
* ""_hex is a compile-time user-defined literal returning a
* `std::array<std::byte>`, equivalent to ParseHex(). Variants provided:

View file

@ -13,4 +13,42 @@ void ReplaceAll(std::string& in_out, const std::string& search, const std::strin
if (search.empty()) return;
in_out = std::regex_replace(in_out, std::regex(search), substitute);
}
LineReader::LineReader(std::span<const std::byte> buffer, size_t max_read)
: start(buffer.begin()), end(buffer.end()), max_read(max_read), it(buffer.begin()) {}
std::optional<std::string> LineReader::ReadLine()
{
if (it == end) {
return std::nullopt;
}
auto line_start = it;
std::string line{};
while (it != end) {
char c = static_cast<char>(*it);
line += c;
++it;
if (c == '\n') break;
if ((size_t)std::distance(line_start, it) >= max_read) throw std::runtime_error("max_read exceeded by LineReader");
}
line = TrimString(line); // delete trailing \r and/or \n
return line;
}
// Ignores max_read but won't overflow
std::string LineReader::ReadLength(size_t len)
{
if (len == 0) return "";
if (Left() < len) throw std::runtime_error("Not enough data in buffer");
std::string out(reinterpret_cast<const char*>(&(*it)), len);
it += len;
return out;
}
size_t LineReader::Left() const
{
return std::distance(it, end);
}
} // namespace util

View file

@ -11,6 +11,7 @@
#include <cstdint>
#include <cstring>
#include <locale>
#include <optional>
#include <sstream>
#include <string> // IWYU pragma: export
#include <string_view> // IWYU pragma: export
@ -248,6 +249,25 @@ template <typename T1, size_t PREFIX_LEN>
return obj.size() >= PREFIX_LEN &&
std::equal(std::begin(prefix), std::end(prefix), std::begin(obj));
}
struct LineReader {
const std::span<const std::byte>::iterator start;
const std::span<const std::byte>::iterator end;
const size_t max_read;
std::span<const std::byte>::iterator it;
explicit LineReader(std::span<const std::byte> buffer, size_t max_read);
// Returns a string from current iterator position up to next \n
// and advances iterator, does not return trailing \n or \r.
// Will not search for \n past max_read.
std::optional<std::string> ReadLine();
// Returns string from current iterator position of specified length
// and advances iterator. May exceed max_read but will not read past end of buffer.
std::string ReadLength(size_t len);
// Returns remaining size of bytes in buffer
size_t Left() const;
};
} // namespace util
#endif // BITCOIN_UTIL_STRING_H

View file

@ -17,6 +17,9 @@
#include <string_view>
#include <thread>
static const std::string weekdays[7] = {"Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"};
static const std::string months[12] = {"Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"};
void UninterruptibleSleep(const std::chrono::microseconds& n) { std::this_thread::sleep_for(n); }
static std::atomic<std::chrono::seconds> g_mock_time{}; //!< For testing
@ -116,6 +119,20 @@ std::optional<int64_t> ParseISO8601DateTime(std::string_view str)
return int64_t{TicksSinceEpoch<std::chrono::seconds>(tp)};
}
std::string FormatRFC7231DateTime(int64_t nTime)
{
const std::chrono::sys_seconds secs{std::chrono::seconds{nTime}};
const auto days{std::chrono::floor<std::chrono::days>(secs)};
// 1970-01-01 was a Thursday
std::string weekday{weekdays[(days.time_since_epoch().count() + 4) % 7]};
const std::chrono::year_month_day ymd{days};
std::string month{months[unsigned{ymd.month()} - 1]};
const std::chrono::hh_mm_ss hms{secs - days};
// examples: Mon, 27 Jul 2009 12:28:53 GMT
// Fri, 31 May 2024 19:18:04 GMT
return strprintf("%03s, %02u %03s %04i %02i:%02i:%02i GMT", weekday, unsigned{ymd.day()}, month, signed{ymd.year()}, hms.hours().count(), hms.minutes().count(), hms.seconds().count());
}
struct timeval MillisToTimeval(int64_t nTimeout)
{
struct timeval timeout;

View file

@ -135,6 +135,12 @@ std::string FormatISO8601DateTime(int64_t nTime);
std::string FormatISO8601Date(int64_t nTime);
std::optional<int64_t> ParseISO8601DateTime(std::string_view str);
/**
* RFC7231 formatting https://www.rfc-editor.org/rfc/rfc7231#section-7.1.1.1
* Used in HTTP/1.1 responses
*/
std::string FormatRFC7231DateTime(int64_t nTime);
/**
* Convert milliseconds to a struct timeval for e.g. select.
*/

View file

@ -8,6 +8,7 @@ from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal, str_to_b64str
import http.client
import time
import urllib.parse
class HTTPBasicsTest (BitcoinTestFramework):
@ -105,5 +106,127 @@ class HTTPBasicsTest (BitcoinTestFramework):
assert_equal(out1.status, http.client.BAD_REQUEST)
self.log.info("Check pipelining")
# Requests are responded to in order they were received
# See https://www.rfc-editor.org/rfc/rfc7230#section-6.3.2
tip_height = self.nodes[2].getblockcount()
req = "POST / HTTP/1.1\r\n"
req += f'Authorization: Basic {str_to_b64str(authpair)}\r\n'
# First request will take a long time to process
body1 = f'{{"method": "waitforblockheight", "params": [{tip_height + 1}]}}'
req1 = req
req1 += f'Content-Length: {len(body1)}\r\n\r\n'
req1 += body1
# Second request will process very fast
body2 = '{"method": "getblockcount"}'
req2 = req
req2 += f'Content-Length: {len(body2)}\r\n\r\n'
req2 += body2
# Get the underlying socket from HTTP connection so we can send something unusual
conn = http.client.HTTPConnection(urlNode2.hostname, urlNode2.port)
conn.connect()
sock = conn.sock
sock.settimeout(1)
# Send two requests in a row. The first will block the second indefinitely
sock.sendall(req1.encode("utf-8"))
sock.sendall(req2.encode("utf-8"))
try:
# The server should not respond to the fast, second request
# until the (very) slow first request has been handled:
res = sock.recv(1024)
assert not res
except TimeoutError:
pass
# Use a separate http connection to generate a block
self.generate(self.nodes[2], 1, sync_fun=self.no_op)
# Wait for two responses to be received
res = b""
while res.count(b"result") != 2:
res += sock.recv(1024)
# waitforblockheight was responded to first, and then getblockcount
# which includes the block added after the request was made
chunks = res.split(b'"result":')
assert chunks[1].startswith(b'{"hash":')
assert chunks[2].startswith(bytes(f'{tip_height + 1}', 'utf8'))
self.log.info("Check HTTP request encoded with chunked transfer")
headers_chunked = headers.copy()
headers_chunked.update({"Transfer-encoding": "chunked"})
body_chunked = [
b'{"method": "submitblock", "params": ["',
b'0A' * 1000000,
b'0B' * 1000000,
b'0C' * 1000000,
b'0D' * 1000000,
b'"]}'
]
conn = http.client.HTTPConnection(urlNode2.hostname, urlNode2.port)
conn.connect()
conn.request(
method='POST',
url='/',
body=iter(body_chunked),
headers=headers_chunked,
encode_chunked=True)
out1 = conn.getresponse().read()
assert out1 == b'{"result":"high-hash","error":null}\n'
self.log.info("Check -rpcservertimeout")
# The test framework typically reuses a single persistent HTTP connection
# for all RPCs to a TestNode. Because we are setting -rpcservertimeout
# so low on this one node, its connection will quickly timeout and get dropped by
# the server. Negating this setting will force the AuthServiceProxy
# for this node to create a fresh new HTTP connection for every command
# called for the remainder of this test.
self.nodes[2].reuse_http_connections = False
self.restart_node(2, extra_args=["-rpcservertimeout=1"])
# This is the amount of time the server will wait for a client to
# send a complete request. Test it by sending an incomplete but
# so-far otherwise well-formed HTTP request, and never finishing it.
# Copied from http_incomplete_test_() in regress_http.c in libevent.
# A complete request would have an additional "\r\n" at the end.
http_request = "GET /test1 HTTP/1.1\r\nHost: somehost\r\n"
# Get the underlying socket from HTTP connection so we can send something unusual
conn = http.client.HTTPConnection(urlNode2.hostname, urlNode2.port)
conn.connect()
sock = conn.sock
sock.sendall(http_request.encode("utf-8"))
# Wait for response, but expect a timeout disconnection after 1 second
start = time.time()
res = sock.recv(1024)
stop = time.time()
assert res == b""
assert stop - start >= 1
# definitely closed
try:
conn.request('GET', '/')
conn.getresponse()
# macos/linux windows
except (ConnectionResetError, ConnectionAbortedError):
pass
# Sanity check
http_request = "GET /test2 HTTP/1.1\r\nHost: somehost\r\n\r\n"
conn = http.client.HTTPConnection(urlNode2.hostname, urlNode2.port)
conn.connect()
sock = conn.sock
sock.sendall(http_request.encode("utf-8"))
res = sock.recv(1024)
assert res.startswith(b"HTTP/1.1 404 Not Found")
# still open
conn.request('GET', '/')
conn.getresponse()
if __name__ == '__main__':
HTTPBasicsTest(__file__).main()

View file

@ -282,10 +282,12 @@ class RESTTest (BitcoinTestFramework):
assert_equal(len(json_obj), 1) # ensure that there is one header in the json response
assert_equal(json_obj[0]['hash'], bb_hash) # request/response hash should be the same
# Check invalid uri (% symbol at the end of the request)
for invalid_uri in [f"/headers/{bb_hash}%", f"/blockfilterheaders/basic/{bb_hash}%", "/mempool/contents.json?%"]:
# Check tolerance for invalid URI (% symbol at the end of the request)
for invalid_uri in [f"/headers/{bb_hash}%", f"/blockfilterheaders/basic/{bb_hash}%"]:
resp = self.test_rest_request(invalid_uri, ret_type=RetType.OBJ, status=400)
assert_equal(resp.read().decode('utf-8').rstrip(), "URI parsing failed, it likely contained RFC 3986 invalid characters")
assert_equal(resp.read().decode('utf-8').rstrip(), f"Invalid hash: {bb_hash}%")
resp = self.test_rest_request("/mempool/contents.json?%", ret_type=RetType.OBJ, status=200)
assert_equal(resp.read().decode('utf-8').rstrip(), "{}")
# Compare with normal RPC block response
rpc_block_json = self.nodes[0].getblock(bb_hash)

View file

@ -75,6 +75,7 @@ class AuthServiceProxy():
self.__service_url = service_url
self._service_name = service_name
self.ensure_ascii = ensure_ascii # can be toggled on the fly by tests
self.reuse_http_connections = True
self.__url = urllib.parse.urlparse(service_url)
user = None if self.__url.username is None else self.__url.username.encode('utf8')
passwd = None if self.__url.password is None else self.__url.password.encode('utf8')
@ -92,6 +93,8 @@ class AuthServiceProxy():
raise AttributeError
if self._service_name is not None:
name = "%s.%s" % (self._service_name, name)
if not self.reuse_http_connections:
self._set_conn()
return AuthServiceProxy(self.__service_url, name, connection=self.__conn)
def _request(self, method, path, postdata):
@ -102,6 +105,8 @@ class AuthServiceProxy():
'User-Agent': USER_AGENT,
'Authorization': self.__auth_header,
'Content-type': 'application/json'}
if not self.reuse_http_connections:
self._set_conn()
self.__conn.request(method, path, postdata, headers)
return self._get_response()

View file

@ -156,6 +156,7 @@ class TestNode():
self.process = None
self.rpc_connected = False
self.rpc = None
self.reuse_http_connections = True # Must be set before calling get_rpc_proxy() i.e. before restarting node
self.url = None
self.log = logging.getLogger('TestFramework.node%d' % i)
# Cache perf subprocesses here by their data output filename.
@ -280,6 +281,7 @@ class TestNode():
timeout=self.rpc_timeout // 2, # Shorter timeout to allow for one retry in case of ETIMEDOUT
coveragedir=self.coverage_dir,
)
rpc.auth_service_proxy_instance.reuse_http_connections = self.reuse_http_connections
rpc.getblockcount()
# If the call to getblockcount() succeeds then the RPC connection is up
if self.version_is_at_least(190000) and wait_for_import:

View file

@ -87,7 +87,7 @@ class BackwardsCompatibilityTest(BitcoinTestFramework):
# 0.21.x and 22.x would both produce bad derivation paths when topping up an inactive hd chain
# Make sure that this is being automatically cleaned up by migration
node_master = self.nodes[1]
node_v22 = self.nodes[self.num_nodes - 5]
node_v22 = self.nodes[self.num_nodes - 3]
wallet_name = "bad_deriv_path"
node_v22.createwallet(wallet_name=wallet_name, descriptors=False)
bad_deriv_wallet = node_v22.get_wallet_rpc(wallet_name)