Merge #13686: ZMQ: Small cleanups in the ZMQ code

6fe2ef2acb scripted-diff: Rename SendMessage to SendZmqMessage. (Daniel Kraft)
a3ffb6ebeb Replace zmqconfig.h by a simple zmqutil. (Daniel Kraft)
7f2ad1b9ac Use std::unique_ptr for CZMQNotifierFactory. (Daniel Kraft)
b93b9d5456 Simplify and fix notifier removal on error. (Daniel Kraft)
e15b1cfc31 Various cleanups in zmqnotificationinterface. (Daniel Kraft)

Pull request description:

  This contains various small code cleanups that make the ZMQ code easier to read and maintain (at least in my opinion).  The only functional change is that a potential memory leak is fixed that would have occured when a notifier is removed from the `notifiers` list after its callback function returned `false` (which is likely not relevant in practice but still a bug).

ACKs for top commit:
  instagibbs:
    utACK 6fe2ef2acb
  hebasto:
    re-ACK 6fe2ef2acb, only the latest commit got a scripted-diff since my [previous](https://github.com/bitcoin/bitcoin/pull/13686#pullrequestreview-487649808) review.

Tree-SHA512: 8206f8713bf3698d7cd4cb235f6657dc1c4dd920f50a8c5f371a559dd17ce5ab6d94d6281165eef860a22fc844a6bb25489ada12c83ebc780efd7ccdc0860f70
This commit is contained in:
fanquake 2020-09-19 16:51:35 +08:00
commit 831b0ecea9
No known key found for this signature in database
GPG key ID: 2EEB9F5CC09526C1
10 changed files with 104 additions and 110 deletions

View file

@ -263,10 +263,10 @@ BITCOIN_CORE_H = \
walletinitinterface.h \ walletinitinterface.h \
warnings.h \ warnings.h \
zmq/zmqabstractnotifier.h \ zmq/zmqabstractnotifier.h \
zmq/zmqconfig.h\
zmq/zmqnotificationinterface.h \ zmq/zmqnotificationinterface.h \
zmq/zmqpublishnotifier.h \ zmq/zmqpublishnotifier.h \
zmq/zmqrpc.h zmq/zmqrpc.h \
zmq/zmqutil.h
obj/build.h: FORCE obj/build.h: FORCE
@ -345,7 +345,8 @@ libbitcoin_zmq_a_SOURCES = \
zmq/zmqabstractnotifier.cpp \ zmq/zmqabstractnotifier.cpp \
zmq/zmqnotificationinterface.cpp \ zmq/zmqnotificationinterface.cpp \
zmq/zmqpublishnotifier.cpp \ zmq/zmqpublishnotifier.cpp \
zmq/zmqrpc.cpp zmq/zmqrpc.cpp \
zmq/zmqutil.cpp
endif endif

View file

@ -4,6 +4,8 @@
#include <zmq/zmqabstractnotifier.h> #include <zmq/zmqabstractnotifier.h>
#include <cassert>
const int CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM; const int CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM;
CZMQAbstractNotifier::~CZMQAbstractNotifier() CZMQAbstractNotifier::~CZMQAbstractNotifier()

View file

@ -5,12 +5,16 @@
#ifndef BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H #ifndef BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
#define BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H #define BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
#include <zmq/zmqconfig.h> #include <util/memory.h>
#include <memory>
#include <string>
class CBlockIndex; class CBlockIndex;
class CTransaction;
class CZMQAbstractNotifier; class CZMQAbstractNotifier;
typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)(); using CZMQNotifierFactory = std::unique_ptr<CZMQAbstractNotifier> (*)();
class CZMQAbstractNotifier class CZMQAbstractNotifier
{ {
@ -21,9 +25,9 @@ public:
virtual ~CZMQAbstractNotifier(); virtual ~CZMQAbstractNotifier();
template <typename T> template <typename T>
static CZMQAbstractNotifier* Create() static std::unique_ptr<CZMQAbstractNotifier> Create()
{ {
return new T(); return MakeUnique<T>();
} }
std::string GetType() const { return type; } std::string GetType() const { return type; }

View file

@ -1,22 +0,0 @@
// Copyright (c) 2014-2019 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_ZMQ_ZMQCONFIG_H
#define BITCOIN_ZMQ_ZMQCONFIG_H
#if defined(HAVE_CONFIG_H)
#include <config/bitcoin-config.h>
#endif
#include <stdarg.h>
#if ENABLE_ZMQ
#include <zmq.h>
#endif
#include <primitives/transaction.h>
void zmqError(const char *str);
#endif // BITCOIN_ZMQ_ZMQCONFIG_H

View file

@ -4,15 +4,13 @@
#include <zmq/zmqnotificationinterface.h> #include <zmq/zmqnotificationinterface.h>
#include <zmq/zmqpublishnotifier.h> #include <zmq/zmqpublishnotifier.h>
#include <zmq/zmqutil.h>
#include <zmq.h>
#include <validation.h> #include <validation.h>
#include <util/system.h> #include <util/system.h>
void zmqError(const char *str)
{
LogPrint(BCLog::ZMQ, "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno));
}
CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(nullptr) CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(nullptr)
{ {
} }
@ -20,61 +18,52 @@ CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(nullptr)
CZMQNotificationInterface::~CZMQNotificationInterface() CZMQNotificationInterface::~CZMQNotificationInterface()
{ {
Shutdown(); Shutdown();
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
{
delete *i;
}
} }
std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
{ {
std::list<const CZMQAbstractNotifier*> result; std::list<const CZMQAbstractNotifier*> result;
for (const auto* n : notifiers) { for (const auto& n : notifiers) {
result.push_back(n); result.push_back(n.get());
} }
return result; return result;
} }
CZMQNotificationInterface* CZMQNotificationInterface::Create() CZMQNotificationInterface* CZMQNotificationInterface::Create()
{ {
CZMQNotificationInterface* notificationInterface = nullptr;
std::map<std::string, CZMQNotifierFactory> factories; std::map<std::string, CZMQNotifierFactory> factories;
std::list<CZMQAbstractNotifier*> notifiers;
factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>; factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>; factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>; factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>; factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
for (const auto& entry : factories) for (const auto& entry : factories)
{ {
std::string arg("-zmq" + entry.first); std::string arg("-zmq" + entry.first);
if (gArgs.IsArgSet(arg)) if (gArgs.IsArgSet(arg))
{ {
CZMQNotifierFactory factory = entry.second; const auto& factory = entry.second;
std::string address = gArgs.GetArg(arg, ""); const std::string address = gArgs.GetArg(arg, "");
CZMQAbstractNotifier *notifier = factory(); std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
notifier->SetType(entry.first); notifier->SetType(entry.first);
notifier->SetAddress(address); notifier->SetAddress(address);
notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM))); notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
notifiers.push_back(notifier); notifiers.push_back(std::move(notifier));
} }
} }
if (!notifiers.empty()) if (!notifiers.empty())
{ {
notificationInterface = new CZMQNotificationInterface(); std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
notificationInterface->notifiers = notifiers; notificationInterface->notifiers = std::move(notifiers);
if (!notificationInterface->Initialize()) if (notificationInterface->Initialize()) {
{ return notificationInterface.release();
delete notificationInterface;
notificationInterface = nullptr;
} }
} }
return notificationInterface; return nullptr;
} }
// Called at startup to conditionally set up ZMQ socket(s) // Called at startup to conditionally set up ZMQ socket(s)
@ -95,26 +84,15 @@ bool CZMQNotificationInterface::Initialize()
return false; return false;
} }
std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); for (auto& notifier : notifiers) {
for (; i!=notifiers.end(); ++i) if (notifier->Initialize(pcontext)) {
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->Initialize(pcontext))
{
LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
} } else {
else
{
LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
break; return false;
} }
} }
if (i!=notifiers.end())
{
return false;
}
return true; return true;
} }
@ -124,9 +102,7 @@ void CZMQNotificationInterface::Shutdown()
LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n"); LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n");
if (pcontext) if (pcontext)
{ {
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) for (auto& notifier : notifiers) {
{
CZMQAbstractNotifier *notifier = *i;
LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
notifier->Shutdown(); notifier->Shutdown();
} }
@ -136,24 +112,32 @@ void CZMQNotificationInterface::Shutdown()
} }
} }
namespace {
template <typename Function>
void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
{
for (auto i = notifiers.begin(); i != notifiers.end(); ) {
CZMQAbstractNotifier* notifier = i->get();
if (func(notifier)) {
++i;
} else {
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}
} // anonymous namespace
void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
{ {
if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
return; return;
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); ) TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
{ return notifier->NotifyBlock(pindexNew);
CZMQAbstractNotifier *notifier = *i; });
if (notifier->NotifyBlock(pindexNew))
{
i++;
}
else
{
notifier->Shutdown();
i = notifiers.erase(i);
}
}
} }
void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx) void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx)
@ -162,19 +146,9 @@ void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef&
// all the same external callback. // all the same external callback.
const CTransaction& tx = *ptx; const CTransaction& tx = *ptx;
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); ) TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
{ return notifier->NotifyTransaction(tx);
CZMQAbstractNotifier *notifier = *i; });
if (notifier->NotifyTransaction(tx))
{
i++;
}
else
{
notifier->Shutdown();
i = notifiers.erase(i);
}
}
} }
void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)

