Skip to content

Commit 2fc3989

Browse files
committed
bind out write data api
1 parent eba5fc1 commit 2fc3989

5 files changed

Lines changed: 114 additions & 24 deletions

File tree

awscrt/http.py

Lines changed: 50 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,8 @@ def new(cls,
281281
def request(self,
282282
request: 'HttpRequest',
283283
on_response: Optional[Callable[..., None]] = None,
284-
on_body: Optional[Callable[..., None]] = None) -> 'HttpClientStream':
284+
on_body: Optional[Callable[..., None]] = None,
285+
manual_write: bool = False) -> 'HttpClientStream':
285286
"""Create :class:`HttpClientStream` to carry out the request/response exchange.
286287
287288
NOTE: The HTTP stream sends no data until :meth:`HttpClientStream.activate()`
@@ -320,10 +321,14 @@ def request(self,
320321
An exception raise by this function will cause the HTTP stream to end in error.
321322
This callback is always invoked on the connection's event-loop thread.
322323
324+
manual_write (bool): If True, enables manual data writing on the stream.
325+
This allows calling :meth:`HttpClientStream.write_data()` to stream
326+
the request body in chunks. Works for both HTTP/1.1 and HTTP/2.
327+
323328
Returns:
324329
HttpClientStream:
325330
"""
326-
return HttpClientStream(self, request, on_response, on_body)
331+
return HttpClientStream(self, request, on_response, on_body, manual_write)
327332

328333
def close(self) -> "concurrent.futures.Future":
329334
"""Close the connection.
@@ -586,6 +591,40 @@ def update_window(self, increment_size: int) -> None:
586591
"""
587592
_awscrt.http_stream_update_window(self, increment_size)
588593

594+
def write_data(self,
595+
data_stream: Union[InputStream, Any],
596+
end_stream: bool = False) -> "concurrent.futures.Future":
597+
'''Write a chunk of data to the request body stream.
598+
599+
Works for both HTTP/1.1 and HTTP/2 streams.
600+
The stream must have been created with ``manual_write=True``.
601+
You must call :meth:`activate()` before using this method.
602+
603+
Args:
604+
data_stream (InputStream): Data to write. If not an InputStream,
605+
it will be wrapped in one. Can be None to write zero bytes.
606+
607+
end_stream (bool): True to indicate this is the last chunk and no more data
608+
will be sent. False if more chunks will follow.
609+
610+
Returns:
611+
concurrent.futures.Future: Future that completes when the write operation
612+
is done. The future will contain None on success, or an exception on failure.
613+
'''
614+
future = Future()
615+
body_stream = InputStream.wrap(data_stream, allow_none=True)
616+
617+
def on_write_complete(error_code: int) -> None:
618+
if future.cancelled():
619+
return
620+
if error_code:
621+
future.set_exception(awscrt.exceptions.from_code(error_code))
622+
else:
623+
future.set_result(None)
624+
625+
_awscrt.http_stream_write_data(self, body_stream, end_stream, on_write_complete)
626+
return future
627+
589628

