Skip to content

Commit ef60f6f

Browse files
skullcrushercmdskullcmd
andauthored
feat(worker): fetch inventory policy from control plane at agentd startup (#30)
* feat(core): add InventoryPolicySnapshot for worker policy fetch * feat(config): inventory_policy_snapshot accessor + with_inventory_policy_snapshot apply * feat(worker-api): LoadInventoryPolicy request/response variant + client method * feat(api): serve LoadInventoryPolicy from worker_control endpoint * feat(worker): fetch inventory policy from control plane at agentd startup Worker now fetches the live allowed_host_suffixes / allowed_hosts / allowed_cidrs / allowed_ports from anyscan-api at startup before claiming any port-scans, and refreshes on a configurable cadence (default 300s, env AGENT_INVENTORY_REFRESH_SECONDS). This unblocks the prod scan #16 failure mode where workers fell back to InventoryConfig::default() (allowed_host_suffixes=["localhost"]), causing the streaming follow-on flusher to drop every internet IP via config.normalize_target_definition's host_is_allowed gate even after the API allowlist had been widened. Local /etc/agentd/runtime.env values remain a fallback when the control plane is unreachable; fetch failures log warn! and keep the prior policy in memory. Workers do NOT crash on fetch failure. * test(worker): make inventory refresh interval test pure to avoid env-var races The previous test used unsafe std::env::set_var/remove_var which flaked under cargo test's default multithreaded executor (set_var races with other threads reading env). Extract a pure parse_inventory_refresh_interval(Option<&str>) that takes the raw env value as a parameter; the env-reading wrapper is one line and trivially correct. The test now drives the pure function with no shared mutable state. * fix(worker): always run inventory refresh + register before run_once fetch Two review issues from codex on PR #30: 1. run_daemon: the periodic refresh check was placed near the bottom of the loop, after every claim arm that calls 'continue'. On a busy worker repeatedly claiming bootstrap_jobs / port_scans / runs the refresh block was never reached, so control-plane allowlist changes never propagated until the worker idled — defeating the whole point of the periodic refresh for the heavy-traffic case. Move the refresh to immediately after seed_bootstrap_inventory / queue_due_schedules_with_events (which always run), before any continue-able claim attempt. Bonus: claims spawned in the same iteration now operate against the freshest fetched policy. 2. run_once: the initial fetch ran before register_worker_or_bail. But non-register /api/worker/control requests in worker_control require an already-registered worker token; a fresh agent's first LoadInventoryPolicy returned 401, fell through to local fallback, and run_once never refreshed again. Register first, then fetch — matches the run_daemon ordering already in place. * fix(worker): hoist inventory refresh above all continue-able loop branches Codex P2 follow-up: the previous fix (dde4324) placed the refresh after seed_bootstrap_inventory / queue_due_schedules_with_events, which fixed the bootstrap-job / port-scan / run claim shadowing — but two earlier branches still continue above it: claim_next_pending_remote_command and the remote_update scheduling block. A worker continuously receiving remote debug commands or remote updates would loop forever without re-evaluating the refresh predicate. Hoist the refresh to the very top of each loop iteration, immediately after the try_join_next completion-collection (which doesn't continue). Now every iteration runs the refresh predicate exactly once regardless of which downstream fast-path fires. --------- Co-authored-by: skullcmd <skullcmd@anyvm.tech>
1 parent d4986a7 commit ef60f6f

5 files changed

Lines changed: 327 additions & 7 deletions

File tree

src/bin/anyscan-api.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2019,6 +2019,9 @@ async fn worker_control(
20192019
.load_scan_settings()
20202020
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?,
20212021
},
2022+
WorkerControlRequest::LoadInventoryPolicy => WorkerControlResponse::InventoryPolicy {
2023+
policy: state.config.inventory_policy_snapshot(),
2024+
},
20222025
};
20232026

20242027
Ok(Json(response))

src/bin/anyscan-worker.rs

Lines changed: 170 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ async fn run_daemon_with_retry(
462462
}
463463

