Compare commits

9 commits

Author SHA1 Message Date
Ryan Ofsky
ae0f33017f
Merge 0b72656ecf into 3a29ba33dc 2025-04-28 16:17:29 +00:00
Ryan Ofsky
0b72656ecf ipc: Handle bitcoin-wallet disconnections
This fixes an error reported by Antoine Poinsot <darosior@protonmail.com> in
https://github.com/bitcoin-core/libmultiprocess/issues/123 that does not happen
in master, but does happen with https://github.com/bitcoin/bitcoin/pull/10102
applied, where if the child bitcoin-wallet process is killed (either by an
external signal or by Ctrl-C as reported in the issue) the bitcoin-node process
will not shut down cleanly after that because chain client flush()
calls will fail.

This change fixes the problem by handling ipc::Exception errors thrown during
the flush() calls, and it relies on the fixes to disconnect detection
implemented in https://github.com/bitcoin-core/libmultiprocess/pull/160 to work
effectively.
2025-04-24 15:20:58 -04:00
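
A condensed view of the fix (taken from the Shutdown(NodeContext&) hunk below): a flush() failure on a disconnected chain client is downgraded to a debug log line and the client is dropped, so shutdown can proceed:

for (auto& client : node.chain_clients) {
    try {
        client->flush();
    } catch (const ipc::Exception& e) {
        // Wallet process went away: log and skip this client from now on.
        LogDebug(BCLog::IPC, "Chain client did not disconnect cleanly: %s", e.what());
        client.reset();
    }
}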
Ryan Ofsky
55b4df08f1 ipc: Add Ctrl-C handler for spawned subprocesses
This fixes an error reported by Antoine Poinsot <darosior@protonmail.com> in
https://github.com/bitcoin-core/libmultiprocess/issues/123 that does not happen
in master, but does happen with https://github.com/bitcoin/bitcoin/pull/10102
applied, where if Ctrl-C is pressed when `bitcoin-node` is started, it is
handled by both `bitcoin-node` and `bitcoin-wallet` processes, causing the
wallet to shut down abruptly instead of waiting for the node and shutting down
cleanly.

This change fixes the problem by having the wallet process print to stdout when
it receives a Ctrl-C signal but not otherwise react, letting the node shut
everything down cleanly.
2025-04-24 15:15:08 -04:00
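
The handler itself (shown in the ipc hunk below that adds IgnoreCtrlC()) stays async-signal-safe by doing nothing but a single write() of a pre-formatted message; a condensed sketch of the core:

std::string g_ignore_ctrl_c; // message set once at startup, before signals can arrive

void HandleCtrlC(int)
{
    // write() is async-signal-safe; no allocation or locking happens here.
    (void)write(STDOUT_FILENO, g_ignore_ctrl_c.data(), g_ignore_ctrl_c.size());
}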
Ryan Ofsky
85bed077bd doc: Improve IPC interface comments
Fix some comments that referred to previous versions of these methods and no
longer made sense.
2025-04-24 15:13:05 -04:00
Ryan Ofsky
a324fc01f2 ipc: Avoid waiting for clients to disconnect when shutting down
This fixes behavior reported by Antoine Poinsot <darosior@protonmail.com> in
https://github.com/bitcoin/bitcoin/pull/29409#issuecomment-2546088852 where if
an IPC client is connected, the node will wait forever for it to disconnect
before exiting.
2025-04-24 15:02:19 -04:00
Ryan Ofsky
b18f78fb1e test: Add unit test coverage for Init and Shutdown code
Currently this code is not called in unit tests. Calling it should make it
possible to write tests for things like IPC exceptions being thrown during
shutdown.
2025-04-24 05:53:03 -04:00
Ryan Ofsky
197b2aaaaa ipc: Use EventLoopRef instead of addClient/removeClient
Use EventLoopRef to avoid reference counting bugs, be more exception safe, and
deal with the removal of the addClient/removeClient methods in
https://github.com/bitcoin-core/libmultiprocess/pull/160

A test update is also required due to
https://github.com/bitcoin-core/libmultiprocess/pull/160 to deal with changed
reference count semantics. In IpcPipeTest(), it is now necessary to destroy
the client Proxy object instead of just the client Connection object to
decrease the event loop reference count and allow the loop to exit so the test
does not hang on shutdown.
2025-04-24 04:53:03 -05:00
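
A condensed view of the new teardown order (from the ipc_tests hunk near the end of this diff): the client Proxy object now owns the connection and holds the event loop reference, so destroying it is what lets loop() return:

foo.reset();   // destroys the ProxyClient, dropping its EventLoopRef
thread.join(); // loop() has exited, so the event loop thread finishes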
Ryan Ofsky
b9e16ff790 Merge commit 'c021835739272d1daa8b4a2afc8e8c7d093d534c' into pr/ipc-stop-base 2025-04-24 10:53:03 +01:00
Ryan Ofsky
c021835739 Squashed 'src/ipc/libmultiprocess/' changes from 35944ffd23fa..f15ef6cdeec5
f15ef6cdeec5 Improve IPC client disconnected exceptions
4a9a387e68b4 test: Add test coverage for client & server disconnections
a848ec60d490 refactor: Add clang thread safety annotations to EventLoop
4b97111ada84 refactor: Remove DestructorCatcher and AsyncCallable
276eb8f99d05 refactor: Drop addClient/removeClient methods
025a77ec2e46 refactor: Use EventLoopRef instead of addClient/removeClient
394f966e93f8 refactor: Add ProxyContext EventLoop* member
c1aa2d7dd546 proxy-io.h: Add EventLoopRef RAII class handle addClient/removeClient refcounting
2e02532f4720 proxy-io.h: Add more detailed EventLoop comment

git-subtree-dir: src/ipc/libmultiprocess
git-subtree-split: f15ef6cdeec54c83f52e00de47d57b09f9a5f03b
2025-04-24 10:53:03 +01:00
21 changed files with 481 additions and 233 deletions

@@ -589,6 +589,14 @@ void ArgsManager::AddHiddenArgs(const std::vector<std::string>& names)
}
}
void ArgsManager::ClearArgs()
{
LOCK(cs_args);
m_settings = {};
m_available_args.clear();
m_network_only_args.clear();
}
void ArgsManager::CheckMultipleCLIArgs() const
{
LOCK(cs_args);

@@ -359,11 +359,7 @@ protected:
/**
* Clear available arguments
*/
void ClearArgs() {
LOCK(cs_args);
m_available_args.clear();
m_network_only_args.clear();
}
void ClearArgs();
/**
* Check CLI command args

@@ -33,6 +33,7 @@
#include <interfaces/ipc.h>
#include <interfaces/mining.h>
#include <interfaces/node.h>
#include <ipc/exception.h>
#include <kernel/caches.h>
#include <kernel/context.h>
#include <key.h>
@@ -298,8 +299,13 @@ void Shutdown(NodeContext& node)
StopREST();
StopRPC();
StopHTTPServer();
for (const auto& client : node.chain_clients) {
client->flush();
for (auto& client : node.chain_clients) {
try {
client->flush();
} catch (const ipc::Exception& e) {
LogDebug(BCLog::IPC, "Chain client did not disconnect cleanly: %s", e.what());
client.reset();
}
}
StopMapPort();
@@ -374,7 +380,7 @@ void Shutdown(NodeContext& node)
}
}
for (const auto& client : node.chain_clients) {
client->stop();
if (client) client->stop();
}
#ifdef ENABLE_ZMQ
@@ -398,6 +404,12 @@ void Shutdown(NodeContext& node)
RemovePidFile(*node.args);
// If any -ipcbind clients are still connected, disconnect them now so they
// do not block shutdown.
if (interfaces::Ipc* ipc = node.init->ipc()) {
ipc->disconnectIncoming();
}
LogPrintf("%s: done\n", __func__);
}

@@ -59,17 +59,20 @@ public:
//! true. If this is not a spawned child process, return false.
virtual bool startSpawnedProcess(int argc, char* argv[], int& exit_status) = 0;
//! Connect to a socket address and make a client interface proxy object
//! using provided callback. connectAddress returns an interface pointer if
//! the connection was established, returns null if address is empty ("") or
//! disabled ("0") or if a connection was refused but not required ("auto"),
//! and throws an exception if there was an unexpected error.
//! Connect to a socket address and return a pointer to its Init interface.
//! Returns a non-null pointer if the connection was established, returns
//! null if address is empty ("") or disabled ("0") or if a connection was
//! refused but not required ("auto"), and throws an exception if there was
//! an unexpected error.
virtual std::unique_ptr<Init> connectAddress(std::string& address) = 0;
//! Connect to a socket address and make a client interface proxy object
//! using provided callback. Throws an exception if there was an error.
//! Listen on a socket address exposing this process's init interface to
//! clients. Throws an exception if there was an error.
virtual void listenAddress(std::string& address) = 0;
//! Disconnect any incoming connections that are still connected.
virtual void disconnectIncoming() = 0;
//! Add cleanup callback to remote interface that will run when the
//! interface is deleted.
template<typename Interface>

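A minimal caller sketch of the documented connectAddress() contract (hypothetical code, not part of this diff): a null return is the expected outcome for "", "0", and a refused "auto" connection, while genuine failures surface as exceptions:

std::string address{"auto"};
try {
    if (auto remote = ipc->connectAddress(address)) {
        // Connected: use the remote Init interface, e.g. remote->makeChain().
    } else {
        // Address was "", "0", or "auto" with nothing listening: stay in-process.
    }
} catch (const std::exception& e) {
    // Unexpected error, e.g. a malformed address.
}
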
@@ -41,10 +41,7 @@ class CapnpProtocol : public Protocol
public:
~CapnpProtocol() noexcept(true)
{
if (m_loop) {
std::unique_lock<std::mutex> lock(m_loop->m_mutex);
m_loop->removeClient(lock);
}
m_loop_ref.reset();
if (m_loop_thread.joinable()) m_loop_thread.join();
assert(!m_loop);
};
@@ -71,6 +68,13 @@ public:
m_loop->loop();
m_loop.reset();
}
void disconnectIncoming() override
{
if (!m_loop) return;
m_loop->sync([&] {
m_loop->m_incoming_connections.clear();
});
}
void addCleanup(std::type_index type, void* iface, std::function<void()> cleanup) override
{
mp::ProxyTypeRegister::types().at(type)(iface).cleanup_fns.emplace_back(std::move(cleanup));
@@ -83,10 +87,7 @@ public:
m_loop_thread = std::thread([&] {
util::ThreadRename("capnp-loop");
m_loop.emplace(exe_name, &IpcLogFn, &m_context);
{
std::unique_lock<std::mutex> lock(m_loop->m_mutex);
m_loop->addClient(lock);
}
m_loop_ref.emplace(*m_loop);
promise.set_value();
m_loop->loop();
m_loop.reset();
@@ -96,6 +97,7 @@ public:
Context m_context;
std::thread m_loop_thread;
std::optional<mp::EventLoop> m_loop;
std::optional<mp::EventLoopRef> m_loop_ref;
};
} // namespace

@@ -17,6 +17,7 @@
#include <cstdlib>
#include <functional>
#include <memory>
#include <signal.h>
#include <stdexcept>
#include <string.h>
#include <string>
@@ -26,6 +27,27 @@
namespace ipc {
namespace {
#ifndef WIN32
std::string g_ignore_ctrl_c;
void HandleCtrlC(int)
{
(void)write(STDOUT_FILENO, g_ignore_ctrl_c.data(), g_ignore_ctrl_c.size());
}
#endif
void IgnoreCtrlC(std::string message)
{
#ifndef WIN32
g_ignore_ctrl_c = std::move(message);
struct sigaction sa{};
sa.sa_handler = HandleCtrlC;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART;
sigaction(SIGINT, &sa, nullptr);
#endif
}
class IpcImpl : public interfaces::Ipc
{
public:
@@ -53,6 +75,7 @@ public:
if (!m_process->checkSpawned(argc, argv, fd)) {
return false;
}
IgnoreCtrlC(strprintf("[%s] SIGINT received — waiting for parent to shut down.\n", m_exe_name));
m_protocol->serve(fd, m_exe_name, m_init);
exit_status = EXIT_SUCCESS;
return true;
@@ -86,6 +109,10 @@ public:
int fd = m_process->bind(gArgs.GetDataDirNet(), m_exe_name, address);
m_protocol->listen(fd, m_exe_name, m_init);
}
void disconnectIncoming() override
{
m_protocol->disconnectIncoming();
}
void addCleanup(std::type_index type, void* iface, std::function<void()> cleanup) override
{
m_protocol->addCleanup(type, iface, std::move(cleanup));

@@ -14,9 +14,10 @@
#include <assert.h>
#include <functional>
#include <optional>
#include <kj/function.h>
#include <map>
#include <memory>
#include <optional>
#include <sstream>
#include <string>
@@ -129,6 +130,25 @@ std::string LongThreadName(const char* exe_name);
//! Event loop implementation.
//!
//! Cap'n Proto threading model is very simple: all I/O operations are
//! asynchronous and must be performed on a single thread. This includes:
//!
//! - Code starting an asynchronous operation (calling a function that returns a
//! promise object)
//! - Code notifying that an asynchronous operation is complete (code using a
//! fulfiller object)
//! - Code handling a completed operation (code chaining or waiting for a promise)
//!
//! All this code needs to run on one thread, and the EventLoop::loop() method
//! is the entry point for this thread. ProxyClient and ProxyServer objects that
//! use other threads and need to perform I/O operations post to this thread
//! using EventLoop::post() and EventLoop::sync() methods.
//!
//! Specifically, because ProxyClient methods can be called from arbitrary
//! threads, and ProxyServer methods can run on arbitrary threads, ProxyClient
//! methods use the EventLoop thread to send requests, and ProxyServer methods
//! use the thread to return results.
//!
//! Based on https://groups.google.com/d/msg/capnproto/TuQFF1eH2-M/g81sHaTAAQAJ
class EventLoop
{
@@ -144,7 +164,7 @@ public:
//! Run function on event loop thread. Does not return until function completes.
//! Must be called while the loop() function is active.
void post(const std::function<void()>& fn);
void post(kj::Function<void()> fn);
//! Wrapper around EventLoop::post that takes advantage of the
//! fact that callable will not go out of scope to avoid requirement that it
@@ -152,7 +172,7 @@ public:
template <typename Callable>
void sync(Callable&& callable)
{
post(std::ref(callable));
post(std::forward<Callable>(callable));
}
//! Start asynchronous worker thread if necessary. This is only done if
@@ -166,13 +186,10 @@ public:
//! is important that ProxyServer::m_impl destructors do not run on the
//! eventloop thread because they may need it to do I/O if they perform
//! other IPC calls.
void startAsyncThread(std::unique_lock<std::mutex>& lock);
void startAsyncThread() MP_REQUIRES(m_mutex);
//! Add/remove remote client reference counts.
void addClient(std::unique_lock<std::mutex>& lock);
bool removeClient(std::unique_lock<std::mutex>& lock);
//! Check if loop should exit.
bool done(std::unique_lock<std::mutex>& lock);
bool done() MP_REQUIRES(m_mutex);
Logger log()
{
@@ -195,10 +212,10 @@ public:
std::thread m_async_thread;
//! Callback function to run on event loop thread during post() or sync() call.
const std::function<void()>* m_post_fn = nullptr;
kj::Function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;
//! Callback functions to run on async thread.
CleanupList m_async_fns;
CleanupList m_async_fns MP_GUARDED_BY(m_mutex);
//! Pipe read handle used to wake up the event loop thread.
int m_wait_fd = -1;
@@ -208,11 +225,11 @@ public:
//! Number of clients holding references to ProxyServerBase objects that
//! reference this event loop.
int m_num_clients = 0;
int m_num_clients MP_GUARDED_BY(m_mutex) = 0;
//! Mutex and condition variable used to post tasks to event loop and async
//! thread.
std::mutex m_mutex;
Mutex m_mutex;
std::condition_variable m_cv;
//! Capnp IO context.
@@ -263,11 +280,9 @@ struct Waiter
// in the case where a capnp response is sent and a brand new
// request is immediately received.
while (m_fn) {
auto fn = std::move(m_fn);
m_fn = nullptr;
lock.unlock();
fn();
lock.lock();
auto fn = std::move(*m_fn);
m_fn.reset();
Unlock(lock, fn);
}
const bool done = pred();
return done;
@@ -276,7 +291,7 @@ struct Waiter
std::mutex m_mutex;
std::condition_variable m_cv;
std::function<void()> m_fn;
std::optional<kj::Function<void()>> m_fn;
};
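
The switch from std::function to kj::Function here matters because std::function requires copyable callables, while capnp code routinely captures move-only kj types. A sketch (not from the diff) of what now works:

auto owned = kj::heap<int>(42); // kj::Own<T> is move-only
kj::Function<void()> fn{[o = kj::mv(owned)] { /* use *o */ }};
// std::function<void()> bad{[o = kj::mv(owned)] {}}; // would not compile: lambda is move-only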
//! Object holding network & rpc state associated with either an incoming server
@@ -290,21 +305,13 @@ public:
Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
: m_loop(loop), m_stream(kj::mv(stream_)),
m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
m_rpc_system(::capnp::makeRpcClient(m_network))
{
std::unique_lock<std::mutex> lock(m_loop.m_mutex);
m_loop.addClient(lock);
}
m_rpc_system(::capnp::makeRpcClient(m_network)) {}
Connection(EventLoop& loop,
kj::Own<kj::AsyncIoStream>&& stream_,
const std::function<::capnp::Capability::Client(Connection&)>& make_client)
: m_loop(loop), m_stream(kj::mv(stream_)),
m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this)))
{
std::unique_lock<std::mutex> lock(m_loop.m_mutex);
m_loop.addClient(lock);
}
m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
//! Run cleanup functions. Must be called from the event loop thread. First
//! calls synchronous cleanup functions while blocked (to free capnp
@@ -333,12 +340,12 @@ public:
// to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
// error in cases where f deletes this Connection object.
m_on_disconnect.add(m_network.onDisconnect().then(
[f = std::move(f), this]() mutable { m_loop.m_task_set->add(kj::evalLater(kj::mv(f))); }));
[f = std::move(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
}
EventLoop& m_loop;
EventLoopRef m_loop;
kj::Own<kj::AsyncIoStream> m_stream;
LoggingErrorHandler m_error_handler{m_loop};
LoggingErrorHandler m_error_handler{*m_loop};
kj::TaskSet m_on_disconnect{m_error_handler};
::capnp::TwoPartyVatNetwork m_network;
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
@@ -381,21 +388,12 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
: m_client(std::move(client)), m_context(connection)
{
{
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.addClient(lock);
}
// Handler for the connection getting destroyed before this client object.
auto cleanup_it = m_context.connection->addSyncCleanup([this]() {
// Release client capability by move-assigning to temporary.
{
typename Interface::Client(std::move(m_client));
}
{
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.removeClient(lock);
}
m_context.connection = nullptr;
});
@@ -423,16 +421,11 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
Sub::destroy(*this);
// FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
m_context.connection->m_loop.sync([&]() {
m_context.loop->sync([&]() {
// Release client capability by move-assigning to temporary.
{
typename Interface::Client(std::move(m_client));
}
{
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.removeClient(lock);
}
if (destroy_connection) {
delete m_context.connection;
m_context.connection = nullptr;
@@ -454,8 +447,6 @@ ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Co
: m_impl(std::move(impl)), m_context(&connection)
{
assert(m_impl);
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.addClient(lock);
}
//! ProxyServer destructor, called from the EventLoop thread by Cap'n Proto
@@ -489,8 +480,6 @@ ProxyServerBase<Interface, Impl>::~ProxyServerBase()
});
}
assert(m_context.cleanup_fns.empty());
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.removeClient(lock);
}
//! If the capnp interface defined a special "destroy" method, as described the

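The EventLoop comment added in this file documents the central rule: all capnp I/O happens on the loop thread. A minimal usage sketch, assuming an EventLoop `loop` whose loop() method is already running on a dedicated thread:

loop.sync([&] {
    // Runs on the event loop thread; safe to touch capnp promises and
    // connections here. sync() blocks the calling thread until this returns.
});
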
@@ -558,7 +558,7 @@ template <typename Client>
void clientDestroy(Client& client)
{
if (client.m_context.connection) {
client.m_context.connection->m_loop.log() << "IPC client destroy " << typeid(client).name();
client.m_context.loop->log() << "IPC client destroy " << typeid(client).name();
} else {
KJ_LOG(INFO, "IPC interrupted client destroy", typeid(client).name());
}
@@ -567,7 +567,7 @@ void clientDestroy(Client& client)
template <typename Server>
void serverDestroy(Server& server)
{
server.m_context.connection->m_loop.log() << "IPC server destroy " << typeid(server).name();
server.m_context.loop->log() << "IPC server destroy " << typeid(server).name();
}
//! Entry point called by generated client code that looks like:
@@ -582,12 +582,9 @@ void serverDestroy(Server& server)
template <typename ProxyClient, typename GetRequest, typename... FieldObjs>
void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, FieldObjs&&... fields)
{
if (!proxy_client.m_context.connection) {
throw std::logic_error("clientInvoke call made after disconnect");
}
if (!g_thread_context.waiter) {
assert(g_thread_context.thread_name.empty());
g_thread_context.thread_name = ThreadName(proxy_client.m_context.connection->m_loop.m_exe_name);
g_thread_context.thread_name = ThreadName(proxy_client.m_context.loop->m_exe_name);
// If next assert triggers, it means clientInvoke is being called from
// the capnp event loop thread. This can happen when a ProxyServer
// method implementation that runs synchronously on the event loop
@@ -598,7 +595,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
// declaration so the server method runs in a dedicated thread.
assert(!g_thread_context.loop_thread);
g_thread_context.waiter = std::make_unique<Waiter>();
proxy_client.m_context.connection->m_loop.logPlain()
proxy_client.m_context.loop->logPlain()
<< "{" << g_thread_context.thread_name
<< "} IPC client first request from current thread, constructing waiter";
}
@@ -606,18 +603,27 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
std::exception_ptr exception;
std::string kj_exception;
bool done = false;
proxy_client.m_context.connection->m_loop.sync([&]() {
const char* disconnected = nullptr;
proxy_client.m_context.loop->sync([&]() {
if (!proxy_client.m_context.connection) {
const std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
done = true;
disconnected = "IPC client method called after disconnect.";
invoke_context.thread_context.waiter->m_cv.notify_all();
return;
}
auto request = (proxy_client.m_client.*get_request)(nullptr);
using Request = CapRequestTraits<decltype(request)>;
using FieldList = typename ProxyClientMethodTraits<typename Request::Params>::Fields;
IterateFields().handleChain(invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...);
proxy_client.m_context.connection->m_loop.logPlain()
proxy_client.m_context.loop->logPlain()
<< "{" << invoke_context.thread_context.thread_name << "} IPC client send "
<< TypeName<typename Request::Params>() << " " << LogEscape(request.toString());
proxy_client.m_context.connection->m_loop.m_task_set->add(request.send().then(
proxy_client.m_context.loop->m_task_set->add(request.send().then(
[&](::capnp::Response<typename Request::Results>&& response) {
proxy_client.m_context.connection->m_loop.logPlain()
proxy_client.m_context.loop->logPlain()
<< "{" << invoke_context.thread_context.thread_name << "} IPC client recv "
<< TypeName<typename Request::Results>() << " " << LogEscape(response.toString());
try {
@@ -631,9 +637,13 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
invoke_context.thread_context.waiter->m_cv.notify_all();
},
[&](const ::kj::Exception& e) {
kj_exception = kj::str("kj::Exception: ", e).cStr();
proxy_client.m_context.connection->m_loop.logPlain()
<< "{" << invoke_context.thread_context.thread_name << "} IPC client exception " << kj_exception;
if (e.getType() == ::kj::Exception::Type::DISCONNECTED) {
disconnected = "IPC client method call interrupted by disconnect.";
} else {
kj_exception = kj::str("kj::Exception: ", e).cStr();
proxy_client.m_context.loop->logPlain()
<< "{" << invoke_context.thread_context.thread_name << "} IPC client exception " << kj_exception;
}
const std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
done = true;
invoke_context.thread_context.waiter->m_cv.notify_all();
@@ -643,7 +653,8 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
invoke_context.thread_context.waiter->wait(lock, [&done]() { return done; });
if (exception) std::rethrow_exception(exception);
if (!kj_exception.empty()) proxy_client.m_context.connection->m_loop.raise() << kj_exception;
if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception;
if (disconnected) proxy_client.m_context.loop->raise() << disconnected;
}
//! Invoke callable `fn()` that may return void. If it does return void, replace
@@ -682,7 +693,7 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
using Results = typename decltype(call_context.getResults())::Builds;
int req = ++server_reqs;
server.m_context.connection->m_loop.log() << "IPC server recv request #" << req << " "
server.m_context.loop->log() << "IPC server recv request #" << req << " "
<< TypeName<typename Params::Reads>() << " " << LogEscape(params.toString());
try {
@@ -699,14 +710,14 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
return ReplaceVoid([&]() { return fn.invoke(server_context, ArgList()); },
[&]() { return kj::Promise<CallContext>(kj::mv(call_context)); })
.then([&server, req](CallContext call_context) {
server.m_context.connection->m_loop.log() << "IPC server send response #" << req << " " << TypeName<Results>()
server.m_context.loop->log() << "IPC server send response #" << req << " " << TypeName<Results>()
<< " " << LogEscape(call_context.getResults().toString());
});
} catch (const std::exception& e) {
server.m_context.connection->m_loop.log() << "IPC server unhandled exception: " << e.what();
server.m_context.loop->log() << "IPC server unhandled exception: " << e.what();
throw;
} catch (...) {
server.m_context.connection->m_loop.log() << "IPC server unhandled exception";
server.m_context.loop->log() << "IPC server unhandled exception";
throw;
}
}

@@ -8,6 +8,7 @@
#include <mp/util.h>
#include <array>
#include <cassert>
#include <functional>
#include <list>
#include <stddef.h>
@@ -47,13 +48,34 @@ inline void CleanupRun(CleanupList& fns) {
}
}
//! Event loop smart pointer automatically managing m_num_clients.
//! If a lock pointer argument is passed, the specified lock will be used,
//! otherwise EventLoop::m_mutex will be locked when needed.
class EventLoopRef
{
public:
explicit EventLoopRef(EventLoop& loop, Lock* lock = nullptr);
EventLoopRef(EventLoopRef&& other) noexcept : m_loop(other.m_loop) { other.m_loop = nullptr; }
EventLoopRef(const EventLoopRef&) = delete;
EventLoopRef& operator=(const EventLoopRef&) = delete;
EventLoopRef& operator=(EventLoopRef&&) = delete;
~EventLoopRef() { reset(); }
EventLoop& operator*() const { assert(m_loop); return *m_loop; }
EventLoop* operator->() const { assert(m_loop); return m_loop; }
bool reset(Lock* lock = nullptr);
EventLoop* m_loop{nullptr};
Lock* m_lock{nullptr};
};
//! Context data associated with proxy client and server classes.
struct ProxyContext
{
Connection* connection;
EventLoopRef loop;
CleanupList cleanup_fns;
ProxyContext(Connection* connection) : connection(connection) {}
ProxyContext(Connection* connection);
};
//! Base class for generated ProxyClient classes that implement a C++ interface

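A sketch of the EventLoopRef contract declared above, assuming an EventLoop `loop` that is currently running: construction increments loop.m_num_clients, and reset() (or destruction) decrements it, waking the loop so it can exit once no clients or pending async work remain:

{
    EventLoopRef ref{loop}; // m_num_clients += 1 (takes loop.m_mutex internally)
    ref->post([] {});       // operator-> reaches the underlying EventLoop
}                           // ~EventLoopRef() calls reset(): m_num_clients -= 1
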
@@ -64,8 +64,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
auto future = kj::newPromiseAndFulfiller<typename ServerContext::CallContext>();
auto& server = server_context.proxy_server;
int req = server_context.req;
auto invoke = MakeAsyncCallable(
[fulfiller = kj::mv(future.fulfiller),
auto invoke = [fulfiller = kj::mv(future.fulfiller),
call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable {
const auto& params = call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
@@ -132,35 +131,35 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
return;
}
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
server.m_context.connection->m_loop.sync([&] {
server.m_context.loop->sync([&] {
auto fulfiller_dispose = kj::mv(fulfiller);
fulfiller_dispose->fulfill(kj::mv(call_context));
});
}))
{
server.m_context.connection->m_loop.sync([&]() {
server.m_context.loop->sync([&]() {
auto fulfiller_dispose = kj::mv(fulfiller);
fulfiller_dispose->reject(kj::mv(*exception));
});
}
});
};
// Lookup Thread object specified by the client. The specified thread should
// be a local Thread::Server object, but it needs to be looked up
// asynchronously with getLocalServer().
auto thread_client = context_arg.getThread();
return server.m_context.connection->m_threads.getLocalServer(thread_client)
.then([&server, invoke, req](const kj::Maybe<Thread::Server&>& perhaps) {
.then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
// Assuming the thread object is found, pass it a pointer to the
// `invoke` lambda above which will invoke the function on that
// thread.
KJ_IF_MAYBE (thread_server, perhaps) {
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
server.m_context.connection->m_loop.log()
server.m_context.loop->log()
<< "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
thread.m_thread_context.waiter->post(std::move(invoke));
} else {
server.m_context.connection->m_loop.log()
server.m_context.loop->log()
<< "IPC server error request #" << req << ", missing thread to execute request";
throw std::runtime_error("invalid thread handle");
}

@@ -6,6 +6,7 @@
#define MP_UTIL_H
#include <capnp/schema.h>
#include <cassert>
#include <cstddef>
#include <functional>
#include <future>
@@ -13,11 +14,13 @@
#include <kj/exception.h>
#include <kj/string-tree.h>
#include <memory>
#include <mutex>
#include <string.h>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>
#include <variant>
#include <vector>
namespace mp {
@@ -130,6 +133,58 @@ const char* TypeName()
return short_name ? short_name + 1 : display_name;
}
//! Convenient wrapper around std::variant<T*, T>
template <typename T>
struct PtrOrValue {
std::variant<T*, T> data;
template <typename... Args>
PtrOrValue(T* ptr, Args&&... args) : data(ptr ? ptr : std::variant<T*, T>{std::in_place_type<T>, std::forward<Args>(args)...}) {}
T& operator*() { return data.index() ? std::get<T>(data) : *std::get<T*>(data); }
T* operator->() { return &**this; }
T& operator*() const { return data.index() ? std::get<T>(data) : *std::get<T*>(data); }
T* operator->() const { return &**this; }
};
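A minimal usage sketch of PtrOrValue with the Mutex/Lock types defined next (this mirrors how the EventLoopRef constructor below uses it: reuse a caller-supplied lock, otherwise take the lock itself):

Mutex m;
{
    Lock outer{m};
    PtrOrValue<Lock> l{&outer, m}; // non-null pointer: aliases outer; no new Lock built
    l->assert_locked(m);
}
{
    PtrOrValue<Lock> l{nullptr, m}; // null pointer: constructs and owns its own Lock{m}
    l->assert_locked(m);
}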
// Annotated mutex and lock class (https://clang.llvm.org/docs/ThreadSafetyAnalysis.html)
#if defined(__clang__) && (!defined(SWIG))
#define MP_TSA(x) __attribute__((x))
#else
#define MP_TSA(x) // no-op
#endif
#define MP_CAPABILITY(x) MP_TSA(capability(x))
#define MP_SCOPED_CAPABILITY MP_TSA(scoped_lockable)
#define MP_REQUIRES(x) MP_TSA(requires_capability(x))
#define MP_ACQUIRE(...) MP_TSA(acquire_capability(__VA_ARGS__))
#define MP_RELEASE(...) MP_TSA(release_capability(__VA_ARGS__))
#define MP_ASSERT_CAPABILITY(x) MP_TSA(assert_capability(x))
#define MP_GUARDED_BY(x) MP_TSA(guarded_by(x))
class MP_CAPABILITY("mutex") Mutex {
public:
void lock() MP_ACQUIRE() { m_mutex.lock(); }
void unlock() MP_RELEASE() { m_mutex.unlock(); }
std::mutex m_mutex;
};
class MP_SCOPED_CAPABILITY Lock {
public:
explicit Lock(Mutex& m) MP_ACQUIRE(m) : m_lock(m.m_mutex) {}
~Lock() MP_RELEASE() {}
void unlock() MP_RELEASE() { m_lock.unlock(); }
void lock() MP_ACQUIRE() { m_lock.lock(); }
void assert_locked(Mutex& mutex) MP_ASSERT_CAPABILITY() MP_ASSERT_CAPABILITY(mutex)
{
assert(m_lock.mutex() == &mutex.m_mutex);
assert(m_lock);
}
std::unique_lock<std::mutex> m_lock;
};
//! Analog to std::lock_guard that unlocks instead of locks.
template <typename Lock>
struct UnlockGuard
@@ -146,46 +201,6 @@ void Unlock(Lock& lock, Callback&& callback)
callback();
}
//! Needed for libc++/macOS compatibility. Lets code work with shared_ptr nothrow declaration
//! https://github.com/capnproto/capnproto/issues/553#issuecomment-328554603
template <typename T>
struct DestructorCatcher
{
T value;
template <typename... Params>
DestructorCatcher(Params&&... params) : value(kj::fwd<Params>(params)...)
{
}
~DestructorCatcher() noexcept try {
} catch (const kj::Exception& e) { // NOLINT(bugprone-empty-catch)
}
};
//! Wrapper around callback function for compatibility with std::async.
//!
//! std::async requires callbacks to be copyable and requires noexcept
//! destructors, but this doesn't work well with kj types which are generally
//! move-only and not noexcept.
template <typename Callable>
struct AsyncCallable
{
AsyncCallable(Callable&& callable) : m_callable(std::make_shared<DestructorCatcher<Callable>>(std::move(callable)))
{
}
AsyncCallable(const AsyncCallable&) = default;
AsyncCallable(AsyncCallable&&) = default;
~AsyncCallable() noexcept = default;
ResultOf<Callable> operator()() const { return (m_callable->value)(); }
mutable std::shared_ptr<DestructorCatcher<Callable>> m_callable;
};
//! Construct AsyncCallable object.
template <typename Callable>
AsyncCallable<std::remove_reference_t<Callable>> MakeAsyncCallable(Callable&& callable)
{
return std::move(callable);
}
//! Format current thread name as "{exe_name}-{$pid}/{thread_name}-{$tid}".
std::string ThreadName(const char* exe_name);

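A small hypothetical example (not part of this diff) of how the annotated Mutex/Lock pair above is intended to be used, so clang's -Wthread-safety analysis can verify locking statically:

class Counter
{
public:
    void Add(int n)
    {
        Lock lock{m_mutex}; // MP_SCOPED_CAPABILITY: analysis sees m_mutex held
        AddLocked(n);
    }

private:
    void AddLocked(int n) MP_REQUIRES(m_mutex) { m_value += n; }
    Mutex m_mutex;
    int m_value MP_GUARDED_BY(m_mutex) = 0;
};
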
@@ -22,6 +22,7 @@
#include <kj/common.h>
#include <kj/debug.h>
#include <kj/exception.h>
#include <kj/function.h>
#include <kj/memory.h>
#include <map>
#include <memory>
@@ -48,6 +49,36 @@ void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
m_loop.log() << "Uncaught exception in daemonized task.";
}
EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(lock)
{
auto loop_lock{PtrOrValue{m_lock, m_loop->m_mutex}};
loop_lock->assert_locked(m_loop->m_mutex);
m_loop->m_num_clients += 1;
}
bool EventLoopRef::reset(Lock* lock)
{
bool done = false;
if (m_loop) {
auto loop_lock{PtrOrValue{lock ? lock : m_lock, m_loop->m_mutex}};
loop_lock->assert_locked(m_loop->m_mutex);
assert(m_loop->m_num_clients > 0);
m_loop->m_num_clients -= 1;
if (m_loop->done()) {
done = true;
m_loop->m_cv.notify_all();
int post_fd{m_loop->m_post_fd};
loop_lock->unlock();
char buffer = 0;
KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
}
m_loop = nullptr;
}
return done;
}
ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{*connection->m_loop} {}
Connection::~Connection()
{
// Shut down RPC system first, since this will garbage collect Server
@@ -103,18 +134,18 @@ Connection::~Connection()
m_sync_cleanup_fns.pop_front();
}
while (!m_async_cleanup_fns.empty()) {
const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
m_loop.m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front()));
const Lock lock(m_loop->m_mutex);
m_loop->m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front()));
m_async_cleanup_fns.pop_front();
}
std::unique_lock<std::mutex> lock(m_loop.m_mutex);
m_loop.startAsyncThread(lock);
m_loop.removeClient(lock);
Lock lock(m_loop->m_mutex);
m_loop->startAsyncThread();
m_loop.reset(&lock);
}
CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
{
const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
const Lock lock(m_loop->m_mutex);
// Add cleanup callbacks to the front of list, so sync cleanup functions run
// in LIFO order. This is a good approach because sync cleanup functions are
// added as client objects are created, and it is natural to clean up
@@ -128,13 +159,13 @@ CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
void Connection::removeSyncCleanup(CleanupIt it)
{
const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
const Lock lock(m_loop->m_mutex);
m_sync_cleanup_fns.erase(it);
}
void Connection::addAsyncCleanup(std::function<void()> fn)
{
const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
const Lock lock(m_loop->m_mutex);
// Add async cleanup callbacks to the back of the list. Unlike the sync
// cleanup list, this list order is more significant because it determines
// the order server objects are destroyed when there is a sudden disconnect,
@@ -170,7 +201,7 @@ EventLoop::EventLoop(const char* exe_name, LogFn log_fn, void* context)
EventLoop::~EventLoop()
{
if (m_async_thread.joinable()) m_async_thread.join();
const std::lock_guard<std::mutex> lock(m_mutex);
const Lock lock(m_mutex);
KJ_ASSERT(m_post_fn == nullptr);
KJ_ASSERT(m_async_fns.empty());
KJ_ASSERT(m_wait_fd == -1);
@@ -195,14 +226,14 @@ void EventLoop::loop()
for (;;) {
const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");
std::unique_lock<std::mutex> lock(m_mutex);
Lock lock(m_mutex);
if (m_post_fn) {
Unlock(lock, *m_post_fn);
m_post_fn = nullptr;
m_cv.notify_all();
} else if (done(lock)) {
} else if (done()) {
// Intentionally do not break if m_post_fn was set, even if done()
// would return true, to ensure that the removeClient write(post_fd)
// would return true, to ensure that the EventLoopRef write(post_fd)
// call always succeeds and the loop does not exit between the time
// that the done condition is set and the write call is made.
break;
@@ -213,75 +244,62 @@
log() << "EventLoop::loop bye.";
wait_stream = nullptr;
KJ_SYSCALL(::close(post_fd));
const std::unique_lock<std::mutex> lock(m_mutex);
const Lock lock(m_mutex);
m_wait_fd = -1;
m_post_fd = -1;
}
void EventLoop::post(const std::function<void()>& fn)
void EventLoop::post(kj::Function<void()> fn)
{
if (std::this_thread::get_id() == m_thread_id) {
fn();
return;
}
std::unique_lock<std::mutex> lock(m_mutex);
addClient(lock);
m_cv.wait(lock, [this] { return m_post_fn == nullptr; });
Lock lock(m_mutex);
EventLoopRef ref(*this, &lock);
m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_post_fn == nullptr; });
m_post_fn = &fn;
int post_fd{m_post_fd};
Unlock(lock, [&] {
char buffer = 0;
KJ_SYSCALL(write(post_fd, &buffer, 1));
});
m_cv.wait(lock, [this, &fn] { return m_post_fn != &fn; });
removeClient(lock);
m_cv.wait(lock.m_lock, [this, &fn]() MP_REQUIRES(m_mutex) { return m_post_fn != &fn; });
}
void EventLoop::addClient(std::unique_lock<std::mutex>& lock) { m_num_clients += 1; }
bool EventLoop::removeClient(std::unique_lock<std::mutex>& lock)
{
m_num_clients -= 1;
if (done(lock)) {
m_cv.notify_all();
int post_fd{m_post_fd};
lock.unlock();
char buffer = 0;
KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
return true;
}
return false;
}
void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
void EventLoop::startAsyncThread()
{
if (m_async_thread.joinable()) {
m_cv.notify_all();
} else if (!m_async_fns.empty()) {
m_async_thread = std::thread([this] {
std::unique_lock<std::mutex> lock(m_mutex);
while (true) {
Lock lock(m_mutex);
while (!done()) {
if (!m_async_fns.empty()) {
addClient(lock);
EventLoopRef ref{*this, &lock};
const std::function<void()> fn = std::move(m_async_fns.front());
m_async_fns.pop_front();
Unlock(lock, fn);
if (removeClient(lock)) break;
// Important to explicitly call ref.reset() here and
// explicitly break if the EventLoop is done, not relying on
// while condition above. Reason is that end of `ref`
// lifetime can cause EventLoop::loop() to exit, and if
// there is external code that immediately deletes the
// EventLoop object as soon as EventLoop::loop() method
// returns, checking the while condition may crash.
if (ref.reset()) break;
// Continue without waiting in case there are more async_fns
continue;
} else if (m_num_clients == 0) {
break;
}
m_cv.wait(lock);
m_cv.wait(lock.m_lock);
}
});
}
}
bool EventLoop::done(std::unique_lock<std::mutex>& lock)
bool EventLoop::done()
{
assert(m_num_clients >= 0);
assert(lock.owns_lock());
assert(lock.mutex() == &m_mutex);
return m_num_clients == 0 && m_async_fns.empty();
}
@@ -375,7 +393,7 @@ kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
const std::string from = context.getParams().getName();
std::promise<ThreadContext*> thread_context;
std::thread thread([&thread_context, from, this]() {
g_thread_context.thread_name = ThreadName(m_connection.m_loop.m_exe_name) + " (from " + from + ")";
g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")";
g_thread_context.waiter = std::make_unique<Waiter>();
thread_context.set_value(&g_thread_context);
std::unique_lock<std::mutex> lock(g_thread_context.waiter->m_mutex);

@@ -23,32 +23,82 @@
namespace mp {
namespace test {
/**
* Test setup class creating a two way connection between a
* ProxyServer<FooInterface> object and a ProxyClient<FooInterface>.
*
* Provides client_disconnect and server_disconnect lambdas that can be used to
* trigger disconnects and test handling of broken and closed connections.
*
* Accepts a client_owns_connection option to test different ProxyClient
* destroy_connection values and control whether destroying the ProxyClient
* object destroys the client Connection object. Normally it makes sense for
* this to be true to simplify shutdown and avoid needing to call
* client_disconnect manually, but false allows testing more ProxyClient
* behavior and the "IPC client method called after disconnect" code path.
*/
class TestSetup
{
public:
std::thread thread;
std::function<void()> server_disconnect;
std::function<void()> client_disconnect;
std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> client_promise;
std::unique_ptr<ProxyClient<messages::FooInterface>> client;
TestSetup(bool client_owns_connection = true)
: thread{[&] {
EventLoop loop("mptest", [](bool raise, const std::string& log) {
std::cout << "LOG" << raise << ": " << log << "\n";
if (raise) throw std::runtime_error(log);
});
auto pipe = loop.m_io_context.provider->newTwoWayPipe();
auto server_connection =
std::make_unique<Connection>(loop, kj::mv(pipe.ends[0]), [&](Connection& connection) {
auto server_proxy = kj::heap<ProxyServer<messages::FooInterface>>(
std::make_shared<FooImplementation>(), connection);
return capnp::Capability::Client(kj::mv(server_proxy));
});
server_disconnect = [&] { loop.sync([&] { server_connection.reset(); }); };
// Set handler to destroy the server when the client disconnects. This
// is ignored if server_disconnect() is called instead.
server_connection->onDisconnect([&] { server_connection.reset(); });
auto client_connection = std::make_unique<Connection>(loop, kj::mv(pipe.ends[1]));
auto client_proxy = std::make_unique<ProxyClient<messages::FooInterface>>(
client_connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<messages::FooInterface>(),
client_connection.get(), /* destroy_connection= */ client_owns_connection);
if (client_owns_connection) {
client_connection.release();
} else {
client_disconnect = [&] { loop.sync([&] { client_connection.reset(); }); };
}
client_promise.set_value(std::move(client_proxy));
loop.loop();
}}
{
client = client_promise.get_future().get();
}
~TestSetup()
{
// Test that client cleanup_fns are executed.
bool destroyed = false;
client->m_context.cleanup_fns.emplace_front([&destroyed] { destroyed = true; });
client.reset();
KJ_EXPECT(destroyed);
thread.join();
}
};
KJ_TEST("Call FooInterface methods")
{
std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> foo_promise;
std::function<void()> disconnect_client;
std::thread thread([&]() {
EventLoop loop("mptest", [](bool raise, const std::string& log) {
std::cout << "LOG" << raise << ": " << log << "\n";
});
auto pipe = loop.m_io_context.provider->newTwoWayPipe();
TestSetup setup;
ProxyClient<messages::FooInterface>* foo = setup.client.get();
auto connection_client = std::make_unique<Connection>(loop, kj::mv(pipe.ends[0]));
auto foo_client = std::make_unique<ProxyClient<messages::FooInterface>>(
connection_client->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<messages::FooInterface>(),
connection_client.get(), /* destroy_connection= */ false);
foo_promise.set_value(std::move(foo_client));
disconnect_client = [&] { loop.sync([&] { connection_client.reset(); }); };
auto connection_server = std::make_unique<Connection>(loop, kj::mv(pipe.ends[1]), [&](Connection& connection) {
auto foo_server = kj::heap<ProxyServer<messages::FooInterface>>(std::make_shared<FooImplementation>(), connection);
return capnp::Capability::Client(kj::mv(foo_server));
});
connection_server->onDisconnect([&] { connection_server.reset(); });
loop.loop();
});
auto foo = foo_promise.get_future().get();
KJ_EXPECT(foo->add(1, 2) == 3);
FooStruct in;
@@ -127,14 +177,40 @@ KJ_TEST("Call FooInterface methods")
mut.message = "init";
foo->passMutable(mut);
KJ_EXPECT(mut.message == "init build pass call return read");
}
disconnect_client();
thread.join();
KJ_TEST("Call IPC method after client connection is closed")
{
TestSetup setup{/*client_owns_connection=*/false};
ProxyClient<messages::FooInterface>* foo = setup.client.get();
KJ_EXPECT(foo->add(1, 2) == 3);
setup.client_disconnect();
bool destroyed = false;
foo->m_context.cleanup_fns.emplace_front([&destroyed]{ destroyed = true; });
foo.reset();
KJ_EXPECT(destroyed);
bool disconnected{false};
try {
foo->add(1, 2);
} catch (const std::runtime_error& e) {
KJ_EXPECT(std::string_view{e.what()} == "IPC client method called after disconnect.");
disconnected = true;
}
KJ_EXPECT(disconnected);
}
KJ_TEST("Calling IPC method after server connection is closed")
{
TestSetup setup;
ProxyClient<messages::FooInterface>* foo = setup.client.get();
KJ_EXPECT(foo->add(1, 2) == 3);
setup.server_disconnect();
bool disconnected{false};
try {
foo->add(1, 2);
} catch (const std::runtime_error& e) {
KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect.");
disconnected = true;
}
KJ_EXPECT(disconnected);
}
} // namespace test

@@ -58,6 +58,9 @@ public:
//! clients and servers independently.
virtual void serve(int fd, const char* exe_name, interfaces::Init& init, const std::function<void()>& ready_fn = {}) = 0;
//! Disconnect any incoming connections that are still connected.
virtual void disconnectIncoming() = 0;
//! Add cleanup callback to interface that will run when the interface is
//! deleted.
virtual void addCleanup(std::type_index type, void* iface, std::function<void()> cleanup) = 0;

@@ -121,6 +121,13 @@ public:
m_reachable.clear();
}
void Reset() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
AssertLockNotHeld(m_mutex);
LOCK(m_mutex);
m_reachable = DefaultNets();
}
[[nodiscard]] bool Contains(Network net) const EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
AssertLockNotHeld(m_mutex);
@@ -142,17 +149,21 @@ public:
}
private:
mutable Mutex m_mutex;
std::unordered_set<Network> m_reachable GUARDED_BY(m_mutex){
NET_UNROUTABLE,
NET_IPV4,
NET_IPV6,
NET_ONION,
NET_I2P,
NET_CJDNS,
NET_INTERNAL
static std::unordered_set<Network> DefaultNets()
{
return {
NET_UNROUTABLE,
NET_IPV4,
NET_IPV6,
NET_ONION,
NET_I2P,
NET_CJDNS,
NET_INTERNAL
};
};
mutable Mutex m_mutex;
std::unordered_set<Network> m_reachable GUARDED_BY(m_mutex){DefaultNets()};
};
extern ReachableNets g_reachable_nets;

@@ -323,6 +323,12 @@ void SetRPCWarmupStatus(const std::string& newStatus)
rpcWarmupStatus = newStatus;
}
void SetRPCWarmupStarting()
{
LOCK(g_rpc_warmup_mutex);
fRPCInWarmup = true;
}
void SetRPCWarmupFinished()
{
LOCK(g_rpc_warmup_mutex);

@@ -30,6 +30,7 @@ void RpcInterruptionPoint();
*/
void SetRPCWarmupStatus(const std::string& newStatus);
/* Mark warmup as done. RPC calls will be processed from now on. */
void SetRPCWarmupStarting();
void SetRPCWarmupFinished();
/* returns the current warmup state. */

@@ -63,6 +63,7 @@ add_executable(test_bitcoin
net_peer_eviction_tests.cpp
net_tests.cpp
netbase_tests.cpp
node_init_tests.cpp
node_warnings_tests.cpp
orphanage_tests.cpp
pcp_tests.cpp

@@ -53,9 +53,8 @@ static std::string TempPath(std::string_view pattern)
//! on the object through FooInterface (defined in ipc_test.capnp).
void IpcPipeTest()
{
// Setup: create FooImplemention object and listen for FooInterface requests
// Setup: create FooImplementation object and listen for FooInterface requests
std::promise<std::unique_ptr<mp::ProxyClient<gen::FooInterface>>> foo_promise;
std::function<void()> disconnect_client;
std::thread thread([&]() {
mp::EventLoop loop("IpcPipeTest", [](bool raise, const std::string& log) { LogPrintf("LOG%i: %s\n", raise, log); });
auto pipe = loop.m_io_context.provider->newTwoWayPipe();
@@ -63,9 +62,8 @@ void IpcPipeTest()
auto connection_client = std::make_unique<mp::Connection>(loop, kj::mv(pipe.ends[0]));
auto foo_client = std::make_unique<mp::ProxyClient<gen::FooInterface>>(
connection_client->m_rpc_system->bootstrap(mp::ServerVatId().vat_id).castAs<gen::FooInterface>(),
connection_client.get(), /* destroy_connection= */ false);
connection_client.release(), /* destroy_connection= */ true);
foo_promise.set_value(std::move(foo_client));
disconnect_client = [&] { loop.sync([&] { connection_client.reset(); }); };
auto connection_server = std::make_unique<mp::Connection>(loop, kj::mv(pipe.ends[1]), [&](mp::Connection& connection) {
auto foo_server = kj::heap<mp::ProxyServer<gen::FooInterface>>(std::make_shared<FooImplementation>(), connection);
@@ -125,8 +123,8 @@ void IpcPipeTest()
auto script2{foo->passScript(script1)};
BOOST_CHECK_EQUAL(HexStr(script1), HexStr(script2));
// Test cleanup: disconnect pipe and join thread
disconnect_client();
// Test cleanup: disconnect and join thread
foo.reset();
thread.join();
}

@@ -702,6 +702,7 @@ BOOST_AUTO_TEST_CASE(get_local_addr_for_peer_port)
BOOST_AUTO_TEST_CASE(LimitedAndReachable_Network)
{
g_reachable_nets.Reset();
BOOST_CHECK(g_reachable_nets.Contains(NET_IPV4));
BOOST_CHECK(g_reachable_nets.Contains(NET_IPV6));
BOOST_CHECK(g_reachable_nets.Contains(NET_ONION));

@@ -0,0 +1,49 @@
// Copyright (c) 2025 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <init.h>
#include <interfaces/init.h>
#include <rpc/server.h>
#include <boost/test/unit_test.hpp>
#include <test/util/setup_common.h>
using node::NodeContext;
BOOST_FIXTURE_TEST_SUITE(node_init_tests, BasicTestingSetup)
//! Custom implementation of interfaces::Init for testing.
class TestInit : public interfaces::Init
{
public:
TestInit(NodeContext& node) : m_node(node)
{
InitContext(m_node);
m_node.init = this;
}
std::unique_ptr<interfaces::Chain> makeChain() override { return interfaces::MakeChain(m_node); }
std::unique_ptr<interfaces::WalletLoader> makeWalletLoader(interfaces::Chain& chain) override
{
return MakeWalletLoader(chain, *Assert(m_node.args));
}
NodeContext& m_node;
};
BOOST_AUTO_TEST_CASE(init_test)
{
// Reset logging, config file path, rpc state, reachable nets to avoid errors in AppInitMain
LogInstance().DisconnectTestLogger();
m_node.args->SetConfigFilePath({});
SetRPCWarmupStarting();
g_reachable_nets.Reset();
// Run through initialization and shutdown code.
TestInit init{m_node};
BOOST_CHECK(AppInitInterfaces(m_node));
BOOST_CHECK(AppInitMain(m_node));
Interrupt(m_node);
Shutdown(m_node);
}
BOOST_AUTO_TEST_SUITE_END()