View file

@ -7,6 +7,7 @@
#include <validationinterface.h> #include <validationinterface.h>
#include <list> #include <list>
#include <memory>
class CBlockIndex; class CBlockIndex;
class CZMQAbstractNotifier; class CZMQAbstractNotifier;
@ -34,7 +35,7 @@ private:
CZMQNotificationInterface(); CZMQNotificationInterface();
void *pcontext; void *pcontext;
std::list<CZMQAbstractNotifier*> notifiers; std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
}; };
extern CZMQNotificationInterface* g_zmq_notification_interface; extern CZMQNotificationInterface* g_zmq_notification_interface;

View file

@ -2,13 +2,23 @@
// Distributed under the MIT software license, see the accompanying // Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php. // file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <zmq/zmqpublishnotifier.h>
#include <chain.h> #include <chain.h>
#include <chainparams.h> #include <chainparams.h>
#include <streams.h>
#include <zmq/zmqpublishnotifier.h>
#include <validation.h>
#include <util/system.h>
#include <rpc/server.h> #include <rpc/server.h>
#include <streams.h>
#include <util/system.h>
#include <validation.h>
#include <zmq/zmqutil.h>
#include <zmq.h>
#include <cstdarg>
#include <cstddef>
#include <map>
#include <string>
#include <utility>
static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers; static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
@ -149,7 +159,7 @@ void CZMQAbstractPublishNotifier::Shutdown()
psocket = nullptr; psocket = nullptr;
} }
bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size) bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void* data, size_t size)
{ {
assert(psocket); assert(psocket);
@ -173,7 +183,7 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
char data[32]; char data[32];
for (unsigned int i = 0; i < 32; i++) for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i]; data[31 - i] = hash.begin()[i];
return SendMessage(MSG_HASHBLOCK, data, 32); return SendZmqMessage(MSG_HASHBLOCK, data, 32);
} }
bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
@ -183,7 +193,7 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t
char data[32]; char data[32];
for (unsigned int i = 0; i < 32; i++) for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i]; data[31 - i] = hash.begin()[i];
return SendMessage(MSG_HASHTX, data, 32); return SendZmqMessage(MSG_HASHTX, data, 32);
} }
bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
@ -204,7 +214,7 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
ss << block; ss << block;
} }
return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size()); return SendZmqMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
} }
bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
@ -213,5 +223,5 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr
LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", hash.GetHex()); LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", hash.GetHex());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
ss << transaction; ss << transaction;
return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
} }

View file

@ -22,7 +22,7 @@ public:
* data * data
* message sequence number * message sequence number
*/ */
bool SendMessage(const char *command, const void* data, size_t size); bool SendZmqMessage(const char *command, const void* data, size_t size);
bool Initialize(void *pcontext) override; bool Initialize(void *pcontext) override;
void Shutdown() override; void Shutdown() override;

14
src/zmq/zmqutil.cpp Normal file
View file

@ -0,0 +1,14 @@
// Copyright (c) 2014-2018 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <zmq/zmqutil.h>
#include <logging.h>
#include <zmq.h>
void zmqError(const char* str)
{
LogPrint(BCLog::ZMQ, "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno));
}

10
src/zmq/zmqutil.h Normal file
View file

@ -0,0 +1,10 @@
// Copyright (c) 2014-2018 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_ZMQ_ZMQUTIL_H
#define BITCOIN_ZMQ_ZMQUTIL_H
void zmqError(const char* str);
#endif // BITCOIN_ZMQ_ZMQUTIL_H