http: use a queue to pipeline requests from each connected client

See https://www.rfc-editor.org/rfc/rfc7230#section-6.3.2

> A server MAY process a sequence of pipelined requests in
> parallel if they all have safe methods (Section 4.2.1 of [RFC7231]),
> but it MUST send the corresponding responses in the same order that
> the requests were received.

We choose NOT to process requests in parallel. Requests are executed
in the order received and responded to in the order received. This
prevents a race in which a later-queued but quick-to-process request
could be answered with stale state before an earlier, slower request
has finished.
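The ordering discipline described above can be sketched as a per-client FIFO queue plus a busy flag. This is a minimal illustrative model, not the actual patch: names like `Client`, `OnRequestReceived`, and `MaybeDispatch` are hypothetical stand-ins for the real identifiers.

```cpp
#include <deque>
#include <memory>
#include <string>
#include <vector>

// Hypothetical simplified request type.
struct Request {
    std::string path;
};

// Hypothetical simplified client: requests are queued in arrival order
// and at most one request is in flight at a time.
struct Client {
    std::deque<std::unique_ptr<Request>> req_queue; // FIFO, arrival order
    bool req_busy{false};                           // a worker holds a request
};

// Called when a complete request has been parsed off the wire:
// never dispatch directly, just enqueue.
void OnRequestReceived(Client& client, std::unique_ptr<Request> req)
{
    client.req_queue.push_back(std::move(req));
}

// Called once per I/O loop iteration: dispatch at most one request,
// and only if the previous one has been fully responded to.
// Returns true if a request was handed off for processing.
bool MaybeDispatch(Client& client, std::vector<std::string>& handled)
{
    if (client.req_busy || client.req_queue.empty()) return false;
    client.req_busy = true;
    handled.push_back(client.req_queue.front()->path); // stand-in for real work
    client.req_queue.pop_front();
    return true;
}

// Called by the worker after the response has been written in full.
void OnReplyWritten(Client& client)
{
    client.req_busy = false;
}
```

In the actual patch the busy flag is a `std::atomic_bool`, since it is set by the I/O thread and cleared by a worker thread.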
Author: Matthew Zipkin, 2025-04-03 12:28:42 -04:00
Parent: d0224eecde
Commit: cd059b6e14
GPG key ID: E7E2984B6289C93A
2 changed files with 43 additions and 2 deletions


@@ -1037,6 +1037,9 @@ void HTTPRequest::WriteReply(HTTPStatusCode status, std::span<const std::byte> r
         // in the next loop iteration.
         m_client->m_send_ready = true;
     }
+
+    // Signal to the Sockman I/O loop that we are ready to handle the next request.
+    m_client->m_req_busy = false;
 }
 
 bool HTTPClient::ReadRequest(std::unique_ptr<HTTPRequest>& req)
@@ -1211,8 +1214,8 @@ void HTTPServer::EventGotData(NodeId node_id, std::span<const uint8_t> data)
                  req->m_client->m_origin,
                  req->m_client->m_node_id);
-        // handle request
-        m_request_dispatcher(std::move(req));
+        // add request to client queue
+        client->m_req_queue.push_back(std::move(req));
     }
 }
@@ -1238,6 +1241,26 @@ void HTTPServer::EventGotPermanentReadError(NodeId node_id, const std::string& e
     client->m_disconnect = true;
 }
+
+void HTTPServer::EventIOLoopCompletedForOne(NodeId node_id)
+{
+    // Get the HTTPClient
+    auto client{GetClientById(node_id)};
+    if (client == nullptr) {
+        return;
+    }
+
+    // If we are already handling a request from
+    // this client, do nothing.
+    if (client->m_req_busy) return;
+
+    // Otherwise, if there is a new pending request, handle it.
+    if (!client->m_req_queue.empty()) {
+        client->m_req_busy = true;
+        m_request_dispatcher(std::move(client->m_req_queue.front()));
+        client->m_req_queue.pop_front();
+    }
+}
 
 void HTTPServer::EventIOLoopCompletedForAll()
 {
     DisconnectClients();
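Because the queue and busy flag live on each client rather than on the server, serialization is per-client: one client's slow request never stalls another client's requests. A sketch of that independence, using hypothetical simplified types rather than the real `HTTPClient`/`HTTPServer`:

```cpp
#include <deque>
#include <map>
#include <string>
#include <vector>

// Hypothetical simplified client: owns its own queue and busy flag.
struct Client {
    std::deque<std::string> queue; // pending request paths, arrival order
    bool busy{false};              // true while a worker handles a request
};

// One pass of the I/O loop: visit every client and dispatch at most one
// request per client, and only if that client is currently idle.
std::vector<std::string> IOLoopPass(std::map<int, Client>& clients)
{
    std::vector<std::string> dispatched;
    for (auto& [id, client] : clients) {
        if (client.busy || client.queue.empty()) continue;
        client.busy = true; // cleared later, when the reply is written
        dispatched.push_back(client.queue.front());
        client.queue.pop_front();
    }
    return dispatched;
}
```

A client stuck on a slow request is simply skipped on later passes, while other clients continue to make progress through their own queues.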


@@ -297,6 +297,15 @@ public:
     // and attempt to read HTTP requests from here.
     std::vector<std::byte> m_recv_buffer{};
+
+    // Requests from a client must be processed in the order in which
+    // they were received, blocking on a per-client basis. We won't
+    // process the next request in the queue if we are currently busy
+    // handling a previous request.
+    std::deque<std::unique_ptr<HTTPRequest>> m_req_queue;
+
+    // Set to true by the main thread when a request is popped off
+    // and passed to a worker, reset to false by the worker thread.
+    std::atomic_bool m_req_busy{false};
     // Response data destined for this client.
     // Written to directly by http worker threads, read and erased by Sockman I/O
     Mutex m_send_mutex;
@@ -406,6 +415,15 @@ public:
      */
     virtual void EventGotPermanentReadError(NodeId node_id, const std::string& errmsg) override;
 
+    /**
+     * SockMan has completed the current send+recv iteration for a given connection.
+     * It will do another send+recv for this connection after processing all other connections.
+     * Can be used to execute periodic tasks for a given connection.
+     * The implementation in SockMan does nothing.
+     * @param[in] node_id Connection for which send+recv has been done.
+     */
+    virtual void EventIOLoopCompletedForOne(NodeId node_id) override;
+
     /**
      * SockMan has completed send+recv for all nodes.
      * Can be used to execute periodic tasks for all nodes, like disconnecting