diff --git a/NAMESPACE b/NAMESPACE index eede0b96e..2883ad092 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -91,6 +91,7 @@ export(request) export(send) export(send_aio) export(serial_config) +export(server) export(set_promise_context) export(socket) export(stat) diff --git a/NEWS.md b/NEWS.md index f30e823aa..51cefe82c 100644 --- a/NEWS.md +++ b/NEWS.md @@ -4,6 +4,7 @@ * Adds support for threaded dispatcher in `mirai`. * Adds 'recvAio' method for `promises::as.promise()` and `promises::is.promising()` to enable 'recvAio' promises. +* Adds `server()`, an HTTP REST server for evaluation of R expressions (experimental). #### Updates diff --git a/R/server.R b/R/server.R new file mode 100644 index 000000000..131aead96 --- /dev/null +++ b/R/server.R @@ -0,0 +1,79 @@ +# Copyright (C) 2022-2024 Hibiki AI Limited +# +# This file is part of nanonext. +# +# nanonext is free software: you can redistribute it and/or modify it under the +# terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# nanonext is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along with +# nanonext. If not, see . + +# nanonext - server - HTTP REST Server ----------------------------------------- + +#' Start REST Server +#' +#' Creates an instance of an HTTP REST server which evaluates R expressions sent +#' to it [EXPERIMENTAL]. As arbitrary R expressions are evaluated, this +#' should only be deployed on the local machine (using the 127.0.0.1 +#' loopback address) in a trusted environment. +#' +#' @param url full http address including hostname, port and path at which to +#' host the server. +#' +#' @details Query the API with an HTTP client using the \sQuote{POST} method, +#' with the request data being the R expression as a text string. The +#' received response body will consist of the evaluation result as a text +#' string (if of the appropriate type), or otherwise a serialized R object, +#' which should be passed to \code{\link{unserialize}}. +#' +#' If the expression could not be parsed or evaluated, the response will be +#' returned with a status code of 500 and a blank body. +#' +#' User interrupts will only be processed after the next query has been +#' completed, hence return from the function may not be immediate. Use +#' \sQuote{ctrl + \\} to forcibly quit the entire R session if required. +#' +#' @return This function never returns. +#' +#' @examples +#' if (interactive()) { +#' +#' # run server in a new session: +#' # Rscript -e 'nanonext::server("http://127.0.0.1:5555/api/rest")' +#' +#' # query using curl: +#' # curl -X POST http://127.0.0.1:5555/api/rest -d 'format(Sys.time())' +#' +#' ncurl( +#' "http://127.0.0.1:5555/api/rest", +#' method = "POST", +#' data = "format(Sys.time())" +#' ) +#' +#' # error will return status of 500 +#' ncurl( +#' "http://127.0.0.1:5555/api/rest", +#' method = "POST", +#' data = "not_valid()" +#' ) +#' +#' res <- ncurl( +#' "http://127.0.0.1:5555/api/rest", +#' convert = FALSE, +#' method = "POST", +#' data = "data.frame(random = nanonext::random(3))" +#' ) +#' if (!is_error_value(res$data)) unserialize(res$data) +#' +#' } +#' +#' @export +#' +server <- function(url = "http://127.0.0.1:5555/api/rest") + .Call(rnng_rest_server, url) diff --git a/man/server.Rd b/man/server.Rd new file mode 100644 index 000000000..c724480b5 --- /dev/null +++ b/man/server.Rd @@ -0,0 +1,68 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/server.R +\name{server} +\alias{server} +\title{Start REST Server} +\usage{ +server(url = "http://127.0.0.1:5555/api/rest") +} +\arguments{ +\item{url}{full http address including hostname, port and path at which to +host the server.} +} +\value{ +This function never returns. +} +\description{ +Creates an instance of an HTTP REST server which evaluates R expressions sent + to it [EXPERIMENTAL]. As arbitrary R expressions are evaluated, this + should only be deployed on the local machine (using the 127.0.0.1 + loopback address) in a trusted environment. +} +\details{ +Query the API with an HTTP client using the \sQuote{POST} method, + with the request data being the R expression as a text string. The + received response body will consist of the evaluation result as a text + string (if of the appropriate type), or otherwise a serialized R object, + which should be passed to \code{\link{unserialize}}. + + If the expression could not be parsed or evaluated, the response will be + returned with a status code of 500 and a blank body. + + User interrupts will only be processed after the next query has been + completed, hence return from the function may not be immediate. Use + \sQuote{ctrl + \\} to forcibly quit the entire R session if required. +} +\examples{ +if (interactive()) { + +# run server in a new session: +# Rscript -e 'nanonext::server("http://127.0.0.1:5555/api/rest")' + +# query using curl: +# curl -X POST http://127.0.0.1:5555/api/rest -d 'format(Sys.time())' + +ncurl( + "http://127.0.0.1:5555/api/rest", + method = "POST", + data = "format(Sys.time())" +) + +# error will return status of 500 +ncurl( + "http://127.0.0.1:5555/api/rest", + method = "POST", + data = "not_valid()" +) + +res <- ncurl( + "http://127.0.0.1:5555/api/rest", + convert = FALSE, + method = "POST", + data = "data.frame(random = nanonext::random(3))" +) +if (!is_error_value(res$data)) unserialize(res$data) + +} + +} diff --git a/src/init.c b/src/init.c index f03fb3339..d9a83cc17 100644 --- a/src/init.c +++ b/src/init.c @@ -167,6 +167,7 @@ static const R_CallMethodDef callMethods[] = { {"rnng_recv", (DL_FUNC) &rnng_recv, 4}, {"rnng_recv_aio", (DL_FUNC) &rnng_recv_aio, 6}, {"rnng_request", (DL_FUNC) &rnng_request, 7}, + {"rnng_rest_server", (DL_FUNC) &rnng_rest_server, 1}, {"rnng_send", (DL_FUNC) &rnng_send, 4}, {"rnng_send_aio", (DL_FUNC) &rnng_send_aio, 5}, {"rnng_serial_config", (DL_FUNC) &rnng_serial_config, 4}, diff --git a/src/nanonext.h b/src/nanonext.h index 8acf5791e..ac9b577bc 100644 --- a/src/nanonext.h +++ b/src/nanonext.h @@ -294,6 +294,7 @@ int nano_matchargs(const SEXP); void pipe_cb_signal(nng_pipe, nng_pipe_ev, void *); void tls_finalizer(SEXP); +void nano_printf(const int, const char *, ...); SEXP rnng_advance_rng_state(void); SEXP rnng_aio_call(SEXP); @@ -347,6 +348,7 @@ SEXP rnng_reap(SEXP); SEXP rnng_recv(SEXP, SEXP, SEXP, SEXP); SEXP rnng_recv_aio(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_request(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); +SEXP rnng_rest_server(SEXP); SEXP rnng_send(SEXP, SEXP, SEXP, SEXP); SEXP rnng_send_aio(SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_serial_config(SEXP, SEXP, SEXP, SEXP); diff --git a/src/server.c b/src/server.c new file mode 100644 index 000000000..2d793c7f8 --- /dev/null +++ b/src/server.c @@ -0,0 +1,289 @@ +// Copyright (C) 2022-2024 Hibiki AI Limited +// +// This file is part of nanonext. +// +// nanonext is free software: you can redistribute it and/or modify it under the +// terms of the GNU General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later +// version. +// +// nanonext is distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +// A PARTICULAR PURPOSE. See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with +// nanonext. If not, see . + +// nanonext - HTTP REST Sever -------------------------------------------------- + +#define NANONEXT_HTTP +#define NANONEXT_PROTOCOLS +#include "nanonext.h" + +// REST server ----------------------------------------------------------------- + +// This file contains modified code with the following licence: +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +typedef enum { + SEND_REQ, + RECV_REP, +} job_state; + +typedef struct rest_job { + nng_aio * http_aio; // aio from HTTP we must reply to + nng_http_res * http_res; // HTTP response object + job_state state; // 0 = sending, 1 = receiving + nng_msg * msg; // request message + nng_aio * aio; // request flow + nng_ctx ctx; // context on the request socket + struct rest_job *next; // next on the freelist +} rest_job; + +nng_mtx * job_lock; +rest_job *job_freelist; +nng_socket req_sock; + +static void fatal(const char *reason, int xc) { + nano_printf(1, "%s: %s\n", reason, nng_strerror(xc)); +} + +static void rest_recycle_job(rest_job *job) { + + if (job->http_res != NULL) { + nng_http_res_free(job->http_res); + job->http_res = NULL; + } + if (job->msg != NULL) { + nng_msg_free(job->msg); + job->msg = NULL; + } + if (nng_ctx_id(job->ctx) != 0) { + nng_ctx_close(job->ctx); + } + + nng_mtx_lock(job_lock); + job->next = job_freelist; + job_freelist = job; + nng_mtx_unlock(job_lock); + +} + +static void rest_http_fatal(rest_job *job, const char *fmt, int rv) { + + char buf[128]; + nng_aio *aio = job->http_aio; + nng_http_res *res = job->http_res; + + job->http_res = NULL; + job->http_aio = NULL; + snprintf(buf, sizeof(buf), fmt, nng_strerror(rv)); + nng_http_res_set_status(res, NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR); + nng_http_res_set_reason(res, buf); + nng_aio_set_output(aio, 0, res); + nng_aio_finish(aio, 0); + rest_recycle_job(job); + +} + +static void rest_job_cb(void *arg) { + + rest_job *job = arg; + nng_aio *aio = job->aio; + int xc; + + switch (job->state) { + case SEND_REQ: + if ((xc = nng_aio_result(aio))) { + rest_http_fatal(job, "send REQ failed: %s", xc); + return; + } + job->msg = NULL; + nng_aio_set_msg(aio, NULL); + job->state = RECV_REP; + nng_ctx_recv(job->ctx, aio); + break; + case RECV_REP: + if ((xc = nng_aio_result(aio))) { + rest_http_fatal(job, "recv reply failed: %s", xc); + return; + } + job->msg = nng_aio_get_msg(aio); + if ((xc = nng_http_res_copy_data(job->http_res, + nng_msg_body(job->msg), + nng_msg_len(job->msg)))) { + rest_http_fatal(job, "nng_http_res_copy_data: %s", xc); + return; + } + nng_aio_set_output(job->http_aio, 0, job->http_res); + nng_aio_finish(job->http_aio, 0); + job->http_aio = NULL; + job->http_res = NULL; + rest_recycle_job(job); + break; + default: + fatal("bad case", NNG_ESTATE); + break; + } + +} + +static rest_job *rest_get_job(void) { + + rest_job *job; + + nng_mtx_lock(job_lock); + if ((job = job_freelist) != NULL) { + job_freelist = job->next; + nng_mtx_unlock(job_lock); + job->next = NULL; + return (job); + } + nng_mtx_unlock(job_lock); + if ((job = calloc(1, sizeof(*job))) == NULL) { + return (NULL); + } + if (nng_aio_alloc(&job->aio, rest_job_cb, job) != 0) { + free(job); + return (NULL); + } + return (job); + +} + +void rest_handle(nng_aio *aio) { + + struct rest_job *job; + nng_http_req *req = nng_aio_get_input(aio, 0); + void *data; + size_t sz; + int xc; + + if ((job = rest_get_job()) == NULL) { + nng_aio_finish(aio, NNG_ENOMEM); + return; + } + if ((xc = nng_http_res_alloc(&job->http_res)) || + (xc = nng_ctx_open(&job->ctx, req_sock))) { + rest_recycle_job(job); + nng_aio_finish(aio, xc); + return; + } + + nng_http_req_get_data(req, &data, &sz); + job->http_aio = aio; + + if ((xc = nng_msg_alloc(&job->msg, sz))) { + rest_http_fatal(job, "nng_msg_alloc: %s", xc); + return; + } + + memcpy(nng_msg_body(job->msg), data, sz); + nng_aio_set_msg(job->aio, job->msg); + job->state = SEND_REQ; + nng_ctx_send(job->ctx, job->aio); + +} + +void rest_start(void *arg) { + + const char **addr = (const char **) arg; + nng_http_server *server; + nng_http_handler *handler; + nng_url *url; + int xc; + + if ((xc = nng_mtx_alloc(&job_lock))) + fatal("nng_mtx_alloc", xc); + + job_freelist = NULL; + + if ((xc = nng_url_parse(&url, addr[0]))) + fatal("nng_url_parse", xc); + + if ((xc = nng_req0_open(&req_sock))) + fatal("nng_req0_open", xc); + + if ((xc = nng_dial(req_sock, addr[1], NULL, NNG_FLAG_NONBLOCK))) + fatal("nng_dial(inproc_url)", xc); + + if ((xc = nng_http_server_hold(&server, url))) + fatal("nng_http_server_hold", xc); + + if ((xc = nng_http_handler_alloc(&handler, url->u_path, rest_handle))) + fatal("nng_http_handler_alloc", xc); + + if ((xc = nng_http_handler_set_method(handler, "POST"))) + fatal("nng_http_handler_set_method", xc); + + if ((xc = nng_http_handler_collect_body(handler, true, 1024 * 128))) + fatal("nng_http_handler_collect_body", xc); + + if ((xc = nng_http_server_add_handler(server, handler))) + fatal("nng_http_handler_add_handler", xc); + + if ((xc = nng_http_server_start(server))) + fatal("nng_http_server_start", xc); + + nng_url_free(url); + +} + +static SEXP nano_parse_eval_res; + +void parse_eval_safe(void *data) { + nano_parse_eval_res = R_ParseEvalString((const char *) data, R_GlobalEnv); +} + +SEXP rnng_rest_server(SEXP url) { + + const char *addr[2] = {CHAR(STRING_ELT(url, 0)), "inproc://n-a-n-o-serv"}; + nng_thread *thr; + nng_socket s; + nng_msg *msg; + int xc; + + if ((xc = nng_thread_create(&thr, rest_start, (void *) addr))) + ERROR_OUT(xc); + + if ((xc = nng_rep0_open(&s)) || + (xc = nng_listen(s, addr[1], NULL, 0))) + fatal("unable to set up inproc", xc); + + for (;;) { + + if ((xc = nng_recvmsg(s, &msg, 0))) + fatal("inproc recvmsg", xc); + + nano_parse_eval_res = R_BlankScalarString; + R_ToplevelExec(parse_eval_safe, (void *) nng_msg_body(msg)); + + nano_buf buf; + if (TYPEOF(nano_parse_eval_res) == STRSXP) { + const char *string = NANO_STRING(nano_parse_eval_res); + buf.buf = (unsigned char *) string; + buf.cur = strlen(string); + } else { + nano_serialize(&buf, nano_parse_eval_res, R_NilValue); + } + nng_msg_clear(msg); + nng_msg_append(msg, buf.buf, buf.cur); + if ((xc = nng_sendmsg(s, msg, 0))) + fatal("inproc sendmsg", xc); + + R_CheckUserInterrupt(); + + } + + nng_thread_destroy(thr); + return R_NilValue; + +} diff --git a/src/thread.c b/src/thread.c index fbde57ba3..98e8323a3 100644 --- a/src/thread.c +++ b/src/thread.c @@ -25,7 +25,8 @@ // # nocov start // tested interactively -static void nano_printf(const int err, const char *fmt, ...) { +void nano_printf(const int err, const char *fmt, ...) { + char buf[NANONEXT_INIT_BUFSIZE]; va_list arg_ptr;