|
| 1 | +#include "fd_bundle_client_private.h" |
| 2 | +#include "../../waltz/h2/fd_h2_rbuf_ossl.h" |
| 3 | +#include "../../waltz/grpc/fd_grpc.h" |
| 4 | +#include <sys/socket.h> |
| 5 | +#include <openssl/ssl.h> |
| 6 | +#include <openssl/err.h> |
| 7 | + |
| 8 | +/* Forward declarations */ |
| 9 | + |
| 10 | +static fd_h2_callbacks_t const fd_bundle_h2_callbacks; |
| 11 | + |
| 12 | +ulong |
| 13 | +fd_bundle_client_align( void ) { |
| 14 | + return alignof(fd_bundle_client_t); |
| 15 | +} |
| 16 | + |
| 17 | +ulong |
| 18 | +fd_bundle_client_footprint( void ) { |
| 19 | + ulong l = FD_LAYOUT_INIT; |
| 20 | + l = FD_LAYOUT_APPEND( l, alignof(fd_bundle_client_t), sizeof(fd_bundle_client_t) ); |
| 21 | + l = FD_LAYOUT_APPEND( l, alignof(fd_bundle_client_bufs_t), sizeof(fd_bundle_client_bufs_t) ); |
| 22 | + l = FD_LAYOUT_APPEND( l, fd_bundle_h2_stream_pool_align(), fd_bundle_h2_stream_pool_footprint( FD_BUNDLE_CLIENT_MAX_STREAMS ) ); |
| 23 | + return FD_LAYOUT_FINI( l, fd_bundle_client_align() ); |
| 24 | +} |
| 25 | + |
| 26 | +fd_bundle_client_t * |
| 27 | +fd_bundle_client_new( void * mem, |
| 28 | + SSL * ssl, |
| 29 | + fd_bundle_client_metrics_t * metrics ) { |
| 30 | + FD_SCRATCH_ALLOC_INIT( l, mem ); |
| 31 | + void * client_mem = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_bundle_client_t), sizeof(fd_bundle_client_t) ); |
| 32 | + void * bufs_mem = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_bundle_client_bufs_t), sizeof(fd_bundle_client_bufs_t) ); |
| 33 | + void * stream_pool_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_bundle_h2_stream_pool_align(), fd_bundle_h2_stream_pool_footprint( FD_BUNDLE_CLIENT_MAX_STREAMS ) ); |
| 34 | + FD_SCRATCH_ALLOC_FINI( l, fd_bundle_client_align() ); |
| 35 | + |
| 36 | + fd_bundle_client_t * client = client_mem; |
| 37 | + fd_bundle_client_bufs_t * bufs = bufs_mem; |
| 38 | + |
| 39 | + fd_bundle_h2_stream_t * stream_pool = |
| 40 | + fd_bundle_h2_stream_pool_join( fd_bundle_h2_stream_pool_new( stream_pool_mem, FD_BUNDLE_CLIENT_MAX_STREAMS ) ); |
| 41 | + if( FD_UNLIKELY( !stream_pool ) ) FD_LOG_CRIT(( "Failed to create stream pool" )); /* unreachable */ |
| 42 | + |
| 43 | + *client = (fd_bundle_client_t){ |
| 44 | + .ssl = ssl, |
| 45 | + .stream_pool = stream_pool, |
| 46 | + .nanopb_rx = bufs->nanopb_rx, |
| 47 | + .nanopb_tx = bufs->nanopb_tx, |
| 48 | + .frame_scratch = bufs->frame_scratch, |
| 49 | + .metrics = metrics |
| 50 | + }; |
| 51 | + fd_h2_rbuf_init( client->frame_rx, bufs->frame_rx_buf, sizeof(bufs->frame_rx_buf) ); |
| 52 | + fd_h2_rbuf_init( client->frame_tx, bufs->frame_tx_buf, sizeof(bufs->frame_tx_buf) ); |
| 53 | + |
| 54 | + fd_h2_conn_init_client( client->conn ); |
| 55 | + client->conn->ctx = client; |
| 56 | + |
| 57 | + /* Don't memset bufs for better performance */ |
| 58 | + |
| 59 | + return client; |
| 60 | +} |
| 61 | + |
| 62 | +void * |
| 63 | +fd_bundle_client_delete( fd_bundle_client_t * client ) { |
| 64 | + return client; |
| 65 | +} |
| 66 | + |
| 67 | +static int |
| 68 | +fd_ossl_log_error( char const * str, |
| 69 | + ulong len, |
| 70 | + void * ctx ) { |
| 71 | + (void)ctx; |
| 72 | + FD_LOG_WARNING(( "%.*s", (int)len, str )); |
| 73 | + return 0; |
| 74 | +} |
| 75 | + |
| 76 | +void |
| 77 | +fd_bundle_client_rxtx( fd_bundle_client_t * client ) { |
| 78 | + SSL * ssl = client->ssl; |
| 79 | + if( FD_UNLIKELY( !client->ssl_hs_done ) ) { |
| 80 | + int res = SSL_do_handshake( ssl ); |
| 81 | + if( res<=0 ) { |
| 82 | + int error = SSL_get_error( ssl, res ); |
| 83 | + if( FD_LIKELY( error==SSL_ERROR_WANT_READ || error==SSL_ERROR_WANT_WRITE ) ) return; |
| 84 | + ERR_print_errors_cb( fd_ossl_log_error, NULL ); |
| 85 | + client->failed = 1; |
| 86 | + return; |
| 87 | + } else { |
| 88 | + client->ssl_hs_done = 1; |
| 89 | + } |
| 90 | + } |
| 91 | + |
| 92 | + fd_h2_conn_t * conn = client->conn; |
| 93 | + fd_h2_rbuf_ssl_read( client->frame_rx, ssl ); |
| 94 | + if( FD_UNLIKELY( conn->flags ) ) fd_h2_tx_control( conn, client->frame_tx ); |
| 95 | + fd_h2_rx( conn, client->frame_rx, client->frame_tx, client->frame_scratch, FD_BUNDLE_CLIENT_BUFSZ, &fd_bundle_h2_callbacks ); |
| 96 | + fd_h2_rbuf_ssl_write( client->frame_tx, ssl ); |
| 97 | +} |
| 98 | + |
| 99 | +/* fd_bundle_client_request continue attempts to write a request data |
| 100 | + frame. */ |
| 101 | + |
| 102 | +static int |
| 103 | +fd_bundle_client_request_continue1( fd_bundle_client_t * client ) { |
| 104 | + fd_h2_stream_t * stream = client->request_stream; |
| 105 | + fd_h2_tx_op_copy( client->conn, stream, client->frame_tx, client->request_tx_op ); |
| 106 | + if( FD_UNLIKELY( client->request_tx_op->chunk_sz ) ) return 0; |
| 107 | + if( FD_UNLIKELY( stream->state != FD_H2_STREAM_STATE_CLOSING_TX ) ) return 0; |
| 108 | + /* Request finished */ |
| 109 | + client->request_stream = NULL; |
| 110 | + return 1; |
| 111 | +} |
| 112 | + |
| 113 | +static int |
| 114 | +fd_bundle_client_request_continue( fd_bundle_client_t * client ) { |
| 115 | + if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 0; |
| 116 | + if( FD_UNLIKELY( !client->request_stream ) ) return 0; |
| 117 | + if( FD_UNLIKELY( !client->request_tx_op->chunk_sz ) ) return 0; |
| 118 | + return fd_bundle_client_request_continue1( client ); |
| 119 | +} |
| 120 | + |
| 121 | +/* fd_bundle_client_stream_acquire grabs a new stream ID and a stream |
| 122 | + object. */ |
| 123 | + |
| 124 | +static inline int |
| 125 | +fd_bundle_client_stream_acquire_is_safe( fd_bundle_client_t * client ) { |
| 126 | + /* Sufficient quota to start a stream? */ |
| 127 | + if( FD_UNLIKELY( client->conn->stream_active_cnt[1]+1 <= client->conn->peer_settings.max_concurrent_streams ) ) return 0; |
| 128 | + |
| 129 | + /* Free stream object available? */ |
| 130 | + if( FD_UNLIKELY( !fd_bundle_h2_stream_pool_free( client->stream_pool ) ) ) return 0; |
| 131 | + if( FD_UNLIKELY( client->stream_cnt >= FD_BUNDLE_CLIENT_MAX_STREAMS ) ) return 0; |
| 132 | + |
| 133 | + return 1; |
| 134 | +} |
| 135 | + |
| 136 | +static fd_h2_stream_t * |
| 137 | +fd_bundle_client_stream_acquire( fd_bundle_client_t * client ) { |
| 138 | + if( FD_UNLIKELY( client->stream_cnt >= FD_BUNDLE_CLIENT_MAX_STREAMS ) ) { |
| 139 | + FD_LOG_CRIT(( "stream pool exhausted" )); |
| 140 | + } |
| 141 | + |
| 142 | + fd_h2_conn_t * conn = client->conn; |
| 143 | + uint const stream_id = client->conn->rx_stream_next; |
| 144 | + conn->rx_stream_next += 2U; |
| 145 | + |
| 146 | + fd_bundle_h2_stream_t * stream_node = fd_bundle_h2_stream_pool_ele_acquire( client->stream_pool ); |
| 147 | + |
| 148 | + fd_h2_stream_t * stream = fd_h2_stream_open( fd_h2_stream_init( &stream_node->s ), conn, stream_id ); |
| 149 | + client->request_stream = stream; |
| 150 | + client->stream_ids[ stream_id ] = stream_id; |
| 151 | + client->stream_cnt++; |
| 152 | + return stream; |
| 153 | +} |
| 154 | + |
| 155 | +static void |
| 156 | +fd_bundle_client_stream_release( fd_bundle_client_t * client, |
| 157 | + fd_h2_stream_t * stream ) { |
| 158 | + if( FD_UNLIKELY( !client->stream_cnt ) ) FD_LOG_CRIT(( "stream map corrupt" )); /* unreachable */ |
| 159 | + |
| 160 | + /* Deallocate tx_op */ |
| 161 | + if( FD_UNLIKELY( stream == client->request_stream ) ) { |
| 162 | + client->request_stream = NULL; |
| 163 | + *client->request_tx_op = (fd_h2_tx_op_t){0}; |
| 164 | + } |
| 165 | + |
| 166 | + /* Remove stream from map */ |
| 167 | + int map_idx = -1; |
| 168 | + for( int i=0UL; i<FD_BUNDLE_CLIENT_MAX_STREAMS; i++ ) { |
| 169 | + if( client->stream_ids[ i ] == stream->stream_id ) { |
| 170 | + map_idx = i; |
| 171 | + } |
| 172 | + } |
| 173 | + if( FD_UNLIKELY( map_idx<0 ) ) FD_LOG_CRIT(( "stream map corrupt" )); /* unreachable */ |
| 174 | + if( (ulong)map_idx+1 < client->stream_cnt ) { |
| 175 | + client->stream_ids[ map_idx ] = client->stream_ids[ client->stream_cnt-1 ]; |
| 176 | + client->streams [ map_idx ] = client->streams [ client->stream_cnt-1 ]; |
| 177 | + client->stream_cnt--; |
| 178 | + } |
| 179 | + |
| 180 | + fd_bundle_h2_stream_t * stream_node = (void *)( (ulong)stream - offsetof(fd_bundle_h2_stream_t, s) ); |
| 181 | + fd_bundle_h2_stream_pool_ele_release( client->stream_pool, stream_node ); |
| 182 | +} |
| 183 | + |
| 184 | +int |
| 185 | +fd_bundle_client_request_start( |
| 186 | + fd_bundle_client_t * client, |
| 187 | + char const * path, |
| 188 | + ulong path_len, |
| 189 | + pb_msgdesc_t const * fields, |
| 190 | + void const * message, |
| 191 | + char const * auth_token, |
| 192 | + ulong auth_token_sz |
| 193 | +) { |
| 194 | + /* Sanity check conn */ |
| 195 | + if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 0; |
| 196 | + if( FD_UNLIKELY( !fd_h2_rbuf_is_empty( client->frame_tx ) ) ) return 0; |
| 197 | + if( FD_UNLIKELY( !fd_bundle_client_stream_acquire_is_safe( client ) ) ) return 0; |
| 198 | + |
| 199 | + /* Encode message */ |
| 200 | + FD_STATIC_ASSERT( sizeof((fd_bundle_client_bufs_t *)0)->nanopb_rx == sizeof(fd_grpc_hdr_t)+FD_BUNDLE_CLIENT_MSG_SZ_MAX, sz ); |
| 201 | + uchar * proto_buf = client->nanopb_rx + sizeof(fd_grpc_hdr_t); |
| 202 | + pb_ostream_t ostream = pb_ostream_from_buffer( proto_buf, FD_BUNDLE_CLIENT_MSG_SZ_MAX ); |
| 203 | + if( FD_UNLIKELY( !pb_encode( &ostream, fields, message ) ) ) { |
| 204 | + FD_LOG_WARNING(( "Failed to encode Protobuf message (%.*s). This is a bug (insufficient buffer space?)", (int)path_len, path )); |
| 205 | + return 0; |
| 206 | + } |
| 207 | + ulong const serialized_sz = ostream.bytes_written; |
| 208 | + |
| 209 | + /* Create gRPC length prefix */ |
| 210 | + fd_grpc_hdr_t hdr = { .compressed=0, .msg_sz=(uint)serialized_sz }; |
| 211 | + memcpy( client->nanopb_rx, &hdr, sizeof(fd_grpc_hdr_t) ); |
| 212 | + ulong const payload_sz = serialized_sz + sizeof(fd_grpc_hdr_t); |
| 213 | + |
| 214 | + /* Allocate stream descriptor */ |
| 215 | + fd_h2_stream_t * stream = fd_bundle_client_stream_acquire( client ); |
| 216 | + uint const stream_id = stream->stream_id; |
| 217 | + |
| 218 | + /* Write HTTP/2 request headers */ |
| 219 | + fd_h2_tx_prepare( client->conn, client->frame_tx, FD_H2_FRAME_TYPE_HEADERS, FD_H2_FLAG_END_HEADERS, stream_id ); |
| 220 | + fd_grpc_req_hdrs_t req_meta = { |
| 221 | + .path = path, |
| 222 | + .path_len = path_len, |
| 223 | + .https = 1, /* bundle_client assumes TLS encryption for now */ |
| 224 | + |
| 225 | + .bearer_auth = auth_token, |
| 226 | + .bearer_auth_len = auth_token_sz |
| 227 | + }; |
| 228 | + if( FD_UNLIKELY( !fd_grpc_h2_gen_request_hdrs( &req_meta, client->frame_tx ) ) ) { |
| 229 | + FD_LOG_WARNING(( "Failed to generate gRPC request headers (%.*s). This is a bug", (int)path_len, path )); |
| 230 | + return 0; |
| 231 | + } |
| 232 | + fd_h2_tx_commit( client->conn, client->frame_tx ); |
| 233 | + |
| 234 | + /* Queue request payload for send |
| 235 | + (Protobuf message might have to be fragmented into multiple HTTP/2 |
| 236 | + DATA frames if the client gets blocked) */ |
| 237 | + fd_h2_tx_op_init( client->request_tx_op, client->nanopb_rx, payload_sz, FD_H2_FLAG_END_STREAM ); |
| 238 | + fd_bundle_client_request_continue1( client ); |
| 239 | + client->metrics->requests_sent++; |
| 240 | + |
| 241 | + FD_LOG_DEBUG(( "gRPC request path=%.*s sz=%lu", (int)path_len, path, serialized_sz )); |
| 242 | + |
| 243 | + return 1; |
| 244 | +} |
| 245 | + |
| 246 | +/* A HTTP/2 flow control change might unblock a queued request send op */ |
| 247 | + |
| 248 | +void |
| 249 | +fd_bundle_h2_window_update( fd_h2_conn_t * conn, |
| 250 | + uint increment ) { |
| 251 | + (void)increment; |
| 252 | + fd_bundle_client_request_continue( conn->ctx ); |
| 253 | +} |
| 254 | + |
| 255 | +void |
| 256 | +fd_bundle_h2_stream_window_update( fd_h2_conn_t * conn, |
| 257 | + fd_h2_stream_t * stream, |
| 258 | + uint increment ) { |
| 259 | + (void)stream; (void)increment; |
| 260 | + fd_bundle_client_request_continue( conn->ctx ); |
| 261 | +} |
| 262 | + |
| 263 | +/* fd_bundle_h2_callbacks specifies h2->bundle_client callbacks. |
| 264 | + Stored in .rodata for security. Must be kept in sync with fd_h2 to |
| 265 | + avoid NULL pointers. */ |
| 266 | + |
| 267 | +static fd_h2_callbacks_t const fd_bundle_h2_callbacks = { |
| 268 | + .stream_create = fd_h2_noop_stream_create, |
| 269 | + .stream_query = fd_bundle_h2_stream_query, |
| 270 | + .conn_established = fd_h2_noop_conn_established, |
| 271 | + .conn_final = fd_h2_noop_conn_final, |
| 272 | + .headers = fd_bundle_h2_cb_headers, |
| 273 | + .data = fd_bundle_h2_cb_data, |
| 274 | + .rst_stream = fd_bundle_h2_rst_stream, |
| 275 | + .window_update = fd_bundle_h2_window_update, |
| 276 | + .stream_window_update = fd_bundle_h2_stream_window_update, |
| 277 | +}; |
0 commit comments