Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(r): Add async infrastructure for specific methods #985

Draft
wants to merge 36 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
9ebd2f3
basic port of async utils
paleolimbot Aug 18, 2023
e3df089
with passing test
paleolimbot Aug 18, 2023
e3f6b07
some notes
paleolimbot Aug 18, 2023
26aad2f
more notes
paleolimbot Aug 18, 2023
059fc56
Update r/adbcdrivermanager/tests/testthat/test-async.R
paleolimbot Aug 31, 2023
4f25aa4
add promises/later integration to async
paleolimbot Sep 5, 2023
3ff0cb2
add execute query async
paleolimbot Sep 5, 2023
b2236c3
test chains
paleolimbot Sep 5, 2023
d43f5db
test chaining
paleolimbot Sep 5, 2023
c05f11f
fix error
paleolimbot Sep 5, 2023
344814c
fix init
paleolimbot May 3, 2024
de938d9
maybe start on a future-based method
paleolimbot May 3, 2024
2eeae3f
test initial infrastructure
paleolimbot May 5, 2024
0e86f35
dummy task
paleolimbot May 5, 2024
cd11f93
better
paleolimbot May 5, 2024
d639428
rename wait
paleolimbot May 5, 2024
4282d12
waiter
paleolimbot May 5, 2024
56c3903
scheduler
paleolimbot May 5, 2024
bf11de4
maybe with error support
paleolimbot May 6, 2024
69590f9
remove previous
paleolimbot May 6, 2024
48d53ec
remove previous
paleolimbot May 6, 2024
8ecfb52
with one working method
paleolimbot May 6, 2024
add8551
better separation of tasks
paleolimbot May 6, 2024
473d053
remove old
paleolimbot May 6, 2024
93eeb23
get next
paleolimbot May 6, 2024
cb677a3
add license
paleolimbot May 7, 2024
25fa70f
try more robust test for promise call
paleolimbot May 7, 2024
8a18b6a
waiting with cancel
paleolimbot May 7, 2024
4b6aad7
fix status/wait
paleolimbot May 7, 2024
f0cffa4
prototype later integration
paleolimbot May 12, 2024
c1d2313
with test
paleolimbot May 12, 2024
d8bb000
use with promises
paleolimbot May 12, 2024
702e580
maybe better waiting
paleolimbot May 31, 2024
01cfa54
try to fix tests
paleolimbot May 31, 2024
50d6d3f
see if waiting a little more is more reliable on CI
paleolimbot May 31, 2024
a36f41e
maybe prepare + schema
paleolimbot May 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions r/adbcdrivermanager/DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ Encoding: UTF-8
Roxygen: list(markdown = TRUE)
RoxygenNote: 7.2.3
Suggests:
later,
promises,
testthat (>= 3.0.0),
withr
Config/testthat/edition: 3
Expand Down
10 changes: 10 additions & 0 deletions r/adbcdrivermanager/NAMESPACE
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
# Generated by roxygen2: do not edit by hand

S3method("$",adbc_async_task)
S3method("$",adbc_error)
S3method("$",adbc_xptr)
S3method("$<-",adbc_xptr)
S3method("[[",adbc_async_task)
S3method("[[",adbc_error)
S3method("[[",adbc_xptr)
S3method("[[<-",adbc_xptr)
S3method(adbc_async_task_cancel,adbc_async_statement_cancellable)
S3method(adbc_async_task_cancel,default)
S3method(adbc_async_task_result,adbc_async_execute_query)
S3method(adbc_async_task_result,adbc_async_prepare)
S3method(adbc_async_task_result,adbc_async_sleep)
S3method(adbc_async_task_result,adbc_async_statement_stream_get_next)
S3method(adbc_async_task_result,adbc_async_statement_stream_schema)
S3method(adbc_connection_init,adbc_database_log)
S3method(adbc_connection_init,adbc_database_monkey)
S3method(adbc_connection_init,default)
Expand All @@ -21,6 +30,7 @@ S3method(execute_adbc,default)
S3method(format,adbc_xptr)
S3method(length,adbc_error)
S3method(length,adbc_xptr)
S3method(names,adbc_async_task)
S3method(names,adbc_error)
S3method(names,adbc_xptr)
S3method(print,adbc_driver)
Expand Down
261 changes: 261 additions & 0 deletions r/adbcdrivermanager/R/async.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


