diff --git a/.cargo/config.toml b/.cargo/config.toml index 3c32d25..197c4bf 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,2 +1,5 @@ +[env] +SQLX_OFFLINE = "true" + [target.aarch64-unknown-linux-gnu] linker = "aarch64-linux-gnu-gcc" diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..667c6a6 --- /dev/null +++ b/.env.example @@ -0,0 +1,2 @@ +# Absolute path to the SQLite file using the sqlite protocol +DATABASE_URL=sqlite:///Users/whoami/.mate/storage.sqlite diff --git a/.gitignore b/.gitignore index 097f651..d6a9829 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ .DS_Store *.wasm +.env diff --git a/.sqlx/query-111a167ea1da690f9b2fdb8112d0fecf80a6ff4d202adc5772bf195ea7ad3a05.json b/.sqlx/query-111a167ea1da690f9b2fdb8112d0fecf80a6ff4d202adc5772bf195ea7ad3a05.json new file mode 100644 index 0000000..fa58ef6 --- /dev/null +++ b/.sqlx/query-111a167ea1da690f9b2fdb8112d0fecf80a6ff4d202adc5772bf195ea7ad3a05.json @@ -0,0 +1,86 @@ +{ + "db_name": "SQLite", + "query": "\n INSERT INTO jobs (\n id,\n name,\n args,\n status,\n scheduled_at,\n task,\n started_at,\n completed_at,\n max_attempts\n ) VALUES (\n $1,\n $2,\n $3,\n $4,\n $5,\n $6,\n $7,\n $8,\n $9\n ) RETURNING *", + "describe": { + "columns": [ + { + "name": "id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "task", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "args", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "status", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "scheduled_at", + "ordinal": 5, + "type_info": "Integer" + }, + { + "name": "started_at", + "ordinal": 6, + "type_info": "Integer" + }, + { + "name": "completed_at", + "ordinal": 7, + "type_info": "Integer" + }, + { + "name": "errors", + "ordinal": 8, + "type_info": "Text" + }, + { + "name": "result", + "ordinal": 9, + "type_info": "Text" + }, + { + "name": "attempts", + "ordinal": 10, + "type_info": "Integer" + }, + { + "name": "max_attempts", + "ordinal": 11, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 9 + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + true, + true, + false, + true, + false, + false + ] + }, + "hash": "111a167ea1da690f9b2fdb8112d0fecf80a6ff4d202adc5772bf195ea7ad3a05" +} diff --git a/Cargo.lock b/Cargo.lock index 5e24ef8..e84eaaa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -120,6 +120,15 @@ dependencies = [ "syn", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -213,6 +222,12 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "base64ct" +version = "1.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06" + [[package]] name = "bitflags" version = "1.3.2" @@ -224,6 +239,9 @@ name = "bitflags" version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" +dependencies = [ + "serde_core", +] [[package]] name = "bitmaps" @@ -258,6 +276,12 @@ version = "0.6.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.11.1" @@ -460,6 +484,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "const-oid" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" + [[package]] name = "core-foundation" version = "0.9.4" @@ -642,6 +681,21 @@ version = "0.127.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9092860471c4562c18ea1e47f446072795ad344a4a01f7d0f8cee445390d545" +[[package]] +name = "crc" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eb8a2a1cd12ab0d987a5d5e825195d372001a4094a0376319d5a0ad71c1ba0d" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + [[package]] name = "crc32fast" version = "1.5.0" @@ -670,6 +724,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -695,6 +758,17 @@ dependencies = [ "uuid", ] +[[package]] +name = "der" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7c1832837b905bbfb5101e07cc24c8deddf52f93225eee6ead5f4d63d53ddcb" +dependencies = [ + "const-oid", + "pem-rfc7468", + "zeroize", +] + [[package]] name = "digest" version = "0.10.7" @@ -702,7 +776,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", + "const-oid", "crypto-common", + "subtle", ] [[package]] @@ -737,6 +813,12 @@ dependencies = [ "syn", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "dunce" version = "1.0.5" @@ -748,6 +830,9 @@ name = "either" version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +dependencies = [ + "serde", +] [[package]] name = "embedded-io" @@ -786,6 +871,28 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "etcetera" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" +dependencies = [ + "cfg-if", + "home", + "windows-sys 0.48.0", +] + +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + [[package]] name = "fallible-iterator" version = "0.3.0" @@ -841,6 +948,17 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -909,6 +1027,28 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-intrusive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot", +] + [[package]] name = "futures-io" version = "0.3.31" @@ -1046,6 +1186,8 @@ version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ + "allocator-api2", + "equivalent", "foldhash", "serde", ] @@ -1056,12 +1198,54 @@ version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +[[package]] +name = "hashlink" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" +dependencies = [ + "hashbrown 0.15.5", +] + [[package]] name = "heck" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + +[[package]] +name = "hkdf" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7" +dependencies = [ + "hmac", +] + +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + +[[package]] +name = "home" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc627f471c528ff0c4a49e1d5e60450c8f6461dd6d10ba9dcd3a61d3dff7728d" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "http" version = "0.1.0" @@ -1496,6 +1680,9 @@ name = "lazy_static" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +dependencies = [ + "spin", +] [[package]] name = "leb128" @@ -1529,6 +1716,18 @@ checksum = "df15f6eac291ed1cf25865b1ee60399f57e7c227e7f51bdbd4c5270396a9ed50" dependencies = [ "bitflags 2.10.0", "libc", + "redox_syscall 0.6.0", +] + +[[package]] +name = "libsqlite3-sys" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", ] [[package]] @@ -1549,6 +1748,15 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" +[[package]] +name = "lock_api" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.4.29" @@ -1696,8 +1904,12 @@ name = "mate-storage" version = "0.0.0-pre.57d9579" dependencies = [ "anyhow", + "async-trait", + "chrono", "mate", "mate-ipc", + "serde_json", + "sqlx", "tokio", "tracing", "uuid", @@ -1721,6 +1933,16 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4facc753ae494aeb6e3c22f839b158aebd4f9270f55cd3c79906c45476c47ab4" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.7.6" @@ -1789,6 +2011,42 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "num-bigint-dig" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e661dda6640fad38e827a6d4a310ff4763082116fe217f279885c97f511bb0b7" +dependencies = [ + "lazy_static", + "libm", + "num-integer", + "num-iter", + "num-traits", + "rand 0.8.5", + "smallvec", + "zeroize", +] + +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-iter" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -1796,6 +2054,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", + "libm", ] [[package]] @@ -1845,6 +2104,38 @@ version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" +[[package]] +name = "parking_lot" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall 0.5.18", + "smallvec", + "windows-link", +] + +[[package]] +name = "pem-rfc7468" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -1873,6 +2164,27 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs1" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" +dependencies = [ + "der", + "pkcs8", + "spki", +] + +[[package]] +name = "pkcs8" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" +dependencies = [ + "der", + "spki", +] + [[package]] name = "pkg-config" version = "0.3.32" @@ -2132,6 +2444,24 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redox_syscall" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" +dependencies = [ + "bitflags 2.10.0", +] + +[[package]] +name = "redox_syscall" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec96166dafa0886eb81fe1c0a388bece180fbef2135f97c1e2cf8302e74b43b5" +dependencies = [ + "bitflags 2.10.0", +] + [[package]] name = "redox_users" version = "0.4.6" @@ -2230,6 +2560,26 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rsa" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8573f03f5883dcaebdfcf4725caa1ecb9c15b2ef50c43a07b816e06799bb12d" +dependencies = [ + "const-oid", + "digest", + "num-bigint-dig", + "num-integer", + "num-traits", + "pkcs1", + "pkcs8", + "rand_core 0.6.4", + "signature", + "spki", + "subtle", + "zeroize", +] + [[package]] name = "rustc-demangle" version = "0.1.26" @@ -2408,6 +2758,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "security-framework" version = "3.5.1" @@ -2529,6 +2885,17 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.9" @@ -2565,6 +2932,16 @@ dependencies = [ "libc", ] +[[package]] +name = "signature" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" +dependencies = [ + "digest", + "rand_core 0.6.4", +] + [[package]] name = "sized-chunks" version = "0.6.5" @@ -2605,6 +2982,207 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + +[[package]] +name = "spki" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" +dependencies = [ + "base64ct", + "der", +] + +[[package]] +name = "sqlx" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fefb893899429669dcdd979aff487bd78f4064e5e7907e4269081e0ef7d97dc" +dependencies = [ + "sqlx-core", + "sqlx-macros", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", +] + +[[package]] +name = "sqlx-core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee6798b1838b6a0f69c007c133b8df5866302197e404e8b6ee8ed3e3a5e68dc6" +dependencies = [ + "base64", + "bytes", + "crc", + "crossbeam-queue", + "either", + "event-listener", + "futures-core", + "futures-intrusive", + "futures-io", + "futures-util", + "hashbrown 0.15.5", + "hashlink", + "indexmap", + "log", + "memchr", + "once_cell", + "percent-encoding", + "serde", + "serde_json", + "sha2", + "smallvec", + "thiserror 2.0.17", + "tokio", + "tokio-stream", + "tracing", + "url", +] + +[[package]] +name = "sqlx-macros" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2d452988ccaacfbf5e0bdbc348fb91d7c8af5bee192173ac3636b5fb6e6715d" +dependencies = [ + "proc-macro2", + "quote", + "sqlx-core", + "sqlx-macros-core", + "syn", +] + +[[package]] +name = "sqlx-macros-core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19a9c1841124ac5a61741f96e1d9e2ec77424bf323962dd894bdb93f37d5219b" +dependencies = [ + "dotenvy", + "either", + "heck", + "hex", + "once_cell", + "proc-macro2", + "quote", + "serde", + "serde_json", + "sha2", + "sqlx-core", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", + "syn", + "tokio", + "url", +] + +[[package]] +name = "sqlx-mysql" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa003f0038df784eb8fecbbac13affe3da23b45194bd57dba231c8f48199c526" +dependencies = [ + "atoi", + "base64", + "bitflags 2.10.0", + "byteorder", + "bytes", + "crc", + "digest", + "dotenvy", + "either", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "generic-array", + "hex", + "hkdf", + "hmac", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "percent-encoding", + "rand 0.8.5", + "rsa", + "serde", + "sha1", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror 2.0.17", + "tracing", + "whoami", +] + +[[package]] +name = "sqlx-postgres" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db58fcd5a53cf07c184b154801ff91347e4c30d17a3562a635ff028ad5deda46" +dependencies = [ + "atoi", + "base64", + "bitflags 2.10.0", + "byteorder", + "crc", + "dotenvy", + "etcetera", + "futures-channel", + "futures-core", + "futures-util", + "hex", + "hkdf", + "hmac", + "home", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "rand 0.8.5", + "serde", + "serde_json", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror 2.0.17", + "tracing", + "whoami", +] + +[[package]] +name = "sqlx-sqlite" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2d12fe70b2c1b4401038055f90f151b78208de1f9f89a7dbfd41587a10c3eea" +dependencies = [ + "atoi", + "flume", + "futures-channel", + "futures-core", + "futures-executor", + "futures-intrusive", + "futures-util", + "libsqlite3-sys", + "log", + "percent-encoding", + "serde", + "serde_urlencoded", + "sqlx-core", + "thiserror 2.0.17", + "tracing", + "url", +] [[package]] name = "stable_deref_trait" @@ -2612,6 +3190,17 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" +[[package]] +name = "stringprep" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", + "unicode-properties", +] + [[package]] name = "strsim" version = "0.11.1" @@ -2886,6 +3475,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.17" @@ -3088,12 +3688,33 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" +[[package]] +name = "unicode-bidi" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" + [[package]] name = "unicode-ident" version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" +[[package]] +name = "unicode-normalization" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd4f6878c9cb28d874b009da9e8d183b5abc80117c40bbd187a1fde336be6e8" +dependencies = [ + "tinyvec", +] + +[[package]] +name = "unicode-properties" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7df058c713841ad818f1dc5d3fd88063241cc61f49f5fbea4b951e8cf5a8d71d" + [[package]] name = "unicode-width" version = "0.2.2" @@ -3160,6 +3781,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" @@ -3206,6 +3833,12 @@ dependencies = [ "wit-bindgen 0.46.0", ] +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.106" @@ -3761,6 +4394,16 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "whoami" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d4a4db5077702ca3015d3d02d74974948aba2ad9e12ab7df718ee64ccd7e97d" +dependencies = [ + "libredox", + "wasite", +] + [[package]] name = "wiggle" version = "40.0.0" @@ -3931,6 +4574,15 @@ dependencies = [ "windows-targets 0.42.2", ] +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -3982,6 +4634,21 @@ dependencies = [ "windows_x86_64_msvc 0.42.2", ] +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + [[package]] name = "windows-targets" version = "0.52.6" @@ -4021,6 +4688,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -4039,6 +4712,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -4057,6 +4736,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -4087,6 +4772,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -4105,6 +4796,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -4123,6 +4820,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -4141,6 +4844,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" diff --git a/Cargo.toml b/Cargo.toml index ab39250..24f8950 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ reqwest = "0.13" semver = "1.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +sqlx = "0.8.6" syn = { version = "2.0", features = ["full", "extra-traits"] } quote = "1.0" tabled = "0.20" diff --git a/Justfile b/Justfile index d33ae78..332aa60 100644 --- a/Justfile +++ b/Justfile @@ -12,6 +12,14 @@ build-task task: cd ./task/{{task}} && cargo +nightly build --release --target wasm32-wasip2 mv ./target/wasm32-wasip2/release/{{task}}.wasm ./{{task}}.wasm +# Runs the DKC +dkc: + docker pull ghcr.io/leoborai/dkc:latest + docker run -it --rm \ + -v $(pwd):/app \ + -w /app \ + ghcr.io/leoborai/dkc:latest + # Builds docs into static files (docs/book/) docs-build: cd ./docs && mdbook build @@ -45,3 +53,7 @@ docker-run-image: docker-build-image # Runs clippy and fmt on the entire workspace fmt: cargo clippy --fix --workspace --allow-dirty --allow-staged && cargo fmt + +# Uses `sqlx` CLI to perform database metadata retrieval +sqlx-prepare: + cargo sqlx prepare --workspace diff --git a/README.md b/README.md index a5282d0..74f007f 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,13 @@ docker logout ghcr.io You can also download precompiled binaries from the [GitHub Releases](https://githeub.com/LeoBorai/mate/releases) page. +## Development + +### Pre-requisites + +- Rust toolchain (_stable_): Install Rust using [rustup](https://rustup.rs/). +- SQLx CLI: Use `cargo install sqlx-cli` to install the SQLx CLI tool. + ## License This project is licensed under the MIT License - see the [LICENSE](LICENSE.md) file for details. diff --git a/src/cli/src/cli/cmd/component/storage.rs b/src/cli/src/cli/cmd/component/storage.rs index bba4376..5e96c70 100644 --- a/src/cli/src/cli/cmd/component/storage.rs +++ b/src/cli/src/cli/cmd/component/storage.rs @@ -1,11 +1,12 @@ +use std::env::home_dir; use std::path::PathBuf; -use anyhow::Result; +use anyhow::{Context, Result}; use clap::Parser; +use tracing::debug; use mate_config::Config; use mate_ipc::protocol::ProcessType; -use tracing::debug; use crate::{process::storage::StorageProcess, transport::make_transport}; @@ -18,8 +19,10 @@ pub struct StorageSpawnOpt { impl StorageSpawnOpt { pub async fn exec(&self) -> Result<()> { let config = Config::from_file(&self.config)?; + let mut home = home_dir().context("Failed to get home directory")?; + home.push(".mate"); let transport = make_transport(config.clone(), ProcessType::Storage).await?; - let mut storage = StorageProcess::new(transport); + let mut storage = StorageProcess::new(transport, home).await?; debug!("Starting storage process…"); storage.run().await?; diff --git a/src/cli/src/process/storage.rs b/src/cli/src/process/storage.rs index 4924dd4..133feab 100644 --- a/src/cli/src/process/storage.rs +++ b/src/cli/src/process/storage.rs @@ -1,3 +1,5 @@ +use std::path::PathBuf; + use anyhow::Result; use mate_ipc::transport::Transport; @@ -8,10 +10,9 @@ pub struct StorageProcess { } impl StorageProcess { - pub fn new(transport: Box) -> Self { - let storage = Storage::new(transport); - - Self { storage } + pub async fn new(transport: Box, home: PathBuf) -> Result { + let storage = Storage::new(transport, home).await?; + Ok(Self { storage }) } pub async fn run(&mut self) -> Result<()> { diff --git a/src/mate/src/proto/job.rs b/src/mate/src/proto/job.rs index 3909cd1..efe1fc9 100644 --- a/src/mate/src/proto/job.rs +++ b/src/mate/src/proto/job.rs @@ -11,6 +11,16 @@ use crate::proto::task::TaskIdentifier; pub type ExecutorId = usize; +pub const JOB_CLAIMED_STATUS: &str = "claimed"; +pub const JOB_PENDING_STATUS: &str = "pending"; +pub const JOB_SCHEDULED_STATUS: &str = "scheduled"; +pub const JOB_RUNNING_STATUS: &str = "running"; +pub const JOB_COMPLETED_STATUS: &str = "completed"; +pub const JOB_FAILED_STATUS: &str = "failed"; +pub const JOB_CANCELLED_STATUS: &str = "cancelled"; +pub const JOB_SUCCESS_RESULT: &str = "success"; +pub const JOB_FAILURE_RESULT: &str = "failure"; + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Job { pub id: Uuid, @@ -91,13 +101,13 @@ pub enum JobStatus { impl Display for JobStatus { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let status_str = match self { - JobStatus::Claimed => "Claimed", - JobStatus::Pending => "Pending", - JobStatus::Scheduled => "Scheduled", - JobStatus::Running => "Running", - JobStatus::Completed => "Completed", - JobStatus::Failed => "Failed", - JobStatus::Cancelled => "Cancelled", + JobStatus::Claimed => JOB_CLAIMED_STATUS, + JobStatus::Pending => JOB_PENDING_STATUS, + JobStatus::Scheduled => JOB_SCHEDULED_STATUS, + JobStatus::Running => JOB_RUNNING_STATUS, + JobStatus::Completed => JOB_COMPLETED_STATUS, + JobStatus::Failed => JOB_FAILED_STATUS, + JobStatus::Cancelled => JOB_CANCELLED_STATUS, }; write!(f, "{}", status_str) } @@ -108,13 +118,13 @@ impl FromStr for JobStatus { fn from_str(input: &str) -> Result { match input.to_ascii_lowercase().as_str() { - "claimed" => Ok(JobStatus::Claimed), - "pending" => Ok(JobStatus::Pending), - "scheduled" => Ok(JobStatus::Scheduled), - "running" => Ok(JobStatus::Running), - "completed" => Ok(JobStatus::Completed), - "failed" => Ok(JobStatus::Failed), - "cancelled" => Ok(JobStatus::Cancelled), + JOB_CLAIMED_STATUS => Ok(JobStatus::Claimed), + JOB_PENDING_STATUS => Ok(JobStatus::Pending), + JOB_SCHEDULED_STATUS => Ok(JobStatus::Scheduled), + JOB_RUNNING_STATUS => Ok(JobStatus::Running), + JOB_COMPLETED_STATUS => Ok(JobStatus::Completed), + JOB_FAILED_STATUS => Ok(JobStatus::Failed), + JOB_CANCELLED_STATUS => Ok(JobStatus::Cancelled), _ => bail!( "The value {} doesn't correspond to a valid JobStatus", input @@ -132,8 +142,8 @@ pub enum JobResult { impl Display for JobResult { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - JobResult::Success(_) => write!(f, "Success"), - JobResult::Failure(_) => write!(f, "Failure"), + JobResult::Success(_) => write!(f, "{}", JOB_SUCCESS_RESULT), + JobResult::Failure(_) => write!(f, "{}", JOB_FAILURE_RESULT), } } } diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 17152d5..3517918 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -12,6 +12,10 @@ path = "src/lib.rs" [dependencies] anyhow = { workspace = true } +async-trait = { workspace = true } +chrono = { workspace = true } +serde_json = { workspace = true } +sqlx = { workspace = true, features = ["runtime-tokio", "sqlite", "macros", "migrate"] } tokio = { workspace = true } tracing = { workspace = true } uuid = { workspace = true } diff --git a/src/storage/src/backend.rs b/src/storage/src/backend.rs new file mode 100644 index 0000000..929ad1f --- /dev/null +++ b/src/storage/src/backend.rs @@ -0,0 +1,22 @@ +pub mod sqlite; + +use std::time::SystemTime; + +use anyhow::Result; +use async_trait::async_trait; +use uuid::Uuid; + +use mate::proto::job::{Job, JobQuery, JobResult}; + +#[async_trait] +pub trait Backend: Send + Sync { + async fn create_job(&self, job: Job) -> Result; + async fn retrieve_jobs(&self, query: JobQuery) -> Result>; + async fn update_job_completed(&self, id: Uuid, result: JobResult) -> Result<()>; + async fn claim_jobs( + &self, + count: usize, + start: SystemTime, + end: SystemTime, + ) -> Result>; +} diff --git a/src/storage/src/backend/sqlite.rs b/src/storage/src/backend/sqlite.rs new file mode 100644 index 0000000..510804d --- /dev/null +++ b/src/storage/src/backend/sqlite.rs @@ -0,0 +1,254 @@ +use std::time::{Duration, SystemTime}; + +use anyhow::{Context, Result}; +use async_trait::async_trait; +use sqlx::FromRow; +use sqlx::sqlite::{SqliteConnectOptions, SqlitePool}; +use uuid::Uuid; + +use mate::proto::job::{Job, JobQuery, JobResult}; + +#[derive(Debug, FromRow)] +pub(crate) struct JobRecord { + pub id: String, + pub name: String, + pub args: String, + pub status: String, + pub scheduled_at: i64, + pub task: String, + pub started_at: Option, + pub completed_at: Option, + pub errors: String, + pub result: Option, + pub attempts: i64, + pub max_attempts: i64, +} + +impl TryFrom for Job { + type Error = anyhow::Error; + + fn try_from(record: JobRecord) -> Result { + let id = Uuid::parse_str(&record.id)?; + let args = serde_json::from_str(&record.args)?; + let task = record.task.parse()?; + let status = record.status.parse()?; + let scheduled_at = into_system_time(record.scheduled_at)?; + let completed_at = record.completed_at.map(into_system_time).transpose()?; + let started_at = record.started_at.map(into_system_time).transpose()?; + let errors: Vec = serde_json::from_str(&record.errors)?; + let result: Option = record + .result + .map(|r| serde_json::from_str(&r)) + .transpose()?; + let attempts = record.attempts as u32; + let max_attempts = record.max_attempts as u32; + + Ok(Job { + id, + name: record.name, + args, + status, + scheduled_at, + task, + started_at, + completed_at, + errors, + result, + attempts, + max_attempts, + }) + } +} + +pub struct SqliteBackend { + pool: SqlitePool, +} + +impl SqliteBackend { + pub async fn new(url: &str) -> Result { + let conn_opts = SqliteConnectOptions::new() + .create_if_missing(true) + .filename(url); + let pool = SqlitePool::connect_with(conn_opts).await?; + + Self::migrate(&pool).await?; + Ok(Self { pool }) + } + + async fn migrate(pool: &SqlitePool) -> Result<()> { + sqlx::migrate!("src/backend/sqlite/migrations") + .run(pool) + .await?; + Ok(()) + } +} + +#[async_trait] +impl super::Backend for SqliteBackend { + async fn create_job(&self, job: Job) -> Result { + let id = job.id.to_string(); + let args = serde_json::to_string(&job.args)?; + let task = job.task.to_string(); + let status = job.status.to_string(); + let scheduled_at = into_unix_timestamp(job.scheduled_at)?; + let max_attempts = job.max_attempts as i64; + let record = sqlx::query_as!( + JobRecord, + r#" + INSERT INTO jobs ( + id, + name, + args, + status, + scheduled_at, + task, + started_at, + completed_at, + max_attempts + ) VALUES ( + $1, + $2, + $3, + $4, + $5, + $6, + $7, + $8, + $9 + ) RETURNING *"#, + id, + job.name, + args, + status, + scheduled_at, + task, + Option::::None, + Option::::None, + max_attempts, + ) + .fetch_one(&self.pool) + .await?; + + Ok(record.try_into()?) + } + + async fn retrieve_jobs(&self, query: JobQuery) -> Result> { + let mut sql = String::from("SELECT * FROM jobs WHERE 1=1"); + + if query.status.is_some() { + sql.push_str(" AND status = ?"); + } + + if query.time_range.is_some() { + sql.push_str(" AND scheduled_at >= ? AND scheduled_at <= ?"); + } + + let mut q = sqlx::query_as::<_, JobRecord>(&sql); + + if let Some(status) = query.status { + q = q.bind(status.to_string()); + } + if let Some((start, end)) = query.time_range { + q = q + .bind(into_unix_timestamp(start)?) + .bind(into_unix_timestamp(end)?); + } + + let records = q.fetch_all(&self.pool).await?; + records.into_iter().map(|r| r.try_into()).collect() + } + + async fn update_job_completed(&self, id: Uuid, result: JobResult) -> Result<()> { + let id = id.to_string(); + let result_json = serde_json::to_string(&result)?; + let completed_at = into_unix_timestamp(SystemTime::now())?; + + match &result { + JobResult::Success(_) => { + sqlx::query( + r#"UPDATE jobs + SET + status = 'completed', + result = ?, + completed_at = ?, + attempts = attempts + 1 + WHERE id = ?"#, + ) + .bind(result_json) + .bind(completed_at) + .bind(id) + .execute(&self.pool) + .await?; + } + JobResult::Failure(error) => { + sqlx::query( + r#"UPDATE jobs + SET + status = 'failed', + result = ?, + completed_at = ?, + attempts = attempts + 1, + errors = json_insert(errors, '$[#]', ?) + WHERE id = ?"#, + ) + .bind(result_json) + .bind(completed_at) + .bind(error) + .bind(id) + .execute(&self.pool) + .await?; + } + } + + Ok(()) + } + + async fn claim_jobs( + &self, + count: usize, + start: SystemTime, + end: SystemTime, + ) -> Result> { + let start_ts = into_unix_timestamp(start)?; + let end_ts = into_unix_timestamp(end)?; + let count = count as i64; + let records = sqlx::query_as::<_, JobRecord>( + r#" + UPDATE jobs SET status = 'claimed' + WHERE id IN ( + SELECT id FROM jobs + WHERE + status IN ('scheduled', 'failed') + AND attempts < max_attempts + AND ( + scheduled_at BETWEEN ? AND ? + OR scheduled_at <= ? + ) + ORDER BY scheduled_at + LIMIT ? + ) RETURNING * + "#, + ) + .bind(start_ts) + .bind(end_ts) + .bind(start_ts) + .bind(count) + .fetch_all(&self.pool) + .await?; + + records.into_iter().map(|r| r.try_into()).collect() + } +} + +fn into_unix_timestamp(time: SystemTime) -> Result { + Ok(time + .duration_since(SystemTime::UNIX_EPOCH) + .context("Time went backwards")? + .as_secs() as i64) +} + +fn into_system_time(timestamp: i64) -> Result { + SystemTime::UNIX_EPOCH + .checked_add(Duration::from_secs(timestamp as u64)) + .context("Invalid timestamp") +} diff --git a/src/storage/src/backend/sqlite/migrations/001_create_table_jobs.sql b/src/storage/src/backend/sqlite/migrations/001_create_table_jobs.sql new file mode 100644 index 0000000..97b8030 --- /dev/null +++ b/src/storage/src/backend/sqlite/migrations/001_create_table_jobs.sql @@ -0,0 +1,29 @@ +CREATE TABLE IF NOT EXISTS jobs ( + id TEXT NOT NULL PRIMARY KEY, + name TEXT NOT NULL, + task TEXT NOT NULL, + args TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'scheduled' CHECK ( + status IN ( + 'claimed', 'pending', 'scheduled', + 'running', 'completed', 'failed', + 'cancelled' + ) + ), + scheduled_at INTEGER NOT NULL, + started_at INTEGER, + completed_at INTEGER, + errors TEXT NOT NULL DEFAULT '[]', + result TEXT, + attempts INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL, + CHECK (attempts >= 0), + CHECK (max_attempts > 0), + CHECK (attempts <= max_attempts) +); + +CREATE INDEX IF NOT EXISTS idx_jobs_status_scheduled_at ON jobs (status, scheduled_at ASC); + +CREATE INDEX IF NOT EXISTS idx_jobs_scheduled_at ON jobs (scheduled_at ASC); + +CREATE INDEX IF NOT EXISTS idx_jobs_name ON jobs (name); diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index d80d3fd..a31b525 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -1,33 +1,33 @@ -use std::collections::HashMap; +mod backend; + +use std::path::PathBuf; use std::sync::Arc; -use std::time::{Duration, SystemTime}; +use std::time::SystemTime; use anyhow::{Result, bail}; -use tokio::sync::Mutex; -use tracing::debug; -use uuid::Uuid; -use mate::proto::job::{Job, JobResult, JobStatus}; use mate_ipc::channel::IpcServer; use mate_ipc::protocol::{Message, MessagePayload, ProcessType}; use mate_ipc::transport::Transport; +use crate::backend::Backend; +use crate::backend::sqlite::SqliteBackend; + const IPC_SENDER_STORAGE: ProcessType = ProcessType::Storage; const MAX_JOBS_PER_BATCH: usize = 5; pub struct Storage { ipc: Arc, - jobs: Mutex>, + backend: Arc, } impl Storage { - pub fn new(transport: Box) -> Self { + pub async fn new(transport: Box, home: PathBuf) -> Result { let ipc = Arc::new(IpcServer::new(IPC_SENDER_STORAGE, transport)); + let home = home.join("storage.sqlite"); + let backend = Arc::new(SqliteBackend::new(home.to_str().unwrap()).await?); - Self { - jobs: Mutex::new(HashMap::new()), - ipc, - } + Ok(Self { ipc, backend }) } pub async fn run(&mut self) -> Result<()> { @@ -69,73 +69,33 @@ impl Storage { async fn handle_message(&mut self, msg: Message) -> Option { match msg.payload { MessagePayload::JobCompleted(id, result) => { - debug!(%id, "Job completed"); - let mut jobs = self.jobs.lock().await; - - if let Some(job) = jobs.get_mut(&id) { - job.attempts += 1; - job.completed_at = Some(SystemTime::now()); - - match &result { - JobResult::Success(_) => { - job.status = JobStatus::Completed; - } - JobResult::Failure(err) => { - if job.attempts < job.max_attempts { - job.status = JobStatus::Scheduled; - job.errors.push(err.to_string()); - } else { - job.status = JobStatus::Failed; - } - } - } - - job.result = Some(result); - - return Some(MessagePayload::JobStored(Ok(job.clone()))); + match self.backend.update_job_completed(id, result).await { + Ok(_job) => Some(MessagePayload::JobUpdated(Ok(()))), + Err(err) => Some(MessagePayload::JobUpdated(Err(format!( + "Failed to update completion status for job {id}: {err}" + )))), } - - Some(MessagePayload::JobUpdated(Err(format!( - "Failed to update completion status for job {id}: job not found in storage" - )))) - } - MessagePayload::StoreJob(job) => { - let id = job.id; - let mut jobs = self.jobs.lock().await; - jobs.insert(id, job.clone()); - drop(jobs); - Some(MessagePayload::JobStored(Ok(job))) - } - MessagePayload::QueryJobs(query) => { - let jobs = self.jobs.lock().await; - let jobs_clone = jobs.clone(); - drop(jobs); - let jobs: Vec = jobs_clone - .values() - .filter(|j| query.status.as_ref().is_none_or(|s| &j.status == s)) - .filter(|j| { - query.time_range.as_ref().is_none_or(|tr| { - j.scheduled_at >= tr.0 - Duration::from_secs(1) - && j.scheduled_at <= tr.1 + Duration::from_secs(1) - }) - }) - .cloned() - .collect(); - Some(MessagePayload::JobsResult(jobs)) } + MessagePayload::StoreJob(job) => match self.backend.create_job(job.clone()).await { + Ok(job) => Some(MessagePayload::JobStored(Ok(job))), + Err(err) => Some(MessagePayload::JobStored(Err(format!( + "Failed to store job {}: {err}", + job.id + )))), + }, + MessagePayload::QueryJobs(query) => match self.backend.retrieve_jobs(query).await { + Ok(jobs) => Some(MessagePayload::JobsResult(jobs)), + Err(_err) => Some(MessagePayload::JobsResult(vec![])), + }, MessagePayload::ClaimJobs((_, end)) => { - let mut jobs = self.jobs.lock().await; - let jobs: Vec = jobs - .iter_mut() - .filter(|(_, j)| j.status == JobStatus::Scheduled) - .filter(|(_, j)| j.scheduled_at <= end) - .take(MAX_JOBS_PER_BATCH) - .map(|(_, job)| { - job.status = JobStatus::Claimed; - job.clone() - }) - .collect(); - Some(MessagePayload::JobsResult(jobs)) + match self + .backend + .claim_jobs(MAX_JOBS_PER_BATCH, SystemTime::now(), end) + .await + { + Ok(jobs) => Some(MessagePayload::JobsResult(jobs)), + Err(_err) => Some(MessagePayload::JobsResult(vec![])), + } } MessagePayload::Ping => Some(MessagePayload::Pong), MessagePayload::Shutdown => Some(MessagePayload::ShutdownAck),