walletdb: Handle cursor internally

Instead of returning a Dbc (BDB cursor object) and having the caller
deal with the cursor, make BerkeleyBatch handle the cursor internally.

This prepares BerkeleyBatch to work with other database systems as Dbc
objects are BDB specific.
This commit is contained in:
Andrew Chow 2020-05-14 21:17:01 -04:00
parent 39bd9ddb87
commit ca24edfbc1
3 changed files with 50 additions and 35 deletions

View file

@ -335,7 +335,7 @@ void BerkeleyEnvironment::CheckpointLSN(const std::string& strFile)
}
BerkeleyBatch::BerkeleyBatch(BerkeleyDatabase& database, const char* pszMode, bool fFlushOnCloseIn) : pdb(nullptr), activeTxn(nullptr)
BerkeleyBatch::BerkeleyBatch(BerkeleyDatabase& database, const char* pszMode, bool fFlushOnCloseIn) : pdb(nullptr), activeTxn(nullptr), m_cursor(nullptr)
{
fReadOnly = (!strchr(pszMode, '+') && !strchr(pszMode, 'w'));
fFlushOnClose = fFlushOnCloseIn;
@ -442,6 +442,7 @@ void BerkeleyBatch::Close()
activeTxn->abort();
activeTxn = nullptr;
pdb = nullptr;
CloseCursor();
if (fFlushOnClose)
Flush();
@ -528,17 +529,15 @@ bool BerkeleyBatch::Rewrite(BerkeleyDatabase& database, const char* pszSkip)
fSuccess = false;
}
Dbc* pcursor = db.GetCursor();
if (pcursor)
if (db.StartCursor()) {
while (fSuccess) {
CDataStream ssKey(SER_DISK, CLIENT_VERSION);
CDataStream ssValue(SER_DISK, CLIENT_VERSION);
int ret1 = db.ReadAtCursor(pcursor, ssKey, ssValue);
if (ret1 == DB_NOTFOUND) {
pcursor->close();
bool complete;
bool ret1 = db.ReadAtCursor(ssKey, ssValue, complete);
if (complete) {
break;
} else if (ret1 != 0) {
pcursor->close();
} else if (!ret1) {
fSuccess = false;
break;
}
@ -556,6 +555,8 @@ bool BerkeleyBatch::Rewrite(BerkeleyDatabase& database, const char* pszSkip)
if (ret2 > 0)
fSuccess = false;
}
db.CloseCursor();
}
if (fSuccess) {
db.Close();
env->CloseDb(strFile);
@ -738,27 +739,30 @@ void BerkeleyDatabase::ReloadDbEnv()
}
}
Dbc* BerkeleyBatch::GetCursor()
bool BerkeleyBatch::StartCursor()
{
assert(!m_cursor);
if (!pdb)
return nullptr;
Dbc* pcursor = nullptr;
int ret = pdb->cursor(nullptr, &pcursor, 0);
if (ret != 0)
return nullptr;
return pcursor;
return false;
int ret = pdb->cursor(nullptr, &m_cursor, 0);
return ret == 0;
}
int BerkeleyBatch::ReadAtCursor(Dbc* pcursor, CDataStream& ssKey, CDataStream& ssValue)
bool BerkeleyBatch::ReadAtCursor(CDataStream& ssKey, CDataStream& ssValue, bool& complete)
{
complete = false;
if (m_cursor == nullptr) return false;
// Read at cursor
SafeDbt datKey;
SafeDbt datValue;
int ret = pcursor->get(datKey, datValue, DB_NEXT);
int ret = m_cursor->get(datKey, datValue, DB_NEXT);
if (ret == DB_NOTFOUND) {
complete = true;
}
if (ret != 0)
return ret;
return false;
else if (datKey.get_data() == nullptr || datValue.get_data() == nullptr)
return 99999;
return false;
// Convert to streams
ssKey.SetType(SER_DISK);
@ -767,7 +771,14 @@ int BerkeleyBatch::ReadAtCursor(Dbc* pcursor, CDataStream& ssKey, CDataStream& s
ssValue.SetType(SER_DISK);
ssValue.clear();
ssValue.write((char*)datValue.get_data(), datValue.get_size());
return 0;
return true;
}
void BerkeleyBatch::CloseCursor()
{
if (!m_cursor) return;
m_cursor->close();
m_cursor = nullptr;
}
bool BerkeleyBatch::TxnBegin()

View file

@ -211,6 +211,7 @@ protected:
Db* pdb;
std::string strFile;
DbTxn* activeTxn;
Dbc* m_cursor;
bool fReadOnly;
bool fFlushOnClose;
BerkeleyEnvironment *env;
@ -323,8 +324,9 @@ public:
return (ret == 0);
}
Dbc* GetCursor();
int ReadAtCursor(Dbc* pcursor, CDataStream& ssKey, CDataStream& ssValue);
bool StartCursor();
bool ReadAtCursor(CDataStream& ssKey, CDataStream& ssValue, bool& complete);
void CloseCursor();
bool TxnBegin();
bool TxnCommit();
bool TxnAbort();

