1010import software .amazon .awssdk .crt .CRT ;
1111import software .amazon .awssdk .crt .CrtResource ;
1212import software .amazon .awssdk .crt .http .*;
13+ import software .amazon .awssdk .crt .io .ClientBootstrap ;
14+ import software .amazon .awssdk .crt .io .EventLoopGroup ;
15+ import software .amazon .awssdk .crt .io .HostResolver ;
16+ import software .amazon .awssdk .crt .io .SocketOptions ;
17+ import software .amazon .awssdk .crt .io .TlsContext ;
18+ import software .amazon .awssdk .crt .io .TlsContextOptions ;
1319
1420import java .net .URI ;
1521import java .nio .ByteBuffer ;
@@ -23,6 +29,37 @@ public class WriteDataTest extends HttpRequestResponseFixture {
2329 private final static int H1_TLS_PORT = 8082 ;
2430 private final static int H2_TLS_PORT = 3443 ;
2531
32+ /**
33+ * Build an {@link HttpStreamManager} suitable for the localhost mock server
34+ * (self-signed cert -> verify peer disabled). Mirrors the setup used by
35+ * the stream-manager and connection-pool tests for localhost.
36+ */
37+ private HttpStreamManager createLocalhostStreamManager (URI uri , HttpVersion expectedVersion ) {
38+ try (EventLoopGroup eventLoopGroup = new EventLoopGroup (1 );
39+ HostResolver resolver = new HostResolver (eventLoopGroup );
40+ ClientBootstrap bootstrap = new ClientBootstrap (eventLoopGroup , resolver );
41+ SocketOptions sockOpts = new SocketOptions ();
42+ TlsContextOptions tlsOpts = (expectedVersion == HttpVersion .HTTP_2
43+ ? TlsContextOptions .createDefaultClient ().withAlpnList ("h2" )
44+ : TlsContextOptions .createDefaultClient ().withAlpnList ("http/1.1" ))
45+ .withVerifyPeer (false );
46+ TlsContext tlsContext = createHttpClientTlsContext (tlsOpts )) {
47+ HttpClientConnectionManagerOptions h1Options = new HttpClientConnectionManagerOptions ()
48+ .withClientBootstrap (bootstrap )
49+ .withSocketOptions (sockOpts )
50+ .withTlsContext (tlsContext )
51+ .withUri (uri )
52+ .withMaxConnections (1 );
53+ Http2StreamManagerOptions h2Options = new Http2StreamManagerOptions ()
54+ .withConnectionManagerOptions (h1Options );
55+ HttpStreamManagerOptions options = new HttpStreamManagerOptions ()
56+ .withHTTP1ConnectionManagerOptions (h1Options )
57+ .withHTTP2StreamManagerOptions (h2Options )
58+ .withExpectedProtocol (expectedVersion );
59+ return HttpStreamManager .create (options );
60+ }
61+ }
62+
2663 @ Test
2764 public void testHttp2WriteData () throws Exception {
2865 skipIfAndroid ();
@@ -326,4 +363,135 @@ public void onResponseComplete(HttpStream stream, int errorCode) {}
326363 }
327364 }
328365 }
366+
367+ /**
368+ * Smoke test: stream acquired from {@link HttpStreamManager} for HTTP/2
369+ * correctly threads {@code useManualDataWrites=true} through to the stream
370+ * and allows a simple "hello world" body to be sent via writeData().
371+ */
372+ @ Test
373+ public void testHttp2StreamManagerWriteData () throws Exception {
374+ skipIfAndroid ();
375+ skipIfLocalhostUnavailable ();
376+
377+ URI uri = new URI (HOST + ":" + H2_TLS_PORT );
378+ byte [] payload = "hello world" .getBytes (StandardCharsets .UTF_8 );
379+
380+ HttpHeader [] headers = new HttpHeader [] {
381+ new HttpHeader (":method" , "PUT" ),
382+ new HttpHeader (":path" , "/echo" ),
383+ new HttpHeader (":scheme" , "https" ),
384+ new HttpHeader (":authority" , uri .getHost ()),
385+ new HttpHeader ("content-length" , Integer .toString (payload .length )),
386+ };
387+ Http2Request request = new Http2Request (headers , null );
388+
389+ CompletableFuture <Void > reqCompleted = new CompletableFuture <>();
390+ TestHttpResponse response = new TestHttpResponse ();
391+
392+ CompletableFuture <Void > shutdownComplete ;
393+ try (HttpStreamManager streamManager = createLocalhostStreamManager (uri , HttpVersion .HTTP_2 )) {
394+ shutdownComplete = streamManager .getShutdownCompleteFuture ();
395+
396+ HttpStreamBaseResponseHandler streamHandler = new HttpStreamBaseResponseHandler () {
397+ @ Override
398+ public void onResponseHeaders (HttpStreamBase stream , int responseStatusCode , int blockType ,
399+ HttpHeader [] nextHeaders ) {
400+ response .statusCode = responseStatusCode ;
401+ response .headers .addAll (Arrays .asList (nextHeaders ));
402+ }
403+
404+ @ Override
405+ public int onResponseBody (HttpStreamBase stream , byte [] bodyBytesIn ) {
406+ response .bodyBuffer .put (bodyBytesIn );
407+ return bodyBytesIn .length ;
408+ }
409+
410+ @ Override
411+ public void onResponseComplete (HttpStreamBase stream , int errorCode ) {
412+ response .onCompleteErrorCode = errorCode ;
413+ reqCompleted .complete (null );
414+ }
415+ };
416+
417+ try (HttpStreamBase stream = streamManager .acquireStream (request , streamHandler , true )
418+ .get (60 , TimeUnit .SECONDS )) {
419+ stream .writeData (payload , true ).get (5 , TimeUnit .SECONDS );
420+ reqCompleted .get (60 , TimeUnit .SECONDS );
421+ }
422+ }
423+
424+ Assert .assertEquals (CRT .AWS_CRT_SUCCESS , response .onCompleteErrorCode );
425+ Assert .assertEquals (200 , response .statusCode );
426+ String body = response .getBody ();
427+ Assert .assertTrue ("Response should contain sent body: " + body ,
428+ body .contains ("\" body\" : \" hello world\" " ));
429+
430+ shutdownComplete .get (60 , TimeUnit .SECONDS );
431+ CrtResource .waitForNoResources ();
432+ }
433+
434+ /**
435+ * Smoke test: stream acquired from {@link HttpStreamManager} for HTTP/1.1
436+ * correctly threads {@code useManualDataWrites=true} through to the stream
437+ * and allows a simple "hello world" body to be sent via writeData().
438+ */
439+ @ Test
440+ public void testHttp1StreamManagerWriteData () throws Exception {
441+ skipIfAndroid ();
442+ skipIfLocalhostUnavailable ();
443+
444+ URI uri = new URI (HOST + ":" + H1_TLS_PORT );
445+ byte [] payload = "hello world" .getBytes (StandardCharsets .UTF_8 );
446+
447+ HttpHeader [] headers = new HttpHeader [] {
448+ new HttpHeader ("Host" , uri .getHost ()),
449+ new HttpHeader ("Content-Length" , Integer .toString (payload .length )),
450+ };
451+ HttpRequest request = new HttpRequest ("PUT" , "/echo" , headers , null );
452+
453+ CompletableFuture <Void > reqCompleted = new CompletableFuture <>();
454+ TestHttpResponse response = new TestHttpResponse ();
455+
456+ CompletableFuture <Void > shutdownComplete ;
457+ try (HttpStreamManager streamManager = createLocalhostStreamManager (uri , HttpVersion .HTTP_1_1 )) {
458+ shutdownComplete = streamManager .getShutdownCompleteFuture ();
459+
460+ HttpStreamBaseResponseHandler streamHandler = new HttpStreamBaseResponseHandler () {
461+ @ Override
462+ public void onResponseHeaders (HttpStreamBase stream , int responseStatusCode , int blockType ,
463+ HttpHeader [] nextHeaders ) {
464+ response .statusCode = responseStatusCode ;
465+ response .headers .addAll (Arrays .asList (nextHeaders ));
466+ }
467+
468+ @ Override
469+ public int onResponseBody (HttpStreamBase stream , byte [] bodyBytesIn ) {
470+ response .bodyBuffer .put (bodyBytesIn );
471+ return bodyBytesIn .length ;
472+ }
473+
474+ @ Override
475+ public void onResponseComplete (HttpStreamBase stream , int errorCode ) {
476+ response .onCompleteErrorCode = errorCode ;
477+ reqCompleted .complete (null );
478+ }
479+ };
480+
481+ try (HttpStreamBase stream = streamManager .acquireStream (request , streamHandler , true )
482+ .get (60 , TimeUnit .SECONDS )) {
483+ stream .writeData (payload , true ).get (5 , TimeUnit .SECONDS );
484+ reqCompleted .get (60 , TimeUnit .SECONDS );
485+ }
486+ }
487+
488+ Assert .assertEquals (CRT .AWS_CRT_SUCCESS , response .onCompleteErrorCode );
489+ Assert .assertEquals (200 , response .statusCode );
490+ String body = response .getBody ();
491+ Assert .assertTrue ("Response should contain sent data: " + body ,
492+ body .contains ("\" data\" : \" hello world\" " ));
493+
494+ shutdownComplete .get (60 , TimeUnit .SECONDS );
495+ CrtResource .waitForNoResources ();
496+ }
329497}
0 commit comments