From 32e2162667169797dc977820439a8e876d415276 Mon Sep 17 00:00:00 2001 From: Mungo Gill Date: Thu, 11 Dec 2025 15:58:57 +0000 Subject: [PATCH 1/6] Create body_write_stream --- include/boost/beast2/body_write_stream.hpp | 253 ++++++++ .../boost/beast2/impl/body_write_stream.hpp | 243 +++++++ test/unit/body_read_stream.cpp | 4 + test/unit/body_write_stream.cpp | 603 ++++++++++++++++++ 4 files changed, 1103 insertions(+) create mode 100644 include/boost/beast2/body_write_stream.hpp create mode 100644 include/boost/beast2/impl/body_write_stream.hpp create mode 100644 test/unit/body_write_stream.cpp diff --git a/include/boost/beast2/body_write_stream.hpp b/include/boost/beast2/body_write_stream.hpp new file mode 100644 index 00000000..5249f504 --- /dev/null +++ b/include/boost/beast2/body_write_stream.hpp @@ -0,0 +1,253 @@ +// +// Copyright (c) 2025 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/beast2 +// + +#ifndef BOOST_BEAST2_BODY_WRITE_STREAM_HPP +#define BOOST_BEAST2_BODY_WRITE_STREAM_HPP + +#include +#include +#include + +#include + +namespace boost { +namespace beast2 { + +/** A body writer for HTTP/1 messages. + + This type is modelled on asio's AsyncWriteStream, and is constructed with a + reference to an underlying AsyncWriteStream. + + Any call to `async_write_some` initially triggers writes to the underlying + stream until all of the HTTP headers and at least one byte of the body have + been written and processed. Thereafter, each subsequent call to + `async_write_some` processes at least one byte of the body, triggering, where + required, calls to the underlying stream's `async_write_some` method. The + body data is read from the referenced ConstBufferSequence. + + All processing depends on a beast2::serializer object owned by the caller and + referenced in the construction of this object. + + @par Deviations from AsyncWriteStream + + This type deviates from the strict AsyncWriteStream requirements in the + following ways: + + @li Deferred error reporting: If an error or cancellation occurs + after data has been successfully committed to the serializer, the + operation completes with success and reports the number of bytes + consumed. The error is saved and reported on the next call to + `async_write_some`. This differs from the AsyncWriteStream requirement + that on error, `bytes_transferred` must be 0. This behaviour ensures + that the caller knows exactly how many bytes were consumed by the + serializer, preventing data loss or duplication. + + @see + @ref http_proto::serializer. +*/ +template +class body_write_stream +{ + +public: + /** The type of the executor associated with the stream. + + This will be the type of executor used to invoke completion handlers + which do not have an explicit associated executor. + */ + using executor_type = + decltype(std::declval().get_executor()); + + /** Return the executor associated with the object. + + This function may be used to obtain the executor object that the stream + uses to dispatch completion handlers without an associated executor. + + @return A copy of the executor that stream will use to dispatch + handlers. + */ + executor_type + get_executor() + { + return stream_.get_executor(); + } + + /** Constructor + + This constructor creates the stream which retains a reference to the + underlying stream. The underlying stream then needs to be open before + data can be written + + @param s The underlying stream to which the HTTP message is written. + This object's executor is initialized to that of the + underlying stream. + + @param sr A http_proto::serializer object which will perform the serialization of + the HTTP message and extraction of the body. This must be + initialized by the caller and ownership of the serializer is + retained by the caller, which must guarantee that it remains + valid until the handler is called. + + @param srs A http_proto::serializer::stream object which must have been + obtained by a call to `start_stream` on `sr`. Ownership of the + serializer::stream is + retained by the caller, which must guarantee that it remains + valid until the handler is called. + */ + explicit body_write_stream( + AsyncWriteStream& s, + http_proto::serializer& sr, + http_proto::serializer::stream& srs); + + /** Write some data asynchronously. + + This function is used to asynchronously write data to the stream. + + This call always returns immediately. The asynchronous operation will + continue until one of the following conditions is true: + + @li One or more bytes are written from `cb` to the body stored in the + serializer and one or more bytes are written from the serializer to the + underlying stream. + + @li An error occurs. + + The algorithm, known as a composed asynchronous operation, is + implemented in terms of calls to the underlying stream's + `async_write_some` function. The program must ensure that no other calls + implemented using the underlying stream's `async_write_some` are + performed until this operation completes. + + @param cb The buffer sequence from which the body data will be read. If + the size of the buffer sequence is zero bytes, the operation always + completes immediately with no error. Although the buffers object may be + copied as necessary, ownership of the underlying memory blocks is + retained by the caller, which must guarantee that they remain valid until + the handler is called. Where the internal buffer of the contained + serializer is not of sufficient size to hold the data to be copied from + cb, the remainder may be written by subsequent calls to this function. + + @param handler The completion handler to invoke when the operation + completes. The implementation takes ownership of the handler by + performing a decay-copy. The equivalent function signature of the + handler must be: + @code + void handler( + error_code const& error, // result of operation + std::size_t bytes_transferred // the number of bytes consumed from + // cb by the serializer + ); + @endcode + Regardless of whether the asynchronous operation + completes immediately or not, the completion handler will not be invoked + from within this function. On immediate completion, invocation of the + handler will be performed in a manner equivalent to using + `asio::async_immediate`. + + @note The `async_write_some` operation may not transmit all of the + requested number of bytes. Consider using the function + `asio::async_write` if you need to ensure that the requested amount of + data is written before the asynchronous operation completes. + + @note This function does not guarantee that all of the consumed data is + written to the underlying stream. For this reason one or more calls to + `async_write_some` must be followed by a call to `async_close` to put the + serializer into the `done` state and to write all data remaining in the + serializer to the underlying stream. + + @par Per-Operation Cancellation + + This asynchronous operation supports cancellation for the following + net::cancellation_type values: + + @li @c net::cancellation_type::terminal + @li @c net::cancellation_type::partial + @li @c net::cancellation_type::total + + if they are also supported by the underlying stream's @c async_write_some + operation. + */ + template< + class ConstBufferSequence, + BOOST_ASIO_COMPLETION_TOKEN_FOR(void(system::error_code, std::size_t)) + CompletionToken> + BOOST_ASIO_INITFN_AUTO_RESULT_TYPE( + CompletionToken, + void(system::error_code, std::size_t)) + async_write_some(ConstBufferSequence const& cb, CompletionToken&& handler); + + /** Close serializer::stream and flush remaining data to the underlying stream asynchronously. + + This function is used to asynchronously call `close` on the + `serializer::stream` object referenced in the construction of this + `body_write_stream` and write all remaining data in the serializer to the + underlying stream. + + This call always returns immediately. The asynchronous operation will + continue until one of the following conditions is true: + + @li All remaining output bytes of the serializer are written to the + underlying stream and the serializer's `is_done()` method returns true. + + @li An error occurs. + + The algorithm, known as a composed asynchronous operation, is + implemented in terms of calls to the underlying stream's + `async_write_some` function. The program must ensure that no other calls + implemented using the underlying stream's `async_write_some` are + performed until this operation completes. + + @param handler The completion handler to invoke when the operation + completes. The implementation takes ownership of the handler by + performing a decay-copy. The equivalent function signature of the + handler must be: + @code + void handler( + error_code const& error // result of operation + ); + @endcode + Regardless of whether the asynchronous operation + completes immediately or not, the completion handler will not be invoked + from within this function. On immediate completion, invocation of the + handler will be performed in a manner equivalent to using + `asio::async_immediate`. + + @par Per-Operation Cancellation + + This asynchronous operation supports cancellation for the following + net::cancellation_type values: + + @li @c net::cancellation_type::terminal + @li @c net::cancellation_type::partial + @li @c net::cancellation_type::total + + if they are also supported by the underlying stream's @c async_write_some + operation. + */ + template< + BOOST_ASIO_COMPLETION_TOKEN_FOR(void(system::error_code)) + CompletionToken> + BOOST_ASIO_INITFN_AUTO_RESULT_TYPE( + CompletionToken, + void(system::error_code)) + async_close(CompletionToken&& handler); + +private: + AsyncWriteStream& stream_; + http_proto::serializer& sr_; + http_proto::serializer::stream& srs_; + system::error_code ec_; +}; + +} // beast2 +} // boost + +#include + +#endif // BOOST_BEAST2_BODY_WRITE_STREAM_HPP diff --git a/include/boost/beast2/impl/body_write_stream.hpp b/include/boost/beast2/impl/body_write_stream.hpp new file mode 100644 index 00000000..0d7a8f87 --- /dev/null +++ b/include/boost/beast2/impl/body_write_stream.hpp @@ -0,0 +1,243 @@ +// +// Copyright (c) 2025 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/beast2 +// + +#ifndef BOOST_BEAST2_IMPL_BODY_WRITE_STREAM_HPP +#define BOOST_BEAST2_IMPL_BODY_WRITE_STREAM_HPP + +#include + +#include +#include +#include + +namespace boost { +namespace beast2 { + +namespace detail { + +template +class body_write_stream_close_op : public asio::coroutine +{ + AsyncWriteStream& stream_; + http_proto::serializer& sr_; + http_proto::serializer::stream& srs_; + +public: + body_write_stream_close_op( + AsyncWriteStream& s, + http_proto::serializer& sr, + http_proto::serializer::stream& srs) noexcept + : stream_(s) + , sr_(sr) + , srs_(srs) + { + } + + template + void + operator()( + Self& self, + system::error_code ec = {}, + std::size_t = 0) + { + BOOST_ASIO_CORO_REENTER(*this) + { + self.reset_cancellation_state(asio::enable_total_cancellation()); + + srs_.close(); + + BOOST_ASIO_CORO_YIELD + { + BOOST_ASIO_HANDLER_LOCATION( + (__FILE__, __LINE__, "async_write_some")); + beast2::async_write(stream_, sr_, std::move(self)); + } + + self.complete(ec); + } + } +}; + +template +class body_write_stream_op : public asio::coroutine +{ + AsyncWriteStream& stream_; + ConstBufferSequence cb_; + http_proto::serializer& sr_; + http_proto::serializer::stream& srs_; + system::error_code& saved_ec_; + std::size_t bytes_; + +public: + body_write_stream_op( + AsyncWriteStream& s, + ConstBufferSequence const& cb, + http_proto::serializer& sr, + http_proto::serializer::stream& srs, + system::error_code& saved_ec) noexcept + : stream_(s) + , cb_(cb) + , sr_(sr) + , srs_(srs) + , saved_ec_(saved_ec) + , bytes_(0) + { + } + + template + void + operator()( + Self& self, + system::error_code ec = {}, + std::size_t = 0) + { + BOOST_ASIO_CORO_REENTER(*this) + { + self.reset_cancellation_state(asio::enable_total_cancellation()); + + bytes_ = 0; + + // Check for a saved error from a previous call + if(saved_ec_.failed()) + { + ec = saved_ec_; + saved_ec_ = {}; + } + else if(buffers::size(cb_) == 0) + // A zero-sized buffer is a special case, we are required to + // complete immediately with no error. + ; + else if(sr_.is_done() || + !srs_.is_open()) + // The serializer and stream are in the wrong state. + ec = asio::error::not_connected; + + if(ec.failed() || + buffers::size(cb_) == 0) + { + BOOST_ASIO_CORO_YIELD + { + BOOST_ASIO_HANDLER_LOCATION( + (__FILE__, __LINE__, "immediate")); + auto io_ex = self.get_io_executor(); + asio::async_immediate( + io_ex, + asio::append(std::move(self), ec)); + } + goto upcall; + } + + // The serializer's internal buffer may be full, so we may have no + // option but to try to write to the stream to clear space. + // This may require multiple attempts as buffer space cannot + // be cleared until the headers have been written. + while(!(bytes_ = asio::buffer_copy(srs_.prepare(), cb_))) + { + BOOST_ASIO_CORO_YIELD + { + BOOST_ASIO_HANDLER_LOCATION( + (__FILE__, __LINE__, "async_write_some")); + async_write_some(stream_, sr_, std::move(self)); + } + + if(sr_.is_done() || + !srs_.is_open()) + ec = asio::error::not_connected; + else if(!!self.cancelled()) + ec = asio::error::operation_aborted; + + if(ec.failed()) + goto upcall; + } + srs_.commit(bytes_); + + BOOST_ASIO_CORO_YIELD + { + BOOST_ASIO_HANDLER_LOCATION( + (__FILE__, __LINE__, "async_write_some")); + beast2::async_write_some(stream_, sr_, std::move(self)); + } + + // Save error/cancellation for next call, but report success with bytes + if(ec.failed() || + !!self.cancelled()) + { + saved_ec_ = ec.failed() ? ec : asio::error::operation_aborted; + ec = {}; + } + + upcall: + self.complete(ec, bytes_); + } + } +}; + +} // detail + +//------------------------------------------------ + +// TODO: copy in Beast's stream traits to check if AsyncWriteStream +// is an AsyncWriteStream, and also static_assert that body_write_stream is too. + +template +body_write_stream:: +body_write_stream( + AsyncWriteStream& s, + http_proto::serializer& sr, + http_proto::serializer::stream& srs) + : stream_(s) + , sr_(sr) + , srs_(srs) +{ +} + +template +template< + class ConstBufferSequence, + BOOST_ASIO_COMPLETION_TOKEN_FOR(void(system::error_code, std::size_t)) + CompletionToken> +BOOST_ASIO_INITFN_AUTO_RESULT_TYPE( + CompletionToken, + void(system::error_code, std::size_t)) +body_write_stream:: +async_write_some( + ConstBufferSequence const& cb, + CompletionToken&& token) +{ + return asio:: + async_compose( + detail::body_write_stream_op{ + stream_, cb, sr_, srs_, ec_ }, + token, + stream_); +} + +template +template< + BOOST_ASIO_COMPLETION_TOKEN_FOR(void(system::error_code)) + CompletionToken> +BOOST_ASIO_INITFN_AUTO_RESULT_TYPE( + CompletionToken, + void(system::error_code)) +body_write_stream:: +async_close( + CompletionToken&& token) +{ + return asio:: + async_compose( + detail::body_write_stream_close_op{ + stream_, sr_, srs_ }, + token, + stream_); +} + +} // beast2 +} // boost + +#endif // BOOST_BEAST2_IMPL_BODY_WRITE_STREAM_HPP diff --git a/test/unit/body_read_stream.cpp b/test/unit/body_read_stream.cpp index 773b84b7..366e428e 100644 --- a/test/unit/body_read_stream.cpp +++ b/test/unit/body_read_stream.cpp @@ -23,6 +23,8 @@ namespace boost { namespace beast2 { +namespace { + template std::string test_to_string(Buffers const& bs) @@ -359,6 +361,8 @@ struct single_tester : public ctx_base } }; +} // anonymous namespace + struct body_read_stream_test { void diff --git a/test/unit/body_write_stream.cpp b/test/unit/body_write_stream.cpp new file mode 100644 index 00000000..9288f35a --- /dev/null +++ b/test/unit/body_write_stream.cpp @@ -0,0 +1,603 @@ +// +// Copyright (c) 2025 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/beast2 +// + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "test_helpers.hpp" +#include + +#include +#include + +namespace boost { +namespace beast2 { + +namespace { + +template +std::string +test_to_string(Buffers const& bs) +{ + std::string s(buffers::size(bs), 0); + s.resize(buffers::copy(buffers::make_buffer(&s[0], s.size()), bs)); + return s; +} + +class test_handler +{ + boost::optional ec_; + boost::optional n_; + bool pass_ = false; + boost::source_location loc_{ BOOST_CURRENT_LOCATION }; + +public: + test_handler(boost::source_location loc = BOOST_CURRENT_LOCATION) + : loc_(loc) + { + } + + explicit test_handler( + system::error_code ec, + std::size_t n = 0, + boost::source_location loc = BOOST_CURRENT_LOCATION) + : ec_(ec) + , n_(n) + , loc_(loc) + { + } + + test_handler(test_handler&& other) noexcept + : ec_(other.ec_) + , n_(other.n_) + , pass_(boost::exchange(other.pass_, true)) + , loc_(other.loc_) + { + } + + ~test_handler() + { + test_suite::any_runner::instance().test( + pass_, "handler never invoked", "", loc_.file_name(), loc_.line()); + } + + template + void + operator()(system::error_code ec, std::size_t n = 0, Args&&...) + { + test_suite::any_runner::instance().test( + !pass_, + "handler invoked multiple times", + "", + loc_.file_name(), + loc_.line()); + + test_suite::any_runner::instance().test( + !ec_.has_value() || ec == *ec_, + ec.message().c_str(), + "", + loc_.file_name(), + loc_.line()); + + char buf[64]; + snprintf(buf, 64, "%u", (unsigned int)n); + + test_suite::any_runner::instance().test( + !n_.has_value() || n == *n_, + buf, + "", + loc_.file_name(), + loc_.line()); + + pass_ = true; + } +}; + +// Parser service install done in a base class to avoid order-of-initialisation +// issues (this needs to happen before the parser pr_ is constructed) +struct ctx_base +{ + capy::polystore capy_ctx_; + + ctx_base() + { + http_proto::install_parser_service(capy_ctx_, {}); + http_proto::install_serializer_service(capy_ctx_, {}); + } +}; + +struct single_tester : public ctx_base +{ + std::string body_ = "Hello World!"; + + std::string header_ = + "HTTP/1.1 200 OK\r\n" + "Content-Length: 12\r\n" + "\r\n"; + + std::string msg_ = header_ + body_; + + std::size_t header_length_ = header_.size(); + std::size_t body_length_ = body_.size(); + std::size_t msg_length_ = msg_.size(); + + boost::asio::io_context ioc_; + + test::stream ts_; + http_proto::response_parser pr_; + + // Create a destination buffer + std::string s_; + boost::buffers::string_buffer buf_; + + // The object under test + body_read_stream brs_; + + test::stream wts_, rts_; + + http_proto::serializer sr_; + + http_proto::response res_; + + http_proto::serializer::stream srs_; + + // http_proto::response res(headers); + // sr.start(res, buffers::const_buffer(body.data(), body.size())); + + body_write_stream bws_; + + single_tester() + : ts_(ioc_, msg_) + , pr_(capy_ctx_) + , sr_(capy_ctx_) + , buf_(&s_) + , brs_(ts_, pr_) + , wts_(ioc_) + , rts_(ioc_) + , res_(header_) + , bws_(wts_, sr_, srs_) + { + wts_.connect(rts_); + pr_.reset(); + pr_.start(); + sr_.reset(); + srs_ = sr_.start_stream(res_); + } + + void + async_read_some(std::size_t bs, system::error_code ec, std::size_t n) + { + brs_.async_read_some(buf_.prepare(bs), test_handler(ec, n)); + } + + void + async_write_some(std::size_t bs, system::error_code ec, std::size_t n) + { + bws_.async_write_some(buf_.prepare(bs), test_handler(ec, n)); + } + + void + async_close(system::error_code ec) + { + bws_.async_close(test_handler(ec, 0)); + } + + buffers::const_buffer + make_test_buffer(std::size_t size) + { + std::string val = body_.substr(0, size); + val.resize(size, '.'); + boost::buffers::string_buffer sb(&val); + return sb.data(); + } + + std::size_t + chunking_expected_n( + std::size_t bs, + std::size_t cs, + bool first, + std::size_t read_so_far) + { + std::size_t expected = 0; + if(read_so_far < body_length_) + { + expected = cs; + // In the first iteration we remove any of the data that was + // associcated with the headers. + if(first) + { + expected -= (header_length_ % cs); + // The `beast2::async_read_some` will always read move from + // the wire immediately after the headers, even if we have a + // partial body in memory already. This should be removable + // once `async_read_some` changes. + if(expected < cs) + { + expected += cs; + } + } + expected = std::min(expected, body_length_ - read_so_far); + expected = std::min(bs, expected); + } + return expected; + } + + struct chunking_handler + { + system::error_code ec_; + std::size_t* written_; + + chunking_handler( + system::error_code ec, + std::size_t* written) + : ec_(ec) + , written_(written) + { + } + + void + operator()(system::error_code ec, std::size_t n) + { + BOOST_TEST_EQ(ec, ec_); + *written_ += n; + } + }; + + // Ensure the edge case of being passed a zero-sized buffer works. + void + test_zero_sized_buffer() + { + // Ensure a read into a zero sized buffer returns with no error. + std::string val; + boost::buffers::string_buffer sb(&val); + auto cb = sb.data(); + bws_.async_write_some(cb, test_handler(system::error_code{}, 0)); + test::run(ioc_); + } + + // Test for a given buffer size (bs) and stream read size (cs) + void + test_with_chunking(std::size_t bs, std::size_t cs, int iters = 15) + { + wts_.write_size(cs); // Limit und stream write size to cs + + std::string finals; + finals.reserve(bs * iters); + + std::size_t total = 0; + std::size_t prev = 0; + std::size_t writes = 0; + for(int i = 0; i < iters; i++) + { + // Construct a buffer of size bs + std::string val = body_.substr(0, bs); + val.resize(bs, '.'); + boost::buffers::string_buffer sb(&val); + auto cb = sb.data(); + + finals += val; + + // Calculate how many bytes we expect to read on each iteration + // std::size_t expected = chunking_expected_n(bs, cs, (i == 0), + // total); + + while(cb.size() > 0) + { + std::size_t emin, emax; + if(srs_.capacity()) + { + emin = std::max( + (std::size_t)1, std::min({ bs, srs_.capacity(), cb.size() })); + emax = emin; + } + else + { + emin = 1; + emax = std::max((std::size_t)1, bs); + } + + std::size_t written = 0; + bws_.async_write_some( + cb, + chunking_handler( + system::error_code{}, + &written)); + + auto count = test::run(ioc_); + BOOST_TEST_GE(count, 1); + BOOST_TEST_LE(count, (size_t) 1 + header_length_ / cs); + + //std::cout << "count " << count << std::endl; + + total += written; + cb += written; + writes++; + + if (written < emin) + { + std::cout << "err" << std::endl; + } + + BOOST_TEST_GE(total, writes); + BOOST_TEST_GE(written, emin); + BOOST_TEST_LE(written, emax); + + BOOST_TEST_GE(rts_.nwrite_bytes() - prev, 1); + + prev = rts_.nwrite_bytes(); + + BOOST_TEST_LE(rts_.nwrite_bytes(), bs * (i+1) + header_length_); + + BOOST_TEST(!sr_.is_done()); + } + } + + BOOST_TEST_EQ(total, bs * iters); + + BOOST_TEST_LE( + rts_.nwrite_bytes(), cs * writes + header_length_); + + bws_.async_close(test::success_handler()); + + auto count = test::run(ioc_); + BOOST_TEST_GE(count, 1); + + BOOST_TEST_GT(rts_.nwrite_bytes(), iters + writes); + BOOST_TEST_LE(rts_.nwrite_bytes(), cs * (writes + count) + header_length_); + + BOOST_TEST_EQ(rts_.nwrite_bytes(), bs * iters + header_length_); + + BOOST_TEST(sr_.is_done()); + + BOOST_TEST(rts_.str() == header_ + finals); + } + + void + test_with_cancellation(std::size_t len) + { + std::string val = body_.substr(0, len); + boost::buffers::string_buffer sb(&val); + auto cb = sb.data(); + + // Add a signal to test cancellation + asio::cancellation_signal c_signal; + + // First call: cancellation occurs after data is written to serializer. + // The callback should receive success with the bytes written, + // and the cancellation error is saved for the next call. + // The actual bytes written is min(len, body_length_) since body_.substr + // is bounded by body_ size. + std::size_t expected_bytes = std::min(len, body_length_); + bws_.async_write_some( + cb, + asio::bind_cancellation_slot( + c_signal.slot(), test_handler(system::error_code{}, expected_bytes))); + + // send a cancellation + c_signal.emit(asio::cancellation_type::total); + + // Run up until the point of cancellation. + test::run(ioc_); + + BOOST_TEST(!sr_.is_done()); + + // Second call: should receive the saved cancellation error + // with zero bytes. + std::string remainder = body_.substr(len); + boost::buffers::string_buffer sb2(&remainder); + auto cb2 = sb2.data(); + + bws_.async_write_some( + cb2, + test_handler(asio::error::operation_aborted, 0)); + + // Run to deliver the saved error. + test::run(ioc_); + + BOOST_TEST(!sr_.is_done()); + + // Third call: write the remainder successfully. + std::size_t remainder_len = body_length_ - len; + bws_.async_write_some( + cb2, + test_handler(system::error_code{}, remainder_len)); + + test::run(ioc_); + + BOOST_TEST(!sr_.is_done()); + + // Fourth call: close the stream and verify the message. + bws_.async_close(test_handler(system::error_code{})); + + test::run(ioc_); + + BOOST_TEST(sr_.is_done()); + BOOST_TEST(rts_.str() == msg_); + } + + void + test_asio_async_write(std::size_t cs, bool use_asio_buffer) + { + // limit chunk size on the underlying stream + wts_.write_size(cs); + + if(use_asio_buffer) + { + asio::async_write( + bws_, + asio::buffer(body_.data(), body_.size()), + test_handler(system::error_code{}, body_length_)); + } + else + { + asio::async_write( + bws_, + buffers::const_buffer(body_.data(), body_.size()), + test_handler(system::error_code{}, body_length_)); + } + + test::run(ioc_); + + BOOST_TEST(!sr_.is_done()); + + // Close the stream to flush remaining data + bws_.async_close(test_handler(system::error_code{})); + + test::run(ioc_); + + BOOST_TEST(sr_.is_done()); + BOOST_TEST(rts_.str() == msg_); + } + + void + test_stream_errors() + { + // Create a write test stream that fails on the first write. + test::fail_count fc(0, asio::error::network_down); + test::stream wts(ioc_, fc); + test::stream rts(ioc_); + wts.connect(rts); + + // Create a new body_write_stream with the failing stream + body_write_stream bws(wts, sr_, srs_); + + // First call: data is committed to the serializer before the + // stream write fails. Due to deferred error handling, this + // returns success with the committed bytes, and saves the error. + std::string val = body_; + boost::buffers::string_buffer sb(&val); + auto cb = sb.data(); + + bws.async_write_some( + cb, + test_handler(system::error_code{}, body_length_)); + + // The operation completes with 1 handler invocation. + BOOST_TEST_EQ(test::run(ioc_), 1); + + // Second call: receives the deferred error with 0 bytes. + bws.async_write_some( + cb, + test_handler(asio::error::network_down, 0)); + + // The deferred error is returned via async_immediate. + BOOST_TEST_EQ(test::run(ioc_), 1); + } + + void + test_close_errors() + { + // Create a write test stream that fails on the second write. + // The first write will succeed (writing body data), but the + // close operation will fail when flushing remaining data. + test::fail_count fc(1, asio::error::network_down); + test::stream wts(ioc_, fc); + test::stream rts(ioc_); + wts.connect(rts); + + // Create a new body_write_stream with the failing stream + body_write_stream bws(wts, sr_, srs_); + + // Write body data - this should succeed. + std::string val = body_; + boost::buffers::string_buffer sb(&val); + auto cb = sb.data(); + + bws.async_write_some( + cb, + test_handler(system::error_code{}, body_length_)); + + BOOST_TEST_EQ(test::run(ioc_), 1); + + // Close the stream - this should fail when trying to flush + // the remaining serializer data to the underlying stream. + bws.async_close(test_handler(asio::error::network_down)); + + BOOST_TEST_GE(test::run(ioc_), 1); + } +}; + +} // anonymous namespace. + +struct body_write_stream_test +{ + void + run() + { + // Read into a zero sized buffer should return immediately without error + if (false) { + single_tester().test_zero_sized_buffer(); + } + + // async_read_some reads the body for various chunk + // sizes. + if (false) { + int sizes[] = { 1, 2, 5, 13, 233, 1597, 10'000, 100'000 }; + // Iterate through buffer sizes + for(std::size_t bs: sizes) + { + std::cout << "bs: " << bs << std::endl; + // Iterate through chunk sizes + for(std::size_t cs : sizes) + { + if(cs > bs / 100) + { + std::cout << "bs: " << bs << " cs: " << cs << std::endl; + single_tester().test_with_chunking(bs, cs); + } + } + } + } + + // Test async_write_some cancellation + { + // Iterate through different amounts of data written before cancellation. + // Only go up to body_length_ since that's the maximum useful data. + std::size_t body_len = single_tester().body_length_; + for(std::size_t len = 1; len <= body_len; len++) + { + single_tester().test_with_cancellation(len); + } + } + + // Test asio::async_write works with body_write_stream + { + // pick a representative chunk size + std::size_t cs = 5; + + // Perform the test using the Boost Buffers buffer directly + single_tester().test_asio_async_write(cs, false); + // And again using an asio buffer wrapper + single_tester().test_asio_async_write(cs, true); + } + + // async_write_some reports stream errors + { + single_tester().test_stream_errors(); + } + + // async_close reports stream errors + { + single_tester().test_close_errors(); + } + } +}; + +TEST_SUITE(body_write_stream_test, "boost.beast2.body_write_stream"); + +} // beast2 +} // boost From 371327a063aafff99422432d04472851226a6dc3 Mon Sep 17 00:00:00 2001 From: Mungo Gill Date: Mon, 5 Jan 2026 16:42:37 +0000 Subject: [PATCH 2/6] compiler warning fix --- test/unit/body_write_stream.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/unit/body_write_stream.cpp b/test/unit/body_write_stream.cpp index 9288f35a..2e9f8be6 100644 --- a/test/unit/body_write_stream.cpp +++ b/test/unit/body_write_stream.cpp @@ -163,8 +163,8 @@ struct single_tester : public ctx_base single_tester() : ts_(ioc_, msg_) , pr_(capy_ctx_) - , sr_(capy_ctx_) , buf_(&s_) + , sr_(capy_ctx_) , brs_(ts_, pr_) , wts_(ioc_) , rts_(ioc_) From dd967d5afd045d9d08904ad750bd49ad82a6e44f Mon Sep 17 00:00:00 2001 From: Mungo Gill Date: Mon, 5 Jan 2026 16:55:34 +0000 Subject: [PATCH 3/6] compiler warning fix --- test/unit/body_write_stream.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/unit/body_write_stream.cpp b/test/unit/body_write_stream.cpp index 2e9f8be6..0c889934 100644 --- a/test/unit/body_write_stream.cpp +++ b/test/unit/body_write_stream.cpp @@ -164,10 +164,10 @@ struct single_tester : public ctx_base : ts_(ioc_, msg_) , pr_(capy_ctx_) , buf_(&s_) - , sr_(capy_ctx_) , brs_(ts_, pr_) , wts_(ioc_) , rts_(ioc_) + , sr_(capy_ctx_) , res_(header_) , bws_(wts_, sr_, srs_) { From af2cb8738e2c55d395ac11f961db6bf20ae5feaf Mon Sep 17 00:00:00 2001 From: Mungo Gill Date: Mon, 5 Jan 2026 17:47:27 +0000 Subject: [PATCH 4/6] add coroutine test --- test/unit/body_write_stream.cpp | 119 ++++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) diff --git a/test/unit/body_write_stream.cpp b/test/unit/body_write_stream.cpp index 0c889934..150bdde5 100644 --- a/test/unit/body_write_stream.cpp +++ b/test/unit/body_write_stream.cpp @@ -9,12 +9,17 @@ #include #include +#include +#include #include +#include #include #include #include +#include #include +#include #include #include @@ -531,6 +536,113 @@ struct single_tester : public ctx_base } }; +#ifdef BOOST_BEAST2_HAS_CORO + +// Result type for async write operations +struct write_result +{ + system::error_code ec; + std::size_t bytes_transferred; +}; + +// Helper to wrap async_write_some for coroutines +template +capy::async_op +coro_write_some( + body_write_stream& bws, + ConstBufferSequence const& buffers) +{ + return capy::make_async_op( + bws.async_write_some(buffers, asio::deferred)); +} + +// Helper to wrap async_close for coroutines +template +capy::async_op +coro_close(body_write_stream& bws) +{ + return capy::make_async_op( + bws.async_close(asio::deferred)); +} + +capy::task +do_coro_write( + test::stream& wts, + test::stream& rts, + http_proto::serializer& sr, + http_proto::serializer::stream& srs, + std::string const& body, + std::string const& expected_msg) +{ + body_write_stream bws(wts, sr, srs); + + // Write body data using co_await + buffers::const_buffer cb(body.data(), body.size()); + std::size_t total_written = 0; + + while(cb.size() > 0) + { + auto result = co_await coro_write_some(bws, cb); + BOOST_TEST(!result.ec.failed()); + BOOST_TEST_GT(result.bytes_transferred, 0u); + total_written += result.bytes_transferred; + cb += result.bytes_transferred; + } + + BOOST_TEST_EQ(total_written, body.size()); + BOOST_TEST(!sr.is_done()); + + // Close the stream + auto ec = co_await coro_close(bws); + BOOST_TEST(!ec.failed()); + + BOOST_TEST(sr.is_done()); + BOOST_TEST_EQ(rts.str(), expected_msg); + + co_return; +} + +void +test_coroutine() +{ + // Set up context with parser and serializer services + capy::polystore capy_ctx; + http_proto::install_parser_service(capy_ctx, {}); + http_proto::install_serializer_service(capy_ctx, {}); + + std::string body = "Hello World!"; + std::string header = + "HTTP/1.1 200 OK\r\n" + "Content-Length: 12\r\n" + "\r\n"; + std::string expected_msg = header + body; + + asio::io_context ioc; + + test::stream wts(ioc); + test::stream rts(ioc); + wts.connect(rts); + + http_proto::serializer sr(capy_ctx); + sr.reset(); + + http_proto::response res(header); + auto srs = sr.start_stream(res); + + capy::spawn( + wrap_executor(ioc.get_executor()), + do_coro_write(wts, rts, sr, srs, body, expected_msg), + [](system::result result) + { + if(result.has_error()) + std::rethrow_exception(result.error()); + }); + + ioc.run(); +} + +#endif // BOOST_BEAST2_HAS_CORO + } // anonymous namespace. struct body_write_stream_test @@ -594,6 +706,13 @@ struct body_write_stream_test { single_tester().test_close_errors(); } + +#ifdef BOOST_BEAST2_HAS_CORO + // Test C++20 coroutine compatibility + { + test_coroutine(); + } +#endif } }; From 397251cc6d33ba6b9609869d64be7202b9898ec0 Mon Sep 17 00:00:00 2001 From: Mungo Gill Date: Tue, 6 Jan 2026 10:57:18 +0000 Subject: [PATCH 5/6] code coverage fix --- test/unit/body_write_stream.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/test/unit/body_write_stream.cpp b/test/unit/body_write_stream.cpp index 150bdde5..e864e976 100644 --- a/test/unit/body_write_stream.cpp +++ b/test/unit/body_write_stream.cpp @@ -657,8 +657,8 @@ struct body_write_stream_test // async_read_some reads the body for various chunk // sizes. - if (false) { - int sizes[] = { 1, 2, 5, 13, 233, 1597, 10'000, 100'000 }; + if (true) { + int sizes[] = { 1, 2, 13, 1597, 100'000 }; // Iterate through buffer sizes for(std::size_t bs: sizes) { @@ -666,10 +666,12 @@ struct body_write_stream_test // Iterate through chunk sizes for(std::size_t cs : sizes) { - if(cs > bs / 100) + if(cs > bs / 10000) { - std::cout << "bs: " << bs << " cs: " << cs << std::endl; - single_tester().test_with_chunking(bs, cs); + int iters = std::min((size_t)10, (cs / bs) + 1); + std::cout << "bs: " << bs << " cs: " << cs + << " iters: " << iters << std::endl; + single_tester().test_with_chunking(bs, cs, iters); } } } From 5a8dfd9d43881a0621331ba53c111cb6d15a848f Mon Sep 17 00:00:00 2001 From: Mungo Gill Date: Tue, 6 Jan 2026 15:44:37 +0000 Subject: [PATCH 6/6] github comments addressed --- include/boost/beast2/body_write_stream.hpp | 26 +- .../boost/beast2/impl/body_write_stream.hpp | 133 +++++----- test/unit/body_write_stream.cpp | 233 +++++++++++++----- 3 files changed, 251 insertions(+), 141 deletions(-) diff --git a/include/boost/beast2/body_write_stream.hpp b/include/boost/beast2/body_write_stream.hpp index 5249f504..50c36220 100644 --- a/include/boost/beast2/body_write_stream.hpp +++ b/include/boost/beast2/body_write_stream.hpp @@ -19,6 +19,16 @@ namespace boost { namespace beast2 { +namespace detail { + +template +class body_write_stream_op; + +template +class body_write_stream_close_op; + +} // detail + /** A body writer for HTTP/1 messages. This type is modelled on asio's AsyncWriteStream, and is constructed with a @@ -95,15 +105,13 @@ class body_write_stream valid until the handler is called. @param srs A http_proto::serializer::stream object which must have been - obtained by a call to `start_stream` on `sr`. Ownership of the - serializer::stream is - retained by the caller, which must guarantee that it remains - valid until the handler is called. + obtained by a call to `start_stream` on `sr`. Ownership of + the serializer::stream is transferred to this object. */ explicit body_write_stream( AsyncWriteStream& s, http_proto::serializer& sr, - http_proto::serializer::stream& srs); + http_proto::serializer::stream srs); /** Write some data asynchronously. @@ -239,9 +247,15 @@ class body_write_stream async_close(CompletionToken&& handler); private: + template + friend class detail::body_write_stream_op; + + template + friend class detail::body_write_stream_close_op; + AsyncWriteStream& stream_; http_proto::serializer& sr_; - http_proto::serializer::stream& srs_; + http_proto::serializer::stream srs_; system::error_code ec_; }; diff --git a/include/boost/beast2/impl/body_write_stream.hpp b/include/boost/beast2/impl/body_write_stream.hpp index 0d7a8f87..813abc44 100644 --- a/include/boost/beast2/impl/body_write_stream.hpp +++ b/include/boost/beast2/impl/body_write_stream.hpp @@ -12,6 +12,7 @@ #include +#include #include #include #include @@ -24,18 +25,12 @@ namespace detail { template class body_write_stream_close_op : public asio::coroutine { - AsyncWriteStream& stream_; - http_proto::serializer& sr_; - http_proto::serializer::stream& srs_; + body_write_stream& bws_; public: body_write_stream_close_op( - AsyncWriteStream& s, - http_proto::serializer& sr, - http_proto::serializer::stream& srs) noexcept - : stream_(s) - , sr_(sr) - , srs_(srs) + body_write_stream& bws) noexcept + : bws_(bws) { } @@ -50,15 +45,33 @@ class body_write_stream_close_op : public asio::coroutine { self.reset_cancellation_state(asio::enable_total_cancellation()); - srs_.close(); + // Check for a saved error from a previous async_write_some call. + if(bws_.ec_.failed()) + { + ec = bws_.ec_; + bws_.ec_ = {}; + BOOST_ASIO_CORO_YIELD + { + BOOST_ASIO_HANDLER_LOCATION( + (__FILE__, __LINE__, "immediate")); + auto io_ex = self.get_io_executor(); + asio::async_immediate( + io_ex, + asio::append(std::move(self), ec)); + } + goto upcall; + } + + bws_.srs_.close(); BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION( (__FILE__, __LINE__, "async_write_some")); - beast2::async_write(stream_, sr_, std::move(self)); + beast2::async_write(bws_.stream_, bws_.sr_, std::move(self)); } + upcall: self.complete(ec); } } @@ -67,25 +80,16 @@ class body_write_stream_close_op : public asio::coroutine template class body_write_stream_op : public asio::coroutine { - AsyncWriteStream& stream_; + body_write_stream& bws_; ConstBufferSequence cb_; - http_proto::serializer& sr_; - http_proto::serializer::stream& srs_; - system::error_code& saved_ec_; std::size_t bytes_; public: body_write_stream_op( - AsyncWriteStream& s, - ConstBufferSequence const& cb, - http_proto::serializer& sr, - http_proto::serializer::stream& srs, - system::error_code& saved_ec) noexcept - : stream_(s) + body_write_stream& bws, + ConstBufferSequence const& cb) noexcept + : bws_(bws) , cb_(cb) - , sr_(sr) - , srs_(srs) - , saved_ec_(saved_ec) , bytes_(0) { } @@ -99,28 +103,19 @@ class body_write_stream_op : public asio::coroutine { BOOST_ASIO_CORO_REENTER(*this) { - self.reset_cancellation_state(asio::enable_total_cancellation()); + // Verify preconditions + BOOST_ASSERT(!bws_.sr_.is_done()); - bytes_ = 0; + self.reset_cancellation_state(asio::enable_total_cancellation()); - // Check for a saved error from a previous call - if(saved_ec_.failed()) - { - ec = saved_ec_; - saved_ec_ = {}; - } - else if(buffers::size(cb_) == 0) - // A zero-sized buffer is a special case, we are required to - // complete immediately with no error. - ; - else if(sr_.is_done() || - !srs_.is_open()) - // The serializer and stream are in the wrong state. - ec = asio::error::not_connected; - - if(ec.failed() || + // A zero-sized buffer is a special case, we are required to + // complete immediately with no error. Also check for a saved + // error from a previous call. + if(bws_.ec_.failed() || buffers::size(cb_) == 0) { + ec = bws_.ec_; + bws_.ec_ = {}; BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION( @@ -137,39 +132,36 @@ class body_write_stream_op : public asio::coroutine // option but to try to write to the stream to clear space. // This may require multiple attempts as buffer space cannot // be cleared until the headers have been written. - while(!(bytes_ = asio::buffer_copy(srs_.prepare(), cb_))) + for(;;) { + bytes_ = asio::buffer_copy(bws_.srs_.prepare(), cb_); + bws_.srs_.commit(bytes_); + BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION( (__FILE__, __LINE__, "async_write_some")); - async_write_some(stream_, sr_, std::move(self)); + async_write_some(bws_.stream_, bws_.sr_, std::move(self)); } - if(sr_.is_done() || - !srs_.is_open()) - ec = asio::error::not_connected; - else if(!!self.cancelled()) - ec = asio::error::operation_aborted; - if(ec.failed()) - goto upcall; - } - srs_.commit(bytes_); + { + if(bytes_ != 0) + { + bws_.ec_ = ec; + ec = {}; + } + break; + } - BOOST_ASIO_CORO_YIELD - { - BOOST_ASIO_HANDLER_LOCATION( - (__FILE__, __LINE__, "async_write_some")); - beast2::async_write_some(stream_, sr_, std::move(self)); - } + if(bytes_ != 0) + break; - // Save error/cancellation for next call, but report success with bytes - if(ec.failed() || - !!self.cancelled()) - { - saved_ec_ = ec.failed() ? ec : asio::error::operation_aborted; - ec = {}; + if(!!self.cancelled()) + { + ec = asio::error::operation_aborted; + break; + } } upcall: @@ -190,11 +182,13 @@ body_write_stream:: body_write_stream( AsyncWriteStream& s, http_proto::serializer& sr, - http_proto::serializer::stream& srs) + http_proto::serializer::stream srs) : stream_(s) , sr_(sr) - , srs_(srs) + , srs_(std::move(srs)) { + // Verify preconditions + BOOST_ASSERT(srs_.is_open()); } template @@ -213,7 +207,7 @@ async_write_some( return asio:: async_compose( detail::body_write_stream_op{ - stream_, cb, sr_, srs_, ec_ }, + *this, cb }, token, stream_); } @@ -231,8 +225,7 @@ async_close( { return asio:: async_compose( - detail::body_write_stream_close_op{ - stream_, sr_, srs_ }, + detail::body_write_stream_close_op{ *this }, token, stream_); } diff --git a/test/unit/body_write_stream.cpp b/test/unit/body_write_stream.cpp index e864e976..82b0d05d 100644 --- a/test/unit/body_write_stream.cpp +++ b/test/unit/body_write_stream.cpp @@ -28,6 +28,7 @@ #include #include +#include namespace boost { namespace beast2 { @@ -158,12 +159,9 @@ struct single_tester : public ctx_base http_proto::response res_; - http_proto::serializer::stream srs_; + std::size_t srs_capacity_; - // http_proto::response res(headers); - // sr.start(res, buffers::const_buffer(body.data(), body.size())); - - body_write_stream bws_; + std::optional> bws_; single_tester() : ts_(ioc_, msg_) @@ -174,13 +172,20 @@ struct single_tester : public ctx_base , rts_(ioc_) , sr_(capy_ctx_) , res_(header_) - , bws_(wts_, sr_, srs_) { wts_.connect(rts_); pr_.reset(); pr_.start(); sr_.reset(); - srs_ = sr_.start_stream(res_); + auto srs = sr_.start_stream(res_); + srs_capacity_ = srs.capacity(); + bws_.emplace(wts_, sr_, std::move(srs)); + } + + body_write_stream& + bws() + { + return *bws_; } void @@ -192,13 +197,13 @@ struct single_tester : public ctx_base void async_write_some(std::size_t bs, system::error_code ec, std::size_t n) { - bws_.async_write_some(buf_.prepare(bs), test_handler(ec, n)); + bws().async_write_some(buf_.prepare(bs), test_handler(ec, n)); } void async_close(system::error_code ec) { - bws_.async_close(test_handler(ec, 0)); + bws().async_close(test_handler(ec, 0)); } buffers::const_buffer @@ -270,7 +275,7 @@ struct single_tester : public ctx_base std::string val; boost::buffers::string_buffer sb(&val); auto cb = sb.data(); - bws_.async_write_some(cb, test_handler(system::error_code{}, 0)); + bws().async_write_some(cb, test_handler(system::error_code{}, 0)); test::run(ioc_); } @@ -302,21 +307,11 @@ struct single_tester : public ctx_base while(cb.size() > 0) { - std::size_t emin, emax; - if(srs_.capacity()) - { - emin = std::max( - (std::size_t)1, std::min({ bs, srs_.capacity(), cb.size() })); - emax = emin; - } - else - { - emin = 1; - emax = std::max((std::size_t)1, bs); - } + std::size_t emin = 1; + std::size_t emax = std::min({ bs, srs_capacity_, cb.size() }); std::size_t written = 0; - bws_.async_write_some( + bws().async_write_some( cb, chunking_handler( system::error_code{}, @@ -356,7 +351,7 @@ struct single_tester : public ctx_base BOOST_TEST_LE( rts_.nwrite_bytes(), cs * writes + header_length_); - bws_.async_close(test::success_handler()); + bws().async_close(test::success_handler()); auto count = test::run(ioc_); BOOST_TEST_GE(count, 1); @@ -372,7 +367,7 @@ struct single_tester : public ctx_base } void - test_with_cancellation(std::size_t len) + test_with_ignored_cancel_signal(std::size_t len) { std::string val = body_.substr(0, len); boost::buffers::string_buffer sb(&val); @@ -382,15 +377,15 @@ struct single_tester : public ctx_base asio::cancellation_signal c_signal; // First call: cancellation occurs after data is written to serializer. - // The callback should receive success with the bytes written, - // and the cancellation error is saved for the next call. - // The actual bytes written is min(len, body_length_) since body_.substr - // is bounded by body_ size. + // The callback should receive success with the bytes written. + // With the simplified loop, cancellation after successful write + // is treated as success (the data was written). std::size_t expected_bytes = std::min(len, body_length_); - bws_.async_write_some( + bws().async_write_some( cb, asio::bind_cancellation_slot( - c_signal.slot(), test_handler(system::error_code{}, expected_bytes))); + c_signal.slot(), + test_handler(system::error_code{}, expected_bytes))); // send a cancellation c_signal.emit(asio::cancellation_type::total); @@ -400,24 +395,14 @@ struct single_tester : public ctx_base BOOST_TEST(!sr_.is_done()); - // Second call: should receive the saved cancellation error - // with zero bytes. + // Second call: write the remainder successfully. + // Cancellation after successful write is not saved, so this succeeds. std::string remainder = body_.substr(len); boost::buffers::string_buffer sb2(&remainder); auto cb2 = sb2.data(); - bws_.async_write_some( - cb2, - test_handler(asio::error::operation_aborted, 0)); - - // Run to deliver the saved error. - test::run(ioc_); - - BOOST_TEST(!sr_.is_done()); - - // Third call: write the remainder successfully. std::size_t remainder_len = body_length_ - len; - bws_.async_write_some( + bws().async_write_some( cb2, test_handler(system::error_code{}, remainder_len)); @@ -425,8 +410,8 @@ struct single_tester : public ctx_base BOOST_TEST(!sr_.is_done()); - // Fourth call: close the stream and verify the message. - bws_.async_close(test_handler(system::error_code{})); + // Third call: close the stream and verify the message. + bws().async_close(test_handler(system::error_code{})); test::run(ioc_); @@ -443,14 +428,14 @@ struct single_tester : public ctx_base if(use_asio_buffer) { asio::async_write( - bws_, + bws(), asio::buffer(body_.data(), body_.size()), test_handler(system::error_code{}, body_length_)); } else { asio::async_write( - bws_, + bws(), buffers::const_buffer(body_.data(), body_.size()), test_handler(system::error_code{}, body_length_)); } @@ -460,7 +445,7 @@ struct single_tester : public ctx_base BOOST_TEST(!sr_.is_done()); // Close the stream to flush remaining data - bws_.async_close(test_handler(system::error_code{})); + bws().async_close(test_handler(system::error_code{})); test::run(ioc_); @@ -477,8 +462,13 @@ struct single_tester : public ctx_base test::stream rts(ioc_); wts.connect(rts); + // Create a fresh serializer for this test + http_proto::serializer sr(capy_ctx_); + sr.reset(); + http_proto::response res(header_); + // Create a new body_write_stream with the failing stream - body_write_stream bws(wts, sr_, srs_); + body_write_stream bws(wts, sr, sr.start_stream(res)); // First call: data is committed to the serializer before the // stream write fails. Due to deferred error handling, this @@ -503,6 +493,44 @@ struct single_tester : public ctx_base BOOST_TEST_EQ(test::run(ioc_), 1); } + void + test_close_with_saved_error() + { + // Create a write test stream that fails on the first write. + test::fail_count fc(0, asio::error::network_down); + test::stream wts(ioc_, fc); + test::stream rts(ioc_); + wts.connect(rts); + + // Create a fresh serializer for this test + http_proto::serializer sr(capy_ctx_); + sr.reset(); + http_proto::response res(header_); + + // Create a new body_write_stream with the failing stream + body_write_stream bws(wts, sr, sr.start_stream(res)); + + // First call: data is committed to the serializer before the + // stream write fails. Due to deferred error handling, this + // returns success with the committed bytes, and saves the error. + std::string val = body_; + boost::buffers::string_buffer sb(&val); + auto cb = sb.data(); + + bws.async_write_some( + cb, + test_handler(system::error_code{}, body_length_)); + + // The operation completes with 1 handler invocation. + BOOST_TEST_EQ(test::run(ioc_), 1); + + // async_close receives the saved error immediately. + bws.async_close(test_handler(asio::error::network_down)); + + // The deferred error is returned via async_immediate. + BOOST_TEST_EQ(test::run(ioc_), 1); + } + void test_close_errors() { @@ -514,8 +542,16 @@ struct single_tester : public ctx_base test::stream rts(ioc_); wts.connect(rts); + // Limit write size so data remains in serializer after first write. + wts.write_size(1); + + // Create a fresh serializer for this test + http_proto::serializer sr(capy_ctx_); + sr.reset(); + http_proto::response res(header_); + // Create a new body_write_stream with the failing stream - body_write_stream bws(wts, sr_, srs_); + body_write_stream bws(wts, sr, sr.start_stream(res)); // Write body data - this should succeed. std::string val = body_; @@ -534,6 +570,64 @@ struct single_tester : public ctx_base BOOST_TEST_GE(test::run(ioc_), 1); } + + // Test cancellation during buffer-clearing loop (when bytes_ == 0). + // This covers the case where the serializer buffer is full and we're + // waiting for space, then get cancelled before any user data is copied. + void + test_cancel_during_buffer_clear() + { + wts_.write_size(1); // Very slow drain + + // First, fill the serializer's buffer completely by writing + // data equal to its capacity + std::size_t cap = srs_capacity_; + std::string fill_data(cap, 'F'); + buffers::const_buffer fill_cb(fill_data.data(), fill_data.size()); + + bool fill_complete = false; + bws().async_write_some( + fill_cb, + [&](system::error_code, std::size_t) + { + fill_complete = true; + }); + + test::run(ioc_); + BOOST_TEST(fill_complete); + + // Now the buffer should be full. The next write should enter + // the buffer-clearing loop with bytes_ == 0 on the first iteration. + std::string more_data(64, 'X'); + buffers::const_buffer cb(more_data.data(), more_data.size()); + + asio::cancellation_signal c_signal; + + system::error_code result_ec; + std::size_t result_bytes = 0; + + bws().async_write_some( + cb, + asio::bind_cancellation_slot( + c_signal.slot(), + [&](system::error_code ec, std::size_t n) + { + result_ec = ec; + result_bytes = n; + })); + + // Emit cancellation immediately - we should be in the loop + // with bytes_ == 0 because the buffer is full + c_signal.emit(asio::cancellation_type::total); + + // Let the operation complete + test::run(ioc_); + + // Should complete with operation_aborted and 0 bytes + // because cancellation occurred while bytes_ == 0 + BOOST_TEST_EQ(result_ec, asio::error::operation_aborted); + BOOST_TEST_EQ(result_bytes, 0u); + } }; #ifdef BOOST_BEAST2_HAS_CORO @@ -570,11 +664,11 @@ do_coro_write( test::stream& wts, test::stream& rts, http_proto::serializer& sr, - http_proto::serializer::stream& srs, + http_proto::serializer::stream srs, std::string const& body, std::string const& expected_msg) { - body_write_stream bws(wts, sr, srs); + body_write_stream bws(wts, sr, std::move(srs)); // Write body data using co_await buffers::const_buffer cb(body.data(), body.size()); @@ -631,7 +725,7 @@ test_coroutine() capy::spawn( wrap_executor(ioc.get_executor()), - do_coro_write(wts, rts, sr, srs, body, expected_msg), + do_coro_write(wts, rts, sr, std::move(srs), body, expected_msg), [](system::result result) { if(result.has_error()) @@ -651,40 +745,39 @@ struct body_write_stream_test run() { // Read into a zero sized buffer should return immediately without error - if (false) { + { single_tester().test_zero_sized_buffer(); } // async_read_some reads the body for various chunk // sizes. - if (true) { + { int sizes[] = { 1, 2, 13, 1597, 100'000 }; // Iterate through buffer sizes for(std::size_t bs: sizes) { - std::cout << "bs: " << bs << std::endl; // Iterate through chunk sizes for(std::size_t cs : sizes) { if(cs > bs / 10000) { - int iters = std::min((size_t)10, (cs / bs) + 1); - std::cout << "bs: " << bs << " cs: " << cs - << " iters: " << iters << std::endl; + auto iters = static_cast( + std::min((size_t)10, (cs / bs) + 1)); single_tester().test_with_chunking(bs, cs, iters); } } } } - // Test async_write_some cancellation + // Test async_write_some with ignored cancellation signal { - // Iterate through different amounts of data written before cancellation. - // Only go up to body_length_ since that's the maximum useful data. + // Iterate through different amounts of data written before + // cancellation. Only go up to body_length_ since that's the + // maximum useful data. std::size_t body_len = single_tester().body_length_; for(std::size_t len = 1; len <= body_len; len++) { - single_tester().test_with_cancellation(len); + single_tester().test_with_ignored_cancel_signal(len); } } @@ -704,11 +797,21 @@ struct body_write_stream_test single_tester().test_stream_errors(); } - // async_close reports stream errors + // async_close reports saved errors + { + single_tester().test_close_with_saved_error(); + } + + // async_close reports stream errors during flush { single_tester().test_close_errors(); } + // Test cancellation during buffer-clearing loop + { + single_tester().test_cancel_during_buffer_clear(); + } + #ifdef BOOST_BEAST2_HAS_CORO // Test C++20 coroutine compatibility {