mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-04-29 14:59:39 -04:00
Compare commits
19 commits
ae0f33017f
...
37772ab12e
Author | SHA1 | Date | |
---|---|---|---|
|
37772ab12e | ||
|
65714c162c | ||
|
a4eee6d50b | ||
|
af6cffa36d | ||
|
33e6538b30 | ||
|
fa48be3ba4 | ||
|
aaaa45399c | ||
|
cccc1f4e91 | ||
|
c7e2b9e264 | ||
|
fa58f40b89 | ||
|
5b38e62ccb | ||
|
db845b915f | ||
|
6427d6f175 | ||
|
8ca7049cea | ||
|
cf1c26a3a2 | ||
|
197b2aaaaa | ||
|
b9e16ff790 | ||
|
c021835739 | ||
|
fadf12a56c |
30 changed files with 606 additions and 311 deletions
|
@ -589,6 +589,14 @@ void ArgsManager::AddHiddenArgs(const std::vector<std::string>& names)
|
|||
}
|
||||
}
|
||||
|
||||
void ArgsManager::ClearArgs()
|
||||
{
|
||||
LOCK(cs_args);
|
||||
m_settings = {};
|
||||
m_available_args.clear();
|
||||
m_network_only_args.clear();
|
||||
}
|
||||
|
||||
void ArgsManager::CheckMultipleCLIArgs() const
|
||||
{
|
||||
LOCK(cs_args);
|
||||
|
|
|
@ -359,11 +359,7 @@ protected:
|
|||
/**
|
||||
* Clear available arguments
|
||||
*/
|
||||
void ClearArgs() {
|
||||
LOCK(cs_args);
|
||||
m_available_args.clear();
|
||||
m_network_only_args.clear();
|
||||
}
|
||||
void ClearArgs();
|
||||
|
||||
/**
|
||||
* Check CLI command args
|
||||
|
|
18
src/init.cpp
18
src/init.cpp
|
@ -33,6 +33,7 @@
|
|||
#include <interfaces/ipc.h>
|
||||
#include <interfaces/mining.h>
|
||||
#include <interfaces/node.h>
|
||||
#include <ipc/exception.h>
|
||||
#include <kernel/caches.h>
|
||||
#include <kernel/context.h>
|
||||
#include <key.h>
|
||||
|
@ -298,8 +299,13 @@ void Shutdown(NodeContext& node)
|
|||
StopREST();
|
||||
StopRPC();
|
||||
StopHTTPServer();
|
||||
for (const auto& client : node.chain_clients) {
|
||||
client->flush();
|
||||
for (auto& client : node.chain_clients) {
|
||||
try {
|
||||
client->flush();
|
||||
} catch (const ipc::Exception& e) {
|
||||
LogDebug(BCLog::IPC, "Chain client did not disconnect cleanly: %s", e.what());
|
||||
client.reset();
|
||||
}
|
||||
}
|
||||
StopMapPort();
|
||||
|
||||
|
@ -374,7 +380,7 @@ void Shutdown(NodeContext& node)
|
|||
}
|
||||
}
|
||||
for (const auto& client : node.chain_clients) {
|
||||
client->stop();
|
||||
if (client) client->stop();
|
||||
}
|
||||
|
||||
#ifdef ENABLE_ZMQ
|
||||
|
@ -398,6 +404,12 @@ void Shutdown(NodeContext& node)
|
|||
|
||||
RemovePidFile(*node.args);
|
||||
|
||||
// If any -ipcbind clients are still connected, disconnect them now so they
|
||||
// do not block shutdown.
|
||||
if (interfaces::Ipc* ipc = node.init->ipc()) {
|
||||
ipc->disconnectIncoming();
|
||||
}
|
||||
|
||||
LogPrintf("%s: done\n", __func__);
|
||||
}
|
||||
|
||||
|
|
|
@ -59,17 +59,20 @@ public:
|
|||
//! true. If this is not a spawned child process, return false.
|
||||
virtual bool startSpawnedProcess(int argc, char* argv[], int& exit_status) = 0;
|
||||
|
||||
//! Connect to a socket address and make a client interface proxy object
|
||||
//! using provided callback. connectAddress returns an interface pointer if
|
||||
//! the connection was established, returns null if address is empty ("") or
|
||||
//! disabled ("0") or if a connection was refused but not required ("auto"),
|
||||
//! and throws an exception if there was an unexpected error.
|
||||
//! Connect to a socket address and return a pointer to its Init interface.
|
||||
//! Returns a non-null pointer if the connection was established, returns
|
||||
//! null if address is empty ("") or disabled ("0") or if a connection was
|
||||
//! refused but not required ("auto"), and throws an exception if there was
|
||||
//! an unexpected error.
|
||||
virtual std::unique_ptr<Init> connectAddress(std::string& address) = 0;
|
||||
|
||||
//! Connect to a socket address and make a client interface proxy object
|
||||
//! using provided callback. Throws an exception if there was an error.
|
||||
//! Listen on a socket address exposing this process's init interface to
|
||||
//! clients. Throws an exception if there was an error.
|
||||
virtual void listenAddress(std::string& address) = 0;
|
||||
|
||||
//! Disconnect any incoming connections that are still connected.
|
||||
virtual void disconnectIncoming() = 0;
|
||||
|
||||
//! Add cleanup callback to remote interface that will run when the
|
||||
//! interface is deleted.
|
||||
template<typename Interface>
|
||||
|
|
|
@ -41,10 +41,7 @@ class CapnpProtocol : public Protocol
|
|||
public:
|
||||
~CapnpProtocol() noexcept(true)
|
||||
{
|
||||
if (m_loop) {
|
||||
std::unique_lock<std::mutex> lock(m_loop->m_mutex);
|
||||
m_loop->removeClient(lock);
|
||||
}
|
||||
m_loop_ref.reset();
|
||||
if (m_loop_thread.joinable()) m_loop_thread.join();
|
||||
assert(!m_loop);
|
||||
};
|
||||
|
@ -71,6 +68,13 @@ public:
|
|||
m_loop->loop();
|
||||
m_loop.reset();
|
||||
}
|
||||
void disconnectIncoming() override
|
||||
{
|
||||
if (!m_loop) return;
|
||||
m_loop->sync([&] {
|
||||
m_loop->m_incoming_connections.clear();
|
||||
});
|
||||
}
|
||||
void addCleanup(std::type_index type, void* iface, std::function<void()> cleanup) override
|
||||
{
|
||||
mp::ProxyTypeRegister::types().at(type)(iface).cleanup_fns.emplace_back(std::move(cleanup));
|
||||
|
@ -83,10 +87,7 @@ public:
|
|||
m_loop_thread = std::thread([&] {
|
||||
util::ThreadRename("capnp-loop");
|
||||
m_loop.emplace(exe_name, &IpcLogFn, &m_context);
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(m_loop->m_mutex);
|
||||
m_loop->addClient(lock);
|
||||
}
|
||||
m_loop_ref.emplace(*m_loop);
|
||||
promise.set_value();
|
||||
m_loop->loop();
|
||||
m_loop.reset();
|
||||
|
@ -96,6 +97,7 @@ public:
|
|||
Context m_context;
|
||||
std::thread m_loop_thread;
|
||||
std::optional<mp::EventLoop> m_loop;
|
||||
std::optional<mp::EventLoopRef> m_loop_ref;
|
||||
};
|
||||
} // namespace
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include <cstdlib>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <signal.h>
|
||||
#include <stdexcept>
|
||||
#include <string.h>
|
||||
#include <string>
|
||||
|
@ -26,6 +27,27 @@
|
|||
|
||||
namespace ipc {
|
||||
namespace {
|
||||
#ifndef WIN32
|
||||
std::string g_ignore_ctrl_c;
|
||||
|
||||
void HandleCtrlC(int)
|
||||
{
|
||||
(void)write(STDOUT_FILENO, g_ignore_ctrl_c.data(), g_ignore_ctrl_c.size());
|
||||
}
|
||||
#endif
|
||||
|
||||
void IgnoreCtrlC(std::string message)
|
||||
{
|
||||
#ifndef WIN32
|
||||
g_ignore_ctrl_c = std::move(message);
|
||||
struct sigaction sa{};
|
||||
sa.sa_handler = HandleCtrlC;
|
||||
sigemptyset(&sa.sa_mask);
|
||||
sa.sa_flags = SA_RESTART;
|
||||
sigaction(SIGINT, &sa, nullptr);
|
||||
#endif
|
||||
}
|
||||
|
||||
class IpcImpl : public interfaces::Ipc
|
||||
{
|
||||
public:
|
||||
|
@ -53,6 +75,7 @@ public:
|
|||
if (!m_process->checkSpawned(argc, argv, fd)) {
|
||||
return false;
|
||||
}
|
||||
IgnoreCtrlC(strprintf("[%s] SIGINT received — waiting for parent to shut down.\n", m_exe_name));
|
||||
m_protocol->serve(fd, m_exe_name, m_init);
|
||||
exit_status = EXIT_SUCCESS;
|
||||
return true;
|
||||
|
@ -86,6 +109,10 @@ public:
|
|||
int fd = m_process->bind(gArgs.GetDataDirNet(), m_exe_name, address);
|
||||
m_protocol->listen(fd, m_exe_name, m_init);
|
||||
}
|
||||
void disconnectIncoming() override
|
||||
{
|
||||
m_protocol->disconnectIncoming();
|
||||
}
|
||||
void addCleanup(std::type_index type, void* iface, std::function<void()> cleanup) override
|
||||
{
|
||||
m_protocol->addCleanup(type, iface, std::move(cleanup));
|
||||
|
|
|
@ -14,9 +14,10 @@
|
|||
|
||||
#include <assert.h>
|
||||
#include <functional>
|
||||
#include <optional>
|
||||
#include <kj/function.h>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
|
||||
|
@ -129,6 +130,25 @@ std::string LongThreadName(const char* exe_name);
|
|||
|
||||
//! Event loop implementation.
|
||||
//!
|
||||
//! Cap'n Proto threading model is very simple: all I/O operations are
|
||||
//! asynchronous and must be performed on a single thread. This includes:
|
||||
//!
|
||||
//! - Code starting an asynchronous operation (calling a function that returns a
|
||||
//! promise object)
|
||||
//! - Code notifying that an asynchronous operation is complete (code using a
|
||||
//! fulfiller object)
|
||||
//! - Code handling a completed operation (code chaining or waiting for a promise)
|
||||
//!
|
||||
//! All this code needs to run on one thread, and the EventLoop::loop() method
|
||||
//! is the entry point for this thread. ProxyClient and ProxyServer objects that
|
||||
//! use other threads and need to perform I/O operations post to this thread
|
||||
//! using EventLoop::post() and EventLoop::sync() methods.
|
||||
//!
|
||||
//! Specifically, because ProxyClient methods can be called from arbitrary
|
||||
//! threads, and ProxyServer methods can run on arbitrary threads, ProxyClient
|
||||
//! methods use the EventLoop thread to send requests, and ProxyServer methods
|
||||
//! use the thread to return results.
|
||||
//!
|
||||
//! Based on https://groups.google.com/d/msg/capnproto/TuQFF1eH2-M/g81sHaTAAQAJ
|
||||
class EventLoop
|
||||
{
|
||||
|
@ -144,7 +164,7 @@ public:
|
|||
|
||||
//! Run function on event loop thread. Does not return until function completes.
|
||||
//! Must be called while the loop() function is active.
|
||||
void post(const std::function<void()>& fn);
|
||||
void post(kj::Function<void()> fn);
|
||||
|
||||
//! Wrapper around EventLoop::post that takes advantage of the
|
||||
//! fact that callable will not go out of scope to avoid requirement that it
|
||||
|
@ -152,7 +172,7 @@ public:
|
|||
template <typename Callable>
|
||||
void sync(Callable&& callable)
|
||||
{
|
||||
post(std::ref(callable));
|
||||
post(std::forward<Callable>(callable));
|
||||
}
|
||||
|
||||
//! Start asynchronous worker thread if necessary. This is only done if
|
||||
|
@ -166,13 +186,10 @@ public:
|
|||
//! is important that ProxyServer::m_impl destructors do not run on the
|
||||
//! eventloop thread because they may need it to do I/O if they perform
|
||||
//! other IPC calls.
|
||||
void startAsyncThread(std::unique_lock<std::mutex>& lock);
|
||||
void startAsyncThread() MP_REQUIRES(m_mutex);
|
||||
|
||||
//! Add/remove remote client reference counts.
|
||||
void addClient(std::unique_lock<std::mutex>& lock);
|
||||
bool removeClient(std::unique_lock<std::mutex>& lock);
|
||||
//! Check if loop should exit.
|
||||
bool done(std::unique_lock<std::mutex>& lock);
|
||||
bool done() MP_REQUIRES(m_mutex);
|
||||
|
||||
Logger log()
|
||||
{
|
||||
|
@ -195,10 +212,10 @@ public:
|
|||
std::thread m_async_thread;
|
||||
|
||||
//! Callback function to run on event loop thread during post() or sync() call.
|
||||
const std::function<void()>* m_post_fn = nullptr;
|
||||
kj::Function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;
|
||||
|
||||
//! Callback functions to run on async thread.
|
||||
CleanupList m_async_fns;
|
||||
CleanupList m_async_fns MP_GUARDED_BY(m_mutex);
|
||||
|
||||
//! Pipe read handle used to wake up the event loop thread.
|
||||
int m_wait_fd = -1;
|
||||
|
@ -208,11 +225,11 @@ public:
|
|||
|
||||
//! Number of clients holding references to ProxyServerBase objects that
|
||||
//! reference this event loop.
|
||||
int m_num_clients = 0;
|
||||
int m_num_clients MP_GUARDED_BY(m_mutex) = 0;
|
||||
|
||||
//! Mutex and condition variable used to post tasks to event loop and async
|
||||
//! thread.
|
||||
std::mutex m_mutex;
|
||||
Mutex m_mutex;
|
||||
std::condition_variable m_cv;
|
||||
|
||||
//! Capnp IO context.
|
||||
|
@ -263,11 +280,9 @@ struct Waiter
|
|||
// in the case where a capnp response is sent and a brand new
|
||||
// request is immediately received.
|
||||
while (m_fn) {
|
||||
auto fn = std::move(m_fn);
|
||||
m_fn = nullptr;
|
||||
lock.unlock();
|
||||
fn();
|
||||
lock.lock();
|
||||
auto fn = std::move(*m_fn);
|
||||
m_fn.reset();
|
||||
Unlock(lock, fn);
|
||||
}
|
||||
const bool done = pred();
|
||||
return done;
|
||||
|
@ -276,7 +291,7 @@ struct Waiter
|
|||
|
||||
std::mutex m_mutex;
|
||||
std::condition_variable m_cv;
|
||||
std::function<void()> m_fn;
|
||||
std::optional<kj::Function<void()>> m_fn;
|
||||
};
|
||||
|
||||
//! Object holding network & rpc state associated with either an incoming server
|
||||
|
@ -290,21 +305,13 @@ public:
|
|||
Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
|
||||
: m_loop(loop), m_stream(kj::mv(stream_)),
|
||||
m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
|
||||
m_rpc_system(::capnp::makeRpcClient(m_network))
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(m_loop.m_mutex);
|
||||
m_loop.addClient(lock);
|
||||
}
|
||||
m_rpc_system(::capnp::makeRpcClient(m_network)) {}
|
||||
Connection(EventLoop& loop,
|
||||
kj::Own<kj::AsyncIoStream>&& stream_,
|
||||
const std::function<::capnp::Capability::Client(Connection&)>& make_client)
|
||||
: m_loop(loop), m_stream(kj::mv(stream_)),
|
||||
m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
|
||||
m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this)))
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(m_loop.m_mutex);
|
||||
m_loop.addClient(lock);
|
||||
}
|
||||
m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
|
||||
|
||||
//! Run cleanup functions. Must be called from the event loop thread. First
|
||||
//! calls synchronous cleanup functions while blocked (to free capnp
|
||||
|
@ -333,12 +340,12 @@ public:
|
|||
// to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
|
||||
// error in cases where f deletes this Connection object.
|
||||
m_on_disconnect.add(m_network.onDisconnect().then(
|
||||
[f = std::move(f), this]() mutable { m_loop.m_task_set->add(kj::evalLater(kj::mv(f))); }));
|
||||
[f = std::move(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
|
||||
}
|
||||
|
||||
EventLoop& m_loop;
|
||||
EventLoopRef m_loop;
|
||||
kj::Own<kj::AsyncIoStream> m_stream;
|
||||
LoggingErrorHandler m_error_handler{m_loop};
|
||||
LoggingErrorHandler m_error_handler{*m_loop};
|
||||
kj::TaskSet m_on_disconnect{m_error_handler};
|
||||
::capnp::TwoPartyVatNetwork m_network;
|
||||
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
|
||||
|
@ -381,21 +388,12 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
|
|||
: m_client(std::move(client)), m_context(connection)
|
||||
|
||||
{
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
|
||||
m_context.connection->m_loop.addClient(lock);
|
||||
}
|
||||
|
||||
// Handler for the connection getting destroyed before this client object.
|
||||
auto cleanup_it = m_context.connection->addSyncCleanup([this]() {
|
||||
// Release client capability by move-assigning to temporary.
|
||||
{
|
||||
typename Interface::Client(std::move(m_client));
|
||||
}
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
|
||||
m_context.connection->m_loop.removeClient(lock);
|
||||
}
|
||||
m_context.connection = nullptr;
|
||||
});
|
||||
|
||||
|
@ -423,16 +421,11 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
|
|||
Sub::destroy(*this);
|
||||
|
||||
// FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
|
||||
m_context.connection->m_loop.sync([&]() {
|
||||
m_context.loop->sync([&]() {
|
||||
// Release client capability by move-assigning to temporary.
|
||||
{
|
||||
typename Interface::Client(std::move(m_client));
|
||||
}
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
|
||||
m_context.connection->m_loop.removeClient(lock);
|
||||
}
|
||||
|
||||
if (destroy_connection) {
|
||||
delete m_context.connection;
|
||||
m_context.connection = nullptr;
|
||||
|
@ -454,8 +447,6 @@ ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Co
|
|||
: m_impl(std::move(impl)), m_context(&connection)
|
||||
{
|
||||
assert(m_impl);
|
||||
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
|
||||
m_context.connection->m_loop.addClient(lock);
|
||||
}
|
||||
|
||||
//! ProxyServer destructor, called from the EventLoop thread by Cap'n Proto
|
||||
|
@ -489,8 +480,6 @@ ProxyServerBase<Interface, Impl>::~ProxyServerBase()
|
|||
});
|
||||
}
|
||||
assert(m_context.cleanup_fns.empty());
|
||||
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
|
||||
m_context.connection->m_loop.removeClient(lock);
|
||||
}
|
||||
|
||||
//! If the capnp interface defined a special "destroy" method, as described the
|
||||
|
|
|
@ -558,7 +558,7 @@ template <typename Client>
|
|||
void clientDestroy(Client& client)
|
||||
{
|
||||
if (client.m_context.connection) {
|
||||
client.m_context.connection->m_loop.log() << "IPC client destroy " << typeid(client).name();
|
||||
client.m_context.loop->log() << "IPC client destroy " << typeid(client).name();
|
||||
} else {
|
||||
KJ_LOG(INFO, "IPC interrupted client destroy", typeid(client).name());
|
||||
}
|
||||
|
@ -567,7 +567,7 @@ void clientDestroy(Client& client)
|
|||
template <typename Server>
|
||||
void serverDestroy(Server& server)
|
||||
{
|
||||
server.m_context.connection->m_loop.log() << "IPC server destroy " << typeid(server).name();
|
||||
server.m_context.loop->log() << "IPC server destroy " << typeid(server).name();
|
||||
}
|
||||
|
||||
//! Entry point called by generated client code that looks like:
|
||||
|
@ -582,12 +582,9 @@ void serverDestroy(Server& server)
|
|||
template <typename ProxyClient, typename GetRequest, typename... FieldObjs>
|
||||
void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, FieldObjs&&... fields)
|
||||
{
|
||||
if (!proxy_client.m_context.connection) {
|
||||
throw std::logic_error("clientInvoke call made after disconnect");
|
||||
}
|
||||
if (!g_thread_context.waiter) {
|
||||
assert(g_thread_context.thread_name.empty());
|
||||
g_thread_context.thread_name = ThreadName(proxy_client.m_context.connection->m_loop.m_exe_name);
|
||||
g_thread_context.thread_name = ThreadName(proxy_client.m_context.loop->m_exe_name);
|
||||
// If next assert triggers, it means clientInvoke is being called from
|
||||
// the capnp event loop thread. This can happen when a ProxyServer
|
||||
// method implementation that runs synchronously on the event loop
|
||||
|
@ -598,7 +595,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
|
|||
// declaration so the server method runs in a dedicated thread.
|
||||
assert(!g_thread_context.loop_thread);
|
||||
g_thread_context.waiter = std::make_unique<Waiter>();
|
||||
proxy_client.m_context.connection->m_loop.logPlain()
|
||||
proxy_client.m_context.loop->logPlain()
|
||||
<< "{" << g_thread_context.thread_name
|
||||
<< "} IPC client first request from current thread, constructing waiter";
|
||||
}
|
||||
|
@ -606,18 +603,27 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
|
|||
std::exception_ptr exception;
|
||||
std::string kj_exception;
|
||||
bool done = false;
|
||||
proxy_client.m_context.connection->m_loop.sync([&]() {
|
||||
const char* disconnected = nullptr;
|
||||
proxy_client.m_context.loop->sync([&]() {
|
||||
if (!proxy_client.m_context.connection) {
|
||||
const std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
|
||||
done = true;
|
||||
disconnected = "IPC client method called after disconnect.";
|
||||
invoke_context.thread_context.waiter->m_cv.notify_all();
|
||||
return;
|
||||
}
|
||||
|
||||
auto request = (proxy_client.m_client.*get_request)(nullptr);
|
||||
using Request = CapRequestTraits<decltype(request)>;
|
||||
using FieldList = typename ProxyClientMethodTraits<typename Request::Params>::Fields;
|
||||
IterateFields().handleChain(invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...);
|
||||
proxy_client.m_context.connection->m_loop.logPlain()
|
||||
proxy_client.m_context.loop->logPlain()
|
||||
<< "{" << invoke_context.thread_context.thread_name << "} IPC client send "
|
||||
<< TypeName<typename Request::Params>() << " " << LogEscape(request.toString());
|
||||
|
||||
proxy_client.m_context.connection->m_loop.m_task_set->add(request.send().then(
|
||||
proxy_client.m_context.loop->m_task_set->add(request.send().then(
|
||||
[&](::capnp::Response<typename Request::Results>&& response) {
|
||||
proxy_client.m_context.connection->m_loop.logPlain()
|
||||
proxy_client.m_context.loop->logPlain()
|
||||
<< "{" << invoke_context.thread_context.thread_name << "} IPC client recv "
|
||||
<< TypeName<typename Request::Results>() << " " << LogEscape(response.toString());
|
||||
try {
|
||||
|
@ -631,9 +637,13 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
|
|||
invoke_context.thread_context.waiter->m_cv.notify_all();
|
||||
},
|
||||
[&](const ::kj::Exception& e) {
|
||||
kj_exception = kj::str("kj::Exception: ", e).cStr();
|
||||
proxy_client.m_context.connection->m_loop.logPlain()
|
||||
<< "{" << invoke_context.thread_context.thread_name << "} IPC client exception " << kj_exception;
|
||||
if (e.getType() == ::kj::Exception::Type::DISCONNECTED) {
|
||||
disconnected = "IPC client method call interrupted by disconnect.";
|
||||
} else {
|
||||
kj_exception = kj::str("kj::Exception: ", e).cStr();
|
||||
proxy_client.m_context.loop->logPlain()
|
||||
<< "{" << invoke_context.thread_context.thread_name << "} IPC client exception " << kj_exception;
|
||||
}
|
||||
const std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
|
||||
done = true;
|
||||
invoke_context.thread_context.waiter->m_cv.notify_all();
|
||||
|
@ -643,7 +653,8 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
|
|||
std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
|
||||
invoke_context.thread_context.waiter->wait(lock, [&done]() { return done; });
|
||||
if (exception) std::rethrow_exception(exception);
|
||||
if (!kj_exception.empty()) proxy_client.m_context.connection->m_loop.raise() << kj_exception;
|
||||
if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception;
|
||||
if (disconnected) proxy_client.m_context.loop->raise() << disconnected;
|
||||
}
|
||||
|
||||
//! Invoke callable `fn()` that may return void. If it does return void, replace
|
||||
|
@ -682,7 +693,7 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
|
|||
using Results = typename decltype(call_context.getResults())::Builds;
|
||||
|
||||
int req = ++server_reqs;
|
||||
server.m_context.connection->m_loop.log() << "IPC server recv request #" << req << " "
|
||||
server.m_context.loop->log() << "IPC server recv request #" << req << " "
|
||||
<< TypeName<typename Params::Reads>() << " " << LogEscape(params.toString());
|
||||
|
||||
try {
|
||||
|
@ -699,14 +710,14 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
|
|||
return ReplaceVoid([&]() { return fn.invoke(server_context, ArgList()); },
|
||||
[&]() { return kj::Promise<CallContext>(kj::mv(call_context)); })
|
||||
.then([&server, req](CallContext call_context) {
|
||||
server.m_context.connection->m_loop.log() << "IPC server send response #" << req << " " << TypeName<Results>()
|
||||
server.m_context.loop->log() << "IPC server send response #" << req << " " << TypeName<Results>()
|
||||
<< " " << LogEscape(call_context.getResults().toString());
|
||||
});
|
||||
} catch (const std::exception& e) {
|
||||
server.m_context.connection->m_loop.log() << "IPC server unhandled exception: " << e.what();
|
||||
server.m_context.loop->log() << "IPC server unhandled exception: " << e.what();
|
||||
throw;
|
||||
} catch (...) {
|
||||
server.m_context.connection->m_loop.log() << "IPC server unhandled exception";
|
||||
server.m_context.loop->log() << "IPC server unhandled exception";
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
#include <mp/util.h>
|
||||
|
||||
#include <array>
|
||||
#include <cassert>
|
||||
#include <functional>
|
||||
#include <list>
|
||||
#include <stddef.h>
|
||||
|
@ -47,13 +48,34 @@ inline void CleanupRun(CleanupList& fns) {
|
|||
}
|
||||
}
|
||||
|
||||
//! Event loop smart pointer automatically managing m_num_clients.
|
||||
//! If a lock pointer argument is passed, the specified lock will be used,
|
||||
//! otherwise EventLoop::m_mutex will be locked when needed.
|
||||
class EventLoopRef
|
||||
{
|
||||
public:
|
||||
explicit EventLoopRef(EventLoop& loop, Lock* lock = nullptr);
|
||||
EventLoopRef(EventLoopRef&& other) noexcept : m_loop(other.m_loop) { other.m_loop = nullptr; }
|
||||
EventLoopRef(const EventLoopRef&) = delete;
|
||||
EventLoopRef& operator=(const EventLoopRef&) = delete;
|
||||
EventLoopRef& operator=(EventLoopRef&&) = delete;
|
||||
~EventLoopRef() { reset(); }
|
||||
EventLoop& operator*() const { assert(m_loop); return *m_loop; }
|
||||
EventLoop* operator->() const { assert(m_loop); return m_loop; }
|
||||
bool reset(Lock* lock = nullptr);
|
||||
|
||||
EventLoop* m_loop{nullptr};
|
||||
Lock* m_lock{nullptr};
|
||||
};
|
||||
|
||||
//! Context data associated with proxy client and server classes.
|
||||
struct ProxyContext
|
||||
{
|
||||
Connection* connection;
|
||||
EventLoopRef loop;
|
||||
CleanupList cleanup_fns;
|
||||
|
||||
ProxyContext(Connection* connection) : connection(connection) {}
|
||||
ProxyContext(Connection* connection);
|
||||
};
|
||||
|
||||
//! Base class for generated ProxyClient classes that implement a C++ interface
|
||||
|
|
|
@ -64,8 +64,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
|
|||
auto future = kj::newPromiseAndFulfiller<typename ServerContext::CallContext>();
|
||||
auto& server = server_context.proxy_server;
|
||||
int req = server_context.req;
|
||||
auto invoke = MakeAsyncCallable(
|
||||
[fulfiller = kj::mv(future.fulfiller),
|
||||
auto invoke = [fulfiller = kj::mv(future.fulfiller),
|
||||
call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable {
|
||||
const auto& params = call_context.getParams();
|
||||
Context::Reader context_arg = Accessor::get(params);
|
||||
|
@ -132,35 +131,35 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
|
|||
return;
|
||||
}
|
||||
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
|
||||
server.m_context.connection->m_loop.sync([&] {
|
||||
server.m_context.loop->sync([&] {
|
||||
auto fulfiller_dispose = kj::mv(fulfiller);
|
||||
fulfiller_dispose->fulfill(kj::mv(call_context));
|
||||
});
|
||||
}))
|
||||
{
|
||||
server.m_context.connection->m_loop.sync([&]() {
|
||||
server.m_context.loop->sync([&]() {
|
||||
auto fulfiller_dispose = kj::mv(fulfiller);
|
||||
fulfiller_dispose->reject(kj::mv(*exception));
|
||||
});
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
// Lookup Thread object specified by the client. The specified thread should
|
||||
// be a local Thread::Server object, but it needs to be looked up
|
||||
// asynchronously with getLocalServer().
|
||||
auto thread_client = context_arg.getThread();
|
||||
return server.m_context.connection->m_threads.getLocalServer(thread_client)
|
||||
.then([&server, invoke, req](const kj::Maybe<Thread::Server&>& perhaps) {
|
||||
.then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
|
||||
// Assuming the thread object is found, pass it a pointer to the
|
||||
// `invoke` lambda above which will invoke the function on that
|
||||
// thread.
|
||||
KJ_IF_MAYBE (thread_server, perhaps) {
|
||||
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
|
||||
server.m_context.connection->m_loop.log()
|
||||
server.m_context.loop->log()
|
||||
<< "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
|
||||
thread.m_thread_context.waiter->post(std::move(invoke));
|
||||
} else {
|
||||
server.m_context.connection->m_loop.log()
|
||||
server.m_context.loop->log()
|
||||
<< "IPC server error request #" << req << ", missing thread to execute request";
|
||||
throw std::runtime_error("invalid thread handle");
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
#define MP_UTIL_H
|
||||
|
||||
#include <capnp/schema.h>
|
||||
#include <cassert>
|
||||
#include <cstddef>
|
||||
#include <functional>
|
||||
#include <future>
|
||||
|
@ -13,11 +14,13 @@
|
|||
#include <kj/exception.h>
|
||||
#include <kj/string-tree.h>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <string.h>
|
||||
#include <string>
|
||||
#include <tuple>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
#include <variant>
|
||||
#include <vector>
|
||||
|
||||
namespace mp {
|
||||
|
@ -130,6 +133,58 @@ const char* TypeName()
|
|||
return short_name ? short_name + 1 : display_name;
|
||||
}
|
||||
|
||||
//! Convenient wrapper around std::variant<T*, T>
|
||||
template <typename T>
|
||||
struct PtrOrValue {
|
||||
std::variant<T*, T> data;
|
||||
|
||||
template <typename... Args>
|
||||
PtrOrValue(T* ptr, Args&&... args) : data(ptr ? ptr : std::variant<T*, T>{std::in_place_type<T>, std::forward<Args>(args)...}) {}
|
||||
|
||||
T& operator*() { return data.index() ? std::get<T>(data) : *std::get<T*>(data); }
|
||||
T* operator->() { return &**this; }
|
||||
T& operator*() const { return data.index() ? std::get<T>(data) : *std::get<T*>(data); }
|
||||
T* operator->() const { return &**this; }
|
||||
};
|
||||
|
||||
// Annotated mutex and lock class (https://clang.llvm.org/docs/ThreadSafetyAnalysis.html)
|
||||
#if defined(__clang__) && (!defined(SWIG))
|
||||
#define MP_TSA(x) __attribute__((x))
|
||||
#else
|
||||
#define MP_TSA(x) // no-op
|
||||
#endif
|
||||
|
||||
#define MP_CAPABILITY(x) MP_TSA(capability(x))
|
||||
#define MP_SCOPED_CAPABILITY MP_TSA(scoped_lockable)
|
||||
#define MP_REQUIRES(x) MP_TSA(requires_capability(x))
|
||||
#define MP_ACQUIRE(...) MP_TSA(acquire_capability(__VA_ARGS__))
|
||||
#define MP_RELEASE(...) MP_TSA(release_capability(__VA_ARGS__))
|
||||
#define MP_ASSERT_CAPABILITY(x) MP_TSA(assert_capability(x))
|
||||
#define MP_GUARDED_BY(x) MP_TSA(guarded_by(x))
|
||||
|
||||
class MP_CAPABILITY("mutex") Mutex {
|
||||
public:
|
||||
void lock() MP_ACQUIRE() { m_mutex.lock(); }
|
||||
void unlock() MP_RELEASE() { m_mutex.unlock(); }
|
||||
|
||||
std::mutex m_mutex;
|
||||
};
|
||||
|
||||
class MP_SCOPED_CAPABILITY Lock {
|
||||
public:
|
||||
explicit Lock(Mutex& m) MP_ACQUIRE(m) : m_lock(m.m_mutex) {}
|
||||
~Lock() MP_RELEASE() {}
|
||||
void unlock() MP_RELEASE() { m_lock.unlock(); }
|
||||
void lock() MP_ACQUIRE() { m_lock.lock(); }
|
||||
void assert_locked(Mutex& mutex) MP_ASSERT_CAPABILITY() MP_ASSERT_CAPABILITY(mutex)
|
||||
{
|
||||
assert(m_lock.mutex() == &mutex.m_mutex);
|
||||
assert(m_lock);
|
||||
}
|
||||
|
||||
std::unique_lock<std::mutex> m_lock;
|
||||
};
|
||||
|
||||
//! Analog to std::lock_guard that unlocks instead of locks.
|
||||
template <typename Lock>
|
||||
struct UnlockGuard
|
||||
|
@ -146,46 +201,6 @@ void Unlock(Lock& lock, Callback&& callback)
|
|||
callback();
|
||||
}
|
||||
|
||||
//! Needed for libc++/macOS compatibility. Lets code work with shared_ptr nothrow declaration
|
||||
//! https://github.com/capnproto/capnproto/issues/553#issuecomment-328554603
|
||||
template <typename T>
|
||||
struct DestructorCatcher
|
||||
{
|
||||
T value;
|
||||
template <typename... Params>
|
||||
DestructorCatcher(Params&&... params) : value(kj::fwd<Params>(params)...)
|
||||
{
|
||||
}
|
||||
~DestructorCatcher() noexcept try {
|
||||
} catch (const kj::Exception& e) { // NOLINT(bugprone-empty-catch)
|
||||
}
|
||||
};
|
||||
|
||||
//! Wrapper around callback function for compatibility with std::async.
|
||||
//!
|
||||
//! std::async requires callbacks to be copyable and requires noexcept
|
||||
//! destructors, but this doesn't work well with kj types which are generally
|
||||
//! move-only and not noexcept.
|
||||
template <typename Callable>
|
||||
struct AsyncCallable
|
||||
{
|
||||
AsyncCallable(Callable&& callable) : m_callable(std::make_shared<DestructorCatcher<Callable>>(std::move(callable)))
|
||||
{
|
||||
}
|
||||
AsyncCallable(const AsyncCallable&) = default;
|
||||
AsyncCallable(AsyncCallable&&) = default;
|
||||
~AsyncCallable() noexcept = default;
|
||||
ResultOf<Callable> operator()() const { return (m_callable->value)(); }
|
||||
mutable std::shared_ptr<DestructorCatcher<Callable>> m_callable;
|
||||
};
|
||||
|
||||
//! Construct AsyncCallable object.
|
||||
template <typename Callable>
|
||||
AsyncCallable<std::remove_reference_t<Callable>> MakeAsyncCallable(Callable&& callable)
|
||||
{
|
||||
return std::move(callable);
|
||||
}
|
||||
|
||||
//! Format current thread name as "{exe_name}-{$pid}/{thread_name}-{$tid}".
|
||||
std::string ThreadName(const char* exe_name);
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
#include <kj/common.h>
|
||||
#include <kj/debug.h>
|
||||
#include <kj/exception.h>
|
||||
#include <kj/function.h>
|
||||
#include <kj/memory.h>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
|
@ -48,6 +49,36 @@ void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
|
|||
m_loop.log() << "Uncaught exception in daemonized task.";
|
||||
}
|
||||
|
||||
EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(lock)
|
||||
{
|
||||
auto loop_lock{PtrOrValue{m_lock, m_loop->m_mutex}};
|
||||
loop_lock->assert_locked(m_loop->m_mutex);
|
||||
m_loop->m_num_clients += 1;
|
||||
}
|
||||
|
||||
bool EventLoopRef::reset(Lock* lock)
|
||||
{
|
||||
bool done = false;
|
||||
if (m_loop) {
|
||||
auto loop_lock{PtrOrValue{lock ? lock : m_lock, m_loop->m_mutex}};
|
||||
loop_lock->assert_locked(m_loop->m_mutex);
|
||||
assert(m_loop->m_num_clients > 0);
|
||||
m_loop->m_num_clients -= 1;
|
||||
if (m_loop->done()) {
|
||||
done = true;
|
||||
m_loop->m_cv.notify_all();
|
||||
int post_fd{m_loop->m_post_fd};
|
||||
loop_lock->unlock();
|
||||
char buffer = 0;
|
||||
KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
|
||||
}
|
||||
m_loop = nullptr;
|
||||
}
|
||||
return done;
|
||||
}
|
||||
|
||||
ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{*connection->m_loop} {}
|
||||
|
||||
Connection::~Connection()
|
||||
{
|
||||
// Shut down RPC system first, since this will garbage collect Server
|
||||
|
@ -103,18 +134,18 @@ Connection::~Connection()
|
|||
m_sync_cleanup_fns.pop_front();
|
||||
}
|
||||
while (!m_async_cleanup_fns.empty()) {
|
||||
const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
|
||||
m_loop.m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front()));
|
||||
const Lock lock(m_loop->m_mutex);
|
||||
m_loop->m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front()));
|
||||
m_async_cleanup_fns.pop_front();
|
||||
}
|
||||
std::unique_lock<std::mutex> lock(m_loop.m_mutex);
|
||||
m_loop.startAsyncThread(lock);
|
||||
m_loop.removeClient(lock);
|
||||
Lock lock(m_loop->m_mutex);
|
||||
m_loop->startAsyncThread();
|
||||
m_loop.reset(&lock);
|
||||
}
|
||||
|
||||
CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
|
||||
{
|
||||
const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
|
||||
const Lock lock(m_loop->m_mutex);
|
||||
// Add cleanup callbacks to the front of list, so sync cleanup functions run
|
||||
// in LIFO order. This is a good approach because sync cleanup functions are
|
||||
// added as client objects are created, and it is natural to clean up
|
||||
|
@ -128,13 +159,13 @@ CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
|
|||
|
||||
void Connection::removeSyncCleanup(CleanupIt it)
|
||||
{
|
||||
const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
|
||||
const Lock lock(m_loop->m_mutex);
|
||||
m_sync_cleanup_fns.erase(it);
|
||||
}
|
||||
|
||||
void Connection::addAsyncCleanup(std::function<void()> fn)
|
||||
{
|
||||
const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
|
||||
const Lock lock(m_loop->m_mutex);
|
||||
// Add async cleanup callbacks to the back of the list. Unlike the sync
|
||||
// cleanup list, this list order is more significant because it determines
|
||||
// the order server objects are destroyed when there is a sudden disconnect,
|
||||
|
@ -170,7 +201,7 @@ EventLoop::EventLoop(const char* exe_name, LogFn log_fn, void* context)
|
|||
EventLoop::~EventLoop()
|
||||
{
|
||||
if (m_async_thread.joinable()) m_async_thread.join();
|
||||
const std::lock_guard<std::mutex> lock(m_mutex);
|
||||
const Lock lock(m_mutex);
|
||||
KJ_ASSERT(m_post_fn == nullptr);
|
||||
KJ_ASSERT(m_async_fns.empty());
|
||||
KJ_ASSERT(m_wait_fd == -1);
|
||||
|
@ -195,14 +226,14 @@ void EventLoop::loop()
|
|||
for (;;) {
|
||||
const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
|
||||
if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");
|
||||
std::unique_lock<std::mutex> lock(m_mutex);
|
||||
Lock lock(m_mutex);
|
||||
if (m_post_fn) {
|
||||
Unlock(lock, *m_post_fn);
|
||||
m_post_fn = nullptr;
|
||||
m_cv.notify_all();
|
||||
} else if (done(lock)) {
|
||||
} else if (done()) {
|
||||
// Intentionally do not break if m_post_fn was set, even if done()
|
||||
// would return true, to ensure that the removeClient write(post_fd)
|
||||
// would return true, to ensure that the EventLoopRef write(post_fd)
|
||||
// call always succeeds and the loop does not exit between the time
|
||||
// that the done condition is set and the write call is made.
|
||||
break;
|
||||
|
@ -213,75 +244,62 @@ void EventLoop::loop()
|
|||
log() << "EventLoop::loop bye.";
|
||||
wait_stream = nullptr;
|
||||
KJ_SYSCALL(::close(post_fd));
|
||||
const std::unique_lock<std::mutex> lock(m_mutex);
|
||||
const Lock lock(m_mutex);
|
||||
m_wait_fd = -1;
|
||||
m_post_fd = -1;
|
||||
}
|
||||
|
||||
void EventLoop::post(const std::function<void()>& fn)
|
||||
void EventLoop::post(kj::Function<void()> fn)
|
||||
{
|
||||
if (std::this_thread::get_id() == m_thread_id) {
|
||||
fn();
|
||||
return;
|
||||
}
|
||||
std::unique_lock<std::mutex> lock(m_mutex);
|
||||
addClient(lock);
|
||||
m_cv.wait(lock, [this] { return m_post_fn == nullptr; });
|
||||
Lock lock(m_mutex);
|
||||
EventLoopRef ref(*this, &lock);
|
||||
m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_post_fn == nullptr; });
|
||||
m_post_fn = &fn;
|
||||
int post_fd{m_post_fd};
|
||||
Unlock(lock, [&] {
|
||||
char buffer = 0;
|
||||
KJ_SYSCALL(write(post_fd, &buffer, 1));
|
||||
});
|
||||
m_cv.wait(lock, [this, &fn] { return m_post_fn != &fn; });
|
||||
removeClient(lock);
|
||||
m_cv.wait(lock.m_lock, [this, &fn]() MP_REQUIRES(m_mutex) { return m_post_fn != &fn; });
|
||||
}
|
||||
|
||||
void EventLoop::addClient(std::unique_lock<std::mutex>& lock) { m_num_clients += 1; }
|
||||
|
||||
bool EventLoop::removeClient(std::unique_lock<std::mutex>& lock)
|
||||
{
|
||||
m_num_clients -= 1;
|
||||
if (done(lock)) {
|
||||
m_cv.notify_all();
|
||||
int post_fd{m_post_fd};
|
||||
lock.unlock();
|
||||
char buffer = 0;
|
||||
KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
|
||||
void EventLoop::startAsyncThread()
|
||||
{
|
||||
if (m_async_thread.joinable()) {
|
||||
m_cv.notify_all();
|
||||
} else if (!m_async_fns.empty()) {
|
||||
m_async_thread = std::thread([this] {
|
||||
std::unique_lock<std::mutex> lock(m_mutex);
|
||||
while (true) {
|
||||
Lock lock(m_mutex);
|
||||
while (!done()) {
|
||||
if (!m_async_fns.empty()) {
|
||||
addClient(lock);
|
||||
EventLoopRef ref{*this, &lock};
|
||||
const std::function<void()> fn = std::move(m_async_fns.front());
|
||||
m_async_fns.pop_front();
|
||||
Unlock(lock, fn);
|
||||
if (removeClient(lock)) break;
|
||||
// Important to explictly call ref.reset() here and
|
||||
// explicitly break if the EventLoop is done, not relying on
|
||||
// while condition above. Reason is that end of `ref`
|
||||
// lifetime can cause EventLoop::loop() to exit, and if
|
||||
// there is external code that immediately deletes the
|
||||
// EventLoop object as soon as EventLoop::loop() method
|
||||
// returns, checking the while condition may crash.
|
||||
if (ref.reset()) break;
|
||||
// Continue without waiting in case there are more async_fns
|
||||
continue;
|
||||
} else if (m_num_clients == 0) {
|
||||
break;
|
||||
}
|
||||
m_cv.wait(lock);
|
||||
m_cv.wait(lock.m_lock);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
bool EventLoop::done(std::unique_lock<std::mutex>& lock)
|
||||
bool EventLoop::done()
|
||||
{
|
||||
assert(m_num_clients >= 0);
|
||||
assert(lock.owns_lock());
|
||||
assert(lock.mutex() == &m_mutex);
|
||||
return m_num_clients == 0 && m_async_fns.empty();
|
||||
}
|
||||
|
||||
|
@ -375,7 +393,7 @@ kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
|
|||
const std::string from = context.getParams().getName();
|
||||
std::promise<ThreadContext*> thread_context;
|
||||
std::thread thread([&thread_context, from, this]() {
|
||||
g_thread_context.thread_name = ThreadName(m_connection.m_loop.m_exe_name) + " (from " + from + ")";
|
||||
g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")";
|
||||
g_thread_context.waiter = std::make_unique<Waiter>();
|
||||
thread_context.set_value(&g_thread_context);
|
||||
std::unique_lock<std::mutex> lock(g_thread_context.waiter->m_mutex);
|
||||
|
|
|
@ -23,32 +23,82 @@
|
|||
namespace mp {
|
||||
namespace test {
|
||||
|
||||
/**
|
||||
* Test setup class creating a two way connection between a
|
||||
* ProxyServer<FooInterface> object and a ProxyClient<FooInterface>.
|
||||
*
|
||||
* Provides client_disconnect and server_disconnect lambdas that can be used to
|
||||
* trigger disconnects and test handling of broken and closed connections.
|
||||
*
|
||||
* Accepts a client_owns_connection option to test different ProxyClient
|
||||
* destroy_connection values and control whether destroying the ProxyClient
|
||||
* object destroys the client Connection object. Normally it makes sense for
|
||||
* this to be true to simplify shutdown and avoid needing to call
|
||||
* client_disconnect manually, but false allows testing more ProxyClient
|
||||
* behavior and the "IPC client method called after disconnect" code path.
|
||||
*/
|
||||
class TestSetup
|
||||
{
|
||||
public:
|
||||
std::thread thread;
|
||||
std::function<void()> server_disconnect;
|
||||
std::function<void()> client_disconnect;
|
||||
std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> client_promise;
|
||||
std::unique_ptr<ProxyClient<messages::FooInterface>> client;
|
||||
|
||||
TestSetup(bool client_owns_connection = true)
|
||||
: thread{[&] {
|
||||
EventLoop loop("mptest", [](bool raise, const std::string& log) {
|
||||
std::cout << "LOG" << raise << ": " << log << "\n";
|
||||
if (raise) throw std::runtime_error(log);
|
||||
});
|
||||
auto pipe = loop.m_io_context.provider->newTwoWayPipe();
|
||||
|
||||
auto server_connection =
|
||||
std::make_unique<Connection>(loop, kj::mv(pipe.ends[0]), [&](Connection& connection) {
|
||||
auto server_proxy = kj::heap<ProxyServer<messages::FooInterface>>(
|
||||
std::make_shared<FooImplementation>(), connection);
|
||||
return capnp::Capability::Client(kj::mv(server_proxy));
|
||||
});
|
||||
server_disconnect = [&] { loop.sync([&] { server_connection.reset(); }); };
|
||||
// Set handler to destroy the server when the client disconnects. This
|
||||
// is ignored if server_disconnect() is called instead.
|
||||
server_connection->onDisconnect([&] { server_connection.reset(); });
|
||||
|
||||
auto client_connection = std::make_unique<Connection>(loop, kj::mv(pipe.ends[1]));
|
||||
auto client_proxy = std::make_unique<ProxyClient<messages::FooInterface>>(
|
||||
client_connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<messages::FooInterface>(),
|
||||
client_connection.get(), /* destroy_connection= */ client_owns_connection);
|
||||
if (client_owns_connection) {
|
||||
client_connection.release();
|
||||
} else {
|
||||
client_disconnect = [&] { loop.sync([&] { client_connection.reset(); }); };
|
||||
}
|
||||
|
||||
client_promise.set_value(std::move(client_proxy));
|
||||
loop.loop();
|
||||
}}
|
||||
{
|
||||
client = client_promise.get_future().get();
|
||||
}
|
||||
|
||||
~TestSetup()
|
||||
{
|
||||
// Test that client cleanup_fns are executed.
|
||||
bool destroyed = false;
|
||||
client->m_context.cleanup_fns.emplace_front([&destroyed] { destroyed = true; });
|
||||
client.reset();
|
||||
KJ_EXPECT(destroyed);
|
||||
|
||||
thread.join();
|
||||
}
|
||||
};
|
||||
|
||||
KJ_TEST("Call FooInterface methods")
|
||||
{
|
||||
std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> foo_promise;
|
||||
std::function<void()> disconnect_client;
|
||||
std::thread thread([&]() {
|
||||
EventLoop loop("mptest", [](bool raise, const std::string& log) {
|
||||
std::cout << "LOG" << raise << ": " << log << "\n";
|
||||
});
|
||||
auto pipe = loop.m_io_context.provider->newTwoWayPipe();
|
||||
TestSetup setup;
|
||||
ProxyClient<messages::FooInterface>* foo = setup.client.get();
|
||||
|
||||
auto connection_client = std::make_unique<Connection>(loop, kj::mv(pipe.ends[0]));
|
||||
auto foo_client = std::make_unique<ProxyClient<messages::FooInterface>>(
|
||||
connection_client->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<messages::FooInterface>(),
|
||||
connection_client.get(), /* destroy_connection= */ false);
|
||||
foo_promise.set_value(std::move(foo_client));
|
||||
disconnect_client = [&] { loop.sync([&] { connection_client.reset(); }); };
|
||||
|
||||
auto connection_server = std::make_unique<Connection>(loop, kj::mv(pipe.ends[1]), [&](Connection& connection) {
|
||||
auto foo_server = kj::heap<ProxyServer<messages::FooInterface>>(std::make_shared<FooImplementation>(), connection);
|
||||
return capnp::Capability::Client(kj::mv(foo_server));
|
||||
});
|
||||
connection_server->onDisconnect([&] { connection_server.reset(); });
|
||||
loop.loop();
|
||||
});
|
||||
|
||||
auto foo = foo_promise.get_future().get();
|
||||
KJ_EXPECT(foo->add(1, 2) == 3);
|
||||
|
||||
FooStruct in;
|
||||
|
@ -127,14 +177,40 @@ KJ_TEST("Call FooInterface methods")
|
|||
mut.message = "init";
|
||||
foo->passMutable(mut);
|
||||
KJ_EXPECT(mut.message == "init build pass call return read");
|
||||
}
|
||||
|
||||
disconnect_client();
|
||||
thread.join();
|
||||
KJ_TEST("Call IPC method after client connection is closed")
|
||||
{
|
||||
TestSetup setup{/*client_owns_connection=*/false};
|
||||
ProxyClient<messages::FooInterface>* foo = setup.client.get();
|
||||
KJ_EXPECT(foo->add(1, 2) == 3);
|
||||
setup.client_disconnect();
|
||||
|
||||
bool destroyed = false;
|
||||
foo->m_context.cleanup_fns.emplace_front([&destroyed]{ destroyed = true; });
|
||||
foo.reset();
|
||||
KJ_EXPECT(destroyed);
|
||||
bool disconnected{false};
|
||||
try {
|
||||
foo->add(1, 2);
|
||||
} catch (const std::runtime_error& e) {
|
||||
KJ_EXPECT(std::string_view{e.what()} == "IPC client method called after disconnect.");
|
||||
disconnected = true;
|
||||
}
|
||||
KJ_EXPECT(disconnected);
|
||||
}
|
||||
|
||||
KJ_TEST("Calling IPC method after server connection is closed")
|
||||
{
|
||||
TestSetup setup;
|
||||
ProxyClient<messages::FooInterface>* foo = setup.client.get();
|
||||
KJ_EXPECT(foo->add(1, 2) == 3);
|
||||
setup.server_disconnect();
|
||||
|
||||
bool disconnected{false};
|
||||
try {
|
||||
foo->add(1, 2);
|
||||
} catch (const std::runtime_error& e) {
|
||||
KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect.");
|
||||
disconnected = true;
|
||||
}
|
||||
KJ_EXPECT(disconnected);
|
||||
}
|
||||
|
||||
} // namespace test
|
||||
|
|
|
@ -58,6 +58,9 @@ public:
|
|||
//! clients and servers independently.
|
||||
virtual void serve(int fd, const char* exe_name, interfaces::Init& init, const std::function<void()>& ready_fn = {}) = 0;
|
||||
|
||||
//! Disconnect any incoming connections that are still connected.
|
||||
virtual void disconnectIncoming() = 0;
|
||||
|
||||
//! Add cleanup callback to interface that will run when the interface is
|
||||
//! deleted.
|
||||
virtual void addCleanup(std::type_index type, void* iface, std::function<void()> cleanup) = 0;
|
||||
|
|
|
@ -121,6 +121,13 @@ public:
|
|||
m_reachable.clear();
|
||||
}
|
||||
|
||||
void Reset() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
|
||||
{
|
||||
AssertLockNotHeld(m_mutex);
|
||||
LOCK(m_mutex);
|
||||
m_reachable = DefaultNets();
|
||||
}
|
||||
|
||||
[[nodiscard]] bool Contains(Network net) const EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
|
||||
{
|
||||
AssertLockNotHeld(m_mutex);
|
||||
|
@ -142,17 +149,21 @@ public:
|
|||
}
|
||||
|
||||
private:
|
||||
mutable Mutex m_mutex;
|
||||
|
||||
std::unordered_set<Network> m_reachable GUARDED_BY(m_mutex){
|
||||
NET_UNROUTABLE,
|
||||
NET_IPV4,
|
||||
NET_IPV6,
|
||||
NET_ONION,
|
||||
NET_I2P,
|
||||
NET_CJDNS,
|
||||
NET_INTERNAL
|
||||
static std::unordered_set<Network> DefaultNets()
|
||||
{
|
||||
return {
|
||||
NET_UNROUTABLE,
|
||||
NET_IPV4,
|
||||
NET_IPV6,
|
||||
NET_ONION,
|
||||
NET_I2P,
|
||||
NET_CJDNS,
|
||||
NET_INTERNAL
|
||||
};
|
||||
};
|
||||
|
||||
mutable Mutex m_mutex;
|
||||
std::unordered_set<Network> m_reachable GUARDED_BY(m_mutex){DefaultNets()};
|
||||
};
|
||||
|
||||
extern ReachableNets g_reachable_nets;
|
||||
|
|
|
@ -323,6 +323,12 @@ void SetRPCWarmupStatus(const std::string& newStatus)
|
|||
rpcWarmupStatus = newStatus;
|
||||
}
|
||||
|
||||
void SetRPCWarmupStarting()
|
||||
{
|
||||
LOCK(g_rpc_warmup_mutex);
|
||||
fRPCInWarmup = true;
|
||||
}
|
||||
|
||||
void SetRPCWarmupFinished()
|
||||
{
|
||||
LOCK(g_rpc_warmup_mutex);
|
||||
|
|
|
@ -30,6 +30,7 @@ void RpcInterruptionPoint();
|
|||
*/
|
||||
void SetRPCWarmupStatus(const std::string& newStatus);
|
||||
/* Mark warmup as done. RPC calls will be processed from now on. */
|
||||
void SetRPCWarmupStarting();
|
||||
void SetRPCWarmupFinished();
|
||||
|
||||
/* returns the current warmup state. */
|
||||
|
|
|
@ -63,6 +63,7 @@ add_executable(test_bitcoin
|
|||
net_peer_eviction_tests.cpp
|
||||
net_tests.cpp
|
||||
netbase_tests.cpp
|
||||
node_init_tests.cpp
|
||||
node_warnings_tests.cpp
|
||||
orphanage_tests.cpp
|
||||
pcp_tests.cpp
|
||||
|
|
|
@ -53,9 +53,8 @@ static std::string TempPath(std::string_view pattern)
|
|||
//! on the object through FooInterface (defined in ipc_test.capnp).
|
||||
void IpcPipeTest()
|
||||
{
|
||||
// Setup: create FooImplemention object and listen for FooInterface requests
|
||||
// Setup: create FooImplementation object and listen for FooInterface requests
|
||||
std::promise<std::unique_ptr<mp::ProxyClient<gen::FooInterface>>> foo_promise;
|
||||
std::function<void()> disconnect_client;
|
||||
std::thread thread([&]() {
|
||||
mp::EventLoop loop("IpcPipeTest", [](bool raise, const std::string& log) { LogPrintf("LOG%i: %s\n", raise, log); });
|
||||
auto pipe = loop.m_io_context.provider->newTwoWayPipe();
|
||||
|
@ -63,9 +62,8 @@ void IpcPipeTest()
|
|||
auto connection_client = std::make_unique<mp::Connection>(loop, kj::mv(pipe.ends[0]));
|
||||
auto foo_client = std::make_unique<mp::ProxyClient<gen::FooInterface>>(
|
||||
connection_client->m_rpc_system->bootstrap(mp::ServerVatId().vat_id).castAs<gen::FooInterface>(),
|
||||
connection_client.get(), /* destroy_connection= */ false);
|
||||
connection_client.release(), /* destroy_connection= */ true);
|
||||
foo_promise.set_value(std::move(foo_client));
|
||||
disconnect_client = [&] { loop.sync([&] { connection_client.reset(); }); };
|
||||
|
||||
auto connection_server = std::make_unique<mp::Connection>(loop, kj::mv(pipe.ends[1]), [&](mp::Connection& connection) {
|
||||
auto foo_server = kj::heap<mp::ProxyServer<gen::FooInterface>>(std::make_shared<FooImplementation>(), connection);
|
||||
|
@ -125,8 +123,8 @@ void IpcPipeTest()
|
|||
auto script2{foo->passScript(script1)};
|
||||
BOOST_CHECK_EQUAL(HexStr(script1), HexStr(script2));
|
||||
|
||||
// Test cleanup: disconnect pipe and join thread
|
||||
disconnect_client();
|
||||
// Test cleanup: disconnect and join thread
|
||||
foo.reset();
|
||||
thread.join();
|
||||
}
|
||||
|
||||
|
|
|
@ -702,6 +702,7 @@ BOOST_AUTO_TEST_CASE(get_local_addr_for_peer_port)
|
|||
|
||||
BOOST_AUTO_TEST_CASE(LimitedAndReachable_Network)
|
||||
{
|
||||
g_reachable_nets.Reset();
|
||||
BOOST_CHECK(g_reachable_nets.Contains(NET_IPV4));
|
||||
BOOST_CHECK(g_reachable_nets.Contains(NET_IPV6));
|
||||
BOOST_CHECK(g_reachable_nets.Contains(NET_ONION));
|
||||
|
|
49
src/test/node_init_tests.cpp
Normal file
49
src/test/node_init_tests.cpp
Normal file
|
@ -0,0 +1,49 @@
|
|||
// Copyright (c) 2025 The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#include <init.h>
|
||||
#include <interfaces/init.h>
|
||||
#include <rpc/server.h>
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include <test/util/setup_common.h>
|
||||
|
||||
using node::NodeContext;
|
||||
|
||||
BOOST_FIXTURE_TEST_SUITE(node_init_tests, BasicTestingSetup)
|
||||
|
||||
//! Custom implementation of interfaces::Init for testing.
|
||||
class TestInit : public interfaces::Init
|
||||
{
|
||||
public:
|
||||
TestInit(NodeContext& node) : m_node(node)
|
||||
{
|
||||
InitContext(m_node);
|
||||
m_node.init = this;
|
||||
}
|
||||
std::unique_ptr<interfaces::Chain> makeChain() override { return interfaces::MakeChain(m_node); }
|
||||
std::unique_ptr<interfaces::WalletLoader> makeWalletLoader(interfaces::Chain& chain) override
|
||||
{
|
||||
return MakeWalletLoader(chain, *Assert(m_node.args));
|
||||
}
|
||||
NodeContext& m_node;
|
||||
};
|
||||
|
||||
BOOST_AUTO_TEST_CASE(init_test)
|
||||
{
|
||||
// Reset logging, config file path, rpc state, reachable nets to avoid errors in AppInitMain
|
||||
LogInstance().DisconnectTestLogger();
|
||||
m_node.args->SetConfigFilePath({});
|
||||
SetRPCWarmupStarting();
|
||||
g_reachable_nets.Reset();
|
||||
|
||||
// Run through initialization and shutdown code.
|
||||
TestInit init{m_node};
|
||||
BOOST_CHECK(AppInitInterfaces(m_node));
|
||||
BOOST_CHECK(AppInitMain(m_node));
|
||||
Interrupt(m_node);
|
||||
Shutdown(m_node);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
|
@ -214,7 +214,10 @@ BasicTestingSetup::~BasicTestingSetup()
|
|||
} else {
|
||||
fs::remove_all(m_path_root);
|
||||
}
|
||||
// Clear all arguments except for -datadir, which GUI tests currently rely
|
||||
// on to be set even after the testing setup is destroyed.
|
||||
gArgs.ClearArgs();
|
||||
gArgs.ForceSetArg("-datadir", fs::PathToString(m_path_root));
|
||||
}
|
||||
|
||||
ChainTestingSetup::ChainTestingSetup(const ChainType chainType, TestOpts opts)
|
||||
|
|
|
@ -111,7 +111,7 @@ class PSBTTest(BitcoinTestFramework):
|
|||
# Mine a transaction that credits the offline address
|
||||
offline_addr = offline_node.getnewaddress(address_type="bech32m")
|
||||
online_addr = w2.getnewaddress(address_type="bech32m")
|
||||
wonline.importaddress(offline_addr, "", False)
|
||||
wonline.importaddress(offline_addr, label="", rescan=False)
|
||||
mining_wallet = mining_node.get_wallet_rpc(self.default_wallet_name)
|
||||
mining_wallet.sendtoaddress(address=offline_addr, amount=1.0)
|
||||
self.generate(mining_node, nblocks=1, sync_fun=lambda: self.sync_all([online_node, mining_node]))
|
||||
|
@ -312,9 +312,9 @@ class PSBTTest(BitcoinTestFramework):
|
|||
wmulti = self.nodes[2].get_wallet_rpc('wmulti')
|
||||
|
||||
# Create all the addresses
|
||||
p2sh = wmulti.addmultisigaddress(2, [pubkey0, pubkey1, pubkey2], "", "legacy")['address']
|
||||
p2wsh = wmulti.addmultisigaddress(2, [pubkey0, pubkey1, pubkey2], "", "bech32")['address']
|
||||
p2sh_p2wsh = wmulti.addmultisigaddress(2, [pubkey0, pubkey1, pubkey2], "", "p2sh-segwit")['address']
|
||||
p2sh = wmulti.addmultisigaddress(2, [pubkey0, pubkey1, pubkey2], label="", address_type="legacy")["address"]
|
||||
p2wsh = wmulti.addmultisigaddress(2, [pubkey0, pubkey1, pubkey2], label="", address_type="bech32")["address"]
|
||||
p2sh_p2wsh = wmulti.addmultisigaddress(2, [pubkey0, pubkey1, pubkey2], label="", address_type="p2sh-segwit")["address"]
|
||||
p2wpkh = self.nodes[1].getnewaddress("", "bech32")
|
||||
p2pkh = self.nodes[1].getnewaddress("", "legacy")
|
||||
p2sh_p2wpkh = self.nodes[1].getnewaddress("", "p2sh-segwit")
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
#!/usr/bin/env python3
|
||||
# Copyright (c) 2017-2022 The Bitcoin Core developers
|
||||
# Copyright (c) 2017-present The Bitcoin Core developers
|
||||
# Distributed under the MIT software license, see the accompanying
|
||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
"""Class for bitcoind node under test"""
|
||||
|
@ -209,7 +209,7 @@ class TestNode():
|
|||
def __getattr__(self, name):
|
||||
"""Dispatches any unrecognised messages to the RPC connection or a CLI instance."""
|
||||
if self.use_cli:
|
||||
return getattr(RPCOverloadWrapper(self.cli, True), name)
|
||||
return getattr(RPCOverloadWrapper(self.cli), name)
|
||||
else:
|
||||
assert self.rpc_connected and self.rpc is not None, self._node_msg("Error: no RPC connection")
|
||||
return getattr(RPCOverloadWrapper(self.rpc), name)
|
||||
|
@ -374,7 +374,7 @@ class TestNode():
|
|||
|
||||
def get_wallet_rpc(self, wallet_name):
|
||||
if self.use_cli:
|
||||
return RPCOverloadWrapper(self.cli("-rpcwallet={}".format(wallet_name)), True)
|
||||
return RPCOverloadWrapper(self.cli("-rpcwallet={}".format(wallet_name)))
|
||||
else:
|
||||
assert self.rpc_connected and self.rpc, self._node_msg("RPC not connected")
|
||||
wallet_path = "wallet/{}".format(urllib.parse.quote(wallet_name))
|
||||
|
@ -925,17 +925,13 @@ class TestNodeCLI():
|
|||
return cli_stdout.rstrip("\n")
|
||||
|
||||
class RPCOverloadWrapper():
|
||||
def __init__(self, rpc, cli=False):
|
||||
def __init__(self, rpc):
|
||||
self.rpc = rpc
|
||||
self.is_cli = cli
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.rpc, name)
|
||||
|
||||
def createwallet_passthrough(self, *args, **kwargs):
|
||||
return self.__getattr__("createwallet")(*args, **kwargs)
|
||||
|
||||
def importprivkey(self, privkey, label=None, rescan=None):
|
||||
def importprivkey(self, privkey, *, label=None, rescan=None):
|
||||
wallet_info = self.getwalletinfo()
|
||||
if 'descriptors' not in wallet_info or ('descriptors' in wallet_info and not wallet_info['descriptors']):
|
||||
return self.__getattr__('importprivkey')(privkey, label, rescan)
|
||||
|
@ -943,13 +939,13 @@ class RPCOverloadWrapper():
|
|||
req = [{
|
||||
'desc': desc,
|
||||
'timestamp': 0 if rescan else 'now',
|
||||
'label': label if label else ''
|
||||
'label': label if label else '',
|
||||
}]
|
||||
import_res = self.importdescriptors(req)
|
||||
if not import_res[0]['success']:
|
||||
raise JSONRPCException(import_res[0]['error'])
|
||||
|
||||
def addmultisigaddress(self, nrequired, keys, label=None, address_type=None):
|
||||
def addmultisigaddress(self, nrequired, keys, *, label=None, address_type=None):
|
||||
wallet_info = self.getwalletinfo()
|
||||
if 'descriptors' not in wallet_info or ('descriptors' in wallet_info and not wallet_info['descriptors']):
|
||||
return self.__getattr__('addmultisigaddress')(nrequired, keys, label, address_type)
|
||||
|
@ -957,14 +953,14 @@ class RPCOverloadWrapper():
|
|||
req = [{
|
||||
'desc': cms['descriptor'],
|
||||
'timestamp': 0,
|
||||
'label': label if label else ''
|
||||
'label': label if label else '',
|
||||
}]
|
||||
import_res = self.importdescriptors(req)
|
||||
if not import_res[0]['success']:
|
||||
raise JSONRPCException(import_res[0]['error'])
|
||||
return cms
|
||||
|
||||
def importpubkey(self, pubkey, label=None, rescan=None):
|
||||
def importpubkey(self, pubkey, *, label=None, rescan=None):
|
||||
wallet_info = self.getwalletinfo()
|
||||
if 'descriptors' not in wallet_info or ('descriptors' in wallet_info and not wallet_info['descriptors']):
|
||||
return self.__getattr__('importpubkey')(pubkey, label, rescan)
|
||||
|
@ -972,13 +968,13 @@ class RPCOverloadWrapper():
|
|||
req = [{
|
||||
'desc': desc,
|
||||
'timestamp': 0 if rescan else 'now',
|
||||
'label': label if label else ''
|
||||
'label': label if label else '',
|
||||
}]
|
||||
import_res = self.importdescriptors(req)
|
||||
if not import_res[0]['success']:
|
||||
raise JSONRPCException(import_res[0]['error'])
|
||||
|
||||
def importaddress(self, address, label=None, rescan=None, p2sh=None):
|
||||
def importaddress(self, address, *, label=None, rescan=None, p2sh=None):
|
||||
wallet_info = self.getwalletinfo()
|
||||
if 'descriptors' not in wallet_info or ('descriptors' in wallet_info and not wallet_info['descriptors']):
|
||||
return self.__getattr__('importaddress')(address, label, rescan, p2sh)
|
||||
|
@ -992,13 +988,13 @@ class RPCOverloadWrapper():
|
|||
reqs = [{
|
||||
'desc': desc,
|
||||
'timestamp': 0 if rescan else 'now',
|
||||
'label': label if label else ''
|
||||
'label': label if label else '',
|
||||
}]
|
||||
if is_hex and p2sh:
|
||||
reqs.append({
|
||||
'desc': descsum_create('p2sh(raw(' + address + '))'),
|
||||
'timestamp': 0 if rescan else 'now',
|
||||
'label': label if label else ''
|
||||
'label': label if label else '',
|
||||
})
|
||||
import_res = self.importdescriptors(reqs)
|
||||
for res in import_res:
|
||||
|
|
|
@ -344,7 +344,7 @@ class AddressTypeTest(BitcoinTestFramework):
|
|||
self.test_address(3, self.nodes[3].getrawchangeaddress(), multisig=False, typ='bech32')
|
||||
|
||||
self.log.info('test invalid address type arguments')
|
||||
assert_raises_rpc_error(-5, "Unknown address type ''", self.nodes[3].addmultisigaddress, 2, [compressed_1, compressed_2], None, '')
|
||||
assert_raises_rpc_error(-5, "Unknown address type ''", self.nodes[3].addmultisigaddress, 2, [compressed_1, compressed_2], address_type="")
|
||||
assert_raises_rpc_error(-5, "Unknown address type ''", self.nodes[3].getnewaddress, None, '')
|
||||
assert_raises_rpc_error(-5, "Unknown address type ''", self.nodes[3].getrawchangeaddress, '')
|
||||
assert_raises_rpc_error(-5, "Unknown address type 'bech23'", self.nodes[3].getrawchangeaddress, 'bech23')
|
||||
|
|
|
@ -26,11 +26,12 @@ from test_framework.util import (
|
|||
assert_raises_rpc_error,
|
||||
)
|
||||
|
||||
LAST_KEYPOOL_INDEX = 9 # Index of the last derived address with the keypool size of 10
|
||||
|
||||
class BackwardsCompatibilityTest(BitcoinTestFramework):
|
||||
def set_test_params(self):
|
||||
self.setup_clean_chain = True
|
||||
self.num_nodes = 10
|
||||
self.num_nodes = 8
|
||||
# Add new version after each release:
|
||||
self.extra_args = [
|
||||
["-addresstype=bech32", "-whitelist=noban@127.0.0.1"], # Pre-release: use to mine blocks. noban for immediate tx relay
|
||||
|
@ -38,11 +39,9 @@ class BackwardsCompatibilityTest(BitcoinTestFramework):
|
|||
["-nowallet", "-walletrbf=1", "-addresstype=bech32", "-whitelist=noban@127.0.0.1"], # v25.0
|
||||
["-nowallet", "-walletrbf=1", "-addresstype=bech32", "-whitelist=noban@127.0.0.1"], # v24.0.1
|
||||
["-nowallet", "-walletrbf=1", "-addresstype=bech32", "-whitelist=noban@127.0.0.1"], # v23.0
|
||||
["-nowallet", "-walletrbf=1", "-addresstype=bech32", "-whitelist=noban@127.0.0.1"], # v22.0
|
||||
["-nowallet", "-walletrbf=1", "-addresstype=bech32", "-whitelist=noban@127.0.0.1", f"-keypool={LAST_KEYPOOL_INDEX + 1}"], # v22.0
|
||||
["-nowallet", "-walletrbf=1", "-addresstype=bech32", "-whitelist=noban@127.0.0.1"], # v0.21.0
|
||||
["-nowallet", "-walletrbf=1", "-addresstype=bech32", "-whitelist=noban@127.0.0.1"], # v0.20.1
|
||||
["-nowallet", "-walletrbf=1", "-addresstype=bech32", "-whitelist=noban@127.0.0.1"], # v0.19.1
|
||||
["-nowallet", "-walletrbf=1", "-addresstype=bech32", "-whitelist=127.0.0.1"], # v0.18.1
|
||||
]
|
||||
self.wallet_names = [self.default_wallet_name]
|
||||
|
||||
|
@ -60,8 +59,6 @@ class BackwardsCompatibilityTest(BitcoinTestFramework):
|
|||
220000,
|
||||
210000,
|
||||
200100,
|
||||
190100,
|
||||
180100,
|
||||
])
|
||||
|
||||
self.start_nodes()
|
||||
|
@ -85,21 +82,98 @@ class BackwardsCompatibilityTest(BitcoinTestFramework):
|
|||
node_major, _, _ = self.split_version(node)
|
||||
return node_major >= major
|
||||
|
||||
def test_v22_inactivehdchain_path(self):
|
||||
self.log.info("Testing inactive hd chain bad derivation path cleanup")
|
||||
# 0.21.x and 22.x would both produce bad derivation paths when topping up an inactive hd chain
|
||||
# Make sure that this is being automatically cleaned up by migration
|
||||
node_master = self.nodes[1]
|
||||
node_v22 = self.nodes[self.num_nodes - 5]
|
||||
wallet_name = "bad_deriv_path"
|
||||
node_v22.createwallet(wallet_name=wallet_name, descriptors=False)
|
||||
bad_deriv_wallet = node_v22.get_wallet_rpc(wallet_name)
|
||||
|
||||
# Make a dump of the wallet to get an unused address
|
||||
dump_path = node_v22.wallets_path / f"{wallet_name}.dump"
|
||||
bad_deriv_wallet.dumpwallet(dump_path)
|
||||
addr = None
|
||||
seed = None
|
||||
with open(dump_path, encoding="utf8") as f:
|
||||
for line in f:
|
||||
if f"hdkeypath=m/0'/0'/{LAST_KEYPOOL_INDEX}'" in line:
|
||||
addr = line.split(" ")[4].split("=")[1]
|
||||
elif " hdseed=1 " in line:
|
||||
seed = line.split(" ")[0]
|
||||
assert addr is not None
|
||||
assert seed is not None
|
||||
# Rotate seed and unload
|
||||
bad_deriv_wallet.sethdseed()
|
||||
bad_deriv_wallet.unloadwallet()
|
||||
# Receive at addr to trigger inactive chain topup on next load
|
||||
self.nodes[0].sendtoaddress(addr, 1)
|
||||
self.generate(self.nodes[0], 1, sync_fun=self.no_op)
|
||||
self.sync_all(nodes=[self.nodes[0], node_master, node_v22])
|
||||
node_v22.loadwallet(wallet_name)
|
||||
|
||||
# Dump again to find bad hd keypath
|
||||
bad_deriv_path = f"m/0'/0'/{LAST_KEYPOOL_INDEX}'/0'/0'/{LAST_KEYPOOL_INDEX + 1}'"
|
||||
good_deriv_path = f"m/0h/0h/{LAST_KEYPOOL_INDEX + 1}h"
|
||||
os.unlink(dump_path)
|
||||
bad_deriv_wallet.dumpwallet(dump_path)
|
||||
bad_path_addr = None
|
||||
with open(dump_path, encoding="utf8") as f:
|
||||
for line in f:
|
||||
if f"hdkeypath={bad_deriv_path}" in line:
|
||||
bad_path_addr = line.split(" ")[4].split("=")[1]
|
||||
assert bad_path_addr is not None
|
||||
assert_equal(bad_deriv_wallet.getaddressinfo(bad_path_addr)["hdkeypath"], bad_deriv_path)
|
||||
|
||||
# Verify that this bad derivation path addr is actually at m/0'/0'/10' by making a new wallet with the same seed but larger keypool
|
||||
node_v22.createwallet(wallet_name="path_verify", descriptors=False, blank=True)
|
||||
verify_wallet = node_v22.get_wallet_rpc("path_verify")
|
||||
verify_wallet.sethdseed(True, seed)
|
||||
# Bad addr is after keypool, so need to generate it by refilling
|
||||
verify_wallet.keypoolrefill(LAST_KEYPOOL_INDEX + 2)
|
||||
assert_equal(verify_wallet.getaddressinfo(bad_path_addr)["hdkeypath"], good_deriv_path.replace("h", "'"))
|
||||
|
||||
# Migrate with master
|
||||
# Since all keymeta records are now deleted after migration, the derivation path
|
||||
# should now be correct as it is derived on-the-fly from the inactive hd chain's descriptor
|
||||
backup_path = node_v22.wallets_path / f"{wallet_name}.bak"
|
||||
bad_deriv_wallet.backupwallet(backup_path)
|
||||
wallet_dir_master = os.path.join(node_master.wallets_path, wallet_name)
|
||||
os.makedirs(wallet_dir_master, exist_ok=True)
|
||||
shutil.copy(backup_path, os.path.join(wallet_dir_master, "wallet.dat"))
|
||||
node_master.migratewallet(wallet_name)
|
||||
bad_deriv_wallet_master = node_master.get_wallet_rpc(wallet_name)
|
||||
assert_equal(bad_deriv_wallet_master.getaddressinfo(bad_path_addr)["hdkeypath"], good_deriv_path)
|
||||
bad_deriv_wallet_master.unloadwallet()
|
||||
|
||||
# If we have sqlite3, verify that there are no keymeta records
|
||||
try:
|
||||
import sqlite3
|
||||
wallet_db = node_master.wallets_path / wallet_name / "wallet.dat"
|
||||
conn = sqlite3.connect(wallet_db)
|
||||
with conn:
|
||||
# Retrieve all records that have the "keymeta" prefix. The remaining key data varies for each record.
|
||||
keymeta_rec = conn.execute("SELECT value FROM main where key >= x'076b65796d657461' AND key < x'076b65796d657462'").fetchone()
|
||||
assert_equal(keymeta_rec, None)
|
||||
conn.close()
|
||||
except ImportError:
|
||||
self.log.warning("sqlite3 module not available, skipping lack of keymeta records check")
|
||||
|
||||
def run_test(self):
|
||||
node_miner = self.nodes[0]
|
||||
node_master = self.nodes[1]
|
||||
node_v21 = self.nodes[self.num_nodes - 4]
|
||||
node_v18 = self.nodes[self.num_nodes - 1]
|
||||
node_v21 = self.nodes[self.num_nodes - 2]
|
||||
node_v20 = self.nodes[self.num_nodes - 1] # bdb only
|
||||
|
||||
legacy_nodes = self.nodes[2:] # Nodes that support legacy wallets
|
||||
legacy_only_nodes = self.nodes[-3:] # Nodes that only support legacy wallets
|
||||
descriptors_nodes = self.nodes[2:-3] # Nodes that support descriptor wallets
|
||||
descriptors_nodes = self.nodes[2:-1] # Nodes that support descriptor wallets
|
||||
|
||||
self.generatetoaddress(node_miner, COINBASE_MATURITY + 1, node_miner.getnewaddress())
|
||||
|
||||
# Sanity check the test framework:
|
||||
res = node_v18.getblockchaininfo()
|
||||
assert_equal(res['blocks'], COINBASE_MATURITY + 1)
|
||||
assert_equal(node_v20.getblockchaininfo()["blocks"], COINBASE_MATURITY + 1)
|
||||
|
||||
self.log.info("Test wallet backwards compatibility...")
|
||||
# Create a number of wallets and open them in older versions:
|
||||
|
@ -206,13 +280,11 @@ class BackwardsCompatibilityTest(BitcoinTestFramework):
|
|||
)
|
||||
|
||||
# Check that descriptor wallets don't work on legacy only nodes
|
||||
self.log.info("Test descriptor wallet incompatibility on:")
|
||||
for node in legacy_only_nodes:
|
||||
self.log.info(f"- {node.version}")
|
||||
# Descriptor wallets appear to be corrupted wallets to old software
|
||||
assert self.major_version_less_than(node, 21)
|
||||
for wallet_name in ["w1", "w2", "w3"]:
|
||||
assert_raises_rpc_error(-4, "Wallet file verification failed: wallet.dat corrupt, salvage failed", node.loadwallet, wallet_name)
|
||||
self.log.info("Test descriptor wallet incompatibility on v0.20")
|
||||
# Descriptor wallets appear to be corrupted wallets to old software
|
||||
assert self.major_version_equals(node_v20, 20)
|
||||
for wallet_name in ["w1", "w2", "w3"]:
|
||||
assert_raises_rpc_error(-4, "Wallet file verification failed: wallet.dat corrupt, salvage failed", node_v20.loadwallet, wallet_name)
|
||||
|
||||
# w1 cannot be opened by 0.21 since it contains a taproot descriptor
|
||||
self.log.info("Test that 0.21 cannot open wallet containing tr() descriptors")
|
||||
|
@ -308,5 +380,7 @@ class BackwardsCompatibilityTest(BitcoinTestFramework):
|
|||
# Legacy wallets are no longer supported. Trying to load these should result in an error
|
||||
assert_raises_rpc_error(-18, "The wallet appears to be a Legacy wallet, please use the wallet migration tool (migratewallet RPC)", node_master.restorewallet, wallet_name, backup_path)
|
||||
|
||||
self.test_v22_inactivehdchain_path()
|
||||
|
||||
if __name__ == '__main__':
|
||||
BackwardsCompatibilityTest(__file__).main()
|
||||
|
|
|
@ -192,7 +192,7 @@ class RawTransactionsTest(BitcoinTestFramework):
|
|||
watchonly_address = self.nodes[0].getnewaddress()
|
||||
watchonly_pubkey = self.nodes[0].getaddressinfo(watchonly_address)["pubkey"]
|
||||
self.watchonly_amount = Decimal(200)
|
||||
wwatch.importpubkey(watchonly_pubkey, "", True)
|
||||
wwatch.importpubkey(watchonly_pubkey, label="", rescan=True)
|
||||
self.watchonly_utxo = self.create_outpoints(self.nodes[0], outputs=[{watchonly_address: self.watchonly_amount}])[0]
|
||||
|
||||
# Lock UTXO so nodes[0] doesn't accidentally spend it
|
||||
|
|
|
@ -49,7 +49,7 @@ class WalletLabelsTest(BitcoinTestFramework):
|
|||
assert_equal(response[0]['error']['message'], "Invalid label name")
|
||||
|
||||
for rpc_call in rpc_calls:
|
||||
assert_raises_rpc_error(-11, "Invalid label name", *rpc_call, "*")
|
||||
assert_raises_rpc_error(-11, "Invalid label name", *rpc_call, label="*")
|
||||
|
||||
def run_test(self):
|
||||
# Check that there's no UTXO on the node
|
||||
|
|
|
@ -24,34 +24,7 @@ SHA256_SUMS = {
|
|||
"d86fc90824a85c38b25c8488115178d5785dbc975f5ff674f9f5716bc8ad6e65": {"tag": "v0.14.3", "tarball": "bitcoin-0.14.3-arm-linux-gnueabihf.tar.gz"},
|
||||
"1b0a7408c050e3d09a8be8e21e183ef7ee570385dc41216698cc3ab392a484e7": {"tag": "v0.14.3", "tarball": "bitcoin-0.14.3-osx64.tar.gz"},
|
||||
"706e0472dbc933ed2757650d54cbcd780fd3829ebf8f609b32780c7eedebdbc9": {"tag": "v0.14.3", "tarball": "bitcoin-0.14.3-x86_64-linux-gnu.tar.gz"},
|
||||
#
|
||||
"d40f18b4e43c6e6370ef7db9131f584fbb137276ec2e3dba67a4b267f81cb644": {"tag": "v0.15.2", "tarball": "bitcoin-0.15.2-aarch64-linux-gnu.tar.gz"},
|
||||
"54fb877a148a6ad189a1e1ab1ff8b11181e58ff2aaf430da55b3fd46ae549a6b": {"tag": "v0.15.2", "tarball": "bitcoin-0.15.2-arm-linux-gnueabihf.tar.gz"},
|
||||
"87e9340ff3d382d543b2b69112376077f0c8b4f7450d372e83b68f5a1e22b2df": {"tag": "v0.15.2", "tarball": "bitcoin-0.15.2-osx64.tar.gz"},
|
||||
"566be44190fd76daa01f13d428939dadfb8e3daacefc8fa17f433cad28f73bd5": {"tag": "v0.15.2", "tarball": "bitcoin-0.15.2-x86_64-linux-gnu.tar.gz"},
|
||||
#
|
||||
"0768c6c15caffbaca6524824c9563b42c24f70633c681c2744649158aa3fd484": {"tag": "v0.16.3", "tarball": "bitcoin-0.16.3-aarch64-linux-gnu.tar.gz"},
|
||||
"fb2818069854a6ad20ea03b28b55dbd35d8b1f7d453e90b83eace5d0098a2a87": {"tag": "v0.16.3", "tarball": "bitcoin-0.16.3-arm-linux-gnueabihf.tar.gz"},
|
||||
"78c3bff3b619a19aed575961ea43cc9e142959218835cf51aede7f0b764fc25d": {"tag": "v0.16.3", "tarball": "bitcoin-0.16.3-osx64.tar.gz"},
|
||||
"5d422a9d544742bc0df12427383f9c2517433ce7b58cf672b9a9b17c2ef51e4f": {"tag": "v0.16.3", "tarball": "bitcoin-0.16.3-x86_64-linux-gnu.tar.gz"},
|
||||
#
|
||||
"5a6b35d1a348a402f2d2d6ab5aed653a1a1f13bc63aaaf51605e3501b0733b7a": {"tag": "v0.17.2", "tarball": "bitcoin-0.17.2-aarch64-linux-gnu.tar.gz"},
|
||||
"d1913a5d19c8e8da4a67d1bd5205d03c8614dfd2e02bba2fe3087476643a729e": {"tag": "v0.17.2", "tarball": "bitcoin-0.17.2-arm-linux-gnueabihf.tar.gz"},
|
||||
"a783ba20706dbfd5b47fbedf42165fce70fbbc7d78003305d964f6b3da14887f": {"tag": "v0.17.2", "tarball": "bitcoin-0.17.2-osx64.tar.gz"},
|
||||
"943f9362b9f11130177839116f48f809d83478b4c28591d486ee9a7e35179da6": {"tag": "v0.17.2", "tarball": "bitcoin-0.17.2-x86_64-linux-gnu.tar.gz"},
|
||||
#
|
||||
"88f343af72803b851c7da13874cc5525026b0b55e63e1b5e1298390c4688adc6": {"tag": "v0.18.1", "tarball": "bitcoin-0.18.1-aarch64-linux-gnu.tar.gz"},
|
||||
"cc7d483e4b20c5dabd4dcaf304965214cf4934bcc029ca99cbc9af00d3771a1f": {"tag": "v0.18.1", "tarball": "bitcoin-0.18.1-arm-linux-gnueabihf.tar.gz"},
|
||||
"b7bbcee7a7540f711b171d6981f939ca8482005fde22689bc016596d80548bb1": {"tag": "v0.18.1", "tarball": "bitcoin-0.18.1-osx64.tar.gz"},
|
||||
"425ee5ec631ae8da71ebc1c3f5c0269c627cf459379b9b030f047107a28e3ef8": {"tag": "v0.18.1", "tarball": "bitcoin-0.18.1-riscv64-linux-gnu.tar.gz"},
|
||||
"600d1db5e751fa85903e935a01a74f5cc57e1e7473c15fd3e17ed21e202cfe5a": {"tag": "v0.18.1", "tarball": "bitcoin-0.18.1-x86_64-linux-gnu.tar.gz"},
|
||||
#
|
||||
"3a80431717842672df682bdb619e66523b59541483297772a7969413be3502ff": {"tag": "v0.19.1", "tarball": "bitcoin-0.19.1-aarch64-linux-gnu.tar.gz"},
|
||||
"657f28213823d240dd3324d14829702f9ad6f0710f8bdd1c379cb3c447197f48": {"tag": "v0.19.1", "tarball": "bitcoin-0.19.1-arm-linux-gnueabihf.tar.gz"},
|
||||
"1ae1b87de26487075cd2fd22e0d4ead87d969bd55c44f2f1d873ecdc6147ebb3": {"tag": "v0.19.1", "tarball": "bitcoin-0.19.1-osx64.tar.gz"},
|
||||
"aa7a9563b48aa79252c8e7b6a41c07a5441bd9f14c5e4562cc72720ea6cb0ee5": {"tag": "v0.19.1", "tarball": "bitcoin-0.19.1-riscv64-linux-gnu.tar.gz"},
|
||||
"5fcac9416e486d4960e1a946145566350ca670f9aaba99de6542080851122e4c": {"tag": "v0.19.1", "tarball": "bitcoin-0.19.1-x86_64-linux-gnu.tar.gz"},
|
||||
#
|
||||
|
||||
"60c93e3462c303eb080be7cf623f1a7684b37fd47a018ad3848bc23e13c84e1c": {"tag": "v0.20.1", "tarball": "bitcoin-0.20.1-aarch64-linux-gnu.tar.gz"},
|
||||
"55b577e0fb306fb429d4be6c9316607753e8543e5946b542d75d876a2f08654c": {"tag": "v0.20.1", "tarball": "bitcoin-0.20.1-arm-linux-gnueabihf.tar.gz"},
|
||||
"b9024dde373ea7dad707363e07ec7e265383204127539ae0c234bff3a61da0d1": {"tag": "v0.20.1", "tarball": "bitcoin-0.20.1-osx64.tar.gz"},
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
#!/usr/bin/env python3
|
||||
# Copyright 2014 BitPay Inc.
|
||||
# Copyright 2016-2017 The Bitcoin Core developers
|
||||
# Copyright 2016-present The Bitcoin Core developers
|
||||
# Distributed under the MIT software license, see the accompanying
|
||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
"""Test framework for bitcoin utils.
|
||||
|
@ -155,15 +155,16 @@ def bctest(testDir, testObj, buildenv):
|
|||
|
||||
if "error_txt" in testObj:
|
||||
want_error = testObj["error_txt"]
|
||||
# Compare error text
|
||||
# TODO: ideally, we'd compare the strings exactly and also assert
|
||||
# That stderr is empty if no errors are expected. However, bitcoin-tx
|
||||
# emits DISPLAY errors when running as a windows application on
|
||||
# linux through wine. Just assert that the expected error text appears
|
||||
# somewhere in stderr.
|
||||
# A partial match instead of an exact match makes writing tests easier
|
||||
# and should be sufficient.
|
||||
if want_error not in res.stderr:
|
||||
logging.error(f"Error mismatch:\nExpected: {want_error}\nReceived: {res.stderr.rstrip()}\nres: {str(res)}")
|
||||
raise Exception
|
||||
else:
|
||||
if res.stderr:
|
||||
logging.error(f"Unexpected error received: {res.stderr.rstrip()}\nres: {str(res)}")
|
||||
raise Exception
|
||||
|
||||
|
||||
def parse_output(a, fmt):
|
||||
"""Parse the output according to specified format.
|
||||
|
|
Loading…
Add table
Reference in a new issue