multiprocess: Add IPC connectAddress and listenAddress methods

Allow listening on and connecting to unix sockets.
This commit is contained in:
Russell Yanofsky 2018-08-23 13:42:31 -04:00
parent 4da20434d4
commit 955d4077aa
6 changed files with 181 additions and 1 deletions

View file

@ -41,6 +41,11 @@ class Init;
//! to make other proxy objects calling other remote interfaces. It can also
//! destroy the initial interfaces::Init object to close the connection and
//! shut down the spawned process.
//!
//! When connecting to an existing process, the steps are similar to spawning a
//! new process, except a socket is created instead of a socketpair, and
//! destroying an Init interface doesn't end the process, since there can be
//! multiple connections.
class Ipc
{
public:
@ -54,6 +59,17 @@ public:
//! true. If this is not a spawned child process, return false.
virtual bool startSpawnedProcess(int argc, char* argv[], int& exit_status) = 0;
//! Connect to a socket address and make a client interface proxy object
//! using provided callback. connectAddress returns an interface pointer if
//! the connection was established, returns null if address is empty ("") or
//! disabled ("0") or if a connection was refused but not required ("auto"),
//! and throws an exception if there was an unexpected error.
virtual std::unique_ptr<Init> connectAddress(std::string& address) = 0;
//! Connect to a socket address and make a client interface proxy object
//! using provided callback. Throws an exception if there was an error.
virtual void listenAddress(std::string& address) = 0;
//! Add cleanup callback to remote interface that will run when the
//! interface is deleted.
template<typename Interface>

View file

@ -23,6 +23,8 @@
#include <mutex>
#include <optional>
#include <string>
#include <sys/socket.h>
#include <system_error>
#include <thread>
namespace ipc {
@ -51,6 +53,14 @@ public:
startLoop(exe_name);
return mp::ConnectStream<messages::Init>(*m_loop, fd);
}
void listen(int listen_fd, const char* exe_name, interfaces::Init& init) override
{
startLoop(exe_name);
if (::listen(listen_fd, /*backlog=*/5) != 0) {
throw std::system_error(errno, std::system_category());
}
mp::ListenConnections<messages::Init>(*m_loop, listen_fd, init);
}
void serve(int fd, const char* exe_name, interfaces::Init& init) override
{
assert(!m_loop);

View file

@ -2,6 +2,7 @@
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <common/args.h>
#include <common/system.h>
#include <interfaces/init.h>
#include <interfaces/ipc.h>
@ -56,6 +57,35 @@ public:
exit_status = EXIT_SUCCESS;
return true;
}
std::unique_ptr<interfaces::Init> connectAddress(std::string& address) override
{
if (address.empty() || address == "0") return nullptr;
int fd;
if (address == "auto") {
// Treat "auto" the same as "unix" except don't treat it an as error
// if the connection is not accepted. Just return null so the caller
// can work offline without a connection, or spawn a new
// bitcoin-node process and connect to it.
address = "unix";
try {
fd = m_process->connect(gArgs.GetDataDirNet(), "bitcoin-node", address);
} catch (const std::system_error& e) {
// If connection type is auto and socket path isn't accepting connections, or doesn't exist, catch the error and return null;
if (e.code() == std::errc::connection_refused || e.code() == std::errc::no_such_file_or_directory) {
return nullptr;
}
throw;
}
} else {
fd = m_process->connect(gArgs.GetDataDirNet(), "bitcoin-node", address);
}
return m_protocol->connect(fd, m_exe_name);
}
void listenAddress(std::string& address) override
{
int fd = m_process->bind(gArgs.GetDataDirNet(), m_exe_name, address);
m_protocol->listen(fd, m_exe_name, m_init);
}
void addCleanup(std::type_index type, void* iface, std::function<void()> cleanup) override
{
m_protocol->addCleanup(type, iface, std::move(cleanup));

View file

@ -4,22 +4,28 @@
#include <ipc/process.h>
#include <ipc/protocol.h>
#include <logging.h>
#include <mp/util.h>
#include <tinyformat.h>
#include <util/fs.h>
#include <util/strencodings.h>
#include <util/syserror.h>
#include <cstdint>
#include <cstdlib>
#include <errno.h>
#include <exception>
#include <iostream>
#include <stdexcept>
#include <string.h>
#include <system_error>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <utility>
#include <vector>
using util::RemovePrefixView;
namespace ipc {
namespace {
class ProcessImpl : public Process
@ -54,7 +60,95 @@ public:
}
return true;
}
int connect(const fs::path& data_dir,
const std::string& dest_exe_name,
std::string& address) override;
int bind(const fs::path& data_dir, const std::string& exe_name, std::string& address) override;
};
static bool ParseAddress(std::string& address,
const fs::path& data_dir,
const std::string& dest_exe_name,
struct sockaddr_un& addr,
std::string& error)
{
if (address.compare(0, 4, "unix") == 0 && (address.size() == 4 || address[4] == ':')) {
fs::path path;
if (address.size() <= 5) {
path = data_dir / fs::PathFromString(strprintf("%s.sock", RemovePrefixView(dest_exe_name, "bitcoin-")));
} else {
path = data_dir / fs::PathFromString(address.substr(5));
}
std::string path_str = fs::PathToString(path);
address = strprintf("unix:%s", path_str);
if (path_str.size() >= sizeof(addr.sun_path)) {
error = strprintf("Unix address path %s exceeded maximum socket path length", fs::quoted(fs::PathToString(path)));
return false;
}
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, path_str.c_str(), sizeof(addr.sun_path)-1);
return true;
}
error = strprintf("Unrecognized address '%s'", address);
return false;
}
int ProcessImpl::connect(const fs::path& data_dir,
const std::string& dest_exe_name,
std::string& address)
{
struct sockaddr_un addr;
std::string error;
if (!ParseAddress(address, data_dir, dest_exe_name, addr, error)) {
throw std::invalid_argument(error);
}
int fd;
if ((fd = ::socket(addr.sun_family, SOCK_STREAM, 0)) == -1) {
throw std::system_error(errno, std::system_category());
}
if (::connect(fd, (struct sockaddr*)&addr, sizeof(addr)) == 0) {
return fd;
}
int connect_error = errno;
if (::close(fd) != 0) {
LogPrintf("Error closing file descriptor %i '%s': %s\n", fd, address, SysErrorString(errno));
}
throw std::system_error(connect_error, std::system_category());
}
int ProcessImpl::bind(const fs::path& data_dir, const std::string& exe_name, std::string& address)
{
struct sockaddr_un addr;
std::string error;
if (!ParseAddress(address, data_dir, exe_name, addr, error)) {
throw std::invalid_argument(error);
}
if (addr.sun_family == AF_UNIX) {
fs::path path = addr.sun_path;
if (path.has_parent_path()) fs::create_directories(path.parent_path());
if (fs::symlink_status(path).type() == fs::file_type::socket) {
fs::remove(path);
}
}
int fd;
if ((fd = ::socket(addr.sun_family, SOCK_STREAM, 0)) == -1) {
throw std::system_error(errno, std::system_category());
}
if (::bind(fd, (struct sockaddr*)&addr, sizeof(addr)) == 0) {
return fd;
}
int bind_error = errno;
if (::close(fd) != 0) {
LogPrintf("Error closing file descriptor %i: %s\n", fd, SysErrorString(errno));
}
throw std::system_error(bind_error, std::system_category());
}
} // namespace
std::unique_ptr<Process> MakeProcess() { return std::make_unique<ProcessImpl>(); }

