Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 13 additions & 17 deletions R/adaptive_btl_refit.R
Original file line number Diff line number Diff line change
Expand Up @@ -1310,18 +1310,14 @@
if (!identical(phase_ctx$phase, "phase_b")) {
return(out)
}
if (length(phase_ctx$ready_spokes) < 1L) {
rlang::abort(
"Phase metadata and routing mode disagree: phase marked phase_b but no ready spokes are available."
)
if (length(phase_ctx$active_spokes %||% integer()) < 1L) {
return(out)
}
hub_id <- as.integer(controller$hub_id %||% 1L)
spoke_ids <- .adaptive_link_spoke_ids(out, hub_id)
spoke_ids <- intersect(spoke_ids, as.integer(phase_ctx$ready_spokes))
spoke_ids <- intersect(spoke_ids, as.integer(phase_ctx$active_spokes))
if (length(spoke_ids) < 1L) {
rlang::abort(
"Phase B linking cannot continue: no ready spoke has valid Phase A artifact eligibility."
)
return(out)
}
link_stats <- controller$link_refit_stats_by_spoke %||% list()
bad_refits <- controller$link_transform_bad_refits_by_spoke %||% list()
Expand Down Expand Up @@ -1670,19 +1666,15 @@
if (!identical(phase_ctx$phase, "phase_b")) {
return(tibble::as_tibble(new_link_stage_log()))
}
if (length(phase_ctx$ready_spokes) < 1L) {
rlang::abort(
"Phase metadata and routing mode disagree: phase marked phase_b but no ready spokes are available."
)
if (length(phase_ctx$active_spokes %||% integer()) < 1L) {
return(tibble::as_tibble(new_link_stage_log()))
}

hub_id <- as.integer(controller$hub_id %||% 1L)
spoke_ids <- .adaptive_link_spoke_ids(state, hub_id = hub_id)
spoke_ids <- intersect(spoke_ids, as.integer(phase_ctx$ready_spokes))
spoke_ids <- intersect(spoke_ids, as.integer(phase_ctx$active_spokes))
if (length(spoke_ids) < 1L) {
rlang::abort(
"Phase B link-stage logging cannot proceed: no ready spoke has valid Phase A artifact eligibility."
)
return(tibble::as_tibble(new_link_stage_log()))
}

step_log <- tibble::as_tibble(state$step_log %||% tibble::tibble())
Expand Down Expand Up @@ -1812,7 +1804,11 @@
.adaptive_link_transform_mode_for_spoke(controller, spoke_id)),
link_refit_mode = as.character(controller$link_refit_mode %||% NA_character_),
hub_lock_mode = as.character(controller$hub_lock_mode %||% NA_character_),
hub_lock_kappa = as.double(controller$hub_lock_kappa %||% NA_real_),
hub_lock_kappa = if (identical(as.character(controller$hub_lock_mode %||% NA_character_), "soft_lock")) {
as.double(controller$hub_lock_kappa %||% NA_real_)
} else {
NA_real_
},
delta_spoke_mean = as.double(stats_row$delta_spoke_mean %||% NA_real_),
delta_spoke_sd = as.double(stats_row$delta_spoke_sd %||% NA_real_),
log_alpha_spoke_mean = as.double(stats_row$log_alpha_spoke_mean %||% NA_real_),
Expand Down
26 changes: 24 additions & 2 deletions R/adaptive_linking_phase_a.R
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
phase = "phase_a",
pending_run_sets = integer(),
ready_spokes = integer(),
active_spokes = integer(),
stopped_spokes = integer(),
active_phase_a_set = NA_integer_
))
}
Expand All @@ -78,7 +80,14 @@
} else {
NA_integer_
}
status_tbl <- .adaptive_phase_a_status_tbl(state)
stopped_map <- controller$link_stopped_by_spoke %||% list()
stopped_spokes <- integer()
if (length(ready_spokes) > 0L) {
stopped_spokes <- as.integer(ready_spokes[vapply(as.character(ready_spokes), function(key) {
isTRUE(stopped_map[[key]])
}, logical(1L))])
}
active_spokes <- as.integer(setdiff(ready_spokes, stopped_spokes))
phase <- if (length(pending_run_sets) > 0L) {
"phase_a"
} else if (length(ready_spokes) > 0L) {
Expand All @@ -90,6 +99,8 @@
phase = as.character(phase),
pending_run_sets = as.integer(pending_run_sets),
ready_spokes = as.integer(ready_spokes),
active_spokes = as.integer(sort(unique(active_spokes))),
stopped_spokes = as.integer(sort(unique(stopped_spokes))),
active_phase_a_set = as.integer(active_set)
)
}
Expand All @@ -104,10 +115,21 @@
.adaptive_phase_a_required_config_hash <- function(state, set_id) {
controller <- .adaptive_controller_resolve(state)
fit <- state$btl_fit %||% list()
hub_lock_mode <- as.character(controller$hub_lock_mode %||% NA_character_)
hub_lock_kappa <- as.double(controller$hub_lock_kappa %||% NA_real_)
if (!identical(hub_lock_mode, "soft_lock")) {
hub_lock_kappa <- NA_real_
}
payload <- list(
set_id = as.integer(set_id),
judge_param_mode = as.character(controller$judge_param_mode %||% NA_character_),
model_variant = as.character(fit$model_variant %||% "btl_e_b")
model_variant = as.character(fit$model_variant %||% "btl_e_b"),
link_refit_mode = as.character(controller$link_refit_mode %||% NA_character_),
shift_only_theta_treatment = as.character(controller$shift_only_theta_treatment %||% NA_character_),
link_transform_mode = as.character(controller$link_transform_mode %||% NA_character_),
hub_lock_mode = hub_lock_mode,
hub_lock_kappa = hub_lock_kappa,
cross_set_utility = as.character(controller$cross_set_utility %||% NA_character_)
)
.adaptive_phase_a_hash_object(payload)
}
Expand Down
8 changes: 2 additions & 6 deletions R/adaptive_round_candidates.R
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ generate_stage_candidates_from_state <- function(state,
link_phase_b_active <- isTRUE(is_link_mode) && identical(phase_ctx$phase, "phase_b")

if (isTRUE(link_phase_b_active)) {
eligible_spokes <- as.integer(phase_ctx$ready_spokes %||% integer())
eligible_spokes <- as.integer(phase_ctx$active_spokes %||% integer())
if (length(eligible_spokes) < 1L) {
rlang::abort(
"Phase metadata and routing mode disagree: phase marked phase_b but no ready spokes are eligible."
Expand Down Expand Up @@ -640,11 +640,7 @@ generate_stage_candidates_from_state <- function(state,
idx <- coverage_idx
}
}
utility <- if ("link_u" %in% names(cand)) {
as.double(cand$link_u[idx])
} else {
as.double(cand$u0[idx])
}
utility <- as.double(cand$u0[idx])
utility[!is.finite(utility)] <- -Inf
idx[order(-utility, cand$i[idx], cand$j[idx])]
}
Expand Down
86 changes: 80 additions & 6 deletions R/adaptive_run.R
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,62 @@
list()
}

#' @keywords internal
#' @noRd
.adaptive_link_apply_stop_state <- function(state, link_rows) {
out <- state
rows <- tibble::as_tibble(link_rows %||% tibble::tibble())
if (nrow(rows) < 1L) {
return(out)
}
if (!all(c("spoke_id", "link_stop_pass", "refit_id") %in% names(rows))) {
return(out)
}

controller <- .adaptive_controller_resolve(out)
stopped_map <- controller$link_stopped_by_spoke %||% list()
stop_refit_map <- controller$link_stop_refit_id_by_spoke %||% list()
stop_reason_map <- controller$link_stop_reason_by_spoke %||% list()

for (idx in seq_len(nrow(rows))) {
spoke_id <- as.integer(rows$spoke_id[[idx]] %||% NA_integer_)
if (is.na(spoke_id)) {
next
}
key <- as.character(spoke_id)
if (isTRUE(rows$link_stop_pass[[idx]])) {
stopped_map[[key]] <- TRUE
stop_refit_map[[key]] <- as.integer(rows$refit_id[[idx]] %||% NA_integer_)
stop_reason_map[[key]] <- "link_stop_pass"
} else if (is.null(stopped_map[[key]])) {
stopped_map[[key]] <- FALSE
}
}

controller$link_stopped_by_spoke <- stopped_map
controller$link_stop_refit_id_by_spoke <- stop_refit_map
controller$link_stop_reason_by_spoke <- stop_reason_map
out$controller <- controller
out
}

#' @keywords internal
#' @noRd
.adaptive_link_all_spokes_stopped <- function(state) {
controller <- .adaptive_controller_resolve(state)
run_mode <- as.character(controller$run_mode %||% "within_set")
if (!run_mode %in% c("link_one_spoke", "link_multi_spoke")) {
return(FALSE)
}
phase_ctx <- .adaptive_link_phase_context(state, controller = controller)
if (!identical(phase_ctx$phase, "phase_b")) {
return(FALSE)
}
ready_spokes <- as.integer(phase_ctx$ready_spokes %||% integer())
active_spokes <- as.integer(phase_ctx$active_spokes %||% integer())
length(ready_spokes) > 0L && length(active_spokes) < 1L
}

#' @keywords internal
#' @noRd
.adaptive_link_stage_progress <- function(state, spoke_id, stage_quotas, stage_order, refit_id = NULL) {
Expand Down Expand Up @@ -210,7 +266,7 @@
controller <- .adaptive_controller_resolve(state)
phase_ctx <- .adaptive_link_phase_context(state, controller = controller)
if (.adaptive_link_mode_active(controller) && identical(phase_ctx$phase, "phase_b")) {
eligible_spokes <- as.integer(phase_ctx$ready_spokes %||% integer())
eligible_spokes <- as.integer(phase_ctx$active_spokes %||% integer())
spoke_id <- .adaptive_link_active_spoke(
state = state,
controller = controller,
Expand Down Expand Up @@ -419,7 +475,7 @@
concurrent_mode <- identical(as.character(controller$multi_spoke_mode %||% "independent"), "concurrent")
spokes_to_mark <- as.integer()
if (isTRUE(concurrent_mode) && identical(starvation_reason, "all_eligible_spokes_infeasible")) {
spokes_to_mark <- as.integer(phase_ctx$ready_spokes %||% integer())
spokes_to_mark <- as.integer(phase_ctx$active_spokes %||% integer())
} else {
spoke_id <- as.integer(step_row$link_spoke_id[[1L]] %||% NA_integer_)
if (is.na(spoke_id)) {
Expand Down Expand Up @@ -492,8 +548,9 @@
#' Within-set routing uses TrueSkill base utility
#' \deqn{U_0 = p_{ij}(1 - p_{ij})} where \eqn{p_{ij}} is the current TrueSkill
#' win probability for pair \eqn{\{i, j\}}.
#' In linking Phase B, cross-set candidates are ranked using model-implied
#' predictive utility under the current transform and judge parameters.
#' In linking Phase B, pair choice remains TrueSkill-based and never uses BTL
#' posterior quantities. Model-implied predictive probabilities/utility are
#' logged for diagnostics only and do not affect selection.
#' When \code{judge_param_mode = "phase_specific"}, the first Phase B startup
#' step may use deterministic fallback from available within/shared judge
#' estimates if link-specific estimates are not yet available; once link-specific
Expand Down Expand Up @@ -634,8 +691,8 @@ adaptive_rank_start <- function(items,
#' Pair selection does not use BTL posterior draws.
#' Within-set routing is TrueSkill-based with utility
#' \deqn{U_0 = p_{ij}(1 - p_{ij})}.
#' Linking Phase B cross-set routing uses model-implied predictive utility under
#' the current transform and judge parameters.
#' Linking Phase B cross-set routing is also TrueSkill-based; model-implied
#' predictive probabilities/utility are recorded for diagnostics only.
#' When \code{judge_param_mode = "phase_specific"}, startup can use deterministic
#' fallback from within/shared judge estimates only until link-specific estimates
#' are expected, after which malformed link estimates abort.
Expand Down Expand Up @@ -997,6 +1054,14 @@ adaptive_rank_run_live <- function(state,
while (remaining > 0L) {
state <- .adaptive_phase_a_prepare(state)
.adaptive_phase_a_gate_or_abort(state)
if (isTRUE(.adaptive_link_all_spokes_stopped(state))) {
state$meta$stop_decision <- TRUE
state$meta$stop_reason <- "all_spokes_stopped"
if (!is.null(state$config$session_dir)) {
save_adaptive_session(state, session_dir = state$config$session_dir, overwrite = TRUE)
}
return(state)
}
state <- .adaptive_link_sync_warm_start(state)
state <- .adaptive_round_activate_if_ready(state)
state <- run_one_step(state, judge, ...)
Expand Down Expand Up @@ -1066,6 +1131,7 @@ adaptive_rank_run_live <- function(state,
state$link_stage_log %||% new_link_stage_log(),
link_rows
)
state <- .adaptive_link_apply_stop_state(state, link_rows)
}
item_log_tbl <- .adaptive_build_item_log_refit(
state,
Expand Down Expand Up @@ -1094,6 +1160,14 @@ adaptive_rank_run_live <- function(state,
}
return(state)
}
if (isTRUE(.adaptive_link_all_spokes_stopped(state))) {
state$meta$stop_decision <- TRUE
state$meta$stop_reason <- "all_spokes_stopped"
if (!is.null(state$config$session_dir)) {
save_adaptive_session(state, session_dir = state$config$session_dir, overwrite = TRUE)
}
return(state)
}
}
state <- .adaptive_phase_a_prepare(state)
.adaptive_phase_a_gate_or_abort(state)
Expand Down
Loading