diff --git a/src/node/blockstorage.cpp b/src/node/blockstorage.cpp
index 5e1de9f0af4..3d73459bf6c 100644
--- a/src/node/blockstorage.cpp
+++ b/src/node/blockstorage.cpp
@@ -942,11 +942,12 @@ bool BlockManager::WriteBlockUndo(const CBlockUndo& blockundo, BlockValidationSt
 
     {
         // Open history file to append
-        AutoFile fileout{OpenUndoFile(pos)};
-        if (fileout.IsNull()) {
+        AutoFile file{OpenUndoFile(pos)};
+        if (file.IsNull()) {
             LogError("OpenUndoFile failed for %s while writing block undo", pos.ToString());
             return FatalError(m_opts.notifications, state, _("Failed to write undo data."));
         }
+        BufferedWriter fileout{file};
 
         // Write index header
         fileout << GetParams().MessageStart() << blockundo_size;
@@ -959,11 +960,7 @@ bool BlockManager::WriteBlockUndo(const CBlockUndo& blockundo, BlockValidationSt
             fileout << blockundo << hasher.GetHash();
         }
 
-        // Make sure `AutoFile` goes out of scope before we call `FlushUndoFile`
-        if (fileout.fclose()) {
-            LogError("WriteBlockUndo: fclose failed");
-            return false;
-        }
+        fileout.flush(); // Make sure `AutoFile`/`BufferedWriter` go out of scope before we call `FlushUndoFile`
     }
 
     // rev files are written in block height order, whereas blk files are written as blocks come in (often out of order)
@@ -1090,12 +1087,13 @@ FlatFilePos BlockManager::WriteBlock(const CBlock& block, int nHeight)
         LogError("FindNextBlockPos failed for %s while writing block", pos.ToString());
         return FlatFilePos();
     }
-    AutoFile fileout{OpenBlockFile(pos, /*fReadOnly=*/false)};
-    if (fileout.IsNull()) {
+    AutoFile file{OpenBlockFile(pos, /*fReadOnly=*/false)};
+    if (file.IsNull()) {
         LogError("OpenBlockFile failed for %s while writing block", pos.ToString());
         m_opts.notifications.fatalError(_("Failed to write block."));
         return FlatFilePos();
     }
+    BufferedWriter fileout{file};
 
     // Write index header
     fileout << GetParams().MessageStart() << block_size;
diff --git a/src/streams.cpp b/src/streams.cpp
index d26f526edb9..19c2b474452 100644
--- a/src/streams.cpp
+++ b/src/streams.cpp
@@ -87,21 +87,29 @@ void AutoFile::write(std::span<const std::byte> src)
         }
         if (m_position.has_value()) *m_position += src.size();
     } else {
-        if (!m_position.has_value()) throw std::ios_base::failure("AutoFile::write: position unknown");
         std::array<std::byte, 4096> buf;
-        while (src.size() > 0) {
+        while (src.size()) {
             auto buf_now{std::span{buf}.first(std::min(src.size(), buf.size()))};
-            std::copy(src.begin(), src.begin() + buf_now.size(), buf_now.begin());
-            util::Xor(buf_now, m_xor, *m_position);
-            if (std::fwrite(buf_now.data(), 1, buf_now.size(), m_file) != buf_now.size()) {
-                throw std::ios_base::failure{"AutoFile::write: failed"};
-            }
+            std::copy_n(src.begin(), buf_now.size(), buf_now.begin());
+            write_buffer(buf_now);
             src = src.subspan(buf_now.size());
-            *m_position += buf_now.size();
         }
     }
 }
 
+void AutoFile::write_buffer(std::span<std::byte> src)
+{
+    if (!m_file) throw std::ios_base::failure("AutoFile::write_buffer: file handle is nullptr");
+    if (m_xor.size()) {
+        if (!m_position) throw std::ios_base::failure("AutoFile::write_buffer: obfuscation position unknown");
+        util::Xor(src, m_xor, *m_position); // obfuscate in-place
+    }
+    if (std::fwrite(src.data(), 1, src.size(), m_file) != src.size()) {
+        throw std::ios_base::failure("AutoFile::write_buffer: write failed");
+    }
+    if (m_position) *m_position += src.size();
+}
+
 bool AutoFile::Commit()
 {
     return ::FileCommit(m_file);
diff --git a/src/streams.h b/src/streams.h
index f19bef5df22..1ebcff3671f 100644
--- a/src/streams.h
+++ b/src/streams.h
@@ -445,6 +445,9 @@ public:
     /** Wrapper around TruncateFile(). */
     bool Truncate(unsigned size);
 
+    //! Write a mutable buffer more efficiently than write(), obfuscating the buffer in-place.
+    void write_buffer(std::span<std::byte> src);
+
     //
     // Stream subset
     //
@@ -658,4 +661,45 @@ public:
     }
 };
 
+/**
+ * Wrapper that buffers writes to an underlying stream.
+ * Requires underlying stream to support write_buffer() method
+ * for efficient buffer flushing and obfuscation.
+ */
+template <typename S>
+class BufferedWriter
+{
+    S& m_dst;
+    DataBuffer m_buf;
+    size_t m_buf_pos{0};
+
+public:
+    explicit BufferedWriter(S& stream LIFETIMEBOUND, size_t size = 1 << 16) : m_dst{stream}, m_buf(size) {}
+
+    ~BufferedWriter() { flush(); }
+
+    void flush()
+    {
+        if (m_buf_pos) m_dst.write_buffer(std::span{m_buf}.first(m_buf_pos));
+        m_buf_pos = 0;
+    }
+
+    void write(std::span<const std::byte> src)
+    {
+        while (const auto available{std::min(src.size(), m_buf.size() - m_buf_pos)}) {
+            std::copy_n(src.begin(), available, m_buf.begin() + m_buf_pos);
+            m_buf_pos += available;
+            if (m_buf_pos == m_buf.size()) flush();
+            src = src.subspan(available);
+        }
+    }
+
+    template <typename T>
+    BufferedWriter& operator<<(const T& obj)
+    {
+        Serialize(*this, obj);
+        return *this;
+    }
+};
+
 #endif // BITCOIN_STREAMS_H
diff --git a/src/test/streams_tests.cpp b/src/test/streams_tests.cpp
index 6f47b98f297..c7b5cd353e0 100644
--- a/src/test/streams_tests.cpp
+++ b/src/test/streams_tests.cpp
@@ -607,6 +607,94 @@ BOOST_AUTO_TEST_CASE(buffered_reader_matches_autofile_random_content)
     fs::remove(test_file.FileName(pos));
 }
 
+BOOST_AUTO_TEST_CASE(buffered_writer_matches_autofile_random_content)
+{
+    const size_t file_size{1 + m_rng.randrange(1 << 17)};
+    const size_t buf_size{1 + m_rng.randrange(file_size)};
+    const FlatFilePos pos{0, 0};
+
+    const FlatFileSeq test_buffered{m_args.GetDataDirBase(), "buffered_write_test", node::BLOCKFILE_CHUNK_SIZE};
+    const FlatFileSeq test_direct{m_args.GetDataDirBase(), "direct_write_test", node::BLOCKFILE_CHUNK_SIZE};
+    const std::vector<std::byte> obfuscation{m_rng.randbytes<std::byte>(8)};
+
+    {
+        DataBuffer test_data{m_rng.randbytes<std::byte>(file_size)};
+
+        AutoFile direct_file{test_direct.Open(pos, /*read_only=*/false), obfuscation};
+
+        AutoFile buffered_file{test_buffered.Open(pos, /*read_only=*/false), obfuscation};
+        BufferedWriter buffered{buffered_file, buf_size};
+
+        for (size_t total_written{0}; total_written < file_size;) {
+            const size_t write_size{Assert(std::min(1 + m_rng.randrange(m_rng.randbool() ? buf_size : 2 * buf_size), file_size - total_written))};
+
+            auto current_span = std::span{test_data}.subspan(total_written, write_size);
+            direct_file.write(current_span);
+            buffered.write(current_span);
+
+            total_written += write_size;
+        }
+        // Destructors of AutoFile and BufferedWriter will flush/close here
+    }
+
+    // Compare the resulting files
+    DataBuffer direct_result{file_size};
+    {
+        AutoFile verify_direct{test_direct.Open(pos, /*read_only=*/true), obfuscation};
+        verify_direct.read(direct_result);
+
+        DataBuffer excess_byte{1};
+        BOOST_CHECK_EXCEPTION(verify_direct.read(excess_byte), std::ios_base::failure, HasReason{"end of file"});
+    }
+
+    DataBuffer buffered_result{file_size};
+    {
+        AutoFile verify_buffered{test_buffered.Open(pos, /*read_only=*/true), obfuscation};
+        verify_buffered.read(buffered_result);
+
+        DataBuffer excess_byte{1};
+        BOOST_CHECK_EXCEPTION(verify_buffered.read(excess_byte), std::ios_base::failure, HasReason{"end of file"});
+    }
+
+    BOOST_CHECK_EQUAL_COLLECTIONS(
+        direct_result.begin(), direct_result.end(),
+        buffered_result.begin(), buffered_result.end()
+    );
+
+    fs::remove(test_direct.FileName(pos));
+    fs::remove(test_buffered.FileName(pos));
+}
+
+BOOST_AUTO_TEST_CASE(buffered_writer_reader)
+{
+    const uint32_t v1{m_rng.rand32()}, v2{m_rng.rand32()}, v3{m_rng.rand32()};
+    const fs::path test_file{m_args.GetDataDirBase() / "test_buffered_write_read.bin"};
+
+    // Write out the values through a precisely sized BufferedWriter
+    {
+        AutoFile file{fsbridge::fopen(test_file, "w+b")};
+        BufferedWriter f(file, sizeof(v1) + sizeof(v2) + sizeof(v3));
+        f << v1 << v2;
+        f.write(std::as_bytes(std::span{&v3, 1}));
+    }
+    // Read back and verify using BufferedReader
+    {
+        uint32_t _v1{0}, _v2{0}, _v3{0};
+        AutoFile file{fsbridge::fopen(test_file, "rb")};
+        BufferedReader f(std::move(file), sizeof(v1) + sizeof(v2) + sizeof(v3));
+        f >> _v1 >> _v2;
+        f.read(std::as_writable_bytes(std::span{&_v3, 1}));
+        BOOST_CHECK_EQUAL(_v1, v1);
+        BOOST_CHECK_EQUAL(_v2, v2);
+        BOOST_CHECK_EQUAL(_v3, v3);
+
+        DataBuffer excess_byte{1};
+        BOOST_CHECK_EXCEPTION(f.read(excess_byte), std::ios_base::failure, HasReason{"end of file"});
+    }
+
+    fs::remove(test_file);
+}
+
 BOOST_AUTO_TEST_CASE(streams_hashed)
 {
     DataStream stream{};
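
The heart of the patch is the `BufferedWriter` added to src/streams.h: serialized writes accumulate in a fixed-size in-memory buffer and only reach the underlying stream, through its `write_buffer()` method, when the buffer fills, when `flush()` is called, or when the wrapper is destroyed. Below is a minimal standalone sketch of that buffering technique, not the Bitcoin Core class itself; the `VectorSink` and `SimpleBufferedWriter` names are invented here so the example compiles without `AutoFile`, `DataBuffer`, or the `Serialize` machinery.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <span>
#include <vector>

// Hypothetical sink: appends bytes to a vector (stands in for AutoFile::write_buffer()).
struct VectorSink {
    std::vector<std::byte> out;
    void write_buffer(std::span<std::byte> src) { out.insert(out.end(), src.begin(), src.end()); }
};

// Simplified model of the buffered-writer technique from the patch.
template <typename S>
class SimpleBufferedWriter {
    S& m_dst;
    std::vector<std::byte> m_buf;
    size_t m_buf_pos{0};

public:
    explicit SimpleBufferedWriter(S& dst, size_t size) : m_dst{dst}, m_buf(size) {}
    ~SimpleBufferedWriter() { flush(); } // any unflushed tail is pushed out on destruction

    void flush()
    {
        if (m_buf_pos) m_dst.write_buffer(std::span{m_buf}.first(m_buf_pos));
        m_buf_pos = 0;
    }

    void write(std::span<const std::byte> src)
    {
        // Copy as much as fits, flush when the buffer is full, repeat until src is consumed.
        while (const auto available{std::min(src.size(), m_buf.size() - m_buf_pos)}) {
            std::copy_n(src.begin(), available, m_buf.begin() + m_buf_pos);
            m_buf_pos += available;
            if (m_buf_pos == m_buf.size()) flush();
            src = src.subspan(available);
        }
    }
};

int main()
{
    VectorSink sink;
    {
        SimpleBufferedWriter writer{sink, /*size=*/8};
        const std::vector<std::byte> data(20, std::byte{0xab});
        writer.write(data);  // two full 8-byte flushes happen inside write()
    }                        // destructor flushes the remaining 4 bytes
    std::printf("sink holds %zu bytes\n", sink.out.size()); // prints 20
}
```

The same flush points drive the blockstorage.cpp changes: the explicit `fileout.flush()` (or the wrapper's destructor) must push the buffered tail into the `AutoFile` before `FlushUndoFile` runs, which is what the comment in `WriteBlockUndo` calls out.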
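`AutoFile::write_buffer()` exists because the buffered path can hand over a whole mutable chunk: the XOR obfuscation is applied in place on the caller's buffer instead of staging the data through the fixed 4096-byte array that `AutoFile::write()` needs for const input. A rough standalone model of that position-keyed XOR follows, assuming `util::Xor(buf, key, offset)` XORs byte `i` of `buf` with `key[(offset + i) % key.size()]`; the `XorInPlace` helper below is illustrative, not the real `util::Xor`.

```cpp
#include <cstddef>
#include <cstdio>
#include <span>
#include <vector>

// Rough model of util::Xor: obfuscate `buf` in place with `key`, starting at stream offset `offset`.
// Applying it twice with the same key and offset restores the original bytes.
void XorInPlace(std::span<std::byte> buf, std::span<const std::byte> key, size_t offset)
{
    if (key.empty()) return;
    for (size_t i{0}; i < buf.size(); ++i) {
        buf[i] ^= key[(offset + i) % key.size()];
    }
}

int main()
{
    const std::vector<std::byte> key{std::byte{0xde}, std::byte{0xad}, std::byte{0xbe}, std::byte{0xef}};
    std::vector<std::byte> chunk(10, std::byte{0x11});

    XorInPlace(chunk, key, /*offset=*/6); // what write_buffer() does before fwrite, keyed by *m_position
    XorInPlace(chunk, key, /*offset=*/6); // reading back applies the same XOR and recovers the data

    std::printf("round-trip ok: %d\n", chunk == std::vector<std::byte>(10, std::byte{0x11}));
}
```

Because XOR is its own inverse, reading the file back through an `AutoFile` constructed with the same obfuscation key recovers the plaintext, which is what the round-trip checks in the new streams_tests.cpp cases rely on.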