refactor: Make httpserver work queue a unique_ptr

This commit is contained in:
MarcoFalke 2021-06-22 08:39:42 +02:00
parent 8cdf91735f
commit fa92e60f38
No known key found for this signature in database
GPG key ID: CE2B75697E69A548

View file

@ -136,7 +136,7 @@ static struct evhttp* eventHTTP = nullptr;
//! List of subnets to allow RPC connections from //! List of subnets to allow RPC connections from
static std::vector<CSubNet> rpc_allow_subnets; static std::vector<CSubNet> rpc_allow_subnets;
//! Work queue for handling longer requests off the event loop thread //! Work queue for handling longer requests off the event loop thread
static WorkQueue<HTTPClosure>* workQueue = nullptr; static std::unique_ptr<WorkQueue<HTTPClosure>> g_work_queue{nullptr};
//! Handlers for (sub)paths //! Handlers for (sub)paths
static std::vector<HTTPPathHandler> pathHandlers; static std::vector<HTTPPathHandler> pathHandlers;
//! Bound listening sockets //! Bound listening sockets
@ -256,10 +256,10 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
// Dispatch to worker thread // Dispatch to worker thread
if (i != iend) { if (i != iend) {
std::unique_ptr<HTTPWorkItem> item(new HTTPWorkItem(std::move(hreq), path, i->handler)); std::unique_ptr<HTTPWorkItem> item(new HTTPWorkItem(std::move(hreq), path, i->handler));
assert(workQueue); assert(g_work_queue);
if (workQueue->Enqueue(item.get())) if (g_work_queue->Enqueue(item.get())) {
item.release(); /* if true, queue took ownership */ item.release(); /* if true, queue took ownership */
else { } else {
LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n"); LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n");
item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded"); item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded");
} }
@ -392,7 +392,7 @@ bool InitHTTPServer()
int workQueueDepth = std::max((long)gArgs.GetArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L); int workQueueDepth = std::max((long)gArgs.GetArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
LogPrintf("HTTP: creating work queue of depth %d\n", workQueueDepth); LogPrintf("HTTP: creating work queue of depth %d\n", workQueueDepth);
workQueue = new WorkQueue<HTTPClosure>(workQueueDepth); g_work_queue = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepth);
// transfer ownership to eventBase/HTTP via .release() // transfer ownership to eventBase/HTTP via .release()
eventBase = base_ctr.release(); eventBase = base_ctr.release();
eventHTTP = http_ctr.release(); eventHTTP = http_ctr.release();
@ -424,7 +424,7 @@ void StartHTTPServer()
g_thread_http = std::thread(ThreadHTTP, eventBase); g_thread_http = std::thread(ThreadHTTP, eventBase);
for (int i = 0; i < rpcThreads; i++) { for (int i = 0; i < rpcThreads; i++) {
g_thread_http_workers.emplace_back(HTTPWorkQueueRun, workQueue, i); g_thread_http_workers.emplace_back(HTTPWorkQueueRun, g_work_queue.get(), i);
} }
} }
@ -435,16 +435,17 @@ void InterruptHTTPServer()
// Reject requests on current connections // Reject requests on current connections
evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr); evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr);
} }
if (workQueue) if (g_work_queue) {
workQueue->Interrupt(); g_work_queue->Interrupt();
}
} }
void StopHTTPServer() void StopHTTPServer()
{ {
LogPrint(BCLog::HTTP, "Stopping HTTP server\n"); LogPrint(BCLog::HTTP, "Stopping HTTP server\n");
if (workQueue) { if (g_work_queue) {
LogPrint(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n"); LogPrint(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
for (auto& thread: g_thread_http_workers) { for (auto& thread : g_thread_http_workers) {
thread.join(); thread.join();
} }
g_thread_http_workers.clear(); g_thread_http_workers.clear();
@ -467,10 +468,7 @@ void StopHTTPServer()
event_base_free(eventBase); event_base_free(eventBase);
eventBase = nullptr; eventBase = nullptr;
} }
if (workQueue) { g_work_queue.reset();
delete workQueue;
workQueue = nullptr;
}
LogPrint(BCLog::HTTP, "Stopped HTTP server\n"); LogPrint(BCLog::HTTP, "Stopped HTTP server\n");
} }