590629
class HttpClientStream(HttpClientStreamBase):
591630
"""HTTP stream that sends a request and receives a response.
@@ -608,8 +647,9 @@ def __init__(self,
608647
connection: HttpClientConnection,
609648
request: 'HttpRequest',
610649
on_response: Optional[Callable[..., None]] = None,
611-
on_body: Optional[Callable[..., None]] = None) -> None:
612-
self._init_common(connection, request, on_response, on_body)
650+
on_body: Optional[Callable[..., None]] = None,
651+
manual_write: bool = False) -> None:
652+
self._init_common(connection, request, on_response, on_body, manual_write)
613653

614654
def activate(self) -> None:
615655
"""Begin sending the request.
@@ -683,16 +723,16 @@ def activate(self) -> None:
683723
def write_data(self,
684724
data_stream: Union[InputStream, Any],
685725
end_stream: bool = False) -> "concurrent.futures.Future":
686-
"""Write a chunk of data to the request body stream.
726+
'''Write a chunk of data to the request body stream.
687727
688728
This method is only available when the stream was created with
689729
manual_write=True. This allows incremental writing of request data.
690730
691-
Note: In the asyncio version, this is replaced by the request_body_generator parameter
692-
which accepts an async generator.
731+
Note: This method is inherited from the base class and works for both
732+
HTTP/1.1 and HTTP/2. It is kept here for backward compatibility.
693733
694734
Args:
695-
data_stream (Union[InputStream, Any]): Data to write. If not an InputStream,
735+
data_stream (InputStream): Data to write. If not an InputStream,
696736
it will be wrapped in one. Can be None to send an empty chunk.
697737
698738
end_stream (bool): True to indicate this is the last chunk and no more data
@@ -701,21 +741,8 @@ def write_data(self,
701741
Returns:
702742
concurrent.futures.Future: Future that completes when the write operation
703743
is done. The future will contain None on success, or an exception on failure.
704-
"""
705-
future = Future()
706-
body_stream = InputStream.wrap(data_stream, allow_none=True)
707-
708-
def on_write_complete(error_code: int) -> None:
709-
if future.cancelled():
710-
# the future was cancelled, so we don't need to set the result or exception
711-
return
712-
if error_code:
713-
future.set_exception(awscrt.exceptions.from_code(error_code))
714-
else:
715-
future.set_result(None)
716-
717-
_awscrt.http2_client_stream_write_data(self, body_stream, end_stream, on_write_complete)
718-
return future
744+
'''
745+
return super().write_data(data_stream, end_stream)
719746

720747

721748
class HttpMessageBase(NativeResource):

source/http.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ PyObject *aws_py_http_client_stream_new(PyObject *self, PyObject *args);
5050
PyObject *aws_py_http_client_stream_activate(PyObject *self, PyObject *args);
5151

5252
PyObject *aws_py_http2_client_stream_write_data(PyObject *self, PyObject *args);
53+
PyObject *aws_py_http_stream_write_data(PyObject *self, PyObject *args);
5354

5455
/* Create capsule around new request-style aws_http_message struct */
5556
PyObject *aws_py_http_message_new_request(PyObject *self, PyObject *args);

source/http_stream.c

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ PyObject *aws_py_http_client_stream_new(PyObject *self, PyObject *args) {
303303
.on_complete = s_on_stream_complete,
304304
.on_h2_remote_end_stream = s_on_h2_remote_end_stream,
305305
.user_data = stream,
306+
.use_manual_data_writes = http2_manual_write,
306307
.http2_use_manual_data_writes = http2_manual_write,
307308
};
308309

@@ -410,3 +411,63 @@ PyObject *aws_py_http2_client_stream_write_data(PyObject *self, PyObject *args)
410411
}
411412
Py_RETURN_NONE;
412413
}
414+
415+
static void s_on_http_stream_write_data_complete(struct aws_http_stream *stream, int error_code, void *user_data) {
416+
(void)stream;
417+
PyObject *py_on_write_complete = (PyObject *)user_data;
418+
AWS_FATAL_ASSERT(py_on_write_complete);
419+
PyGILState_STATE state;
420+
if (aws_py_gilstate_ensure(&state)) {
421+
return; /* Python has shut down. Nothing matters anymore, but don't crash */
422+
}
423+
424+
PyObject *result = PyObject_CallFunction(py_on_write_complete, "(i)", error_code);
425+
if (result) {
426+
Py_DECREF(result);
427+
} else {
428+
PyErr_WriteUnraisable(PyErr_Occurred());
429+
}
430+
Py_DECREF(py_on_write_complete);
431+
PyGILState_Release(state);
432+
}
433+
434+
PyObject *aws_py_http_stream_write_data(PyObject *self, PyObject *args) {
435+
(void)self;
436+
437+
PyObject *py_stream = NULL;
438+
PyObject *py_body_stream = NULL;
439+
int end_stream = false;
440+
PyObject *py_on_write_complete = NULL;
441+
if (!PyArg_ParseTuple(args, "OOpO", &py_stream, &py_body_stream, &end_stream, &py_on_write_complete)) {
442+
return NULL;
443+
}
444+
445+
struct aws_http_stream *http_stream = aws_py_get_http_stream(py_stream);
446+
if (!http_stream) {
447+
return NULL;
448+
}
449+
450+
struct aws_input_stream *body_stream = NULL;
451+
if (py_body_stream != Py_None) {
452+
body_stream = aws_py_get_input_stream(py_body_stream);
453+
if (!body_stream) {
454+
return PyErr_AwsLastError();
455+
}
456+
}
457+
458+
Py_INCREF(py_on_write_complete);
459+
460+
struct aws_http_stream_write_data_options write_options = {
461+
.data = body_stream,
462+
.end_stream = end_stream,
463+
.on_complete = s_on_http_stream_write_data_complete,
464+
.user_data = py_on_write_complete,
465+
};
466+
467+
int error = aws_http_stream_write_data(http_stream, &write_options);
468+
if (error) {
469+
Py_DECREF(py_on_write_complete);
470+
return PyErr_AwsLastError();
471+
}
472+
Py_RETURN_NONE;
473+
}

source/module.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1052,6 +1052,7 @@ static PyMethodDef s_module_methods[] = {
10521052
AWS_PY_METHOD_DEF(http_client_stream_new, METH_VARARGS),
10531053
AWS_PY_METHOD_DEF(http_client_stream_activate, METH_VARARGS),
10541054
AWS_PY_METHOD_DEF(http2_client_stream_write_data, METH_VARARGS),
1055+
AWS_PY_METHOD_DEF(http_stream_write_data, METH_VARARGS),
10551056
AWS_PY_METHOD_DEF(http_message_new_request, METH_VARARGS),
10561057
AWS_PY_METHOD_DEF(http_message_get_request_method, METH_VARARGS),
10571058
AWS_PY_METHOD_DEF(http_message_set_request_method, METH_VARARGS),

0 commit comments

Comments
 (0)