464464
async fn run_daemon(
465-
config: AppConfig,
465+
mut config: AppConfig,
466466
worker_id: String,
467467
store: AnyScanStore,
468468
detectors: DetectorEngine,
@@ -476,6 +476,19 @@ async fn run_daemon(
476476
&worker_runtime.registration,
477477
worker_registration_ttl,
478478
)?;
479+
// Initial inventory policy fetch — best-effort. Failure here keeps the
480+
// local /etc/agentd/runtime.env fallback (or InventoryConfig::default())
481+
// in place so the worker can still claim non-policy-gated tasks. The
482+
// streaming follow-on flusher's host_is_allowed check will keep dropping
483+
// hosts outside the local allowlist until a refresh succeeds.
484+
if let Err(error) = refresh_inventory_policy_from_control_plane(&mut config, &store) {
485+
warn!(
486+
%error,
487+
"initial inventory policy fetch failed; using local fallback (workers may drop hosts outside the local allowlist)"
488+
);
489+
}
490+
let inventory_refresh_interval = inventory_refresh_interval();
491+
let mut last_inventory_refresh_at = Instant::now();
479492
let (remote_update_tx, remote_update_rx) =
480493
watch::channel(registered_worker.remote_update_requested_at);
481494
let (registration_shutdown_tx, registration_shutdown_rx) = oneshot::channel();
@@ -520,6 +533,25 @@ async fn run_daemon(
520533
}
521534
}
522535

