diff --git a/kms-connector/Cargo.lock b/kms-connector/Cargo.lock index b8a8c262ae..679950d396 100644 --- a/kms-connector/Cargo.lock +++ b/kms-connector/Cargo.lock @@ -2376,12 +2376,12 @@ dependencies = [ [[package]] name = "config" -version = "0.15.15" +version = "0.15.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0faa974509d38b33ff89282db9c3295707ccf031727c0de9772038ec526852ba" +checksum = "b30fa8254caad766fc03cb0ccae691e14bf3bd72bfff27f72802ce729551b3d6" dependencies = [ "pathdiff", - "serde", + "serde_core", "toml", "winnow", ] @@ -2402,6 +2402,7 @@ dependencies = [ "fhevm_gateway_bindings", "futures", "git-version", + "humantime-serde", "kms-grpc", "opentelemetry", "opentelemetry-otlp", @@ -2413,7 +2414,6 @@ dependencies = [ "serde_json", "serial_test", "sqlx", - "tempfile", "testcontainers", "tfhe", "thiserror 2.0.12", @@ -3321,13 +3321,13 @@ dependencies = [ "bc2wrap", "connector-utils", "fhevm_gateway_bindings", + "humantime-serde", "opentelemetry", "prometheus", "rstest", "serde", "serial_test", "sqlx", - "tempfile", "testcontainers", "tokio", "tokio-stream", @@ -3544,6 +3544,22 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" + +[[package]] +name = "humantime-serde" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c" +dependencies = [ + "humantime", + "serde", +] + [[package]] name = "hyper" version = "0.14.32" @@ -4041,10 +4057,10 @@ dependencies = [ "alloy", "anyhow", "bc2wrap", - "config", "connector-utils", "dashmap", "fhevm_gateway_bindings", + "humantime-serde", "kms-grpc", "mocktail", "prometheus", @@ -4053,7 +4069,6 @@ dependencies = [ "serial_test", "sha3", "sqlx", - "tempfile", "testcontainers", "thiserror 2.0.12", "tokio", @@ -5863,9 +5878,9 @@ checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" [[package]] name = "serde" -version = "1.0.226" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0dca6411025b24b60bfa7ec1fe1f8e710ac09782dca409ee8237ba74b51295fd" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" dependencies = [ "serde_core", "serde_derive", @@ -5873,18 +5888,18 @@ dependencies = [ [[package]] name = "serde_core" -version = "1.0.226" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba2ba63999edb9dac981fb34b3e5c0d111a69b0924e253ed29d83f7c99e966a4" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.226" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8db53ae22f34573731bafa1db20f04027b2d25e02d8205921b569171699cdb33" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -6932,12 +6947,12 @@ dependencies = [ [[package]] name = "toml" -version = "0.9.5" +version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75129e1dc5000bfbaa9fee9d1b21f974f9fbad9daec557a521ee6e080825f6e8" +checksum = "f0dc8b1fb61449e27716ec0e1bdf0f6b8f3e8f6b05391e8497b8b6d7804ea6d8" dependencies = [ "indexmap 2.11.4", - "serde", + "serde_core", "serde_spanned", "toml_datetime", "toml_parser", @@ -7223,13 +7238,13 @@ dependencies = [ "bc2wrap", "connector-utils", "fhevm_gateway_bindings", + "humantime-serde", "prometheus", "rstest", "serde", "serde_json", "serial_test", "sqlx", - "tempfile", "thiserror 2.0.12", "tokio", "tokio-stream", diff --git a/kms-connector/Cargo.toml b/kms-connector/Cargo.toml index 42aa1940f6..9246c542e7 100644 --- a/kms-connector/Cargo.toml +++ b/kms-connector/Cargo.toml @@ -44,10 +44,11 @@ clap = { version = "=4.5.47", default-features = true, features = [ "cargo", "derive", ] } -config = { version = "=0.15.15", default-features = false, features = ["toml"] } +config = { version = "=0.15.19", default-features = false, features = ["toml"] } dashmap = { version = "=6.1.0", default-features = false } futures = { version = "=0.3.31", default-features = false } git-version = { version = "=0.3.9", default-features = false } +humantime-serde = "=1.1.1" opentelemetry = "=0.30.0" opentelemetry-otlp = { version = "=0.30.0", features = ["grpc-tonic"] } opentelemetry_sdk = "=0.30.0" @@ -55,7 +56,7 @@ prometheus = "=0.14.0" rustls = { version = "=0.23.31", default-features = false, features = [ "aws-lc-rs", ] } -serde = { version = "=1.0.226", default-features = false, features = [ +serde = { version = "=1.0.228", default-features = false, features = [ "derive", "std", ] } @@ -98,7 +99,6 @@ mocktail = "=0.3.0" rand = "=0.9.2" rstest = "=0.26.1" serial_test = "3.2.0" -tempfile = "=3.20.0" testcontainers = "=0.24.0" -toml = { version = "=0.9.5", default-features = true } +toml = { version = "=0.9.8", default-features = true } tracing-test = { version = "=0.2.5", default-features = false } diff --git a/kms-connector/config/gw-listener.toml b/kms-connector/config/gw-listener.toml index c01b160d0f..dfe26469cb 100644 --- a/kms-connector/config/gw-listener.toml +++ b/kms-connector/config/gw-listener.toml @@ -10,19 +10,43 @@ chain_id = 54321 # Gateway WebSocket RPC URL endpoint (required) # ENV: KMS_CONNECTOR_GATEWAY_URL -gateway_url = "ws://localhost:8545" +gateway_url = "http://localhost:8545" # URL of the KMS Connector internal database (required) # Format: see https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING-URIS # ENV: KMS_CONNECTOR_DATABASE_URL database_url = "postgres://postgres:postgres@localhost/kms-connector" +# The endpoint used to monitor the connector components (optional, defaults to "0.0.0.0:9100") +# ENV: KMS_CONNECTOR_MONITORING_ENDPOINT +# monitoring_endpoint = "0.0.0.0:9100" + +# The timeout to perform each external service connection healthcheck (optional, defaults to 3s) +# ENV: KMS_CONNECTOR_HEALTHCHECK_TIMEOUT (format: https://docs.rs/humantime/latest/humantime/) +# healthcheck_timeout = "3s" + +# The maximum number of tasks to process events/responses concurrently (optional, defaults to 1000) +# ENV: KMS_CONNECTOR_TASK_LIMIT +# task_limit = 1000 + +# The polling interval for decryption requests (optional, defaults to 1s) +# ENV: KMS_CONNECTOR_DECRYPTION_POLLING (format: https://docs.rs/humantime/latest/humantime/) +# decryption_polling = "1s" + +# The polling interval for key management requests (optional, defaults to 30s) +# ENV: KMS_CONNECTOR_KMS_GENERATION_POLLING (format: https://docs.rs/humantime/latest/humantime/) +# key_management_polling = "30s" + +# Block number to start processing from (optional, defaults to latest block if not set) +# ENV: KMS_CONNECTOR_FROM_BLOCK_NUMBER +# from_block_number = 1234 + # Configuration of the Decryption contract (required) [decryption_contract] # Address of the Decryption contract (required) # Format: hex string with 0x prefix # ENV: KMS_CONNECTOR_DECRYPTION_CONTRACT__ADDRESS -address = "0xF0bFB159C7381F7CB332586004d8247252C5b816" +address = "0x0000000000000000000000000000000000000000" # EIP-712 domain name for Decryption contract (optional, defaults to "Decryption") # ENV: KMS_CONNECTOR_DECRYPTION_CONTRACT__DOMAIN_NAME # domain_name = "Decryption" @@ -35,34 +59,10 @@ address = "0xF0bFB159C7381F7CB332586004d8247252C5b816" # Address of the KMSGeneration contract (required) # Format: hex string with 0x prefix # ENV: KMS_CONNECTOR_KMS_GENERATION_CONTRACT__ADDRESS -address = "0x5ffdaAB0373E62E2ea2944776209aEf29E631A64" +address = "0x0000000000000000000000000000000000000000" # EIP-712 domain name for KMSGeneration contract (optional, defaults to "KMSGeneration") # ENV: KMS_CONNECTOR_KMS_GENERATION_CONTRACT__DOMAIN_NAME # domain_name = "KMSGeneration" # EIP-712 domain version for KMSGeneration contract (optional, defaults to "1") # ENV: KMS_CONNECTOR_KMS_GENERATION_CONTRACT__DOMAIN_VERSION # domain_version = "1" - -# The endpoint used to monitor the connector components (optional, defaults to "0.0.0.0:9100") -# ENV: KMS_CONNECTOR_MONITORING_ENDPOINT -# monitoring_endpoint = "0.0.0.0:9100" - -# The timeout to perform each external service connection healthcheck (optional, defaults to 3s) -# ENV: KMS_CONNECTOR_HEALTHCHECK_TIMEOUT_SECS -# healthcheck_timeout_secs = 3 - -# The maximum number of tasks to process events/responses concurrently (optional, defaults to 1000) -# ENV: KMS_CONNECTOR_TASK_LIMIT -# task_limit = 1000 - -# The polling interval for decryption requests (optional, defaults to 1000ms) -# ENV: KMS_CONNECTOR_DECRYPTION_POLLING_MS -# decryption_polling_ms = 1000 - -# The polling interval for key management requests (optional, defaults to 30000ms) -# ENV: KMS_CONNECTOR_KMS_GENERATION_POLLING_MS -# key_management_polling_ms = 30000 - -# Block number to start processing from (optional, defaults to latest block if not set) -# ENV: KMS_CONNECTOR_FROM_BLOCK_NUMBER -# from_block_number = 1234 diff --git a/kms-connector/config/kms-worker.toml b/kms-connector/config/kms-worker.toml index 4503252b0a..057fa9a136 100644 --- a/kms-connector/config/kms-worker.toml +++ b/kms-connector/config/kms-worker.toml @@ -13,11 +13,11 @@ chain_id = 54321 # ENV: KMS_CONNECTOR_KMS_CORE_ENDPOINTS # If using environment variables, use a comma separated list of URLs # Ex: KMS_CONNECTOR_KMS_CORE_ENDPOINTS="http://[::1]:50051,http://[::1]:50052" -kms_core_endpoints = ["http://[::1]:50051"] +kms_core_endpoints = ["http://localhost:50051"] # Gateway WebSocket RPC URL endpoint (required) # ENV: KMS_CONNECTOR_GATEWAY_URL -gateway_url = "ws://localhost:8545" +gateway_url = "http://localhost:8545" # URL of the KMS Connector internal database (required) # Format: see https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING-URIS @@ -25,19 +25,55 @@ gateway_url = "ws://localhost:8545" database_url = "postgres://postgres:postgres@localhost/kms-connector" # Timeout to poll the database for fast events, such as decryption (optional, defaults to 3 seconds) -# ENV: KMS_CONNECTOR_DB_FAST_EVENT_POLLING_SECS -# db_fast_event_polling_secs = 3 +# ENV: KMS_CONNECTOR_DB_FAST_EVENT_POLLING (format: https://docs.rs/humantime/latest/humantime/) +# db_fast_event_polling = "3s" # Timeout to poll the database for long events, such as keygen (optional, defaults to 60 seconds) -# ENV: KMS_CONNECTOR_DB_LONG_EVENT_POLLING_SECS -# db_long_event_polling_secs = 60 +# ENV: KMS_CONNECTOR_DB_LONG_EVENT_POLLING (format: https://docs.rs/humantime/latest/humantime/) +# db_long_event_polling = "60s" + +# The endpoint used to monitor the connector components (optional, defaults to "0.0.0.0:9100") +# ENV: KMS_CONNECTOR_MONITORING_ENDPOINT +# monitoring_endpoint = "0.0.0.0:9100" + +# The timeout to perform each external service connection healthcheck (optional, defaults to 3s) +# ENV: KMS_CONNECTOR_HEALTHCHECK_TIMEOUT (format: https://docs.rs/humantime/latest/humantime/) +# healthcheck_timeout = "3s" + +# The limit number of events to fetch from the database (optional, defaults to 50) +# ENV: KMS_CONNECTOR_EVENTS_BATCH_SIZE +# events_batch_size = 50 + +# Number of retries for GRPC requests sent to the KMS Core (optional, defaults to 3) +# ENV: KMS_CONNECTOR_GRPC_REQUEST_RETRIES +# grpc_request_retries = 3 + +# The maximum number of decryption attempts before a decryption request is permanently removed (optional, defaults to 200) +# ENV: KMS_CONNECTOR_MAX_DECRYPTION_ATTEMPTS +# max_decryption_attempts = 200 + +# Retry interval to poll GRPC responses from KMS Core (optional, defaults to 1s) +# ENV: KMS_CONNECTOR_GRPC_POLL_INTERVAL (format: https://docs.rs/humantime/latest/humantime/) +# grpc_poll_interval = "1s" + +# Number of retries for S3 ciphertext retrieval (optional, default: 3). +# ENV: KMS_CONNECTOR_S3_CIPHERTEXT_RETRIEVAL_RETRIES +# s3_ciphertext_retrieval_retries = 3 + +# Timeout to connect to a S3 bucket (optional, defaults to 3s) +# ENV: KMS_CONNECTOR_S3_CONNECT_TIMEOUT (format: https://docs.rs/humantime/latest/humantime/) +# s3_connect_timeout = "3s" + +# The maximum number of tasks to process events/responses concurrently (optional, defaults to 1000) +# ENV: KMS_CONNECTOR_TASK_LIMIT +# task_limit = 1000 # Configuration of the Decryption contract (required) [decryption_contract] # Address of the Decryption contract (required) # Format: hex string with 0x prefix # ENV: KMS_CONNECTOR_DECRYPTION_CONTRACT__ADDRESS -address = "0xF0bFB159C7381F7CB332586004d8247252C5b816" +address = "0x0000000000000000000000000000000000000000" # EIP-712 domain name for Decryption contract (optional, defaults to "Decryption") # ENV: KMS_CONNECTOR_DECRYPTION_CONTRACT__DOMAIN_NAME # domain_name = "Decryption" @@ -50,7 +86,7 @@ address = "0xF0bFB159C7381F7CB332586004d8247252C5b816" # Address of the GatewayConfig contract (required) # Format: hex string with 0x prefix # ENV: KMS_CONNECTOR_GATEWAY_CONFIG_CONTRACT__ADDRESS -address = "0x5ffdaAB0373E62E2ea2944776209aEf29E631A64" +address = "0x0000000000000000000000000000000000000000" # EIP-712 domain name for GatewayConfig contract (optional, defaults to "GatewayConfig") # ENV: KMS_CONNECTOR_GATEWAY_CONFIG_CONTRACT__DOMAIN_NAME # domain_name = "GatewayConfig" @@ -63,59 +99,10 @@ address = "0x5ffdaAB0373E62E2ea2944776209aEf29E631A64" # Address of the KMSGeneration contract (required) # Format: hex string with 0x prefix # ENV: KMS_CONNECTOR_KMS_GENERATION_CONTRACT__ADDRESS -address = "0x5ffdaAB0373E62E2ea2944776209aEf29E631A64" +address = "0x0000000000000000000000000000000000000000" # EIP-712 domain name for KMSGeneration contract (optional, defaults to "KMSGeneration") # ENV: KMS_CONNECTOR_KMS_GENERATION_CONTRACT__DOMAIN_NAME # domain_name = "KMSGeneration" # EIP-712 domain version for KMSGeneration contract (optional, defaults to "1") # ENV: KMS_CONNECTOR_KMS_GENERATION_CONTRACT__DOMAIN_VERSION # domain_version = "1" - -# The endpoint used to monitor the connector components (optional, defaults to "0.0.0.0:9100") -# ENV: KMS_CONNECTOR_MONITORING_ENDPOINT -# monitoring_endpoint = "0.0.0.0:9100" - -# The timeout to perform each external service connection healthcheck (optional, defaults to 3s) -# ENV: KMS_CONNECTOR_HEALTHCHECK_TIMEOUT_SECS -# healthcheck_timeout_secs = 3 - -# The limit number of events to fetch from the database (optional, defaults to 50) -# ENV: KMS_CONNECTOR_EVENTS_BATCH_SIZE -# events_batch_size = 50 - -# Number of retries for GRPC requests sent to the KMS Core (optional, defaults to 3) -# ENV: KMS_CONNECTOR_GRPC_REQUEST_RETRIES -# grpc_request_retries = 3 - -# The maximum number of decryption attempts before a decryption request is permanently removed (optional, defaults to 200) -# ENV: KMS_CONNECTOR_MAX_DECRYPTION_ATTEMPTS -# max_decryption_attempts = 200 - -# Timeout to get public decryption responses from KMS Core in seconds (optional, defaults to 300s / 5min) -# ENV: KMS_CONNECTOR_PUBLIC_DECRYPTION_TIMEOUT_SECS -# public_decryption_timeout_secs = 300 - -# Timeout to get user decryption responses from KMS Core in seconds (optional, defaults to 300s / 5min) -# ENV: KMS_CONNECTOR_USER_DECRYPTION_TIMEOUT_SECS -# user_decryption_timeout_secs = 300 - -# Retry interval to poll GRPC responses from KMS Core in seconds (optional, defaults to 1s) -# ENV: KMS_CONNECTOR_GRPC_POLL_INTERVAL_SECS -# grpc_poll_interval_secs = 1 - -# Number of retries for S3 ciphertext retrieval (optional, default: 3). -# ENV: KMS_CONNECTOR_S3_CIPHERTEXT_RETRIEVAL_RETRIES -# s3_ciphertext_retrieval_retries = 3 - -# Timeout to connect to a S3 bucket in seconds (optional, defaults to 2s) -# ENV: KMS_CONNECTOR_S3_CONNECT_TIMEOUT -# s3_connect_timeout = 2 - -# The maximum number of tasks to process events/responses concurrently (optional, defaults to 1000) -# ENV: KMS_CONNECTOR_TASK_LIMIT -# task_limit = 1000 - -# KMS-core gRPC endpoint in single shard setup (deprecated) -# Format: http://host:port -# ENV: KMS_CONNECTOR_KMS_CORE_ENDPOINT -# kms_core_endpoint = "http://[::1]:50051" diff --git a/kms-connector/config/tx-sender.toml b/kms-connector/config/tx-sender.toml index 3341b4c2b0..2d974d1e1d 100644 --- a/kms-connector/config/tx-sender.toml +++ b/kms-connector/config/tx-sender.toml @@ -10,7 +10,7 @@ chain_id = 54321 # Gateway WebSocket RPC URL endpoint (required) # ENV: KMS_CONNECTOR_GATEWAY_URL -gateway_url = "ws://localhost:8545" +gateway_url = "http://localhost:8545" # URL of the KMS Connector internal database (required) # Format: see https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING-URIS @@ -18,65 +18,27 @@ gateway_url = "ws://localhost:8545" database_url = "postgres://postgres:postgres@localhost/kms-connector" # Timeout for polling the database for responses (optional, defaults to 5 seconds) -# ENV: KMS_CONNECTOR_DATABASE_POLLING_TIMEOUT -# database_polling_timeout = 5 - -# Configuration of the Decryption contract (required) -[decryption_contract] -# Address of the Decryption contract (required) -# Format: hex string with 0x prefix -# ENV: KMS_CONNECTOR_DECRYPTION_CONTRACT__ADDRESS -address = "0xF0bFB159C7381F7CB332586004d8247252C5b816" -# EIP-712 domain name for Decryption contract (optional, defaults to "Decryption") -# ENV: KMS_CONNECTOR_DECRYPTION_CONTRACT__DOMAIN_NAME -# domain_name = "Decryption" -# EIP-712 domain version for Decryption contract (optional, defaults to "1") -# ENV: KMS_CONNECTOR_DECRYPTION_DOMAIN_VERSION -# domain_version = "1" - -# Configuration of the KMSGeneration contract (required) -[kms_generation_contract] -# Address of the KMSGeneration contract (required) -# Format: hex string with 0x prefix -# ENV: KMS_CONNECTOR_KMS_GENERATION_CONTRACT__ADDRESS -address = "0x5ffdaAB0373E62E2ea2944776209aEf29E631A64" -# EIP-712 domain name for KMSGeneration contract (optional, defaults to "KMSGeneration") -# ENV: KMS_CONNECTOR_KMS_GENERATION_CONTRACT__DOMAIN_NAME -# domain_name = "KMSGeneration" -# EIP-712 domain version for KMSGeneration contract (optional, defaults to "1") -# ENV: KMS_CONNECTOR_KMS_GENERATION_CONTRACT__DOMAIN_VERSION -# domain_version = "1" +# ENV: KMS_CONNECTOR_DATABASE_POLLING_TIMEOUT (format: https://docs.rs/humantime/latest/humantime/) +# database_polling_timeout = "5s" # Private key as a hex string (optional if `aws_kms_config` is configured) # If provided, the connector will use this private key # Format: hex string with or without 0x prefix # IMPORTANT: Keep this secure and never share it! # ENV: KMS_CONNECTOR_PRIVATE_KEY -private_key = "8da4ef21b864d2cc526dbdb2a120bd2874c36c9d0a1fb7f8c63d7f7a8b41de8f" - -# AWS KMS configuration (optional if `private_key` is configured) -# [aws_kms_config] -# AWS KMS key ID for signing -# ENV: KMS_CONNECTOR_AWS_KMS_CONFIG__KEY_ID -# key_id = "12345678-1234-1234-1234-123456789012" -# Optional: AWS region for KMS (if not using default from AWS credentials) -# ENV: KMS_CONNECTOR_AWS_KMS_CONFIG__REGION -# region = "us-east-1" -# Optional: AWS endpoint URL for KMS (for testing or custom endpoints) -# ENV: KMS_CONNECTOR_AWS_KMS_CONFIG__ENDPOINT -# endpoint = "https://kms.us-east-1.amazonaws.com" +private_key = "0x3f45b129a7fd099146e9fe63851a71646231f7743c712695f3b2d2bf0e41c774" # The endpoint used to monitor the connector components (optional, defaults to "0.0.0.0:9100") # ENV: KMS_CONNECTOR_MONITORING_ENDPOINT # monitoring_endpoint = "0.0.0.0:9100" # The interval between updates of gauge metrics (optional, defaults to 10s) -# ENV: KMS_CONNECTOR_GAUGE_UPDATE_INTERVAL_SECS -# gauge_update_interval_secs = 10 +# ENV: KMS_CONNECTOR_GAUGE_UPDATE_INTERVAL (format: https://docs.rs/humantime/latest/humantime/) +# gauge_update_interval = "10s" # The timeout to perform each external service connection healthcheck (optional, defaults to 3s) -# ENV: KMS_CONNECTOR_HEALTHCHECK_TIMEOUT_SECS -# healthcheck_timeout_secs = 3 +# ENV: KMS_CONNECTOR_HEALTHCHECK_TIMEOUT (format: https://docs.rs/humantime/latest/humantime/) +# healthcheck_timeout = "3s" # The limit number of KMS Core responses to fetch from the database (optional, defaults to 50) # ENV: KMS_CONNECTOR_RESPONSES_BATCH_SIZE @@ -86,9 +48,9 @@ private_key = "8da4ef21b864d2cc526dbdb2a120bd2874c36c9d0a1fb7f8c63d7f7a8b41de8f" # ENV: KMS_CONNECTOR_TX_RETRIES # tx_retries = 4 -# Interval between retries for transactions sent to the Gateway, in milliseconds (optional, defaults to 100ms) -# ENV: KMS_CONNECTOR_TX_RETRY_INTERVAL_MS -# tx_retry_interval = 100 +# Interval between retries for transactions sent to the Gateway (optional, defaults to 10ms) +# ENV: KMS_CONNECTOR_TX_RETRY_INTERVAL (format: https://docs.rs/humantime/latest/humantime/) +# tx_retry_interval = "10ms" # Boolean to enable tracing of reverted transactions, using `debug_trace_transaction` RPC call (optional, defaults to true) # ENV: KMS_CONNECTOR_TRACE_REVERTED_TX @@ -102,16 +64,52 @@ private_key = "8da4ef21b864d2cc526dbdb2a120bd2874c36c9d0a1fb7f8c63d7f7a8b41de8f" # ENV: KMS_CONNECTOR_TASK_LIMIT # task_limit = 1000 -# The interval between garbage collection runs, in minutes (optional, defaults to 5). -# ENV: KMS_CONNECTOR_GC_RUN_INTERVAL_MINS -# gc_run_interval_mins = 5 +# The interval between garbage collection runs (optional, defaults to 5m) +# ENV: KMS_CONNECTOR_GC_RUN_INTERVAL (format: https://docs.rs/humantime/latest/humantime/) +# gc_run_interval = "5m" -# The expiration time in minutes for completed/failed decryptions, after which they will be -# deleted (optional, defaults to 1440 = 24 hours). -# ENV: KMS_CONNECTOR_GC_DECRYPTION_EXPIRY_MINS -# gc_decryption_expiry_mins = 1440 +# The expiration time for completed/failed decryptions, after which they will be deleted (optional, defaults 24h) +# ENV: KMS_CONNECTOR_GC_DECRYPTION_EXPIRY (format: https://docs.rs/humantime/latest/humantime/) +# gc_decryption_expiry = "24h" -# The time limit in minutes for decryption to be under process, after which they will be -# considered as pending again (optional, defaults to 6). -# ENV: KMS_CONNECTOR_GC_DECRYPTION_UNDER_PROCESS_LIMIT_MINS -# gc_decryption_under_process_limit_mins = 6 +# The time limit for decryption to be under process, after which they will be considered as pending again (optional, defaults to 6m) +# ENV: KMS_CONNECTOR_GC_DECRYPTION_UNDER_PROCESS_LIMIT (format: https://docs.rs/humantime/latest/humantime/) +# gc_decryption_under_process_limit = "6m" + +# Configuration of the Decryption contract (required) +[decryption_contract] +# Address of the Decryption contract (required) +# Format: hex string with 0x prefix +# ENV: KMS_CONNECTOR_DECRYPTION_CONTRACT__ADDRESS +address = "0x0000000000000000000000000000000000000000" +# EIP-712 domain name for Decryption contract (optional, defaults to "Decryption") +# ENV: KMS_CONNECTOR_DECRYPTION_CONTRACT__DOMAIN_NAME +# domain_name = "Decryption" +# EIP-712 domain version for Decryption contract (optional, defaults to "1") +# ENV: KMS_CONNECTOR_DECRYPTION_DOMAIN_VERSION +# domain_version = "1" + +# Configuration of the KMSGeneration contract (required) +[kms_generation_contract] +# Address of the KMSGeneration contract (required) +# Format: hex string with 0x prefix +# ENV: KMS_CONNECTOR_KMS_GENERATION_CONTRACT__ADDRESS +address = "0x0000000000000000000000000000000000000000" +# EIP-712 domain name for KMSGeneration contract (optional, defaults to "KMSGeneration") +# ENV: KMS_CONNECTOR_KMS_GENERATION_CONTRACT__DOMAIN_NAME +# domain_name = "KMSGeneration" +# EIP-712 domain version for KMSGeneration contract (optional, defaults to "1") +# ENV: KMS_CONNECTOR_KMS_GENERATION_CONTRACT__DOMAIN_VERSION +# domain_version = "1" + +# AWS KMS configuration (optional if `private_key` is configured) +# [aws_kms_config] +# AWS KMS key ID for signing +# ENV: KMS_CONNECTOR_AWS_KMS_CONFIG__KEY_ID +# key_id = "12345678-1234-1234-1234-123456789012" +# Optional: AWS region for KMS (if not using default from AWS credentials) +# ENV: KMS_CONNECTOR_AWS_KMS_CONFIG__REGION +# region = "us-east-1" +# Optional: AWS endpoint URL for KMS (for testing or custom endpoints) +# ENV: KMS_CONNECTOR_AWS_KMS_CONFIG__ENDPOINT +# endpoint = "https://kms.us-east-1.amazonaws.com" diff --git a/kms-connector/crates/gw-listener/Cargo.toml b/kms-connector/crates/gw-listener/Cargo.toml index 8dbe3ffd58..4c0b9ee000 100644 --- a/kms-connector/crates/gw-listener/Cargo.toml +++ b/kms-connector/crates/gw-listener/Cargo.toml @@ -14,6 +14,7 @@ fhevm_gateway_bindings.workspace = true actix-web.workspace = true alloy.workspace = true anyhow.workspace = true +humantime-serde.workspace = true opentelemetry.workspace = true prometheus.workspace = true sqlx.workspace = true @@ -30,7 +31,6 @@ connector-utils = { workspace = true, features = ["tests"] } alloy = { workspace = true, features = ["json-rpc"] } rstest.workspace = true serial_test.workspace = true -tempfile.workspace = true testcontainers.workspace = true toml.workspace = true tracing-test.workspace = true diff --git a/kms-connector/crates/gw-listener/src/bin/gw_listener.rs b/kms-connector/crates/gw-listener/src/bin/gw_listener.rs index f6c81c5498..25a6604e18 100644 --- a/kms-connector/crates/gw-listener/src/bin/gw_listener.rs +++ b/kms-connector/crates/gw-listener/src/bin/gw_listener.rs @@ -5,6 +5,7 @@ use gw_listener::{ use connector_utils::{ cli::{Cli, Subcommands}, + config::DeserializeConfig, monitoring::{ health::query_healthcheck_endpoint, otlp::init_otlp_setup, server::start_monitoring_server, }, diff --git a/kms-connector/crates/gw-listener/src/core/config.rs b/kms-connector/crates/gw-listener/src/core/config.rs new file mode 100644 index 0000000000..8e552d05fd --- /dev/null +++ b/kms-connector/crates/gw-listener/src/core/config.rs @@ -0,0 +1,204 @@ +use alloy::transports::http::reqwest::Url; +use connector_utils::{ + config::{ + ContractConfig, DeserializeConfig, + contract::{ + default_decryption_contract_config, default_kms_generation_contract_config, + deserialize_decryption_contract_config, deserialize_kms_generation_contract_config, + }, + default_database_pool_size, + }, + monitoring::{health::default_healthcheck_timeout, server::default_monitoring_endpoint}, + tasks::default_task_limit, +}; +use serde::{Deserialize, Serialize}; +use std::{net::SocketAddr, str::FromStr, time::Duration}; + +/// Configuration of the `GatewayListener`. +#[derive(Clone, Debug, Deserialize, PartialEq)] +#[cfg_attr(debug_assertions, derive(Serialize))] +pub struct Config { + /// The URL of the Postgres database. + pub database_url: String, + /// The size of the database connection pool. + #[serde(default = "default_database_pool_size")] + pub database_pool_size: u32, + + /// The Gateway RPC endpoint. + pub gateway_url: Url, + /// The Chain ID of the Gateway. + pub chain_id: u64, + /// The `Decryption` contract configuration. + #[serde(deserialize_with = "deserialize_decryption_contract_config")] + pub decryption_contract: ContractConfig, + /// The `KMSGeneration` contract configuration. + #[serde(deserialize_with = "deserialize_kms_generation_contract_config")] + pub kms_generation_contract: ContractConfig, + + /// The service name used for tracing. + #[serde(default = "default_service_name")] + pub service_name: String, + /// The maximum number of tasks that can be executed concurrently. + #[serde(default = "default_task_limit")] + pub task_limit: usize, + /// The monitoring server endpoint of the `GatewayListener`. + #[serde(default = "default_monitoring_endpoint")] + pub monitoring_endpoint: SocketAddr, + /// The timeout to perform each external service connection healthcheck. + #[serde(with = "humantime_serde", default = "default_healthcheck_timeout")] + pub healthcheck_timeout: Duration, + + /// The polling interval for decryption requests. + #[serde(with = "humantime_serde", default = "default_decryption_polling")] + pub decryption_polling: Duration, + /// The polling interval for key management requests. + #[serde(with = "humantime_serde", default = "default_key_management_polling")] + pub key_management_polling: Duration, + + /// Optional block number to start processing from. + pub from_block_number: Option, +} + +impl DeserializeConfig for Config {} + +fn default_service_name() -> String { + "kms-connector-gw-listener".to_string() +} + +fn default_decryption_polling() -> Duration { + Duration::from_secs(1) +} + +fn default_key_management_polling() -> Duration { + Duration::from_secs(30) +} + +// Default implementation for testing purpose +impl Default for Config { + fn default() -> Self { + Self { + database_url: "postgres://postgres:postgres@localhost/kms-connector".to_string(), + database_pool_size: default_database_pool_size(), + gateway_url: Url::from_str("http://localhost:8545").unwrap(), + chain_id: 54321, + decryption_contract: default_decryption_contract_config(), + kms_generation_contract: default_kms_generation_contract_config(), + service_name: default_service_name(), + task_limit: default_task_limit(), + monitoring_endpoint: default_monitoring_endpoint(), + healthcheck_timeout: default_healthcheck_timeout(), + decryption_polling: default_decryption_polling(), + key_management_polling: default_key_management_polling(), + from_block_number: None, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy::primitives::address; + use serial_test::serial; + use std::env; + + fn cleanup_env_vars() { + unsafe { + env::remove_var("KMS_CONNECTOR_DATABASE_URL"); + env::remove_var("KMS_CONNECTOR_GATEWAY_URL"); + env::remove_var("KMS_CONNECTOR_CHAIN_ID"); + env::remove_var("KMS_CONNECTOR_DECRYPTION_CONTRACT__ADDRESS"); + env::remove_var("KMS_CONNECTOR_KMS_GENERATION_CONTRACT__ADDRESS"); + env::remove_var("KMS_CONNECTOR_SERVICE_NAME"); + } + } + + #[tokio::test] + #[serial(config_tests)] + async fn test_load_valid_config_from_file() { + cleanup_env_vars(); + let default_config = Config::default(); + let example_config = Config::from_env_and_file(Some(example_config_path())).unwrap(); + assert_eq!(default_config, example_config); + } + + #[tokio::test] + #[serial(config_tests)] + async fn test_load_from_env() { + cleanup_env_vars(); + + // Set environment variables + unsafe { + env::set_var( + "KMS_CONNECTOR_DATABASE_URL", + "postgres://postgres:postgres@localhost", + ); + env::set_var("KMS_CONNECTOR_GATEWAY_URL", "http://localhost:9545"); + env::set_var("KMS_CONNECTOR_CHAIN_ID", "31888"); + env::set_var( + "KMS_CONNECTOR_DECRYPTION_CONTRACT__ADDRESS", + "0x5fbdb2315678afecb367f032d93f642f64180aa3", + ); + env::set_var( + "KMS_CONNECTOR_KMS_GENERATION_CONTRACT__ADDRESS", + "0x0000000000000000000000000000000000000002", + ); + env::set_var("KMS_CONNECTOR_SERVICE_NAME", "kms-connector-test"); + } + + // Load config from environment + let config = Config::from_env_and_file::<&str>(None).unwrap(); + + // Verify values + assert_eq!( + config.gateway_url, + Url::from_str("http://localhost:9545").unwrap() + ); + assert_eq!(config.chain_id, 31888); + assert_eq!( + config.decryption_contract.address, + address!("0x5fbdb2315678afecb367f032d93f642f64180aa3") + ); + assert_eq!( + config.kms_generation_contract.address, + address!("0x0000000000000000000000000000000000000002") + ); + assert_eq!(config.service_name, "kms-connector-test"); + + cleanup_env_vars(); + } + + #[tokio::test] + #[serial(config_tests)] + async fn test_env_overrides_file() { + cleanup_env_vars(); + let example_config = Config::from_env_and_file(Some(example_config_path())).unwrap(); + + // Set an environment variable to override the file + let chain_id = 77737; + let service_name = "kms-connector-override"; + let mut expected_config = example_config.clone(); + expected_config.chain_id = chain_id; + expected_config.service_name = service_name.to_string(); + unsafe { + env::set_var("KMS_CONNECTOR_CHAIN_ID", chain_id.to_string()); + env::set_var("KMS_CONNECTOR_SERVICE_NAME", service_name); + } + + // Load config from both sources + let config = Config::from_env_and_file(Some(example_config_path())).unwrap(); + + // Verify that environment variables take precedence + assert_ne!(config.chain_id, example_config.chain_id); + assert_ne!(config.service_name, example_config.service_name); + assert_eq!(config, expected_config); + + cleanup_env_vars(); + } + + fn example_config_path() -> String { + format!( + "{}/../../config/gw-listener.toml", + env!("CARGO_MANIFEST_DIR") + ) + } +} diff --git a/kms-connector/crates/gw-listener/src/core/config/mod.rs b/kms-connector/crates/gw-listener/src/core/config/mod.rs deleted file mode 100644 index 39c4a6a39b..0000000000 --- a/kms-connector/crates/gw-listener/src/core/config/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod parsed; -mod raw; - -pub use parsed::Config; diff --git a/kms-connector/crates/gw-listener/src/core/config/parsed.rs b/kms-connector/crates/gw-listener/src/core/config/parsed.rs deleted file mode 100644 index 45b59915a8..0000000000 --- a/kms-connector/crates/gw-listener/src/core/config/parsed.rs +++ /dev/null @@ -1,270 +0,0 @@ -//! Module used to parse the gw-listener configuration. -//! -//! The `raw` module is first used to deserialize the configuration. - -use super::raw::RawConfig; -use connector_utils::{ - config::{ContractConfig, DeserializeRawConfig, Error, Result}, - monitoring::otlp::default_dispatcher, -}; -use std::{net::SocketAddr, path::Path, time::Duration}; -use tracing::{error, info}; - -/// Configuration of the `GatewayListener`. -#[derive(Clone, Debug)] -pub struct Config { - /// The URL of the Postgres database. - pub database_url: String, - /// The size of the database connection pool. - pub database_pool_size: u32, - /// The Gateway RPC endpoint. - pub gateway_url: String, - /// The Chain ID of the Gateway. - pub chain_id: u64, - /// The `Decryption` contract configuration. - pub decryption_contract: ContractConfig, - /// The `KMSGeneration` contract configuration. - pub kms_generation_contract: ContractConfig, - /// The service name used for tracing. - pub service_name: String, - /// The maximum number of tasks that can be executed concurrently. - pub task_limit: usize, - /// The monitoring server endpoint of the `GatewayListener`. - pub monitoring_endpoint: SocketAddr, - /// The timeout to perform each external service connection healthcheck. - pub healthcheck_timeout: Duration, - /// The polling interval for decryption requests. - pub decryption_polling: Duration, - /// The polling interval for key management requests. - pub key_management_polling: Duration, - /// Optional block number to start processing from. - pub from_block_number: Option, -} - -impl Config { - /// Loads the configuration from environment variables and optionally from a TOML file. - /// - /// Environment variables take precedence over file configuration. - /// Environment variables are prefixed with KMS_CONNECTOR_. - pub fn from_env_and_file>(path: Option

) -> Result { - tracing::dispatcher::with_default(&default_dispatcher(), || { - if let Some(config_path) = &path { - info!("Loading config from: {}", config_path.as_ref().display()); - } else { - info!("Loading config using environment variables only"); - } - - let raw_config = RawConfig::from_env_and_file(path).inspect_err(|e| error!("{e}"))?; - Self::parse(raw_config).inspect_err(|e| error!("{e}")) - }) - } - - fn parse(raw_config: RawConfig) -> Result { - let monitoring_endpoint = raw_config - .monitoring_endpoint - .parse::() - .map_err(|e| Error::InvalidConfig(e.to_string()))?; - let decryption_contract = - ContractConfig::parse("Decryption", raw_config.decryption_contract)?; - let kms_generation_contract = - ContractConfig::parse("KMSGeneration", raw_config.kms_generation_contract)?; - - // Validate critical configuration parts - if raw_config.gateway_url.is_empty() { - return Err(Error::EmptyField("Gateway URL".to_string())); - } - - let healthcheck_timeout = Duration::from_secs(raw_config.healthcheck_timeout_secs); - let decryption_polling = Duration::from_millis(raw_config.decryption_polling_ms); - let key_management_polling = Duration::from_millis(raw_config.key_management_polling_ms); - - Ok(Self { - database_url: raw_config.database_url, - database_pool_size: raw_config.database_pool_size, - gateway_url: raw_config.gateway_url, - chain_id: raw_config.chain_id, - decryption_contract, - kms_generation_contract, - service_name: raw_config.service_name, - task_limit: raw_config.task_limit, - monitoring_endpoint, - healthcheck_timeout, - decryption_polling, - key_management_polling, - from_block_number: raw_config.from_block_number, - }) - } -} - -// For testing purpose -impl Default for Config { - fn default() -> Self { - Self::parse(RawConfig::default()).expect("Couldn't parse default raw config") - } -} - -#[cfg(test)] -mod tests { - use super::*; - use alloy::primitives::Address; - use connector_utils::config::RawContractConfig; - use serial_test::serial; - use std::{env, fs, path::Path, str::FromStr}; - use tempfile::NamedTempFile; - - fn cleanup_env_vars() { - unsafe { - env::remove_var("KMS_CONNECTOR_DATABASE_URL"); - env::remove_var("KMS_CONNECTOR_GATEWAY_URL"); - env::remove_var("KMS_CONNECTOR_CHAIN_ID"); - env::remove_var("KMS_CONNECTOR_DECRYPTION_CONTRACT__ADDRESS"); - env::remove_var("KMS_CONNECTOR_KMS_GENERATION_CONTRACT__ADDRESS"); - env::remove_var("KMS_CONNECTOR_SERVICE_NAME"); - } - } - - #[tokio::test] - #[serial(config_tests)] - async fn test_load_valid_config_from_file() { - cleanup_env_vars(); - let raw_config = RawConfig::default(); - - let temp_file = NamedTempFile::new().unwrap(); - raw_config.to_file(temp_file.path()).unwrap(); - let config = Config::from_env_and_file(Some(temp_file.path())).unwrap(); - - // Compare fields - assert_eq!(raw_config.gateway_url, config.gateway_url); - assert_eq!(raw_config.chain_id, config.chain_id); - assert_eq!( - Address::from_str(&raw_config.decryption_contract.address).unwrap(), - config.decryption_contract.address, - ); - assert_eq!( - Address::from_str(&raw_config.kms_generation_contract.address).unwrap(), - config.kms_generation_contract.address, - ); - assert_eq!(raw_config.service_name, config.service_name); - assert_eq!( - raw_config.decryption_contract.domain_name.unwrap(), - config.decryption_contract.domain_name, - ); - assert_eq!( - raw_config.decryption_contract.domain_version.unwrap(), - config.decryption_contract.domain_version, - ); - assert_eq!( - raw_config.kms_generation_contract.domain_name.unwrap(), - config.kms_generation_contract.domain_name, - ); - assert_eq!( - raw_config.kms_generation_contract.domain_version.unwrap(), - config.kms_generation_contract.domain_version, - ); - } - - #[tokio::test] - #[serial(config_tests)] - async fn test_load_from_env() { - cleanup_env_vars(); - - // Set environment variables - unsafe { - env::set_var( - "KMS_CONNECTOR_DATABASE_URL", - "postgres://postgres:postgres@localhost", - ); - env::set_var("KMS_CONNECTOR_GATEWAY_URL", "ws://localhost:9545"); - env::set_var("KMS_CONNECTOR_CHAIN_ID", "31888"); - env::set_var( - "KMS_CONNECTOR_DECRYPTION_CONTRACT__ADDRESS", - "0x5fbdb2315678afecb367f032d93f642f64180aa3", - ); - env::set_var( - "KMS_CONNECTOR_KMS_GENERATION_CONTRACT__ADDRESS", - "0x0000000000000000000000000000000000000002", - ); - env::set_var("KMS_CONNECTOR_SERVICE_NAME", "kms-connector-test"); - } - - // Load config from environment - let config = Config::from_env_and_file::<&str>(None).unwrap(); - - // Verify values - assert_eq!(config.gateway_url, "ws://localhost:9545"); - assert_eq!(config.chain_id, 31888); - assert_eq!( - config.decryption_contract.address, - Address::from_str("0x5fbdb2315678afecb367f032d93f642f64180aa3").unwrap() - ); - assert_eq!( - config.kms_generation_contract.address, - Address::from_str("0x0000000000000000000000000000000000000002").unwrap() - ); - assert_eq!(config.service_name, "kms-connector-test"); - - cleanup_env_vars(); - } - - #[tokio::test] - #[serial(config_tests)] - async fn test_env_overrides_file() { - cleanup_env_vars(); - - // Create a temp config file - let raw_config = RawConfig::default(); - - let temp_file = NamedTempFile::new().unwrap(); - raw_config.to_file(temp_file.path()).unwrap(); - - // Set an environment variable to override the file - unsafe { - env::set_var("KMS_CONNECTOR_CHAIN_ID", "77737"); - env::set_var("KMS_CONNECTOR_SERVICE_NAME", "kms-connector-override"); - } - - // Load config from both sources - let config = Config::from_env_and_file(Some(temp_file.path())).unwrap(); - - // Verify that environment variables take precedence - assert_eq!(config.chain_id, 77737); - assert_eq!(config.service_name, "kms-connector-override"); - - // File values should be used for non-overridden fields - assert_eq!(config.gateway_url, "ws://localhost:8545"); - - cleanup_env_vars(); - } - - #[tokio::test] - #[serial(config_tests)] - async fn test_invalid_address() { - let raw_config = RawConfig { - decryption_contract: RawContractConfig { - address: "0x0000".to_string(), - ..Default::default() - }, - kms_generation_contract: RawContractConfig { - address: "0x000010".to_string(), - ..Default::default() - }, - ..Default::default() - }; - assert!(matches!( - Config::parse(raw_config), - Err(Error::InvalidConfig(_)) - )); - } - - impl RawConfig { - pub fn to_file>(&self, path: P) -> Result<()> { - let content = toml::to_string_pretty(self) - .map_err(|e| Error::InvalidConfig(format!("Failed to serialize config: {e}")))?; - - fs::write(path, content) - .map_err(|e| Error::InvalidConfig(format!("Failed to write config file: {e}")))?; - - Ok(()) - } - } -} diff --git a/kms-connector/crates/gw-listener/src/core/config/raw.rs b/kms-connector/crates/gw-listener/src/core/config/raw.rs deleted file mode 100644 index 9baefdf7ec..0000000000 --- a/kms-connector/crates/gw-listener/src/core/config/raw.rs +++ /dev/null @@ -1,78 +0,0 @@ -//! Module used to deserialize the gw-listener configuration using serde. -//! -//! The `RawConfig` can then be parsed into a `Config` in the `parsed` module. - -use connector_utils::{ - config::{DeserializeRawConfig, RawContractConfig, default_database_pool_size}, - monitoring::{health::default_healthcheck_timeout_secs, server::default_monitoring_endpoint}, - tasks::default_task_limit, -}; -use serde::{Deserialize, Serialize}; - -/// Deserializable representation of the `GatewayListener` configuration. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct RawConfig { - pub database_url: String, - #[serde(default = "default_database_pool_size")] - pub database_pool_size: u32, - pub gateway_url: String, - pub chain_id: u64, - pub decryption_contract: RawContractConfig, - pub kms_generation_contract: RawContractConfig, - #[serde(default = "default_service_name")] - pub service_name: String, - #[serde(default = "default_task_limit")] - pub task_limit: usize, - #[serde(default = "default_monitoring_endpoint")] - pub monitoring_endpoint: String, - #[serde(default = "default_healthcheck_timeout_secs")] - pub healthcheck_timeout_secs: u64, - #[serde(default = "default_decryption_polling_ms")] - pub decryption_polling_ms: u64, - #[serde(default = "default_key_management_polling_ms")] - pub key_management_polling_ms: u64, - pub from_block_number: Option, -} - -fn default_service_name() -> String { - "kms-connector-gw-listener".to_string() -} - -fn default_decryption_polling_ms() -> u64 { - 1000 // 1s -} - -fn default_key_management_polling_ms() -> u64 { - 30000 // 30s -} - -impl DeserializeRawConfig for RawConfig {} - -// Default implementation for testing purpose -impl Default for RawConfig { - fn default() -> Self { - Self { - database_url: "postgres://postgres:postgres@localhost".to_string(), - database_pool_size: default_database_pool_size(), - gateway_url: "ws://localhost:8545".to_string(), - chain_id: 1, - decryption_contract: RawContractConfig { - address: "0x0000000000000000000000000000000000000000".to_string(), - domain_name: Some("Decryption".to_string()), - domain_version: Some("1".to_string()), - }, - kms_generation_contract: RawContractConfig { - address: "0x0000000000000000000000000000000000000000".to_string(), - domain_name: Some("KMSGeneration".to_string()), - domain_version: Some("1".to_string()), - }, - service_name: default_service_name(), - task_limit: default_task_limit(), - monitoring_endpoint: default_monitoring_endpoint(), - healthcheck_timeout_secs: default_healthcheck_timeout_secs(), - decryption_polling_ms: default_decryption_polling_ms(), - key_management_polling_ms: default_key_management_polling_ms(), - from_block_number: None, - } - } -} diff --git a/kms-connector/crates/gw-listener/src/core/gw_listener.rs b/kms-connector/crates/gw-listener/src/core/gw_listener.rs index 3f019cb521..e67868a871 100644 --- a/kms-connector/crates/gw-listener/src/core/gw_listener.rs +++ b/kms-connector/crates/gw-listener/src/core/gw_listener.rs @@ -371,7 +371,7 @@ impl GatewayListener { cancel_token: CancellationToken, ) -> anyhow::Result<(Self, State)> { let db_pool = connect_to_db(&config.database_url, config.database_pool_size).await?; - let provider = connect_to_gateway(&config.gateway_url, config.chain_id).await?; + let provider = connect_to_gateway(config.gateway_url.clone(), config.chain_id).await?; let state = State::new( db_pool.clone(), diff --git a/kms-connector/crates/kms-worker/Cargo.toml b/kms-connector/crates/kms-worker/Cargo.toml index c03acfb39c..772e4de39c 100644 --- a/kms-connector/crates/kms-worker/Cargo.toml +++ b/kms-connector/crates/kms-worker/Cargo.toml @@ -14,8 +14,8 @@ fhevm_gateway_bindings.workspace = true actix-web.workspace = true alloy.workspace = true anyhow.workspace = true -config.workspace = true dashmap.workspace = true +humantime-serde.workspace = true kms-grpc.workspace = true prometheus.workspace = true serde.workspace = true @@ -36,7 +36,6 @@ connector-utils = { workspace = true, features = ["tests"] } mocktail.workspace = true rstest.workspace = true serial_test.workspace = true -tempfile.workspace = true testcontainers.workspace = true toml.workspace = true tracing-test.workspace = true diff --git a/kms-connector/crates/kms-worker/src/bin/kms_worker.rs b/kms-connector/crates/kms-worker/src/bin/kms_worker.rs index 7b41f7f01e..0c8c6ca70f 100644 --- a/kms-connector/crates/kms-worker/src/bin/kms_worker.rs +++ b/kms-connector/crates/kms-worker/src/bin/kms_worker.rs @@ -5,6 +5,7 @@ use kms_worker::{ use connector_utils::{ cli::{Cli, Subcommands}, + config::DeserializeConfig, monitoring::{ health::query_healthcheck_endpoint, otlp::init_otlp_setup, server::start_monitoring_server, }, diff --git a/kms-connector/crates/kms-worker/src/core/config.rs b/kms-connector/crates/kms-worker/src/core/config.rs new file mode 100644 index 0000000000..4943a1f9f0 --- /dev/null +++ b/kms-connector/crates/kms-worker/src/core/config.rs @@ -0,0 +1,298 @@ +use alloy::transports::http::reqwest::Url; +use connector_utils::{ + config::{ + ContractConfig, DeserializeConfig, + contract::{ + default_decryption_contract_config, default_gateway_config_contract_config, + default_kms_generation_contract_config, deserialize_decryption_contract_config, + deserialize_gateway_config_contract_config, deserialize_kms_generation_contract_config, + }, + default_database_pool_size, + }, + monitoring::{health::default_healthcheck_timeout, server::default_monitoring_endpoint}, + tasks::default_task_limit, +}; +use serde::{Deserialize, Deserializer, Serialize}; +use std::{net::SocketAddr, str::FromStr, time::Duration}; + +/// Configuration of the `KmsWorker`. +#[derive(Clone, Debug, Deserialize, PartialEq)] +#[cfg_attr(debug_assertions, derive(Serialize))] +pub struct Config { + /// The URL of the Postgres database. + pub database_url: String, + /// The size of the database connection pool. + #[serde(default = "default_database_pool_size")] + pub database_pool_size: u32, + /// The timeout for polling the database for fast events (decryption for ex). + #[serde(with = "humantime_serde", default = "default_db_fast_event_polling")] + pub db_fast_event_polling: Duration, + /// The timeout for polling the database for long events (prep keygen for ex). + #[serde(with = "humantime_serde", default = "default_db_long_event_polling")] + pub db_long_event_polling: Duration, + + /// The Gateway RPC endpoint. + pub gateway_url: Url, + /// The KMS Core endpoints. + #[serde(deserialize_with = "non_empty")] + pub kms_core_endpoints: Vec, + /// The Chain ID of the Gateway. + pub chain_id: u64, + /// The `Decryption` contract configuration. + #[serde(deserialize_with = "deserialize_decryption_contract_config")] + pub decryption_contract: ContractConfig, + /// The `GatewayConfig` contract configuration. + #[serde(deserialize_with = "deserialize_gateway_config_contract_config")] + pub gateway_config_contract: ContractConfig, + /// The `KMSGeneration` contract configuration. + #[serde(deserialize_with = "deserialize_kms_generation_contract_config")] + pub kms_generation_contract: ContractConfig, + + /// The limit number of events to fetch from the database. + #[serde(default = "default_events_batch_size")] + pub events_batch_size: u8, + /// Number of retries for GRPC requests sent to the KMS Core. + #[serde(default = "default_grpc_request_retries")] + pub grpc_request_retries: u8, + /// The maximum number of decryption attempts. + #[serde(default = "default_max_decryption_attempts")] + pub max_decryption_attempts: u16, + + /// Number of retries for S3 ciphertext retrieval. + #[serde(default = "default_s3_ciphertext_retrieval_retries")] + pub s3_ciphertext_retrieval_retries: u8, + /// Timeout to connect to a S3 bucket. + #[serde(with = "humantime_serde", default = "default_s3_connect_timeout")] + pub s3_connect_timeout: Duration, + + /// The service name used for tracing. + #[serde(default = "default_service_name")] + pub service_name: String, + /// The maximum number of tasks that can be executed concurrently. + #[serde(default = "default_task_limit")] + pub task_limit: usize, + /// The monitoring server endpoint of the `KmsWorker`. + #[serde(default = "default_monitoring_endpoint")] + pub monitoring_endpoint: SocketAddr, + /// The timeout to perform each external service connection healthcheck. + #[serde(with = "humantime_serde", default = "default_healthcheck_timeout")] + pub healthcheck_timeout: Duration, +} + +fn default_service_name() -> String { + "kms-connector-kms-worker".to_string() +} + +fn default_db_fast_event_polling() -> Duration { + Duration::from_secs(3) +} + +fn default_db_long_event_polling() -> Duration { + Duration::from_secs(60) +} + +fn default_events_batch_size() -> u8 { + 50 +} + +fn default_grpc_request_retries() -> u8 { + 3 +} + +fn default_max_decryption_attempts() -> u16 { + 200 +} + +fn default_s3_ciphertext_retrieval_retries() -> u8 { + 3 +} + +fn default_s3_connect_timeout() -> Duration { + Duration::from_secs(3) +} + +impl DeserializeConfig for Config {} + +// Default implementation for testing purpose +impl Default for Config { + fn default() -> Self { + Self { + database_url: "postgres://postgres:postgres@localhost/kms-connector".to_string(), + database_pool_size: default_database_pool_size(), + db_fast_event_polling: default_db_fast_event_polling(), + db_long_event_polling: default_db_long_event_polling(), + gateway_url: Url::from_str("http://localhost:8545").unwrap(), + kms_core_endpoints: vec!["http://localhost:50051".to_string()], + chain_id: 54321, + decryption_contract: default_decryption_contract_config(), + gateway_config_contract: default_gateway_config_contract_config(), + kms_generation_contract: default_kms_generation_contract_config(), + service_name: default_service_name(), + events_batch_size: default_events_batch_size(), + grpc_request_retries: default_grpc_request_retries(), + max_decryption_attempts: default_max_decryption_attempts(), + s3_ciphertext_retrieval_retries: default_s3_ciphertext_retrieval_retries(), + s3_connect_timeout: default_s3_connect_timeout(), + task_limit: default_task_limit(), + monitoring_endpoint: default_monitoring_endpoint(), + healthcheck_timeout: default_healthcheck_timeout(), + } + } +} + +fn non_empty<'de, D, T>(d: D) -> Result, D::Error> +where + D: Deserializer<'de>, + T: Deserialize<'de>, +{ + let vec = >::deserialize(d)?; + if vec.is_empty() { + Err(serde::de::Error::custom( + "Field should not be an empty array", + )) + } else { + Ok(vec) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy::primitives::Address; + use serial_test::serial; + use std::{env, str::FromStr}; + + fn cleanup_env_vars() { + unsafe { + env::remove_var("KMS_CONNECTOR_DATABASE_URL"); + env::remove_var("KMS_CONNECTOR_GATEWAY_URL"); + env::remove_var("KMS_CONNECTOR_KMS_CORE_ENDPOINTS"); + env::remove_var("KMS_CONNECTOR_CHAIN_ID"); + env::remove_var("KMS_CONNECTOR_DECRYPTION_CONTRACT__ADDRESS"); + env::remove_var("KMS_CONNECTOR_GATEWAY_CONFIG_CONTRACT__ADDRESS"); + env::remove_var("KMS_CONNECTOR_KMS_GENERATION_CONTRACT__ADDRESS"); + env::remove_var("KMS_CONNECTOR_SERVICE_NAME"); + env::remove_var("KMS_CONNECTOR_EVENTS_BATCH_SIZE"); + env::remove_var("KMS_CONNECTOR_GRPC_REQUEST_RETRIES"); + env::remove_var("KMS_CONNECTOR_MAX_DECRYPTION_ATTEMPTS"); + env::remove_var("KMS_CONNECTOR_S3_CIPHERTEXT_RETRIEVAL_RETRIES"); + env::remove_var("KMS_CONNECTOR_S3_CONNECT_TIMEOUT"); + } + } + + #[test] + #[serial(config_tests)] + fn test_load_valid_config_from_file() { + cleanup_env_vars(); + let default_config = Config::default(); + let example_config = Config::from_env_and_file(Some(example_config_path())).unwrap(); + assert_eq!(default_config, example_config); + } + + #[test] + #[serial(config_tests)] + fn test_load_from_env() { + cleanup_env_vars(); + + // Set environment variables + unsafe { + env::set_var( + "KMS_CONNECTOR_DATABASE_URL", + "postgres://postgres:postgres@localhost", + ); + env::set_var("KMS_CONNECTOR_GATEWAY_URL", "http://localhost:9545"); + env::set_var( + "KMS_CONNECTOR_KMS_CORE_ENDPOINTS", + "http://localhost:50053,http://localhost:50054", + ); + env::set_var("KMS_CONNECTOR_CHAIN_ID", "31888"); + env::set_var( + "KMS_CONNECTOR_DECRYPTION_CONTRACT__ADDRESS", + "0x5fbdb2315678afecb367f032d93f642f64180aa3", + ); + env::set_var( + "KMS_CONNECTOR_GATEWAY_CONFIG_CONTRACT__ADDRESS", + "0x5fbdb2315678afecb367f032d93f642f64180aa3", + ); + env::set_var( + "KMS_CONNECTOR_KMS_GENERATION_CONTRACT__ADDRESS", + "0x5fbdb2315678afecb367f032d93f642f64180aa3", + ); + env::set_var("KMS_CONNECTOR_SERVICE_NAME", "kms-connector-test"); + env::set_var("KMS_CONNECTOR_EVENTS_BATCH_SIZE", "15"); + env::set_var("KMS_CONNECTOR_GRPC_REQUEST_RETRIES", "5"); + env::set_var("KMS_CONNECTOR_MAX_DECRYPTION_ATTEMPTS", "300"); + env::set_var("KMS_CONNECTOR_S3_CIPHERTEXT_RETRIEVAL_RETRIES", "5"); + env::set_var("KMS_CONNECTOR_S3_CONNECT_TIMEOUT", "4s"); + } + + // Load config from environment + let config = Config::from_env_and_file::<&str>(None).unwrap(); + + // Verify values + assert_eq!( + config.gateway_url, + Url::from_str("http://localhost:9545").unwrap() + ); + assert_eq!( + config.kms_core_endpoints, + vec!["http://localhost:50053", "http://localhost:50054"] + ); + assert_eq!(config.chain_id, 31888); + assert_eq!( + config.decryption_contract.address, + Address::from_str("0x5fbdb2315678afecb367f032d93f642f64180aa3").unwrap() + ); + assert_eq!( + config.gateway_config_contract.address, + Address::from_str("0x5fbdb2315678afecb367f032d93f642f64180aa3").unwrap() + ); + assert_eq!( + config.kms_generation_contract.address, + Address::from_str("0x5fbdb2315678afecb367f032d93f642f64180aa3").unwrap() + ); + assert_eq!(config.service_name, "kms-connector-test"); + assert_eq!(config.events_batch_size, 15); + assert_eq!(config.grpc_request_retries, 5); + assert_eq!(config.max_decryption_attempts, 300); + assert_eq!(config.s3_ciphertext_retrieval_retries, 5); + assert_eq!(config.s3_connect_timeout.as_secs(), 4); + + cleanup_env_vars(); + } + + #[test] + #[serial(config_tests)] + fn test_env_overrides_file() { + cleanup_env_vars(); + let example_config = Config::from_env_and_file(Some(example_config_path())).unwrap(); + + // Set an environment variable to override the file + let chain_id = 77737; + let service_name = "kms-connector-override"; + let mut expected_config = example_config.clone(); + expected_config.chain_id = chain_id; + expected_config.service_name = service_name.to_string(); + unsafe { + env::set_var("KMS_CONNECTOR_CHAIN_ID", chain_id.to_string()); + env::set_var("KMS_CONNECTOR_SERVICE_NAME", service_name); + } + + // Load config from both sources + let config = Config::from_env_and_file(Some(example_config_path())).unwrap(); + + // Verify that environment variables take precedence + assert_ne!(config.chain_id, example_config.chain_id); + assert_ne!(config.service_name, example_config.service_name); + assert_eq!(config, expected_config); + + cleanup_env_vars(); + } + + fn example_config_path() -> String { + format!( + "{}/../../config/kms-worker.toml", + env!("CARGO_MANIFEST_DIR") + ) + } +} diff --git a/kms-connector/crates/kms-worker/src/core/config/mod.rs b/kms-connector/crates/kms-worker/src/core/config/mod.rs deleted file mode 100644 index 39c4a6a39b..0000000000 --- a/kms-connector/crates/kms-worker/src/core/config/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod parsed; -mod raw; - -pub use parsed::Config; diff --git a/kms-connector/crates/kms-worker/src/core/config/parsed.rs b/kms-connector/crates/kms-worker/src/core/config/parsed.rs deleted file mode 100644 index a39c79d803..0000000000 --- a/kms-connector/crates/kms-worker/src/core/config/parsed.rs +++ /dev/null @@ -1,357 +0,0 @@ -//! Module used to parse kms-worker configuration. -//! -//! The `raw` module is first used to deserialize the configuration. - -use crate::core::config::raw::RawConfig; -use connector_utils::{ - config::{ContractConfig, DeserializeRawConfig, Error, Result}, - monitoring::otlp::default_dispatcher, -}; -use std::{net::SocketAddr, path::Path, time::Duration}; -use tracing::{error, info, warn}; - -/// Configuration of the `KmsWorker`. -#[derive(Clone, Debug)] -pub struct Config { - /// The URL of the Postgres database. - pub database_url: String, - /// The size of the database connection pool. - pub database_pool_size: u32, - /// The timeout for polling the database for fast events (decryption for ex). - pub db_fast_event_polling: Duration, - /// The timeout for polling the database for long events (prep keygen for ex). - pub db_long_event_polling: Duration, - /// The Gateway RPC endpoint. - pub gateway_url: String, - /// The KMS Core endpoints. - pub kms_core_endpoints: Vec, - /// The Chain ID of the Gateway. - pub chain_id: u64, - /// The `Decryption` contract configuration. - pub decryption_contract: ContractConfig, - /// The `GatewayConfig` contract configuration. - pub gateway_config_contract: ContractConfig, - /// The `KMSGeneration` contract configuration. - pub kms_generation_contract: ContractConfig, - /// The service name used for tracing. - pub service_name: String, - - /// The limit number of events to fetch from the database. - pub events_batch_size: u8, - /// Number of retries for GRPC requests sent to the KMS Core. - pub grpc_request_retries: u8, - /// The maximum number of decryption attempts. - pub max_decryption_attempts: u16, - - /// Number of retries for S3 ciphertext retrieval. - pub s3_ciphertext_retrieval_retries: u8, - /// Timeout to connect to a S3 bucket. - pub s3_connect_timeout: Duration, - - /// The maximum number of tasks that can be executed concurrently. - pub task_limit: usize, - - /// The monitoring server endpoint of the `KmsWorker`. - pub monitoring_endpoint: SocketAddr, - /// The timeout to perform each external service connection healthcheck. - pub healthcheck_timeout: Duration, -} - -impl Config { - /// Loads the configuration from environment variables and optionally from a TOML file. - /// - /// Environment variables take precedence over file configuration. - /// Environment variables are prefixed with KMS_CONNECTOR_. - pub fn from_env_and_file>(path: Option

) -> Result { - tracing::dispatcher::with_default(&default_dispatcher(), || { - if let Some(config_path) = &path { - info!("Loading config from: {}", config_path.as_ref().display()); - } else { - info!("Loading config using environment variables only"); - } - - let raw_config = RawConfig::from_env_and_file(path).inspect_err(|e| error!("{e}"))?; - Self::parse(raw_config).inspect_err(|e| error!("{e}")) - }) - } - - fn parse(raw_config: RawConfig) -> Result { - let monitoring_endpoint = raw_config - .monitoring_endpoint - .parse::() - .map_err(|e| Error::InvalidConfig(e.to_string()))?; - let decryption_contract = - ContractConfig::parse("Decryption", raw_config.decryption_contract)?; - let gateway_config_contract = - ContractConfig::parse("GatewayConfig", raw_config.gateway_config_contract)?; - let kms_generation_contract = - ContractConfig::parse("KMSGeneration", raw_config.kms_generation_contract)?; - - // Validate critical configuration parts - if raw_config.gateway_url.is_empty() { - return Err(Error::EmptyField("Gateway URL".to_string())); - } - - let kms_core_endpoints; - if raw_config.kms_core_endpoints.is_empty() { - if let Some(kms_core_endpoint) = raw_config.kms_core_endpoint { - warn!("Using deprecated `kms_core_endpoint` field instead of `kms_core_endpoints`"); - kms_core_endpoints = vec![kms_core_endpoint]; - } else { - return Err(Error::EmptyField("KMS Core endpoints".to_string())); - } - } else { - kms_core_endpoints = raw_config.kms_core_endpoints; - } - - let db_fast_event_polling = Duration::from_secs(raw_config.db_fast_event_polling_secs); - let db_long_event_polling = Duration::from_secs(raw_config.db_long_event_polling_secs); - let s3_ciphertext_retrieval_timeout = Duration::from_secs(raw_config.s3_connect_timeout); - let healthcheck_timeout = Duration::from_secs(raw_config.healthcheck_timeout_secs); - - Ok(Self { - database_url: raw_config.database_url, - database_pool_size: raw_config.database_pool_size, - db_fast_event_polling, - db_long_event_polling, - gateway_url: raw_config.gateway_url, - kms_core_endpoints, - chain_id: raw_config.chain_id, - decryption_contract, - gateway_config_contract, - kms_generation_contract, - service_name: raw_config.service_name, - events_batch_size: raw_config.events_batch_size, - grpc_request_retries: raw_config.grpc_request_retries, - max_decryption_attempts: raw_config.max_decryption_attempts, - s3_ciphertext_retrieval_retries: raw_config.s3_ciphertext_retrieval_retries, - s3_connect_timeout: s3_ciphertext_retrieval_timeout, - task_limit: raw_config.task_limit, - monitoring_endpoint, - healthcheck_timeout, - }) - } -} - -// For testing purpose -impl Default for Config { - fn default() -> Self { - Self::parse(RawConfig::default()).expect("Couldn't parse default raw config") - } -} - -#[cfg(test)] -mod tests { - use super::*; - use alloy::primitives::Address; - use connector_utils::config::RawContractConfig; - use serial_test::serial; - use std::{env, fs, path::Path, str::FromStr}; - use tempfile::NamedTempFile; - - fn cleanup_env_vars() { - unsafe { - env::remove_var("KMS_CONNECTOR_DATABASE_URL"); - env::remove_var("KMS_CONNECTOR_GATEWAY_URL"); - env::remove_var("KMS_CONNECTOR_KMS_CORE_ENDPOINTS"); - env::remove_var("KMS_CONNECTOR_CHAIN_ID"); - env::remove_var("KMS_CONNECTOR_DECRYPTION_CONTRACT__ADDRESS"); - env::remove_var("KMS_CONNECTOR_GATEWAY_CONFIG_CONTRACT__ADDRESS"); - env::remove_var("KMS_CONNECTOR_KMS_GENERATION_CONTRACT__ADDRESS"); - env::remove_var("KMS_CONNECTOR_SERVICE_NAME"); - env::remove_var("KMS_CONNECTOR_EVENTS_BATCH_SIZE"); - env::remove_var("KMS_CONNECTOR_GRPC_REQUEST_RETRIES"); - env::remove_var("KMS_CONNECTOR_MAX_DECRYPTION_ATTEMPTS"); - env::remove_var("KMS_CONNECTOR_S3_CIPHERTEXT_RETRIEVAL_RETRIES"); - env::remove_var("KMS_CONNECTOR_S3_CONNECT_TIMEOUT"); - } - } - - #[test] - #[serial(config_tests)] - fn test_load_valid_config_from_file() { - cleanup_env_vars(); - let raw_config = RawConfig::default(); - - let temp_file = NamedTempFile::new().unwrap(); - raw_config.to_file(temp_file.path()).unwrap(); - let config = Config::from_env_and_file(Some(temp_file.path())).unwrap(); - - // Compare fields - assert_eq!(raw_config.gateway_url, config.gateway_url); - assert_eq!(raw_config.kms_core_endpoints, config.kms_core_endpoints); - assert_eq!(raw_config.chain_id, config.chain_id); - assert_eq!( - Address::from_str(&raw_config.decryption_contract.address).unwrap(), - config.decryption_contract.address, - ); - assert_eq!( - Address::from_str(&raw_config.gateway_config_contract.address).unwrap(), - config.gateway_config_contract.address, - ); - assert_eq!(raw_config.kms_core_endpoints, config.kms_core_endpoints); - assert_eq!(raw_config.service_name, config.service_name); - assert_eq!( - raw_config.decryption_contract.domain_name.unwrap(), - config.decryption_contract.domain_name, - ); - assert_eq!( - raw_config.decryption_contract.domain_version.unwrap(), - config.decryption_contract.domain_version, - ); - assert_eq!( - raw_config.gateway_config_contract.domain_name.unwrap(), - config.gateway_config_contract.domain_name, - ); - assert_eq!( - raw_config.gateway_config_contract.domain_version.unwrap(), - config.gateway_config_contract.domain_version, - ); - } - - #[test] - #[serial(config_tests)] - fn test_load_from_env() { - cleanup_env_vars(); - - // Set environment variables - unsafe { - env::set_var( - "KMS_CONNECTOR_DATABASE_URL", - "postgres://postgres:postgres@localhost", - ); - env::set_var("KMS_CONNECTOR_GATEWAY_URL", "ws://localhost:9545"); - env::set_var( - "KMS_CONNECTOR_KMS_CORE_ENDPOINTS", - "http://localhost:50053,http://localhost:50054", - ); - env::set_var("KMS_CONNECTOR_CHAIN_ID", "31888"); - env::set_var( - "KMS_CONNECTOR_DECRYPTION_CONTRACT__ADDRESS", - "0x5fbdb2315678afecb367f032d93f642f64180aa3", - ); - env::set_var( - "KMS_CONNECTOR_GATEWAY_CONFIG_CONTRACT__ADDRESS", - "0x5fbdb2315678afecb367f032d93f642f64180aa3", - ); - env::set_var( - "KMS_CONNECTOR_KMS_GENERATION_CONTRACT__ADDRESS", - "0x5fbdb2315678afecb367f032d93f642f64180aa3", - ); - env::set_var("KMS_CONNECTOR_SERVICE_NAME", "kms-connector-test"); - env::set_var("KMS_CONNECTOR_EVENTS_BATCH_SIZE", "15"); - env::set_var("KMS_CONNECTOR_GRPC_REQUEST_RETRIES", "5"); - env::set_var("KMS_CONNECTOR_MAX_DECRYPTION_ATTEMPTS", "300"); - env::set_var("KMS_CONNECTOR_S3_CIPHERTEXT_RETRIEVAL_RETRIES", "5"); - env::set_var("KMS_CONNECTOR_S3_CONNECT_TIMEOUT", "4"); - } - - // Load config from environment - let config = Config::from_env_and_file::<&str>(None).unwrap(); - - // Verify values - assert_eq!(config.gateway_url, "ws://localhost:9545"); - assert_eq!( - config.kms_core_endpoints, - vec!["http://localhost:50053", "http://localhost:50054"] - ); - assert_eq!(config.chain_id, 31888); - assert_eq!( - config.decryption_contract.address, - Address::from_str("0x5fbdb2315678afecb367f032d93f642f64180aa3").unwrap() - ); - assert_eq!( - config.gateway_config_contract.address, - Address::from_str("0x5fbdb2315678afecb367f032d93f642f64180aa3").unwrap() - ); - assert_eq!( - config.kms_generation_contract.address, - Address::from_str("0x5fbdb2315678afecb367f032d93f642f64180aa3").unwrap() - ); - assert_eq!(config.service_name, "kms-connector-test"); - assert_eq!(config.events_batch_size, 15); - assert_eq!(config.grpc_request_retries, 5); - assert_eq!(config.max_decryption_attempts, 300); - assert_eq!(config.s3_ciphertext_retrieval_retries, 5); - assert_eq!(config.s3_connect_timeout.as_secs(), 4); - - cleanup_env_vars(); - } - - #[test] - #[serial(config_tests)] - fn test_env_overrides_file() { - cleanup_env_vars(); - - // Create a temp config file - let raw_config = RawConfig::default(); - - let temp_file = NamedTempFile::new().unwrap(); - raw_config.to_file(temp_file.path()).unwrap(); - - // Set an environment variable to override the file - unsafe { - env::set_var("KMS_CONNECTOR_CHAIN_ID", "77737"); - env::set_var("KMS_CONNECTOR_SERVICE_NAME", "kms-connector-override"); - } - - // Load config from both sources - let config = Config::from_env_and_file(Some(temp_file.path())).unwrap(); - - // Verify that environment variables take precedence - assert_eq!(config.chain_id, 77737); - assert_eq!(config.service_name, "kms-connector-override"); - - // File values should be used for non-overridden fields - assert_eq!(config.gateway_url, "ws://localhost:8545"); - - cleanup_env_vars(); - } - - #[test] - #[serial(config_tests)] - fn test_invalid_address() { - let raw_config = RawConfig { - decryption_contract: RawContractConfig { - address: "0x0000".to_string(), - ..Default::default() - }, - gateway_config_contract: RawContractConfig { - address: "0x000010".to_string(), - ..Default::default() - }, - ..Default::default() - }; - assert!(matches!( - Config::parse(raw_config), - Err(Error::InvalidConfig(_)) - )); - } - - #[test] - #[serial(config_tests)] - fn test_kms_core_endpoint_fallback() { - let raw_config = RawConfig { - kms_core_endpoints: vec![], - kms_core_endpoint: Some("http://localhost:50053".to_string()), - ..Default::default() - }; - let config = Config::parse(raw_config.clone()).unwrap(); - assert_eq!( - config.kms_core_endpoints, - vec![raw_config.kms_core_endpoint.unwrap()] - ) - } - - impl RawConfig { - pub fn to_file>(&self, path: P) -> Result<()> { - let content = toml::to_string_pretty(self) - .map_err(|e| Error::InvalidConfig(format!("Failed to serialize config: {e}")))?; - - fs::write(path, content) - .map_err(|e| Error::InvalidConfig(format!("Failed to write config file: {e}")))?; - - Ok(()) - } - } -} diff --git a/kms-connector/crates/kms-worker/src/core/config/raw.rs b/kms-connector/crates/kms-worker/src/core/config/raw.rs deleted file mode 100644 index ababab14eb..0000000000 --- a/kms-connector/crates/kms-worker/src/core/config/raw.rs +++ /dev/null @@ -1,154 +0,0 @@ -//! Module used to deserialize the kms-worker configuration using serde. -//! -//! The `RawConfig` can then be parsed into a `Config` in the `parsed` module. - -use config::{Config as ConfigBuilder, Environment, File, FileFormat}; -use connector_utils::{ - config::{DeserializeRawConfig, RawContractConfig, Result, default_database_pool_size}, - monitoring::{health::default_healthcheck_timeout_secs, server::default_monitoring_endpoint}, - tasks::default_task_limit, -}; -use serde::{Deserialize, Serialize}; -use std::path::Path; -use tracing::info; - -/// Deserializable representation of the `KmsWorker` configuration. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct RawConfig { - pub database_url: String, - #[serde(default = "default_database_pool_size")] - pub database_pool_size: u32, - #[serde(default = "default_db_fast_event_polling_secs")] - pub db_fast_event_polling_secs: u64, - #[serde(default = "default_db_long_event_polling_secs")] - pub db_long_event_polling_secs: u64, - pub gateway_url: String, - #[serde(default)] - pub kms_core_endpoints: Vec, - pub kms_core_endpoint: Option, - pub chain_id: u64, - pub decryption_contract: RawContractConfig, - pub gateway_config_contract: RawContractConfig, - pub kms_generation_contract: RawContractConfig, - #[serde(default = "default_service_name")] - pub service_name: String, - #[serde(default = "default_events_batch_size")] - pub events_batch_size: u8, - #[serde(default = "default_grpc_request_retries")] - pub grpc_request_retries: u8, - #[serde(default = "default_max_decryption_attempts")] - pub max_decryption_attempts: u16, - #[serde(default = "default_s3_ciphertext_retrieval_retries")] - pub s3_ciphertext_retrieval_retries: u8, - #[serde(default = "default_s3_connect_timeout")] - pub s3_connect_timeout: u64, - #[serde(default = "default_task_limit")] - pub task_limit: usize, - #[serde(default = "default_monitoring_endpoint")] - pub monitoring_endpoint: String, - #[serde(default = "default_healthcheck_timeout_secs")] - pub healthcheck_timeout_secs: u64, -} - -fn default_service_name() -> String { - "kms-connector-kms-worker".to_string() -} - -fn default_db_fast_event_polling_secs() -> u64 { - 3 -} - -fn default_db_long_event_polling_secs() -> u64 { - 60 -} - -fn default_events_batch_size() -> u8 { - 50 -} - -fn default_grpc_request_retries() -> u8 { - 3 -} - -fn default_max_decryption_attempts() -> u16 { - 200 -} - -fn default_s3_ciphertext_retrieval_retries() -> u8 { - 3 -} - -fn default_s3_connect_timeout() -> u64 { - 2 // 2 seconds -} - -impl DeserializeRawConfig for RawConfig { - fn from_env_and_file>(path: Option

) -> Result - where - for<'a> Self: Sized + Deserialize<'a>, - { - let mut builder = ConfigBuilder::builder(); - - // If path is provided, add it as a config source - if let Some(path) = path { - builder = builder.add_source( - File::with_name(path.as_ref().to_str().unwrap()).format(FileFormat::Toml), - ); - } - - // Add environment variables last so they take precedence - info!("Adding environment variables with prefix KMS_CONNECTOR_"); - builder = builder.add_source( - Environment::with_prefix("KMS_CONNECTOR") - .prefix_separator("_") - .separator("__") - .list_separator(",") - .with_list_parse_key("kms_core_endpoints") - .try_parsing(true), - ); - - let settings = builder.build()?; - let config = settings.try_deserialize()?; - Ok(config) - } -} - -// Default implementation for testing purpose -impl Default for RawConfig { - fn default() -> Self { - Self { - database_url: "postgres://postgres:postgres@localhost".to_string(), - database_pool_size: 16, - db_fast_event_polling_secs: default_db_fast_event_polling_secs(), - db_long_event_polling_secs: default_db_long_event_polling_secs(), - gateway_url: "ws://localhost:8545".to_string(), - kms_core_endpoints: vec!["http://localhost:50052".to_string()], - kms_core_endpoint: None, - chain_id: 1, - decryption_contract: RawContractConfig { - address: "0x0000000000000000000000000000000000000000".to_string(), - domain_name: Some("Decryption".to_string()), - domain_version: Some("1".to_string()), - }, - gateway_config_contract: RawContractConfig { - address: "0x0000000000000000000000000000000000000000".to_string(), - domain_name: Some("GatewayConfig".to_string()), - domain_version: Some("1".to_string()), - }, - kms_generation_contract: RawContractConfig { - address: "0x0000000000000000000000000000000000000000".to_string(), - domain_name: Some("KMSGeneration".to_string()), - domain_version: Some("1".to_string()), - }, - service_name: "kms-connector".to_string(), - events_batch_size: 10, - grpc_request_retries: 3, - max_decryption_attempts: default_max_decryption_attempts(), - s3_ciphertext_retrieval_retries: 3, - s3_connect_timeout: 2, - task_limit: default_task_limit(), - monitoring_endpoint: default_monitoring_endpoint(), - healthcheck_timeout_secs: default_healthcheck_timeout_secs(), - } - } -} diff --git a/kms-connector/crates/kms-worker/src/core/kms_worker.rs b/kms-connector/crates/kms-worker/src/core/kms_worker.rs index f4fe2d69b8..85a1696f7e 100644 --- a/kms-connector/crates/kms-worker/src/core/kms_worker.rs +++ b/kms-connector/crates/kms-worker/src/core/kms_worker.rs @@ -110,7 +110,7 @@ impl KmsWorker> { /// Creates a new `KmsWorker` instance from a valid `Config`. pub async fn from_config(config: Config) -> anyhow::Result<(Self, State)> { let db_pool = connect_to_db(&config.database_url, config.database_pool_size).await?; - let provider = connect_to_gateway(&config.gateway_url, config.chain_id).await?; + let provider = connect_to_gateway(config.gateway_url.clone(), config.chain_id).await?; let kms_client = KmsClient::connect(&config).await?; let kms_health_client = KmsHealthClient::connect(&config.kms_core_endpoints).await?; let s3_client = reqwest::Client::builder() diff --git a/kms-connector/crates/tx-sender/Cargo.toml b/kms-connector/crates/tx-sender/Cargo.toml index d77a60fea4..a7497dcb18 100644 --- a/kms-connector/crates/tx-sender/Cargo.toml +++ b/kms-connector/crates/tx-sender/Cargo.toml @@ -13,6 +13,7 @@ fhevm_gateway_bindings.workspace = true actix-web.workspace = true alloy.workspace = true anyhow.workspace = true +humantime-serde.workspace = true prometheus.workspace = true serde.workspace = true sqlx.workspace = true @@ -29,7 +30,6 @@ connector-utils = { workspace = true, features = ["tests"] } rstest.workspace = true serial_test.workspace = true serde_json.workspace = true -tempfile.workspace = true tokio-stream.workspace = true toml.workspace = true tracing-test.workspace = true diff --git a/kms-connector/crates/tx-sender/src/bin/tx_sender.rs b/kms-connector/crates/tx-sender/src/bin/tx_sender.rs index 65fecff7ed..7e1081db8d 100644 --- a/kms-connector/crates/tx-sender/src/bin/tx_sender.rs +++ b/kms-connector/crates/tx-sender/src/bin/tx_sender.rs @@ -5,6 +5,7 @@ use tx_sender::{ use connector_utils::{ cli::{Cli, Subcommands}, + config::DeserializeConfig, monitoring::{ health::query_healthcheck_endpoint, otlp::init_otlp_setup, server::start_monitoring_server, }, @@ -28,13 +29,13 @@ async fn run() -> anyhow::Result<()> { let subcommand = Cli::new("TransactionSender").parse(); match subcommand { Subcommands::Validate { config } => { - Config::from_env_and_file(Some(config)).await?; + Config::from_env_and_file(Some(config))?; } Subcommands::Health { endpoint } => { query_healthcheck_endpoint::(endpoint).await?; } Subcommands::Start { config } => { - let config = Config::from_env_and_file(config.as_ref()).await?; + let config = Config::from_env_and_file(config.as_ref())?; debug!("{config:?}"); init_otlp_setup(config.service_name.clone())?; diff --git a/kms-connector/crates/tx-sender/src/core/config.rs b/kms-connector/crates/tx-sender/src/core/config.rs new file mode 100644 index 0000000000..57b961aca3 --- /dev/null +++ b/kms-connector/crates/tx-sender/src/core/config.rs @@ -0,0 +1,351 @@ +use alloy::transports::http::reqwest::Url; +use connector_utils::{ + config::{ + AwsKmsConfig, ContractConfig, DeserializeConfig, Error, KmsWallet, + contract::{ + default_decryption_contract_config, default_kms_generation_contract_config, + deserialize_decryption_contract_config, deserialize_kms_generation_contract_config, + }, + default_database_pool_size, deserialize_pg_interval, serialize_pg_interval, + }, + monitoring::{health::default_healthcheck_timeout, server::default_monitoring_endpoint}, + tasks::default_task_limit, +}; +use serde::{Deserialize, Deserializer, Serialize}; +use sqlx::postgres::types::PgInterval; +use std::{net::SocketAddr, str::FromStr, time::Duration}; + +/// Configuration of the `TransactionSender`. +#[derive(Clone, Debug, Deserialize, PartialEq)] +#[cfg_attr(debug_assertions, derive(Serialize))] +pub struct Config { + /// The URL of the Postgres database. + pub database_url: String, + /// The size of the database connection pool. + #[serde(default = "default_database_pool_size")] + pub database_pool_size: u32, + /// The timeout for polling the database for responses. + #[serde(with = "humantime_serde", default = "default_database_polling_timeout")] + pub database_polling_timeout: Duration, + + /// The Gateway RPC endpoint. + pub gateway_url: Url, + /// The Chain ID of the Gateway. + pub chain_id: u64, + /// The `Decryption` contract configuration. + #[serde(deserialize_with = "deserialize_decryption_contract_config")] + pub decryption_contract: ContractConfig, + /// The `KMSGeneration` contract configuration. + #[serde(deserialize_with = "deserialize_kms_generation_contract_config")] + pub kms_generation_contract: ContractConfig, + /// The private key of the `TransactionSender`'s wallet. + pub private_key: Option, + /// The AWS KMS configuration of the `TransactionSender`'s wallet. + pub aws_kms_config: Option, + + /// The number of retries for transaction sending. + #[serde(default = "default_tx_retries")] + pub tx_retries: u8, + /// The interval between transaction retries. + #[serde(with = "humantime_serde", default = "default_tx_retry_interval")] + pub tx_retry_interval: Duration, + /// Enable tracing of reverted transactions. + #[serde(default = "default_trace_reverted_tx")] + pub trace_reverted_tx: bool, + /// The batch size for KMS Core response processing. + #[serde(default = "default_responses_batch_size")] + pub responses_batch_size: u8, + /// The gas multiplier percentage after each transaction attempt. + #[serde( + deserialize_with = "parse_gas_multiplier_percent", + default = "default_gas_multiplier_percent" + )] + pub gas_multiplier_percent: usize, + + /// The service name used for tracing. + #[serde(default = "default_service_name")] + pub service_name: String, + /// The maximum number of tasks that can be executed concurrently. + #[serde(default = "default_task_limit")] + pub task_limit: usize, + + /// The interval between garbage collection runs. + #[serde(with = "humantime_serde", default = "default_gc_run_interval")] + pub gc_run_interval: Duration, + /// The expiration time for completed/failed decryptions, after which they will be deleted. + #[serde( + deserialize_with = "deserialize_pg_interval", + serialize_with = "serialize_pg_interval", + default = "default_gc_decryption_expiry" + )] + pub gc_decryption_expiry: PgInterval, + /// The time limit for decryption to be under process, after which they will be considered as + /// pending again. + #[serde( + deserialize_with = "deserialize_pg_interval", + serialize_with = "serialize_pg_interval", + default = "default_gc_decryption_under_process_limit" + )] + pub gc_decryption_under_process_limit: PgInterval, + + /// The monitoring server endpoint of the `TransactionSender`. + #[serde(default = "default_monitoring_endpoint")] + pub monitoring_endpoint: SocketAddr, + /// The interval between gauge updates. + #[serde(with = "humantime_serde", default = "default_gauge_update_interval")] + pub gauge_update_interval: Duration, + /// The timeout to perform each external service connection healthcheck. + #[serde(with = "humantime_serde", default = "default_healthcheck_timeout")] + pub healthcheck_timeout: Duration, +} + +fn parse_gas_multiplier_percent<'de, D>(d: D) -> Result +where + D: Deserializer<'de>, +{ + let gas_multiplier_percent = ::deserialize(d)?; + if gas_multiplier_percent < 100 { + Err(serde::de::Error::custom( + "`gas_multiplier_percent` must be greater than or equal to 100%", + )) + } else { + Ok(gas_multiplier_percent) + } +} + +impl DeserializeConfig for Config {} + +impl Config { + pub async fn build_wallet(&self) -> Result { + let chain_id = Some(self.chain_id); + if let Some(private_key) = &self.private_key { + KmsWallet::from_private_key_str(private_key, chain_id) + } else if let Some(aws_kms_config) = self.aws_kms_config.clone() { + KmsWallet::from_aws_kms(aws_kms_config, chain_id).await + } else { + Err(Error::InvalidConfig( + "Either AWS KMS or private key must be configured".into(), + )) + } + } +} + +fn default_service_name() -> String { + "kms-connector-tx-sender".to_string() +} + +fn default_database_polling_timeout() -> Duration { + Duration::from_secs(5) +} + +fn default_tx_retries() -> u8 { + 4 +} + +fn default_tx_retry_interval() -> Duration { + Duration::from_millis(10) +} + +fn default_trace_reverted_tx() -> bool { + true +} + +fn default_responses_batch_size() -> u8 { + 50 +} + +fn default_gas_multiplier_percent() -> usize { + 115 // 115% gas increase by default +} + +fn default_gauge_update_interval() -> Duration { + Duration::from_secs(10) +} + +fn default_gc_run_interval() -> Duration { + Duration::from_mins(5) +} + +fn default_gc_decryption_expiry() -> PgInterval { + PgInterval::try_from(Duration::from_hours(24)).unwrap() +} + +fn default_gc_decryption_under_process_limit() -> PgInterval { + PgInterval::try_from(Duration::from_mins(6)).unwrap() +} + +// Default implementation for testing purpose +impl Default for Config { + fn default() -> Self { + Self { + database_url: "postgres://postgres:postgres@localhost/kms-connector".to_string(), + database_pool_size: default_database_pool_size(), + database_polling_timeout: default_database_polling_timeout(), + gateway_url: Url::from_str("http://localhost:8545").unwrap(), + chain_id: 54321, + decryption_contract: default_decryption_contract_config(), + kms_generation_contract: default_kms_generation_contract_config(), + service_name: default_service_name(), + private_key: Some( + "0x3f45b129a7fd099146e9fe63851a71646231f7743c712695f3b2d2bf0e41c774".to_string(), + ), + aws_kms_config: None, + tx_retries: default_tx_retries(), + tx_retry_interval: default_tx_retry_interval(), + trace_reverted_tx: default_trace_reverted_tx(), + responses_batch_size: default_responses_batch_size(), + gas_multiplier_percent: default_gas_multiplier_percent(), + task_limit: default_task_limit(), + gc_run_interval: default_gc_run_interval(), + gc_decryption_expiry: default_gc_decryption_expiry(), + gc_decryption_under_process_limit: default_gc_decryption_under_process_limit(), + monitoring_endpoint: default_monitoring_endpoint(), + gauge_update_interval: default_gauge_update_interval(), + healthcheck_timeout: default_healthcheck_timeout(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy::primitives::address; + use serial_test::serial; + use std::env; + + fn cleanup_env_vars() { + unsafe { + env::remove_var("KMS_CONNECTOR_DATABASE_URL"); + env::remove_var("KMS_CONNECTOR_GATEWAY_URL"); + env::remove_var("KMS_CONNECTOR_CHAIN_ID"); + env::remove_var("KMS_CONNECTOR_PRIVATE_KEY"); + env::remove_var("KMS_CONNECTOR_DECRYPTION_CONTRACT__ADDRESS"); + env::remove_var("KMS_CONNECTOR_KMS_GENERATION_CONTRACT__ADDRESS"); + env::remove_var("KMS_CONNECTOR_SERVICE_NAME"); + env::remove_var("KMS_CONNECTOR_RESPONSES_BATCH_SIZE"); + env::remove_var("KMS_CONNECTOR_TX_RETRIES"); + env::remove_var("KMS_CONNECTOR_TX_RETRY_INTERVAL"); + env::remove_var("KMS_CONNECTOR_TRACE_REVERTED_TX"); + env::remove_var("KMS_CONNECTOR_GAS_MULTIPLIER_PERCENT"); + env::remove_var("KMS_CONNECTOR_GAUGE_UPDATE_INTERVAL"); + env::remove_var("KMS_CONNECTOR_GC_RUN_INTERVAL"); + env::remove_var("KMS_CONNECTOR_GC_DECRYPTION_EXPIRY"); + env::remove_var("KMS_CONNECTOR_GC_DECRYPTION_UNDER_PROCESS_LIMIT"); + } + } + + #[tokio::test] + #[serial(config_tests)] + async fn test_load_valid_config_from_file() { + cleanup_env_vars(); + let default_config = Config::default(); + let example_config = Config::from_env_and_file(Some(example_config_path())).unwrap(); + assert_eq!(default_config, example_config); + } + + #[tokio::test] + #[serial(config_tests)] + async fn test_load_from_env() { + cleanup_env_vars(); + + // Set environment variables + unsafe { + env::set_var( + "KMS_CONNECTOR_DATABASE_URL", + "postgres://postgres:postgres@localhost", + ); + env::set_var("KMS_CONNECTOR_GATEWAY_URL", "http://localhost:9545"); + env::set_var("KMS_CONNECTOR_CHAIN_ID", "31888"); + env::set_var( + "KMS_CONNECTOR_PRIVATE_KEY", + "8355bb293b8714a06b972bfe692d1bd9f24235c1f4007ae0be285d398b0bba2f", + ); + env::set_var( + "KMS_CONNECTOR_DECRYPTION_CONTRACT__ADDRESS", + "0x5fbdb2315678afecb367f032d93f642f64180aa3", + ); + env::set_var( + "KMS_CONNECTOR_KMS_GENERATION_CONTRACT__ADDRESS", + "0x0000000000000000000000000000000000000002", + ); + env::set_var("KMS_CONNECTOR_SERVICE_NAME", "kms-connector-test"); + env::set_var("KMS_CONNECTOR_RESPONSES_BATCH_SIZE", "20"); + env::set_var("KMS_CONNECTOR_TX_RETRIES", "5"); + env::set_var("KMS_CONNECTOR_TX_RETRY_INTERVAL", "200ms"); + env::set_var("KMS_CONNECTOR_TRACE_REVERTED_TX", "false"); + env::set_var("KMS_CONNECTOR_GAS_MULTIPLIER_PERCENT", "180"); + env::set_var("KMS_CONNECTOR_GAUGE_UPDATE_INTERVAL", "20s"); + env::set_var("KMS_CONNECTOR_GC_RUN_INTERVAL", "2m"); + env::set_var("KMS_CONNECTOR_GC_DECRYPTION_EXPIRY", "50m"); + env::set_var("KMS_CONNECTOR_GC_DECRYPTION_UNDER_PROCESS_LIMIT", "1m"); + } + + // Load config from environment + let config = Config::from_env_and_file::<&str>(None).unwrap(); + + // Verify values + assert_eq!( + config.gateway_url, + Url::from_str("http://localhost:9545").unwrap() + ); + assert_eq!(config.chain_id, 31888); + assert_eq!( + config.decryption_contract.address, + address!("0x5fbdb2315678afecb367f032d93f642f64180aa3") + ); + assert_eq!( + config.kms_generation_contract.address, + address!("0x0000000000000000000000000000000000000002") + ); + assert_eq!(config.service_name, "kms-connector-test"); + assert_eq!(config.responses_batch_size, 20); + assert_eq!(config.tx_retries, 5); + assert_eq!(config.tx_retry_interval, Duration::from_millis(200)); + assert!(!config.trace_reverted_tx); + assert_eq!(config.gas_multiplier_percent, 180); + assert_eq!(config.gauge_update_interval, Duration::from_secs(20)); + assert_eq!(config.gc_run_interval, Duration::from_mins(2)); + assert_eq!( + config.gc_decryption_expiry, + PgInterval::try_from(Duration::from_mins(50)).unwrap() + ); + assert_eq!( + config.gc_decryption_under_process_limit, + PgInterval::try_from(Duration::from_mins(1)).unwrap() + ); + + cleanup_env_vars(); + } + + #[tokio::test] + #[serial(config_tests)] + async fn test_env_overrides_file() { + cleanup_env_vars(); + let example_config = Config::from_env_and_file(Some(example_config_path())).unwrap(); + + // Set an environment variable to override the file + let chain_id = 77737; + let service_name = "kms-connector-override"; + let mut expected_config = example_config.clone(); + expected_config.chain_id = chain_id; + expected_config.service_name = service_name.to_string(); + unsafe { + env::set_var("KMS_CONNECTOR_CHAIN_ID", chain_id.to_string()); + env::set_var("KMS_CONNECTOR_SERVICE_NAME", service_name); + } + + // Load config from both sources + let config = Config::from_env_and_file(Some(example_config_path())).unwrap(); + + // Verify that environment variables take precedence + assert_ne!(config.chain_id, example_config.chain_id); + assert_ne!(config.service_name, example_config.service_name); + assert_eq!(config, expected_config); + + cleanup_env_vars(); + } + + fn example_config_path() -> String { + format!("{}/../../config/tx-sender.toml", env!("CARGO_MANIFEST_DIR")) + } +} diff --git a/kms-connector/crates/tx-sender/src/core/config/mod.rs b/kms-connector/crates/tx-sender/src/core/config/mod.rs deleted file mode 100644 index 39c4a6a39b..0000000000 --- a/kms-connector/crates/tx-sender/src/core/config/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod parsed; -mod raw; - -pub use parsed::Config; diff --git a/kms-connector/crates/tx-sender/src/core/config/parsed.rs b/kms-connector/crates/tx-sender/src/core/config/parsed.rs deleted file mode 100644 index 8625a85022..0000000000 --- a/kms-connector/crates/tx-sender/src/core/config/parsed.rs +++ /dev/null @@ -1,403 +0,0 @@ -//! Module used to parse tx-sender configuration. -//! -//! The `raw` module is first used to deserialize the configuration. - -use super::raw::RawConfig; -use connector_utils::{ - config::{AwsKmsConfig, ContractConfig, DeserializeRawConfig, Error, KmsWallet, Result}, - monitoring::otlp::default_dispatcher, -}; -use sqlx::postgres::types::PgInterval; -use std::{net::SocketAddr, path::Path, time::Duration}; -use tracing::{error, info}; - -/// Configuration of the `TransactionSender`. -#[derive(Clone, Debug)] -pub struct Config { - /// The URL of the Postgres database. - pub database_url: String, - /// The size of the database connection pool. - pub database_pool_size: u32, - /// The timeout for polling the database for responses. - pub database_polling_timeout: Duration, - /// The Gateway RPC endpoint. - pub gateway_url: String, - /// The Chain ID of the Gateway. - pub chain_id: u64, - /// The `Decryption` contract configuration. - pub decryption_contract: ContractConfig, - /// The `KMSGeneration` contract configuration. - pub kms_generation_contract: ContractConfig, - /// The service name used for tracing. - pub service_name: String, - /// The wallet used to sign the decryption responses from the kms-core. - pub wallet: KmsWallet, - /// The number of retries for transaction sending. - pub tx_retries: u8, - /// The interval between transaction retries. - pub tx_retry_interval: Duration, - /// Enable tracing of reverted transactions. - pub trace_reverted_tx: bool, - /// The batch size for KMS Core response processing. - pub responses_batch_size: u8, - /// The gas multiplier percentage after each transaction attempt. - pub gas_multiplier_percent: usize, - /// The maximum number of tasks that can be executed concurrently. - pub task_limit: usize, - - /// The interval between garbage collection runs. - pub gc_run_interval: Duration, - /// The expiration time for completed/failed decryptions, after which they will be deleted. - pub gc_decryption_expiry: PgInterval, - /// The time limit for decryption to be under process, after which they will be considered as - /// pending again. - pub gc_decryption_under_process_limit: PgInterval, - - /// The monitoring server endpoint of the `TransactionSender`. - pub monitoring_endpoint: SocketAddr, - /// The interval between gauge updates. - pub gauge_update_interval: Duration, - /// The timeout to perform each external service connection healthcheck. - pub healthcheck_timeout: Duration, -} - -impl Config { - /// Loads the configuration from environment variables and optionally from a TOML file. - /// - /// Environment variables take precedence over file configuration. - /// Environment variables are prefixed with KMS_CONNECTOR_. - pub async fn from_env_and_file>(path: Option

) -> Result { - let _dispatcher_guard = tracing::dispatcher::set_default(&default_dispatcher()); - if let Some(config_path) = &path { - info!("Loading config from: {}", config_path.as_ref().display()); - } else { - info!("Loading config using environment variables only"); - } - - let raw_config = RawConfig::from_env_and_file(path).inspect_err(|e| error!("{e}"))?; - Self::parse(raw_config).await.inspect_err(|e| error!("{e}")) - } - - async fn parse(raw_config: RawConfig) -> Result { - let monitoring_endpoint = raw_config - .monitoring_endpoint - .parse::() - .map_err(|e| Error::InvalidConfig(e.to_string()))?; - - let wallet = Self::parse_kms_wallet( - raw_config.chain_id, - raw_config.private_key, - raw_config.aws_kms_config, - ) - .await?; - - let decryption_contract = - ContractConfig::parse("Decryption", raw_config.decryption_contract)?; - let kms_generation_contract = - ContractConfig::parse("KMSGeneration", raw_config.kms_generation_contract)?; - - // Validate critical configuration parts - if raw_config.gateway_url.is_empty() { - return Err(Error::EmptyField("Gateway URL".to_string())); - } - - if raw_config.gas_multiplier_percent < 100 { - return Err(Error::InvalidConfig( - "gas_multiplier_percent should be greater than or equal to 100%".to_string(), - )); - } - - let database_polling_timeout = - Duration::from_secs(raw_config.database_polling_timeout_secs); - let tx_retry_interval = Duration::from_millis(raw_config.tx_retry_interval_ms); - let gc_run_interval = Duration::from_mins(raw_config.gc_run_interval_mins); - let gc_decryption_expiry = - PgInterval::try_from(Duration::from_mins(raw_config.gc_decryption_expiry_mins)) - .map_err(|e| Error::InvalidConfig(e.to_string()))?; - let gc_decryption_under_process_limit = PgInterval::try_from(Duration::from_mins( - raw_config.gc_decryption_under_process_limit_mins, - )) - .map_err(|e| Error::InvalidConfig(e.to_string()))?; - let gauge_update_interval = Duration::from_secs(raw_config.gauge_update_interval_secs); - let healthcheck_timeout = Duration::from_secs(raw_config.healthcheck_timeout_secs); - - Ok(Self { - database_url: raw_config.database_url, - database_pool_size: raw_config.database_pool_size, - database_polling_timeout, - gateway_url: raw_config.gateway_url, - chain_id: raw_config.chain_id, - decryption_contract, - kms_generation_contract, - service_name: raw_config.service_name, - wallet, - tx_retries: raw_config.tx_retries, - tx_retry_interval, - trace_reverted_tx: raw_config.trace_reverted_tx, - responses_batch_size: raw_config.responses_batch_size, - gas_multiplier_percent: raw_config.gas_multiplier_percent, - task_limit: raw_config.task_limit, - gc_run_interval, - gc_decryption_expiry, - gc_decryption_under_process_limit, - monitoring_endpoint, - gauge_update_interval, - healthcheck_timeout, - }) - } - - async fn parse_kms_wallet( - chain_id: u64, - private_key: Option, - aws_kms_config: Option, - ) -> Result { - let chain_id = Some(chain_id); - if let Some(private_key) = private_key { - KmsWallet::from_private_key_str(&private_key, chain_id) - } else if let Some(aws_kms_config) = aws_kms_config { - KmsWallet::from_aws_kms(aws_kms_config, chain_id).await - } else { - Err(Error::InvalidConfig( - "Either AWS KMS or private key must be configured".into(), - )) - } - } - - // Default implementation for testing purpose - pub async fn default() -> Self { - Self::parse(RawConfig::default()) - .await - .expect("Failed to parse default RawConfig") - } -} - -#[cfg(test)] -mod tests { - use super::*; - use alloy::primitives::Address; - use connector_utils::config::RawContractConfig; - use serial_test::serial; - use std::{env, fs, path::Path, str::FromStr}; - use tempfile::NamedTempFile; - - fn cleanup_env_vars() { - unsafe { - env::remove_var("KMS_CONNECTOR_DATABASE_URL"); - env::remove_var("KMS_CONNECTOR_GATEWAY_URL"); - env::remove_var("KMS_CONNECTOR_CHAIN_ID"); - env::remove_var("KMS_CONNECTOR_PRIVATE_KEY"); - env::remove_var("KMS_CONNECTOR_DECRYPTION_CONTRACT__ADDRESS"); - env::remove_var("KMS_CONNECTOR_KMS_GENERATION_CONTRACT__ADDRESS"); - env::remove_var("KMS_CONNECTOR_SERVICE_NAME"); - env::remove_var("KMS_CONNECTOR_RESPONSES_BATCH_SIZE"); - env::remove_var("KMS_CONNECTOR_TX_RETRIES"); - env::remove_var("KMS_CONNECTOR_TX_RETRY_INTERVAL_MS"); - env::remove_var("KMS_CONNECTOR_TRACE_REVERTED_TX"); - env::remove_var("KMS_CONNECTOR_GAS_MULTIPLIER_PERCENT"); - env::remove_var("KMS_CONNECTOR_GAUGE_UPDATE_INTERVAL_SECS"); - env::remove_var("KMS_CONNECTOR_GC_RUN_INTERVAL_MINS"); - env::remove_var("KMS_CONNECTOR_GC_DECRYPTION_EXPIRY_MINS"); - env::remove_var("KMS_CONNECTOR_GC_DECRYPTION_UNDER_PROCESS_LIMIT_MINS"); - } - } - - #[tokio::test] - #[serial(config_tests)] - async fn test_load_valid_config_from_file() { - cleanup_env_vars(); - let raw_config = RawConfig::default(); - - let temp_file = NamedTempFile::new().unwrap(); - raw_config.to_file(temp_file.path()).unwrap(); - let config = Config::from_env_and_file(Some(temp_file.path())) - .await - .unwrap(); - - // Compare fields - assert_eq!(raw_config.gateway_url, config.gateway_url); - assert_eq!(raw_config.chain_id, config.chain_id); - assert_eq!( - Address::from_str(&raw_config.decryption_contract.address).unwrap(), - config.decryption_contract.address, - ); - assert_eq!( - Address::from_str(&raw_config.kms_generation_contract.address).unwrap(), - config.kms_generation_contract.address, - ); - assert_eq!(raw_config.service_name, config.service_name); - assert_eq!( - raw_config.decryption_contract.domain_name.unwrap(), - config.decryption_contract.domain_name, - ); - assert_eq!( - raw_config.decryption_contract.domain_version.unwrap(), - config.decryption_contract.domain_version, - ); - assert_eq!( - raw_config.kms_generation_contract.domain_name.unwrap(), - config.kms_generation_contract.domain_name, - ); - assert_eq!( - raw_config.kms_generation_contract.domain_version.unwrap(), - config.kms_generation_contract.domain_version, - ); - assert_eq!(raw_config.responses_batch_size, config.responses_batch_size); - assert_eq!(raw_config.tx_retries, config.tx_retries); - assert_eq!( - raw_config.tx_retry_interval_ms as u128, - config.tx_retry_interval.as_millis() - ); - assert_eq!( - raw_config.gas_multiplier_percent, - config.gas_multiplier_percent - ); - } - - #[tokio::test] - #[serial(config_tests)] - async fn test_load_from_env() { - cleanup_env_vars(); - - // Set environment variables - unsafe { - env::set_var( - "KMS_CONNECTOR_DATABASE_URL", - "postgres://postgres:postgres@localhost", - ); - env::set_var("KMS_CONNECTOR_GATEWAY_URL", "ws://localhost:9545"); - env::set_var("KMS_CONNECTOR_CHAIN_ID", "31888"); - env::set_var( - "KMS_CONNECTOR_PRIVATE_KEY", - "8355bb293b8714a06b972bfe692d1bd9f24235c1f4007ae0be285d398b0bba2f", - ); - env::set_var( - "KMS_CONNECTOR_DECRYPTION_CONTRACT__ADDRESS", - "0x5fbdb2315678afecb367f032d93f642f64180aa3", - ); - env::set_var( - "KMS_CONNECTOR_KMS_GENERATION_CONTRACT__ADDRESS", - "0x0000000000000000000000000000000000000002", - ); - env::set_var("KMS_CONNECTOR_SERVICE_NAME", "kms-connector-test"); - env::set_var("KMS_CONNECTOR_RESPONSES_BATCH_SIZE", "20"); - env::set_var("KMS_CONNECTOR_TX_RETRIES", "5"); - env::set_var("KMS_CONNECTOR_TX_RETRY_INTERVAL_MS", "200"); - env::set_var("KMS_CONNECTOR_TRACE_REVERTED_TX", "false"); - env::set_var("KMS_CONNECTOR_GAS_MULTIPLIER_PERCENT", "180"); - env::set_var("KMS_CONNECTOR_GAUGE_UPDATE_INTERVAL_SECS", "20"); - env::set_var("KMS_CONNECTOR_GC_RUN_INTERVAL_MINS", "2"); - env::set_var("KMS_CONNECTOR_GC_DECRYPTION_EXPIRY_MINS", "50"); - env::set_var("KMS_CONNECTOR_GC_DECRYPTION_UNDER_PROCESS_LIMIT_MINS", "1"); - } - - // Load config from environment - let config = Config::from_env_and_file::<&str>(None).await.unwrap(); - - // Verify values - assert_eq!(config.gateway_url, "ws://localhost:9545"); - assert_eq!(config.chain_id, 31888); - assert_eq!( - config.decryption_contract.address, - Address::from_str("0x5fbdb2315678afecb367f032d93f642f64180aa3").unwrap() - ); - assert_eq!( - config.kms_generation_contract.address, - Address::from_str("0x0000000000000000000000000000000000000002").unwrap() - ); - assert_eq!(config.service_name, "kms-connector-test"); - assert_eq!(config.responses_batch_size, 20); - assert_eq!(config.tx_retries, 5); - assert_eq!(config.tx_retry_interval, Duration::from_millis(200)); - assert!(!config.trace_reverted_tx); - assert_eq!(config.gas_multiplier_percent, 180); - assert_eq!(config.gauge_update_interval, Duration::from_secs(20)); - assert_eq!(config.gc_run_interval, Duration::from_mins(2)); - assert_eq!( - config.gc_decryption_expiry, - PgInterval::try_from(Duration::from_mins(50)).unwrap() - ); - assert_eq!( - config.gc_decryption_under_process_limit, - PgInterval::try_from(Duration::from_mins(1)).unwrap() - ); - - cleanup_env_vars(); - } - - #[tokio::test] - #[serial(config_tests)] - async fn test_env_overrides_file() { - cleanup_env_vars(); - - // Create a temp config file - let raw_config = RawConfig::default(); - - let temp_file = NamedTempFile::new().unwrap(); - raw_config.to_file(temp_file.path()).unwrap(); - - // Set an environment variable to override the file - unsafe { - env::set_var("KMS_CONNECTOR_CHAIN_ID", "77737"); - env::set_var("KMS_CONNECTOR_SERVICE_NAME", "kms-connector-override"); - } - - // Load config from both sources - let config = Config::from_env_and_file(Some(temp_file.path())) - .await - .unwrap(); - - // Verify that environment variables take precedence - assert_eq!(config.chain_id, 77737); - assert_eq!(config.service_name, "kms-connector-override"); - - // File values should be used for non-overridden fields - assert_eq!(config.gateway_url, "ws://localhost:8545"); - - cleanup_env_vars(); - } - - #[tokio::test] - #[serial(config_tests)] - async fn test_invalid_address() { - let raw_config = RawConfig { - decryption_contract: RawContractConfig { - address: "0x0000".to_string(), - ..Default::default() - }, - kms_generation_contract: RawContractConfig { - address: "0x000010".to_string(), - ..Default::default() - }, - ..Default::default() - }; - assert!(matches!( - Config::parse(raw_config).await, - Err(Error::InvalidConfig(_)) - )); - } - - #[tokio::test] - #[serial(config_tests)] - async fn test_invalid_wallet() { - let raw_config = RawConfig { - private_key: None, - ..Default::default() - }; - assert!(matches!( - Config::parse(raw_config).await, - Err(Error::InvalidConfig(_)) - )); - } - - impl RawConfig { - pub fn to_file>(&self, path: P) -> Result<()> { - let content = toml::to_string_pretty(self) - .map_err(|e| Error::InvalidConfig(format!("Failed to serialize config: {e}")))?; - - fs::write(path, content) - .map_err(|e| Error::InvalidConfig(format!("Failed to write config file: {e}")))?; - - Ok(()) - } - } -} diff --git a/kms-connector/crates/tx-sender/src/core/config/raw.rs b/kms-connector/crates/tx-sender/src/core/config/raw.rs deleted file mode 100644 index 7089836fac..0000000000 --- a/kms-connector/crates/tx-sender/src/core/config/raw.rs +++ /dev/null @@ -1,141 +0,0 @@ -//! Module used to deserialize the tx-sender configuration using serde. -//! -//! The `RawConfig` can then be parsed into a `Config` in the `parsed` module. - -use connector_utils::{ - config::{AwsKmsConfig, DeserializeRawConfig, RawContractConfig, default_database_pool_size}, - monitoring::{health::default_healthcheck_timeout_secs, server::default_monitoring_endpoint}, - tasks::default_task_limit, -}; -use serde::{Deserialize, Serialize}; - -/// Deserializable representation of the `TransactionSender` configuration. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct RawConfig { - pub database_url: String, - #[serde(default = "default_database_pool_size")] - pub database_pool_size: u32, - #[serde(default = "default_database_polling_timeout_secs")] - pub database_polling_timeout_secs: u64, - pub gateway_url: String, - pub chain_id: u64, - pub decryption_contract: RawContractConfig, - pub kms_generation_contract: RawContractConfig, - #[serde(default = "default_service_name")] - pub service_name: String, - pub private_key: Option, - pub aws_kms_config: Option, - #[serde(default = "default_tx_retries")] - pub tx_retries: u8, - #[serde(default = "default_tx_retry_interval_ms")] - pub tx_retry_interval_ms: u64, - #[serde(default = "default_trace_reverted_tx")] - pub trace_reverted_tx: bool, - #[serde(default = "default_responses_batch_size")] - pub responses_batch_size: u8, - #[serde(default = "default_gas_multiplier_percent")] - pub gas_multiplier_percent: usize, - #[serde(default = "default_task_limit")] - pub task_limit: usize, - - #[serde(default = "default_gc_run_interval_mins")] - pub gc_run_interval_mins: u64, - #[serde(default = "default_gc_decryption_expiry_mins")] - pub gc_decryption_expiry_mins: u64, - #[serde(default = "default_gc_decryption_under_process_limit_mins")] - pub gc_decryption_under_process_limit_mins: u64, - - #[serde(default = "default_monitoring_endpoint")] - pub monitoring_endpoint: String, - #[serde(default = "default_gauge_update_interval_secs")] - pub gauge_update_interval_secs: u64, - #[serde(default = "default_healthcheck_timeout_secs")] - pub healthcheck_timeout_secs: u64, -} - -fn default_service_name() -> String { - "kms-connector-tx-sender".to_string() -} - -fn default_database_polling_timeout_secs() -> u64 { - 5 -} - -fn default_tx_retries() -> u8 { - 4 -} - -fn default_tx_retry_interval_ms() -> u64 { - 10 -} - -fn default_trace_reverted_tx() -> bool { - true -} - -fn default_responses_batch_size() -> u8 { - 50 -} - -fn default_gas_multiplier_percent() -> usize { - 115 // 115% gas increase by default -} - -fn default_gauge_update_interval_secs() -> u64 { - 10 -} - -fn default_gc_run_interval_mins() -> u64 { - 5 -} - -fn default_gc_decryption_expiry_mins() -> u64 { - 60 * 24 // 24 hours -} - -fn default_gc_decryption_under_process_limit_mins() -> u64 { - 6 -} - -impl DeserializeRawConfig for RawConfig {} - -// Default implementation for testing purpose -impl Default for RawConfig { - fn default() -> Self { - Self { - database_url: "postgres://postgres:postgres@localhost".to_string(), - database_pool_size: default_database_pool_size(), - database_polling_timeout_secs: default_database_polling_timeout_secs(), - gateway_url: "ws://localhost:8545".to_string(), - chain_id: 1, - decryption_contract: RawContractConfig { - address: "0x0000000000000000000000000000000000000000".to_string(), - domain_name: Some("Decryption".to_string()), - domain_version: Some("1".to_string()), - }, - kms_generation_contract: RawContractConfig { - address: "0x0000000000000000000000000000000000000000".to_string(), - domain_name: Some("KMSGeneration".to_string()), - domain_version: Some("1".to_string()), - }, - service_name: "kms-connector".to_string(), - private_key: Some( - "8355bb293b8714a06b972bfe692d1bd9f24235c1f4007ae0be285d398b0bba2f".to_string(), - ), - aws_kms_config: None, - tx_retries: default_tx_retries(), - tx_retry_interval_ms: default_tx_retry_interval_ms(), - trace_reverted_tx: default_trace_reverted_tx(), - responses_batch_size: default_responses_batch_size(), - gas_multiplier_percent: default_gas_multiplier_percent(), - task_limit: default_task_limit(), - gc_run_interval_mins: default_gc_run_interval_mins(), - gc_decryption_expiry_mins: default_gc_decryption_expiry_mins(), - gc_decryption_under_process_limit_mins: default_gc_decryption_under_process_limit_mins( - ), - monitoring_endpoint: default_monitoring_endpoint(), - gauge_update_interval_secs: default_gauge_update_interval_secs(), - healthcheck_timeout_secs: default_healthcheck_timeout_secs(), - } - } -} diff --git a/kms-connector/crates/tx-sender/src/core/tx_sender.rs b/kms-connector/crates/tx-sender/src/core/tx_sender.rs index 5510fd332c..4ad72c6533 100644 --- a/kms-connector/crates/tx-sender/src/core/tx_sender.rs +++ b/kms-connector/crates/tx-sender/src/core/tx_sender.rs @@ -138,12 +138,13 @@ where impl TransactionSender { /// Creates a new `TransactionSender` instance from a valid `Config`. pub async fn from_config(config: Config) -> anyhow::Result<(Self, State)> { + let wallet = config.build_wallet().await?; + let db_pool = connect_to_db(&config.database_url, config.database_pool_size).await?; let response_picker = DbKmsResponsePicker::connect(db_pool.clone(), &config).await?; let provider = - connect_to_gateway_with_wallet(&config.gateway_url, config.chain_id, config.wallet) - .await?; + connect_to_gateway_with_wallet(config.gateway_url, config.chain_id, wallet).await?; let decryption_contract = Decryption::new(config.decryption_contract.address, provider.clone()); let kms_generation_contract = diff --git a/kms-connector/crates/tx-sender/tests/integration_tests.rs b/kms-connector/crates/tx-sender/tests/integration_tests.rs index b9f77ef134..d5173681d2 100644 --- a/kms-connector/crates/tx-sender/tests/integration_tests.rs +++ b/kms-connector/crates/tx-sender/tests/integration_tests.rs @@ -356,9 +356,9 @@ async fn start_test_tx_sender( cancel_token: CancellationToken, ) -> anyhow::Result> { let response_picker = - DbKmsResponsePicker::connect(test_instance.db().clone(), &Config::default().await).await?; + DbKmsResponsePicker::connect(test_instance.db().clone(), &Config::default()).await?; let provider = connect_to_gateway_with_wallet( - test_instance.anvil_http_endpoint().as_str(), + test_instance.anvil_http_endpoint(), *CHAIN_ID as u64, KmsWallet::from_private_key_str(DEPLOYER_PRIVATE_KEY, Some(*CHAIN_ID as u64))?, ) diff --git a/kms-connector/crates/tx-sender/tests/response_picker/notif.rs b/kms-connector/crates/tx-sender/tests/response_picker/notif.rs index 050ca94053..20711084d3 100644 --- a/kms-connector/crates/tx-sender/tests/response_picker/notif.rs +++ b/kms-connector/crates/tx-sender/tests/response_picker/notif.rs @@ -58,7 +58,9 @@ async fn test_pick_response_with_pg_notif(response_str: &str) -> anyhow::Result< async fn init_response_picker(db_pool: Pool) -> anyhow::Result { // Use high polling to ensure PG notifications are used in tests - let mut config = Config::default().await; - config.database_polling_timeout = Duration::from_secs(120); + let config = Config { + database_polling_timeout: Duration::from_secs(120), + ..Default::default() + }; DbKmsResponsePicker::connect(db_pool, &config).await } diff --git a/kms-connector/crates/tx-sender/tests/response_picker/parallel.rs b/kms-connector/crates/tx-sender/tests/response_picker/parallel.rs index a47b67efa7..47ada7c767 100644 --- a/kms-connector/crates/tx-sender/tests/response_picker/parallel.rs +++ b/kms-connector/crates/tx-sender/tests/response_picker/parallel.rs @@ -101,8 +101,10 @@ async fn test_parallel_response_picking(request_str: &str) -> anyhow::Result<()> } async fn init_response_picker(db: Pool) -> anyhow::Result { - let mut config = Config::default().await; - config.responses_batch_size = 1; + let config = Config { + responses_batch_size: 1, + ..Default::default() + }; DbKmsResponsePicker::connect(db, &config).await } diff --git a/kms-connector/crates/tx-sender/tests/response_picker/polling.rs b/kms-connector/crates/tx-sender/tests/response_picker/polling.rs index 0338168f39..c852569bf4 100644 --- a/kms-connector/crates/tx-sender/tests/response_picker/polling.rs +++ b/kms-connector/crates/tx-sender/tests/response_picker/polling.rs @@ -46,8 +46,10 @@ async fn test_pick_response_with_polling_backup(response_str: &str) -> anyhow::R let inserted_response = insert_rand_response(test_instance.db(), response_str, None, None).await?; - let mut config = Config::default().await; - config.database_polling_timeout = Duration::from_millis(500); + let config = Config { + database_polling_timeout: Duration::from_millis(500), + ..Default::default() + }; let mut response_picker = DbKmsResponsePicker::connect(test_instance.db().clone(), &config).await?; diff --git a/kms-connector/crates/utils/Cargo.toml b/kms-connector/crates/utils/Cargo.toml index 5bf3817e65..e49477ef2b 100644 --- a/kms-connector/crates/utils/Cargo.toml +++ b/kms-connector/crates/utils/Cargo.toml @@ -22,6 +22,7 @@ clap.workspace = true config.workspace = true futures.workspace = true git-version.workspace = true +humantime-serde.workspace = true opentelemetry.workspace = true opentelemetry-otlp.workspace = true opentelemetry_sdk.workspace = true @@ -44,7 +45,6 @@ toml = { workspace = true, optional = true } [dev-dependencies] serial_test.workspace = true -tempfile.workspace = true toml.workspace = true [features] diff --git a/kms-connector/crates/utils/src/config/contract.rs b/kms-connector/crates/utils/src/config/contract.rs index 5784ef96d6..c24bb92f6e 100644 --- a/kms-connector/crates/utils/src/config/contract.rs +++ b/kms-connector/crates/utils/src/config/contract.rs @@ -1,11 +1,12 @@ use crate::config::{Error, Result}; -use alloy::primitives::Address; -use serde::{Deserialize, Serialize}; -use std::{fmt::Display, str::FromStr}; +use alloy::primitives::{Address, address}; +use serde::{Deserialize, Deserializer, Serialize}; +use std::str::FromStr; use tracing::{info, warn}; /// Struct containing the information required to interact with a Gateway's contract. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] +#[cfg_attr(debug_assertions, derive(Serialize))] pub struct ContractConfig { pub contract_name: String, pub address: Address, @@ -13,9 +14,47 @@ pub struct ContractConfig { pub domain_version: String, } +/// Struct used to deserialize the Gateway's contracts information from the configuration file. +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +struct RawContractConfig { + pub address: String, + pub domain_name: Option, + pub domain_version: Option, +} + +pub fn deserialize_decryption_contract_config<'de, D>( + deserializer: D, +) -> std::result::Result +where + D: Deserializer<'de>, +{ + ContractConfig::parse("Decryption", Deserialize::deserialize(deserializer)?) + .map_err(|e| serde::de::Error::custom(e.to_string())) +} + +pub fn deserialize_gateway_config_contract_config<'de, D>( + deserializer: D, +) -> std::result::Result +where + D: Deserializer<'de>, +{ + ContractConfig::parse("GatewayConfig", Deserialize::deserialize(deserializer)?) + .map_err(|e| serde::de::Error::custom(e.to_string())) +} + +pub fn deserialize_kms_generation_contract_config<'de, D>( + deserializer: D, +) -> std::result::Result +where + D: Deserializer<'de>, +{ + ContractConfig::parse("KMSGeneration", Deserialize::deserialize(deserializer)?) + .map_err(|e| serde::de::Error::custom(e.to_string())) +} + impl ContractConfig { /// Parses the `RawContractConfig` data from the configuration file. - pub fn parse(contract_name: &str, raw_config: RawContractConfig) -> Result { + fn parse(contract_name: &str, raw_config: RawContractConfig) -> Result { if raw_config.address.is_empty() { return Err(Error::EmptyField(format!("{contract_name} address"))); } @@ -60,30 +99,29 @@ impl ContractConfig { } } -impl Display for ContractConfig { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - writeln!( - f, - "{} contract address: {}", - self.contract_name, self.address - )?; - writeln!( - f, - "{} Domain Name: {}", - self.contract_name, self.domain_name - )?; - write!( - f, - "{} Domain Version: {}", - self.contract_name, self.domain_version, - ) +pub fn default_decryption_contract_config() -> ContractConfig { + ContractConfig { + contract_name: "Decryption".to_string(), + address: address!("0x0000000000000000000000000000000000000000"), + domain_name: "Decryption".to_string(), + domain_version: "1".to_string(), } } -/// Struct used to deserialize the Gateway's contracts information from the configuration file. -#[derive(Clone, Debug, Default, Deserialize, Serialize)] -pub struct RawContractConfig { - pub address: String, - pub domain_name: Option, - pub domain_version: Option, +pub fn default_gateway_config_contract_config() -> ContractConfig { + ContractConfig { + contract_name: "GatewayConfig".to_string(), + address: address!("0x0000000000000000000000000000000000000000"), + domain_name: "GatewayConfig".to_string(), + domain_version: "1".to_string(), + } +} + +pub fn default_kms_generation_contract_config() -> ContractConfig { + ContractConfig { + contract_name: "KMSGeneration".to_string(), + address: address!("0x0000000000000000000000000000000000000000"), + domain_name: "KMSGeneration".to_string(), + domain_version: "1".to_string(), + } } diff --git a/kms-connector/crates/utils/src/config/deserialize.rs b/kms-connector/crates/utils/src/config/deserialize.rs new file mode 100644 index 0000000000..38f7302038 --- /dev/null +++ b/kms-connector/crates/utils/src/config/deserialize.rs @@ -0,0 +1,59 @@ +//! Module exposing a trait to deserialize raw configurations of KMS Connector's subcomponents. +//! +//! These raw configurations can then be parsed properly. + +use super::Result; +use crate::{config::Error, monitoring::otlp::default_dispatcher}; +use config::{Config as ConfigBuilder, Environment, File, FileFormat}; +use serde::Deserialize; +use std::path::Path; +use tracing::{error, info}; + +pub trait DeserializeConfig { + /// Loads the configuration from environment variables and optionally from a TOML file. + /// + /// Environment variables take precedence over file configuration. + /// Environment variables are prefixed with KMS_CONNECTOR_. + fn from_env_and_file>(path: Option

) -> Result + where + for<'a> Self: Sized + Deserialize<'a>, + { + // We use a temporary dispatcher to display the traces as the global dispatcher is not set + // while parsing config. This is because the global dispatcher uses the `service_name` + // field of the config. + tracing::dispatcher::with_default(&default_dispatcher(), || { + if let Some(config_path) = &path { + info!("Loading config from: {}", config_path.as_ref().display()); + } else { + info!("Loading config using environment variables only"); + } + + let mut builder = ConfigBuilder::builder(); + + // If path is provided, add it as a config source + if let Some(path) = path { + let path_str = path + .as_ref() + .to_str() + .ok_or_else(|| Error::InvalidConfig("Invalid config path".to_string())) + .inspect_err(|e| error!("{e}"))?; + builder = builder.add_source(File::with_name(path_str).format(FileFormat::Toml)); + } + + // Add environment variables last so they take precedence + info!("Adding environment variables with prefix KMS_CONNECTOR_"); + builder = builder.add_source( + Environment::with_prefix("KMS_CONNECTOR") + .prefix_separator("_") + .separator("__") + .list_separator(",") + .with_list_parse_key("kms_core_endpoints") + .try_parsing(true), + ); + + let settings = builder.build().inspect_err(|e| error!("{e}"))?; + let config = settings.try_deserialize().inspect_err(|e| error!("{e}"))?; + Ok(config) + }) + } +} diff --git a/kms-connector/crates/utils/src/config/mod.rs b/kms-connector/crates/utils/src/config/mod.rs index 9a142353ca..8b3bb4f1a7 100644 --- a/kms-connector/crates/utils/src/config/mod.rs +++ b/kms-connector/crates/utils/src/config/mod.rs @@ -1,13 +1,40 @@ -mod contract; +pub mod contract; +mod deserialize; mod error; -mod raw; mod wallet; -pub use contract::{ContractConfig, RawContractConfig}; +pub use contract::ContractConfig; +pub use deserialize::DeserializeConfig; pub use error::{Error, Result}; -pub use raw::DeserializeRawConfig; +use sqlx::postgres::types::PgInterval; pub use wallet::{AwsKmsConfig, KmsWallet}; +use serde::{Deserializer, Serializer}; +use std::time::Duration; + +pub fn serialize_pg_interval( + interval: &PgInterval, + serializer: S, +) -> std::result::Result +where + S: Serializer, +{ + // When deserialized from `Duration`, which is our case, the `months` and `days` fields of + // the `PgInterval` are set to 0, so we just need to use the `microseconds` field. + // https://docs.rs/sqlx-postgres/0.8.6/src/sqlx_postgres/types/interval.rs.html#204 + let duration = Duration::from_micros(interval.microseconds as u64); + humantime_serde::serialize(&duration, serializer) +} + +pub fn deserialize_pg_interval<'de, D>(deserializer: D) -> std::result::Result +where + D: Deserializer<'de>, +{ + humantime_serde::deserialize(deserializer).and_then(|d: Duration| { + PgInterval::try_from(d).map_err(|e| serde::de::Error::custom(e.to_string())) + }) +} + pub fn default_database_pool_size() -> u32 { 16 } diff --git a/kms-connector/crates/utils/src/config/raw.rs b/kms-connector/crates/utils/src/config/raw.rs deleted file mode 100644 index 47e823e8d7..0000000000 --- a/kms-connector/crates/utils/src/config/raw.rs +++ /dev/null @@ -1,37 +0,0 @@ -//! Module exposing a trait to deserialize raw configurations of KMS Connector's subcomponents. -//! -//! These raw configurations can then be parsed properly. - -use super::Result; -use config::{Config as ConfigBuilder, Environment, File, FileFormat}; -use serde::Deserialize; -use std::path::Path; -use tracing::info; - -pub trait DeserializeRawConfig { - fn from_env_and_file>(path: Option

) -> Result - where - for<'a> Self: Sized + Deserialize<'a>, - { - let mut builder = ConfigBuilder::builder(); - - // If path is provided, add it as a config source - if let Some(path) = path { - builder = builder.add_source( - File::with_name(path.as_ref().to_str().unwrap()).format(FileFormat::Toml), - ); - } - - // Add environment variables last so they take precedence - info!("Adding environment variables with prefix KMS_CONNECTOR_"); - builder = builder.add_source( - Environment::with_prefix("KMS_CONNECTOR") - .prefix_separator("_") - .separator("__"), - ); - - let settings = builder.build()?; - let config = settings.try_deserialize()?; - Ok(config) - } -} diff --git a/kms-connector/crates/utils/src/config/wallet.rs b/kms-connector/crates/utils/src/config/wallet.rs index 404014719a..a4b1d751e3 100644 --- a/kms-connector/crates/utils/src/config/wallet.rs +++ b/kms-connector/crates/utils/src/config/wallet.rs @@ -16,14 +16,7 @@ use tracing::{debug, info}; /// - Creating wallets from private key strings /// - Creating wallets from AWS KMS keys #[derive(Clone, Debug)] -pub struct KmsWallet { - /// The signer implementation - either local or AWS KMS - signer: WalletSigner, -} - -/// Internal enum to hold either a local or AWS KMS signer -#[derive(Clone, Debug)] -enum WalletSigner { +pub enum KmsWallet { /// Local signer using a private key Local(PrivateKeySigner), /// AWS KMS signer @@ -73,9 +66,7 @@ impl KmsWallet { let signer = PrivateKeySigner::from_signing_key(signing_key).with_chain_id(chain_id); info!("Created wallet from private key string"); - Ok(Self { - signer: WalletSigner::Local(signer), - }) + Ok(Self::Local(signer)) } /// Create a new wallet from AWS KMS configuration @@ -116,17 +107,15 @@ impl KmsWallet { "Created wallet from AWS KMS with address: {}", aws_signer.address() ); - Ok(Self { - signer: WalletSigner::AwsKms(aws_signer), - }) + Ok(Self::AwsKms(aws_signer)) } /// Get the wallet's address pub fn address(&self) -> Address { debug!("Getting wallet address"); - match &self.signer { - WalletSigner::Local(signer) => signer.address(), - WalletSigner::AwsKms(signer) => signer.address(), + match &self { + Self::Local(signer) => signer.address(), + Self::AwsKms(signer) => signer.address(), } } } @@ -135,9 +124,9 @@ impl IntoWallet for KmsWallet { type NetworkWallet = EthereumWallet; fn into_wallet(self) -> Self::NetworkWallet { - match self.signer { - WalletSigner::Local(wallet) => wallet.into(), - WalletSigner::AwsKms(wallet) => wallet.into(), + match self { + Self::Local(wallet) => wallet.into(), + Self::AwsKms(wallet) => wallet.into(), } } } @@ -185,9 +174,7 @@ mod tests { pub fn random(chain_id: Option) -> Result { let signer = PrivateKeySigner::random().with_chain_id(chain_id); info!("Created random wallet"); - Ok(Self { - signer: WalletSigner::Local(signer), - }) + Ok(Self::Local(signer)) } } } diff --git a/kms-connector/crates/utils/src/conn.rs b/kms-connector/crates/utils/src/conn.rs index ce28e15a20..ed5150dd18 100644 --- a/kms-connector/crates/utils/src/conn.rs +++ b/kms-connector/crates/utils/src/conn.rs @@ -15,7 +15,7 @@ use alloy::{ }; use anyhow::anyhow; use sqlx::{Pool, Postgres, postgres::PgPoolOptions}; -use std::{str::FromStr, sync::Once, time::Duration}; +use std::{sync::Once, time::Duration}; use tracing::{info, warn}; /// The number of connection retry to connect to the database or the Gateway RPC node. @@ -63,7 +63,7 @@ pub type WalletGatewayProviderFillers = JoinFill< /// Tries to establish the connection with a RPC node of the Gateway. pub async fn connect_to_gateway( - gateway_url: &str, + gateway_url: Url, chain_id: u64, ) -> anyhow::Result { connect_to_gateway_inner(gateway_url, || { @@ -74,7 +74,7 @@ pub async fn connect_to_gateway( /// Tries to establish the connection with a RPC node of the Gateway, with a `WalletFiller`. pub async fn connect_to_gateway_with_wallet( - gateway_url: &str, + gateway_url: Url, chain_id: u64, wallet: KmsWallet, ) -> anyhow::Result { @@ -91,7 +91,7 @@ pub async fn connect_to_gateway_with_wallet( /// Tries to establish the connection with a RPC node of the Gateway. async fn connect_to_gateway_inner( - gateway_url: &str, + gateway_url: Url, provider_builder_new: impl Fn() -> ProviderBuilder, ) -> anyhow::Result where @@ -105,8 +105,6 @@ where .unwrap() }); - let gateway_url = - Url::from_str(gateway_url).map_err(|e| anyhow!("Invalid Gateway URL: {e}"))?; let provider = provider_builder_new().connect_http(gateway_url); info!("Connected to Gateway's RPC node successfully"); Ok(provider) diff --git a/kms-connector/crates/utils/src/monitoring/health.rs b/kms-connector/crates/utils/src/monitoring/health.rs index eebbe0a288..4e7d9f81d4 100644 --- a/kms-connector/crates/utils/src/monitoring/health.rs +++ b/kms-connector/crates/utils/src/monitoring/health.rs @@ -18,8 +18,8 @@ pub trait Healthcheck { fn service_name() -> &'static str; } -pub fn default_healthcheck_timeout_secs() -> u64 { - 3 // 3 seconds +pub fn default_healthcheck_timeout() -> Duration { + Duration::from_secs(3) } /// Performs the database healthcheck. diff --git a/kms-connector/crates/utils/src/monitoring/server.rs b/kms-connector/crates/utils/src/monitoring/server.rs index d620d87829..02b34ddfdf 100644 --- a/kms-connector/crates/utils/src/monitoring/server.rs +++ b/kms-connector/crates/utils/src/monitoring/server.rs @@ -1,7 +1,7 @@ use crate::monitoring::{health::Healthcheck, otlp::metrics_responder}; use actix_web::{HttpResponse, web::Data}; use serde::{Deserialize, Serialize}; -use std::net::SocketAddr; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use tokio::{select, task::JoinHandle}; use tokio_util::sync::CancellationToken; use tracing::{error, info}; @@ -9,8 +9,8 @@ use tracing::{error, info}; /// Number of workers for the monitoring server. const MONITORING_SERVER_WORKER: usize = 1; -pub fn default_monitoring_endpoint() -> String { - "0.0.0.0:9100".to_string() +pub fn default_monitoring_endpoint() -> SocketAddr { + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 9100) } /// Starts the HTTP server exposing the healthchecks and metrics collection endpoints. diff --git a/kms-connector/crates/utils/src/tests/setup/kms.rs b/kms-connector/crates/utils/src/tests/setup/kms.rs index 9f90253c82..38f4f1fc83 100644 --- a/kms-connector/crates/utils/src/tests/setup/kms.rs +++ b/kms-connector/crates/utils/src/tests/setup/kms.rs @@ -99,7 +99,7 @@ impl KmsInstance { .with_copy_to( "/app/kms-core-client/config.toml".to_string(), PathBuf::from_str(&format!( - "{}/../../tests/data/core-client-config.toml", + "{}/tests/data/core-client-config.toml", env!("CARGO_MANIFEST_DIR"), )) .unwrap(), diff --git a/kms-connector/crates/utils/src/tests/setup/s3.rs b/kms-connector/crates/utils/src/tests/setup/s3.rs index fa81edba38..44baef0556 100644 --- a/kms-connector/crates/utils/src/tests/setup/s3.rs +++ b/kms-connector/crates/utils/src/tests/setup/s3.rs @@ -69,7 +69,7 @@ impl S3Instance { .with_copy_to( format!("/data/{S3_CT_DIGEST}"), PathBuf::from_str(&format!( - "{}/../../tests/data/{}", + "{}/tests/data/{}", env!("CARGO_MANIFEST_DIR"), S3_CT_DIGEST )) diff --git a/kms-connector/tests/data/3a002df21130bda55f78d4403a73007a797f4a888174a620bbffc9052a045239 b/kms-connector/crates/utils/tests/data/3a002df21130bda55f78d4403a73007a797f4a888174a620bbffc9052a045239 similarity index 100% rename from kms-connector/tests/data/3a002df21130bda55f78d4403a73007a797f4a888174a620bbffc9052a045239 rename to kms-connector/crates/utils/tests/data/3a002df21130bda55f78d4403a73007a797f4a888174a620bbffc9052a045239 diff --git a/kms-connector/tests/data/core-client-config.toml b/kms-connector/crates/utils/tests/data/core-client-config.toml similarity index 100% rename from kms-connector/tests/data/core-client-config.toml rename to kms-connector/crates/utils/tests/data/core-client-config.toml diff --git a/kms-connector/tests/Cargo.toml b/kms-connector/tests/Cargo.toml deleted file mode 100644 index e758c58dfe..0000000000 --- a/kms-connector/tests/Cargo.toml +++ /dev/null @@ -1,26 +0,0 @@ -[package] -name = "connector-tests" -authors.workspace = true -edition.workspace = true -license.workspace = true -publish.workspace = true -version.workspace = true - -[[test]] -name = "connector-tests" -path = "lib.rs" - -[dependencies] -gw-listener.workspace = true -kms-worker.workspace = true -tx-sender.workspace = true -connector-utils = { workspace = true, features = ["tests"] } - -alloy.workspace = true -anyhow.workspace = true -rstest.workspace = true -sqlx.workspace = true -tokio.workspace = true -tokio-stream.workspace = true -tokio-util.workspace = true -tracing.workspace = true diff --git a/kms-connector/tests/lib.rs b/kms-connector/tests/lib.rs deleted file mode 100644 index cfe158c95b..0000000000 --- a/kms-connector/tests/lib.rs +++ /dev/null @@ -1,144 +0,0 @@ -use alloy::{ - hex::FromHex, - primitives::{FixedBytes, U256}, - providers::Provider, -}; -use anyhow::anyhow; -use connector_utils::{ - config::KmsWallet, - conn::{GatewayProvider, WalletGatewayProvider}, - tests::setup::{ - CHAIN_ID, DECRYPTION_MOCK_ADDRESS, DEPLOYER_PRIVATE_KEY, DbInstance, - GATEWAY_CONFIG_MOCK_ADDRESS, GatewayInstance, KMS_GENERATION_MOCK_ADDRESS, KmsInstance, - S3_CT, S3Instance, TestInstance, - }, -}; -use gw_listener::core::{DbEventPublisher, GatewayListener}; -use kms_worker::core::{ - DbEventPicker, DbKmsResponsePublisher, KmsWorker, event_processor::DbEventProcessor, -}; -use rstest::rstest; -use std::time::Duration; -use tokio_stream::StreamExt; -use tokio_util::sync::CancellationToken; -use tracing::info; -use tx_sender::core::{DbKmsResponsePicker, DbKmsResponseRemover, TransactionSender}; - -#[rstest] -#[timeout(Duration::from_secs(120))] -#[tokio::test] -#[ignore = "mock contract not adapted to real e2e tests for now"] -async fn test_e2e_public_decrypt() -> anyhow::Result<()> { - let test_instance_builder = TestInstance::builder() - .with_db(DbInstance::setup().await?) - .with_gateway(GatewayInstance::setup().await?); - let s3_instance = S3Instance::setup().await?; - let kms_instance = KmsInstance::setup(&s3_instance.url).await?; - let test_instance = test_instance_builder - .with_s3(s3_instance) - .with_kms(kms_instance) - .build(); - - let cancel_token = CancellationToken::new(); - - let kms_connector = KmsConnector::setup(&test_instance).await?; - let connector_task = tokio::spawn(kms_connector.start(cancel_token.clone())); - - info!("Mocking PublicDecryptionRequest on Anvil..."); - let pending_tx = test_instance - .decryption_contract() - .publicDecryptionRequest(vec![FixedBytes::from_hex(S3_CT).unwrap()], vec![].into()) - .send() - .await?; - let receipt = pending_tx.get_receipt().await?; - let _tx = test_instance - .provider() - .get_transaction_by_hash(receipt.transaction_hash) - .await? - .unwrap(); - info!("Tx successfully sent!"); - - info!("Checking response has been sent to Anvil..."); - let mut response_stream = test_instance - .decryption_contract() - .PublicDecryptionResponse_filter() - .watch() - .await? - .into_stream(); - let (response, _) = response_stream - .next() - .await - .ok_or_else(|| anyhow!("Failed to capture PublicDecryptionResponse"))??; - assert_eq!(response.decryptionId, U256::ONE); - info!("Response successfully sent to Anvil!"); - - info!("Checking response has been removed from DB..."); - tokio::time::sleep(Duration::from_millis(300)).await; // give some time for the removal - let count: i64 = - sqlx::query_scalar("SELECT COUNT(decryption_id) FROM public_decryption_responses") - .fetch_one(test_instance.db()) - .await?; - assert_eq!(count, 0); - info!("Response successfully removed from DB! Stopping TransactionSender..."); - - cancel_token.cancel(); - tokio::time::timeout(Duration::from_secs(1), connector_task).await??; - Ok(()) -} - -struct KmsConnector { - gw_listener: GatewayListener, - kms_worker: KmsWorker, DbKmsResponsePublisher>, - tx_sender: TransactionSender, -} - -impl KmsConnector { - pub async fn setup(test_instance: &TestInstance) -> anyhow::Result { - info!("Setting up KMS Connector sub-components..."); - let mut gw_listener_conf = gw_listener::core::Config { - database_url: test_instance.db_url().to_string(), - gateway_url: test_instance.anvil_ws_endpoint(), - chain_id: *CHAIN_ID as u64, - ..Default::default() - }; - gw_listener_conf.decryption_contract.address = DECRYPTION_MOCK_ADDRESS; - gw_listener_conf.kms_generation_contract.address = KMS_GENERATION_MOCK_ADDRESS; - let (gw_listener, _) = GatewayListener::from_config(gw_listener_conf).await?; - - let mut kms_worker_conf = kms_worker::core::Config { - database_url: test_instance.db_url().to_string(), - kms_core_endpoint: test_instance.kms_url().to_string(), - gateway_url: test_instance.anvil_ws_endpoint(), - chain_id: *CHAIN_ID as u64, - ..Default::default() - }; - kms_worker_conf.decryption_contract.address = DECRYPTION_MOCK_ADDRESS; - kms_worker_conf.gateway_config_contract.address = GATEWAY_CONFIG_MOCK_ADDRESS; - let (kms_worker, _) = KmsWorker::from_config(kms_worker_conf).await?; - - let mut tx_sender_conf = tx_sender::core::Config::default().await; - tx_sender_conf.database_url = test_instance.db_url().to_string(); - tx_sender_conf.gateway_url = test_instance.anvil_ws_endpoint(); - tx_sender_conf.chain_id = *CHAIN_ID as u64; - tx_sender_conf.decryption_contract.address = DECRYPTION_MOCK_ADDRESS; - tx_sender_conf.kms_generation_contract.address = KMS_GENERATION_MOCK_ADDRESS; - tx_sender_conf.wallet = - KmsWallet::from_private_key_str(DEPLOYER_PRIVATE_KEY, Some(*CHAIN_ID as u64))?; - let (tx_sender, _) = TransactionSender::from_config(tx_sender_conf).await?; - info!("KMS Connector sub-components successfully setup!"); - - Ok(Self { - gw_listener, - kms_worker, - tx_sender, - }) - } - - pub async fn start(self, cancel_token: CancellationToken) -> Vec<()> { - let mut tasks = tokio::task::JoinSet::new(); - tasks.spawn(self.gw_listener.start(cancel_token.clone())); - tasks.spawn(self.kms_worker.start(cancel_token.clone())); - tasks.spawn(self.tx_sender.start(cancel_token.clone())); - tasks.join_all().await - } -}