http: compose and send replies to connected clients

This commit is contained in:
Matthew Zipkin 2024-12-10 20:02:55 -05:00 committed by Matthew Zipkin
parent ace3e198d7
commit 73c3c2e3d3
No known key found for this signature in database
GPG key ID: E7E2984B6289C93A
3 changed files with 236 additions and 1 deletions

View file

@ -12,6 +12,7 @@
#include <netbase.h>
#include <node/interface_ui.h>
#include <rpc/protocol.h> // For HTTP status codes
#include <span.h>
#include <sync.h>
#include <util/check.h>
#include <util/signalinterrupt.h>
@ -940,6 +941,92 @@ bool HTTPRequest::LoadBody(LineReader& reader)
}
}
void HTTPRequest::WriteReply(HTTPStatusCode status, std::span<const std::byte> reply_body)
{
HTTPResponse res;
// Some response headers are determined in advance and stored in the request
res.m_headers = std::move(m_response_headers);
// Response version matches request version
res.m_version_major = m_version_major;
res.m_version_minor = m_version_minor;
// Add response code and look up reason string
res.m_status = status;
res.m_reason = HTTPReason.find(status)->second;
// See libevent evhttp_response_needs_body()
// Response headers are different if no body is needed
bool needs_body{status != HTTP_NO_CONTENT && (status < 100 || status >= 200)};
// See libevent evhttp_make_header_response()
// Expected response headers depend on protocol version
if (m_version_major == 1) {
// HTTP/1.0
if (m_version_minor == 0) {
auto connection_header{m_headers.Find("Connection")};
if (connection_header && ToLower(connection_header.value()) == "keep-alive") {
res.m_headers.Write("Connection", "keep-alive");
res.m_keep_alive = true;
}
}
// HTTP/1.1
if (m_version_minor >= 1) {
const int64_t now_seconds{TicksSinceEpoch<std::chrono::seconds>(NodeClock::now())};
res.m_headers.Write("Date", FormatRFC7231DateTime(now_seconds));
if (needs_body) {
res.m_headers.Write("Content-Length", strprintf("%d", reply_body.size()));
}
// Default for HTTP/1.1
res.m_keep_alive = true;
}
}
if (needs_body && !res.m_headers.Find("Content-Type")) {
// Default type from libevent evhttp_new_object()
res.m_headers.Write("Content-Type", "text/html; charset=ISO-8859-1");
}
auto connection_header{m_headers.Find("Connection")};
if (connection_header && ToLower(connection_header.value()) == "close") {
// Might not exist already but we need to replace it, not append to it
res.m_headers.Remove("Connection");
res.m_headers.Write("Connection", "close");
res.m_keep_alive = false;
}
// Serialize the response headers
const std::string headers{res.StringifyHeaders()};
const auto headers_bytes{std::as_bytes(std::span(headers.begin(), headers.end()))};
// Fill the send buffer with the complete serialized response headers + body
{
LOCK(m_client->m_send_mutex);
m_client->m_send_buffer.insert(m_client->m_send_buffer.end(), headers_bytes.begin(), headers_bytes.end());
// We've been using std::span up until now but it is finally time to copy
// data. The original data will go out of scope when WriteReply() returns.
// This is analogous to the memcpy() in libevent's evbuffer_add()
m_client->m_send_buffer.insert(m_client->m_send_buffer.end(), reply_body.begin(), reply_body.end());
}
// Inform Sockman I/O there is data that is ready to be sent to this client
// in the next loop iteration.
m_client->m_send_ready = true;
LogDebug(
BCLog::HTTP,
"HTTPResponse (status code: %d size: %lld) added to send buffer for client %s (id=%lld)\n",
status,
headers_bytes.size() + reply_body.size(),
m_client->m_origin,
m_client->m_node_id);
}
bool HTTPClient::ReadRequest(std::unique_ptr<HTTPRequest>& req)
{
LineReader reader(m_recv_buffer, MAX_HEADERS_SIZE);
@ -958,6 +1045,41 @@ bool HTTPClient::ReadRequest(std::unique_ptr<HTTPRequest>& req)
return true;
}
bool HTTPClient::SendBytesFromBuffer()
{
Assume(m_server);
// Send as much data from this client's buffer as we can
LOCK(m_send_mutex);
if (!m_send_buffer.empty()) {
std::string err;
// We don't intend to "send more" because http responses are usually small and we want the kernel to send them right away.
ssize_t bytes_sent = m_server->SendBytes(m_node_id, MakeUCharSpan(m_send_buffer), /*will_send_more=*/false, err);
if (bytes_sent < 0) {
LogDebug(
BCLog::HTTP,
"Error sending HTTP response data to client %s (id=%lld): %s\n",
m_origin,
m_node_id,
err);
// TODO: disconnect
return false;
}
Assume(static_cast<size_t>(bytes_sent) <= m_send_buffer.size());
m_send_buffer.erase(m_send_buffer.begin(), m_send_buffer.begin() + bytes_sent);
LogDebug(
BCLog::HTTP,
"Sent %d bytes to client %s (id=%lld)\n",
bytes_sent,
m_origin,
m_node_id);
}
return true;
}
bool HTTPServer::EventNewConnectionAccepted(NodeId node_id,
const CService& me,
const CService& them)
@ -971,6 +1093,23 @@ bool HTTPServer::EventNewConnectionAccepted(NodeId node_id,
return true;
}
void HTTPServer::EventReadyToSend(NodeId node_id, bool& cancel_recv)
{
// Next attempt to receive data from this node is permitted
cancel_recv = false;
// Get the HTTPClient
auto client{GetClientById(node_id)};
if (client == nullptr) {
return;
}
// SendBytesFromBuffer() returns true if we should keep the client around,
// false if we are done with it. Invert that boolean to inform Sockman
// whether it should cancel the next receive attempt from this client.
cancel_recv = !client->SendBytesFromBuffer();
}
void HTTPServer::EventGotData(NodeId node_id, std::span<const uint8_t> data)
{
// Get the HTTPClient
@ -1021,6 +1160,29 @@ void HTTPServer::EventGotData(NodeId node_id, std::span<const uint8_t> data)
}
}
bool HTTPServer::ShouldTryToSend(NodeId node_id) const
{
// Get the HTTPClient
auto client{GetClientById(node_id)};
if (client == nullptr) {
return false;
}
return client->m_send_ready;
}
bool HTTPServer::ShouldTryToRecv(NodeId node_id) const
{
// Get the HTTPClient
auto client{GetClientById(node_id)};
if (client == nullptr) {
return false;
}
// Don't try to receive again until we've cleared the send buffer to this client
return !client->m_send_ready;
}
std::shared_ptr<HTTPClient> HTTPServer::GetClientById(NodeId node_id) const
{
auto it{m_connected_clients.find(node_id)};

View file

@ -266,6 +266,15 @@ public:
bool LoadControlData(LineReader& reader);
bool LoadHeaders(LineReader& reader);
bool LoadBody(LineReader& reader);
// Response headers may be set in advance before response body is known
HTTPHeaders m_response_headers;
void WriteReply(HTTPStatusCode status, std::span<const std::byte> reply_body = {});
void WriteReply(HTTPStatusCode status, const char* reply_body) {
auto reply_body_view = std::string_view(reply_body);
std::span<const std::byte> byte_span(reinterpret_cast<const std::byte*>(reply_body_view.data()), reply_body_view.size());
WriteReply(status, byte_span);
}
};
class HTTPServer;
@ -288,6 +297,15 @@ public:
// and attempt to read HTTP requests from here.
std::vector<std::byte> m_recv_buffer{};
// Response data destined for this client.
// Written to directly by http worker threads, read and erased by Sockman I/O
Mutex m_send_mutex;
std::vector<std::byte> m_send_buffer GUARDED_BY(m_send_mutex);
// Set true by worker threads after writing a response to m_send_buffer.
// Set false by the Sockman I/O thread after flushing m_send_buffer.
// Checked in the Sockman I/O loop to avoid locking m_send_mutex if there's nothing to send.
std::atomic_bool m_send_ready{false};
explicit HTTPClient(NodeId node_id, CService addr) : m_node_id(node_id), m_addr(addr)
{
m_origin = addr.ToStringAddrPort();
@ -296,6 +314,11 @@ public:
// Try to read an HTTP request from the receive buffer
bool ReadRequest(std::unique_ptr<HTTPRequest>& req);
// Push data from m_send_buffer to the connected socket via m_server
// Returns false if we are done with this client and Sockman can
// therefore skip the next read operation from it.
bool SendBytesFromBuffer() EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
// Disable copies (should only be used as shared pointers)
HTTPClient(const HTTPClient&) = delete;
HTTPClient& operator=(const HTTPClient&) = delete;
@ -336,7 +359,7 @@ public:
* @param[out] cancel_recv Should always be set upon return and if it is true,
* then the next attempt to receive data from that node will be omitted.
*/
virtual void EventReadyToSend(NodeId node_id, bool& cancel_recv) override {};
virtual void EventReadyToSend(NodeId node_id, bool& cancel_recv) override;
/**
* Called when new data has been received.
@ -359,6 +382,22 @@ public:
* @param[in] errmsg Message describing the error.
*/
virtual void EventGotPermanentReadError(NodeId node_id, const std::string& errmsg) override {};
/**
* Can be used to temporarily pause sends on a connection.
* SockMan would only call EventReadyToSend() if this returns true.
* The implementation in SockMan always returns true.
* @param[in] node_id Connection for which to confirm or omit the next call to EventReadyToSend().
*/
virtual bool ShouldTryToSend(NodeId node_id) const override;
/**
* SockMan would only call Recv() on a connection's socket if this returns true.
* Can be used to temporarily pause receives on a connection.
* The implementation in SockMan always returns true.
* @param[in] node_id Connection for which to confirm or omit the next receive.
*/
virtual bool ShouldTryToRecv(NodeId node_id) const override;
};
} // namespace http_bitcoin

