ipc: Use EventLoopRef instead of addClient/removeClient

Use EventLoopRef to avoid reference counting bugs and be more exception safe
and deal with removal of addClient/removeClient methods in
https://github.com/bitcoin-core/libmultiprocess/pull/160

A test update is also required due to
https://github.com/bitcoin-core/libmultiprocess/pull/160 to deal with changed
reference count semantics. In IpcPipeTest(), it is is now necessary to destroy
the client Proxy object instead of just the client Connection object to
decrease the event loop reference count and allow the loop to exit so the test
does not hang on shutdown.
This commit is contained in:
Ryan Ofsky 2025-02-09 14:21:37 -05:00
parent b9e16ff790
commit 197b2aaaaa
2 changed files with 7 additions and 14 deletions

View file

@ -41,10 +41,7 @@ class CapnpProtocol : public Protocol
public:
~CapnpProtocol() noexcept(true)
{
if (m_loop) {
std::unique_lock<std::mutex> lock(m_loop->m_mutex);
m_loop->removeClient(lock);
}
m_loop_ref.reset();
if (m_loop_thread.joinable()) m_loop_thread.join();
assert(!m_loop);
};
@ -83,10 +80,7 @@ public:
m_loop_thread = std::thread([&] {
util::ThreadRename("capnp-loop");
m_loop.emplace(exe_name, &IpcLogFn, &m_context);
{
std::unique_lock<std::mutex> lock(m_loop->m_mutex);
m_loop->addClient(lock);
}
m_loop_ref.emplace(*m_loop);
promise.set_value();
m_loop->loop();
m_loop.reset();
@ -96,6 +90,7 @@ public:
Context m_context;
std::thread m_loop_thread;
std::optional<mp::EventLoop> m_loop;
std::optional<mp::EventLoopRef> m_loop_ref;
};
} // namespace

View file

@ -53,9 +53,8 @@ static std::string TempPath(std::string_view pattern)
//! on the object through FooInterface (defined in ipc_test.capnp).
void IpcPipeTest()
{
// Setup: create FooImplemention object and listen for FooInterface requests
// Setup: create FooImplementation object and listen for FooInterface requests
std::promise<std::unique_ptr<mp::ProxyClient<gen::FooInterface>>> foo_promise;
std::function<void()> disconnect_client;
std::thread thread([&]() {
mp::EventLoop loop("IpcPipeTest", [](bool raise, const std::string& log) { LogPrintf("LOG%i: %s\n", raise, log); });
auto pipe = loop.m_io_context.provider->newTwoWayPipe();
@ -63,9 +62,8 @@ void IpcPipeTest()
auto connection_client = std::make_unique<mp::Connection>(loop, kj::mv(pipe.ends[0]));
auto foo_client = std::make_unique<mp::ProxyClient<gen::FooInterface>>(
connection_client->m_rpc_system->bootstrap(mp::ServerVatId().vat_id).castAs<gen::FooInterface>(),
connection_client.get(), /* destroy_connection= */ false);
connection_client.release(), /* destroy_connection= */ true);
foo_promise.set_value(std::move(foo_client));
disconnect_client = [&] { loop.sync([&] { connection_client.reset(); }); };
auto connection_server = std::make_unique<mp::Connection>(loop, kj::mv(pipe.ends[1]), [&](mp::Connection& connection) {
auto foo_server = kj::heap<mp::ProxyServer<gen::FooInterface>>(std::make_shared<FooImplementation>(), connection);
@ -125,8 +123,8 @@ void IpcPipeTest()
auto script2{foo->passScript(script1)};
BOOST_CHECK_EQUAL(HexStr(script1), HexStr(script2));
// Test cleanup: disconnect pipe and join thread
disconnect_client();
// Test cleanup: disconnect and join thread
foo.reset();
thread.join();
}