Skip to content

Commit 20420e5

Browse files
authored
[R] add streaming support (#12821)
* add streaming option to r client * support callback function to process data stream * add stream test, minor bug fixes * fix api client * return void earlier if streaming, add tests
1 parent 84ac06a commit 20420e5

File tree

11 files changed

+488
-114
lines changed

11 files changed

+488
-114
lines changed

modules/openapi-generator/src/main/java/org/openapitools/codegen/languages/RClientCodegen.java

-2
Original file line numberDiff line numberDiff line change
@@ -224,11 +224,9 @@ public void processOpts() {
224224
additionalProperties.put(CodegenConstants.PACKAGE_NAME, packageName);
225225
additionalProperties.put(CodegenConstants.PACKAGE_VERSION, packageVersion);
226226
additionalProperties.put(CodegenConstants.EXCEPTION_ON_FAILURE, returnExceptionOnFailure);
227-
228227
additionalProperties.put(USE_DEFAULT_EXCEPTION, this.useDefaultExceptionHandling);
229228
additionalProperties.put(USE_RLANG_EXCEPTION, this.useRlangExceptionHandling);
230229

231-
232230
additionalProperties.put("apiDocPath", apiDocPath);
233231
additionalProperties.put("modelDocPath", modelDocPath);
234232

modules/openapi-generator/src/main/resources/r/api.mustache

+24-3
Original file line numberDiff line numberDiff line change
@@ -177,14 +177,23 @@
177177
{{#optionalParams}}
178178
#' @param {{{paramName}}} (optional) {{{description}}}{{#defaultValue}} (default value: {{{.}}}){{/defaultValue}}
179179
{{/optionalParams}}
180+
{{#vendorExtensions.x-streaming}}
181+
#' @param stream_callback (optional) callback function to process the data stream
182+
{{/vendorExtensions.x-streaming}}
180183
{{#returnType}}
181184
#' @param data_file (optional) name of the data file to save the result
182185
{{/returnType}}
183186
#' @param ... Other optional arguments
184187
#' @return {{{returnType}}}{{^returnType}}void{{/returnType}}
185188
#' @export
186-
{{{operationId}}} = function({{#requiredParams}}{{paramName}}, {{/requiredParams}}{{#optionalParams}}{{paramName}}={{^defaultValue}}NULL{{/defaultValue}}{{{defaultValue}}}, {{/optionalParams}}{{#returnType}}data_file=NULL, {{/returnType}}...) {
187-
api_response <- self${{{operationId}}}WithHttpInfo({{#allParams}}{{paramName}}, {{/allParams}}{{#returnType}}data_file = data_file, {{/returnType}}...)
189+
{{{operationId}}} = function({{#requiredParams}}{{paramName}}, {{/requiredParams}}{{#optionalParams}}{{paramName}}={{^defaultValue}}NULL{{/defaultValue}}{{{defaultValue}}}, {{/optionalParams}}{{#vendorExtensions.x-streaming}}stream_callback=NULL, {{/vendorExtensions.x-streaming}}{{#returnType}}data_file=NULL, {{/returnType}}...) {
190+
api_response <- self${{{operationId}}}WithHttpInfo({{#allParams}}{{paramName}}, {{/allParams}}{{#vendorExtensions.x-streaming}}stream_callback = stream_callback, {{/vendorExtensions.x-streaming}}{{#returnType}}data_file = data_file, {{/returnType}}...)
191+
{{#vendorExtensions.x-streaming}}
192+
if (typeof(stream_callback) == "closure") { # return void if streaming is enabled
193+
return(invisible(NULL))
194+
}
195+
196+
{{/vendorExtensions.x-streaming}}
188197
resp <- api_response$response
189198
if (httr::status_code(resp) >= 200 && httr::status_code(resp) <= 299) {
190199
api_response$content
@@ -207,13 +216,16 @@
207216
{{#optionalParams}}
208217
#' @param {{{paramName}}} (optional) {{{description}}}{{#defaultValue}} (default value: {{{.}}}){{/defaultValue}}
209218
{{/optionalParams}}
219+
{{#vendorExtensions.x-streaming}}
220+
#' @param stream_callback (optional) callback function to process the data stream
221+
{{/vendorExtensions.x-streaming}}
210222
{{#returnType}}
211223
#' @param data_file (optional) name of the data file to save the result
212224
{{/returnType}}
213225
#' @param ... Other optional arguments
214226
#' @return API response ({{{returnType}}}{{^returnType}}void{{/returnType}}) with additional information such as HTTP status code, headers
215227
#' @export
216-
{{{operationId}}}WithHttpInfo = function({{#requiredParams}}{{paramName}}, {{/requiredParams}}{{#optionalParams}}{{paramName}}={{^defaultValue}}NULL{{/defaultValue}}{{{defaultValue}}}, {{/optionalParams}}{{#returnType}}data_file = NULL, {{/returnType}}...) {
228+
{{{operationId}}}WithHttpInfo = function({{#requiredParams}}{{paramName}}, {{/requiredParams}}{{#optionalParams}}{{paramName}}={{^defaultValue}}NULL{{/defaultValue}}{{{defaultValue}}}, {{/optionalParams}}{{#vendorExtensions.x-streaming}}stream_callback=NULL, {{/vendorExtensions.x-streaming}}{{#returnType}}data_file = NULL, {{/returnType}}...) {
217229
args <- list(...)
218230
query_params <- list()
219231
header_params <- c()
@@ -311,8 +323,17 @@
311323
query_params = query_params,
312324
header_params = header_params,
313325
body = body,
326+
{{#vendorExtensions.x-streaming}}
327+
stream_callback = stream_callback,
328+
{{/vendorExtensions.x-streaming}}
314329
...)
315330
331+
{{#vendorExtensions.x-streaming}}
332+
if (typeof(stream_callback) == "closure") { # return void if streaming is enabled
333+
return(invisible(NULL))
334+
}
335+
336+
{{/vendorExtensions.x-streaming}}
316337
if (httr::status_code(resp) >= 200 && httr::status_code(resp) <= 299) {
317338
{{#returnType}}
318339
{{#isPrimitiveType}}

modules/openapi-generator/src/main/resources/r/api_client.mustache

+35-10
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,9 @@ ApiClient <- R6::R6Class(
128128
#' @param ... Other optional arguments.
129129
#' @return HTTP response
130130
#' @export
131-
CallApi = function(url, method, query_params, header_params, body, ...) {
131+
CallApi = function(url, method, query_params, header_params, body, stream_callback = NULL, ...) {
132132
133-
resp <- self$Execute(url, method, query_params, header_params, body, ...)
133+
resp <- self$Execute(url, method, query_params, header_params, body, stream_callback = stream_callback, ...)
134134
status_code <- httr::status_code(resp)
135135
136136
if (is.null(self$max_retry_attempts)) {
@@ -142,7 +142,7 @@ ApiClient <- R6::R6Class(
142142
for (i in 1 : self$max_retry_attempts) {
143143
if (status_code %in% self$retry_status_codes) {
144144
Sys.sleep((2 ^ i) + stats::runif(n = 1, min = 0, max = 1))
145-
resp <- self$Execute(url, method, query_params, header_params, body, ...)
145+
resp <- self$Execute(url, method, query_params, header_params, body, stream_callback = stream_callback, ...)
146146
status_code <- httr::status_code(resp)
147147
} else {
148148
break;
@@ -162,10 +162,11 @@ ApiClient <- R6::R6Class(
162162
#' @param query_params The query parameters.
163163
#' @param header_params The header parameters.
164164
#' @param body The HTTP request body.
165+
#' @param stream_callback callback function to process data stream
165166
#' @param ... Other optional arguments.
166167
#' @return HTTP response
167168
#' @export
168-
Execute = function(url, method, query_params, header_params, body, ...) {
169+
Execute = function(url, method, query_params, header_params, body, stream_callback = NULL, ...) {
169170
headers <- httr::add_headers(c(header_params, self$default_headers))
170171
171172
{{! Adding timeout that can be set at the apiClient object level}}
@@ -175,17 +176,41 @@ ApiClient <- R6::R6Class(
175176
}
176177
177178
if (method == "GET") {
178-
httr::GET(url, query = query_params, headers, http_timeout, httr::user_agent(self$`user_agent`), ...)
179+
if (typeof(stream_callback) == "closure") {
180+
httr::GET(url, query = query_params, headers, http_timeout, httr::user_agent(self$`user_agent`), write_stream(stream_callback), ...)
181+
} else {
182+
httr::GET(url, query = query_params, headers, http_timeout, httr::user_agent(self$`user_agent`), ...)
183+
}
179184
} else if (method == "POST") {
180-
httr::POST(url, query = query_params, headers, body = body, httr::content_type("application/json"), http_timeout, httr::user_agent(self$`user_agent`), ...)
185+
if (typeof(stream_callback) == "closure") {
186+
httr::POST(url, query = query_params, headers, body = body, httr::content_type("application/json"), http_timeout, httr::user_agent(self$`user_agent`), write_stream(stream_callback), ...)
187+
} else {
188+
httr::POST(url, query = query_params, headers, body = body, httr::content_type("application/json"), http_timeout, httr::user_agent(self$`user_agent`), ...)
189+
}
181190
} else if (method == "PUT") {
182-
httr::PUT(url, query = query_params, headers, body = body, httr::content_type("application/json"), http_timeout, http_timeout, httr::user_agent(self$`user_agent`), ...)
191+
if (typeof(stream_callback) == "closure") {
192+
httr::PUT(url, query = query_params, headers, body = body, httr::content_type("application/json"), http_timeout, http_timeout, httr::user_agent(self$`user_agent`), write_stream(stream_callback), ...)
193+
} else {
194+
httr::PUT(url, query = query_params, headers, body = body, httr::content_type("application/json"), http_timeout, http_timeout, httr::user_agent(self$`user_agent`), ...)
195+
}
183196
} else if (method == "PATCH") {
184-
httr::PATCH(url, query = query_params, headers, body = body, httr::content_type("application/json"), http_timeout, http_timeout, httr::user_agent(self$`user_agent`), ...)
197+
if (typeof(stream_callback) == "closure") {
198+
httr::PATCH(url, query = query_params, headers, body = body, httr::content_type("application/json"), http_timeout, http_timeout, httr::user_agent(self$`user_agent`), write_stream(stream_callback), ...)
199+
} else {
200+
httr::PATCH(url, query = query_params, headers, body = body, httr::content_type("application/json"), http_timeout, http_timeout, httr::user_agent(self$`user_agent`), ...)
201+
}
185202
} else if (method == "HEAD") {
186-
httr::HEAD(url, query = query_params, headers, http_timeout, http_timeout, httr::user_agent(self$`user_agent`), ...)
203+
if (typeof(stream_callback) == "closure") {
204+
httr::HEAD(url, query = query_params, headers, http_timeout, http_timeout, httr::user_agent(self$`user_agent`), write_stream(stream_callback), ...)
205+
} else {
206+
httr::HEAD(url, query = query_params, headers, http_timeout, http_timeout, httr::user_agent(self$`user_agent`), ...)
207+
}
187208
} else if (method == "DELETE") {
188-
httr::DELETE(url, query = query_params, headers, http_timeout, http_timeout, httr::user_agent(self$`user_agent`), ...)
209+
if (typeof(stream_callback) == "closure") {
210+
httr::DELETE(url, query = query_params, headers, http_timeout, http_timeout, httr::user_agent(self$`user_agent`), write_stream(stream_callback), ...)
211+
} else {
212+
httr::DELETE(url, query = query_params, headers, http_timeout, http_timeout, httr::user_agent(self$`user_agent`), ...)
213+
}
189214
} else {
190215
err_msg <- "Http method must be `GET`, `HEAD`, `OPTIONS`, `POST`, `PATCH`, `PUT` or `DELETE`."
191216
{{#useDefaultExceptionHandling}}

modules/openapi-generator/src/main/resources/r/api_doc.mustache

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ result <- tryCatch(
5353
# to save the result into a file, simply add the optional `data_file` parameter, e.g.
5454
# api_instance${{{operationId}}}({{#requiredParams}}var_{{{paramName}}}{{^-last}}, {{/-last}}{{/requiredParams}}{{#optionalParams}}{{#-first}}{{#requiredParams.0}}, {{/requiredParams.0}}{{/-first}}{{{paramName}}} = var_{{{paramName}}}{{^-last}}, {{/-last}}{{/optionalParams}}{{#allParams}}{{#-first}}, {{/-first}}{{/allParams}}data_file = "result.txt"),
5555
{{/returnType}}
56-
api_instance${{{operationId}}}({{#requiredParams}}var_{{{paramName}}}{{^-last}}, {{/-last}}{{/requiredParams}}{{#optionalParams}}{{#-first}}{{#requiredParams.0}}, {{/requiredParams.0}}{{/-first}}{{{paramName}}} = var_{{{paramName}}}{{^-last}}, {{/-last}}{{/optionalParams}}),
56+
api_instance${{{operationId}}}({{#requiredParams}}var_{{{paramName}}}{{^-last}}, {{/-last}}{{/requiredParams}}{{#optionalParams}}{{#-first}}{{#requiredParams.0}}, {{/requiredParams.0}}{{/-first}}{{{paramName}}} = var_{{{paramName}}}{{^-last}}, {{/-last}}{{/optionalParams}}{{#vendorExtensions.x-streaming}}, stream_callback = function(x){ print(length(x)) }{{/vendorExtensions.x-streaming}}),
5757
ApiException = function(ex) ex
5858
)
5959
# In case of error, print the error object

modules/openapi-generator/src/test/resources/3_0/r/petstore.yaml

+32
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,38 @@ paths:
185185
description: Pet not found
186186
security:
187187
- api_key: []
188+
'/pet/{petId}?streaming':
189+
get:
190+
tags:
191+
- pet
192+
summary: Find pet by ID (streaming)
193+
description: Returns a single pet
194+
operationId: getPetByIdStreaming
195+
x-streaming: true
196+
parameters:
197+
- name: petId
198+
in: path
199+
description: ID of pet to return
200+
required: true
201+
schema:
202+
type: integer
203+
format: int64
204+
responses:
205+
'200':
206+
description: successful operation
207+
content:
208+
application/xml:
209+
schema:
210+
$ref: '#/components/schemas/Pet'
211+
application/json:
212+
schema:
213+
$ref: '#/components/schemas/Pet'
214+
'400':
215+
description: Invalid ID supplied
216+
'404':
217+
description: Pet not found
218+
security:
219+
- api_key: []
188220
post:
189221
tags:
190222
- pet

samples/client/petstore/R/R/api_client.R

+35-10
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,9 @@ ApiClient <- R6::R6Class(
133133
#' @param ... Other optional arguments.
134134
#' @return HTTP response
135135
#' @export
136-
CallApi = function(url, method, query_params, header_params, body, ...) {
136+
CallApi = function(url, method, query_params, header_params, body, stream_callback = NULL, ...) {
137137

138-
resp <- self$Execute(url, method, query_params, header_params, body, ...)
138+
resp <- self$Execute(url, method, query_params, header_params, body, stream_callback = stream_callback, ...)
139139
status_code <- httr::status_code(resp)
140140

141141
if (is.null(self$max_retry_attempts)) {
@@ -147,7 +147,7 @@ ApiClient <- R6::R6Class(
147147
for (i in 1 : self$max_retry_attempts) {
148148
if (status_code %in% self$retry_status_codes) {
149149
Sys.sleep((2 ^ i) + stats::runif(n = 1, min = 0, max = 1))
150-
resp <- self$Execute(url, method, query_params, header_params, body, ...)
150+
resp <- self$Execute(url, method, query_params, header_params, body, stream_callback = stream_callback, ...)
151151
status_code <- httr::status_code(resp)
152152
} else {
153153
break;
@@ -167,10 +167,11 @@ ApiClient <- R6::R6Class(
167167
#' @param query_params The query parameters.
168168
#' @param header_params The header parameters.
169169
#' @param body The HTTP request body.
170+
#' @param stream_callback callback function to process data stream
170171
#' @param ... Other optional arguments.
171172
#' @return HTTP response
172173
#' @export
173-
Execute = function(url, method, query_params, header_params, body, ...) {
174+
Execute = function(url, method, query_params, header_params, body, stream_callback = NULL, ...) {
174175
headers <- httr::add_headers(c(header_params, self$default_headers))
175176

176177
http_timeout <- NULL
@@ -179,17 +180,41 @@ ApiClient <- R6::R6Class(
179180
}
180181

181182
if (method == "GET") {
182-
httr::GET(url, query = query_params, headers, http_timeout, httr::user_agent(self$`user_agent`), ...)
183+
if (typeof(stream_callback) == "closure") {
184+
httr::GET(url, query = query_params, headers, http_timeout, httr::user_agent(self$`user_agent`), write_stream(stream_callback), ...)
185+
} else {
186+
httr::GET(url, query = query_params, headers, http_timeout, httr::user_agent(self$`user_agent`), ...)
187+
}
183188
} else if (method == "POST") {
184-
httr::POST(url, query = query_params, headers, body = body, httr::content_type("application/json"), http_timeout, httr::user_agent(self$`user_agent`), ...)
189+
if (typeof(stream_callback) == "closure") {
190+
httr::POST(url, query = query_params, headers, body = body, httr::content_type("application/json"), http_timeout, httr::user_agent(self$`user_agent`), write_stream(stream_callback), ...)
191+
} else {
192+
httr::POST(url, query = query_params, headers, body = body, httr::content_type("application/json"), http_timeout, httr::user_agent(self$`user_agent`), ...)
193+
}
185194
} else if (method == "PUT") {
186-
httr::PUT(url, query = query_params, headers, body = body, httr::content_type("application/json"), http_timeout, http_timeout, httr::user_agent(self$`user_agent`), ...)
195+
if (typeof(stream_callback) == "closure") {
196+
httr::PUT(url, query = query_params, headers, body = body, httr::content_type("application/json"), http_timeout, http_timeout, httr::user_agent(self$`user_agent`), write_stream(stream_callback), ...)
197+
} else {
198+
httr::PUT(url, query = query_params, headers, body = body, httr::content_type("application/json"), http_timeout, http_timeout, httr::user_agent(self$`user_agent`), ...)
199+
}
187200
} else if (method == "PATCH") {
188-
httr::PATCH(url, query = query_params, headers, body = body, httr::content_type("application/json"), http_timeout, http_timeout, httr::user_agent(self$`user_agent`), ...)
201+
if (typeof(stream_callback) == "closure") {
202+
httr::PATCH(url, query = query_params, headers, body = body, httr::content_type("application/json"), http_timeout, http_timeout, httr::user_agent(self$`user_agent`), write_stream(stream_callback), ...)
203+
} else {
204+
httr::PATCH(url, query = query_params, headers, body = body, httr::content_type("application/json"), http_timeout, http_timeout, httr::user_agent(self$`user_agent`), ...)
205+
}
189206
} else if (method == "HEAD") {
190-
httr::HEAD(url, query = query_params, headers, http_timeout, http_timeout, httr::user_agent(self$`user_agent`), ...)
207+
if (typeof(stream_callback) == "closure") {
208+
httr::HEAD(url, query = query_params, headers, http_timeout, http_timeout, httr::user_agent(self$`user_agent`), write_stream(stream_callback), ...)
209+
} else {
210+
httr::HEAD(url, query = query_params, headers, http_timeout, http_timeout, httr::user_agent(self$`user_agent`), ...)
211+
}
191212
} else if (method == "DELETE") {
192-
httr::DELETE(url, query = query_params, headers, http_timeout, http_timeout, httr::user_agent(self$`user_agent`), ...)
213+
if (typeof(stream_callback) == "closure") {
214+
httr::DELETE(url, query = query_params, headers, http_timeout, http_timeout, httr::user_agent(self$`user_agent`), write_stream(stream_callback), ...)
215+
} else {
216+
httr::DELETE(url, query = query_params, headers, http_timeout, http_timeout, httr::user_agent(self$`user_agent`), ...)
217+
}
193218
} else {
194219
err_msg <- "Http method must be `GET`, `HEAD`, `OPTIONS`, `POST`, `PATCH`, `PUT` or `DELETE`."
195220
rlang::abort(message = err_msg, .subclass = "ApiException", ApiException = ApiException$new(status = 0, reason = err_msg))

0 commit comments

Comments
 (0)