adbc_async_task <- function(subclass = character()) {
structure(
.Call(RAdbcAsyncTaskNew, adbc_allocate_error()),
class = union(subclass, "adbc_async_task")
)
}

adbc_async_task_status <- function(task) {
.Call(RAdbcAsyncTaskWaitFor, task, 0)
}

adbc_async_task_set_callback <- function(task, resolve, reject = NULL,
loop = later::current_loop()) {
# If the task is completed, run the callback (or else the callback
# will not run)
if (adbc_async_task_status(task) == "ready") {
adbc_async_task_run_callback(task, resolve, reject)
} else {
.Call(RAdbcAsyncTaskSetCallback, task, resolve, reject, loop$id)
}

invisible(task)
}

adbc_async_task_run_callback <- function(task, resolve = task$resolve,
reject = task$reject) {
tryCatch({
result <- adbc_async_task_result(task)
resolve(result)
},
error = function(e) {
if (is.null(reject)) {
stop(e)
} else {
reject(e)
}
}
)

invisible(task)
}

adbc_async_task_wait_non_cancellable <- function(task, resolution = 0.1) {
.Call(RAdbcAsyncTaskWaitFor, task, round(resolution * 1000))
}

adbc_async_task_wait <- function(task, resolution = 0.1) {
withCallingHandlers(
status <- .Call(RAdbcAsyncTaskWait, task, round(resolution * 1000)),
interrupt = function(e) {
adbc_async_task_cancel(task)
}
)

if (status != "ready") {
stop(sprintf("Expected status ready but got %s", status))
}

adbc_async_task_result(task)
}

as.promise.adbc_async_task <- function(task) {
force(task)
promises::promise(function(resolve, reject) {
adbc_async_task_set_callback(task, resolve, reject)
})
}

adbc_async_task_cancel <- function(task) {
UseMethod("adbc_async_task_cancel")
}

#' @export
adbc_async_task_cancel.default <- function(task) {
FALSE
}

adbc_async_task_result <- function(task) {
UseMethod("adbc_async_task_result")
}

#' @export
names.adbc_async_task <- function(x) {
names(.Call(RAdbcAsyncTaskData, x))
}

#' @export
`[[.adbc_async_task` <- function(x, i) {
.Call(RAdbcAsyncTaskData, x)[[i]]
}

#' @export
`$.adbc_async_task` <- function(x, name) {
.Call(RAdbcAsyncTaskData, x)[[name]]
}

adbc_async_sleep <- function(duration_ms, error_message = NULL) {
task <- adbc_async_task("adbc_async_sleep")
.Call(RAdbcAsyncTaskLaunchSleep, task, duration_ms)

user_data <- task$user_data
user_data$duration_ms <- duration_ms
user_data$error_message <- error_message

task
}

#' @export
adbc_async_task_result.adbc_async_sleep <- function(task) {
if (!is.null(task$user_data$error_message)) {
cnd <- simpleError(task$user_data$error_message)
class(cnd) <- c("adbc_async_sleep_error", class(cnd))
stop(cnd)
}

task$user_data$duration_ms
}

#' @export
adbc_async_task_cancel.adbc_async_statement_cancellable <- function(task) {
adbc_statement_cancel(task$user_data$statement)
TRUE
}

adbc_statement_prepare_async <- function(statement) {
task <- adbc_async_task(
c("adbc_async_prepare", "adbc_async_statement_cancellable")
)

user_data <- task$user_data
user_data$statement <- statement
.Call(RAdbcAsyncTaskLaunchPrepare, task, statement)

task
}

