diff --git a/src/extract/extract.cpp b/src/extract/extract.cpp index c46e7044..60787e6c 100644 --- a/src/extract/extract.cpp +++ b/src/extract/extract.cpp @@ -29,28 +29,108 @@ along with this program. If not, see . #include #include -void Extract::open_file(const osmium::io::Header& header, osmium::io::overwrite output_overwrite, osmium::io::fsync sync, OptionClean const* clean) { +void Extract::writer_loop() { + try { + while (true) { + std::unique_lock lock{m_mutex}; + m_cv.wait(lock, [this]{ return m_flush_pending || m_shutdown; }); + + if (m_shutdown && !m_flush_pending) { + break; + } + + // Reset m_flush_pending under the lock so that swap_and_flush() + // can observe the transition to false only after the buffer has + // been moved out and is no longer shared. + m_clean->apply_to(m_flush_buffer); + auto buf = std::move(m_flush_buffer); + m_flush_buffer = osmium::memory::Buffer{buffer_size, + osmium::memory::Buffer::auto_grow::no}; + m_flush_pending = false; + lock.unlock(); + m_cv.notify_one(); + + // The osmium Writer call (compression + I/O) runs outside the lock + // so the main thread can keep filling m_fill_buffer concurrently. + (*m_writer)(std::move(buf)); + } + } catch (...) { + std::unique_lock lock{m_mutex}; + m_writer_exception = std::current_exception(); + m_flush_pending = false; + m_shutdown = true; + lock.unlock(); + m_cv.notify_all(); + } +} + +void Extract::check_writer_exception() { + if (m_writer_exception) { + std::rethrow_exception(m_writer_exception); + } +} + +void Extract::swap_and_flush() { + std::unique_lock lock{m_mutex}; + m_cv.wait(lock, [this]{ return !m_flush_pending || m_shutdown; }); + check_writer_exception(); + + std::swap(m_fill_buffer, m_flush_buffer); + m_fill_buffer = osmium::memory::Buffer{buffer_size, + osmium::memory::Buffer::auto_grow::no}; + m_flush_pending = true; + lock.unlock(); + m_cv.notify_one(); +} + +void Extract::open_file(const osmium::io::Header& header, + osmium::io::overwrite output_overwrite, + osmium::io::fsync sync, + OptionClean const* clean) { m_clean = clean; - m_writer = std::make_unique(m_output_file, header, output_overwrite, sync); + m_writer = std::make_unique(m_output_file, header, + output_overwrite, sync); + m_writer_thread = std::thread{&Extract::writer_loop, this}; } void Extract::close_file() { - if (m_writer) { - if (m_buffer.committed() > 0) { - m_clean->apply_to(m_buffer); - (*m_writer)(std::move(m_buffer)); + if (!m_writer) { + return; + } + + if (m_fill_buffer.committed() > 0) { + swap_and_flush(); + } + + { + std::unique_lock lock{m_mutex}; + m_cv.wait(lock, [this]{ return !m_flush_pending || m_shutdown; }); + check_writer_exception(); + m_shutdown = true; + } + m_cv.notify_one(); + m_writer_thread.join(); + + check_writer_exception(); + m_writer->close(); +} + +Extract::~Extract() { + if (m_writer_thread.joinable()) { + { + std::lock_guard lock{m_mutex}; + m_shutdown = true; } - m_writer->close(); + m_cv.notify_one(); + m_writer_thread.join(); } } void Extract::write(const osmium::memory::Item& item) { - if (m_buffer.capacity() - m_buffer.committed() < item.padded_size()) { - m_clean->apply_to(m_buffer); - (*m_writer)(std::move(m_buffer)); - m_buffer = osmium::memory::Buffer{buffer_size, osmium::memory::Buffer::auto_grow::no}; + if (m_fill_buffer.capacity() - m_fill_buffer.committed() < item.padded_size()) { + swap_and_flush(); } - m_buffer.push_back(item); + m_fill_buffer.push_back(item); } std::string Extract::envelope_as_text() const { @@ -58,4 +138,3 @@ std::string Extract::envelope_as_text() const { ss << m_envelope; return ss.str(); } - diff --git a/src/extract/extract.hpp b/src/extract/extract.hpp index 782bafc1..9d00616d 100644 --- a/src/extract/extract.hpp +++ b/src/extract/extract.hpp @@ -29,12 +29,17 @@ along with this program. If not, see . #include #include #include +#include #include #include #include +#include +#include #include +#include #include +#include #include #include @@ -46,20 +51,35 @@ class Extract { std::string m_description; std::vector m_header_options; osmium::Box m_envelope; - osmium::memory::Buffer m_buffer{buffer_size, osmium::memory::Buffer::auto_grow::no}; + + // Double-buffering: the main thread fills m_fill_buffer while the writer + // thread flushes m_flush_buffer to disk asynchronously. + osmium::memory::Buffer m_fill_buffer{buffer_size, osmium::memory::Buffer::auto_grow::no}; + osmium::memory::Buffer m_flush_buffer{buffer_size, osmium::memory::Buffer::auto_grow::no}; + std::unique_ptr m_writer; const OptionClean* m_clean = nullptr; + std::thread m_writer_thread; + std::mutex m_mutex; + std::condition_variable m_cv; + bool m_flush_pending = false; + bool m_shutdown = false; + std::exception_ptr m_writer_exception; + + void writer_loop(); + void swap_and_flush(); + void check_writer_exception(); + public: Extract(const osmium::io::File& output_file, const std::string& description, const osmium::Box& envelope) : m_output_file(output_file), m_description(description), - m_envelope(envelope), - m_writer(nullptr) { + m_envelope(envelope) { } - virtual ~Extract() = default; + virtual ~Extract(); const std::string& output() const noexcept { return m_output_file.filename();