commit 37772ab12e
Ryan Ofsky 2025-04-28 18:06:59 -04:00 (committed via GitHub)
22 changed files with 484 additions and 233 deletions


@@ -589,6 +589,14 @@ void ArgsManager::AddHiddenArgs(const std::vector<std::string>& names)
     }
 }
 
+void ArgsManager::ClearArgs()
+{
+    LOCK(cs_args);
+    m_settings = {};
+    m_available_args.clear();
+    m_network_only_args.clear();
+}
+
 void ArgsManager::CheckMultipleCLIArgs() const
 {
     LOCK(cs_args);


@@ -359,11 +359,7 @@ protected:
     /**
      * Clear available arguments
      */
-    void ClearArgs() {
-        LOCK(cs_args);
-        m_available_args.clear();
-        m_network_only_args.clear();
-    }
+    void ClearArgs();
 
     /**
      * Check CLI command args


@@ -33,6 +33,7 @@
 #include <interfaces/ipc.h>
 #include <interfaces/mining.h>
 #include <interfaces/node.h>
+#include <ipc/exception.h>
 #include <kernel/caches.h>
 #include <kernel/context.h>
 #include <key.h>
@@ -298,8 +299,13 @@ void Shutdown(NodeContext& node)
     StopREST();
     StopRPC();
     StopHTTPServer();
-    for (const auto& client : node.chain_clients) {
+    for (auto& client : node.chain_clients) {
+        try {
             client->flush();
+        } catch (const ipc::Exception& e) {
+            LogDebug(BCLog::IPC, "Chain client did not disconnect cleanly: %s", e.what());
+            client.reset();
+        }
     }
     StopMapPort();
@@ -374,7 +380,7 @@ void Shutdown(NodeContext& node)
         }
     }
     for (const auto& client : node.chain_clients) {
-        client->stop();
+        if (client) client->stop();
     }
 
 #ifdef ENABLE_ZMQ
@@ -398,6 +404,12 @@ void Shutdown(NodeContext& node)
     RemovePidFile(*node.args);
 
+    // If any -ipcbind clients are still connected, disconnect them now so they
+    // do not block shutdown.
+    if (interfaces::Ipc* ipc = node.init->ipc()) {
+        ipc->disconnectIncoming();
+    }
+
     LogPrintf("%s: done\n", __func__);
 }


@@ -59,17 +59,20 @@ public:
     //! true. If this is not a spawned child process, return false.
     virtual bool startSpawnedProcess(int argc, char* argv[], int& exit_status) = 0;
 
-    //! Connect to a socket address and make a client interface proxy object
-    //! using provided callback. connectAddress returns an interface pointer if
-    //! the connection was established, returns null if address is empty ("") or
-    //! disabled ("0") or if a connection was refused but not required ("auto"),
-    //! and throws an exception if there was an unexpected error.
+    //! Connect to a socket address and return a pointer to its Init interface.
+    //! Returns a non-null pointer if the connection was established, returns
+    //! null if address is empty ("") or disabled ("0") or if a connection was
+    //! refused but not required ("auto"), and throws an exception if there was
+    //! an unexpected error.
     virtual std::unique_ptr<Init> connectAddress(std::string& address) = 0;
 
-    //! Connect to a socket address and make a client interface proxy object
-    //! using provided callback. Throws an exception if there was an error.
+    //! Listen on a socket address exposing this process's init interface to
+    //! clients. Throws an exception if there was an error.
     virtual void listenAddress(std::string& address) = 0;
 
+    //! Disconnect any incoming connections that are still connected.
+    virtual void disconnectIncoming() = 0;
+
     //! Add cleanup callback to remote interface that will run when the
     //! interface is deleted.
     template<typename Interface>


@@ -41,10 +41,7 @@ class CapnpProtocol : public Protocol
 public:
     ~CapnpProtocol() noexcept(true)
     {
-        if (m_loop) {
-            std::unique_lock<std::mutex> lock(m_loop->m_mutex);
-            m_loop->removeClient(lock);
-        }
+        m_loop_ref.reset();
         if (m_loop_thread.joinable()) m_loop_thread.join();
         assert(!m_loop);
     };
@@ -71,6 +68,13 @@ public:
         m_loop->loop();
         m_loop.reset();
     }
+    void disconnectIncoming() override
+    {
+        if (!m_loop) return;
+        m_loop->sync([&] {
+            m_loop->m_incoming_connections.clear();
+        });
+    }
     void addCleanup(std::type_index type, void* iface, std::function<void()> cleanup) override
     {
         mp::ProxyTypeRegister::types().at(type)(iface).cleanup_fns.emplace_back(std::move(cleanup));
@@ -83,10 +87,7 @@ public:
         m_loop_thread = std::thread([&] {
             util::ThreadRename("capnp-loop");
             m_loop.emplace(exe_name, &IpcLogFn, &m_context);
-            {
-                std::unique_lock<std::mutex> lock(m_loop->m_mutex);
-                m_loop->addClient(lock);
-            }
+            m_loop_ref.emplace(*m_loop);
             promise.set_value();
             m_loop->loop();
             m_loop.reset();
@@ -96,6 +97,7 @@ public:
     Context m_context;
     std::thread m_loop_thread;
     std::optional<mp::EventLoop> m_loop;
+    std::optional<mp::EventLoopRef> m_loop_ref;
 };
 } // namespace
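The destructor ordering above is load-bearing: releasing m_loop_ref first lets the loop's client count reach zero so EventLoop::loop() can return, join() then waits for the loop thread, and that thread clears m_loop before exiting, which is what the final assert checks. An annotated restatement (the comments are editorial, not from the commit):

    ~CapnpProtocol() noexcept(true)
    {
        m_loop_ref.reset();                                 // drop the reference; loop may now finish
        if (m_loop_thread.joinable()) m_loop_thread.join(); // wait for loop() to return
        assert(!m_loop);                                    // loop thread ran m_loop.reset() before exiting
    };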


@@ -17,6 +17,7 @@
 #include <cstdlib>
 #include <functional>
 #include <memory>
+#include <signal.h>
 #include <stdexcept>
 #include <string.h>
 #include <string>
@@ -26,6 +27,27 @@
 namespace ipc {
 namespace {
+#ifndef WIN32
+std::string g_ignore_ctrl_c;
+
+void HandleCtrlC(int)
+{
+    (void)write(STDOUT_FILENO, g_ignore_ctrl_c.data(), g_ignore_ctrl_c.size());
+}
+#endif
+
+void IgnoreCtrlC(std::string message)
+{
+#ifndef WIN32
+    g_ignore_ctrl_c = std::move(message);
+    struct sigaction sa{};
+    sa.sa_handler = HandleCtrlC;
+    sigemptyset(&sa.sa_mask);
+    sa.sa_flags = SA_RESTART;
+    sigaction(SIGINT, &sa, nullptr);
+#endif
+}
+
 class IpcImpl : public interfaces::Ipc
 {
 public:
@@ -53,6 +75,7 @@ public:
         if (!m_process->checkSpawned(argc, argv, fd)) {
             return false;
         }
+        IgnoreCtrlC(strprintf("[%s] SIGINT received — waiting for parent to shut down.\n", m_exe_name));
         m_protocol->serve(fd, m_exe_name, m_init);
         exit_status = EXIT_SUCCESS;
         return true;
@@ -86,6 +109,10 @@ public:
         int fd = m_process->bind(gArgs.GetDataDirNet(), m_exe_name, address);
         m_protocol->listen(fd, m_exe_name, m_init);
     }
+    void disconnectIncoming() override
+    {
+        m_protocol->disconnectIncoming();
+    }
     void addCleanup(std::type_index type, void* iface, std::function<void()> cleanup) override
     {
         m_protocol->addCleanup(type, iface, std::move(cleanup));
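A note on the SIGINT handler added above: POSIX only permits async-signal-safe functions inside a signal handler, which is why the message is formatted ahead of time and the handler calls bare write() rather than std::cout or a formatting routine. A standalone sketch of the same pattern, with hypothetical names not taken from this commit:

    #include <csignal>
    #include <string>
    #include <unistd.h>

    static std::string g_msg; // formatted before the handler is installed

    static void OnSigint(int) { (void)write(STDOUT_FILENO, g_msg.data(), g_msg.size()); }

    int main()
    {
        g_msg = "SIGINT received - waiting for parent to shut down.\n";
        struct sigaction sa{};
        sa.sa_handler = OnSigint;
        sigemptyset(&sa.sa_mask);
        sa.sa_flags = SA_RESTART; // restart interrupted syscalls instead of failing with EINTR
        sigaction(SIGINT, &sa, nullptr);
        for (;;) pause(); // each Ctrl-C prints the message; the process keeps waiting
    }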


@@ -14,9 +14,10 @@
 #include <assert.h>
 #include <functional>
-#include <optional>
+#include <kj/function.h>
 #include <map>
 #include <memory>
+#include <optional>
 #include <sstream>
 #include <string>
@@ -129,6 +130,25 @@ std::string LongThreadName(const char* exe_name);
 
 //! Event loop implementation.
 //!
+//! Cap'n Proto threading model is very simple: all I/O operations are
+//! asynchronous and must be performed on a single thread. This includes:
+//!
+//! - Code starting an asynchronous operation (calling a function that returns a
+//!   promise object)
+//! - Code notifying that an asynchronous operation is complete (code using a
+//!   fulfiller object)
+//! - Code handling a completed operation (code chaining or waiting for a promise)
+//!
+//! All this code needs to run on one thread, and the EventLoop::loop() method
+//! is the entry point for this thread. ProxyClient and ProxyServer objects that
+//! use other threads and need to perform I/O operations post to this thread
+//! using EventLoop::post() and EventLoop::sync() methods.
+//!
+//! Specifically, because ProxyClient methods can be called from arbitrary
+//! threads, and ProxyServer methods can run on arbitrary threads, ProxyClient
+//! methods use the EventLoop thread to send requests, and ProxyServer methods
+//! use the thread to return results.
+//!
 //! Based on https://groups.google.com/d/msg/capnproto/TuQFF1eH2-M/g81sHaTAAQAJ
 class EventLoop
 {
@@ -144,7 +164,7 @@ public:
     //! Run function on event loop thread. Does not return until function completes.
     //! Must be called while the loop() function is active.
-    void post(const std::function<void()>& fn);
+    void post(kj::Function<void()> fn);
 
     //! Wrapper around EventLoop::post that takes advantage of the
     //! fact that callable will not go out of scope to avoid requirement that it
@@ -152,7 +172,7 @@ public:
     template <typename Callable>
     void sync(Callable&& callable)
     {
-        post(std::ref(callable));
+        post(std::forward<Callable>(callable));
     }
 
     //! Start asynchronous worker thread if necessary. This is only done if
@@ -166,13 +186,10 @@ public:
     //! is important that ProxyServer::m_impl destructors do not run on the
     //! eventloop thread because they may need it to do I/O if they perform
     //! other IPC calls.
-    void startAsyncThread(std::unique_lock<std::mutex>& lock);
-
-    //! Add/remove remote client reference counts.
-    void addClient(std::unique_lock<std::mutex>& lock);
-    bool removeClient(std::unique_lock<std::mutex>& lock);
+    void startAsyncThread() MP_REQUIRES(m_mutex);
 
     //! Check if loop should exit.
-    bool done(std::unique_lock<std::mutex>& lock);
+    bool done() MP_REQUIRES(m_mutex);
 
     Logger log()
     {
@@ -195,10 +212,10 @@ public:
     std::thread m_async_thread;
 
     //! Callback function to run on event loop thread during post() or sync() call.
-    const std::function<void()>* m_post_fn = nullptr;
+    kj::Function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;
 
     //! Callback functions to run on async thread.
-    CleanupList m_async_fns;
+    CleanupList m_async_fns MP_GUARDED_BY(m_mutex);
 
     //! Pipe read handle used to wake up the event loop thread.
     int m_wait_fd = -1;
@@ -208,11 +225,11 @@ public:
     //! Number of clients holding references to ProxyServerBase objects that
     //! reference this event loop.
-    int m_num_clients = 0;
+    int m_num_clients MP_GUARDED_BY(m_mutex) = 0;
 
     //! Mutex and condition variable used to post tasks to event loop and async
     //! thread.
-    std::mutex m_mutex;
+    Mutex m_mutex;
     std::condition_variable m_cv;
 
     //! Capnp IO context.
@@ -263,11 +280,9 @@ struct Waiter
         // in the case where a capnp response is sent and a brand new
         // request is immediately received.
         while (m_fn) {
-            auto fn = std::move(m_fn);
-            m_fn = nullptr;
-            lock.unlock();
-            fn();
-            lock.lock();
+            auto fn = std::move(*m_fn);
+            m_fn.reset();
+            Unlock(lock, fn);
         }
         const bool done = pred();
         return done;
@@ -276,7 +291,7 @@ struct Waiter
     std::mutex m_mutex;
     std::condition_variable m_cv;
-    std::function<void()> m_fn;
+    std::optional<kj::Function<void()>> m_fn;
 };
 
 //! Object holding network & rpc state associated with either an incoming server
@@ -290,21 +305,13 @@ public:
     Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
         : m_loop(loop), m_stream(kj::mv(stream_)),
           m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
-          m_rpc_system(::capnp::makeRpcClient(m_network))
-    {
-        std::unique_lock<std::mutex> lock(m_loop.m_mutex);
-        m_loop.addClient(lock);
-    }
+          m_rpc_system(::capnp::makeRpcClient(m_network)) {}
     Connection(EventLoop& loop,
                kj::Own<kj::AsyncIoStream>&& stream_,
                const std::function<::capnp::Capability::Client(Connection&)>& make_client)
         : m_loop(loop), m_stream(kj::mv(stream_)),
           m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
-          m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this)))
-    {
-        std::unique_lock<std::mutex> lock(m_loop.m_mutex);
-        m_loop.addClient(lock);
-    }
+          m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
 
     //! Run cleanup functions. Must be called from the event loop thread. First
     //! calls synchronous cleanup functions while blocked (to free capnp
@@ -333,12 +340,12 @@ public:
         // to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
         // error in cases where f deletes this Connection object.
         m_on_disconnect.add(m_network.onDisconnect().then(
-            [f = std::move(f), this]() mutable { m_loop.m_task_set->add(kj::evalLater(kj::mv(f))); }));
+            [f = std::move(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
     }
 
-    EventLoop& m_loop;
+    EventLoopRef m_loop;
     kj::Own<kj::AsyncIoStream> m_stream;
-    LoggingErrorHandler m_error_handler{m_loop};
+    LoggingErrorHandler m_error_handler{*m_loop};
     kj::TaskSet m_on_disconnect{m_error_handler};
     ::capnp::TwoPartyVatNetwork m_network;
     std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
@@ -381,21 +388,12 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
     : m_client(std::move(client)), m_context(connection)
 {
-    {
-        std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-        m_context.connection->m_loop.addClient(lock);
-    }
-
     // Handler for the connection getting destroyed before this client object.
     auto cleanup_it = m_context.connection->addSyncCleanup([this]() {
         // Release client capability by move-assigning to temporary.
         {
             typename Interface::Client(std::move(m_client));
         }
-        {
-            std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-            m_context.connection->m_loop.removeClient(lock);
-        }
         m_context.connection = nullptr;
     });
@@ -423,16 +421,11 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
         Sub::destroy(*this);
 
         // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
-        m_context.connection->m_loop.sync([&]() {
+        m_context.loop->sync([&]() {
             // Release client capability by move-assigning to temporary.
             {
                 typename Interface::Client(std::move(m_client));
             }
-            {
-                std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-                m_context.connection->m_loop.removeClient(lock);
-            }
 
             if (destroy_connection) {
                 delete m_context.connection;
                 m_context.connection = nullptr;
@@ -454,8 +447,6 @@ ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Co
     : m_impl(std::move(impl)), m_context(&connection)
 {
     assert(m_impl);
-    std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-    m_context.connection->m_loop.addClient(lock);
 }
 
 //! ProxyServer destructor, called from the EventLoop thread by Cap'n Proto
@@ -489,8 +480,6 @@ ProxyServerBase<Interface, Impl>::~ProxyServerBase()
         });
     }
     assert(m_context.cleanup_fns.empty());
-    std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-    m_context.connection->m_loop.removeClient(lock);
 }
 
 //! If the capnp interface defined a special "destroy" method, as described the
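Given the threading model described in the EventLoop comment above, the basic pattern for any non-loop thread is to funnel I/O work through sync(). A minimal hypothetical caller, not taken from this commit:

    // Run capnp I/O work on the event loop thread and block until it completes.
    void RunOnLoopThread(mp::EventLoop& loop)
    {
        loop.sync([&] {
            // Body executes on the EventLoop thread, the only thread allowed to
            // start promises, fulfill them, or wait on them.
        });
        // sync() has returned: the lambda finished running on the loop thread.
    }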


@@ -558,7 +558,7 @@ template <typename Client>
 void clientDestroy(Client& client)
 {
     if (client.m_context.connection) {
-        client.m_context.connection->m_loop.log() << "IPC client destroy " << typeid(client).name();
+        client.m_context.loop->log() << "IPC client destroy " << typeid(client).name();
     } else {
         KJ_LOG(INFO, "IPC interrupted client destroy", typeid(client).name());
     }
@@ -567,7 +567,7 @@ void clientDestroy(Client& client)
 template <typename Server>
 void serverDestroy(Server& server)
 {
-    server.m_context.connection->m_loop.log() << "IPC server destroy " << typeid(server).name();
+    server.m_context.loop->log() << "IPC server destroy " << typeid(server).name();
 }
 
 //! Entry point called by generated client code that looks like:
@@ -582,12 +582,9 @@ void serverDestroy(Server& server)
 template <typename ProxyClient, typename GetRequest, typename... FieldObjs>
 void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, FieldObjs&&... fields)
 {
-    if (!proxy_client.m_context.connection) {
-        throw std::logic_error("clientInvoke call made after disconnect");
-    }
     if (!g_thread_context.waiter) {
         assert(g_thread_context.thread_name.empty());
-        g_thread_context.thread_name = ThreadName(proxy_client.m_context.connection->m_loop.m_exe_name);
+        g_thread_context.thread_name = ThreadName(proxy_client.m_context.loop->m_exe_name);
         // If next assert triggers, it means clientInvoke is being called from
         // the capnp event loop thread. This can happen when a ProxyServer
         // method implementation that runs synchronously on the event loop
@@ -598,7 +595,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
         // declaration so the server method runs in a dedicated thread.
         assert(!g_thread_context.loop_thread);
         g_thread_context.waiter = std::make_unique<Waiter>();
-        proxy_client.m_context.connection->m_loop.logPlain()
+        proxy_client.m_context.loop->logPlain()
             << "{" << g_thread_context.thread_name
             << "} IPC client first request from current thread, constructing waiter";
     }
@@ -606,18 +603,27 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
     std::exception_ptr exception;
     std::string kj_exception;
     bool done = false;
-    proxy_client.m_context.connection->m_loop.sync([&]() {
+    const char* disconnected = nullptr;
+    proxy_client.m_context.loop->sync([&]() {
+        if (!proxy_client.m_context.connection) {
+            const std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
+            done = true;
+            disconnected = "IPC client method called after disconnect.";
+            invoke_context.thread_context.waiter->m_cv.notify_all();
+            return;
+        }
+
         auto request = (proxy_client.m_client.*get_request)(nullptr);
         using Request = CapRequestTraits<decltype(request)>;
         using FieldList = typename ProxyClientMethodTraits<typename Request::Params>::Fields;
         IterateFields().handleChain(invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...);
-        proxy_client.m_context.connection->m_loop.logPlain()
+        proxy_client.m_context.loop->logPlain()
             << "{" << invoke_context.thread_context.thread_name << "} IPC client send "
             << TypeName<typename Request::Params>() << " " << LogEscape(request.toString());
-        proxy_client.m_context.connection->m_loop.m_task_set->add(request.send().then(
+        proxy_client.m_context.loop->m_task_set->add(request.send().then(
             [&](::capnp::Response<typename Request::Results>&& response) {
-                proxy_client.m_context.connection->m_loop.logPlain()
+                proxy_client.m_context.loop->logPlain()
                     << "{" << invoke_context.thread_context.thread_name << "} IPC client recv "
                     << TypeName<typename Request::Results>() << " " << LogEscape(response.toString());
                 try {
@@ -631,9 +637,13 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
                 invoke_context.thread_context.waiter->m_cv.notify_all();
             },
             [&](const ::kj::Exception& e) {
+                if (e.getType() == ::kj::Exception::Type::DISCONNECTED) {
+                    disconnected = "IPC client method call interrupted by disconnect.";
+                } else {
                     kj_exception = kj::str("kj::Exception: ", e).cStr();
-                proxy_client.m_context.connection->m_loop.logPlain()
+                    proxy_client.m_context.loop->logPlain()
                         << "{" << invoke_context.thread_context.thread_name << "} IPC client exception " << kj_exception;
+                }
                 const std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
                 done = true;
                 invoke_context.thread_context.waiter->m_cv.notify_all();
@@ -643,7 +653,8 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
     std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
     invoke_context.thread_context.waiter->wait(lock, [&done]() { return done; });
     if (exception) std::rethrow_exception(exception);
-    if (!kj_exception.empty()) proxy_client.m_context.connection->m_loop.raise() << kj_exception;
+    if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception;
+    if (disconnected) proxy_client.m_context.loop->raise() << disconnected;
 }
 
 //! Invoke callable `fn()` that may return void. If it does return void, replace
@@ -682,7 +693,7 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
     using Results = typename decltype(call_context.getResults())::Builds;
 
     int req = ++server_reqs;
-    server.m_context.connection->m_loop.log() << "IPC server recv request #" << req << " "
+    server.m_context.loop->log() << "IPC server recv request #" << req << " "
                                << TypeName<typename Params::Reads>() << " " << LogEscape(params.toString());
 
     try {
@@ -699,14 +710,14 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
         return ReplaceVoid([&]() { return fn.invoke(server_context, ArgList()); },
                            [&]() { return kj::Promise<CallContext>(kj::mv(call_context)); })
             .then([&server, req](CallContext call_context) {
-                server.m_context.connection->m_loop.log() << "IPC server send response #" << req << " " << TypeName<Results>()
+                server.m_context.loop->log() << "IPC server send response #" << req << " " << TypeName<Results>()
                                            << " " << LogEscape(call_context.getResults().toString());
             });
     } catch (const std::exception& e) {
-        server.m_context.connection->m_loop.log() << "IPC server unhandled exception: " << e.what();
+        server.m_context.loop->log() << "IPC server unhandled exception: " << e.what();
         throw;
     } catch (...) {
-        server.m_context.connection->m_loop.log() << "IPC server unhandled exception";
+        server.m_context.loop->log() << "IPC server unhandled exception";
         throw;
    }
 }
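The net effect of the clientInvoke() changes: a call on a proxy whose connection is already gone no longer throws std::logic_error up front. Instead the sync'd lambda notices the null connection, or the error callback maps a DISCONNECTED kj::Exception, and the message is re-thrown on the calling thread through loop->raise(). The new mptest cases further down exercise exactly this, so caller-side handling looks like:

    try {
        foo->add(1, 2); // proxy call after the connection was closed
    } catch (const std::runtime_error& e) {
        // e.what() is "IPC client method called after disconnect." if the
        // connection was already gone, or "IPC client method call interrupted
        // by disconnect." if the connection dropped while the call was in flight.
    }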


@@ -8,6 +8,7 @@
 #include <mp/util.h>
 
 #include <array>
+#include <cassert>
 #include <functional>
 #include <list>
 #include <stddef.h>
@@ -47,13 +48,34 @@ inline void CleanupRun(CleanupList& fns) {
     }
 }
 
+//! Event loop smart pointer automatically managing m_num_clients.
+//! If a lock pointer argument is passed, the specified lock will be used,
+//! otherwise EventLoop::m_mutex will be locked when needed.
+class EventLoopRef
+{
+public:
+    explicit EventLoopRef(EventLoop& loop, Lock* lock = nullptr);
+    EventLoopRef(EventLoopRef&& other) noexcept : m_loop(other.m_loop) { other.m_loop = nullptr; }
+    EventLoopRef(const EventLoopRef&) = delete;
+    EventLoopRef& operator=(const EventLoopRef&) = delete;
+    EventLoopRef& operator=(EventLoopRef&&) = delete;
+    ~EventLoopRef() { reset(); }
+    EventLoop& operator*() const { assert(m_loop); return *m_loop; }
+    EventLoop* operator->() const { assert(m_loop); return m_loop; }
+    bool reset(Lock* lock = nullptr);
+
+    EventLoop* m_loop{nullptr};
+    Lock* m_lock{nullptr};
+};
+
 //! Context data associated with proxy client and server classes.
 struct ProxyContext
 {
     Connection* connection;
+    EventLoopRef loop;
     CleanupList cleanup_fns;
 
-    ProxyContext(Connection* connection) : connection(connection) {}
+    ProxyContext(Connection* connection);
 };
 
 //! Base class for generated ProxyClient classes that implement a C++ interface
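A sketch of the lifetime rules EventLoopRef encodes: every live reference keeps EventLoop::m_num_clients positive, and dropping the last one wakes the loop so EventLoop::loop() can return. The surrounding setup here is illustrative, not from the commit:

    // Assumes a running mp::EventLoop `loop` on another thread.
    void UseLoop(mp::EventLoop& loop)
    {
        mp::EventLoopRef outer{loop};     // m_num_clients 0 -> 1: loop() keeps running
        {
            mp::EventLoopRef inner{loop}; // 1 -> 2
        }                                 // inner destroyed: 2 -> 1
    }
    // outer destroyed: 1 -> 0; reset() notifies m_cv and writes a byte to the
    // loop's post fd so loop() observes done() and exits.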


@@ -64,8 +64,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
     auto future = kj::newPromiseAndFulfiller<typename ServerContext::CallContext>();
     auto& server = server_context.proxy_server;
     int req = server_context.req;
-    auto invoke = MakeAsyncCallable(
-        [fulfiller = kj::mv(future.fulfiller),
+    auto invoke = [fulfiller = kj::mv(future.fulfiller),
          call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable {
             const auto& params = call_context.getParams();
             Context::Reader context_arg = Accessor::get(params);
@@ -132,35 +131,35 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
                     return;
                 }
                 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
-                    server.m_context.connection->m_loop.sync([&] {
+                    server.m_context.loop->sync([&] {
                         auto fulfiller_dispose = kj::mv(fulfiller);
                         fulfiller_dispose->fulfill(kj::mv(call_context));
                     });
                 }))
                 {
-                    server.m_context.connection->m_loop.sync([&]() {
+                    server.m_context.loop->sync([&]() {
                         auto fulfiller_dispose = kj::mv(fulfiller);
                         fulfiller_dispose->reject(kj::mv(*exception));
                     });
                 }
-        });
+        };
 
     // Lookup Thread object specified by the client. The specified thread should
     // be a local Thread::Server object, but it needs to be looked up
    // asynchronously with getLocalServer().
     auto thread_client = context_arg.getThread();
     return server.m_context.connection->m_threads.getLocalServer(thread_client)
-        .then([&server, invoke, req](const kj::Maybe<Thread::Server&>& perhaps) {
+        .then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
             // Assuming the thread object is found, pass it a pointer to the
             // `invoke` lambda above which will invoke the function on that
             // thread.
             KJ_IF_MAYBE (thread_server, perhaps) {
                 const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
-                server.m_context.connection->m_loop.log()
+                server.m_context.loop->log()
                     << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
                 thread.m_thread_context.waiter->post(std::move(invoke));
             } else {
-                server.m_context.connection->m_loop.log()
+                server.m_context.loop->log()
                     << "IPC server error request #" << req << ", missing thread to execute request";
                 throw std::runtime_error("invalid thread handle");
             }


@@ -6,6 +6,7 @@
 #define MP_UTIL_H
 
 #include <capnp/schema.h>
+#include <cassert>
 #include <cstddef>
 #include <functional>
 #include <future>
@@ -13,11 +14,13 @@
 #include <kj/exception.h>
 #include <kj/string-tree.h>
 #include <memory>
+#include <mutex>
 #include <string.h>
 #include <string>
 #include <tuple>
 #include <type_traits>
 #include <utility>
+#include <variant>
 #include <vector>
 
 namespace mp {
@@ -130,6 +133,58 @@ const char* TypeName()
     return short_name ? short_name + 1 : display_name;
 }
 
+//! Convenient wrapper around std::variant<T*, T>
+template <typename T>
+struct PtrOrValue {
+    std::variant<T*, T> data;
+
+    template <typename... Args>
+    PtrOrValue(T* ptr, Args&&... args) : data(ptr ? ptr : std::variant<T*, T>{std::in_place_type<T>, std::forward<Args>(args)...}) {}
+
+    T& operator*() { return data.index() ? std::get<T>(data) : *std::get<T*>(data); }
+    T* operator->() { return &**this; }
+    T& operator*() const { return data.index() ? std::get<T>(data) : *std::get<T*>(data); }
+    T* operator->() const { return &**this; }
+};
+
+// Annotated mutex and lock class (https://clang.llvm.org/docs/ThreadSafetyAnalysis.html)
+#if defined(__clang__) && (!defined(SWIG))
+#define MP_TSA(x) __attribute__((x))
+#else
+#define MP_TSA(x) // no-op
+#endif
+
+#define MP_CAPABILITY(x) MP_TSA(capability(x))
+#define MP_SCOPED_CAPABILITY MP_TSA(scoped_lockable)
+#define MP_REQUIRES(x) MP_TSA(requires_capability(x))
+#define MP_ACQUIRE(...) MP_TSA(acquire_capability(__VA_ARGS__))
+#define MP_RELEASE(...) MP_TSA(release_capability(__VA_ARGS__))
+#define MP_ASSERT_CAPABILITY(x) MP_TSA(assert_capability(x))
+#define MP_GUARDED_BY(x) MP_TSA(guarded_by(x))
+
+class MP_CAPABILITY("mutex") Mutex {
+public:
+    void lock() MP_ACQUIRE() { m_mutex.lock(); }
+    void unlock() MP_RELEASE() { m_mutex.unlock(); }
+
+    std::mutex m_mutex;
+};
+
+class MP_SCOPED_CAPABILITY Lock {
+public:
+    explicit Lock(Mutex& m) MP_ACQUIRE(m) : m_lock(m.m_mutex) {}
+    ~Lock() MP_RELEASE() {}
+    void unlock() MP_RELEASE() { m_lock.unlock(); }
+    void lock() MP_ACQUIRE() { m_lock.lock(); }
+    void assert_locked(Mutex& mutex) MP_ASSERT_CAPABILITY() MP_ASSERT_CAPABILITY(mutex)
+    {
+        assert(m_lock.mutex() == &mutex.m_mutex);
+        assert(m_lock);
+    }
+
+    std::unique_lock<std::mutex> m_lock;
+};
+
 //! Analog to std::lock_guard that unlocks instead of locks.
 template <typename Lock>
 struct UnlockGuard
@@ -146,46 +201,6 @@ void Unlock(Lock& lock, Callback&& callback)
     callback();
 }
 
-//! Needed for libc++/macOS compatibility. Lets code work with shared_ptr nothrow declaration
-//! https://github.com/capnproto/capnproto/issues/553#issuecomment-328554603
-template <typename T>
-struct DestructorCatcher
-{
-    T value;
-    template <typename... Params>
-    DestructorCatcher(Params&&... params) : value(kj::fwd<Params>(params)...)
-    {
-    }
-    ~DestructorCatcher() noexcept try {
-    } catch (const kj::Exception& e) { // NOLINT(bugprone-empty-catch)
-    }
-};
-
-//! Wrapper around callback function for compatibility with std::async.
-//!
-//! std::async requires callbacks to be copyable and requires noexcept
-//! destructors, but this doesn't work well with kj types which are generally
-//! move-only and not noexcept.
-template <typename Callable>
-struct AsyncCallable
-{
-    AsyncCallable(Callable&& callable) : m_callable(std::make_shared<DestructorCatcher<Callable>>(std::move(callable)))
-    {
-    }
-    AsyncCallable(const AsyncCallable&) = default;
-    AsyncCallable(AsyncCallable&&) = default;
-    ~AsyncCallable() noexcept = default;
-    ResultOf<Callable> operator()() const { return (m_callable->value)(); }
-    mutable std::shared_ptr<DestructorCatcher<Callable>> m_callable;
-};
-
-//! Construct AsyncCallable object.
-template <typename Callable>
-AsyncCallable<std::remove_reference_t<Callable>> MakeAsyncCallable(Callable&& callable)
-{
-    return std::move(callable);
-}
-
 //! Format current thread name as "{exe_name}-{$pid}/{thread_name}-{$tid}".
 std::string ThreadName(const char* exe_name);
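The MP_* macros above wire the library's Mutex and Lock types into clang's thread safety analysis (enabled with -Wthread-safety), so lock-discipline mistakes become compile-time warnings. A small self-contained sketch of what the annotations catch; the Counter class is illustrative, not from the commit:

    class Counter
    {
    public:
        void Increment()
        {
            mp::Lock lock{m_mutex}; // scoped acquire, released by ~Lock()
            ++m_value;              // OK: the analysis sees m_mutex held here
        }
        int UnsafeRead() { return m_value; } // warning: reading m_value requires holding m_mutex
    private:
        mp::Mutex m_mutex;
        int m_value MP_GUARDED_BY(m_mutex) = 0;
    };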


@@ -22,6 +22,7 @@
 #include <kj/common.h>
 #include <kj/debug.h>
 #include <kj/exception.h>
+#include <kj/function.h>
 #include <kj/memory.h>
 #include <map>
 #include <memory>
@@ -48,6 +49,36 @@ void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
     m_loop.log() << "Uncaught exception in daemonized task.";
 }
 
+EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(lock)
+{
+    auto loop_lock{PtrOrValue{m_lock, m_loop->m_mutex}};
+    loop_lock->assert_locked(m_loop->m_mutex);
+    m_loop->m_num_clients += 1;
+}
+
+bool EventLoopRef::reset(Lock* lock)
+{
+    bool done = false;
+    if (m_loop) {
+        auto loop_lock{PtrOrValue{lock ? lock : m_lock, m_loop->m_mutex}};
+        loop_lock->assert_locked(m_loop->m_mutex);
+        assert(m_loop->m_num_clients > 0);
+        m_loop->m_num_clients -= 1;
+        if (m_loop->done()) {
+            done = true;
+            m_loop->m_cv.notify_all();
+            int post_fd{m_loop->m_post_fd};
+            loop_lock->unlock();
+            char buffer = 0;
+            KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
+        }
+        m_loop = nullptr;
+    }
+    return done;
+}
+
+ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{*connection->m_loop} {}
+
 Connection::~Connection()
 {
     // Shut down RPC system first, since this will garbage collect Server
@@ -103,18 +134,18 @@ Connection::~Connection()
         m_sync_cleanup_fns.pop_front();
     }
     while (!m_async_cleanup_fns.empty()) {
-        const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
-        m_loop.m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front()));
+        const Lock lock(m_loop->m_mutex);
+        m_loop->m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front()));
         m_async_cleanup_fns.pop_front();
     }
-    std::unique_lock<std::mutex> lock(m_loop.m_mutex);
-    m_loop.startAsyncThread(lock);
-    m_loop.removeClient(lock);
+    Lock lock(m_loop->m_mutex);
+    m_loop->startAsyncThread();
+    m_loop.reset(&lock);
 }
 
 CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
 {
-    const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
+    const Lock lock(m_loop->m_mutex);
     // Add cleanup callbacks to the front of list, so sync cleanup functions run
     // in LIFO order. This is a good approach because sync cleanup functions are
     // added as client objects are created, and it is natural to clean up
@@ -128,13 +159,13 @@ CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
 
 void Connection::removeSyncCleanup(CleanupIt it)
 {
-    const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
+    const Lock lock(m_loop->m_mutex);
     m_sync_cleanup_fns.erase(it);
 }
 
 void Connection::addAsyncCleanup(std::function<void()> fn)
 {
-    const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
+    const Lock lock(m_loop->m_mutex);
     // Add async cleanup callbacks to the back of the list. Unlike the sync
     // cleanup list, this list order is more significant because it determines
     // the order server objects are destroyed when there is a sudden disconnect,
@@ -170,7 +201,7 @@ EventLoop::EventLoop(const char* exe_name, LogFn log_fn, void* context)
 EventLoop::~EventLoop()
 {
     if (m_async_thread.joinable()) m_async_thread.join();
-    const std::lock_guard<std::mutex> lock(m_mutex);
+    const Lock lock(m_mutex);
     KJ_ASSERT(m_post_fn == nullptr);
     KJ_ASSERT(m_async_fns.empty());
     KJ_ASSERT(m_wait_fd == -1);
@@ -195,14 +226,14 @@ void EventLoop::loop()
     for (;;) {
         const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
         if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");
-        std::unique_lock<std::mutex> lock(m_mutex);
+        Lock lock(m_mutex);
         if (m_post_fn) {
             Unlock(lock, *m_post_fn);
             m_post_fn = nullptr;
             m_cv.notify_all();
-        } else if (done(lock)) {
+        } else if (done()) {
             // Intentionally do not break if m_post_fn was set, even if done()
-            // would return true, to ensure that the removeClient write(post_fd)
+            // would return true, to ensure that the EventLoopRef write(post_fd)
             // call always succeeds and the loop does not exit between the time
             // that the done condition is set and the write call is made.
             break;
@@ -213,75 +244,62 @@ void EventLoop::loop()
     log() << "EventLoop::loop bye.";
     wait_stream = nullptr;
     KJ_SYSCALL(::close(post_fd));
-    const std::unique_lock<std::mutex> lock(m_mutex);
+    const Lock lock(m_mutex);
     m_wait_fd = -1;
     m_post_fd = -1;
 }
 
-void EventLoop::post(const std::function<void()>& fn)
+void EventLoop::post(kj::Function<void()> fn)
 {
     if (std::this_thread::get_id() == m_thread_id) {
         fn();
         return;
     }
-    std::unique_lock<std::mutex> lock(m_mutex);
-    addClient(lock);
-    m_cv.wait(lock, [this] { return m_post_fn == nullptr; });
+    Lock lock(m_mutex);
+    EventLoopRef ref(*this, &lock);
+    m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_post_fn == nullptr; });
     m_post_fn = &fn;
     int post_fd{m_post_fd};
     Unlock(lock, [&] {
         char buffer = 0;
         KJ_SYSCALL(write(post_fd, &buffer, 1));
     });
-    m_cv.wait(lock, [this, &fn] { return m_post_fn != &fn; });
-    removeClient(lock);
+    m_cv.wait(lock.m_lock, [this, &fn]() MP_REQUIRES(m_mutex) { return m_post_fn != &fn; });
 }
 
-void EventLoop::addClient(std::unique_lock<std::mutex>& lock) { m_num_clients += 1; }
-
-bool EventLoop::removeClient(std::unique_lock<std::mutex>& lock)
-{
-    m_num_clients -= 1;
-    if (done(lock)) {
-        m_cv.notify_all();
-        int post_fd{m_post_fd};
-        lock.unlock();
-        char buffer = 0;
-        KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
-        return true;
-    }
-    return false;
-}
-
-void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
+void EventLoop::startAsyncThread()
 {
     if (m_async_thread.joinable()) {
         m_cv.notify_all();
     } else if (!m_async_fns.empty()) {
         m_async_thread = std::thread([this] {
-            std::unique_lock<std::mutex> lock(m_mutex);
-            while (true) {
+            Lock lock(m_mutex);
+            while (!done()) {
                 if (!m_async_fns.empty()) {
-                    addClient(lock);
+                    EventLoopRef ref{*this, &lock};
                     const std::function<void()> fn = std::move(m_async_fns.front());
                     m_async_fns.pop_front();
                     Unlock(lock, fn);
-                    if (removeClient(lock)) break;
+                    // Important to explicitly call ref.reset() here and
+                    // explicitly break if the EventLoop is done, not relying on
+                    // while condition above. Reason is that end of `ref`
+                    // lifetime can cause EventLoop::loop() to exit, and if
+                    // there is external code that immediately deletes the
+                    // EventLoop object as soon as EventLoop::loop() method
+                    // returns, checking the while condition may crash.
+                    if (ref.reset()) break;
+                    // Continue without waiting in case there are more async_fns
                     continue;
-                } else if (m_num_clients == 0) {
-                    break;
                 }
-                m_cv.wait(lock);
+                m_cv.wait(lock.m_lock);
            }
        });
    }
 }
 
-bool EventLoop::done(std::unique_lock<std::mutex>& lock)
+bool EventLoop::done()
 {
     assert(m_num_clients >= 0);
-    assert(lock.owns_lock());
-    assert(lock.mutex() == &m_mutex);
     return m_num_clients == 0 && m_async_fns.empty();
 }
 
@@ -375,7 +393,7 @@ kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
     const std::string from = context.getParams().getName();
     std::promise<ThreadContext*> thread_context;
     std::thread thread([&thread_context, from, this]() {
-        g_thread_context.thread_name = ThreadName(m_connection.m_loop.m_exe_name) + " (from " + from + ")";
+        g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")";
         g_thread_context.waiter = std::make_unique<Waiter>();
         thread_context.set_value(&g_thread_context);
         std::unique_lock<std::mutex> lock(g_thread_context.waiter->m_mutex);
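The write(post_fd, ...) calls in EventLoopRef::reset() and EventLoop::post() are the classic self-pipe pattern: loop() blocks reading one byte from m_wait_fd, so any thread can wake it by writing a single byte to the other end of the pipe. A stripped-down illustration of the pattern, not the mp implementation:

    #include <unistd.h>

    int g_fds[2]; // g_fds[0]: read end the loop blocks on; g_fds[1]: write end for wakers

    bool Init() { return pipe(g_fds) == 0; }

    void WaitForWakeup() // loop thread
    {
        char buffer;
        (void)read(g_fds[0], &buffer, 1); // blocks until some thread calls Wake()
    }

    void Wake() // any thread
    {
        char buffer = 0;
        (void)write(g_fds[1], &buffer, 1); // one byte unblocks WaitForWakeup()
    }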


@@ -23,32 +23,82 @@
 namespace mp {
 namespace test {
 
-KJ_TEST("Call FooInterface methods")
+/**
+ * Test setup class creating a two way connection between a
+ * ProxyServer<FooInterface> object and a ProxyClient<FooInterface>.
+ *
+ * Provides client_disconnect and server_disconnect lambdas that can be used to
+ * trigger disconnects and test handling of broken and closed connections.
+ *
+ * Accepts a client_owns_connection option to test different ProxyClient
+ * destroy_connection values and control whether destroying the ProxyClient
+ * object destroys the client Connection object. Normally it makes sense for
+ * this to be true to simplify shutdown and avoid needing to call
+ * client_disconnect manually, but false allows testing more ProxyClient
+ * behavior and the "IPC client method called after disconnect" code path.
+ */
+class TestSetup
 {
-    std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> foo_promise;
-    std::function<void()> disconnect_client;
-    std::thread thread([&]() {
+public:
+    std::thread thread;
+    std::function<void()> server_disconnect;
+    std::function<void()> client_disconnect;
+    std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> client_promise;
+    std::unique_ptr<ProxyClient<messages::FooInterface>> client;
+
+    TestSetup(bool client_owns_connection = true)
+        : thread{[&] {
         EventLoop loop("mptest", [](bool raise, const std::string& log) {
             std::cout << "LOG" << raise << ": " << log << "\n";
+            if (raise) throw std::runtime_error(log);
         });
         auto pipe = loop.m_io_context.provider->newTwoWayPipe();
 
-        auto connection_client = std::make_unique<Connection>(loop, kj::mv(pipe.ends[0]));
-        auto foo_client = std::make_unique<ProxyClient<messages::FooInterface>>(
-            connection_client->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<messages::FooInterface>(),
-            connection_client.get(), /* destroy_connection= */ false);
-        foo_promise.set_value(std::move(foo_client));
-        disconnect_client = [&] { loop.sync([&] { connection_client.reset(); }); };
-
-        auto connection_server = std::make_unique<Connection>(loop, kj::mv(pipe.ends[1]), [&](Connection& connection) {
-            auto foo_server = kj::heap<ProxyServer<messages::FooInterface>>(std::make_shared<FooImplementation>(), connection);
-            return capnp::Capability::Client(kj::mv(foo_server));
+        auto server_connection =
+            std::make_unique<Connection>(loop, kj::mv(pipe.ends[0]), [&](Connection& connection) {
+                auto server_proxy = kj::heap<ProxyServer<messages::FooInterface>>(
+                    std::make_shared<FooImplementation>(), connection);
+                return capnp::Capability::Client(kj::mv(server_proxy));
         });
-        connection_server->onDisconnect([&] { connection_server.reset(); });
+        server_disconnect = [&] { loop.sync([&] { server_connection.reset(); }); };
+        // Set handler to destroy the server when the client disconnects. This
+        // is ignored if server_disconnect() is called instead.
+        server_connection->onDisconnect([&] { server_connection.reset(); });
+
+        auto client_connection = std::make_unique<Connection>(loop, kj::mv(pipe.ends[1]));
+        auto client_proxy = std::make_unique<ProxyClient<messages::FooInterface>>(
+            client_connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<messages::FooInterface>(),
+            client_connection.get(), /* destroy_connection= */ client_owns_connection);
+        if (client_owns_connection) {
+            client_connection.release();
+        } else {
+            client_disconnect = [&] { loop.sync([&] { client_connection.reset(); }); };
+        }
+        client_promise.set_value(std::move(client_proxy));
         loop.loop();
-    });
+    }}
+    {
+        client = client_promise.get_future().get();
+    }
+
+    ~TestSetup()
+    {
+        // Test that client cleanup_fns are executed.
+        bool destroyed = false;
+        client->m_context.cleanup_fns.emplace_front([&destroyed] { destroyed = true; });
+        client.reset();
+        KJ_EXPECT(destroyed);
+
+        thread.join();
+    }
+};
+
+KJ_TEST("Call FooInterface methods")
+{
+    TestSetup setup;
+    ProxyClient<messages::FooInterface>* foo = setup.client.get();
 
-    auto foo = foo_promise.get_future().get();
     KJ_EXPECT(foo->add(1, 2) == 3);
 
     FooStruct in;
@@ -127,14 +177,40 @@ KJ_TEST("Call FooInterface methods")
     mut.message = "init";
     foo->passMutable(mut);
     KJ_EXPECT(mut.message == "init build pass call return read");
+}
 
-    disconnect_client();
-    thread.join();
+KJ_TEST("Call IPC method after client connection is closed")
+{
+    TestSetup setup{/*client_owns_connection=*/false};
+    ProxyClient<messages::FooInterface>* foo = setup.client.get();
+    KJ_EXPECT(foo->add(1, 2) == 3);
+    setup.client_disconnect();
 
-    bool destroyed = false;
-    foo->m_context.cleanup_fns.emplace_front([&destroyed]{ destroyed = true; });
-    foo.reset();
-    KJ_EXPECT(destroyed);
+    bool disconnected{false};
+    try {
+        foo->add(1, 2);
+    } catch (const std::runtime_error& e) {
+        KJ_EXPECT(std::string_view{e.what()} == "IPC client method called after disconnect.");
+        disconnected = true;
+    }
+    KJ_EXPECT(disconnected);
+}
+
+KJ_TEST("Calling IPC method after server connection is closed")
+{
+    TestSetup setup;
+    ProxyClient<messages::FooInterface>* foo = setup.client.get();
+    KJ_EXPECT(foo->add(1, 2) == 3);
+    setup.server_disconnect();
+
+    bool disconnected{false};
+    try {
+        foo->add(1, 2);
+    } catch (const std::runtime_error& e) {
+        KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect.");
+        disconnected = true;
+    }
+    KJ_EXPECT(disconnected);
 }
 
 } // namespace test


@@ -58,6 +58,9 @@ public:
     //! clients and servers independently.
     virtual void serve(int fd, const char* exe_name, interfaces::Init& init, const std::function<void()>& ready_fn = {}) = 0;
 
+    //! Disconnect any incoming connections that are still connected.
+    virtual void disconnectIncoming() = 0;
+
     //! Add cleanup callback to interface that will run when the interface is
     //! deleted.
     virtual void addCleanup(std::type_index type, void* iface, std::function<void()> cleanup) = 0;


@@ -121,6 +121,13 @@ public:
         m_reachable.clear();
     }
 
+    void Reset() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
+    {
+        AssertLockNotHeld(m_mutex);
+        LOCK(m_mutex);
+        m_reachable = DefaultNets();
+    }
+
     [[nodiscard]] bool Contains(Network net) const EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     {
         AssertLockNotHeld(m_mutex);
@@ -142,9 +149,9 @@ public:
     }
 
 private:
-    mutable Mutex m_mutex;
-
-    std::unordered_set<Network> m_reachable GUARDED_BY(m_mutex){
+    static std::unordered_set<Network> DefaultNets()
+    {
+        return {
         NET_UNROUTABLE,
         NET_IPV4,
         NET_IPV6,
@@ -155,6 +162,10 @@ private:
         };
+    }
+
+    mutable Mutex m_mutex;
+
+    std::unordered_set<Network> m_reachable GUARDED_BY(m_mutex){DefaultNets()};
 };
 
 extern ReachableNets g_reachable_nets;
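Reset() restores the compile-time default set, which lets tests that mutate global reachability state start from a known baseline, as the net_tests change below does:

    g_reachable_nets.Reset(); // m_reachable = DefaultNets()
    BOOST_CHECK(g_reachable_nets.Contains(NET_IPV4));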
/** /**

View file

@@ -323,6 +323,12 @@ void SetRPCWarmupStatus(const std::string& newStatus)
     rpcWarmupStatus = newStatus;
 }
 
+void SetRPCWarmupStarting()
+{
+    LOCK(g_rpc_warmup_mutex);
+    fRPCInWarmup = true;
+}
+
 void SetRPCWarmupFinished()
 {
     LOCK(g_rpc_warmup_mutex);


@@ -30,6 +30,7 @@ void RpcInterruptionPoint();
  */
 void SetRPCWarmupStatus(const std::string& newStatus);
 /* Mark warmup as done. RPC calls will be processed from now on. */
+void SetRPCWarmupStarting();
 void SetRPCWarmupFinished();
 
 /* returns the current warmup state. */


@@ -63,6 +63,7 @@ add_executable(test_bitcoin
   net_peer_eviction_tests.cpp
   net_tests.cpp
   netbase_tests.cpp
+  node_init_tests.cpp
   node_warnings_tests.cpp
   orphanage_tests.cpp
   pcp_tests.cpp


@@ -53,9 +53,8 @@ static std::string TempPath(std::string_view pattern)
 //! on the object through FooInterface (defined in ipc_test.capnp).
 void IpcPipeTest()
 {
-    // Setup: create FooImplemention object and listen for FooInterface requests
+    // Setup: create FooImplementation object and listen for FooInterface requests
     std::promise<std::unique_ptr<mp::ProxyClient<gen::FooInterface>>> foo_promise;
-    std::function<void()> disconnect_client;
     std::thread thread([&]() {
         mp::EventLoop loop("IpcPipeTest", [](bool raise, const std::string& log) { LogPrintf("LOG%i: %s\n", raise, log); });
         auto pipe = loop.m_io_context.provider->newTwoWayPipe();
@@ -63,9 +62,8 @@ void IpcPipeTest()
         auto connection_client = std::make_unique<mp::Connection>(loop, kj::mv(pipe.ends[0]));
         auto foo_client = std::make_unique<mp::ProxyClient<gen::FooInterface>>(
             connection_client->m_rpc_system->bootstrap(mp::ServerVatId().vat_id).castAs<gen::FooInterface>(),
-            connection_client.get(), /* destroy_connection= */ false);
+            connection_client.release(), /* destroy_connection= */ true);
         foo_promise.set_value(std::move(foo_client));
-        disconnect_client = [&] { loop.sync([&] { connection_client.reset(); }); };
 
         auto connection_server = std::make_unique<mp::Connection>(loop, kj::mv(pipe.ends[1]), [&](mp::Connection& connection) {
             auto foo_server = kj::heap<mp::ProxyServer<gen::FooInterface>>(std::make_shared<FooImplementation>(), connection);
@@ -125,8 +123,8 @@ void IpcPipeTest()
     auto script2{foo->passScript(script1)};
     BOOST_CHECK_EQUAL(HexStr(script1), HexStr(script2));
 
-    // Test cleanup: disconnect pipe and join thread
-    disconnect_client();
+    // Test cleanup: disconnect and join thread
+    foo.reset();
     thread.join();
 }


@@ -702,6 +702,7 @@ BOOST_AUTO_TEST_CASE(get_local_addr_for_peer_port)
 
 BOOST_AUTO_TEST_CASE(LimitedAndReachable_Network)
 {
+    g_reachable_nets.Reset();
     BOOST_CHECK(g_reachable_nets.Contains(NET_IPV4));
     BOOST_CHECK(g_reachable_nets.Contains(NET_IPV6));
     BOOST_CHECK(g_reachable_nets.Contains(NET_ONION));


@@ -0,0 +1,49 @@
+// Copyright (c) 2025 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include <init.h>
+#include <interfaces/init.h>
+#include <rpc/server.h>
+
+#include <boost/test/unit_test.hpp>
+#include <test/util/setup_common.h>
+
+using node::NodeContext;
+
+BOOST_FIXTURE_TEST_SUITE(node_init_tests, BasicTestingSetup)
+
+//! Custom implementation of interfaces::Init for testing.
+class TestInit : public interfaces::Init
+{
+public:
+    TestInit(NodeContext& node) : m_node(node)
+    {
+        InitContext(m_node);
+        m_node.init = this;
+    }
+    std::unique_ptr<interfaces::Chain> makeChain() override { return interfaces::MakeChain(m_node); }
+    std::unique_ptr<interfaces::WalletLoader> makeWalletLoader(interfaces::Chain& chain) override
+    {
+        return MakeWalletLoader(chain, *Assert(m_node.args));
+    }
+    NodeContext& m_node;
+};
+
+BOOST_AUTO_TEST_CASE(init_test)
+{
+    // Reset logging, config file path, rpc state, reachable nets to avoid errors in AppInitMain
+    LogInstance().DisconnectTestLogger();
+    m_node.args->SetConfigFilePath({});
+    SetRPCWarmupStarting();
+    g_reachable_nets.Reset();
+
+    // Run through initialization and shutdown code.
+    TestInit init{m_node};
+    BOOST_CHECK(AppInitInterfaces(m_node));
+    BOOST_CHECK(AppInitMain(m_node));
+    Interrupt(m_node);
+    Shutdown(m_node);
+}
+
+BOOST_AUTO_TEST_SUITE_END()


@@ -214,7 +214,10 @@ BasicTestingSetup::~BasicTestingSetup()
     } else {
         fs::remove_all(m_path_root);
     }
+    // Clear all arguments except for -datadir, which GUI tests currently rely
+    // on to be set even after the testing setup is destroyed.
     gArgs.ClearArgs();
+    gArgs.ForceSetArg("-datadir", fs::PathToString(m_path_root));
 }
 
 ChainTestingSetup::ChainTestingSetup(const ChainType chainType, TestOpts opts)