View file

@ -699,8 +699,7 @@ DBErrors WalletBatch::LoadWallet(CWallet* pwallet)
}
// Get cursor
Dbc* pcursor = m_batch.GetCursor();
if (!pcursor)
if (!m_batch.StartCursor())
{
pwallet->WalletLogPrintf("Error getting wallet database cursor\n");
return DBErrors::CORRUPT;
@ -711,11 +710,14 @@ DBErrors WalletBatch::LoadWallet(CWallet* pwallet)
// Read next record
CDataStream ssKey(SER_DISK, CLIENT_VERSION);
CDataStream ssValue(SER_DISK, CLIENT_VERSION);
int ret = m_batch.ReadAtCursor(pcursor, ssKey, ssValue);
if (ret == DB_NOTFOUND)
bool complete;
bool ret = m_batch.ReadAtCursor(ssKey, ssValue, complete);
if (complete) {
break;
else if (ret != 0)
}
else if (!ret)
{
m_batch.CloseCursor();
pwallet->WalletLogPrintf("Error reading next record from wallet database\n");
return DBErrors::CORRUPT;
}
@ -742,10 +744,10 @@ DBErrors WalletBatch::LoadWallet(CWallet* pwallet)
if (!strErr.empty())
pwallet->WalletLogPrintf("%s\n", strErr);
}
pcursor->close();
} catch (...) {
result = DBErrors::CORRUPT;
}
m_batch.CloseCursor();
// Set the active ScriptPubKeyMans
for (auto spk_man_pair : wss.m_active_external_spks) {
@ -849,8 +851,7 @@ DBErrors WalletBatch::FindWalletTx(std::vector<uint256>& vTxHash, std::list<CWal
}
// Get cursor
Dbc* pcursor = m_batch.GetCursor();
if (!pcursor)
if (!m_batch.StartCursor())
{
LogPrintf("Error getting wallet database cursor\n");
return DBErrors::CORRUPT;
@ -861,11 +862,12 @@ DBErrors WalletBatch::FindWalletTx(std::vector<uint256>& vTxHash, std::list<CWal
// Read next record
CDataStream ssKey(SER_DISK, CLIENT_VERSION);
CDataStream ssValue(SER_DISK, CLIENT_VERSION);
int ret = m_batch.ReadAtCursor(pcursor, ssKey, ssValue);
if (ret == DB_NOTFOUND)
bool complete;
bool ret = m_batch.ReadAtCursor(ssKey, ssValue, complete);
if (complete) {
break;
else if (ret != 0)
{
} else if (!ret) {
m_batch.CloseCursor();
LogPrintf("Error reading next record from wallet database\n");
return DBErrors::CORRUPT;
}
@ -880,10 +882,10 @@ DBErrors WalletBatch::FindWalletTx(std::vector<uint256>& vTxHash, std::list<CWal
ssValue >> vWtx.back();
}
}
pcursor->close();
} catch (...) {
result = DBErrors::CORRUPT;
}
m_batch.CloseCursor();
return result;
}