diff --git a/.sqlx/query-567597c712512a134b21cf0d7dd4e1c75e9e3441b4914a085fda76dd6f3368fc.json b/.sqlx/query-567597c712512a134b21cf0d7dd4e1c75e9e3441b4914a085fda76dd6f3368fc.json new file mode 100644 index 0000000..f9889d8 --- /dev/null +++ b/.sqlx/query-567597c712512a134b21cf0d7dd4e1c75e9e3441b4914a085fda76dd6f3368fc.json @@ -0,0 +1,83 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO\n url_results (id, provider_id, client_id, result_type, working_url, retrievability_percent, result_code, error_code, tested_at)\n SELECT\n a1, a2, a3, a4, a5, a6, a7, a8, a9\n FROM UNNEST(\n $1::uuid[],\n $2::text[],\n $3::text[],\n $4::discovery_type[],\n $5::text[],\n $6::double precision[],\n $7::result_code[],\n $8::error_code[],\n $9::timestamptz[]\n ) AS t(a1, a2, a3, a4, a5, a6, a7, a8, a9)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "TextArray", + "TextArray", + { + "Custom": { + "name": "discovery_type[]", + "kind": { + "Array": { + "Custom": { + "name": "discovery_type", + "kind": { + "Enum": [ + "Provider", + "ProviderClient" + ] + } + } + } + } + } + }, + "TextArray", + "Float8Array", + { + "Custom": { + "name": "result_code[]", + "kind": { + "Array": { + "Custom": { + "name": "result_code", + "kind": { + "Enum": [ + "NoCidContactData", + "MissingAddrFromCidContact", + "MissingHttpAddrFromCidContact", + "FailedToGetWorkingUrl", + "NoDealsFound", + "TimedOut", + "Success", + "JobCreated", + "Error" + ] + } + } + } + } + } + }, + { + "Custom": { + "name": "error_code[]", + "kind": { + "Array": { + "Custom": { + "name": "error_code", + "kind": { + "Enum": [ + "NoProviderOrClient", + "NoProvidersFound", + "FailedToRetrieveCidContactData", + "FailedToGetPeerId", + "FailedToGetDeals" + ] + } + } + } + } + } + }, + "TimestamptzArray" + ] + }, + "nullable": [] + }, + "hash": "567597c712512a134b21cf0d7dd4e1c75e9e3441b4914a085fda76dd6f3368fc" +} diff --git a/.sqlx/query-6373645e2aaa8b12aca286d861d809f5edf6a8ea022cb60ceaee478d31740949.json b/.sqlx/query-6373645e2aaa8b12aca286d861d809f5edf6a8ea022cb60ceaee478d31740949.json new file mode 100644 index 0000000..ccb11a6 --- /dev/null +++ b/.sqlx/query-6373645e2aaa8b12aca286d861d809f5edf6a8ea022cb60ceaee478d31740949.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE\n storage_providers\n SET\n url_discovery_status = 'pending',\n url_discovery_pending_since = NOW()\n WHERE\n provider_id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [] + }, + "hash": "6373645e2aaa8b12aca286d861d809f5edf6a8ea022cb60ceaee478d31740949" +} diff --git a/.sqlx/query-c2c958b52086e4f9559db175d2ee01a125ddca116b5d02656825e4bef039eba9.json b/.sqlx/query-6bf9bd322ce9b3c312d0e6d880368ea2997370858f1b5a184bb884af1765cf89.json similarity index 84% rename from .sqlx/query-c2c958b52086e4f9559db175d2ee01a125ddca116b5d02656825e4bef039eba9.json rename to .sqlx/query-6bf9bd322ce9b3c312d0e6d880368ea2997370858f1b5a184bb884af1765cf89.json index bc10509..db6ed6a 100644 --- a/.sqlx/query-c2c958b52086e4f9559db175d2ee01a125ddca116b5d02656825e4bef039eba9.json +++ b/.sqlx/query-6bf9bd322ce9b3c312d0e6d880368ea2997370858f1b5a184bb884af1765cf89.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n id,\n \"dealId\" AS deal_id,\n \"claimId\" AS claim_id,\n \"clientId\" AS client_id,\n \"providerId\" AS provider_id,\n \"pieceCid\" AS piece_cid\n FROM unified_verified_deal\n WHERE \n \"providerId\" = $1\n ORDER BY random()\n LIMIT $2\n OFFSET $3\n ", + "query": "\n SELECT\n id,\n \"dealId\" AS deal_id,\n \"claimId\" AS claim_id,\n \"clientId\" AS client_id,\n \"providerId\" AS provider_id,\n \"pieceCid\" AS piece_cid\n FROM unified_verified_deal\n WHERE\n \"providerId\" = $1\n ORDER BY random()\n LIMIT $2\n OFFSET $3\n ", "describe": { "columns": [ { @@ -50,5 +50,5 @@ true ] }, - "hash": "c2c958b52086e4f9559db175d2ee01a125ddca116b5d02656825e4bef039eba9" + "hash": "6bf9bd322ce9b3c312d0e6d880368ea2997370858f1b5a184bb884af1765cf89" } diff --git a/.sqlx/query-c3f055777ef017282ee244247417aefb57add7148b2b65bf82bd92f9db8c514d.json b/.sqlx/query-6d0246db36942f288a537ce27d69774377dbdca219880e5d54c21c817813d648.json similarity index 61% rename from .sqlx/query-c3f055777ef017282ee244247417aefb57add7148b2b65bf82bd92f9db8c514d.json rename to .sqlx/query-6d0246db36942f288a537ce27d69774377dbdca219880e5d54c21c817813d648.json index 22b88df..e793275 100644 --- a/.sqlx/query-c3f055777ef017282ee244247417aefb57add7148b2b65bf82bd92f9db8c514d.json +++ b/.sqlx/query-6d0246db36942f288a537ce27d69774377dbdca219880e5d54c21c817813d648.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT \n id, \n provider_id, \n next_url_discovery_at, \n url_discovery_status, \n last_working_url,\n next_bms_test_at, \n bms_test_status, \n bms_routing_key, \n last_bms_region_discovery_at,\n created_at, \n updated_at\n FROM \n storage_providers \n WHERE \n provider_id = $1\n ", + "query": "SELECT\n id,\n provider_id AS \"provider_id: ProviderId\",\n next_url_discovery_at,\n url_discovery_status,\n url_discovery_pending_since,\n last_working_url,\n next_bms_test_at,\n bms_test_status,\n bms_routing_key,\n last_bms_region_discovery_at,\n created_at,\n updated_at\n FROM\n storage_providers\n WHERE\n provider_id = $1\n ", "describe": { "columns": [ { @@ -10,7 +10,7 @@ }, { "ordinal": 1, - "name": "provider_id", + "name": "provider_id: ProviderId", "type_info": "Varchar" }, { @@ -25,36 +25,41 @@ }, { "ordinal": 4, + "name": "url_discovery_pending_since", + "type_info": "Timestamptz" + }, + { + "ordinal": 5, "name": "last_working_url", "type_info": "Text" }, { - "ordinal": 5, + "ordinal": 6, "name": "next_bms_test_at", "type_info": "Timestamptz" }, { - "ordinal": 6, + "ordinal": 7, "name": "bms_test_status", "type_info": "Varchar" }, { - "ordinal": 7, + "ordinal": 8, "name": "bms_routing_key", "type_info": "Varchar" }, { - "ordinal": 8, + "ordinal": 9, "name": "last_bms_region_discovery_at", "type_info": "Timestamptz" }, { - "ordinal": 9, + "ordinal": 10, "name": "created_at", "type_info": "Timestamptz" }, { - "ordinal": 10, + "ordinal": 11, "name": "updated_at", "type_info": "Timestamptz" } @@ -70,6 +75,7 @@ false, true, true, + true, false, true, true, @@ -78,5 +84,5 @@ false ] }, - "hash": "c3f055777ef017282ee244247417aefb57add7148b2b65bf82bd92f9db8c514d" + "hash": "6d0246db36942f288a537ce27d69774377dbdca219880e5d54c21c817813d648" } diff --git a/.sqlx/query-808f33282188dbb6bc4ed48c3cbad3f9f793e9b51bbb50736d979733cd4455d5.json b/.sqlx/query-808f33282188dbb6bc4ed48c3cbad3f9f793e9b51bbb50736d979733cd4455d5.json new file mode 100644 index 0000000..ce055c1 --- /dev/null +++ b/.sqlx/query-808f33282188dbb6bc4ed48c3cbad3f9f793e9b51bbb50736d979733cd4455d5.json @@ -0,0 +1,88 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n id,\n provider_id AS \"provider_id: ProviderId\",\n next_url_discovery_at,\n url_discovery_status,\n url_discovery_pending_since,\n last_working_url,\n next_bms_test_at,\n bms_test_status,\n bms_routing_key,\n last_bms_region_discovery_at,\n created_at,\n updated_at\n FROM\n storage_providers\n WHERE\n (\n next_url_discovery_at <= NOW()\n AND url_discovery_status IS DISTINCT FROM 'pending'\n )\n OR\n (\n url_discovery_status = 'pending'\n AND url_discovery_pending_since < NOW() - INTERVAL '60 minutes'\n )\n ORDER BY\n next_url_discovery_at ASC\n LIMIT $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "provider_id: ProviderId", + "type_info": "Varchar" + }, + { + "ordinal": 2, + "name": "next_url_discovery_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 3, + "name": "url_discovery_status", + "type_info": "Varchar" + }, + { + "ordinal": 4, + "name": "url_discovery_pending_since", + "type_info": "Timestamptz" + }, + { + "ordinal": 5, + "name": "last_working_url", + "type_info": "Text" + }, + { + "ordinal": 6, + "name": "next_bms_test_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 7, + "name": "bms_test_status", + "type_info": "Varchar" + }, + { + "ordinal": 8, + "name": "bms_routing_key", + "type_info": "Varchar" + }, + { + "ordinal": 9, + "name": "last_bms_region_discovery_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 10, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 11, + "name": "updated_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false, + false, + false, + true, + true, + true, + false, + true, + true, + true, + false, + false + ] + }, + "hash": "808f33282188dbb6bc4ed48c3cbad3f9f793e9b51bbb50736d979733cd4455d5" +} diff --git a/.sqlx/query-84ee3e7da8602f06509ad926538bcd2090e3920877e8dcc7cda39e51064980d8.json b/.sqlx/query-84ee3e7da8602f06509ad926538bcd2090e3920877e8dcc7cda39e51064980d8.json new file mode 100644 index 0000000..a9b6bcb --- /dev/null +++ b/.sqlx/query-84ee3e7da8602f06509ad926538bcd2090e3920877e8dcc7cda39e51064980d8.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT DISTINCT\n \"clientId\"\n FROM\n unified_verified_deal\n WHERE\n \"providerId\" = $1\n AND \"clientId\" IS NOT NULL\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "clientId", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + true + ] + }, + "hash": "84ee3e7da8602f06509ad926538bcd2090e3920877e8dcc7cda39e51064980d8" +} diff --git a/.sqlx/query-ad6dbed94b19808e4cf4e1bb7d94579915d6583e434fc86b499806cda6043036.json b/.sqlx/query-ad6dbed94b19808e4cf4e1bb7d94579915d6583e434fc86b499806cda6043036.json deleted file mode 100644 index b3036ec..0000000 --- a/.sqlx/query-ad6dbed94b19808e4cf4e1bb7d94579915d6583e434fc86b499806cda6043036.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO\n storage_providers (provider_id)\n VALUES\n ($1)\n ON CONFLICT DO NOTHING\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Varchar" - ] - }, - "nullable": [] - }, - "hash": "ad6dbed94b19808e4cf4e1bb7d94579915d6583e434fc86b499806cda6043036" -} diff --git a/.sqlx/query-c3e0ac8f56140c94a5e0865750391db0aede162482c0b2a654927e0591619183.json b/.sqlx/query-c3e0ac8f56140c94a5e0865750391db0aede162482c0b2a654927e0591619183.json new file mode 100644 index 0000000..3c4db76 --- /dev/null +++ b/.sqlx/query-c3e0ac8f56140c94a5e0865750391db0aede162482c0b2a654927e0591619183.json @@ -0,0 +1,55 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n id,\n \"dealId\" AS deal_id,\n \"claimId\" AS claim_id,\n \"clientId\" AS client_id,\n \"providerId\" AS provider_id,\n \"pieceCid\" AS piece_cid\n FROM unified_verified_deal\n WHERE\n \"providerId\" = $1\n AND \"clientId\" = $2\n ORDER BY random()\n LIMIT $3\n OFFSET $4\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "deal_id", + "type_info": "Int4" + }, + { + "ordinal": 2, + "name": "claim_id", + "type_info": "Int4" + }, + { + "ordinal": 3, + "name": "client_id", + "type_info": "Varchar" + }, + { + "ordinal": 4, + "name": "provider_id", + "type_info": "Varchar" + }, + { + "ordinal": 5, + "name": "piece_cid", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Int8", + "Int8" + ] + }, + "nullable": [ + false, + false, + false, + true, + true, + true + ] + }, + "hash": "c3e0ac8f56140c94a5e0865750391db0aede162482c0b2a654927e0591619183" +} diff --git a/.sqlx/query-c8d77d096edf7e52a25f1b98924958feb7d95de95f2653d855a16a4a36c3ead6.json b/.sqlx/query-c8d77d096edf7e52a25f1b98924958feb7d95de95f2653d855a16a4a36c3ead6.json new file mode 100644 index 0000000..b959081 --- /dev/null +++ b/.sqlx/query-c8d77d096edf7e52a25f1b98924958feb7d95de95f2653d855a16a4a36c3ead6.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT DISTINCT\n \"providerId\"\n FROM\n unified_verified_deal\n WHERE\n \"providerId\" IS NOT NULL\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "providerId", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + true + ] + }, + "hash": "c8d77d096edf7e52a25f1b98924958feb7d95de95f2653d855a16a4a36c3ead6" +} diff --git a/.sqlx/query-e489f8a98958aeab9a5d28a6bd0e3a1cef2fa9a813bdd85db0a3e7f9db335ab1.json b/.sqlx/query-e489f8a98958aeab9a5d28a6bd0e3a1cef2fa9a813bdd85db0a3e7f9db335ab1.json new file mode 100644 index 0000000..ccdabf1 --- /dev/null +++ b/.sqlx/query-e489f8a98958aeab9a5d28a6bd0e3a1cef2fa9a813bdd85db0a3e7f9db335ab1.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE\n storage_providers\n SET\n next_url_discovery_at = NOW() + INTERVAL '1 day',\n url_discovery_status = NULL,\n url_discovery_pending_since = NULL,\n last_working_url = $2,\n updated_at = NOW()\n WHERE\n provider_id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "e489f8a98958aeab9a5d28a6bd0e3a1cef2fa9a813bdd85db0a3e7f9db335ab1" +} diff --git a/.sqlx/query-fe83a06bb1c021f918755241bfd2c2933eca082a8e85abc1e5dd3de87a6eeca6.json b/.sqlx/query-fe83a06bb1c021f918755241bfd2c2933eca082a8e85abc1e5dd3de87a6eeca6.json deleted file mode 100644 index 82cefd0..0000000 --- a/.sqlx/query-fe83a06bb1c021f918755241bfd2c2933eca082a8e85abc1e5dd3de87a6eeca6.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT DISTINCT\n \"providerId\"\n FROM\n unified_verified_deal\n WHERE\n \"providerId\" IS NOT NULL\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "providerId", - "type_info": "Varchar" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - true - ] - }, - "hash": "fe83a06bb1c021f918755241bfd2c2933eca082a8e85abc1e5dd3de87a6eeca6" -} diff --git a/Cargo.lock b/Cargo.lock index b92c197..56f5294 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -52,9 +52,9 @@ checksum = "45862d1c77f2228b9e10bc609d5bc203d86ebc9b87ad8d5d5167a6c9abf739d9" [[package]] name = "alloy" -version = "1.0.41" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae62e633fa48b4190af5e841eb05179841bb8b713945103291e2c0867037c0d1" +checksum = "a83b2001153fdb12999f808b53068ba36902ca59bf32ad979bb176d03f8f8772" dependencies = [ "alloy-consensus", "alloy-core", @@ -71,9 +71,9 @@ dependencies = [ [[package]] name = "alloy-chains" -version = "0.2.15" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bbb778f50ecb0cebfb5c05580948501927508da7bd628833a8c4bd8545e23e2" +checksum = "3ef6e7627b842406f449f83ae1a437a01cd244bc246d66f102cee9c0435ce10d" dependencies = [ "alloy-primitives", "num_enum", @@ -82,9 +82,9 @@ dependencies = [ [[package]] name = "alloy-consensus" -version = "1.0.41" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9b151e38e42f1586a01369ec52a6934702731d07e8509a7307331b09f6c46dc" +checksum = "ad704069c12f68d0c742d0cad7e0a03882b42767350584627fbf8a47b1bf1846" dependencies = [ "alloy-eips", "alloy-primitives", @@ -93,6 +93,7 @@ dependencies = [ "alloy-trie", "alloy-tx-macros", "auto_impl", + "borsh", "c-kzg", "derive_more", "either", @@ -103,14 +104,14 @@ dependencies = [ "serde", "serde_json", "serde_with", - "thiserror 2.0.17", + "thiserror 2.0.3", ] [[package]] name = "alloy-consensus-any" -version = "1.0.41" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e2d5e8668ef6215efdb7dcca6f22277b4e483a5650e05f5de22b2350971f4b8" +checksum = "bc374f640a5062224d7708402728e3d6879a514ba10f377da62e7dfb14c673e6" dependencies = [ "alloy-consensus", "alloy-eips", @@ -140,37 +141,39 @@ dependencies = [ "alloy-rlp", "crc", "serde", - "thiserror 2.0.17", + "thiserror 2.0.3", ] [[package]] name = "alloy-eip2930" -version = "0.2.1" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b82752a889170df67bbb36d42ca63c531eb16274f0d7299ae2a680facba17bd" +checksum = "9441120fa82df73e8959ae0e4ab8ade03de2aaae61be313fbf5746277847ce25" dependencies = [ "alloy-primitives", "alloy-rlp", + "borsh", "serde", ] [[package]] name = "alloy-eip7702" -version = "0.6.1" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d4769c6ffddca380b0070d71c8b7f30bed375543fe76bb2f74ec0acf4b7cd16" +checksum = "2919c5a56a1007492da313e7a3b6d45ef5edc5d33416fdec63c0d7a2702a0d20" dependencies = [ "alloy-primitives", "alloy-rlp", + "borsh", "serde", - "thiserror 2.0.17", + "thiserror 2.0.3", ] [[package]] name = "alloy-eips" -version = "1.0.41" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5434834adaf64fa20a6fb90877bc1d33214c41b055cc49f82189c98614368cc" +checksum = "7e867b5fd52ed0372a95016f3a37cbff95a9d5409230fbaef2d8ea00e8618098" dependencies = [ "alloy-eip2124", "alloy-eip2930", @@ -179,13 +182,14 @@ dependencies = [ "alloy-rlp", "alloy-serde", "auto_impl", + "borsh", "c-kzg", "derive_more", "either", "serde", "serde_with", "sha2", - "thiserror 2.0.17", + "thiserror 2.0.3", ] [[package]] @@ -202,24 +206,24 @@ dependencies = [ [[package]] name = "alloy-json-rpc" -version = "1.0.41" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7c69f6c9c68a1287c9d5ff903d0010726934de0dac10989be37b75a29190d55" +checksum = "dcab4c51fb1273e3b0f59078e0cdf8aa99f697925b09f0d2055c18be46b4d48c" dependencies = [ "alloy-primitives", "alloy-sol-types", "http", "serde", "serde_json", - "thiserror 2.0.17", + "thiserror 2.0.3", "tracing", ] [[package]] name = "alloy-network" -version = "1.0.41" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8eaf2ae05219e73e0979cb2cf55612aafbab191d130f203079805eaf881cca58" +checksum = "196d7fd3f5d414f7bbd5886a628b7c42bd98d1b126f9a7cff69dbfd72007b39c" dependencies = [ "alloy-consensus", "alloy-consensus-any", @@ -238,14 +242,14 @@ dependencies = [ "futures-utils-wasm", "serde", "serde_json", - "thiserror 2.0.17", + "thiserror 2.0.3", ] [[package]] name = "alloy-network-primitives" -version = "1.0.41" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e58f4f345cef483eab7374f2b6056973c7419ffe8ad35e994b7a7f5d8e0c7ba4" +checksum = "0d3ae2777e900a7a47ad9e3b8ab58eff3d93628265e73bbdee09acf90bf68f75" dependencies = [ "alloy-consensus", "alloy-eips", @@ -267,7 +271,7 @@ dependencies = [ "derive_more", "foldhash 0.2.0", "hashbrown 0.16.0", - "indexmap 2.6.0", + "indexmap 2.12.0", "itoa", "k256", "keccak-asm", @@ -283,9 +287,9 @@ dependencies = [ [[package]] name = "alloy-provider" -version = "1.0.41" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de2597751539b1cc8fe4204e5325f9a9ed83fcacfb212018dfcfa7877e76de21" +checksum = "9f9bf40c9b2a90c7677f9c39bccd9f06af457f35362439c0497a706f16557703" dependencies = [ "alloy-chains", "alloy-consensus", @@ -308,12 +312,12 @@ dependencies = [ "futures", "futures-utils-wasm", "lru", - "parking_lot", + "parking_lot 0.12.3", "pin-project", "reqwest", "serde", "serde_json", - "thiserror 2.0.17", + "thiserror 2.0.3", "tokio", "tracing", "url", @@ -339,14 +343,14 @@ checksum = "64b728d511962dda67c1bc7ea7c03736ec275ed2cf4c35d9585298ac9ccf3b73" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] name = "alloy-rpc-client" -version = "1.0.41" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edf8eb8be597cfa8c312934d2566ec4516f066d69164f9212d7a148979fdcfd8" +checksum = "e7c2630fde9ff6033a780635e1af6ef40e92d74a9cacb8af3defc1b15cfebca5" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -367,9 +371,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types" -version = "1.0.41" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "339af7336571dd39ae3a15bde08ae6a647e62f75350bd415832640268af92c06" +checksum = "ad098153a12382c22a597e865530033f5e644473742d6c733562d448125e02a2" dependencies = [ "alloy-primitives", "alloy-rpc-types-eth", @@ -379,9 +383,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-any" -version = "1.0.41" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbde0801a32d21c5f111f037bee7e22874836fba7add34ed4a6919932dd7cf23" +checksum = "50b8429b5b62d21bf3691eb1ae12aaae9bb496894d5a114e3cc73e27e6800ec8" dependencies = [ "alloy-consensus-any", "alloy-rpc-types-eth", @@ -390,9 +394,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-eth" -version = "1.0.41" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "361cd87ead4ba7659bda8127902eda92d17fa7ceb18aba1676f7be10f7222487" +checksum = "29031a6bf46177d65efce661f7ab37829ca09dd341bc40afb5194e97600655cc" dependencies = [ "alloy-consensus", "alloy-consensus-any", @@ -406,14 +410,14 @@ dependencies = [ "serde", "serde_json", "serde_with", - "thiserror 2.0.17", + "thiserror 2.0.3", ] [[package]] name = "alloy-serde" -version = "1.0.41" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64600fc6c312b7e0ba76f73a381059af044f4f21f43e07f51f1fa76c868fe302" +checksum = "01e856112bfa0d9adc85bd7c13db03fad0e71d1d6fb4c2010e475b6718108236" dependencies = [ "alloy-primitives", "serde", @@ -422,9 +426,9 @@ dependencies = [ [[package]] name = "alloy-signer" -version = "1.0.41" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5772858492b26f780468ae693405f895d6a27dea6e3eab2c36b6217de47c2647" +checksum = "66a4f629da632d5279bbc5731634f0f5c9484ad9c4cad0cd974d9669dc1f46d6" dependencies = [ "alloy-primitives", "async-trait", @@ -432,7 +436,7 @@ dependencies = [ "either", "elliptic-curve", "k256", - "thiserror 2.0.17", + "thiserror 2.0.3", ] [[package]] @@ -446,7 +450,7 @@ dependencies = [ "proc-macro-error2", "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -458,11 +462,11 @@ dependencies = [ "alloy-sol-macro-input", "const-hex", "heck", - "indexmap 2.6.0", + "indexmap 2.12.0", "proc-macro-error2", "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", "syn-solidity", "tiny-keccak", ] @@ -479,7 +483,7 @@ dependencies = [ "macro-string", "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", "syn-solidity", ] @@ -507,21 +511,20 @@ dependencies = [ [[package]] name = "alloy-transport" -version = "1.0.41" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "025a940182bddaeb594c26fe3728525ae262d0806fe6a4befdf5d7bc13d54bce" +checksum = "fe215a2f9b51d5f1aa5c8cf22c8be8cdb354934de09c9a4e37aefb79b77552fd" dependencies = [ "alloy-json-rpc", - "alloy-primitives", "auto_impl", "base64", "derive_more", "futures", "futures-utils-wasm", - "parking_lot", + "parking_lot 0.12.3", "serde", "serde_json", - "thiserror 2.0.17", + "thiserror 2.0.3", "tokio", "tower", "tracing", @@ -531,9 +534,9 @@ dependencies = [ [[package]] name = "alloy-transport-http" -version = "1.0.41" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3b5064d1e1e1aabc918b5954e7fb8154c39e77ec6903a581b973198b26628fa" +checksum = "dc1b37b1a30d23deb3a8746e882c70b384c574d355bc2bbea9ea918b0c31366e" dependencies = [ "alloy-json-rpc", "alloy-transport", @@ -562,15 +565,14 @@ dependencies = [ [[package]] name = "alloy-tx-macros" -version = "1.0.41" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8e52276fdb553d3c11563afad2898f4085165e4093604afe3d78b69afbf408f" +checksum = "7ccf423f6de62e8ce1d6c7a11fb7508ae3536d02e0d68aaeb05c8669337d0937" dependencies = [ - "alloy-primitives", "darling", "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -588,6 +590,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anyhow" +version = "1.0.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" + [[package]] name = "arbitrary" version = "1.4.1" @@ -682,7 +690,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62945a2f7e6de02a31fe400aa489f0e0f5b2502e69f95f853adb82a96c7a6b60" dependencies = [ "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -720,7 +728,7 @@ dependencies = [ "num-traits", "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -801,6 +809,16 @@ dependencies = [ "serde", ] +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "async-lock" version = "3.4.0" @@ -831,7 +849,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -842,7 +860,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -868,7 +886,7 @@ checksum = "ffdcb70bdbc4d478427380519163274ac86e52916e10f0a8889adf0f96d3fee7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -990,7 +1008,7 @@ checksum = "57d123550fa8d071b7255cb0cc04dc302baa6c8c4a79f55701552684d8399bce" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -1020,16 +1038,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf" -[[package]] -name = "base256emoji" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5e9430d9a245a77c92176e649af6e275f20839a48389859d1661e9a128d077c" -dependencies = [ - "const-str", - "match-lookup", -] - [[package]] name = "base64" version = "0.22.1" @@ -1061,7 +1069,7 @@ version = "0.69.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088" dependencies = [ - "bitflags", + "bitflags 2.10.0", "cexpr", "clang-sys", "itertools 0.12.1", @@ -1074,7 +1082,7 @@ dependencies = [ "regex", "rustc-hash 1.1.0", "shlex", - "syn 2.0.107", + "syn 2.0.110", "which", ] @@ -1109,6 +1117,12 @@ dependencies = [ "hex-conservative", ] +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.10.0" @@ -1151,6 +1165,29 @@ dependencies = [ "zeroize", ] +[[package]] +name = "borsh" +version = "1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad8646f98db542e39fc66e68a20b2144f6a732636df7c2354e74645faaa433ce" +dependencies = [ + "borsh-derive", + "cfg_aliases", +] + +[[package]] +name = "borsh-derive" +version = "1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd1d3c0c2f5833f22386f252fe8ed005c7f59fdcddeef025c01b4c3b9fd9ac3" +dependencies = [ + "once_cell", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.110", +] + [[package]] name = "bs58" version = "0.5.1" @@ -1180,9 +1217,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.10.1" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" +checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" dependencies = [ "serde", ] @@ -1333,12 +1370,6 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" -[[package]] -name = "const-str" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f421161cb492475f1661ddc9815a745a1c894592070661180fdec3d4872e9c3" - [[package]] name = "core-foundation" version = "0.9.4" @@ -1480,7 +1511,7 @@ dependencies = [ "quote", "serde", "strsim", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -1491,7 +1522,7 @@ checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" dependencies = [ "darling_core", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -1505,20 +1536,20 @@ dependencies = [ "hashbrown 0.14.5", "lock_api", "once_cell", - "parking_lot_core", + "parking_lot_core 0.9.10", ] [[package]] name = "data-encoding" -version = "2.9.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" +checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" [[package]] name = "data-encoding-macro" -version = "0.1.18" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47ce6c96ea0102f01122a185683611bd5ac8d99e62bc59dd12e6bda344ee673d" +checksum = "f1559b6cba622276d6d63706db152618eeb15b89b3e4041446b05876e352e639" dependencies = [ "data-encoding", "data-encoding-macro-internal", @@ -1526,14 +1557,32 @@ dependencies = [ [[package]] name = "data-encoding-macro-internal" -version = "0.1.16" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d162beedaa69905488a8da94f5ac3edb4dd4788b732fadb7bd120b2625c1976" +checksum = "332d754c0af53bc87c108fed664d121ecf59207ec4196041f04d6ab9002ad33f" dependencies = [ "data-encoding", - "syn 2.0.107", + "syn 1.0.109", ] +[[package]] +name = "deadpool" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490" +dependencies = [ + "async-trait", + "deadpool-runtime", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" + [[package]] name = "der" version = "0.7.9" @@ -1547,9 +1596,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.5.4" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a41953f86f8a05768a6cda24def994fd2f424b04ec5c719cf89989779f199071" +checksum = "ececcb659e7ba858fb4f10388c250a7252eb0a27373f1a72b8748afdd248e587" dependencies = [ "powerfmt", "serde_core", @@ -1574,7 +1623,7 @@ checksum = "30542c1ad912e0e3d22a1935c290e12e8a29d704a420177a31faad4a601a0800" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -1594,7 +1643,7 @@ checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", "unicode-xid", ] @@ -1627,7 +1676,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -1672,7 +1721,7 @@ dependencies = [ "enum-ordinalize", "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -1715,22 +1764,22 @@ dependencies = [ [[package]] name = "enum-ordinalize" -version = "4.3.0" +version = "4.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fea0dcfa4e54eeb516fe454635a95753ddd39acda650ce703031c6973e315dd5" +checksum = "4a1091a7bb1f8f2c4b28f1fe2cef4980ca2d410a3d727d67ecc3178c9b0800f0" dependencies = [ "enum-ordinalize-derive", ] [[package]] name = "enum-ordinalize-derive" -version = "4.3.1" +version = "4.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d28318a75d4aead5c4db25382e8ef717932d0346600cacae6357eb5941bc5ff" +checksum = "8ca9601fb2d62598ee17836250842873a413586e5d7ed88b356e38ddbb0ec631" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -1976,7 +2025,7 @@ checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" dependencies = [ "futures-core", "lock_api", - "parking_lot", + "parking_lot 0.12.3", ] [[package]] @@ -1993,7 +2042,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -2113,7 +2162,7 @@ dependencies = [ "futures-util", "no-std-compat", "nonzero_ext", - "parking_lot", + "parking_lot 0.12.3", "portable-atomic", "quanta", "rand 0.8.5", @@ -2144,7 +2193,7 @@ dependencies = [ "futures-core", "futures-sink", "http", - "indexmap 2.6.0", + "indexmap 2.12.0", "slab", "tokio", "tokio-util", @@ -2515,7 +2564,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -2562,7 +2611,7 @@ checksum = "a0eb5a3343abf848c0984fe4604b2b105da9539376e24fc0a3b0007411ae4fd9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -2584,13 +2633,26 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.6.0" +version = "2.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" +checksum = "6717a8d2a5a929a1a2eb43a12812498ed141a0bcfb7e8f7844fbdbe4303bba9f" dependencies = [ "equivalent", - "hashbrown 0.15.2", + "hashbrown 0.16.0", "serde", + "serde_core", +] + +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", ] [[package]] @@ -2819,18 +2881,7 @@ checksum = "1b27834086c65ec3f9387b096d66e99f221cf081c2b738042aa252bcd41204e3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", -] - -[[package]] -name = "match-lookup" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1265724d8cb29dbbc2b0f06fffb8bf1a8c0cf73a78eede9ba73a4a66c52a981e" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", + "syn 2.0.110", ] [[package]] @@ -2929,7 +2980,7 @@ dependencies = [ "event-listener", "futures-util", "loom", - "parking_lot", + "parking_lot 0.12.3", "portable-atomic", "rustc_version 0.4.1", "smallvec", @@ -2976,21 +3027,20 @@ dependencies = [ [[package]] name = "multibase" -version = "0.9.2" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8694bb4835f452b0e3bb06dbebb1d6fc5385b6ca1caf2e55fd165c042390ec77" +checksum = "9b3539ec3c1f04ac9748a260728e855f261b4977f5c3406612c884564f329404" dependencies = [ "base-x", - "base256emoji", "data-encoding", "data-encoding-macro", ] [[package]] name = "multihash" -version = "0.19.3" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b430e7953c29dd6a09afc29ff0bb69c6e306329ee6794700aee27b76a1aea8d" +checksum = "cc41f430805af9d1cf4adae4ed2149c759b877b01d909a1f40256188d09345d2" dependencies = [ "core2", "unsigned-varint", @@ -3142,7 +3192,7 @@ checksum = "ff32365de1b6743cb203b710788263c44a03de03802daf96092f2da4fe6ba4d7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -3180,7 +3230,7 @@ version = "0.10.68" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6174bc48f102d208783c2c84bf931bb75927a617866870de8a4ea85597f871f5" dependencies = [ - "bitflags", + "bitflags 2.10.0", "cfg-if", "foreign-types", "libc", @@ -3197,7 +3247,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -3262,6 +3312,17 @@ version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.3" @@ -3269,7 +3330,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.10", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall 0.2.16", + "smallvec", + "winapi", ] [[package]] @@ -3280,7 +3355,7 @@ checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.7", "smallvec", "windows-targets 0.52.6", ] @@ -3333,7 +3408,7 @@ checksum = "d56a66c0c55993aa927429d0f8a0abfd74f084e4d9c192cffed01e418d83eefb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -3403,7 +3478,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033" dependencies = [ "proc-macro2", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -3445,7 +3520,7 @@ dependencies = [ "proc-macro-error-attr2", "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -3459,14 +3534,13 @@ dependencies = [ [[package]] name = "proptest" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bb0be07becd10686a0bb407298fb425360a5c44a663774406340c59a22de4ce" +checksum = "bee689443a2bd0a16ab0348b52ee43e3b2d1b1f931c8aa5c9f8de4c86fbe8c40" dependencies = [ "bit-set", "bit-vec", - "bitflags", - "lazy_static", + "bitflags 2.10.0", "num-traits", "rand 0.9.2", "rand_chacha 0.9.0", @@ -3520,7 +3594,7 @@ dependencies = [ "rustc-hash 2.1.1", "rustls", "socket2", - "thiserror 2.0.17", + "thiserror 2.0.3", "tokio", "tracing", ] @@ -3539,7 +3613,7 @@ dependencies = [ "rustls", "rustls-pki-types", "slab", - "thiserror 2.0.17", + "thiserror 2.0.3", "tinyvec", "tracing", "web-time", @@ -3657,7 +3731,16 @@ version = "11.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c6928fa44c097620b706542d428957635951bade7143269085389d42c8a4927e" dependencies = [ - "bitflags", + "bitflags 2.10.0", +] + +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags 1.3.2", ] [[package]] @@ -3666,7 +3749,7 @@ version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" dependencies = [ - "bitflags", + "bitflags 2.10.0", ] [[package]] @@ -3686,7 +3769,7 @@ checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -3782,6 +3865,52 @@ dependencies = [ "windows-registry", ] +[[package]] +name = "reqwest-middleware" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "562ceb5a604d3f7c885a792d42c199fd8af239d0a51b2fa6a78aafa092452b04" +dependencies = [ + "anyhow", + "async-trait", + "http", + "reqwest", + "serde", + "thiserror 1.0.69", + "tower-service", +] + +[[package]] +name = "reqwest-retry" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c73e4195a6bfbcb174b790d9b3407ab90646976c55de58a6515da25d851178" +dependencies = [ + "anyhow", + "async-trait", + "futures", + "getrandom 0.2.15", + "http", + "hyper", + "parking_lot 0.11.2", + "reqwest", + "reqwest-middleware", + "retry-policies", + "thiserror 1.0.69", + "tokio", + "tracing", + "wasm-timer", +] + +[[package]] +name = "retry-policies" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5875471e6cab2871bc150ecb8c727db5113c9338cc3354dc5ee3425b6aa40a1c" +dependencies = [ + "rand 0.8.5", +] + [[package]] name = "rfc6979" version = "0.4.0" @@ -3891,7 +4020,7 @@ dependencies = [ "proc-macro2", "quote", "rust-embed-utils", - "syn 2.0.107", + "syn 2.0.110", "walkdir", ] @@ -3953,7 +4082,7 @@ version = "0.38.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7f649912bc1495e167a6edee79151c84b1bad49748cb4f1f1167f459f6224f6" dependencies = [ - "bitflags", + "bitflags 2.10.0", "errno", "libc", "linux-raw-sys", @@ -4061,9 +4190,9 @@ dependencies = [ [[package]] name = "schemars" -version = "1.0.4" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0" +checksum = "9558e172d4e8533736ba97870c4b2cd63f84b382a3d6eb063da41b91cce17289" dependencies = [ "dyn-clone", "ref-cast", @@ -4125,7 +4254,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags", + "bitflags 2.10.0", "core-foundation", "core-foundation-sys", "libc", @@ -4193,7 +4322,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -4240,9 +4369,9 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.6.0", + "indexmap 2.12.0", "schemars 0.9.0", - "schemars 1.0.4", + "schemars 1.1.0", "serde", "serde_derive", "serde_json", @@ -4259,7 +4388,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -4456,7 +4585,7 @@ dependencies = [ "hashbrown 0.14.5", "hashlink", "hex", - "indexmap 2.6.0", + "indexmap 2.12.0", "log", "memchr", "once_cell", @@ -4488,7 +4617,7 @@ dependencies = [ "quote", "sqlx-core", "sqlx-macros-core", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -4511,7 +4640,7 @@ dependencies = [ "sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", - "syn 2.0.107", + "syn 2.0.110", "tempfile", "tokio", "url", @@ -4526,7 +4655,7 @@ dependencies = [ "atoi", "base64", "bigdecimal", - "bitflags", + "bitflags 2.10.0", "byteorder", "bytes", "chrono", @@ -4571,7 +4700,7 @@ dependencies = [ "atoi", "base64", "bigdecimal", - "bitflags", + "bitflags 2.10.0", "byteorder", "chrono", "crc", @@ -4676,7 +4805,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -4698,9 +4827,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.107" +version = "2.0.110" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a26dbd934e5451d21ef060c018dae56fc073894c5a7896f882928a76e6d081b" +checksum = "a99801b5bd34ede4cf3fc688c5919368fea4e4814a4664359503e6015b280aea" dependencies = [ "proc-macro2", "quote", @@ -4716,7 +4845,7 @@ dependencies = [ "paste", "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -4736,7 +4865,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -4745,7 +4874,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" dependencies = [ - "bitflags", + "bitflags 2.10.0", "core-foundation", "system-configuration-sys", ] @@ -4796,11 +4925,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.17" +version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" +checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa" dependencies = [ - "thiserror-impl 2.0.17", + "thiserror-impl 2.0.3", ] [[package]] @@ -4811,18 +4940,18 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] name = "thiserror-impl" -version = "2.0.17" +version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" +checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -4919,7 +5048,7 @@ dependencies = [ "bytes", "libc", "mio", - "parking_lot", + "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", "socket2", @@ -4935,7 +5064,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -4995,11 +5124,11 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.23.5" +version = "0.23.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2ad0b7ae9cfeef5605163839cb9221f453399f15cfb5c10be9885fcf56611f9" +checksum = "6485ef6d0d9b5d0ec17244ff7eb05310113c3f316f2d14200d4de56b3cb98f8d" dependencies = [ - "indexmap 2.6.0", + "indexmap 2.12.0", "toml_datetime", "toml_parser", "winnow", @@ -5037,7 +5166,7 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697" dependencies = [ - "bitflags", + "bitflags 2.10.0", "bytes", "http", "pin-project-lite", @@ -5068,7 +5197,7 @@ dependencies = [ "governor", "http", "pin-project", - "thiserror 2.0.17", + "thiserror 2.0.3", "tower", "tracing", ] @@ -5093,7 +5222,7 @@ checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -5254,6 +5383,7 @@ name = "url_finder" version = "0.4.0" dependencies = [ "alloy", + "async-trait", "axum", "axum-extra", "chrono", @@ -5261,11 +5391,15 @@ dependencies = [ "common", "dotenvy", "futures", + "http", "moka", "multiaddr", "once_cell", "regex", "reqwest", + "reqwest-middleware", + "reqwest-retry", + "retry-policies", "serde", "serde_json", "sqlx", @@ -5279,6 +5413,7 @@ dependencies = [ "utoipa", "utoipa-swagger-ui", "uuid", + "wiremock", ] [[package]] @@ -5305,7 +5440,7 @@ version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "514a48569e4e21c86d0b84b5612b5e73c0b2cf09db63260134ba426d4e8ea714" dependencies = [ - "indexmap 2.6.0", + "indexmap 2.12.0", "serde", "serde_json", "utoipa-gen", @@ -5320,7 +5455,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "syn 2.0.107", + "syn 2.0.110", "url", "uuid", ] @@ -5442,7 +5577,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", "wasm-bindgen-shared", ] @@ -5476,7 +5611,7 @@ checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -5487,6 +5622,21 @@ version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" +[[package]] +name = "wasm-timer" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be0ecb0db480561e9a7642b5d3e4187c128914e58aa84330b9493e3eb68c5e7f" +dependencies = [ + "futures", + "js-sys", + "parking_lot 0.11.2", + "pin-utils", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmtimer" version = "0.4.3" @@ -5495,7 +5645,7 @@ checksum = "1c598d6b99ea013e35844697fc4670d08339d5cda15588f193c6beedd12f644b" dependencies = [ "futures", "js-sys", - "parking_lot", + "parking_lot 0.12.3", "pin-utils", "slab", "wasm-bindgen", @@ -5548,7 +5698,7 @@ version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d" dependencies = [ - "redox_syscall", + "redox_syscall 0.5.7", "wasite", ] @@ -5646,7 +5796,7 @@ checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -5657,7 +5807,7 @@ checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -5890,6 +6040,30 @@ dependencies = [ "memchr", ] +[[package]] +name = "wiremock" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2b8b99d4cdbf36b239a9532e31fe4fb8acc38d1897c1761e161550a7dc78e6a" +dependencies = [ + "assert-json-diff", + "async-trait", + "base64", + "deadpool", + "futures", + "http", + "http-body-util", + "hyper", + "hyper-util", + "log", + "once_cell", + "regex", + "serde", + "serde_json", + "tokio", + "url", +] + [[package]] name = "wit-bindgen" version = "0.46.0" @@ -5937,7 +6111,7 @@ checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", "synstructure", ] @@ -5959,7 +6133,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -5979,7 +6153,7 @@ checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", "synstructure", ] @@ -6000,7 +6174,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -6022,7 +6196,7 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn 2.0.110", ] [[package]] @@ -6036,9 +6210,9 @@ dependencies = [ "crossbeam-utils", "displaydoc", "flate2", - "indexmap 2.6.0", + "indexmap 2.12.0", "memchr", - "thiserror 2.0.17", + "thiserror 2.0.3", "zopfli", ] diff --git a/Dockerfile b/Dockerfile index c6d4d2f..b4dd1ea 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,6 +15,7 @@ FROM base as build ARG GITHUB_SHA ENV GITHUB_SHA ${GITHUB_SHA} ENV PKG_CONFIG_PATH=/usr/lib/x86_64-linux-gnu/pkgconfig +ENV SQLX_OFFLINE=true COPY --from=plan /app/recipe.json . RUN cargo chef cook --release --recipe-path recipe.json COPY . . diff --git a/Makefile b/Makefile index 44082e7..5aed0c9 100644 --- a/Makefile +++ b/Makefile @@ -3,8 +3,8 @@ include .env export .PHONY: check format lint build run stop logs prepare -.PHONY: run-db stop-db clear-db logs-db -.PHONY: migrate-up migrate-down migrate-status init-dev init-dev-db +.PHONY: run-db stop-db clean-db clear-db logs-db exec-db +.PHONY: migration migrate-up migrate-down migrate-status init-dev init-dev-db check: @-cargo fmt -- --check @@ -31,6 +31,9 @@ run-db: @docker compose up -d postgres stop-db: @docker compose down postgres +exec-db: + @docker compose exec postgres psql -U postgres -d uf +clean-db: clear-db clear-db: @docker compose down -v postgres logs-db: @@ -61,4 +64,4 @@ prepare: @cargo clippy stop-app: - @docker compose down url_finder \ No newline at end of file + @docker compose down url_finder diff --git a/README.md b/README.md index 8fc99cd..632d68b 100644 --- a/README.md +++ b/README.md @@ -41,8 +41,8 @@ The Storage Provider Url Finder is a microservice designed to test the retrievab ### Two types of Requests -1. **Async Job**: User requests a job, which is processed in the background. The user receives a `job_id` that is used to check the status and results later. The jobs are processed one by one, allowing for multiple jobs to be created without blocking the service -2. **Direct Call**: User requests measurement in synchronous mode, which is processed immediately. The result is not stored or cached and its returned as a response to the request. Response might take up to several minutes and might time out if the request takes too long +1. **Async Job**: User requests a job, which is processed in the background. The user receives a `job_id` that is used to check the status and results later. The jobs are processed one by one, allowing for multiple jobs to be created without blocking the service. +2. **Direct Call**: User requests measurement in synchronous mode, which is processed immediately. The result is not stored or cached and its returned as a response to the request. Response might take up to several minutes and might time out if the request takes too long. ### High-Level Workflow diff --git a/migrations/20251110050249_create_url_results.down.sql b/migrations/20251110050249_create_url_results.down.sql new file mode 100644 index 0000000..2bcc78e --- /dev/null +++ b/migrations/20251110050249_create_url_results.down.sql @@ -0,0 +1,11 @@ +DROP INDEX IF EXISTS idx_url_results_result_type; +DROP INDEX IF EXISTS idx_url_results_tested_at; +DROP INDEX IF EXISTS idx_url_results_client; +DROP INDEX IF EXISTS idx_url_results_pair; +DROP INDEX IF EXISTS idx_url_results_provider; + +DROP TABLE IF EXISTS url_results; + +DROP TYPE IF EXISTS error_code; +DROP TYPE IF EXISTS result_code; +DROP TYPE IF EXISTS discovery_type; \ No newline at end of file diff --git a/migrations/20251110050249_create_url_results.up.sql b/migrations/20251110050249_create_url_results.up.sql new file mode 100644 index 0000000..1e570c7 --- /dev/null +++ b/migrations/20251110050249_create_url_results.up.sql @@ -0,0 +1,43 @@ +CREATE TYPE discovery_type AS ENUM ('Provider', 'ProviderClient'); + +CREATE TYPE result_code AS ENUM ( + 'NoCidContactData', + 'MissingAddrFromCidContact', + 'MissingHttpAddrFromCidContact', + 'FailedToGetWorkingUrl', + 'NoDealsFound', + 'TimedOut', + 'Success', + 'JobCreated', + 'Error' +); + +CREATE TYPE error_code AS ENUM ( + 'NoProviderOrClient', + 'NoProvidersFound', + 'FailedToRetrieveCidContactData', + 'FailedToGetPeerId', + 'FailedToGetDeals' +); + +CREATE TABLE url_results ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + provider_id VARCHAR(255) NOT NULL, + client_id VARCHAR(255), + result_type discovery_type NOT NULL, + + working_url TEXT, + retrievability_percent NUMERIC(5, 2) NOT NULL DEFAULT 0.0 CHECK (retrievability_percent >= 0.0 AND retrievability_percent <= 100.0), + + result_code result_code NOT NULL, + error_code error_code, + + tested_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_url_results_provider ON url_results(provider_id, tested_at DESC); +CREATE INDEX idx_url_results_pair ON url_results(provider_id, client_id, tested_at DESC); +CREATE INDEX idx_url_results_client ON url_results(client_id, tested_at DESC) + WHERE client_id IS NOT NULL; +CREATE INDEX idx_url_results_tested_at ON url_results(tested_at DESC); +CREATE INDEX idx_url_results_result_type ON url_results(result_type, tested_at DESC); diff --git a/migrations/20251114120000_add_url_discovery_pending_since.down.sql b/migrations/20251114120000_add_url_discovery_pending_since.down.sql new file mode 100644 index 0000000..ce1b5d9 --- /dev/null +++ b/migrations/20251114120000_add_url_discovery_pending_since.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE storage_providers +DROP COLUMN url_discovery_pending_since; diff --git a/migrations/20251114120000_add_url_discovery_pending_since.up.sql b/migrations/20251114120000_add_url_discovery_pending_since.up.sql new file mode 100644 index 0000000..3ea352e --- /dev/null +++ b/migrations/20251114120000_add_url_discovery_pending_since.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE storage_providers +ADD COLUMN url_discovery_pending_since TIMESTAMPTZ NULL; diff --git a/scripts/setup_dev_db.sh b/scripts/setup_dev_db.sh index 7f0265b..08ad939 100755 --- a/scripts/setup_dev_db.sh +++ b/scripts/setup_dev_db.sh @@ -6,13 +6,15 @@ if [ -f .env ]; then export $(grep -v '^#' .env | xargs) fi +# SQL file name or default +FILE_NAME=${1:-"sample_deals.sql"} + if [ -z "$DATABASE_URL" ]; then echo "ERROR: DATABASE_URL not set in .env" exit 1 fi echo "Setting up development database..." -echo "Database: $DATABASE_URL" # Create unified_verified_deal table (matches DMOB schema) psql "$DATABASE_URL" < { debug!("No endpoints found"); return Ok(ok_response(CreateJobResponse { @@ -86,11 +92,15 @@ pub async fn handle_create_job( Err(e) => return Err(internal_server_error(e.to_string())), Ok(result) => result, }; - } + + Some(provider_addr) + } else { + None + }; let job = state .job_repo - .create_job(payload.provider, payload.client) + .create_job(provider_address, client_address) .await .map_err(|e| { error!("Failed to create job: {}", e); @@ -102,11 +112,3 @@ pub async fn handle_create_job( id: Some(job.id), })) } - -fn validate_address(address: &str) -> Result<(), String> { - let address_pattern = Regex::new(r"^f0\d{1,8}$").unwrap(); - if !address_pattern.is_match(address) { - return Err("Invalid provider or client address".to_string()); - } - Ok(()) -} diff --git a/url_finder/src/api/find_client.rs b/url_finder/src/api/find_client.rs index 18f6e69..be070d8 100644 --- a/url_finder/src/api/find_client.rs +++ b/url_finder/src/api/find_client.rs @@ -7,13 +7,17 @@ use axum::{ use axum_extra::extract::WithRejection; use color_eyre::Result; use common::api_response::*; -use regex::Regex; use serde::{Deserialize, Serialize}; use tokio::time::timeout; use tracing::debug; use utoipa::{IntoParams, ToSchema}; -use crate::{AppState, provider_endpoints, services::deal_service, url_tester}; +use crate::{ + AppState, provider_endpoints, + services::deal_service, + types::{ClientAddress, ClientId, ProviderId}, + url_tester, +}; use super::ResultCode; @@ -64,33 +68,27 @@ pub async fn handle_find_client( &path.client ); - // validate provider and client addresses - let address_pattern = Regex::new(r"^f0\d{1,8}$").unwrap(); - if !address_pattern.is_match(&path.client) { - return Err(bad_request( - "Invalid provider or client address".to_string(), - )); - } + // Parse and validate client address + let client_address = ClientAddress::new(path.client.clone()) + .map_err(|e| bad_request(format!("Invalid client address: {}", e)))?; - let providers = match deal_service::get_distinct_providers_by_client( - &state.deal_repo, - &path.client, - ) - .await - { - Ok(providers) => providers, - Err(e) => { - debug!( - "Failed to get providers for client {}: {:?}", - &path.client, e - ); - - return Err(internal_server_error(format!( - "Failed to get providers for client {0}", - path.client - ))); - } - }; + let providers = + match deal_service::get_distinct_providers_by_client(&state.deal_repo, &client_address) + .await + { + Ok(providers) => providers, + Err(e) => { + debug!( + "Failed to get providers for client {}: {:?}", + &path.client, e + ); + + return Err(internal_server_error(format!( + "Failed to get providers for client {0}", + path.client + ))); + } + }; if providers.is_empty() { debug!("No providers found for client {}", &path.client); @@ -115,7 +113,7 @@ pub async fn handle_find_client( debug!("No endpoints found for provider {}", &provider); results.push(ProviderResult { - provider: provider.clone(), + provider: provider.to_string(), result: result_code, working_url: None, retrievability_percent: 0.0, @@ -124,17 +122,13 @@ pub async fn handle_find_client( } let endpoints = endpoints.unwrap(); - let provider_db = provider.strip_prefix("f0").unwrap_or(&provider).to_string(); - let client = path - .client - .strip_prefix("f0") - .unwrap_or(&path.client) - .to_string(); + let provider_id: ProviderId = provider.clone().into(); + let client_id: ClientId = client_address.clone().into(); let piece_ids = deal_service::get_random_piece_ids_by_provider_and_client( &state.deal_repo, - &provider_db, - &client, + &provider_id, + &client_id, ) .await .map_err(|e| { @@ -147,7 +141,7 @@ pub async fn handle_find_client( debug!("No deals found for provider {}", &provider); results.push(ProviderResult { - provider: provider.clone(), + provider: provider.to_string(), result: ResultCode::NoDealsFound, working_url: None, retrievability_percent: 0.0, @@ -182,7 +176,7 @@ pub async fn handle_find_client( ); // In case of timeout results.push(ProviderResult { - provider: provider.clone(), + provider: provider.to_string(), result: ResultCode::TimedOut, working_url: first_url, retrievability_percent: 0.0, @@ -192,7 +186,7 @@ pub async fn handle_find_client( }; results.push(ProviderResult { - provider: provider.clone(), + provider: provider.to_string(), result: ResultCode::Success, working_url: first_url, retrievability_percent: retrievability_percent.unwrap_or(0.0), diff --git a/url_finder/src/api/find_retri_sp.rs b/url_finder/src/api/find_retri_sp.rs index 494a579..9da9269 100644 --- a/url_finder/src/api/find_retri_sp.rs +++ b/url_finder/src/api/find_retri_sp.rs @@ -7,13 +7,17 @@ use axum::{ use axum_extra::extract::WithRejection; use color_eyre::Result; use common::api_response::*; -use regex::Regex; use serde::{Deserialize, Serialize}; use tokio::time::timeout; use tracing::debug; use utoipa::{IntoParams, ToSchema}; -use crate::{AppState, provider_endpoints, services::deal_service, url_tester}; +use crate::{ + AppState, provider_endpoints, + services::deal_service, + types::{ProviderAddress, ProviderId}, + url_tester, +}; use super::ResultCode; @@ -55,14 +59,12 @@ pub async fn handle_find_retri_by_sp( ) -> Result, ApiResponse<()>> { debug!("find retri for input address: {:?}", &path.provider); - // validate provider addresses - let address_pattern = Regex::new(r"^f0\d{1,8}$").unwrap(); - if !address_pattern.is_match(&path.provider) { - return Err(bad_request("Invalid provider address".to_string())); - } + // Parse and validate provider address + let provider_address = ProviderAddress::new(path.provider) + .map_err(|e| bad_request(format!("Invalid provider address: {}", e)))?; let (result_code, endpoints) = - match provider_endpoints::get_provider_endpoints(&path.provider).await { + match provider_endpoints::get_provider_endpoints(&provider_address).await { Ok(endpoints) => endpoints, Err(e) => return Err(internal_server_error(e.to_string())), }; @@ -77,13 +79,9 @@ pub async fn handle_find_retri_by_sp( } let endpoints = endpoints.unwrap(); - let provider = path - .provider - .strip_prefix("f0") - .unwrap_or(&path.provider) - .to_string(); + let provider_id: ProviderId = provider_address.into(); - let piece_ids = deal_service::get_random_piece_ids_by_provider(&state.deal_repo, &provider) + let piece_ids = deal_service::get_random_piece_ids_by_provider(&state.deal_repo, &provider_id) .await .map_err(|e| { debug!("Failed to get piece ids: {:?}", e); diff --git a/url_finder/src/api/find_retri_sp_client.rs b/url_finder/src/api/find_retri_sp_client.rs index 920de75..81ea843 100644 --- a/url_finder/src/api/find_retri_sp_client.rs +++ b/url_finder/src/api/find_retri_sp_client.rs @@ -7,13 +7,17 @@ use axum::{ use axum_extra::extract::WithRejection; use color_eyre::Result; use common::api_response::*; -use regex::Regex; use serde::{Deserialize, Serialize}; use tokio::time::timeout; use tracing::debug; use utoipa::{IntoParams, ToSchema}; -use crate::{AppState, provider_endpoints, services::deal_service, url_tester}; +use crate::{ + AppState, provider_endpoints, + services::deal_service, + types::{ClientAddress, ClientId, ProviderAddress, ProviderId}, + url_tester, +}; use super::ResultCode; @@ -59,16 +63,14 @@ pub async fn handle_find_retri_by_client_and_sp( &path.provider, &path.client ); - // validate provider and client addresses - let address_pattern = Regex::new(r"^f0\d{1,8}$").unwrap(); - if !address_pattern.is_match(&path.provider) || !address_pattern.is_match(&path.client) { - return Err(bad_request( - "Invalid provider or client address".to_string(), - )); - } + // Parse and validate provider and client addresses + let provider_address = ProviderAddress::new(path.provider) + .map_err(|e| bad_request(format!("Invalid provider address: {}", e)))?; + let client_address = ClientAddress::new(path.client) + .map_err(|e| bad_request(format!("Invalid client address: {}", e)))?; let (result_code, endpoints) = - match provider_endpoints::get_provider_endpoints(&path.provider).await { + match provider_endpoints::get_provider_endpoints(&provider_address).await { Ok(endpoints) => endpoints, Err(e) => return Err(internal_server_error(e.to_string())), }; @@ -83,22 +85,13 @@ pub async fn handle_find_retri_by_client_and_sp( } let endpoints = endpoints.unwrap(); - let provider = path - .provider - .strip_prefix("f0") - .unwrap_or(&path.provider) - .to_string(); - - let client = path - .client - .strip_prefix("f0") - .unwrap_or(&path.client) - .to_string(); + let provider_id: ProviderId = provider_address.into(); + let client_id: ClientId = client_address.into(); let piece_ids = deal_service::get_random_piece_ids_by_provider_and_client( &state.deal_repo, - &provider, - &client, + &provider_id, + &client_id, ) .await .map_err(|e| { diff --git a/url_finder/src/api/find_url_sp.rs b/url_finder/src/api/find_url_sp.rs index 8b0be49..fe9c427 100644 --- a/url_finder/src/api/find_url_sp.rs +++ b/url_finder/src/api/find_url_sp.rs @@ -7,12 +7,16 @@ use axum::{ use axum_extra::extract::WithRejection; use color_eyre::Result; use common::api_response::*; -use regex::Regex; use serde::{Deserialize, Serialize}; use tracing::debug; use utoipa::{IntoParams, ToSchema}; -use crate::{AppState, ResultCode, provider_endpoints, services::deal_service, url_tester}; +use crate::{ + AppState, ResultCode, provider_endpoints, + services::deal_service, + types::{ProviderAddress, ProviderId}, + url_tester, +}; #[derive(Deserialize, ToSchema, IntoParams)] pub struct FindUrlSpPath { @@ -48,16 +52,12 @@ pub async fn handle_find_url_sp( ) -> Result, ApiResponse<()>> { debug!("find url input address: {:?}", &path.provider); - // validate provider and client addresses - let address_pattern = Regex::new(r"^f0\d{1,8}$").unwrap(); - if !address_pattern.is_match(&path.provider) { - return Err(bad_request( - "Invalid provider or client address".to_string(), - )); - } + // Parse and validate provider address + let provider_address = ProviderAddress::new(path.provider) + .map_err(|e| bad_request(format!("Invalid provider address: {}", e)))?; let (result_code, endpoints) = - match provider_endpoints::get_provider_endpoints(&path.provider).await { + match provider_endpoints::get_provider_endpoints(&provider_address).await { Ok(endpoints) => endpoints, Err(e) => return Err(internal_server_error(e.to_string())), }; @@ -72,13 +72,9 @@ pub async fn handle_find_url_sp( } let endpoints = endpoints.unwrap(); - let provider = path - .provider - .strip_prefix("f0") - .unwrap_or(&path.provider) - .to_string(); + let provider_id: ProviderId = provider_address.into(); - let piece_ids = deal_service::get_piece_ids_by_provider(&state.deal_repo, &provider, None) + let piece_ids = deal_service::get_piece_ids_by_provider(&state.deal_repo, &provider_id, None) .await .map_err(|e| { debug!("Failed to get piece ids: {:?}", e); diff --git a/url_finder/src/api/find_url_sp_client.rs b/url_finder/src/api/find_url_sp_client.rs index f8052a6..6736cd2 100644 --- a/url_finder/src/api/find_url_sp_client.rs +++ b/url_finder/src/api/find_url_sp_client.rs @@ -7,12 +7,16 @@ use axum::{ use axum_extra::extract::WithRejection; use color_eyre::Result; use common::api_response::*; -use regex::Regex; use serde::{Deserialize, Serialize}; use tracing::debug; use utoipa::{IntoParams, ToSchema}; -use crate::{AppState, ResultCode, provider_endpoints, services::deal_service, url_tester}; +use crate::{ + AppState, ResultCode, provider_endpoints, + services::deal_service, + types::{ClientAddress, ClientId, ProviderAddress, ProviderId}, + url_tester, +}; #[derive(Deserialize, ToSchema, IntoParams)] pub struct FindUrlSpClientPath { @@ -52,16 +56,14 @@ pub async fn handle_find_url_sp_client( ) -> Result, ApiResponse<()>> { debug!("find url input address: {:?}", &path.provider); - // validate provider and client addresses - let address_pattern = Regex::new(r"^f0\d{1,8}$").unwrap(); - if !address_pattern.is_match(&path.provider) || !address_pattern.is_match(&path.client) { - return Err(bad_request( - "Invalid provider or client address".to_string(), - )); - } + // Parse and validate provider and client addresses + let provider_address = ProviderAddress::new(path.provider) + .map_err(|e| bad_request(format!("Invalid provider address: {}", e)))?; + let client_address = ClientAddress::new(path.client) + .map_err(|e| bad_request(format!("Invalid client address: {}", e)))?; let (result_code, endpoints) = - match provider_endpoints::get_provider_endpoints(&path.provider).await { + match provider_endpoints::get_provider_endpoints(&provider_address).await { Ok(endpoints) => endpoints, Err(e) => return Err(internal_server_error(e.to_string())), }; @@ -76,20 +78,11 @@ pub async fn handle_find_url_sp_client( } let endpoints = endpoints.unwrap(); - let provider = path - .provider - .strip_prefix("f0") - .unwrap_or(&path.provider) - .to_string(); - - let client = path - .client - .strip_prefix("f0") - .unwrap_or(&path.client) - .to_string(); + let provider_id: ProviderId = provider_address.into(); + let client_id: ClientId = client_address.into(); let piece_ids = - deal_service::get_piece_ids_by_provider(&state.deal_repo, &provider, Some(&client)) + deal_service::get_piece_ids_by_provider(&state.deal_repo, &provider_id, Some(&client_id)) .await .map_err(|e| { debug!("Failed to get piece ids: {:?}", e); diff --git a/url_finder/src/api/responses.rs b/url_finder/src/api/responses.rs index 2f38d9a..98ff59f 100644 --- a/url_finder/src/api/responses.rs +++ b/url_finder/src/api/responses.rs @@ -1,41 +1 @@ -use std::fmt; - -use serde::{Deserialize, Serialize}; -use utoipa::ToSchema; - -#[derive(Debug, Serialize, Deserialize, ToSchema, Clone, PartialEq)] -pub enum ResultCode { - NoCidContactData, - MissingAddrFromCidContact, - MissingHttpAddrFromCidContact, - FailedToGetWorkingUrl, - NoDealsFound, - TimedOut, - Success, - JobCreated, - Error, -} - -#[allow(clippy::enum_variant_names)] -#[derive(Debug, Serialize, ToSchema, Clone)] -pub enum ErrorCode { - NoProviderOrClient, - NoProvidersFound, - FailedToRetrieveCidContactData, - FailedToGetPeerId, - FailedToGetDeals, - FailedToGetPeerIdFromCurio, -} -impl fmt::Display for ErrorCode { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let s = match self { - ErrorCode::NoProviderOrClient => "NoProviderOrClient", - ErrorCode::NoProvidersFound => "NoProvidersFound", - ErrorCode::FailedToRetrieveCidContactData => "FailedToRetrieveCidContactData", - ErrorCode::FailedToGetPeerId => "FailedToGetPeerId", - ErrorCode::FailedToGetDeals => "FailedToGetDeals", - ErrorCode::FailedToGetPeerIdFromCurio => "FailedToGetPeerIdFromCurio", - }; - write!(f, "{s}") - } -} +pub use crate::types::{ErrorCode, ResultCode}; diff --git a/url_finder/src/background/job_handler.rs b/url_finder/src/background/job_handler.rs index 81f11c3..5dd5bd1 100644 --- a/url_finder/src/background/job_handler.rs +++ b/url_finder/src/background/job_handler.rs @@ -5,7 +5,10 @@ use tracing::{debug, info}; use crate::{ ErrorCode, Job, JobRepository, JobStatus, ResultCode, provider_endpoints, - repository::DealRepository, services::deal_service, url_tester, + repository::DealRepository, + services::deal_service, + types::{ClientAddress, ClientId, ProviderAddress, ProviderId}, + url_tester, }; const LOOP_DELAY: Duration = Duration::from_secs(5); @@ -17,16 +20,16 @@ pub struct JobFailed { } pub struct JobSuccessResult { - pub provider: String, - pub client: Option, + pub provider: ProviderAddress, + pub client: Option, pub working_url: Option, pub retrievability: f64, pub result: ResultCode, } pub struct JobErrorResult { - pub provider: String, - pub client: Option, + pub provider: ProviderAddress, + pub client: Option, pub error: Option, pub result: Option, } @@ -139,10 +142,12 @@ pub async fn job_handler(job_repo: Arc, deal_repo: Arc JobHandlerResult { match (&job.provider, &job.client) { - (Some(provider), None) => process_job_with_provider(deal_repo, provider).await, - (None, Some(client)) => process_job_with_client(deal_repo, client).await, - (Some(provider), Some(client)) => { - process_job_with_provider_and_client(deal_repo, provider, client).await + (Some(provider_address), None) => { + process_job_with_provider(deal_repo, provider_address).await + } + (None, Some(client_address)) => process_job_with_client(deal_repo, client_address).await, + (Some(provider_address), Some(client_address)) => { + process_job_with_provider_and_client(deal_repo, provider_address, client_address).await } (None, None) => { // should not happen @@ -155,35 +160,42 @@ async fn process_pending_job(deal_repo: &DealRepository, job: &Job) -> JobHandle } } -async fn process_job_with_client(deal_repo: &DealRepository, client: &str) -> JobHandlerResult { - let providers = match deal_service::get_distinct_providers_by_client(deal_repo, client).await { - Ok(providers) => providers, - Err(e) => { - debug!("Failed to get providers for client {}: {:?}", client, e); - return JobHandlerResult::Skip(format!("Failed to get providers for client {client}")); - } - }; +async fn process_job_with_client( + deal_repo: &DealRepository, + client_address: &ClientAddress, +) -> JobHandlerResult { + let providers = + match deal_service::get_distinct_providers_by_client(deal_repo, client_address).await { + Ok(providers) => providers, + Err(e) => { + debug!( + "Failed to get providers for client {}: {:?}", + client_address, e + ); + return JobHandlerResult::Skip(format!( + "Failed to get providers for client {client_address}" + )); + } + }; if providers.is_empty() { return JobHandlerResult::FailedJob(JobFailed { error: Some(ErrorCode::NoProvidersFound), result: Some(ResultCode::Error), - reason: format!("No providers found for client: {client}"), + reason: format!("No providers found for client: {client_address}"), }); } let mut success_results = Vec::new(); let mut error_results = Vec::new(); - for provider in providers { - debug!("Processing job with provider: {}", &provider); + for provider_address in providers { + debug!("Processing job with provider: {}", &provider_address); - match process_job(deal_repo, &provider, Some(client)).await { + match process_job(deal_repo, &provider_address, Some(client_address)).await { JobHandlerResult::SuccessResult(result) => success_results.push(result), JobHandlerResult::ErrorResult(result) => error_results.push(result), - JobHandlerResult::Skip(reason) => { - return JobHandlerResult::Skip(reason); - } + JobHandlerResult::Skip(reason) => return JobHandlerResult::Skip(reason), JobHandlerResult::FailedJob(job_failed) => { return JobHandlerResult::FailedJob(job_failed); } @@ -197,33 +209,39 @@ async fn process_job_with_client(deal_repo: &DealRepository, client: &str) -> Jo async fn process_job_with_provider_and_client( deal_repo: &DealRepository, - provider: &str, - client: &str, + provider_address: &ProviderAddress, + client_address: &ClientAddress, ) -> JobHandlerResult { debug!( "Processing job with provider: {} and client: {}", - provider, client + provider_address, client_address ); - process_job(deal_repo, provider, Some(client)).await + process_job(deal_repo, provider_address, Some(client_address)).await } -async fn process_job_with_provider(deal_repo: &DealRepository, provider: &str) -> JobHandlerResult { - debug!("Processing job with provider: {}", provider); +async fn process_job_with_provider( + deal_repo: &DealRepository, + provider_address: &ProviderAddress, +) -> JobHandlerResult { + debug!("Processing job with provider: {}", provider_address); - process_job(deal_repo, provider, None).await + process_job(deal_repo, provider_address, None).await } async fn process_job( deal_repo: &DealRepository, - provider: &str, - client: Option<&str>, + provider_address: &ProviderAddress, + client_address: Option<&ClientAddress>, ) -> JobHandlerResult { - let (_, endpoints) = match provider_endpoints::get_provider_endpoints(provider).await { + let provider_id: ProviderId = provider_address.clone().into(); + let client_id: Option = client_address.map(|c| c.clone().into()); + + let (_, endpoints) = match provider_endpoints::get_provider_endpoints(provider_address).await { Ok((result_code, _)) if result_code != ResultCode::Success => { return JobHandlerResult::ErrorResult(JobErrorResult { - provider: provider.to_string(), - client: client.map(|c| c.to_string()), + provider: provider_address.clone(), + client: client_address.cloned(), result: Some(result_code), error: None, }); @@ -231,8 +249,8 @@ async fn process_job( Ok(result) => result, Err(error_code) => { return JobHandlerResult::ErrorResult(JobErrorResult { - provider: provider.to_string(), - client: client.map(|c| c.to_string()), + provider: provider_address.clone(), + client: client_address.cloned(), result: Some(ResultCode::Error), error: Some(error_code), }); @@ -243,19 +261,18 @@ async fn process_job( debug!("No endpoints found"); return JobHandlerResult::ErrorResult(JobErrorResult { - provider: provider.to_string(), - client: client.map(|c| c.to_string()), + provider: provider_address.clone(), + client: client_address.cloned(), result: Some(ResultCode::NoDealsFound), error: None, }); } let endpoints = endpoints.unwrap(); - let provider_db = provider.strip_prefix("f0").unwrap_or(provider); - let client_db = client.as_ref().map(|c| c.strip_prefix("f0").unwrap_or(c)); - let piece_ids = - match deal_service::get_piece_ids_by_provider(deal_repo, provider_db, client_db).await { + match deal_service::get_piece_ids_by_provider(deal_repo, &provider_id, client_id.as_ref()) + .await + { Ok(ids) => ids, Err(e) => { debug!("Failed to get piece ids: {:?}", e); @@ -268,8 +285,8 @@ async fn process_job( debug!("No deals found"); return JobHandlerResult::ErrorResult(JobErrorResult { - provider: provider.to_string(), - client: client.map(|c| c.to_string()), + provider: provider_address.clone(), + client: client_address.cloned(), result: Some(ResultCode::NoDealsFound), error: None, }); @@ -287,8 +304,8 @@ async fn process_job( }; JobHandlerResult::SuccessResult(JobSuccessResult { - provider: provider.to_string(), - client: client.map(|c| c.to_string()), + provider: provider_address.clone(), + client: client_address.cloned(), working_url, retrievability: retrievability_percent.unwrap_or(0.0), result: result_code, diff --git a/url_finder/src/background/mod.rs b/url_finder/src/background/mod.rs index 0316893..31f2d21 100644 --- a/url_finder/src/background/mod.rs +++ b/url_finder/src/background/mod.rs @@ -1,5 +1,7 @@ mod job_handler; mod provider_discovery; +mod url_discovery_scheduler; pub use job_handler::*; pub use provider_discovery::*; +pub use url_discovery_scheduler::*; diff --git a/url_finder/src/background/provider_discovery.rs b/url_finder/src/background/provider_discovery.rs index 9b69523..5294bef 100644 --- a/url_finder/src/background/provider_discovery.rs +++ b/url_finder/src/background/provider_discovery.rs @@ -36,8 +36,5 @@ async fn discover_and_sync_providers( debug!("Found {} distinct providers in dmob", providers.len()); - match sp_repo.insert_batch_if_not_exists(&providers).await { - Ok(count) => Ok(count), - Err(e) => Err(e), - } + sp_repo.insert_batch_if_not_exists(&providers).await } diff --git a/url_finder/src/background/url_discovery_scheduler.rs b/url_finder/src/background/url_discovery_scheduler.rs new file mode 100644 index 0000000..577042d --- /dev/null +++ b/url_finder/src/background/url_discovery_scheduler.rs @@ -0,0 +1,163 @@ +use crate::{ + repository::{DealRepository, StorageProviderRepository, UrlResult, UrlResultRepository}, + services::url_discovery_service, + types::{ClientAddress, ClientId, ProviderAddress, ProviderId}, +}; +use chrono::Utc; +use color_eyre::Result; +use futures::future::join_all; +use std::sync::Arc; +use std::time::Duration; +use tokio::{sync::Semaphore, time::sleep}; +use tracing::{debug, error, info, warn}; + +const SCHEDULER_SLEEP_INTERVAL: Duration = Duration::from_secs(3600); +const SCHEDULER_NEXT_INTERVAL: Duration = Duration::from_secs(60); +const BATCH_SIZE: i64 = 100; +const MAX_CONCURRENT_CLIENT_TESTS: usize = 5; + +pub async fn run_url_discovery_scheduler( + sp_repo: Arc, + url_repo: Arc, + deal_repo: Arc, +) { + info!("Starting URL discovery scheduler loop"); + + loop { + let interval = match schedule_url_discoveries(&sp_repo, &url_repo, &deal_repo).await { + Ok(0) => { + info!("No providers due for URL discovery, sleeping..."); + SCHEDULER_SLEEP_INTERVAL + } + Ok(count) => { + info!("URL discovery cycle completed: {} providers tested", count); + SCHEDULER_NEXT_INTERVAL + } + Err(e) => { + error!("URL discovery scheduler failed: {:?}", e); + SCHEDULER_SLEEP_INTERVAL + } + }; + + sleep(interval).await; + } +} + +async fn schedule_url_discoveries( + sp_repo: &StorageProviderRepository, + url_repo: &UrlResultRepository, + deal_repo: &DealRepository, +) -> Result { + let providers = sp_repo.get_due_for_url_discovery(BATCH_SIZE).await?; + + debug!("Found {} providers due for URL discovery", providers.len()); + + let mut total_tested = 0; + + for provider in providers { + if provider.url_discovery_status.as_deref() == Some("pending") { + warn!( + "Recovering stale pending provider: {} (pending since {:?})", + provider.provider_id, provider.url_discovery_pending_since + ); + } + + sp_repo + .set_url_discovery_pending(&provider.provider_id) + .await?; + + let clients = deal_repo + .get_clients_for_provider(&provider.provider_id) + .await?; + + debug!( + "Provider {} has {} clients", + provider.provider_id, + clients.len() + ); + + let results = test_provider_with_clients(&provider.provider_id, clients, deal_repo).await; + + let url_results: Vec = results + .iter() + .map(|r| UrlResult { + id: r.id, + provider_id: r.provider_id.clone(), + client_id: r.client_id.clone(), + result_type: r.result_type.clone(), + working_url: r.working_url.clone(), + retrievability_percent: r.retrievability_percent, + result_code: r.result_code.clone(), + error_code: r.error_code.clone(), + tested_at: Utc::now(), + }) + .collect(); + + match url_repo.insert_batch(&url_results).await { + Ok(count) => debug!( + "Inserted {} URL results for provider {}", + count, provider.provider_id + ), + Err(e) => error!("Failed to insert URL results: {:?}", e), + } + + let provider_only_result = results.iter().find(|r| r.client_id.is_none()); + let last_working_url = provider_only_result.and_then(|r| r.working_url.clone()); + + sp_repo + .update_after_url_discovery(&provider.provider_id, last_working_url) + .await?; + + total_tested += 1; + } + + Ok(total_tested) +} + +async fn test_provider_with_clients( + provider_id: &ProviderId, + client_ids: Vec, + deal_repo: &DealRepository, +) -> Vec { + let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_CLIENT_TESTS)); + let mut tasks = vec![]; + let provider_address: ProviderAddress = provider_id.clone().into(); + + let provider_task = { + let addr = provider_address.clone(); + let repo = deal_repo.clone(); + tokio::spawn(async move { url_discovery_service::discover_url(&addr, None, &repo).await }) + }; + tasks.push(provider_task); + + for client_id in client_ids { + let permit = semaphore + .clone() + .acquire_owned() + .await + .expect("Semaphore should never be closed"); + let provider_addr = provider_address.clone(); + let client_address: ClientAddress = client_id.into(); + let repo = deal_repo.clone(); + tasks.push(tokio::spawn(async move { + let result = + url_discovery_service::discover_url(&provider_addr, Some(client_address), &repo) + .await; + drop(permit); // release semaphore + result + })); + } + + let results = join_all(tasks).await; + + results + .into_iter() + .filter_map(|r| { + r.map_err(|e| { + error!("URL discovery task panicked: {:?}", e); + e + }) + .ok() + }) + .collect() +} diff --git a/url_finder/src/cid_contact.rs b/url_finder/src/cid_contact.rs index c329e5f..155fa60 100644 --- a/url_finder/src/cid_contact.rs +++ b/url_finder/src/cid_contact.rs @@ -1,9 +1,15 @@ +use std::{fmt, time::Duration}; + use color_eyre::Result; -use reqwest::Client; -use std::fmt; -use tracing::{debug, info}; +use tracing::debug; use urlencoding::decode; +use crate::utils::build_reqwest_retry_client; + +const CID_CONTACT_MIN_RETRY_INTERVAL_MS: u64 = 2_000; +const CID_CONTACT_MAX_RETRY_INTERVAL_MS: u64 = 30_000; +const CID_CONTACT_TOTAL_TIMEOUT_MS: u64 = 60_000; + pub enum CidContactError { InvalidResponse, NoData, @@ -19,7 +25,10 @@ impl fmt::Display for CidContactError { } pub async fn get_contact(peer_id: &str) -> Result { - let client = Client::new(); + let client = build_reqwest_retry_client( + CID_CONTACT_MIN_RETRY_INTERVAL_MS, + CID_CONTACT_MAX_RETRY_INTERVAL_MS, + ); let url = format!("https://cid.contact/providers/{peer_id}"); debug!("cid contact url: {:?}", url); @@ -28,6 +37,7 @@ pub async fn get_contact(peer_id: &str) -> Result Result Result { +pub async fn get_peer_id(address: &ProviderAddress) -> Result { debug!("get_peer_id address: {}", address); - let client = Client::new(); + let client = build_reqwest_retry_client( + LOTUS_RPC_MIN_RETRY_INTERVAL_MS, + LOTUS_RPC_MAX_RETRY_INTERVAL_MS, + ); let res = client .post(&CONFIG.glif_url) .json(&json!({ "jsonrpc": "2.0", "id": 1, "method": "Filecoin.StateMinerInfo", - "params": [address, null] + "params": [address.as_str(), null] })) + .timeout(Duration::from_millis(LOTUS_RPC_TOTAL_TIMEOUT_MS)) .send() .await?; @@ -30,10 +39,12 @@ pub async fn get_peer_id(address: &str) -> Result { let peer_id = json .get("result") - .ok_or(if let Some(m) = message { - eyre!(m) - } else { - eyre!("Missing lotus rpc result") + .ok_or_else(|| { + if let Some(m) = message { + eyre!("{}", m) + } else { + eyre!("Missing lotus rpc result") + } })? .get("PeerId") .ok_or(eyre!("Missing lotus rpc PeerId"))? diff --git a/url_finder/src/main.rs b/url_finder/src/main.rs index 069a28a..fba4af0 100644 --- a/url_finder/src/main.rs +++ b/url_finder/src/main.rs @@ -40,6 +40,7 @@ mod routes; mod services; mod types; mod url_tester; +mod utils; pub struct AppState { pub deal_repo: Arc, @@ -97,6 +98,7 @@ async fn main() -> Result<()> { let sp_repo = Arc::new(StorageProviderRepository::new(pool.clone())); let deal_repo = Arc::new(DealRepository::new(dmob_pool.clone())); + let url_repo = Arc::new(UrlResultRepository::new(pool.clone())); let app_state = Arc::new(AppState { deal_repo: deal_repo.clone(), @@ -115,6 +117,16 @@ async fn main() -> Result<()> { } }); + // Start the URL discovery scheduler in the background + tokio::spawn({ + let sp_repo = sp_repo.clone(); + let url_repo = url_repo.clone(); + let deal_repo = deal_repo.clone(); + async move { + background::run_url_discovery_scheduler(sp_repo, url_repo, deal_repo).await; + } + }); + // Start the job handler in the background tokio::spawn(background::job_handler( app_state.job_repo.clone(), diff --git a/url_finder/src/multiaddr_parser.rs b/url_finder/src/multiaddr_parser.rs index 2d809e7..b8b9d1b 100644 --- a/url_finder/src/multiaddr_parser.rs +++ b/url_finder/src/multiaddr_parser.rs @@ -1,6 +1,6 @@ use color_eyre::{Result, eyre::eyre}; use multiaddr::{Multiaddr, Protocol}; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; pub struct UrlParts { protocol: Option, @@ -52,7 +52,7 @@ fn parse_addr(addr: String) -> Option { Some(endpoint) } Err(e) => { - info!( + warn!( "Failed to convert multiaddr: {:?} to URL: {:?}", addr, e.to_string() diff --git a/url_finder/src/provider_endpoints.rs b/url_finder/src/provider_endpoints.rs index 957c82a..c881a7a 100644 --- a/url_finder/src/provider_endpoints.rs +++ b/url_finder/src/provider_endpoints.rs @@ -18,6 +18,7 @@ use crate::{ ErrorCode, ResultCode, cid_contact::{self, CidContactError}, lotus_rpc, multiaddr_parser, + types::ProviderAddress, }; sol! { @@ -29,7 +30,7 @@ sol! { function getPeerData(uint64 minerID) view returns (PeerData); } -pub async fn valid_curio_provider(address: &str) -> Result> { +pub async fn valid_curio_provider(address: &ProviderAddress) -> Result> { let rpc_url = &CONFIG.glif_url; let rpc_provider = ProviderBuilder::new() @@ -40,6 +41,7 @@ pub async fn valid_curio_provider(address: &str) -> Result> { let miner_peer_id_contract: Address = address!("0x14183aD016Ddc83D638425D6328009aa390339Ce"); let miner_id = address + .as_str() .strip_prefix("f") .ok_or_else(|| eyre!("Address does not start with 'f': {}", address))? .parse::() @@ -79,7 +81,7 @@ pub async fn valid_curio_provider(address: &str) -> Result> { } pub async fn get_provider_endpoints( - address: &str, + address: &ProviderAddress, ) -> Result<(ResultCode, Option>), ErrorCode> { let peer_id = if let Some(curio_provider) = valid_curio_provider(address).await.map_err(|e| { diff --git a/url_finder/src/repository/deal_repo.rs b/url_finder/src/repository/deal_repo.rs index 055557f..b288cf0 100644 --- a/url_finder/src/repository/deal_repo.rs +++ b/url_finder/src/repository/deal_repo.rs @@ -2,6 +2,8 @@ use color_eyre::Result; use serde::{Deserialize, Serialize}; use sqlx::PgPool; +use crate::types::{ClientId, ProviderId}; + #[derive(Clone)] pub struct DealRepository { pool: PgPool, @@ -29,7 +31,7 @@ impl DealRepository { pub async fn get_deals_by_provider( &self, - provider: &str, + provider_id: &ProviderId, limit: i64, offset: i64, ) -> Result, sqlx::Error> { @@ -44,13 +46,13 @@ impl DealRepository { "providerId" AS provider_id, "pieceCid" AS piece_cid FROM unified_verified_deal - WHERE + WHERE "providerId" = $1 ORDER BY random() LIMIT $2 OFFSET $3 "#, - provider, + provider_id.as_str(), limit, offset, ) @@ -62,8 +64,8 @@ impl DealRepository { pub async fn get_deals_by_provider_and_client( &self, - provider: &str, - client: &str, + provider_id: &ProviderId, + client_id: &ClientId, limit: i64, offset: i64, ) -> Result, sqlx::Error> { @@ -85,8 +87,8 @@ impl DealRepository { LIMIT $3 OFFSET $4 "#, - provider, - client, + provider_id.as_str(), + client_id.as_str(), limit, offset, ) @@ -98,8 +100,8 @@ impl DealRepository { pub async fn get_random_deals_by_provider_and_client( &self, - provider: &str, - client: &str, + provider_id: &ProviderId, + client_id: &ClientId, limit: i64, offset: i64, ) -> Result, sqlx::Error> { @@ -114,15 +116,15 @@ impl DealRepository { "providerId" AS provider_id, "pieceCid" AS piece_cid FROM unified_verified_deal - WHERE + WHERE "providerId" = $1 AND "clientId" = $2 ORDER BY random() LIMIT $3 OFFSET $4 "#, - provider, - client, + provider_id.as_str(), + client_id.as_str(), limit, offset, ) @@ -134,7 +136,7 @@ impl DealRepository { pub async fn get_random_deals_by_provider( &self, - provider: &str, + provider_id: &ProviderId, limit: i64, offset: i64, ) -> Result, sqlx::Error> { @@ -149,13 +151,13 @@ impl DealRepository { "providerId" AS provider_id, "pieceCid" AS piece_cid FROM unified_verified_deal - WHERE + WHERE "providerId" = $1 ORDER BY random() LIMIT $2 OFFSET $3 "#, - provider, + provider_id.as_str(), limit, offset, ) @@ -167,7 +169,7 @@ impl DealRepository { pub async fn get_distinct_providers_by_client( &self, - client: &str, + client_id: &ClientId, ) -> Result, sqlx::Error> { let data = sqlx::query_as!( Provider, @@ -179,7 +181,7 @@ impl DealRepository { WHERE "clientId" = $1 "#, - client, + client_id.as_str(), ) .fetch_all(&self.pool) .await?; @@ -187,14 +189,15 @@ impl DealRepository { Ok(data) } - pub async fn get_distinct_providers(&self) -> Result, sqlx::Error> { - let providers = sqlx::query_scalar!( - r#"SELECT DISTINCT - "providerId" - FROM - unified_verified_deal - WHERE - "providerId" IS NOT NULL + pub async fn get_distinct_providers(&self) -> Result, sqlx::Error> { + let providers: Vec = sqlx::query_scalar!( + r#" + SELECT DISTINCT + "providerId" + FROM + unified_verified_deal + WHERE + "providerId" IS NOT NULL "# ) .fetch_all(&self.pool) @@ -203,6 +206,37 @@ impl DealRepository { .flatten() .collect(); - Ok(providers) + Ok(providers + .into_iter() + .filter_map(|s| ProviderId::new(s).ok()) + .collect()) + } + + /// Get all unique client IDs for a given provider ID + /// NOTE: Production database has MAX 63 clients per provider + pub async fn get_clients_for_provider( + &self, + provider_id: &ProviderId, + ) -> Result, sqlx::Error> { + let clients = sqlx::query_scalar!( + r#" + SELECT DISTINCT + "clientId" + FROM + unified_verified_deal + WHERE + "providerId" = $1 + AND "clientId" IS NOT NULL + "#, + provider_id.as_str() + ) + .fetch_all(&self.pool) + .await?; + + Ok(clients + .into_iter() + .flatten() + .filter_map(|s| ClientId::new(s).ok()) + .collect()) } } diff --git a/url_finder/src/repository/job_repo.rs b/url_finder/src/repository/job_repo.rs index 2942229..364d947 100644 --- a/url_finder/src/repository/job_repo.rs +++ b/url_finder/src/repository/job_repo.rs @@ -9,12 +9,15 @@ use serde::Serialize; use utoipa::ToSchema; use uuid::Uuid; -use crate::{ErrorCode, ResultCode}; +use crate::{ + ErrorCode, ResultCode, + types::{ClientAddress, ProviderAddress}, +}; #[derive(Clone, Serialize, ToSchema)] pub struct ProviderResult { - pub provider: String, - pub client: Option, + pub provider: ProviderAddress, + pub client: Option, pub working_url: Option, pub retrievability: f64, pub result: ResultCode, @@ -29,8 +32,8 @@ pub struct Job { pub working_url: Option, pub retrievability: Option, pub results: Vec, - pub provider: Option, - pub client: Option, + pub provider: Option, + pub client: Option, pub status: JobStatus, pub result: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ -39,7 +42,7 @@ pub struct Job { pub updated_at: DateTime, } impl Job { - pub fn new(provider: Option, client: Option) -> Self { + pub fn new(provider: Option, client: Option) -> Self { Self { id: Uuid::new_v4(), working_url: None, @@ -82,8 +85,8 @@ impl JobRepository { pub async fn create_job( &self, - provider: Option, - client: Option, + provider: Option, + client: Option, ) -> Result { let job = Job::new(provider, client); @@ -96,8 +99,8 @@ impl JobRepository { pub async fn add_success_result( &self, job_id: Uuid, - provider: String, - client: Option, + provider: ProviderAddress, + client: Option, working_url: Option, retrievability: f64, result: ResultCode, @@ -122,8 +125,8 @@ impl JobRepository { pub async fn add_error_result( &self, job_id: Uuid, - provider: String, - client: Option, + provider: ProviderAddress, + client: Option, error: Option, result: Option, ) { diff --git a/url_finder/src/repository/mod.rs b/url_finder/src/repository/mod.rs index 2757ccb..ae54b9a 100644 --- a/url_finder/src/repository/mod.rs +++ b/url_finder/src/repository/mod.rs @@ -1,7 +1,9 @@ mod deal_repo; mod job_repo; mod storage_provider_repo; +mod url_result_repo; pub use deal_repo::*; pub use job_repo::*; pub use storage_provider_repo::*; +pub use url_result_repo::*; diff --git a/url_finder/src/repository/storage_provider_repo.rs b/url_finder/src/repository/storage_provider_repo.rs index c2ca762..96df6d6 100644 --- a/url_finder/src/repository/storage_provider_repo.rs +++ b/url_finder/src/repository/storage_provider_repo.rs @@ -5,12 +5,15 @@ use sqlx::PgPool; use utoipa::ToSchema; use uuid::Uuid; +use crate::types::ProviderId; + #[derive(Debug, Clone, Serialize, Deserialize, ToSchema, sqlx::FromRow)] pub struct StorageProvider { pub id: Uuid, - pub provider_id: String, + pub provider_id: ProviderId, pub next_url_discovery_at: DateTime, pub url_discovery_status: Option, + pub url_discovery_pending_since: Option>, pub last_working_url: Option, pub next_bms_test_at: DateTime, pub bms_test_status: Option, @@ -30,27 +33,16 @@ impl StorageProviderRepository { Self { pool } } - #[allow(dead_code)] - pub async fn insert_if_not_exists(&self, provider_id: &str) -> Result<()> { - sqlx::query!( - r#"INSERT INTO - storage_providers (provider_id) - VALUES - ($1) - ON CONFLICT DO NOTHING - "#, - provider_id - ) - .execute(&self.pool) - .await?; - Ok(()) - } - - pub async fn insert_batch_if_not_exists(&self, provider_ids: &[String]) -> Result { + pub async fn insert_batch_if_not_exists(&self, provider_ids: &[ProviderId]) -> Result { if provider_ids.is_empty() { return Ok(0); } + let provider_ids_str: Vec = provider_ids + .iter() + .map(|id| id.as_str().to_string()) + .collect(); + let result = sqlx::query!( r#"INSERT INTO storage_providers (provider_id) @@ -58,7 +50,7 @@ impl StorageProviderRepository { UNNEST($1::text[]) ON CONFLICT DO NOTHING "#, - provider_ids + &provider_ids_str ) .execute(&self.pool) .await?; @@ -66,29 +58,113 @@ impl StorageProviderRepository { Ok(result.rows_affected() as usize) } - pub async fn get_by_provider_id(&self, provider_id: &str) -> Result> { + pub async fn get_by_provider_id( + &self, + provider_id: &ProviderId, + ) -> Result> { Ok(sqlx::query_as!( StorageProvider, - r#"SELECT - id, - provider_id, - next_url_discovery_at, - url_discovery_status, + r#"SELECT + id, + provider_id AS "provider_id: ProviderId", + next_url_discovery_at, + url_discovery_status, + url_discovery_pending_since, last_working_url, - next_bms_test_at, - bms_test_status, - bms_routing_key, + next_bms_test_at, + bms_test_status, + bms_routing_key, last_bms_region_discovery_at, - created_at, + created_at, updated_at - FROM - storage_providers - WHERE + FROM + storage_providers + WHERE provider_id = $1 "#, - provider_id + provider_id as &ProviderId ) .fetch_optional(&self.pool) .await?) } + + pub async fn get_due_for_url_discovery(&self, limit: i64) -> Result> { + Ok(sqlx::query_as!( + StorageProvider, + r#"SELECT + id, + provider_id AS "provider_id: ProviderId", + next_url_discovery_at, + url_discovery_status, + url_discovery_pending_since, + last_working_url, + next_bms_test_at, + bms_test_status, + bms_routing_key, + last_bms_region_discovery_at, + created_at, + updated_at + FROM + storage_providers + WHERE + ( + next_url_discovery_at <= NOW() + AND url_discovery_status IS DISTINCT FROM 'pending' + ) + OR + ( + url_discovery_status = 'pending' + AND url_discovery_pending_since < NOW() - INTERVAL '60 minutes' + ) + ORDER BY + next_url_discovery_at ASC + LIMIT $1 + "#, + limit + ) + .fetch_all(&self.pool) + .await?) + } + + pub async fn set_url_discovery_pending(&self, provider_id: &ProviderId) -> Result<()> { + sqlx::query!( + r#"UPDATE + storage_providers + SET + url_discovery_status = 'pending', + url_discovery_pending_since = NOW() + WHERE + provider_id = $1 + "#, + provider_id as &ProviderId + ) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn update_after_url_discovery( + &self, + provider_id: &ProviderId, + last_working_url: Option, + ) -> Result<()> { + sqlx::query!( + r#"UPDATE + storage_providers + SET + next_url_discovery_at = NOW() + INTERVAL '1 day', + url_discovery_status = NULL, + url_discovery_pending_since = NULL, + last_working_url = $2, + updated_at = NOW() + WHERE + provider_id = $1 + "#, + provider_id as &ProviderId, + last_working_url + ) + .execute(&self.pool) + .await?; + Ok(()) + } } diff --git a/url_finder/src/repository/url_result_repo.rs b/url_finder/src/repository/url_result_repo.rs new file mode 100644 index 0000000..8d8bb90 --- /dev/null +++ b/url_finder/src/repository/url_result_repo.rs @@ -0,0 +1,93 @@ +use chrono::{DateTime, Utc}; +use color_eyre::Result; +use serde::{Deserialize, Serialize}; +use sqlx::PgPool; +use utoipa::ToSchema; +use uuid::Uuid; + +use crate::types::{ClientId, DiscoveryType, ErrorCode, ProviderId, ResultCode}; + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, sqlx::FromRow)] +pub struct UrlResult { + pub id: Uuid, + pub provider_id: ProviderId, + pub client_id: Option, + pub result_type: DiscoveryType, + pub working_url: Option, + pub retrievability_percent: f64, + pub result_code: ResultCode, + pub error_code: Option, + pub tested_at: DateTime, +} + +#[derive(Clone)] +pub struct UrlResultRepository { + pool: PgPool, +} + +impl UrlResultRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } + + pub async fn insert_batch(&self, results: &[UrlResult]) -> Result { + if results.is_empty() { + return Ok(0); + } + + let len = results.len(); + let mut ids: Vec = Vec::with_capacity(len); + let mut provider_ids: Vec = Vec::with_capacity(len); + let mut client_ids: Vec> = Vec::with_capacity(len); + let mut result_types: Vec = Vec::with_capacity(len); + let mut working_urls: Vec> = Vec::with_capacity(len); + let mut retrievability_percents: Vec = Vec::with_capacity(len); + let mut result_codes: Vec = Vec::with_capacity(len); + let mut error_codes: Vec> = Vec::with_capacity(len); + let mut tested_ats: Vec> = Vec::with_capacity(len); + + for result in results { + ids.push(result.id); + provider_ids.push(result.provider_id.as_str().to_string()); + client_ids.push(result.client_id.as_ref().map(|c| c.as_str().to_string())); + result_types.push(result.result_type.clone()); + working_urls.push(result.working_url.clone()); + retrievability_percents.push(result.retrievability_percent); + result_codes.push(result.result_code.clone()); + error_codes.push(result.error_code.clone()); + tested_ats.push(result.tested_at); + } + + let result = sqlx::query!( + r#"INSERT INTO + url_results (id, provider_id, client_id, result_type, working_url, retrievability_percent, result_code, error_code, tested_at) + SELECT + a1, a2, a3, a4, a5, a6, a7, a8, a9 + FROM UNNEST( + $1::uuid[], + $2::text[], + $3::text[], + $4::discovery_type[], + $5::text[], + $6::double precision[], + $7::result_code[], + $8::error_code[], + $9::timestamptz[] + ) AS t(a1, a2, a3, a4, a5, a6, a7, a8, a9) + "#, + &ids as &[Uuid], + &provider_ids as &[String], + &client_ids as &[Option], + &result_types as &[DiscoveryType], + &working_urls as &[Option], + &retrievability_percents as &[f64], + &result_codes as &[ResultCode], + &error_codes as &[Option], + &tested_ats as &[DateTime] + ) + .execute(&self.pool) + .await?; + + Ok(result.rows_affected().try_into()?) + } +} diff --git a/url_finder/src/services/deal_service.rs b/url_finder/src/services/deal_service.rs index 1e3e66c..5bc0e9f 100644 --- a/url_finder/src/services/deal_service.rs +++ b/url_finder/src/services/deal_service.rs @@ -1,23 +1,26 @@ use color_eyre::Result; -use crate::repository::DealRepository; +use crate::{ + repository::DealRepository, + types::{ClientAddress, ClientId, ProviderAddress, ProviderId}, +}; /// get deals and extract piece_ids pub async fn get_piece_ids_by_provider( deal_repo: &DealRepository, - provider: &str, - client: Option<&str>, + provider_id: &ProviderId, + client_id: Option<&ClientId>, ) -> Result> { let limit = 100; let offset = 0; - let deals = if let Some(client) = client { + let deals = if let Some(client) = client_id { deal_repo - .get_deals_by_provider_and_client(provider, client, limit, offset) + .get_deals_by_provider_and_client(provider_id, client, limit, offset) .await? } else { deal_repo - .get_deals_by_provider(provider, limit, offset) + .get_deals_by_provider(provider_id, limit, offset) .await? }; @@ -35,27 +38,22 @@ pub async fn get_piece_ids_by_provider( pub async fn get_distinct_providers_by_client( deal_repo: &DealRepository, - client: &str, -) -> Result> { - let client_db = client.strip_prefix("f0").unwrap_or(client); + client_address: &ClientAddress, +) -> Result> { + let client_id: ClientId = client_address.clone().into(); let deals = deal_repo - .get_distinct_providers_by_client(client_db) + .get_distinct_providers_by_client(&client_id) .await?; if deals.is_empty() { return Ok(vec![]); } - let providers: Vec = deals + let providers: Vec = deals .iter() .filter_map(|deal| deal.provider_id.clone()) - .map(|provider| { - if !provider.starts_with("f0") { - format!("f0{provider}") - } else { - provider - } - }) + .filter_map(|provider_id| ProviderId::new(provider_id).ok()) + .map(|provider_id| provider_id.into()) .collect(); Ok(providers) @@ -63,14 +61,14 @@ pub async fn get_distinct_providers_by_client( pub async fn get_random_piece_ids_by_provider_and_client( deal_repo: &DealRepository, - provider: &str, - client: &str, + provider_id: &ProviderId, + client_id: &ClientId, ) -> Result> { let limit = 100; let offset = 0; let deals = deal_repo - .get_random_deals_by_provider_and_client(provider, client, limit, offset) + .get_random_deals_by_provider_and_client(provider_id, client_id, limit, offset) .await?; if deals.is_empty() { @@ -87,13 +85,13 @@ pub async fn get_random_piece_ids_by_provider_and_client( pub async fn get_random_piece_ids_by_provider( deal_repo: &DealRepository, - provider: &str, + provider_id: &ProviderId, ) -> Result> { let limit = 100; let offset = 0; let deals = deal_repo - .get_random_deals_by_provider(provider, limit, offset) + .get_random_deals_by_provider(provider_id, limit, offset) .await?; if deals.is_empty() { diff --git a/url_finder/src/services/mod.rs b/url_finder/src/services/mod.rs index 6a9d3de..fa52f9d 100644 --- a/url_finder/src/services/mod.rs +++ b/url_finder/src/services/mod.rs @@ -1 +1,2 @@ pub mod deal_service; +pub mod url_discovery_service; diff --git a/url_finder/src/services/url_discovery_service.rs b/url_finder/src/services/url_discovery_service.rs new file mode 100644 index 0000000..dd10533 --- /dev/null +++ b/url_finder/src/services/url_discovery_service.rs @@ -0,0 +1,119 @@ +use crate::{ + provider_endpoints, + repository::DealRepository, + services::deal_service, + types::{ + ClientAddress, ClientId, DiscoveryType, ErrorCode, ProviderAddress, ProviderId, ResultCode, + }, + url_tester, +}; +use tracing::error; +use uuid::Uuid; + +#[derive(Debug, Clone)] +pub struct UrlDiscoveryResult { + pub id: Uuid, + pub provider_id: ProviderId, + pub client_id: Option, + pub result_type: DiscoveryType, + pub working_url: Option, + pub retrievability_percent: f64, + pub result_code: ResultCode, + pub error_code: Option, +} + +impl UrlDiscoveryResult { + pub fn new_provider_only(provider_id: ProviderId) -> Self { + Self { + id: Uuid::new_v4(), + provider_id, + client_id: None, + result_type: DiscoveryType::Provider, + working_url: None, + retrievability_percent: 0.0, + result_code: ResultCode::Error, + error_code: None, + } + } + + pub fn new_provider_client(provider_id: ProviderId, client_id: ClientId) -> Self { + Self { + id: Uuid::new_v4(), + provider_id, + client_id: Some(client_id), + result_type: DiscoveryType::ProviderClient, + working_url: None, + retrievability_percent: 0.0, + result_code: ResultCode::Error, + error_code: None, + } + } +} + +pub async fn discover_url( + provider_address: &ProviderAddress, + client_address: Option, + deal_repo: &DealRepository, +) -> UrlDiscoveryResult { + let provider_id: ProviderId = provider_address.clone().into(); + let client_id: Option = client_address.clone().map(|c| c.into()); + + let mut result = match &client_id { + Some(c) => UrlDiscoveryResult::new_provider_client(provider_id.clone(), c.clone()), + None => UrlDiscoveryResult::new_provider_only(provider_id.clone()), + }; + + let (result_code, endpoints) = + match provider_endpoints::get_provider_endpoints(provider_address).await { + Ok((code, eps)) => (code, eps), + Err(e) => { + error!( + "Failed to get provider endpoints for {}: {:?}", + provider_address, e + ); + result.result_code = ResultCode::Error; + result.error_code = Some(e); + return result; + } + }; + + let Some(endpoints) = endpoints else { + result.result_code = result_code; + return result; + }; + + let piece_ids = + match deal_service::get_piece_ids_by_provider(deal_repo, &provider_id, client_id.as_ref()) + .await + { + Ok(ids) => ids, + Err(e) => { + error!( + "Failed to get piece ids for {} {:?}: {:?}", + provider_id, client_id, e + ); + result.result_code = ResultCode::Error; + result.error_code = Some(ErrorCode::FailedToGetDeals); + return result; + } + }; + + if piece_ids.is_empty() { + result.result_code = ResultCode::NoDealsFound; + return result; + } + + let urls = deal_service::get_piece_url(endpoints, piece_ids).await; + let (working_url, retrievability_percent) = + url_tester::check_retrievability_with_get(urls, true).await; + + result.working_url = working_url.clone(); + result.retrievability_percent = retrievability_percent.unwrap_or(0.0); + result.result_code = if working_url.is_some() { + ResultCode::Success + } else { + ResultCode::FailedToGetWorkingUrl + }; + + result +} diff --git a/url_finder/src/types.rs b/url_finder/src/types.rs index ddba563..96d8bd1 100644 --- a/url_finder/src/types.rs +++ b/url_finder/src/types.rs @@ -1,4 +1,12 @@ -use serde::Deserialize; +use color_eyre::{Result, eyre::eyre}; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use sqlx::error::BoxDynError; +use sqlx::postgres::{PgArgumentBuffer, PgHasArrayType, PgTypeInfo, PgValueRef}; +use sqlx::{Decode, Encode, Postgres, Type}; +use std::fmt; +use std::str::FromStr; +use utoipa::ToSchema; #[derive(Deserialize)] pub(super) struct DbConnectParams { @@ -24,3 +32,426 @@ impl DbConnectParams { ) } } + +/// Provider address with "f0" prefix (e.g., "f0123456") - for APIs/RPC +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, ToSchema)] +#[serde(transparent)] +pub struct ProviderAddress(String); + +impl ProviderAddress { + pub fn new(addr: impl Into) -> Result { + let addr = addr.into(); + let pattern = Regex::new(r"^f0\d{1,8}$").unwrap(); + if !pattern.is_match(&addr) { + return Err(eyre!("Invalid provider address: {}", addr)); + } + Ok(Self(addr)) + } + + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl fmt::Display for ProviderAddress { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl AsRef for ProviderAddress { + fn as_ref(&self) -> &str { + &self.0 + } +} + +impl From for ProviderAddress { + fn from(id: ProviderId) -> Self { + Self(format!("f0{}", id.0)) + } +} + +impl From for ProviderId { + fn from(addr: ProviderAddress) -> Self { + // ProviderAddress is validated at construction time, unwrap is safe here + Self(addr.0.strip_prefix("f0").unwrap().to_string()) + } +} + +/// Provider ID without "f0" prefix (e.g., "123456") - for database +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, ToSchema)] +#[serde(transparent)] +pub struct ProviderId(String); + +impl ProviderId { + pub fn new(id: impl Into) -> Result { + let id = id.into(); + if !id.chars().all(|c| c.is_numeric()) || id.is_empty() || id.len() > 8 { + return Err(eyre!("Invalid provider id: {}", id)); + } + Ok(Self(id)) + } + + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl fmt::Display for ProviderId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl AsRef for ProviderId { + fn as_ref(&self) -> &str { + &self.0 + } +} + +impl Type for ProviderId { + fn type_info() -> PgTypeInfo { + >::type_info() + } +} + +impl<'r> Decode<'r, Postgres> for ProviderId { + fn decode(value: PgValueRef<'r>) -> Result { + let s = >::decode(value)?; + Ok(Self(s)) + } +} + +impl<'q> Encode<'q, Postgres> for ProviderId { + fn encode_by_ref( + &self, + buf: &mut PgArgumentBuffer, + ) -> Result { + >::encode_by_ref(&self.0, buf) + } +} + +/// Client address with "f0" prefix (e.g., "f0123456") - for APIs +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, ToSchema)] +#[serde(transparent)] +pub struct ClientAddress(String); + +impl ClientAddress { + pub fn new(addr: impl Into) -> Result { + let addr = addr.into(); + let pattern = Regex::new(r"^f0\d{1,8}$").unwrap(); + if !pattern.is_match(&addr) { + return Err(eyre!("Invalid client address: {}", addr)); + } + Ok(Self(addr)) + } + + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl fmt::Display for ClientAddress { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl AsRef for ClientAddress { + fn as_ref(&self) -> &str { + &self.0 + } +} + +impl From for ClientAddress { + fn from(id: ClientId) -> Self { + Self(format!("f0{}", id.0)) + } +} + +impl From for ClientId { + fn from(addr: ClientAddress) -> Self { + // ClientAddress is validated at construction time, unwrap is safe here + Self(addr.0.strip_prefix("f0").unwrap().to_string()) + } +} + +/// Client ID without "f0" prefix (e.g., "123456") - for database +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, ToSchema)] +#[serde(transparent)] +pub struct ClientId(String); + +impl ClientId { + pub fn new(id: impl Into) -> Result { + let id = id.into(); + if !id.chars().all(|c| c.is_numeric()) || id.is_empty() || id.len() > 8 { + return Err(eyre!("Invalid client id: {}", id)); + } + Ok(Self(id)) + } + + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl fmt::Display for ClientId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl AsRef for ClientId { + fn as_ref(&self) -> &str { + &self.0 + } +} + +impl Type for ClientId { + fn type_info() -> PgTypeInfo { + >::type_info() + } +} + +impl<'r> Decode<'r, Postgres> for ClientId { + fn decode(value: PgValueRef<'r>) -> Result { + let s = >::decode(value)?; + Ok(Self(s)) + } +} + +impl<'q> Encode<'q, Postgres> for ClientId { + fn encode_by_ref( + &self, + buf: &mut PgArgumentBuffer, + ) -> Result { + >::encode_by_ref(&self.0, buf) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)] +pub enum DiscoveryType { + Provider, + ProviderClient, +} + +impl fmt::Display for DiscoveryType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Provider => write!(f, "Provider"), + Self::ProviderClient => write!(f, "ProviderClient"), + } + } +} + +impl FromStr for DiscoveryType { + type Err = color_eyre::eyre::Error; + + fn from_str(s: &str) -> Result { + match s { + "Provider" => Ok(Self::Provider), + "ProviderClient" => Ok(Self::ProviderClient), + _ => Err(color_eyre::eyre::eyre!("Invalid discovery type: {}", s)), + } + } +} + +impl Type for DiscoveryType { + fn type_info() -> PgTypeInfo { + PgTypeInfo::with_name("discovery_type") + } +} + +impl<'r> Decode<'r, Postgres> for DiscoveryType { + fn decode(value: PgValueRef<'r>) -> Result { + let s = <&str as Decode>::decode(value)?; + s.parse().map_err(Into::into) + } +} + +impl<'q> Encode<'q, Postgres> for DiscoveryType { + fn encode_by_ref( + &self, + buf: &mut PgArgumentBuffer, + ) -> Result { + <&str as Encode>::encode_by_ref(&self.to_string().as_str(), buf) + } +} + +impl PgHasArrayType for DiscoveryType { + fn array_type_info() -> PgTypeInfo { + PgTypeInfo::with_name("_discovery_type") + } +} + +/// Result codes for URL discovery operations +#[derive(Debug, Serialize, Deserialize, ToSchema, Clone, PartialEq)] +pub enum ResultCode { + NoCidContactData, + MissingAddrFromCidContact, + MissingHttpAddrFromCidContact, + FailedToGetWorkingUrl, + NoDealsFound, + TimedOut, + Success, + JobCreated, + Error, +} + +impl fmt::Display for ResultCode { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self { + ResultCode::NoCidContactData => "NoCidContactData", + ResultCode::MissingAddrFromCidContact => "MissingAddrFromCidContact", + ResultCode::MissingHttpAddrFromCidContact => "MissingHttpAddrFromCidContact", + ResultCode::FailedToGetWorkingUrl => "FailedToGetWorkingUrl", + ResultCode::NoDealsFound => "NoDealsFound", + ResultCode::TimedOut => "TimedOut", + ResultCode::Success => "Success", + ResultCode::JobCreated => "JobCreated", + ResultCode::Error => "Error", + }; + write!(f, "{s}") + } +} + +impl FromStr for ResultCode { + type Err = color_eyre::eyre::Error; + + fn from_str(s: &str) -> Result { + match s { + "NoCidContactData" => Ok(Self::NoCidContactData), + "MissingAddrFromCidContact" => Ok(Self::MissingAddrFromCidContact), + "MissingHttpAddrFromCidContact" => Ok(Self::MissingHttpAddrFromCidContact), + "FailedToGetWorkingUrl" => Ok(Self::FailedToGetWorkingUrl), + "NoDealsFound" => Ok(Self::NoDealsFound), + "TimedOut" => Ok(Self::TimedOut), + "Success" => Ok(Self::Success), + "JobCreated" => Ok(Self::JobCreated), + "Error" => Ok(Self::Error), + _ => Err(color_eyre::eyre::eyre!("Invalid result code: {}", s)), + } + } +} + +impl Type for ResultCode { + fn type_info() -> PgTypeInfo { + PgTypeInfo::with_name("result_code") + } +} + +impl<'r> Decode<'r, Postgres> for ResultCode { + fn decode(value: PgValueRef<'r>) -> Result { + let s = <&str as Decode>::decode(value)?; + s.parse().map_err(Into::into) + } +} + +impl<'q> Encode<'q, Postgres> for ResultCode { + fn encode_by_ref( + &self, + buf: &mut PgArgumentBuffer, + ) -> Result { + <&str as Encode>::encode_by_ref(&self.to_string().as_str(), buf) + } +} + +impl PgHasArrayType for ResultCode { + fn array_type_info() -> PgTypeInfo { + PgTypeInfo::with_name("_result_code") + } +} + +#[allow(clippy::enum_variant_names)] +#[derive(Debug, Serialize, Deserialize, ToSchema, Clone, PartialEq)] +pub enum ErrorCode { + NoProviderOrClient, + NoProvidersFound, + FailedToRetrieveCidContactData, + FailedToGetPeerId, + FailedToGetDeals, + FailedToGetPeerIdFromCurio, +} + +impl fmt::Display for ErrorCode { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self { + ErrorCode::NoProviderOrClient => "NoProviderOrClient", + ErrorCode::NoProvidersFound => "NoProvidersFound", + ErrorCode::FailedToRetrieveCidContactData => "FailedToRetrieveCidContactData", + ErrorCode::FailedToGetPeerId => "FailedToGetPeerId", + ErrorCode::FailedToGetDeals => "FailedToGetDeals", + ErrorCode::FailedToGetPeerIdFromCurio => "FailedToGetPeerIdFromCurio", + }; + write!(f, "{s}") + } +} + +impl FromStr for ErrorCode { + type Err = color_eyre::eyre::Error; + + fn from_str(s: &str) -> Result { + match s { + "NoProviderOrClient" => Ok(Self::NoProviderOrClient), + "NoProvidersFound" => Ok(Self::NoProvidersFound), + "FailedToRetrieveCidContactData" => Ok(Self::FailedToRetrieveCidContactData), + "FailedToGetPeerId" => Ok(Self::FailedToGetPeerId), + "FailedToGetDeals" => Ok(Self::FailedToGetDeals), + "FailedToGetPeerIdFromCurio" => Ok(Self::FailedToGetPeerIdFromCurio), + _ => Err(color_eyre::eyre::eyre!("Invalid error code: {}", s)), + } + } +} + +impl Type for ErrorCode { + fn type_info() -> PgTypeInfo { + PgTypeInfo::with_name("error_code") + } +} + +impl<'r> Decode<'r, Postgres> for ErrorCode { + fn decode(value: PgValueRef<'r>) -> Result { + let s = <&str as Decode>::decode(value)?; + s.parse().map_err(Into::into) + } +} + +impl<'q> Encode<'q, Postgres> for ErrorCode { + fn encode_by_ref( + &self, + buf: &mut PgArgumentBuffer, + ) -> Result { + <&str as Encode>::encode_by_ref(&self.to_string().as_str(), buf) + } +} + +impl PgHasArrayType for ErrorCode { + fn array_type_info() -> PgTypeInfo { + PgTypeInfo::with_name("_error_code") + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_provider_conversions() { + let provider_address = ProviderAddress::new("f0123456").unwrap(); + let provider_id: ProviderId = provider_address.clone().into(); + assert_eq!(provider_id.as_str(), "123456"); + + let provider_address2: ProviderAddress = provider_id.into(); + assert_eq!(provider_address2.as_str(), "f0123456"); + } + + #[test] + fn test_client_conversions() { + let client_address = ClientAddress::new("f0789012").unwrap(); + let client_id: ClientId = client_address.clone().into(); + assert_eq!(client_id.as_str(), "789012"); + + let client_address2: ClientAddress = client_id.into(); + assert_eq!(client_address2.as_str(), "f0789012"); + } +} diff --git a/url_finder/src/url_tester.rs b/url_finder/src/url_tester.rs index 3dd0798..4e640ef 100644 --- a/url_finder/src/url_tester.rs +++ b/url_finder/src/url_tester.rs @@ -5,7 +5,7 @@ use std::sync::{ use futures::{StreamExt, stream}; use reqwest::Client; -use tracing::debug; +use tracing::{debug, info}; const FILTER_CONCURENCY_LIMIT: usize = 5; const RETRI_CONCURENCY_LIMIT: usize = 20; @@ -79,12 +79,11 @@ pub async fn get_retrivability_with_head(urls: Vec) -> (Option, total_clone.fetch_add(1, Ordering::SeqCst); match client.head(&url).send().await { Ok(resp) if resp.status().is_success() => { - tracing::info!("url WORKING: {:?}", url); + debug!("url WORKING: {:?}", url); success_clone.fetch_add(1, Ordering::SeqCst); Some(url) } _ => { - tracing::error!("url not working: {:?}", url); debug!("url not working: {:?}", url); None } @@ -113,7 +112,7 @@ pub async fn get_retrivability_with_head(urls: Vec) -> (Option, 0.0 }; - debug!( + info!( "Successfully retrieved URLs: {} out of {} ({:.2}%)", success, total, retri_percentage ); @@ -194,7 +193,7 @@ pub async fn check_retrievability_with_get( 0.0 }; - debug!( + info!( "Successfully retrieved URLs: {} out of {} ({:.2}%)", success, total, retri_percentage ); diff --git a/url_finder/src/utils/mod.rs b/url_finder/src/utils/mod.rs new file mode 100644 index 0000000..8c4e9da --- /dev/null +++ b/url_finder/src/utils/mod.rs @@ -0,0 +1,3 @@ +mod reqwest_retry; + +pub use reqwest_retry::*; diff --git a/url_finder/src/utils/reqwest_retry.rs b/url_finder/src/utils/reqwest_retry.rs new file mode 100644 index 0000000..a6248e3 --- /dev/null +++ b/url_finder/src/utils/reqwest_retry.rs @@ -0,0 +1,56 @@ +use std::time::Duration; + +use http::Extensions; +use reqwest::{Client, Request, Response}; +use reqwest_middleware::{ClientBuilder, ClientWithMiddleware, Middleware, Next}; +use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff}; +use retry_policies::Jitter; +use tracing::Instrument; + +/// Add context to tracing spans for HTTP requests. +pub struct HttpRequestContextLogger; + +#[async_trait::async_trait] +impl Middleware for HttpRequestContextLogger { + async fn handle( + &self, + req: Request, + extensions: &mut Extensions, + next: Next<'_>, + ) -> reqwest_middleware::Result { + let url = req.url().to_string(); + let method = req.method().as_str(); + let service = req.url().host_str().unwrap_or("unknown"); + + // TODO: At some point we should change WARN to INFO after we verify this is working as intended. + let span = tracing::warn_span!( + "http_retry_request", + method = %method, + url = %url, + service = %service + ); + + async move { next.run(req, extensions).await } + .instrument(span) + .await + } +} + +pub fn build_reqwest_retry_client( + min_retry_interval_ms: u64, + max_retry_interval_ms: u64, +) -> ClientWithMiddleware { + let retry_policy = ExponentialBackoff::builder() + .jitter(Jitter::None) + .base(2) + .retry_bounds( + Duration::from_millis(min_retry_interval_ms), + Duration::from_millis(max_retry_interval_ms), + ) + .build_with_max_retries(3); + + ClientBuilder::new(Client::new()) + .with(HttpRequestContextLogger) // Add context before retry middleware + .with(RetryTransientMiddleware::new_with_policy(retry_policy)) + .build() +}