View file

@ -34,6 +34,16 @@ public:
//! process. If so, return true and a file descriptor for communicating
//! with the parent process.
virtual bool checkSpawned(int argc, char* argv[], int& fd) = 0;
//! Canonicalize and connect to address, returning socket descriptor.
virtual int connect(const fs::path& data_dir,
const std::string& dest_exe_name,
std::string& address) = 0;
//! Create listening socket, bind and canonicalize address, and return socket descriptor.
virtual int bind(const fs::path& data_dir,
const std::string& exe_name,
std::string& address) = 0;
};
//! Constructor for Process interface. Implementation will vary depending on

View file

@ -25,11 +25,31 @@ public:
//! Return Init interface that forwards requests over given socket descriptor.
//! Socket communication is handled on a background thread.
//!
//! @note It could be potentially useful in the future to add
//! std::function<void()> on_disconnect callback argument here. But there
//! isn't an immediate need, because the protocol implementation can clean
//! up its own state (calling ProxyServer destructors, etc) on disconnect,
//! and any client calls will just throw ipc::Exception errors after a
//! disconnect.
virtual std::unique_ptr<interfaces::Init> connect(int fd, const char* exe_name) = 0;
//! Listen for connections on provided socket descriptor, accept them, and
//! handle requests on accepted connections. This method doesn't block, and
//! performs I/O on a background thread.
virtual void listen(int listen_fd, const char* exe_name, interfaces::Init& init) = 0;
//! Handle requests on provided socket descriptor, forwarding them to the
//! provided Init interface. Socket communication is handled on the
//! current thread, and this call blocks until the socket is closed.
//!
//! @note: If this method is called, it needs be called before connect() or
//! listen() methods, because for ease of implementation it's inflexible and
//! always runs the event loop in the foreground thread. It can share its
//! event loop with the other methods but can't share an event loop that was
//! created by them. This isn't really a problem because serve() is only
//! called by spawned child processes that call it immediately to
//! communicate back with parent processes.
virtual void serve(int fd, const char* exe_name, interfaces::Init& init) = 0;
//! Add cleanup callback to interface that will run when the interface is