@@ -63,8 +63,9 @@ struct SharedQueue {
6363
6464struct RequestResponse {
6565 // request data
66- client::Command request;
67- std::string normalizedTopic;
66+ client::Command request;
67+ std::unique_ptr<IoBuffer> body;
68+ std::string normalizedTopic;
6869
6970 // response data
7071 std::string responseStatus;
@@ -168,11 +169,6 @@ struct ClientSessionBase {
168169 }
169170
170171 void submitRequest (client::Command &&cmd, SubscriptionMode mode, std::string preferredMimeType, std::optional<std::uint64_t > longPollIdx) {
171- auto topic = cmd.topic ;
172- if (cmd.command == mdp::Command::Set) {
173- topic = URI<>::UriFactory (topic).addQueryParameter (" _bodyOverride" , std::string{ cmd.data .asString () }).build ();
174- }
175-
176172 std::string longPollIdxParam;
177173 if (longPollIdx) {
178174 longPollIdxParam = std::to_string (*longPollIdx);
@@ -188,6 +184,8 @@ struct ClientSessionBase {
188184 break ;
189185 }
190186 }
187+
188+ auto topic = cmd.topic ;
191189 if (!longPollIdxParam.empty ()) {
192190 topic = URI<>::UriFactory (topic).addQueryParameter (" LongPollingIdx" , longPollIdxParam).build ();
193191 }
@@ -199,10 +197,13 @@ struct ClientSessionBase {
199197 const auto ts = std::to_string (opencmw::load_test::timestamp ().count ());
200198#endif
201199 constexpr uint8_t noCopy = NGHTTP2_NV_FLAG_NO_COPY_NAME | NGHTTP2_NV_FLAG_NO_COPY_VALUE;
200+
201+ const auto method = (cmd.command == mdp::Command::Set) ? u8span (" POST" ) : u8span (" GET" );
202+
202203 auto headers = std::vector{
203- nv (u8span (" :method" ), u8span ( " GET " ) , noCopy), //
204- nv (u8span (" :path" ), u8span (path)), //
205- nv (u8span (" :scheme" ), u8span (scheme)), //
204+ nv (u8span (" :method" ), method , noCopy), //
205+ nv (u8span (" :path" ), u8span (path)), //
206+ nv (u8span (" :scheme" ), u8span (scheme)), //
206207 nv (u8span (" :authority" ), u8span (host)),
207208#ifdef OPENCMW_PROFILE_HTTP
208209 nv (u8span (" x-timestamp" ), u8span (ts))
@@ -212,9 +213,6 @@ struct ClientSessionBase {
212213 headers.push_back (nv (u8span (" accept" ), u8span (preferredMimeType)));
213214 headers.push_back (nv (u8span (" content-type" ), u8span (preferredMimeType)));
214215 }
215- if (cmd.command == mdp::Command::Set) {
216- headers.push_back (nv (u8span (" x-opencmw-method" ), u8span (" PUT" ), noCopy));
217- }
218216
219217 RequestResponse rr;
220218 rr.request = std::move (cmd);
@@ -224,7 +222,12 @@ struct ClientSessionBase {
224222 rr.normalizedTopic = rr.request .topic .str ();
225223 }
226224
227- const TStreamId streamId = self ().submitRequestImpl (headers);
225+ if (!rr.request .data .empty ()) {
226+ // we need a pointer that survives rr being moved
227+ rr.body = std::make_unique<IoBuffer>(std::move (rr.request .data ));
228+ }
229+
230+ const TStreamId streamId = self ().submitRequestImpl (headers, rr.body .get ());
228231 if (streamId < 0 ) {
229232 rr.reportError (std::format (" Could not submit request: {}" , nghttp2_strerror (streamId)));
230233 return ;
@@ -486,8 +489,25 @@ struct Http2ClientSession : public ClientSessionBase<Http2ClientSession, int32_t
486489 return _socket._state == TcpSocket::Connected ? _writeBuffer.wantsToWrite (_session) : (_socket._state == TcpSocket::Connecting || _socket._state == TcpSocket::SSLConnectWantsWrite);
487490 }
488491
489- int32_t submitRequestImpl (const std::vector<nghttp2_nv> &headers) {
490- auto streamId = nghttp2_submit_request (_session, nullptr , headers.data (), headers.size (), nullptr , nullptr );
492+ int32_t submitRequestImpl (const std::vector<nghttp2_nv> &headers, IoBuffer *body) {
493+ nghttp2_data_provider2 data_prd;
494+ data_prd.read_callback = nullptr ;
495+
496+ if (body && !body->empty ()) {
497+ data_prd.source .ptr = body;
498+ data_prd.read_callback = [](nghttp2_session *, int32_t /* stream_id*/ , uint8_t *buf, size_t length, uint32_t *data_flags, nghttp2_data_source *source, void * /* user_data*/ ) {
499+ auto ioBuffer = static_cast <IoBuffer *>(source->ptr );
500+ const std::size_t copy_len = std::min (length, ioBuffer->size () - ioBuffer->position ());
501+ std::copy (ioBuffer->data () + ioBuffer->position (), ioBuffer->data () + ioBuffer->position () + copy_len, buf);
502+ ioBuffer->skip (static_cast <int >(copy_len));
503+ if (ioBuffer->position () == ioBuffer->size ()) {
504+ *data_flags |= NGHTTP2_DATA_FLAG_EOF;
505+ }
506+ return static_cast <ssize_t >(copy_len);
507+ };
508+ }
509+
510+ auto streamId = nghttp2_submit_request2 (_session, nullptr , headers.data (), headers.size (), &data_prd, nullptr );
491511 if (streamId < 0 ) {
492512 HTTP_DBG (" Client::submitRequest: nghttp2_submit_request failed: {}" , nghttp2_strerror (streamId));
493513 }
0 commit comments