View file

@ -3,6 +3,7 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <httpserver.h>
#include <time.h>
#include <rpc/protocol.h>
#include <test/util/net.h>
#include <test/util/setup_common.h>
@ -281,6 +282,10 @@ BOOST_AUTO_TEST_CASE(http_request_tests)
BOOST_AUTO_TEST_CASE(http_client_server_tests)
{
// Hard code the timestamp for the Date header in the HTTP response
// Wed Dec 11 00:47:09 2024 UTC
SetMockTime(1733878029);
// Queue of connected sockets returned by listening socket (represents network interface)
std::shared_ptr<DynSock::Queue> accepted_sockets{std::make_shared<DynSock::Queue>()};
@ -349,8 +354,37 @@ BOOST_AUTO_TEST_CASE(http_client_server_tests)
// Check the received request
BOOST_CHECK_EQUAL(requests.front()->m_body, "{\"method\":\"getblockcount\",\"params\":[],\"id\":1}\n");
// Respond to request
requests.front()->WriteReply(HTTP_OK, "874140\n");
}
// Check the sent response from the mock client at the other end of the mock socket
std::string expected = "HTTP/1.1 200 OK\r\n"
"Connection: close\r\n"
"Content-Length: 7\r\n"
"Content-Type: text/html; charset=ISO-8859-1\r\n"
"Date: Wed, 11 Dec 2024 00:47:09 GMT\r\n"
"\r\n"
"874140\n";
std::string actual;
// Wait up to one minute for all the bytes to appear in the "send" pipe.
char buf[0x10000] = {};
attempts = 6000;
while (attempts > 0)
{
ssize_t bytes_read = connected_socket_pipes->send.GetBytes(buf, sizeof(buf), 0);
if (bytes_read > 0) {
actual.append(buf, bytes_read);
if (actual == expected) break;
}
std::this_thread::sleep_for(10ms);
--attempts;
}
BOOST_CHECK_EQUAL(actual, expected);
// Close server
server.interruptNet();
// Wait for I/O loop to finish, after all sockets are closed