536+
// Refresh inventory policy at the very top of every iteration,
537+
// before any of the work-claim arms below. Multiple branches in
538+
// this loop (remote-debug commands, remote-update scheduling,
539+
// bootstrap-job / port-scan / runnable-run claims, archive pass)
540+
// each `continue` back to the loop top on success. If the
541+
// refresh sat below any of them, a worker continuously feeding
542+
// on that signal would never re-evaluate the refresh predicate
543+
// and the control-plane allowlist changes would never propagate.
544+
// Placing it above every `continue`-able branch guarantees it
545+
// runs once per iteration regardless of which fast path fires.
546+
if last_inventory_refresh_at.elapsed() >= inventory_refresh_interval {
547+
if let Err(error) =
548+
refresh_inventory_policy_from_control_plane(&mut config, &store)
549+
{
550+
warn!(%error, "inventory policy refresh failed; keeping prior policy");
551+
}
552+
last_inventory_refresh_at = Instant::now();
553+
}
554+
523555
if worker_runtime.registration.supports_remote_debug_commands {
524556
if let Some(command) = store.claim_next_pending_remote_command()? {
525557
info!(
@@ -706,14 +738,27 @@ async fn run_once(
706738
detectors: DetectorEngine,
707739
worker_runtime: &WorkerRuntime,
708740
) -> Result<()> {
709-
let worker_registration_ttl = worker_registration_ttl_seconds(config);
741+
let mut config = config.clone();
742+
let worker_registration_ttl = worker_registration_ttl_seconds(&config);
710743
let worker_registration_interval =
711-
worker_registration_refresh_interval(config, worker_registration_ttl);
744+
worker_registration_refresh_interval(&config, worker_registration_ttl);
712745
let registered_worker = register_worker_or_bail(
713746
&store,
714747
&worker_runtime.registration,
715748
worker_registration_ttl,
716749
)?;
750+
// Fetch inventory policy AFTER register_worker_or_bail — non-register
751+
// /api/worker/control requests in worker_control authenticate as an
752+
// already-registered worker, so a fresh agent has to register first or
753+
// the LoadInventoryPolicy call returns 401 and falls back to the local
754+
// policy for the entire one-shot run.
755+
if let Err(error) = refresh_inventory_policy_from_control_plane(&mut config, &store) {
756+
warn!(
757+
%error,
758+
"initial inventory policy fetch failed; using local fallback"
759+
);
760+
}
761+
let config = &config;
717762
let (remote_update_tx, remote_update_rx) =
718763
watch::channel(registered_worker.remote_update_requested_at);
719764
let (registration_shutdown_tx, registration_shutdown_rx) = oneshot::channel();
@@ -5507,12 +5552,52 @@ fn load_effective_runtime_config(
55075552
base_config.with_scan_defaults_summary(&scan_settings)
55085553
}
55095554

5555+
const INVENTORY_REFRESH_INTERVAL_ENV: &str = "AGENT_INVENTORY_REFRESH_SECONDS";
5556+
const DEFAULT_INVENTORY_REFRESH_INTERVAL_SECONDS: u64 = 300;
5557+
5558+
fn parse_inventory_refresh_interval(raw: Option<&str>) -> Duration {
5559+
let parsed = raw
5560+
.and_then(|value| value.trim().parse::<u64>().ok())
5561+
.filter(|value| *value > 0)
5562+
.unwrap_or(DEFAULT_INVENTORY_REFRESH_INTERVAL_SECONDS);
5563+
Duration::from_secs(parsed)
5564+
}
5565+
5566+
fn inventory_refresh_interval() -> Duration {
5567+
parse_inventory_refresh_interval(env::var(INVENTORY_REFRESH_INTERVAL_ENV).ok().as_deref())
5568+
}
5569+
5570+
fn apply_inventory_policy_snapshot_to_config(
5571+
config: &mut AppConfig,
5572+
snapshot: &anyscan::core::InventoryPolicySnapshot,
5573+
) -> Result<()> {
5574+
let updated = config.with_inventory_policy_snapshot(snapshot)?;
5575+
config.inventory = updated.inventory;
5576+
Ok(())
5577+
}
5578+
5579+
fn refresh_inventory_policy_from_control_plane(
5580+
config: &mut AppConfig,
5581+
store: &AnyScanStore,
5582+
) -> Result<()> {
5583+
let snapshot = store.load_inventory_policy()?;
5584+
apply_inventory_policy_snapshot_to_config(config, &snapshot)?;
5585+
info!(
5586+
allowed_host_suffixes = config.inventory.allowed_host_suffixes.len(),
5587+
allowed_hosts = config.inventory.allowed_hosts.len(),
5588+
allowed_cidrs = config.inventory.allowed_cidrs.len(),
5589+
allowed_ports = config.inventory.allowed_ports.len(),
5590+
"refreshed inventory policy from control plane"
5591+
);
5592+
Ok(())
5593+
}
5594+
55105595
#[cfg(test)]
55115596
mod tests {
55125597
use super::{
55135598
DiscoveredEndpoint, PortScanFollowOnSelectionMode, ReportedProtocolPluginFinding,
55145599
ScannerOutputCounter, WORKER_REGISTRATION_TTL_MULTIPLIER,
5515-
apply_follow_on_selection_mode_to_targets,
5600+
apply_follow_on_selection_mode_to_targets, apply_inventory_policy_snapshot_to_config,
55165601
derive_protocol_plugin_findings_with_active_mode, endpoint_cache_key,
55175602
filter_endpoints_excluding_streamed, normalize_platform_architecture,
55185603
normalize_platform_operating_system, parse_endpoint_token, parse_ip_addr_show_output,
@@ -6524,4 +6609,85 @@ mod tests {
65246609

65256610
let _ = fs::remove_file(&path);
65266611
}
6612+
6613+
#[test]
6614+
fn apply_inventory_policy_snapshot_overwrites_allowlists_in_config() {
6615+
use anyscan::core::InventoryPolicySnapshot;
6616+
6617+
let mut config = AppConfig::default();
6618+
// local fallback default: only "localhost" is allowed
6619+
assert_eq!(
6620+
config.inventory.allowed_host_suffixes,
6621+
vec!["localhost".to_string()]
6622+
);
6623+
6624+
let snapshot = InventoryPolicySnapshot {
6625+
allowed_host_suffixes: vec!["example.com".to_string()],
6626+
allowed_hosts: vec!["box.example.net".to_string()],
6627+
allowed_cidrs: vec!["10.0.0.0/8".to_string()],
6628+
allowed_ports: vec![80, 443, 8080],
6629+
};
6630+
6631+
apply_inventory_policy_snapshot_to_config(&mut config, &snapshot)
6632+
.expect("snapshot should apply cleanly");
6633+
6634+
assert_eq!(
6635+
config.inventory.allowed_host_suffixes,
6636+
vec!["example.com".to_string()]
6637+
);
6638+
assert_eq!(
6639+
config.inventory.allowed_hosts,
6640+
vec!["box.example.net".to_string()]
6641+
);
6642+
assert_eq!(config.inventory.allowed_cidrs, vec!["10.0.0.0/8".to_string()]);
6643+
assert_eq!(config.inventory.allowed_ports, vec![80, 443, 8080]);
6644+
6645+
// The previous "localhost" suffix is gone — the API host's policy wins.
6646+
assert!(config.host_is_allowed("box.example.net"));
6647+
assert!(config.host_is_allowed("api.example.com"));
6648+
assert!(config.host_is_allowed("10.0.0.42"));
6649+
assert!(!config.host_is_allowed("evil.test"));
6650+
assert!(!config.host_is_allowed("localhost"));
6651+
}
6652+
6653+
#[test]
6654+
fn parse_inventory_refresh_interval_handles_unset_zero_and_invalid_inputs() {
6655+
use super::parse_inventory_refresh_interval;
6656+
6657+
// Unset / None -> default
6658+
assert_eq!(
6659+
parse_inventory_refresh_interval(None),
6660+
Duration::from_secs(300)
6661+
);
6662+
6663+
// Valid positive integer
6664+
assert_eq!(
6665+
parse_inventory_refresh_interval(Some("42")),
6666+
Duration::from_secs(42)
6667+
);
6668+
6669+
// Whitespace is trimmed
6670+
assert_eq!(
6671+
parse_inventory_refresh_interval(Some(" 42 ")),
6672+
Duration::from_secs(42)
6673+
);
6674+
6675+
// Zero is rejected — falls back to default
6676+
assert_eq!(
6677+
parse_inventory_refresh_interval(Some("0")),
6678+
Duration::from_secs(300)
6679+
);
6680+
6681+
// Non-numeric -> default
6682+
assert_eq!(
6683+
parse_inventory_refresh_interval(Some("not-a-number")),
6684+
Duration::from_secs(300)
6685+
);
6686+
6687+
// Empty string -> default
6688+
assert_eq!(
6689+
parse_inventory_refresh_interval(Some("")),
6690+
Duration::from_secs(300)
6691+
);
6692+
}
65276693
}

src/config.rs

Lines changed: 97 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2907,6 +2907,56 @@ impl AppConfig {
29072907
}
29082908
}
29092909

2910+
pub fn inventory_policy_snapshot(&self) -> crate::core::InventoryPolicySnapshot {
2911+
crate::core::InventoryPolicySnapshot {
2912+
allowed_host_suffixes: self.inventory.allowed_host_suffixes.clone(),
2913+
allowed_hosts: self.inventory.allowed_hosts.clone(),
2914+
allowed_cidrs: self.inventory.allowed_cidrs.clone(),
2915+
allowed_ports: self.inventory.allowed_ports.clone(),
2916+
}
2917+
}
2918+
2919+
pub fn with_inventory_policy_snapshot(
2920+
&self,
2921+
snapshot: &crate::core::InventoryPolicySnapshot,
2922+
) -> Result<Self> {
2923+
let mut config = self.clone();
2924+
2925+
let mut suffixes = snapshot
2926+
.allowed_host_suffixes
2927+
.iter()
2928+
.filter_map(|value| normalize_inventory_host(value))
2929+
.collect::<Vec<_>>();
2930+
suffixes.sort();
2931+
suffixes.dedup();
2932+
config.inventory.allowed_host_suffixes = suffixes;
2933+
2934+
let mut hosts = snapshot
2935+
.allowed_hosts
2936+
.iter()
2937+
.filter_map(|value| normalize_inventory_host(value))
2938+
.collect::<Vec<_>>();
2939+
hosts.sort();
2940+
hosts.dedup();
2941+
config.inventory.allowed_hosts = hosts;
2942+
2943+
let mut cidrs = snapshot
2944+
.allowed_cidrs
2945+
.iter()
2946+
.map(|value| normalize_inventory_cidr(value))
2947+
.collect::<Result<Vec<_>>>()?;
2948+
cidrs.sort();
2949+
cidrs.dedup();
2950+
config.inventory.allowed_cidrs = cidrs;
2951+
2952+
let mut ports = snapshot.allowed_ports.clone();
2953+
ports.sort_unstable();
2954+
ports.dedup();
2955+
config.inventory.allowed_ports = ports;
2956+
2957+
Ok(config)
2958+
}
2959+
29102960
pub fn with_scan_defaults_summary(&self, summary: &ScanDefaultsSummary) -> Result<Self> {
29112961
let mut config = self.clone();
29122962
config.scan.request_engine_mode = summary.request_engine_mode;
@@ -3627,8 +3677,8 @@ mod tests {
36273677
};
36283678

36293679
use crate::core::{
3630-
GobusterTargetConfig, PortScanRequest, RepositoryDefinition, RequestEngineMode,
3631-
ScanDefaultsSummary, TargetDefinition, TargetStrategy,
3680+
GobusterTargetConfig, InventoryPolicySnapshot, PortScanRequest, RepositoryDefinition,
3681+
RequestEngineMode, ScanDefaultsSummary, TargetDefinition, TargetStrategy,
36323682
};
36333683

36343684
use super::{
@@ -3655,6 +3705,51 @@ mod tests {
36553705
}
36563706
}
36573707

3708+
#[test]
3709+
fn inventory_policy_snapshot_round_trips_into_app_config() {
3710+
let mut base = AppConfig::default();
3711+
base.inventory.allowed_host_suffixes = vec!["localhost".to_string()];
3712+
base.inventory.allowed_hosts.clear();
3713+
base.inventory.allowed_cidrs.clear();
3714+
base.inventory.allowed_ports.clear();
3715+
3716+
let snapshot = InventoryPolicySnapshot {
3717+
allowed_host_suffixes: vec![".example.com".to_string(), "Example.NET".to_string()],
3718+
allowed_hosts: vec!["BOX.Example.NET".to_string()],
3719+
allowed_cidrs: vec!["10.0.0.0/8".to_string()],
3720+
allowed_ports: vec![443, 80, 80],
3721+
};
3722+
3723+
let updated = base
3724+
.with_inventory_policy_snapshot(&snapshot)
3725+
.expect("snapshot should apply cleanly");
3726+
3727+
// host suffixes are lowercased, sorted, deduped (matches normalize_inventory)
3728+
assert_eq!(
3729+
updated.inventory.allowed_host_suffixes,
3730+
vec![".example.com".to_string(), "example.net".to_string()]
3731+
);
3732+
assert_eq!(
3733+
updated.inventory.allowed_hosts,
3734+
vec!["box.example.net".to_string()]
3735+
);
3736+
assert_eq!(
3737+
updated.inventory.allowed_cidrs,
3738+
vec!["10.0.0.0/8".to_string()]
3739+
);
3740+
assert_eq!(updated.inventory.allowed_ports, vec![80, 443]);
3741+
3742+
// round-trip the normalized policy back out and confirm field equality
3743+
let round_trip = updated.inventory_policy_snapshot();
3744+
assert_eq!(
3745+
round_trip.allowed_host_suffixes,
3746+
updated.inventory.allowed_host_suffixes
3747+
);
3748+
assert_eq!(round_trip.allowed_hosts, updated.inventory.allowed_hosts);
3749+
assert_eq!(round_trip.allowed_cidrs, updated.inventory.allowed_cidrs);
3750+
assert_eq!(round_trip.allowed_ports, updated.inventory.allowed_ports);
3751+
}
3752+
36583753
#[test]
36593754
fn exact_allowed_hosts_accept_ip_literals_and_ipv6_endpoints() {
36603755
let mut config = AppConfig::default();

src/core.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2513,6 +2513,18 @@ pub struct ScanDefaultsSummary {
25132513
pub directory_probing_discover_backup: bool,
25142514
}
25152515

2516+
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
2517+
pub struct InventoryPolicySnapshot {
2518+
#[serde(default)]
2519+
pub allowed_host_suffixes: Vec<String>,
2520+
#[serde(default)]
2521+
pub allowed_hosts: Vec<String>,
2522+
#[serde(default)]
2523+
pub allowed_cidrs: Vec<String>,
2524+
#[serde(default)]
2525+
pub allowed_ports: Vec<u16>,
2526+
}
2527+
25162528
pub fn merge_coverage_source_stat(
25172529
stats: &mut Vec<CoverageSourceStat>,
25182530
source: &str,

0 commit comments

Comments
 (0)