#' @export
adbc_async_task_result.adbc_async_prepare <- function(task) {
if (!identical(task$return_code, 0L)) {
stop_for_error(task$return_code, task$error_xptr)
}

task$user_data$statement
}

adbc_statement_execute_query_async <- function(statement, stream = NULL) {
task <- adbc_async_task(
c("adbc_async_execute_query", "adbc_async_statement_cancellable")
)

user_data <- task$user_data
user_data$statement <- statement
user_data$stream <- stream

user_data$rows_affected <- .Call(
RAdbcAsyncTaskLaunchExecuteQuery,
task,
statement,
stream
)

task
}

#' @export
adbc_async_task_result.adbc_async_execute_query <- function(task) {
if (!identical(task$return_code, 0L)) {
stop_for_error(task$return_code, task$error_xptr)
}

list(
statement = task$user_data$statement,
stream = task$user_data$stream,
rows_affected = task$user_data$rows_affected
)
}

adbc_statement_stream_get_schema_async <- function(statement, stream) {
task <- adbc_async_task(
c("adbc_async_statement_stream_get_next", "adbc_async_statement_cancellable")
)

user_data <- task$user_data
user_data$statement <- statement
user_data$stream <- stream
user_data$schema <- nanoarrow::nanoarrow_allocate_schema()

user_data$rows_affected <- .Call(
RAdbcAsyncTaskLaunchStreamGetSchema,
task,
stream,
user_data$schema
)

task
}


#' @export
adbc_async_task_result.adbc_async_statement_stream_schema <- function(task) {
if (!identical(task$return_code, 0L)) {
adbc_statement_release(task$user_data$statement)
stop(task$user_data$stream$get_last_error())
}

list(
statement = task$user_data$statement,
array = task$user_data$schema
)
}

adbc_statement_stream_get_next_async <- function(statement, stream) {
task <- adbc_async_task(
c("adbc_async_statement_stream_get_next", "adbc_async_statement_cancellable")
)

user_data <- task$user_data
user_data$statement <- statement
user_data$stream <- stream
user_data$array <- nanoarrow::nanoarrow_allocate_array()

user_data$rows_affected <- .Call(
RAdbcAsyncTaskLaunchStreamGetNext,
task,
stream,
user_data$array
)

task
}

#' @export
adbc_async_task_result.adbc_async_statement_stream_get_next <- function(task) {
if (!identical(task$return_code, 0L)) {
adbc_statement_release(task$user_data$statement)
stop(task$user_data$stream$get_last_error())
}

list(
statement = task$user_data$statement,
array = task$user_data$array
)
}
23 changes: 22 additions & 1 deletion r/adbcdrivermanager/R/error.R
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ adbc_allocate_error <- function(shelter = NULL, use_legacy_error = NULL) {

stop_for_error <- function(status, error) {
if (!identical(status, 0L)) {
error <- .Call(RAdbcErrorProxy, error)
if (inherits(error, "adbc_error")) {
error <- .Call(RAdbcErrorProxy, error)
} else {
error <- list()
}

error$status <- status
error$status_code_message <- .Call(RAdbcStatusCodeMessage, status)
if (!is.null(error$message)) {
Expand All @@ -79,6 +84,22 @@ stop_for_error <- function(status, error) {
}
}

adbc_error_message <- function(status, error) {
if (!identical(status, 0L)) {
if (inherits(error, "adbc_error")) {
error <- .Call(RAdbcErrorProxy, error)
} else {
error <- list()
}

error$status <- status
error$status_code_message <- .Call(RAdbcStatusCodeMessage, status)
if (!is.null(error$message)) error$message else error$status_code_message
} else {
"OK"
}
}

#' @export
print.adbc_error <- function(x, ...) {
str(x, ...)
Expand Down
Loading
Loading